+ EXIT;
+}
+
+/** Checking routines for recovery */
+/**
+ * Tell whether recovery can stop waiting and move on.
+ *
+ * \param[in] lut	target under recovery
+ *
+ * \retval 1	obd_force_abort_recovery is set, or the wait for clients is
+ *		over and, when the target has distributed transaction data,
+ *		that data is flagged replay-ready
+ * \retval 0	not all recoverable clients are accounted for yet, or the
+ *		distributed transaction data is not replay-ready
+ */
+static int check_for_recovery_ready(struct lu_target *lut)
+{
+	struct obd_device *obd = lut->lut_obd;
+	struct target_distribute_txn_data *tdtd;
+	unsigned int clnts = atomic_read(&obd->obd_connected_clients);
+
+	CDEBUG(D_HA, "connected %d stale %d max_recoverable_clients %d"
+	       " abort %d expired %d\n", clnts, obd->obd_stale_clients,
+	       obd->obd_max_recoverable_clients, obd->obd_abort_recovery,
+	       obd->obd_recovery_expired);
+
+	/* A forced abort overrides every other consideration */
+	if (obd->obd_force_abort_recovery)
+		return 1;
+
+	/* Keep waiting for clients only while recovery has neither been
+	 * aborted nor timed out */
+	if (!(obd->obd_abort_recovery || obd->obd_recovery_expired)) {
+		LASSERT(clnts <= obd->obd_max_recoverable_clients);
+		if (obd->obd_max_recoverable_clients >
+		    clnts + obd->obd_stale_clients)
+			return 0;
+	}
+
+	tdtd = lut->lut_tdtd;
+	if (tdtd == NULL)
+		return 1;
+
+	if (tdtd->tdtd_replay_ready) {
+		dtrq_list_dump(tdtd, D_HA);
+		return 1;
+	}
+
+	/* Update replay data is not ready: extend the recovery timer, since
+	 * it may already have expired and some clients got evicted */
+	extend_recovery_timer(obd, obd->obd_recovery_timeout, true);
+	return 0;
+}
+
+enum { /* replay source chosen by get_next_transno(), reported via *type */
+ REQUEST_RECOVERY = 1, /* transno taken from the request replay queue */
+ UPDATE_RECOVERY = 2, /* transno from distribute_txn_get_next_transno() */
+};
+
+/**
+ * Get the transno of the request at the head of the replay queue.
+ *
+ * \param[in] obd	device whose obd_req_replay_queue is examined
+ *
+ * \retval		transno of the first queued request, or 0 when the
+ *			queue is empty
+ */
+static __u64 get_next_replay_req_transno(struct obd_device *obd)
+{
+	struct ptlrpc_request *head;
+
+	if (list_empty(&obd->obd_req_replay_queue))
+		return 0;
+
+	head = list_entry(obd->obd_req_replay_queue.next,
+			  struct ptlrpc_request, rq_list);
+
+	return lustre_msg_get_transno(head->rq_reqmsg);
+}
+
+/**
+ * Choose the next transno to replay, from client requests or from updates.
+ *
+ * The head of the request replay queue is the default candidate. When the
+ * target has distributed transaction data, the next update transno is used
+ * instead if no request is queued, or if the update transno is non-zero and
+ * not greater than the request transno.
+ *
+ * \param[in] lut	target under recovery
+ * \param[out] type	if not NULL, set to REQUEST_RECOVERY or
+ *			UPDATE_RECOVERY according to the source selected
+ *
+ * \retval		the selected transno; 0 when the request queue is
+ *			empty and no non-zero update transno is available
+ */
+__u64 get_next_transno(struct lu_target *lut, int *type)
+{
+	struct target_distribute_txn_data *tdtd = lut->lut_tdtd;
+	__u64 req_transno;
+	__u64 upd_transno;
+	ENTRY;
+
+	req_transno = get_next_replay_req_transno(lut->lut_obd);
+	if (type != NULL)
+		*type = REQUEST_RECOVERY;
+
+	/* Without distributed transaction data only requests are replayed */
+	if (tdtd == NULL)
+		RETURN(req_transno);
+
+	upd_transno = distribute_txn_get_next_transno(tdtd);
+
+	/* Stay with the request when one is queued and there is either no
+	 * update transno or the update transno is strictly greater */
+	if (req_transno != 0 &&
+	    (upd_transno == 0 || upd_transno > req_transno))
+		RETURN(req_transno);
+
+	if (type != NULL)
+		*type = UPDATE_RECOVERY;
+
+	RETURN(upd_transno);
+}
+
+/**
+ * Drop a duplicate replay request.
+ *
+ * The operation has already been replayed by update recovery, so the client
+ * request carrying the same transno is not executed again. For MDS_REINT a
+ * reply is still packed and sent, so that the client is notified and sends
+ * its next replay request. The request is then removed from the export
+ * replay bookkeeping, released, and counted as replayed.
+ *
+ * \param[in] env	execution environment (not used here)
+ * \param[in] obd	failover obd device
+ * \param[in] req	request to be dropped
+ */
+static void drop_duplicate_replay_req(struct lu_env *env,
+				      struct obd_device *obd,
+				      struct ptlrpc_request *req)
+{
+	DEBUG_REQ(D_HA, req, "remove t"LPD64" from %s because of duplicate"
+		  " update records are found.\n",
+		  lustre_msg_get_transno(req->rq_reqmsg),
+		  libcfs_nid2str(req->rq_peer.nid));
+
+	/* Right now, only for MDS reint operation update replay and
+	 * normal request replay can have the same transno */
+	if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_REINT) {
+		req_capsule_set(&req->rq_pill, &RQF_MDS_REINT);
+		req->rq_status = req_capsule_server_pack(&req->rq_pill);
+		if (likely(req->rq_export))
+			target_committed_to_req(req);
+		lustre_msg_set_transno(req->rq_repmsg, req->rq_transno);
+		target_send_reply(req, req->rq_status, 0);
+	} else {
+		/* The two literals used to be concatenated without a
+		 * separating space, logging "wrong opcfrom ..." */
+		DEBUG_REQ(D_ERROR, req, "wrong opc from %s\n",
+			  libcfs_nid2str(req->rq_peer.nid));
+	}
+	target_exp_dequeue_req_replay(req);
+	target_request_copy_put(req);
+	obd->obd_replayed_requests++;
+}
+
+/**
+ * Refresh the in-memory last_rcvd client data after an update replay.
+ *
+ * Update recovery may modify last_rcvd through update records, which does
+ * not refresh the in-memory copy. After the update has been replayed, the
+ * in-memory lcd_last_xid and lcd_last_result of the matching export are
+ * therefore brought up to date here.
+ *
+ * \param[in] env	execution environment (not used here)
+ * \param[in] lut	target under recovery
+ * \param[in] dtrq	the update replay request that was replayed
+ */
+static void target_update_lcd(struct lu_env *env, struct lu_target *lut,
+			      struct distribute_txn_replay_req *dtrq)
+{
+	struct obd_device *obd = lut->lut_obd;
+	struct lsd_client_data *found = NULL;
+	struct update_records *records;
+	struct update_params *params;
+	const struct lu_fid *last_rcvd_fid;
+	struct tg_export_data *tgt_data;
+	struct obd_export *exp;
+	struct update_op *op;
+	unsigned int idx;
+	__u32 node_id;
+
+	/* If the updates have already been executed (committed) on the
+	 * recovery target, they are not being executed now, so there is
+	 * nothing to refresh in memory */
+	node_id = lu_site2seq(obd->obd_lu_dev->ld_site)->ss_node_id;
+	if (dtrq_sub_lookup(dtrq, node_id) != NULL)
+		return;
+
+	if (dtrq->dtrq_lur == NULL)
+		return;
+
+	/* Scan the update records for writes to the last_rcvd object; when
+	 * several ops qualify, the last one wins */
+	last_rcvd_fid = lu_object_fid(&lut->lut_last_rcvd->do_lu);
+	records = &dtrq->dtrq_lur->lur_update_rec;
+	params = update_records_get_params(records);
+	op = &records->ur_ops.uops_op[0];
+	for (idx = 0; idx < records->ur_update_count;
+	     idx++, op = update_op_next_op(op)) {
+		__u16 len;
+		void *ptr;
+
+		if (!lu_fid_eq(&op->uop_fid, last_rcvd_fid) ||
+		    op->uop_type != OUT_WRITE)
+			continue;
+
+		/* The second parameter is read as a little-endian 64-bit
+		 * write position; writes at position 0 are ignored */
+		ptr = update_params_get_param_buf(params,
+						  op->uop_params_off[1],
+						  records->ur_param_count,
+						  NULL);
+		if (ptr == NULL || le64_to_cpu(*(__u64 *)ptr) == 0)
+			continue;
+
+		/* The first parameter is the written buffer; it is only
+		 * accepted when it is exactly one client data record */
+		ptr = update_params_get_param_buf(params,
+						  op->uop_params_off[0],
+						  records->ur_param_count,
+						  &len);
+		if (ptr != NULL && len == sizeof(*found))
+			found = ptr;
+	}
+
+	if (found == NULL || found->lcd_uuid[0] == '\0')
+		return;
+
+	/* Locate the export by client uuid, then update its target data
+	 * only if the replayed record carries a newer xid */
+	exp = cfs_hash_lookup(obd->obd_uuid_hash, found->lcd_uuid);
+	if (exp == NULL)
+		return;
+
+	tgt_data = &exp->exp_target_data;
+	if (found->lcd_last_xid > tgt_data->ted_lcd->lcd_last_xid) {
+		CDEBUG(D_HA, "%s update xid from "LPU64" to "LPU64"\n",
+		       obd->obd_name, tgt_data->ted_lcd->lcd_last_xid,
+		       found->lcd_last_xid);
+		tgt_data->ted_lcd->lcd_last_xid = found->lcd_last_xid;
+		tgt_data->ted_lcd->lcd_last_result = found->lcd_last_result;
+	}
+	class_export_put(exp);
+}
+
+static void replay_request_or_update(struct lu_env *env,
+ struct lu_target *lut,
+ struct target_recovery_data *trd,
+ struct ptlrpc_thread *thread)
+{
+ struct obd_device *obd = lut->lut_obd;
+ struct ptlrpc_request *req = NULL;
+ int type;
+ __u64 transno;
+ ENTRY;
+
+ CDEBUG(D_HA, "Waiting for transno "LPD64"\n",
+ obd->obd_next_recovery_transno);
+
+ /* Replay all of request and update by transno */
+ do {
+ struct target_distribute_txn_data *tdtd = lut->lut_tdtd;
+
+ CFS_FAIL_TIMEOUT(OBD_FAIL_TGT_REPLAY_DELAY2, cfs_fail_val);
+
+ /** It is needed to extend recovery window above
+ * recovery_time_soft. Extending is possible only in the
+ * end of recovery window (see more details in
+ * handle_recovery_req()).
+ */
+ CFS_FAIL_TIMEOUT_MS(OBD_FAIL_TGT_REPLAY_DELAY, 300);
+
+ if (target_recovery_overseer(lut, check_for_next_transno,
+ exp_req_replay_healthy)) {
+ abort_req_replay_queue(obd);
+ abort_lock_replay_queue(obd);
+ }
+
+ spin_lock(&obd->obd_recovery_task_lock);
+ transno = get_next_transno(lut, &type);
+ if (type == REQUEST_RECOVERY && tdtd != NULL &&
+ transno == tdtd->tdtd_last_update_transno) {
+ /* Drop replay request from client side, if the
+ * replay has been executed by update with the
+ * same transno */
+ req = list_entry(obd->obd_req_replay_queue.next,
+ struct ptlrpc_request, rq_list);
+ list_del_init(&req->rq_list);
+ obd->obd_requests_queued_for_recovery--;
+ spin_unlock(&obd->obd_recovery_task_lock);
+ drop_duplicate_replay_req(env, obd, req);
+ } else if (type == REQUEST_RECOVERY && transno != 0) {
+ req = list_entry(obd->obd_req_replay_queue.next,
+ struct ptlrpc_request, rq_list);
+ list_del_init(&req->rq_list);
+ obd->obd_requests_queued_for_recovery--;
+ spin_unlock(&obd->obd_recovery_task_lock);
+ LASSERT(trd->trd_processing_task == current_pid());
+ DEBUG_REQ(D_HA, req, "processing t"LPD64" from %s",
+ lustre_msg_get_transno(req->rq_reqmsg),
+ libcfs_nid2str(req->rq_peer.nid));
+
+ handle_recovery_req(thread, req,
+ trd->trd_recovery_handler);
+ /**
+ * bz18031: increase next_recovery_transno before
+ * target_request_copy_put() will drop exp_rpc reference
+ */
+ spin_lock(&obd->obd_recovery_task_lock);
+ obd->obd_next_recovery_transno++;
+ spin_unlock(&obd->obd_recovery_task_lock);
+ target_exp_dequeue_req_replay(req);
+ target_request_copy_put(req);
+ obd->obd_replayed_requests++;
+ } else if (type == UPDATE_RECOVERY && transno != 0) {
+ struct distribute_txn_replay_req *dtrq;
+
+ spin_unlock(&obd->obd_recovery_task_lock);
+
+ LASSERT(tdtd != NULL);
+ dtrq = distribute_txn_get_next_req(tdtd);
+ lu_context_enter(&thread->t_env->le_ctx);
+ tdtd->tdtd_replay_handler(env, tdtd, dtrq);
+ lu_context_exit(&thread->t_env->le_ctx);
+ extend_recovery_timer(obd, obd_timeout, true);
+ LASSERT(tdtd->tdtd_last_update_transno <= transno);
+ tdtd->tdtd_last_update_transno = transno;
+ spin_lock(&obd->obd_recovery_task_lock);
+ if (transno > obd->obd_next_recovery_transno)
+ obd->obd_next_recovery_transno = transno;
+ spin_unlock(&obd->obd_recovery_task_lock);
+ target_update_lcd(env, lut, dtrq);
+ dtrq_destory(dtrq);
+ } else {
+ spin_unlock(&obd->obd_recovery_task_lock);
+ LASSERT(list_empty(&obd->obd_req_replay_queue));
+ LASSERT(atomic_read(&obd->obd_req_replay_clients) == 0);
+ /** evict exports failed VBR */
+ class_disconnect_stale_exports(obd, exp_vbr_healthy);
+ break;
+ }
+ } while (1);