Whamcloud - gitweb
(1) fix open re-construct: re-construct if create error; otherwise, do as regular...
authorhuanghua <huanghua>
Tue, 19 Sep 2006 04:27:08 +0000 (04:27 +0000)
committerhuanghua <huanghua>
Tue, 19 Sep 2006 04:27:08 +0000 (04:27 +0000)
(2) fix some pointers initialization in md_attr in mdt;
(3) add some EXCEPT in replay-single.sh

lustre/mdd/mdd_handler.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_open.c
lustre/mdt/mdt_recovery.c
lustre/tests/replay-single.sh

index 7de92ff..04be69b 100644 (file)
@@ -162,13 +162,13 @@ static inline int mdd_is_append(struct mdd_object *obj)
         return obj->mod_flags & APPEND_OBJ;
 }
 
-static void mdd_set_dead_obj(struct mdd_object *obj)
+static inline void mdd_set_dead_obj(struct mdd_object *obj)
 {
         if (obj)
                 obj->mod_flags |= DEAD_OBJ;
 }
 
-static int mdd_is_dead_obj(struct mdd_object *obj)
+static inline int mdd_is_dead_obj(struct mdd_object *obj)
 {
         return obj && obj->mod_flags & DEAD_OBJ;
 }
@@ -270,6 +270,7 @@ static int __mdd_lmm_get(const struct lu_context *ctxt,
                          struct mdd_object *mdd_obj, struct md_attr *ma)
 {
         int rc;
+        ENTRY;
 
         LASSERT(ma->ma_lmm != NULL && ma->ma_lmm_size > 0);
         rc = mdd_get_md(ctxt, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
@@ -1201,15 +1202,16 @@ int __mdd_object_kill(const struct lu_context *ctxt,
                       struct md_attr *ma)
 {
         int rc = 0;
+        ENTRY;
 
         mdd_set_dead_obj(obj);
-        if (S_ISREG(mdd_object_type(obj))) {
+        if (S_ISREG(mdd_object_type(obj)) && ma->ma_need&MA_LOV) {
                 rc = __mdd_lmm_get(ctxt, obj, ma);
-                if (ma->ma_valid & MA_LOV)
+                if (ma->ma_valid&MA_LOV && ma->ma_need&MA_COOKIE)
                         rc = mdd_unlink_log(ctxt, mdo2mdd(&obj->mod_obj),
                                             obj, ma);
         }
-        return rc;
+        RETURN(rc);
 }
 /* caller should take a lock before calling */
 static int __mdd_finish_unlink(const struct lu_context *ctxt,
index 2725826..b42cb26 100644 (file)
@@ -1322,8 +1322,8 @@ static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags)
         RETURN(rc);
 }
 
+#if 0
 struct lu_context_key mdt_txn_key;
-
 static inline void mdt_finish_reply(struct mdt_thread_info *info, int rc)
 {
         struct mdt_device     *mdt = info->mti_mdt;
@@ -1356,6 +1356,8 @@ static inline void mdt_finish_reply(struct mdt_thread_info *info, int rc)
         lustre_msg_set_last_xid(req->rq_repmsg, req_exp_last_xid(req));
         //lustre_msg_set_last_xid(req->rq_repmsg, req->rq_xid);
 }
+#endif
+
 
 /*
  * Invoke handler for this request opc. Also do necessary preprocessing
@@ -1895,7 +1897,6 @@ static int mdt_intent_reint(enum mdt_it_code opcode,
                 RETURN(-EFAULT);
 
         rep->lock_policy_res2 = rc;
-        mdt_set_disposition(info, rep, DISP_IT_EXECD);
 
         /* cross-ref case, the lock should be returned to the client */
         if (rc == -EREMOTE) {
index 0705f88..3c32195 100644 (file)
@@ -265,7 +265,7 @@ struct mdt_thread_info {
 
         /* transaction number of current request */
         __u64                      mti_transno;
-        __u32                      mti_trans_flags;
+        __u32                      mti_has_trans:1; /* has txn already? */
 
         /* opdata for mdt_open(), has the same as ldlm_reply:lock_policy_res1.
          * mdt_update_last_rcvd() stores this value onto disk for recovery
index 088fcb9..bde0e9c 100644 (file)
@@ -338,7 +338,6 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
          */
         if (islnk || (!isreg && !isdir &&
             (req->rq_export->exp_connect_flags & OBD_CONNECT_NODEVOH))) {
-                //info->mti_trans_flags |= MDT_NONEED_TRANSNO;
                 lustre_msg_set_transno(req->rq_repmsg, 0);
                 RETURN(0);
         }
@@ -440,178 +439,40 @@ extern void mdt_req_from_mcd(struct ptlrpc_request *req,
  */
 void mdt_reconstruct_open(struct mdt_thread_info *info)
 {
-        struct lu_fid          *child_fid = &info->mti_tmp_fid1;
-        __u32                   flags  = info->mti_spec.sp_cr_flags;
         struct req_capsule      *pill  = &info->mti_pill;
         struct ptlrpc_request   *req   = mdt_info_req(info);
         struct mdt_export_data  *med   = &req->rq_export->exp_mdt_data;
         struct mdt_client_data  *mcd   = med->med_mcd;
-        struct md_attr          *ma    = &info->mti_attr;
-        struct lu_attr          *la    = &ma->ma_attr;
-        struct mdt_device       *mdt   = info->mti_mdt;
-        struct mdt_reint_record *rr    = &info->mti_rr;
-        const struct lu_context *ctxt  = info->mti_ctxt;
-        struct mdt_object       *parent;
-        struct mdt_object       *child;
-        struct ldlm_reply       *ldlm_rep = NULL;
-        struct mdt_body         *repbody;
-        struct mdt_file_data    *mfd;
-        struct list_head        *h;
+        struct ldlm_reply       *ldlm_rep;
         int                      result;
-        int                      isreg, isdir, islnk;
         ENTRY;
 
         LASSERT(pill->rc_fmt == &RQF_LDLM_INTENT_OPEN);
+        ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
 
         mdt_req_from_mcd(req, med->med_mcd);
         mdt_set_disposition(info, ldlm_rep, mcd->mcd_last_data);
 
-        ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
-        ma->ma_lmm_size = mdt->mdt_max_mdsize;
-        ma->ma_need = MA_INODE | MA_LOV;
-        repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
-        repbody->eadatasize = 0;
-        repbody->aclsize = 0;
+        CERROR("This is re-CCCCCCCCCCConstruct open: disp="LPX64", result=%d\n",
+                ldlm_rep->lock_policy_res1,
+                req->rq_status);
 
         if (mdt_get_disposition(ldlm_rep, DISP_OPEN_CREATE) && req->rq_status) {
                 /* we did not create successfully, return error to client. */
-                EXIT;
-                return;
-        }
-
-        mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD);
-        mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
-        parent = mdt_object_find(ctxt, mdt, rr->rr_fid1);
-        if (IS_ERR(parent)) {
-                /* FIXME: should this be assert? */
-                CERROR("Cannot find parent "DFID" while reconstruct open\n",
-                        PFID(rr->rr_fid1));
-                GOTO(out, result = PTR_ERR(parent));
-        }
-
-        result = mdo_lookup(ctxt, mdt_object_child(parent),
-                            rr->rr_name, child_fid);
-        if (result == 0)
-                mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
-        else {
-                if (result == -ENOENT || result == -ESTALE) {
-                        mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
-                        result = -ENOENT;
-                }
-                GOTO(out_parent, result);
-        }
-        
-        LASSERT(lu_fid_eq(rr->rr_fid2, child_fid));
-        child = mdt_object_find(ctxt, mdt, child_fid);
-        if (IS_ERR(child))
-                GOTO(out_parent, result = PTR_ERR(child));
-
-        result = mo_attr_get(ctxt, mdt_object_child(child), ma);
-        if (result == -EREMOTE) {
-                /* the object is on remote node
-                 * return its FID for remote open */
-                repbody->fid1 = *mdt_object_fid(child);
-                repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
-                GOTO(out_child, result = 0);
-        } else if(result != 0)
-                GOTO(out_child, result);
-
-        LASSERT(ma->ma_valid & MA_INODE);
-        isreg = S_ISREG(la->la_mode);
-        isdir = S_ISDIR(la->la_mode);
-        islnk = S_ISLNK(la->la_mode);
-        mdt_pack_attr2body(repbody, la, mdt_object_fid(child));
-
-        /* if we are following a symlink, don't open; and
-         * do not return open handle for special nodes as client required
-         */
-        if (islnk || (!isreg && !isdir &&
-            (req->rq_export->exp_connect_flags & OBD_CONNECT_NODEVOH))) {
-                info->mti_trans_flags |= MDT_NONEED_TRANSNO;
-                GOTO(out_child, 0);
-        }
-
-        mdt_set_disposition(info, ldlm_rep, DISP_OPEN_OPEN);
-        if (!mdt_get_disposition(ldlm_rep, DISP_OPEN_CREATE) &&
-             (flags & MDS_OPEN_EXCL) && (flags & MDS_OPEN_CREAT))
-                GOTO(out_child, result = -EEXIST);
-
-        if (isdir) {
-                if (flags & (MDS_OPEN_CREAT | FMODE_WRITE))
-                        /* we are trying to create or
-                         * write an existing dir. */
-                        GOTO(out_child, result = -EISDIR);
-        } else if (flags & MDS_OPEN_DIRECTORY)
-                GOTO(out_child, result = -ENOTDIR);
-
-        /* at this point, regular file should have lov */
-        LASSERT(ergo(isreg, ma->ma_valid & MA_LOV));
-
-        if (ma->ma_valid & MA_LOV) {
-                LASSERT(ma->ma_lmm_size);
-                repbody->eadatasize = ma->ma_lmm_size;
-                if (isdir)
-                        repbody->valid |= OBD_MD_FLDIREA;
-                else
-                        repbody->valid |= OBD_MD_FLEASIZE;
+                mdt_shrink_reply(info, DLM_REPLY_REC_OFF + 1);
+                GOTO(out, 0);
         }
-
-        if (flags & FMODE_WRITE) {
-                /* FIXME: in recovery, need to pass old epoch here */
-                result = mdt_write_get(info->mti_mdt, child);
-                if (result == 0) {
-                        /* FIXME: in recovery, need to pass old epoch here */
-                        mdt_epoch_open(info, child, 0);
-                        repbody->ioepoch = child->mot_ioepoch;
-                }
-        } else if (flags & MDS_FMODE_EXEC)
-                result = mdt_write_deny(mdt, child);
-        if (result)
-                GOTO(out_child, result);
-
-        result = mo_open(ctxt, mdt_object_child(child), flags);
-        if (result)
-                GOTO(out_child, result);
-
-        mfd = NULL;
-        spin_lock(&med->med_open_lock);
-        list_for_each(h, &med->med_open_head) {
-                mfd = list_entry(h, struct mdt_file_data, mfd_list);
-                if (mfd->mfd_xid == req->rq_xid) {
-                        break;
-                }
-                mfd = NULL;
-        }
-        spin_unlock(&med->med_open_lock);
-
-        if (mfd != NULL) {
-                repbody->handle.cookie = mfd->mfd_handle.h_cookie;
-                GOTO(out_child, 0);
-        }
-
-        mfd = mdt_mfd_new();
-        if (mfd != NULL) {
-                mdt_object_get(ctxt, child);
-
-                mfd->mfd_mode = flags;
-                mfd->mfd_object = child;
-                mfd->mfd_xid = req->rq_xid;
-
-                med = &req->rq_export->exp_mdt_data;
-                spin_lock(&med->med_open_lock);
-                list_add(&mfd->mfd_list, &med->med_open_head);
-                spin_unlock(&med->med_open_lock);
-                repbody->handle.cookie = mfd->mfd_handle.h_cookie;
-        } else
-                result = -ENOMEM;
-        EXIT;
-out_child:
-        mdt_object_put(ctxt, child);
-out_parent:
-        mdt_object_put(ctxt, parent);
-out:
-        mdt_shrink_reply(info, DLM_REPLY_REC_OFF + 1);
+        lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
+        lustre_msg_set_transno(req->rq_repmsg, 0);
+        lustre_msg_set_status(req->rq_repmsg, 0);
+        ldlm_rep->lock_policy_res1 = 0;
+        ldlm_rep->lock_policy_res2 = 0;
+        result = mdt_open(info);
         req->rq_status = result;
+        EXIT;
+out: 
+        lustre_msg_set_status(req->rq_repmsg, req->rq_status);
 }
 
 static int mdt_open_by_fid(struct mdt_thread_info* info, 
@@ -633,6 +494,7 @@ static int mdt_open_by_fid(struct mdt_thread_info* info,
         if (rc > 0) {
                 const struct lu_context *ctxt = info->mti_ctxt;
 
+                mdt_set_disposition(info, rep, DISP_IT_EXECD);
                 mdt_set_disposition(info, rep, DISP_LOOKUP_EXECD);
                 mdt_set_disposition(info, rep, DISP_LOOKUP_POS);
                 rc = mo_attr_get(ctxt, mdt_object_child(o), ma);
@@ -718,7 +580,9 @@ int mdt_open(struct mdt_thread_info *info)
         repbody->aclsize = 0;
 
         ma->ma_lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
-        ma->ma_lmm_size = mdt->mdt_max_mdsize;
+        ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
+                                               &RMF_MDT_MD,
+                                               RCL_SERVER);
         ma->ma_need = MA_INODE | MA_LOV;
 
         LASSERT(info->mti_pill.rc_fmt == &RQF_LDLM_INTENT_OPEN);
@@ -756,6 +620,7 @@ int mdt_open(struct mdt_thread_info *info)
         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK))
                 GOTO(out, result = -ENOMEM);
 
+        mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD);
         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
         if (rr->rr_name[0] == 0) {
                 /* this is cross-ref open */
@@ -803,6 +668,7 @@ int mdt_open(struct mdt_thread_info *info)
 
         if (result == -ENOENT) {
                 /* not found and with MDS_OPEN_CREAT: let's create it */
+                mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
                 result = mdo_create(info->mti_ctxt,
                                     mdt_object_child(parent),
                                     rr->rr_name,
@@ -812,7 +678,6 @@ int mdt_open(struct mdt_thread_info *info)
                 if (result == -ERESTART)
                         GOTO(out_child, result);
                 else {        
-                        mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
                         if (result != 0)
                                 GOTO(out_child, result);
                 }
@@ -957,6 +822,17 @@ int mdt_close(struct mdt_thread_info *info)
         if (rc == 0) {
                 repbody = req_capsule_server_get(&info->mti_pill, 
                                                  &RMF_MDT_BODY);
+                ma->ma_lmm = req_capsule_server_get(&info->mti_pill, 
+                                                    &RMF_MDT_MD);
+                ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
+                                                       &RMF_MDT_MD,
+                                                       RCL_SERVER);
+                ma->ma_cookie = req_capsule_server_get(&info->mti_pill,
+                                                       &RMF_LOGCOOKIES);
+                ma->ma_cookie_size = req_capsule_get_size(&info->mti_pill,
+                                                          &RMF_LOGCOOKIES,
+                                                          RCL_SERVER);
+                ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE;
                 repbody->eadatasize = 0;
                 repbody->aclsize = 0;
         }
@@ -976,24 +852,6 @@ int mdt_close(struct mdt_thread_info *info)
                 list_del_init(&mfd->mfd_list);
                 spin_unlock(&med->med_open_lock);
 
-                if (repbody != NULL) {
-                        ma->ma_lmm = 
-                                req_capsule_server_get(&info->mti_pill,
-                                                       &RMF_MDT_MD);
-                        ma->ma_lmm_size = 
-                                req_capsule_get_size(&info->mti_pill,
-                                                     &RMF_MDT_MD,
-                                                     RCL_SERVER);
-                        ma->ma_cookie = 
-                                req_capsule_server_get(&info->mti_pill,
-                                                       &RMF_LOGCOOKIES);
-                        ma->ma_cookie_size = 
-                                req_capsule_get_size(&info->mti_pill,
-                                                     &RMF_LOGCOOKIES,
-                                                     RCL_SERVER);
-                        ma->ma_need = MA_INODE;
-                }
-                
                 /* Do not lose object before last unlink. */
                 o = mfd->mfd_object;
                 mdt_object_get(info->mti_ctxt, o);
index 5da6aeb..038ef64 100644 (file)
@@ -814,7 +814,13 @@ static int mdt_txn_stop_cb(const struct lu_context *ctx,
                 txi->txi_transno = 0;
                 return 0;
         }
-        LASSERT(req != NULL);
+
+        if (mti->mti_has_trans) {
+                CERROR("More than one transaction "LPU64"\n", mti->mti_transno);
+                return 0;
+        }
+
+        mti->mti_has_trans = 1;
         /*TODO: checks for recovery cases, see mds_finish_transno */
         spin_lock(&mdt->mdt_transno_lock);
         if (txn->th_result != 0) {
@@ -827,9 +833,6 @@ static int mdt_txn_stop_cb(const struct lu_context *ctx,
                 mti->mti_transno = ++ mdt->mdt_last_transno;
         } else {
                 /* should be replay */
-                if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY))
-                        CERROR("Double transaction ("LPU64") per thread!\n",
-                               mti->mti_transno);                
                 if (mti->mti_transno > mdt->mdt_last_transno)
                         mdt->mdt_last_transno = mti->mti_transno;
         }
@@ -929,10 +932,18 @@ void mdt_req_from_mcd(struct ptlrpc_request *req,
 {
         DEBUG_REQ(D_HA, req, "restoring transno "LPD64"/status %d",
                   mcd->mcd_last_transno, mcd->mcd_last_result);
-        req->rq_transno = mcd->mcd_last_transno;
-        req->rq_status = mcd->mcd_last_result;
-        lustre_msg_set_transno(req->rq_repmsg, req->rq_transno);
-        lustre_msg_set_status(req->rq_repmsg, req->rq_status);
+
+        if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE) {
+                req->rq_transno = mcd->mcd_last_close_transno;
+                req->rq_status = mcd->mcd_last_close_result;
+                lustre_msg_set_transno(req->rq_repmsg, req->rq_transno);
+                lustre_msg_set_status(req->rq_repmsg, req->rq_status);
+        } else {
+                req->rq_transno = mcd->mcd_last_transno;
+                req->rq_status = mcd->mcd_last_result;
+                lustre_msg_set_transno(req->rq_repmsg, req->rq_transno);
+                lustre_msg_set_status(req->rq_repmsg, req->rq_status);
+        }
         //mds_steal_ack_locks(req);
 }
 
index 443222f..5a69fc4 100755 (executable)
@@ -17,6 +17,10 @@ init_test_env $@
 # bug number: 2766 4176
 ALWAYS_EXCEPT="0b  39   $REPLAY_SINGLE_EXCEPT"
 
+# failed in our b_new_cmd, please update this.
+ALWAYS_EXCEPT=" 5 11 20b 25 30 31 52 58 $ALWAYS_EXCEPT"
+
+
 build_test_filter
 
 SETUP=${SETUP:-"setup"}
@@ -41,6 +45,7 @@ fi
 
 mkdir -p $DIR
 
+
 test_0() {
     replay_barrier mds
     fail mds
@@ -853,7 +858,7 @@ test_41() {
     do_facet client dd if=/dev/zero of=$f bs=4k count=1 || return 3
     cancel_lru_locks osc
     # fail ost2 and read from ost1
-    local osc2dev=`grep ${ost2_svc}-osc- $LPROC/devices | awk '{print $1}'`
+    local osc2dev=`grep ${ost2_svc}lustre-clilov-osc- $LPROC/devices | awk '{print $1}'`
     [ "$osc2dev" ] || return 4
     $LCTL --device $osc2dev deactivate || return 1
     do_facet client dd if=$f of=/dev/null bs=4k count=1 || return 3
@@ -1002,7 +1007,7 @@ test_48() {
 run_test 48 "MDS->OSC failure during precreate cleanup (2824)"
 
 test_50() {
-    local oscdev=`grep ${ost1_svc}-osc- $LPROC/devices | awk '{print $1}'`
+    local oscdev=`grep ${ost1_svc}lustre-clilov-osc- $LPROC/devices | awk '{print $1}'`
     [ "$oscdev" ] || return 1
     $LCTL --device $oscdev recover &&  $LCTL --device $oscdev recover
     # give the mds_lov_sync threads a chance to run