Whamcloud - gitweb
(1) check child existance, and find mfd in reconstruct open;
authorhuanghua <huanghua>
Tue, 26 Sep 2006 06:44:35 +0000 (06:44 +0000)
committerhuanghua <huanghua>
Tue, 26 Sep 2006 06:44:35 +0000 (06:44 +0000)
(2) some other small fix about in unlink;

lustre/mdd/mdd_lov.c
lustre/mdt/mdt_open.c
lustre/mdt/mdt_reint.c

index 3f1bbca..ec3e57a 100644 (file)
@@ -533,7 +533,7 @@ int mdd_unlink_log(const struct lu_context *ctxt, struct mdd_device *mdd,
 {
         struct obd_device *obd = mdd2obd_dev(mdd);
 
-        if ((ma->ma_cookie_size > 0) &&
+        if ((ma->ma_cookie_size > 0) && ma->ma_valid&MA_LOV && 
             (mds_log_op_unlink(obd, ma->ma_lmm, ma->ma_lmm_size,
                               ma->ma_cookie, ma->ma_cookie_size) > 0)) {
                 ma->ma_valid |= MA_COOKIE;
index 9ef1d83..52ecd5a 100644 (file)
@@ -316,13 +316,15 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
                         int created,
                         struct ldlm_reply *rep)
 {
+        struct ptlrpc_request  *req = mdt_info_req(info);
+        struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
+        struct md_attr         *ma  = &info->mti_attr;
+        struct lu_attr         *la  = &ma->ma_attr;
         struct mdt_file_data   *mfd;
         struct mdt_body        *repbody;
-        struct md_attr         *ma = &info->mti_attr;
-        struct lu_attr         *la = &ma->ma_attr;
-        struct ptlrpc_request  *req = mdt_info_req(info);
         int                     rc = 0;
         int                     isreg, isdir, islnk;
+        struct list_head        *t;
         ENTRY;
 
         LASSERT(ma->ma_valid & MA_INODE);
@@ -390,6 +392,23 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
                         repbody->valid |= OBD_MD_FLEASIZE;
         }
 
+        mfd = NULL;
+        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
+                spin_lock(&med->med_open_lock);
+                list_for_each(t, &med->med_open_head) {
+                        mfd = list_entry(t, struct mdt_file_data, mfd_list);
+                        if (mfd->mfd_xid == req->rq_xid) {
+                                break;
+                        }
+                        mfd = NULL;
+                }
+                spin_unlock(&med->med_open_lock);
+        
+                if (mfd != NULL) {
+                        repbody->handle.cookie = mfd->mfd_handle.h_cookie;
+                        GOTO(out, rc = 0);
+                }
+        }
 
         if (flags & FMODE_WRITE) {
                 rc = mdt_write_get(info->mti_mdt, o);
@@ -409,10 +428,9 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
                      &info->mti_uc);
         if (rc)
                 RETURN(rc);
-        
+
         mfd = mdt_mfd_new();
         if (mfd != NULL) {
-                struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
                 
                 /* keep a reference on this object for this open,
                 * and is released by mdt_mfd_close() */
@@ -433,15 +451,13 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
                 rc = -ENOMEM;
 
 out:
-        if (!rc) {
-                struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
-
+        if (rc == 0) {
                 if (med->med_rmtclient) {
                         void *buf = req_capsule_server_get(&info->mti_pill,
                                                            &RMF_ACL);
 
                         rc = mdt_pack_remote_perm(info, o, buf);
-                        if (!rc) {
+                        if (rc == 0) {
                                 repbody->valid |= OBD_MD_FLRMTPERM;
                                 repbody->aclsize =
                                                 sizeof(struct mdt_remote_perm);
@@ -455,50 +471,94 @@ out:
 extern void mdt_req_from_mcd(struct ptlrpc_request *req,
                              struct mdt_client_data *mcd);
 
-
-/* TODO: much of the code here can be shared with mdt_reint_open() 
- *       and mdt_mfd_open(). This will be done later. :-)
- */
 void mdt_reconstruct_open(struct mdt_thread_info *info,
                           struct mdt_lock_handle *lhc)
 {
-        struct req_capsule      *pill  = &info->mti_pill;
-        struct ptlrpc_request   *req   = mdt_info_req(info);
-        struct mdt_export_data  *med   = &req->rq_export->exp_mdt_data;
-        struct mdt_client_data  *mcd   = med->med_mcd;
+        const struct lu_context *ctxt = info->mti_ctxt;
+        struct mdt_device       *mdt  = info->mti_mdt;
+        struct req_capsule      *pill = &info->mti_pill;
+        struct ptlrpc_request   *req  = mdt_info_req(info);
+        struct mdt_export_data  *med  = &req->rq_export->exp_mdt_data;
+        struct mdt_client_data  *mcd  = med->med_mcd;
+        struct md_attr          *ma   = &info->mti_attr;
+        struct mdt_reint_record *rr   = &info->mti_rr;
+        __u32                   flags = info->mti_spec.sp_cr_flags;
         struct ldlm_reply       *ldlm_rep;
-        int                      result;
+        struct mdt_object       *parent;
+        struct mdt_object       *child;
+        struct mdt_body         *repbody;
+        int                      rc;
         ENTRY;
 
         LASSERT(pill->rc_fmt == &RQF_LDLM_INTENT_OPEN);
         ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
 
+        repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
+        repbody->eadatasize = 0;
+        repbody->aclsize = 0;
+
+        ma->ma_lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
+        ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
+                                               &RMF_MDT_MD,
+                                               RCL_SERVER);
+        ma->ma_need = MA_INODE | MA_LOV;
+
         mdt_req_from_mcd(req, med->med_mcd);
         mdt_set_disposition(info, ldlm_rep, mcd->mcd_last_data);
 
         CERROR("This is reconstruct open: disp="LPX64", result=%d\n",
                 ldlm_rep->lock_policy_res1, req->rq_status);
 
-        if (mdt_get_disposition(ldlm_rep, DISP_OPEN_CREATE) && req->rq_status) {
+        if (mdt_get_disposition(ldlm_rep, DISP_OPEN_CREATE) && 
+            req->rq_status != 0) {
                 /* We did not create successfully, return error to client. */
                 mdt_shrink_reply(info, DLM_REPLY_REC_OFF + 1);
-                GOTO(out, 0);
+                GOTO(out, rc = req->rq_status);
+        }
+
+        if (mdt_get_disposition(ldlm_rep, DISP_OPEN_CREATE)) {
+                /* 
+                 * We failed after creation, but we do not know in which step 
+                 * we failed. So try to check the child object.
+                 */
+                parent = mdt_object_find(ctxt, mdt, rr->rr_fid1);
+                LASSERT(!IS_ERR(parent));
+
+                child = mdt_object_find(ctxt, mdt, rr->rr_fid2);
+                LASSERT(!IS_ERR(child));
+
+                rc = lu_object_exists(&child->mot_obj.mo_lu);
+                if (rc > 0) {
+                        struct md_object *next;
+                        next = mdt_object_child(child);
+                        rc = mo_attr_get(ctxt, next, ma, NULL);
+                        if (rc == 0)
+                              rc = mdt_mfd_open(info, parent, child, 
+                                                flags, 1, ldlm_rep);
+                } else if (rc < 0) {
+                        /* the child object was created on remote server */
+                        repbody->fid1 = *rr->rr_fid2;
+                        repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
+                        rc = 0;
+                } else if (rc == 0) {
+                        /* the child does not exist, we should do regular open */
+                        mdt_object_put(ctxt, parent);
+                        mdt_object_put(ctxt, child);
+                        GOTO(regular_open, 0);
+                }
+                mdt_object_put(ctxt, parent);
+                mdt_object_put(ctxt, child);
+                mdt_shrink_reply(info, DLM_REPLY_REC_OFF + 1);
+                GOTO(out, rc);
+        } else {
+regular_open:
+                /* We did not try to create, so we are a pure open */
+                rc = mdt_reint_open(info, lhc);
         }
-        /*
-         * XXX: is this correct to set here transno and status to zero? This is
-         * especially suspicious after calling mdt_req_from_mcd() above, which
-         * set these fields already.
-         */
-        lustre_msg_set_transno(req->rq_repmsg, 0);
-        lustre_msg_set_status(req->rq_repmsg, 0);
 
-        ldlm_rep->lock_policy_res1 = 0;
-        ldlm_rep->lock_policy_res2 = 0;
-        result = mdt_reint_open(info, lhc);
-        req->rq_status = result;
         EXIT;
-out: 
+out:
+        req->rq_status = rc;
         lustre_msg_set_status(req->rq_repmsg, req->rq_status);
 }
 
@@ -623,10 +683,10 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
         }
 
         CDEBUG(D_INODE, "I am going to open "DFID"/("DFID":%s) "
-                        "cr_flag=0%o mode=%06o replay=%d\n",
+                        "cr_flag=0%o mode=0%06o msg_flag=0x%x\n",
                         PFID(rr->rr_fid1), PFID(rr->rr_fid2),
                         rr->rr_name, create_flags, la->la_mode,
-                        lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY);
+                        lustre_msg_get_flags(req->rq_reqmsg));
 
         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
                 /* This is a replay request. */
index f8e43bc..3c3be96 100644 (file)
@@ -346,6 +346,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
         ma->ma_cookie_size = req_capsule_get_size(&info->mti_pill,
                                                   &RMF_LOGCOOKIES,
                                                   RCL_SERVER);
+        ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE;
 
         if (!ma->ma_lmm || !ma->ma_cookie)
                 GOTO(out_unlock_parent, rc = -EINVAL);
@@ -514,7 +515,6 @@ static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
                 mdt_handle_last_unlink(info, mtgt, ma);
 
         EXIT;
-out_unlock_tgt:
         if (mtgt) {
                 mdt_object_unlock_put(info, mtgt, lh_tgt, rc);
         }