Whamcloud - gitweb
- safe operations with recovery queues
authortappro <tappro>
Mon, 24 Nov 2008 17:32:04 +0000 (17:32 +0000)
committertappro <tappro>
Mon, 24 Nov 2008 17:32:04 +0000 (17:32 +0000)
  b:16711
  i:adilger,green

lustre/ldlm/ldlm_lib.c

index 49fd0bf..3cd7bda 100644 (file)
@@ -1098,7 +1098,7 @@ void ptlrpc_free_clone(struct ptlrpc_request *req)
         ptlrpc_req_drop_rs(req);
         sptlrpc_svc_ctx_decref(req);
         class_export_rpc_put(req->rq_export);
-        list_del(&req->rq_list);
+        list_del_init(&req->rq_list);
 
         if (req->rq_user_desc) {
                 int ngroups = req->rq_user_desc->pud_ngroups;
@@ -1158,6 +1158,21 @@ static void target_finish_recovery(struct obd_device *obd)
                       obd->obd_name);
 
         ldlm_reprocess_all_ns(obd->obd_namespace);
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        if (list_empty(&obd->obd_req_replay_queue) &&
+            list_empty(&obd->obd_lock_replay_queue) &&
+            list_empty(&obd->obd_final_req_queue)) {
+                obd->obd_processing_task = 0;
+        } else {
+                CERROR("%s: Recovery queues ( %s%s%s) are empty\n",
+                       obd->obd_name,
+                       list_empty(&obd->obd_req_replay_queue) ? "" : "req ",
+                       list_empty(&obd->obd_lock_replay_queue) ? "" : "lock ",
+                       list_empty(&obd->obd_final_req_queue) ? "" : "final ");
+                spin_unlock_bh(&obd->obd_processing_task_lock);
+                LBUG();
+        }
+        spin_unlock_bh(&obd->obd_processing_task_lock);
 
         /* when recovery finished, cleanup orphans on mds and ost */
         if (OBT(obd) && OBP(obd, postrecov)) {
@@ -1173,8 +1188,13 @@ static void target_finish_recovery(struct obd_device *obd)
 static void abort_req_replay_queue(struct obd_device *obd)
 {
         struct ptlrpc_request *req, *n;
+        struct list_head abort_list;
 
-        list_for_each_entry_safe(req, n, &obd->obd_req_replay_queue, rq_list) {
+        CFS_INIT_LIST_HEAD(&abort_list);
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        list_splice_init(&obd->obd_req_replay_queue, &abort_list);
+        spin_unlock_bh(&obd->obd_processing_task_lock);
+        list_for_each_entry_safe(req, n, &abort_list, rq_list) {
                 DEBUG_REQ(D_WARNING, req, "aborted:");
                 req->rq_status = -ENOTCONN;
                 if (ptlrpc_error(req)) {
@@ -1189,7 +1209,12 @@ static void abort_req_replay_queue(struct obd_device *obd)
 static void abort_lock_replay_queue(struct obd_device *obd)
 {
         struct ptlrpc_request *req, *n;
+        struct list_head abort_list;
 
+        CFS_INIT_LIST_HEAD(&abort_list);
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        list_splice_init(&obd->obd_lock_replay_queue, &abort_list);
+        spin_unlock_bh(&obd->obd_processing_task_lock);
         list_for_each_entry_safe(req, n, &obd->obd_lock_replay_queue, rq_list){
                 DEBUG_REQ(D_ERROR, req, "aborted:");
                 req->rq_status = -ENOTCONN;
@@ -1214,10 +1239,12 @@ static void abort_lock_replay_queue(struct obd_device *obd)
 void target_cleanup_recovery(struct obd_device *obd)
 {
         struct ptlrpc_request *req, *n;
+        struct list_head clean_list;
         ENTRY;
 
         LASSERT(obd->obd_stopping);
 
+        CFS_INIT_LIST_HEAD(&clean_list);
         spin_lock_bh(&obd->obd_processing_task_lock);
         if (!obd->obd_recovering) {
                 spin_unlock_bh(&obd->obd_processing_task_lock);
@@ -1226,19 +1253,23 @@ void target_cleanup_recovery(struct obd_device *obd)
         }
         obd->obd_recovering = obd->obd_abort_recovery = 0;
         target_cancel_recovery_timer(obd);
+
+        list_splice_init(&obd->obd_req_replay_queue, &clean_list);
         spin_unlock_bh(&obd->obd_processing_task_lock);
 
-        list_for_each_entry_safe(req, n, &obd->obd_req_replay_queue, rq_list) {
-                LASSERT (req->rq_reply_state == 0);
+        list_for_each_entry_safe(req, n, &clean_list, rq_list) {
+                LASSERT(req->rq_reply_state == 0);
                 target_exp_dequeue_req_replay(req);
                 ptlrpc_free_clone(req);
         }
-        list_for_each_entry_safe(req, n, &obd->obd_lock_replay_queue, rq_list){
-                LASSERT (req->rq_reply_state == 0);
-                ptlrpc_free_clone(req);
-        }
-        list_for_each_entry_safe(req, n, &obd->obd_final_req_queue, rq_list) {
-                LASSERT (req->rq_reply_state == 0);
+
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        list_splice_init(&obd->obd_lock_replay_queue, &clean_list);
+        list_splice_init(&obd->obd_final_req_queue, &clean_list);
+        spin_unlock_bh(&obd->obd_processing_task_lock);
+
+        list_for_each_entry_safe(req, n, &clean_list, rq_list){
+                LASSERT(req->rq_reply_state == 0);
                 ptlrpc_free_clone(req);
         }