From 70c7cb3ae965f79d1b60f8c8d9e132c9f93c51f6 Mon Sep 17 00:00:00 2001 From: tappro Date: Mon, 24 Nov 2008 17:32:04 +0000 Subject: [PATCH] - safe operations with recovery queues b:16711 i:adilger,green --- lustre/ldlm/ldlm_lib.c | 51 ++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 41 insertions(+), 10 deletions(-) diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index 49fd0bf..3cd7bda 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -1098,7 +1098,7 @@ void ptlrpc_free_clone(struct ptlrpc_request *req) ptlrpc_req_drop_rs(req); sptlrpc_svc_ctx_decref(req); class_export_rpc_put(req->rq_export); - list_del(&req->rq_list); + list_del_init(&req->rq_list); if (req->rq_user_desc) { int ngroups = req->rq_user_desc->pud_ngroups; @@ -1158,6 +1158,21 @@ static void target_finish_recovery(struct obd_device *obd) obd->obd_name); ldlm_reprocess_all_ns(obd->obd_namespace); + spin_lock_bh(&obd->obd_processing_task_lock); + if (list_empty(&obd->obd_req_replay_queue) && + list_empty(&obd->obd_lock_replay_queue) && + list_empty(&obd->obd_final_req_queue)) { + obd->obd_processing_task = 0; + } else { + CERROR("%s: Recovery queues ( %s%s%s) are empty\n", + obd->obd_name, + list_empty(&obd->obd_req_replay_queue) ? "" : "req ", + list_empty(&obd->obd_lock_replay_queue) ? "" : "lock ", + list_empty(&obd->obd_final_req_queue) ? "" : "final "); + spin_unlock_bh(&obd->obd_processing_task_lock); + LBUG(); + } + spin_unlock_bh(&obd->obd_processing_task_lock); /* when recovery finished, cleanup orphans on mds and ost */ if (OBT(obd) && OBP(obd, postrecov)) { @@ -1173,8 +1188,13 @@ static void target_finish_recovery(struct obd_device *obd) static void abort_req_replay_queue(struct obd_device *obd) { struct ptlrpc_request *req, *n; + struct list_head abort_list; - list_for_each_entry_safe(req, n, &obd->obd_req_replay_queue, rq_list) { + CFS_INIT_LIST_HEAD(&abort_list); + spin_lock_bh(&obd->obd_processing_task_lock); + list_splice_init(&obd->obd_req_replay_queue, &abort_list); + spin_unlock_bh(&obd->obd_processing_task_lock); + list_for_each_entry_safe(req, n, &abort_list, rq_list) { DEBUG_REQ(D_WARNING, req, "aborted:"); req->rq_status = -ENOTCONN; if (ptlrpc_error(req)) { @@ -1189,7 +1209,12 @@ static void abort_req_replay_queue(struct obd_device *obd) static void abort_lock_replay_queue(struct obd_device *obd) { struct ptlrpc_request *req, *n; + struct list_head abort_list; + CFS_INIT_LIST_HEAD(&abort_list); + spin_lock_bh(&obd->obd_processing_task_lock); + list_splice_init(&obd->obd_lock_replay_queue, &abort_list); + spin_unlock_bh(&obd->obd_processing_task_lock); list_for_each_entry_safe(req, n, &obd->obd_lock_replay_queue, rq_list){ DEBUG_REQ(D_ERROR, req, "aborted:"); req->rq_status = -ENOTCONN; @@ -1214,10 +1239,12 @@ static void abort_lock_replay_queue(struct obd_device *obd) void target_cleanup_recovery(struct obd_device *obd) { struct ptlrpc_request *req, *n; + struct list_head clean_list; ENTRY; LASSERT(obd->obd_stopping); + CFS_INIT_LIST_HEAD(&clean_list); spin_lock_bh(&obd->obd_processing_task_lock); if (!obd->obd_recovering) { spin_unlock_bh(&obd->obd_processing_task_lock); @@ -1226,19 +1253,23 @@ void target_cleanup_recovery(struct obd_device *obd) } obd->obd_recovering = obd->obd_abort_recovery = 0; target_cancel_recovery_timer(obd); + + list_splice_init(&obd->obd_req_replay_queue, &clean_list); spin_unlock_bh(&obd->obd_processing_task_lock); - list_for_each_entry_safe(req, n, &obd->obd_req_replay_queue, rq_list) { - LASSERT (req->rq_reply_state == 0); + list_for_each_entry_safe(req, n, &clean_list, rq_list) { + LASSERT(req->rq_reply_state == 0); target_exp_dequeue_req_replay(req); ptlrpc_free_clone(req); } - list_for_each_entry_safe(req, n, &obd->obd_lock_replay_queue, rq_list){ - LASSERT (req->rq_reply_state == 0); - ptlrpc_free_clone(req); - } - list_for_each_entry_safe(req, n, &obd->obd_final_req_queue, rq_list) { - LASSERT (req->rq_reply_state == 0); + + spin_lock_bh(&obd->obd_processing_task_lock); + list_splice_init(&obd->obd_lock_replay_queue, &clean_list); + list_splice_init(&obd->obd_final_req_queue, &clean_list); + spin_unlock_bh(&obd->obd_processing_task_lock); + + list_for_each_entry_safe(req, n, &clean_list, rq_list){ + LASSERT(req->rq_reply_state == 0); ptlrpc_free_clone(req); } -- 1.8.3.1