- checks that lower transno can't overwrite the bigger one in last_rcvd
slot.
- evict client if bad transno was sent in replay, this is done by simulating
VBR failure
- keep assertion for server-generated transno, this is logical error
- fix issue with resent-replay open which can cause unexpected
transaction while closing the old mfd.
Change-Id: Ib523c25408b4d821f52d95c40a4fbd7d79d6cbe6
Signed-off-by: Mikhail Pershin <tappro@whamcloud.com>
Reviewed-on: http://review.whamcloud.com/452
Tested-by: Hudson
Reviewed-by: Niu Yawei <niu@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
class_handle_unhash(&old_mfd->mfd_handle);
cfs_list_del_init(&old_mfd->mfd_list);
cfs_spin_unlock(&med->med_open_lock);
class_handle_unhash(&old_mfd->mfd_handle);
cfs_list_del_init(&old_mfd->mfd_list);
cfs_spin_unlock(&med->med_open_lock);
+ /* no attr update for that close */
+ la->la_valid = 0;
mdt_mfd_close(info, old_mfd);
}
CDEBUG(D_HA, "Store old cookie "LPX64" in new mfd\n",
mdt_mfd_close(info, old_mfd);
}
CDEBUG(D_HA, "Store old cookie "LPX64" in new mfd\n",
LASSERT(ergo(mti->mti_transno == 0, rc != 0));
if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE ||
lustre_msg_get_opc(req->rq_reqmsg) == MDS_DONE_WRITING) {
LASSERT(ergo(mti->mti_transno == 0, rc != 0));
if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE ||
lustre_msg_get_opc(req->rq_reqmsg) == MDS_DONE_WRITING) {
- if (mti->mti_transno != 0)
+ if (mti->mti_transno != 0) {
+ if (lcd->lcd_last_close_transno > mti->mti_transno) {
+ LASSERT(req_is_replay(req));
+ CERROR("Trying to overwrite bigger transno:"
+ "on-disk: "LPU64", new: "LPU64"\n",
+ lcd->lcd_last_close_transno,
+ mti->mti_transno);
+ cfs_spin_lock(&req->rq_export->exp_lock);
+ req->rq_export->exp_vbr_failed = 1;
+ cfs_spin_unlock(&req->rq_export->exp_lock);
+ cfs_mutex_up(&ted->ted_lcd_lock);
+ RETURN(-EOVERFLOW);
+ }
lcd->lcd_last_close_transno = mti->mti_transno;
lcd->lcd_last_close_transno = mti->mti_transno;
lcd->lcd_last_close_xid = req->rq_xid;
lcd->lcd_last_close_result = rc;
} else {
lcd->lcd_last_close_xid = req->rq_xid;
lcd->lcd_last_close_result = rc;
} else {
lcd->lcd_pre_versions[2] = pre_versions[2];
lcd->lcd_pre_versions[3] = pre_versions[3];
}
lcd->lcd_pre_versions[2] = pre_versions[2];
lcd->lcd_pre_versions[3] = pre_versions[3];
}
- if (mti->mti_transno != 0)
+ if (mti->mti_transno != 0) {
+ if (lcd->lcd_last_transno > mti->mti_transno) {
+ LASSERT(req_is_replay(req));
+ CERROR("Trying to overwrite bigger transno:"
+ "on-disk: "LPU64", new: "LPU64"\n",
+ lcd->lcd_last_transno,
+ mti->mti_transno);
+ cfs_spin_lock(&req->rq_export->exp_lock);
+ req->rq_export->exp_vbr_failed = 1;
+ cfs_spin_unlock(&req->rq_export->exp_lock);
+ cfs_mutex_up(&ted->ted_lcd_lock);
+ RETURN(-EOVERFLOW);
+ }
lcd->lcd_last_transno = mti->mti_transno;
lcd->lcd_last_transno = mti->mti_transno;
lcd->lcd_last_xid = req->rq_xid;
lcd->lcd_last_result = rc;
/*XXX: save intent_disposition in mdt_thread_info?
lcd->lcd_last_xid = req->rq_xid;
lcd->lcd_last_result = rc;
/*XXX: save intent_disposition in mdt_thread_info?
if (oti->oti_transno == 0) {
last_rcvd = le64_to_cpu(lsd->lsd_last_transno) + 1;
lsd->lsd_last_transno = cpu_to_le64(last_rcvd);
if (oti->oti_transno == 0) {
last_rcvd = le64_to_cpu(lsd->lsd_last_transno) + 1;
lsd->lsd_last_transno = cpu_to_le64(last_rcvd);
+ LASSERT(last_rcvd >= le64_to_cpu(lcd->lcd_last_transno));
} else {
last_rcvd = oti->oti_transno;
if (last_rcvd > le64_to_cpu(lsd->lsd_last_transno))
lsd->lsd_last_transno = cpu_to_le64(last_rcvd);
} else {
last_rcvd = oti->oti_transno;
if (last_rcvd > le64_to_cpu(lsd->lsd_last_transno))
lsd->lsd_last_transno = cpu_to_le64(last_rcvd);
+ if (unlikely(last_rcvd < le64_to_cpu(lcd->lcd_last_transno))) {
+ CERROR("Trying to overwrite bigger transno, on-disk: "
+ LPU64", new: "LPU64"\n",
+ le64_to_cpu(lcd->lcd_last_transno), last_rcvd);
+ cfs_spin_lock(&exp->exp_lock);
+ exp->exp_vbr_failed = 1;
+ cfs_spin_unlock(&exp->exp_lock);
+ cfs_spin_unlock(&obt->obt_lut->lut_translock);
+ cfs_mutex_up(&ted->ted_lcd_lock);
+ RETURN(-EOVERFLOW);
+ }
}
oti->oti_transno = last_rcvd;
}
oti->oti_transno = last_rcvd;
- LASSERT(last_rcvd >= le64_to_cpu(lcd->lcd_last_transno));
lcd->lcd_last_transno = cpu_to_le64(last_rcvd);
lcd->lcd_pre_versions[0] = cpu_to_le64(oti->oti_pre_version);
lcd->lcd_last_xid = cpu_to_le64(oti->oti_xid);
lcd->lcd_last_transno = cpu_to_le64(last_rcvd);
lcd->lcd_pre_versions[0] = cpu_to_le64(oti->oti_pre_version);
lcd->lcd_last_xid = cpu_to_le64(oti->oti_xid);