Whamcloud - gitweb
LU-128 Avoid assertion on wire data in last_rcvd update
author: Mikhail Pershin <tappro@whamcloud.com>
Fri, 22 Apr 2011 18:24:25 +0000 (22:24 +0400)
committer: Oleg Drokin <green@whamcloud.com>
Thu, 12 May 2011 00:32:17 +0000 (17:32 -0700)
- check that a lower transno cannot overwrite a bigger one in the last_rcvd
  slot.
- evict the client if a bad transno was sent in replay; this is done by
  simulating a VBR failure
- keep the assertion for a server-generated transno, since that is a logic error
- fix an issue with resent-replay open, which can cause an unexpected
  transaction while closing the old mfd.

Change-Id: Ib523c25408b4d821f52d95c40a4fbd7d79d6cbe6
Signed-off-by: Mikhail Pershin <tappro@whamcloud.com>
Reviewed-on: http://review.whamcloud.com/452
Tested-by: Hudson
Reviewed-by: Niu Yawei <niu@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/mdt/mdt_open.c
lustre/mdt/mdt_recovery.c
lustre/obdfilter/filter.c

index 2b39c60..3f1e526 100644 (file)
@@ -684,6 +684,8 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
                                 class_handle_unhash(&old_mfd->mfd_handle);
                                 cfs_list_del_init(&old_mfd->mfd_list);
                                 cfs_spin_unlock(&med->med_open_lock);
                                 class_handle_unhash(&old_mfd->mfd_handle);
                                 cfs_list_del_init(&old_mfd->mfd_list);
                                 cfs_spin_unlock(&med->med_open_lock);
+                                /* no attr update for that close */
+                                la->la_valid = 0;
                                 mdt_mfd_close(info, old_mfd);
                         }
                         CDEBUG(D_HA, "Store old cookie "LPX64" in new mfd\n",
                                 mdt_mfd_close(info, old_mfd);
                         }
                         CDEBUG(D_HA, "Store old cookie "LPX64" in new mfd\n",
index e125d5b..2065bfd 100644 (file)
@@ -753,8 +753,21 @@ static int mdt_last_rcvd_update(struct mdt_thread_info *mti,
         LASSERT(ergo(mti->mti_transno == 0, rc != 0));
         if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE ||
             lustre_msg_get_opc(req->rq_reqmsg) == MDS_DONE_WRITING) {
         LASSERT(ergo(mti->mti_transno == 0, rc != 0));
         if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE ||
             lustre_msg_get_opc(req->rq_reqmsg) == MDS_DONE_WRITING) {
-                if (mti->mti_transno != 0)
+                if (mti->mti_transno != 0) {
+                        if (lcd->lcd_last_close_transno > mti->mti_transno) {
+                                LASSERT(req_is_replay(req));
+                                CERROR("Trying to overwrite bigger transno:"
+                                       "on-disk: "LPU64", new: "LPU64"\n",
+                                       lcd->lcd_last_close_transno,
+                                       mti->mti_transno);
+                                cfs_spin_lock(&req->rq_export->exp_lock);
+                                req->rq_export->exp_vbr_failed = 1;
+                                cfs_spin_unlock(&req->rq_export->exp_lock);
+                                cfs_mutex_up(&ted->ted_lcd_lock);
+                                RETURN(-EOVERFLOW);
+                        }
                         lcd->lcd_last_close_transno = mti->mti_transno;
                         lcd->lcd_last_close_transno = mti->mti_transno;
+                }
                 lcd->lcd_last_close_xid = req->rq_xid;
                 lcd->lcd_last_close_result = rc;
         } else {
                 lcd->lcd_last_close_xid = req->rq_xid;
                 lcd->lcd_last_close_result = rc;
         } else {
@@ -766,8 +779,21 @@ static int mdt_last_rcvd_update(struct mdt_thread_info *mti,
                         lcd->lcd_pre_versions[2] = pre_versions[2];
                         lcd->lcd_pre_versions[3] = pre_versions[3];
                 }
                         lcd->lcd_pre_versions[2] = pre_versions[2];
                         lcd->lcd_pre_versions[3] = pre_versions[3];
                 }
-                if (mti->mti_transno != 0)
+                if (mti->mti_transno != 0) {
+                        if (lcd->lcd_last_transno > mti->mti_transno) {
+                                LASSERT(req_is_replay(req));
+                                CERROR("Trying to overwrite bigger transno:"
+                                       "on-disk: "LPU64", new: "LPU64"\n",
+                                       lcd->lcd_last_transno,
+                                       mti->mti_transno);
+                                cfs_spin_lock(&req->rq_export->exp_lock);
+                                req->rq_export->exp_vbr_failed = 1;
+                                cfs_spin_unlock(&req->rq_export->exp_lock);
+                                cfs_mutex_up(&ted->ted_lcd_lock);
+                                RETURN(-EOVERFLOW);
+                        }
                         lcd->lcd_last_transno = mti->mti_transno;
                         lcd->lcd_last_transno = mti->mti_transno;
+                }
                 lcd->lcd_last_xid = req->rq_xid;
                 lcd->lcd_last_result = rc;
                 /*XXX: save intent_disposition in mdt_thread_info?
                 lcd->lcd_last_xid = req->rq_xid;
                 lcd->lcd_last_result = rc;
                 /*XXX: save intent_disposition in mdt_thread_info?
index 8e3779b..1ba0bca 100644 (file)
@@ -152,14 +152,25 @@ int filter_finish_transno(struct obd_export *exp, struct inode *inode,
         if (oti->oti_transno == 0) {
                 last_rcvd = le64_to_cpu(lsd->lsd_last_transno) + 1;
                 lsd->lsd_last_transno = cpu_to_le64(last_rcvd);
         if (oti->oti_transno == 0) {
                 last_rcvd = le64_to_cpu(lsd->lsd_last_transno) + 1;
                 lsd->lsd_last_transno = cpu_to_le64(last_rcvd);
+                LASSERT(last_rcvd >= le64_to_cpu(lcd->lcd_last_transno));
         } else {
                 last_rcvd = oti->oti_transno;
                 if (last_rcvd > le64_to_cpu(lsd->lsd_last_transno))
                         lsd->lsd_last_transno = cpu_to_le64(last_rcvd);
         } else {
                 last_rcvd = oti->oti_transno;
                 if (last_rcvd > le64_to_cpu(lsd->lsd_last_transno))
                         lsd->lsd_last_transno = cpu_to_le64(last_rcvd);
+                if (unlikely(last_rcvd < le64_to_cpu(lcd->lcd_last_transno))) {
+                        CERROR("Trying to overwrite bigger transno, on-disk: "
+                               LPU64", new: "LPU64"\n",
+                               le64_to_cpu(lcd->lcd_last_transno), last_rcvd);
+                        cfs_spin_lock(&exp->exp_lock);
+                        exp->exp_vbr_failed = 1;
+                        cfs_spin_unlock(&exp->exp_lock);
+                        cfs_spin_unlock(&obt->obt_lut->lut_translock);
+                        cfs_mutex_up(&ted->ted_lcd_lock);
+                        RETURN(-EOVERFLOW);
+                }
         }
         oti->oti_transno = last_rcvd;
 
         }
         oti->oti_transno = last_rcvd;
 
-        LASSERT(last_rcvd >= le64_to_cpu(lcd->lcd_last_transno));
         lcd->lcd_last_transno = cpu_to_le64(last_rcvd);
         lcd->lcd_pre_versions[0] = cpu_to_le64(oti->oti_pre_version);
         lcd->lcd_last_xid = cpu_to_le64(oti->oti_xid);
         lcd->lcd_last_transno = cpu_to_le64(last_rcvd);
         lcd->lcd_pre_versions[0] = cpu_to_le64(oti->oti_pre_version);
         lcd->lcd_last_xid = cpu_to_le64(oti->oti_xid);