RETURN(0);
}
-int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
+int target_handle_connect(struct ptlrpc_request *req)
{
struct obd_device *target;
struct obd_export *export = NULL;
struct obd_uuid remote_uuid;
struct list_head *p;
char *str, *tmp;
- int rc = 0, abort_recovery;
+ int rc = 0;
unsigned long flags;
int initial_conn = 0;
char peer_str[PTL_NALFMT_SIZE];
LBUG();
}
- spin_lock_bh(&target->obd_processing_task_lock);
- abort_recovery = target->obd_abort_recovery;
- spin_unlock_bh(&target->obd_processing_task_lock);
- if (abort_recovery)
- target_abort_recovery(target);
-
tmp = lustre_msg_buf(req->rq_reqmsg, 2, sizeof conn);
if (tmp == NULL)
GOTO(out, rc = -EPROTO);
target->obd_recovering ? "(recovering)" : "");
if (target->obd_recovering) {
lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECOVERING);
- target_start_recovery_timer(target, handler);
+ target_start_recovery_timer(target);
}
#if 0
/* Tell the client if we support replayable requests */
* Recovery functions
*/
+/*
+ * Make a private heap copy of a request and of its request message.
+ *
+ * The request struct is copied bytewise and then given its own copy of
+ * the request buffer, an extra reference on rq_export and an empty
+ * rq_list.  The reply state is handed over to the copy:
+ * orig_req->rq_reply_state is cleared.
+ *
+ * Returns the copy, or NULL if either allocation fails (orig_req is not
+ * modified in that case).  The copy is released by ptlrpc_free_clone().
+ */
+struct ptlrpc_request *
+ptlrpc_clone_req(struct ptlrpc_request *orig_req)
+{
+        struct ptlrpc_request *copy_req;
+        struct lustre_msg *copy_reqmsg;
+
+        OBD_ALLOC(copy_req, sizeof *copy_req);
+        if (!copy_req)
+                return NULL;
+        OBD_ALLOC(copy_reqmsg, orig_req->rq_reqlen);
+        if (!copy_reqmsg){
+                /* do not leak the request struct on partial failure */
+                OBD_FREE(copy_req, sizeof *copy_req);
+                return NULL;
+        }
+
+        memcpy(copy_req, orig_req, sizeof *copy_req);
+        memcpy(copy_reqmsg, orig_req->rq_reqmsg, orig_req->rq_reqlen);
+        /* the copied req takes over the reply state */
+        orig_req->rq_reply_state = NULL;
+
+        /* the bytewise copy still points at the original's buffer and
+         * list linkage; give the clone its own */
+        copy_req->rq_reqmsg = copy_reqmsg;
+        class_export_get(copy_req->rq_export);
+        INIT_LIST_HEAD(&copy_req->rq_list);
+
+        return copy_req;
+}
+/*
+ * Dispose of a request created by ptlrpc_clone_req().
+ *
+ * Drops the export reference, unlinks the request from its rq_list and
+ * frees both the private request message and the request itself.
+ */
+void ptlrpc_free_clone(struct ptlrpc_request *req)
+{
+        /* release the export reference taken when the clone was made */
+        class_export_put(req->rq_export);
+
+        /* take the clone off whatever list it is linked on */
+        list_del(&req->rq_list);
+
+        /* message buffer first, then the request that points at it */
+        OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
+        OBD_FREE(req, sizeof(*req));
+}
+
static void abort_delayed_replies(struct obd_device *obd)
{
struct ptlrpc_request *req;
}
}
-void target_abort_recovery(void *data)
+static void target_abort_recovery(void *data)
{
struct obd_device *obd = data;
int rc;
CERROR("disconnecting clients and aborting recovery\n");
- spin_lock_bh(&obd->obd_processing_task_lock);
- if (!obd->obd_recovering) {
- spin_unlock_bh(&obd->obd_processing_task_lock);
- EXIT;
- return;
- }
-
- obd->obd_recovering = obd->obd_abort_recovery = 0;
-
- wake_up(&obd->obd_next_transno_waitq);
- target_cancel_recovery_timer(obd);
- spin_unlock_bh(&obd->obd_processing_task_lock);
+ LASSERT(!obd->obd_recovering);
class_disconnect_exports(obd, 0);
if (OBT(obd) && OBP(obd, postrecov)) {
rc = OBP(obd, postrecov)(obd);
if (rc >= 0)
- CWARN("Cleanup %d orphans after recovery was aborted\n", rc);
+ CWARN("Cleanup %d orphans after recovery was aborted\n",
+ rc);
else
CERROR("postrecov failed %d\n", rc);
}
/* Only start it the first time called */
-void target_start_recovery_timer(struct obd_device *obd, svc_handler_t handler)
+void target_start_recovery_timer(struct obd_device *obd)
{
spin_lock_bh(&obd->obd_processing_task_lock);
- if (obd->obd_recovery_handler) {
+ if (!obd->obd_recovering || timer_pending(&obd->obd_recovery_timer)) {
spin_unlock_bh(&obd->obd_processing_task_lock);
return;
}
CWARN("%s: starting recovery timer (%us)\n", obd->obd_name,
OBD_RECOVERY_TIMEOUT / HZ);
- obd->obd_recovery_handler = handler;
obd->obd_recovery_timer.function = target_recovery_expired;
obd->obd_recovery_timer.data = (unsigned long)obd;
+ mod_timer(&obd->obd_recovery_timer, jiffies + OBD_RECOVERY_TIMEOUT);
spin_unlock_bh(&obd->obd_processing_task_lock);
+}
+
+static void target_finish_recovery(struct obd_device *obd)
+{
+ struct list_head *tmp, *n;
+ int rc2;
+
+ ldlm_reprocess_all_ns(obd->obd_namespace);
+
+ CWARN("%s: all clients recovered, calling postrecov\n",
+ obd->obd_name);
+ /* when recovery finished, cleanup orphans on mds and ost */
+ if (OBT(obd) && OBP(obd, postrecov)) {
+ rc2 = OBP(obd, postrecov)(obd);
+ if (rc2 >= 0)
+ CWARN("%s: all clients recovered, %d MDS "
+ "orphans deleted\n", obd->obd_name, rc2);
+ else
+ CERROR("postrecov failed %d\n", rc2);
+ }
- reset_recovery_timer(obd);
+ CWARN("%s: recovery over, sending delayed replies\n",
+ obd->obd_name);
+ list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
+ struct ptlrpc_request *req;
+ req = list_entry(tmp, struct ptlrpc_request, rq_list);
+ DEBUG_REQ(D_ERROR, req, "delayed:");
+ ptlrpc_reply(req);
+ ptlrpc_free_clone(req);
+ }
+ ptlrpc_run_recovery_over_upcall(obd);
}
static int check_for_next_transno(struct obd_device *obd)
__u64 next_transno, req_transno;
spin_lock_bh(&obd->obd_processing_task_lock);
- req = list_entry(obd->obd_recovery_queue.next,
- struct ptlrpc_request, rq_list);
+ if (!list_empty(&obd->obd_recovery_queue)) {
+ req = list_entry(obd->obd_recovery_queue.next,
+ struct ptlrpc_request, rq_list);
+ req_transno = req->rq_reqmsg->transno;
+ } else {
+ req_transno = 0;
+ }
+
max = obd->obd_max_recoverable_clients;
- req_transno = req->rq_reqmsg->transno;
connected = obd->obd_connected_clients;
completed = max - obd->obd_recoverable_clients;
queue_len = obd->obd_requests_queued_for_recovery;
if (obd->obd_abort_recovery) {
CDEBUG(D_HA, "waking for aborted recovery\n");
wake_up = 1;
- } else if (!obd->obd_recovering) {
- CDEBUG(D_HA, "waking for completed recovery (?)\n");
+ } else if (max == completed) {
+ CDEBUG(D_HA, "waking for completed recovery\n");
wake_up = 1;
} else if (req_transno == next_transno) {
CDEBUG(D_HA, "waking for next ("LPD64")\n", next_transno);
wake_up = 1;
} else if (queue_len + completed == max) {
+ LASSERT(req->rq_reqmsg->transno >= next_transno);
CDEBUG(D_ERROR,
"waking for skipped transno (skip: "LPD64
", ql: %d, comp: %d, conn: %d, next: "LPD64")\n",
wake_up = 1;
}
spin_unlock_bh(&obd->obd_processing_task_lock);
- LASSERT(req->rq_reqmsg->transno >= next_transno);
+
return wake_up;
}
-static void process_recovery_queue(struct obd_device *obd)
+static struct ptlrpc_request *
+target_next_replay_req(struct obd_device *obd)
{
- struct ptlrpc_request *req;
- int abort_recovery = 0;
struct l_wait_info lwi = { 0 };
- ENTRY;
+ struct ptlrpc_request *req;
- for (;;) {
- spin_lock_bh(&obd->obd_processing_task_lock);
- LASSERT(obd->obd_processing_task == current->pid);
+ CDEBUG(D_HA, "Waiting for transno "LPD64"\n",
+ obd->obd_next_recovery_transno);
+ l_wait_event(obd->obd_next_transno_waitq,
+ check_for_next_transno(obd), &lwi);
+
+ spin_lock_bh(&obd->obd_processing_task_lock);
+ if (obd->obd_abort_recovery) {
+ req = NULL;
+ } else if (!list_empty(&obd->obd_recovery_queue)) {
req = list_entry(obd->obd_recovery_queue.next,
struct ptlrpc_request, rq_list);
+ list_del_init(&req->rq_list);
+ obd->obd_requests_queued_for_recovery--;
+ } else {
+ req = NULL;
+ }
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+ return req;
+}
- if (req->rq_reqmsg->transno != obd->obd_next_recovery_transno) {
- spin_unlock_bh(&obd->obd_processing_task_lock);
- CDEBUG(D_HA, "Waiting for transno "LPD64" (1st is "
- LPD64")\n",
- obd->obd_next_recovery_transno,
- req->rq_reqmsg->transno);
- l_wait_event(obd->obd_next_transno_waitq,
- check_for_next_transno(obd), &lwi);
+
+/*
+ * Body of the "tgt-recov" kernel thread; arg is the struct obd_device.
+ *
+ * Blocks all signals, publishes its pid in trd_processing_task, sets
+ * obd_recovering and completes trd_starting.  It then passes each request
+ * returned by target_next_replay_req() to trd_recovery_handler.  When no
+ * request is returned recovery is ended: the timer is cancelled and
+ * either target_abort_recovery() (if obd_abort_recovery was set) or
+ * target_finish_recovery() is run.  trd_finishing is completed on exit.
+ */
+static int target_recovery_thread(void *arg)
+{
+ struct obd_device *obd = arg;
+ struct ptlrpc_request *req;
+ struct target_recovery_data *trd = &obd->obd_recovery_data;
+ unsigned long flags;
+ ENTRY;
+
+ kportal_daemonize("tgt-recov");
+
+ SIGNAL_MASK_LOCK(current, flags);
+ sigfillset(&current->blocked);
+ RECALC_SIGPENDING;
+ SIGNAL_MASK_UNLOCK(current, flags);
+
+ CERROR("%s: started recovery thread pid %d\n", obd->obd_name,
+ current->pid);
+ trd->trd_processing_task = current->pid;
+
+ obd->obd_recovering = 1;
+ complete(&trd->trd_starting);
+
+ while (obd->obd_recovering) {
+ LASSERT(trd->trd_processing_task == current->pid);
+ req = target_next_replay_req(obd);
+ if (req != NULL) {
+ DEBUG_REQ(D_HA, req, "processing t"LPD64" : ",
+ req->rq_reqmsg->transno);
+ (void)trd->trd_recovery_handler(req);
+ obd->obd_replayed_requests++;
+ reset_recovery_timer(obd);
+ /* bug 1580: decide how to properly sync() in recovery*/
+ //mds_fsync_super(mds->mds_sb);
+ ptlrpc_free_clone(req);
spin_lock_bh(&obd->obd_processing_task_lock);
- abort_recovery = obd->obd_abort_recovery;
+ obd->obd_next_recovery_transno++;
spin_unlock_bh(&obd->obd_processing_task_lock);
- if (abort_recovery) {
- target_abort_recovery(obd);
- return;
+ } else {
+ /* recovery is over */
+ spin_lock_bh(&obd->obd_processing_task_lock);
+ obd->obd_recovering = 0;
+ target_cancel_recovery_timer(obd);
+ if (obd->obd_abort_recovery) {
+ obd->obd_abort_recovery = 0;
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+ target_abort_recovery(obd);
+ } else {
+ LASSERT(obd->obd_recoverable_clients == 0);
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+ target_finish_recovery(obd);
}
- continue;
}
- list_del_init(&req->rq_list);
- obd->obd_requests_queued_for_recovery--;
- spin_unlock_bh(&obd->obd_processing_task_lock);
+ }
- DEBUG_REQ(D_HA, req, "processing: ");
- (void)obd->obd_recovery_handler(req);
- obd->obd_replayed_requests++;
- reset_recovery_timer(obd);
- /* bug 1580: decide how to properly sync() in recovery */
- //mds_fsync_super(mds->mds_sb);
- class_export_put(req->rq_export);
- OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
- OBD_FREE(req, sizeof *req);
- spin_lock_bh(&obd->obd_processing_task_lock);
- obd->obd_next_recovery_transno++;
- if (list_empty(&obd->obd_recovery_queue)) {
- obd->obd_processing_task = 0;
- spin_unlock_bh(&obd->obd_processing_task_lock);
- break;
- }
+ trd->trd_processing_task = 0;
+ complete(&trd->trd_finishing);
+ return 0;
+}
+
+/*
+ * Spawn the recovery thread for obd and wait until it has started.
+ *
+ * Zeroes obd->obd_recovery_data, initialises its two completions and
+ * records handler as the replay handler.  On a successful spawn the
+ * caller blocks on trd_starting, which target_recovery_thread()
+ * completes after setting obd->obd_recovering.
+ *
+ * Returns 0 on success, -ECHILD if the thread could not be created.
+ */
+int target_start_recovery_thread(struct obd_device *obd, svc_handler_t handler)
+{
+        struct target_recovery_data *trd = &obd->obd_recovery_data;
+        int rc = 0;
+
+        memset(trd, 0, sizeof(*trd));
+        init_completion(&trd->trd_starting);
+        init_completion(&trd->trd_finishing);
+        trd->trd_recovery_handler = handler;
+
+        /* kernel_thread() hands the caller the new thread's pid (> 0) on
+         * success and a negative errno on failure, so success must be
+         * tested with "> 0"; "== 0" would report every successful spawn
+         * as -ECHILD and never wait for the thread to start. */
+        if (kernel_thread(target_recovery_thread, obd, 0) > 0)
+                wait_for_completion(&trd->trd_starting);
+        else
+                rc = -ECHILD;
+
+        return rc;
+}
+
+void target_stop_recovery_thread(struct obd_device *obd)
+{
+ spin_lock_bh(&obd->obd_processing_task_lock);
+ if (obd->obd_recovery_data.trd_processing_task > 0) {
+ struct target_recovery_data *trd = &obd->obd_recovery_data;
+ CERROR("%s: aborting recovery\n", obd->obd_name);
+ obd->obd_abort_recovery = 1;
+ wake_up(&obd->obd_next_transno_waitq);
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+ wait_for_completion(&trd->trd_finishing);
+ } else {
spin_unlock_bh(&obd->obd_processing_task_lock);
}
- EXIT;
}
int target_queue_recovery_request(struct ptlrpc_request *req,
struct list_head *tmp;
int inserted = 0;
__u64 transno = req->rq_reqmsg->transno;
- struct ptlrpc_request *saved_req;
- struct lustre_msg *reqmsg;
/* CAVEAT EMPTOR: The incoming request message has been swabbed
* (i.e. buflens etc are in my own byte order), but type-dependent
return 1;
}
- /* XXX If I were a real man, these LBUGs would be sane cleanups. */
- /* XXX just like the request-dup code in queue_final_reply */
- OBD_ALLOC(saved_req, sizeof *saved_req);
- if (!saved_req)
- LBUG();
- OBD_ALLOC(reqmsg, req->rq_reqlen);
- if (!reqmsg)
- LBUG();
-
- spin_lock_bh(&obd->obd_processing_task_lock);
/* If we're processing the queue, we want don't want to queue this
* message.
* Also, a resent, replayed request that has already been
* handled will pass through here and be processed immediately.
*/
- if (obd->obd_processing_task == current->pid ||
+ spin_lock_bh(&obd->obd_processing_task_lock);
+ if (obd->obd_recovery_data.trd_processing_task == current->pid ||
transno < obd->obd_next_recovery_transno) {
/* Processing the queue right now, don't re-add. */
lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
LASSERT(list_empty(&req->rq_list));
spin_unlock_bh(&obd->obd_processing_task_lock);
- OBD_FREE(reqmsg, req->rq_reqlen);
- OBD_FREE(saved_req, sizeof *saved_req);
return 1;
}
+ spin_unlock_bh(&obd->obd_processing_task_lock);
/* A resent, replayed request that is still on the queue; just drop it.
The queued request will handle this. */
- if ((lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY)) ==
- (MSG_RESENT | MSG_REPLAY)) {
+ if ((lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY))
+ == (MSG_RESENT | MSG_REPLAY)) {
DEBUG_REQ(D_ERROR, req, "dropping resent queued req");
- spin_unlock_bh(&obd->obd_processing_task_lock);
- OBD_FREE(reqmsg, req->rq_reqlen);
- OBD_FREE(saved_req, sizeof *saved_req);
return 0;
}
- memcpy(saved_req, req, sizeof *req);
- memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
- req = saved_req;
- req->rq_reqmsg = reqmsg;
- class_export_get(req->rq_export);
- INIT_LIST_HEAD(&req->rq_list);
+ req = ptlrpc_clone_req(req);
+ if (req == NULL)
+ return -ENOMEM;
+
+ spin_lock_bh(&obd->obd_processing_task_lock);
/* XXX O(n^2) */
list_for_each(tmp, &obd->obd_recovery_queue) {
}
obd->obd_requests_queued_for_recovery++;
-
- if (obd->obd_processing_task != 0) {
- /* Someone else is processing this queue, we'll leave it to
- * them.
- */
- wake_up(&obd->obd_next_transno_waitq);
- spin_unlock_bh(&obd->obd_processing_task_lock);
- return 0;
- }
-
- /* Nobody is processing, and we know there's (at least) one to process
- * now, so we'll do the honours.
- */
- obd->obd_processing_task = current->pid;
+ wake_up(&obd->obd_next_transno_waitq);
spin_unlock_bh(&obd->obd_processing_task_lock);
- process_recovery_queue(obd);
return 0;
}
int target_queue_final_reply(struct ptlrpc_request *req, int rc)
{
struct obd_device *obd = target_req2obd(req);
- struct ptlrpc_request *saved_req;
- struct lustre_msg *reqmsg;
- int recovery_done = 0;
- int rc2;
LASSERT ((rc == 0) == (req->rq_reply_state != NULL));
LASSERT (!req->rq_reply_state->rs_difficult);
LASSERT(list_empty(&req->rq_list));
- /* XXX a bit like the request-dup code in queue_recovery_request */
- OBD_ALLOC(saved_req, sizeof *saved_req);
- if (!saved_req)
- LBUG();
- OBD_ALLOC(reqmsg, req->rq_reqlen);
- if (!reqmsg)
- LBUG();
- memcpy(saved_req, req, sizeof *saved_req);
- memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
- /* the copied req takes over the reply state */
- req->rq_reply_state = NULL;
- req = saved_req;
- req->rq_reqmsg = reqmsg;
- class_export_get(req->rq_export);
- list_add(&req->rq_list, &obd->obd_delayed_reply_queue);
+
+ req = ptlrpc_clone_req(req);
spin_lock_bh(&obd->obd_processing_task_lock);
+
+ list_add(&req->rq_list, &obd->obd_delayed_reply_queue);
+
/* only count the first "replay over" request from each
export */
if (req->rq_export->exp_replay_needed) {
--obd->obd_recoverable_clients;
req->rq_export->exp_replay_needed = 0;
- }
- recovery_done = (obd->obd_recoverable_clients == 0);
- spin_unlock_bh(&obd->obd_processing_task_lock);
-
- if (recovery_done) {
- struct list_head *tmp, *n;
- ldlm_reprocess_all_ns(req->rq_export->exp_obd->obd_namespace);
- CWARN("%s: all clients recovered, sending delayed replies\n",
- obd->obd_name);
- spin_lock_bh(&obd->obd_processing_task_lock);
- obd->obd_recovering = 0;
- target_cancel_recovery_timer(obd);
- spin_unlock_bh(&obd->obd_processing_task_lock);
-
- /* when recovery finished, cleanup orphans on mds and ost */
- if (OBT(obd) && OBP(obd, postrecov)) {
- rc2 = OBP(obd, postrecov)(obd);
- if (rc2 >= 0)
- CWARN("%s: all clients recovered, %d MDS "
- "orphans deleted\n", obd->obd_name, rc2);
- else
- CERROR("postrecov failed %d\n", rc2);
- }
-
- list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
- req = list_entry(tmp, struct ptlrpc_request, rq_list);
- DEBUG_REQ(D_ERROR, req, "delayed:");
- ptlrpc_reply(req);
- class_export_put(req->rq_export);
- list_del(&req->rq_list);
- OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
- OBD_FREE(req, sizeof *req);
- }
- ptlrpc_run_recovery_over_upcall(obd);
- } else {
CWARN("%s: %d recoverable clients remain\n",
- obd->obd_name, obd->obd_recoverable_clients);
- wake_up(&obd->obd_next_transno_waitq);
+ obd->obd_name, obd->obd_recoverable_clients);
}
-
+ wake_up(&obd->obd_next_transno_waitq);
+ spin_unlock_bh(&obd->obd_processing_task_lock);
return 1;
}