/* Tell the client if we're in recovery. */
/* If this is the first client, start the recovery timer */
- CWARN("%s: connection from %s@%s/%lu %s\n", target->obd_name, cluuid.uuid,
+ CWARN("%s: connection from %s@%s/%lu %st"LPU64"\n", target->obd_name, cluuid.uuid,
ptlrpc_peernid2str(&req->rq_peer, peer_str), *cfp,
- target->obd_recovering ? "(recovering)" : "");
+ target->obd_recovering ? "recovering/" : "",
+ conn_data->transno);
if (target->obd_recovering) {
lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECOVERING);
GOTO(out, rc = 0);
}
- if (target->obd_recovering)
+ spin_lock_bh(&target->obd_processing_task_lock);
+ if (target->obd_recovering && export->exp_connected == 0) {
+ __u64 t = conn_data->transno;
+ export->exp_connected = 1;
+ if ((lustre_msg_get_op_flags(req->rq_reqmsg) & MSG_CONNECT_TRANSNO)
+ && t < target->obd_next_recovery_transno)
+ target->obd_next_recovery_transno = t;
target->obd_connected_clients++;
+ if (target->obd_connected_clients == target->obd_max_recoverable_clients)
+ wake_up(&target->obd_next_transno_waitq);
+ }
+ spin_unlock_bh(&target->obd_processing_task_lock);
memcpy(&conn, lustre_msg_buf(req->rq_reqmsg, offset + 2, sizeof(conn)),
sizeof(conn));
max = obd->obd_max_recoverable_clients;
connected = obd->obd_connected_clients;
- completed = max - atomic_read(&obd->obd_req_replay_clients);
+ completed = max - obd->obd_recoverable_clients;
queue_len = obd->obd_requests_queued_for_recovery;
next_transno = obd->obd_next_recovery_transno;
wake_up = 1;
} else if (queue_len + completed == max) {
LASSERT(req->rq_reqmsg->transno >= next_transno);
- CDEBUG(D_ERROR,
+ CDEBUG(req_transno > obd->obd_last_committed ? D_ERROR : D_HA,
"waking for skipped transno (skip: "LPD64
", ql: %d, comp: %d, conn: %d, next: "LPD64")\n",
next_transno, queue_len, completed, max, req_transno);
obd->obd_next_recovery_transno = req_transno;
wake_up = 1;
+ } else if (queue_len == atomic_read(&obd->obd_req_replay_clients)) {
+ /* some clients haven't connected in time, but we need
+ * their requests to continue recovery. so, we abort ... */
+ CDEBUG(D_ERROR, "abort due to missed clients: queue: %d max: %d\n",
+ queue_len, max);
+ obd->obd_abort_recovery = 1;
+ wake_up = 1;
}
spin_unlock_bh(&obd->obd_processing_task_lock);
return 1;
}
+static int connect_done(struct obd_export *exp)
+{
+ if (exp->exp_connected)
+ return 1;
+ return 0;
+}
+
+static int check_for_clients(struct obd_device *obd)
+{
+ if (obd->obd_abort_recovery)
+ return 1;
+ LASSERT(obd->obd_connected_clients <= obd->obd_max_recoverable_clients);
+ if (obd->obd_connected_clients == obd->obd_max_recoverable_clients)
+ return 1;
+ return 0;
+}
+
static int target_recovery_thread(void *arg)
{
struct obd_device *obd = arg;
struct ptlrpc_request *req;
struct target_recovery_data *trd = &obd->obd_recovery_data;
char peer_str[PTL_NALFMT_SIZE];
+ struct l_wait_info lwi = { 0 };
unsigned long flags;
ENTRY;
obd->obd_recovering = 1;
complete(&trd->trd_starting);
- /* The first stage: replay requests */
- CWARN("1: request replay stage - %d clients\n",
- atomic_read(&obd->obd_req_replay_clients));
+ /* first of all, we have to know the first transno to replay */
+ obd->obd_abort_recovery = 0;
+ l_wait_event(obd->obd_next_transno_waitq,
+ check_for_clients(obd), &lwi);
+
+ spin_lock_bh(&obd->obd_processing_task_lock);
+ target_cancel_recovery_timer(obd);
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+
+ /* If some clients haven't connected in time, evict them */
+ if (obd->obd_abort_recovery) {
+ int stale;
+ CERROR("some clients haven't connect in time, evict them ...\n");
+ obd->obd_abort_recovery = 0;
+ stale = class_disconnect_stale_exports(obd, connect_done, 0);
+ atomic_sub(stale, &obd->obd_req_replay_clients);
+ atomic_sub(stale, &obd->obd_lock_replay_clients);
+ }
+
+ /* next stage: replay requests */
+ CWARN("1: request replay stage - %d clients from t"LPU64"\n",
+ atomic_read(&obd->obd_req_replay_clients),
+ obd->obd_next_recovery_transno);
while ((req = target_next_replay_req(obd))) {
LASSERT(trd->trd_processing_task == current->pid);
DEBUG_REQ(D_HA, req, "processing t"LPD64" from %s: ",
init_completion(&trd->trd_finishing);
trd->trd_recovery_handler = handler;
- if (kernel_thread(target_recovery_thread, obd, 0) == 0)
+ if (kernel_thread(target_recovery_thread, obd, 0) > 0) {
wait_for_completion(&trd->trd_starting);
- else
+ LASSERT(obd->obd_recovering != 0);
+ } else
rc = -ECHILD;
return rc;
LASSERT(atomic_read(&obd->obd_req_replay_clients) > 0);
exp->exp_req_replay_needed = 0;
atomic_dec(&obd->obd_req_replay_clients);
+ obd->obd_recoverable_clients--;
if (atomic_read(&obd->obd_req_replay_clients) == 0) {
CDEBUG(D_HA, "all clients have replayed reqs\n");
wake_up(&obd->obd_next_transno_waitq);
* handled will pass through here and be processed immediately.
*/
spin_lock_bh(&obd->obd_processing_task_lock);
- if (transno < obd->obd_next_recovery_transno) {
+ if (transno < obd->obd_next_recovery_transno && check_for_clients(obd)) {
/* Processing the queue right now, don't re-add. */
LASSERT(list_empty(&req->rq_list));
spin_unlock_bh(&obd->obd_processing_task_lock);