spin_unlock(&obd->obd_recovery_task_lock);
if (lut->lut_tdtd != NULL &&
- !list_empty(&lut->lut_tdtd->tdtd_replay_list))
+ (!list_empty(&lut->lut_tdtd->tdtd_replay_list) ||
+ !list_empty(&lut->lut_tdtd->tdtd_replay_finish_list))) {
dtrq_list_dump(lut->lut_tdtd, D_ERROR);
+ dtrq_list_destroy(lut->lut_tdtd);
+ }
obd->obd_recovery_end = cfs_time_current_sec();
{
struct ptlrpc_request *req = NULL;
struct obd_device *obd = lut->lut_obd;
+ struct target_distribute_txn_data *tdtd = lut->lut_tdtd;
int wake_up = 0, connected, completed, queue_len;
__u64 req_transno = 0;
__u64 update_transno = 0;
req_transno = lustre_msg_get_transno(req->rq_reqmsg);
}
- if (lut->lut_tdtd != NULL) {
- struct target_distribute_txn_data *tdtd;
-
- tdtd = lut->lut_tdtd;
- update_transno = distribute_txn_get_next_transno(lut->lut_tdtd);
- }
+ if (tdtd != NULL)
+ update_transno = distribute_txn_get_next_transno(tdtd);
connected = atomic_read(&obd->obd_connected_clients);
completed = connected - atomic_read(&obd->obd_req_replay_clients);
} else if (obd->obd_recovery_expired) {
CDEBUG(D_HA, "waking for expired recovery\n");
wake_up = 1;
+ } else if (tdtd != NULL && req != NULL &&
+ is_req_replayed_by_update(req)) {
+ LASSERTF(req_transno < next_transno, "req_transno "LPU64
+ "next_transno"LPU64"\n", req_transno, next_transno);
+ CDEBUG(D_HA, "waking for duplicate req ("LPU64")\n",
+ req_transno);
+ wake_up = 1;
} else if (req_transno == next_transno ||
(update_transno != 0 && update_transno <= next_transno)) {
CDEBUG(D_HA, "waking for next ("LPD64")\n", next_transno);
spin_lock(&obd->obd_recovery_task_lock);
transno = get_next_transno(lut, &type);
- if (type == REQUEST_RECOVERY && tdtd != NULL &&
- transno == tdtd->tdtd_last_update_transno) {
+ if (type == REQUEST_RECOVERY && transno != 0) {
/* Drop replay request from client side, if the
* replay has been executed by update with the
* same transno */
req = list_entry(obd->obd_req_replay_queue.next,
struct ptlrpc_request, rq_list);
+
list_del_init(&req->rq_list);
obd->obd_requests_queued_for_recovery--;
spin_unlock(&obd->obd_recovery_task_lock);
- drop_duplicate_replay_req(env, obd, req);
- } else if (type == REQUEST_RECOVERY && transno != 0) {
- req = list_entry(obd->obd_req_replay_queue.next,
- struct ptlrpc_request, rq_list);
- list_del_init(&req->rq_list);
- obd->obd_requests_queued_for_recovery--;
- spin_unlock(&obd->obd_recovery_task_lock);
+
+ /* Let's check if the request has been redone by
+ * update replay */
+ if (is_req_replayed_by_update(req)) {
+ struct distribute_txn_replay_req *dtrq;
+
+ dtrq = distribute_txn_lookup_finish_list(tdtd,
+ req->rq_xid);
+ LASSERT(dtrq != NULL);
+ spin_lock(&tdtd->tdtd_replay_list_lock);
+ list_del_init(&dtrq->dtrq_list);
+ spin_unlock(&tdtd->tdtd_replay_list_lock);
+ dtrq_destroy(dtrq);
+
+ drop_duplicate_replay_req(env, obd, req);
+
+ continue;
+ }
+
LASSERT(trd->trd_processing_task == current_pid());
DEBUG_REQ(D_HA, req, "processing t"LPD64" from %s",
lustre_msg_get_transno(req->rq_reqmsg),
tdtd->tdtd_replay_handler(env, tdtd, dtrq);
lu_context_exit(&thread->t_env->le_ctx);
extend_recovery_timer(obd, obd_timeout, true);
- LASSERT(tdtd->tdtd_last_update_transno <= transno);
- tdtd->tdtd_last_update_transno = transno;
+
+ /* Add it to the replay finish list */
+ spin_lock(&tdtd->tdtd_replay_list_lock);
+ if (dtrq->dtrq_xid != 0) {
+ CDEBUG(D_HA, "Move x"LPU64" t"LPU64
+ " to finish list\n", dtrq->dtrq_xid,
+ dtrq->dtrq_master_transno);
+ list_add(&dtrq->dtrq_list,
+ &tdtd->tdtd_replay_finish_list);
+ } else {
+ dtrq_destroy(dtrq);
+ }
+ spin_unlock(&tdtd->tdtd_replay_list_lock);
+
spin_lock(&obd->obd_recovery_task_lock);
- if (transno > obd->obd_next_recovery_transno)
- obd->obd_next_recovery_transno = transno;
+ if (transno == obd->obd_next_recovery_transno)
+ obd->obd_next_recovery_transno++;
+ else if (transno > obd->obd_next_recovery_transno)
+ obd->obd_next_recovery_transno = transno + 1;
spin_unlock(&obd->obd_recovery_task_lock);
- dtrq_destroy(dtrq);
+
} else {
spin_unlock(&obd->obd_recovery_task_lock);
LASSERT(list_empty(&obd->obd_req_replay_queue));
CDEBUG(D_HA, "Next recovery transno: "LPU64
", current: "LPU64", replaying\n",
obd->obd_next_recovery_transno, transno);
+
+ /* If the request has been replayed by update replay, then send this
+ * request to the recovery thread (replay_request_or_update()), where
+ * it will be handled */
spin_lock(&obd->obd_recovery_task_lock);
- if (transno < obd->obd_next_recovery_transno) {
+ if (transno < obd->obd_next_recovery_transno &&
+ !is_req_replayed_by_update(req)) {
/* Processing the queue right now, don't re-add. */
LASSERT(list_empty(&req->rq_list));
spin_unlock(&obd->obd_recovery_task_lock);
list_del_init(&dtrq->dtrq_list);
dtrq_destroy(dtrq);
}
+ list_for_each_entry_safe(dtrq, tmp, &tdtd->tdtd_replay_finish_list,
+ dtrq_list) {
+ list_del_init(&dtrq->dtrq_list);
+ dtrq_destroy(dtrq);
+ }
spin_unlock(&tdtd->tdtd_replay_list_lock);
}
EXPORT_SYMBOL(dtrq_list_destroy);
}
EXPORT_SYMBOL(distribute_txn_get_next_transno);
+/**
+ * Look up a replay request on the replay finish list by xid
+ *
+ * Walk tdtd_replay_finish_list of \a tdtd while holding
+ * tdtd_replay_list_lock and pick the first entry whose dtrq_xid equals
+ * \a xid. The entry is left on the list and the lock is released before
+ * returning.
+ *
+ * \param[in] tdtd	distribute txn data whose finish list is searched
+ * \param[in] xid	xid compared against dtrq_xid of each entry
+ *
+ * \retval		pointer to the matching replay request
+ * \retval		NULL if no entry on the list carries \a xid
+ */
+struct distribute_txn_replay_req *
+distribute_txn_lookup_finish_list(struct target_distribute_txn_data *tdtd,
+				  __u64 xid)
+{
+	struct distribute_txn_replay_req *found = NULL;
+	struct distribute_txn_replay_req *cur;
+
+	spin_lock(&tdtd->tdtd_replay_list_lock);
+	list_for_each_entry(cur, &tdtd->tdtd_replay_finish_list, dtrq_list) {
+		if (cur->dtrq_xid != xid)
+			continue;
+
+		/* first match wins */
+		found = cur;
+		break;
+	}
+	spin_unlock(&tdtd->tdtd_replay_list_lock);
+
+	return found;
+}
+
+/**
+ * Check whether a request has an entry on the replay finish list
+ *
+ * Resolve the target from the export of \a req and search the replay
+ * finish list of its distribute txn data for the xid of \a req.
+ *
+ * \param[in] req	request whose rq_export and rq_xid are examined
+ *
+ * \retval		true if the finish list holds an entry with rq_xid
+ * \retval		false if the target has no distribute txn data
+ *			(lut_tdtd is NULL) or no entry matches
+ */
+bool is_req_replayed_by_update(struct ptlrpc_request *req)
+{
+	struct lu_target *tgt = class_exp2tgt(req->rq_export);
+
+	if (tgt->lut_tdtd == NULL)
+		return false;
+
+	return distribute_txn_lookup_finish_list(tgt->lut_tdtd,
+						 req->rq_xid) != NULL;
+}
+EXPORT_SYMBOL(is_req_replayed_by_update);
+
/**
* Check if the update of one object is committed
*
struct target_distribute_txn_data *tdtd,
struct thandle *th,
struct thandle *master_th,
+ struct distribute_txn_replay_req *dtrq,
struct tx_arg *ta_arg)
{
struct tgt_session_info *tsi;
tsi->tsi_opdata = lrd->lrd_data;
tsi->tsi_result = lrd->lrd_result;
tsi->tsi_client_gen = lrd->lrd_client_gen;
+ dtrq->dtrq_xid = lrd->lrd_xid;
top_th = container_of(th, struct top_thandle, tt_super);
top_th->tt_master_sub_thandle = master_th;
cfs_hash_putref(hash);
* tgt_last_rcvd_update() can be called correctly */
if (rc == 0 && dt_obj == tdtd->tdtd_lut->lut_reply_data)
update_recovery_update_ses(env, tdtd, th,
- st->st_sub_th, ta_arg);
+ st->st_sub_th, dtrq, ta_arg);
if (unlikely(rc < 0)) {
CDEBUG(D_HA, "error during execution of #%u from"
run_test 70b "dbench ${MDSCOUNT}mdts recovery; $CLIENTCOUNT clients"
# end multi-client tests
+# Fail over randomly chosen MDTs, one per round, until the time budget
+# is used up.
+#
+# $1 - highest MDT index; every round picks an index in 1..$1
+# $2 - time budget in seconds, counted from function entry; no new
+#      round starts once it has elapsed
+# $3 - pid that must still be alive at the start of every round
+#      (probed with "kill -0"), otherwise error is raised
+random_fail_mdt() {
+	local max_index=$1
+	local duration=$2
+	local monitor_pid=$3
+	local started_at=$(date +%s)
+	local failover_count=0
+	local target_idx
+
+	while (( $(date +%s) - started_at < duration )); do
+		target_idx=$((RANDOM % max_index + 1))
+		kill -0 $monitor_pid || error "$monitor_pid stopped"
+		sleep 120
+		replay_barrier mds$target_idx
+		sleep 10
+		# one more failover is about to happen
+		failover_count=$((failover_count + 1))
+		log "$TESTNAME fail mds$target_idx $failover_count times"
+		fail mds$target_idx
+	done
+}
+
+# Reset the EXIT trap to its default and send SIGKILL to the process
+# whose pid is stored in tar_70c_pid.
+cleanup_70c() {
+	trap - EXIT
+	kill -KILL $tar_70c_pid
+}
+# Run a background loop that creates a directory, untars /etc into it
+# and removes it again, while random_fail_mdt fails over MDTs; the test
+# fails if the background loop is no longer running afterwards.
+test_70c () {
+	local clients=${CLIENTS:-$HOSTNAME}
+
+	zconf_mount_clients $clients $MOUNT
+
+	local duration=300
+	[ "$SLOW" = "no" ] && duration=180
+	# use a longer duration (600s) in HARD failure mode because it
+	# takes some time to boot the node
+	[ "$FAILURE_MODE" = HARD ] && duration=600
+
+	# make sure the background loop is killed even on early exit
+	trap cleanup_70c EXIT
+	(
+		while true; do
+			test_mkdir -p -c$MDSCOUNT $DIR/$tdir || break
+			if [ $MDSCOUNT -ge 2 ]; then
+				$LFS setdirstripe -D -c$MDSCOUNT $DIR/$tdir ||
+					error "set default dirstripe failed"
+			fi
+			cd $DIR/$tdir || break
+			tar cf - /etc | tar xf - || error "tar failed"
+			cd $DIR || break
+			rm -rf $DIR/$tdir || break
+		done
+	)&
+	# not local: cleanup_70c reads this pid
+	tar_70c_pid=$!
+	echo "Started tar $tar_70c_pid"
+
+	random_fail_mdt $MDSCOUNT $duration $tar_70c_pid
+	kill -0 $tar_70c_pid || error "tar $tar_70c_pid stopped"
+
+	cleanup_70c
+	true
+}
+run_test 70c "tar ${MDSCOUNT}mdts recovery"
+
test_73a() {
multiop_bg_pause $DIR/$tfile O_tSc ||
error "multiop_bg_pause $DIR/$tfile failed"