Whamcloud - gitweb
merge b_devel to b_eq: 20030728
authorericm <ericm>
Mon, 28 Jul 2003 16:36:05 +0000 (16:36 +0000)
committerericm <ericm>
Mon, 28 Jul 2003 16:36:05 +0000 (16:36 +0000)
(5) osc ost ptlbd ptlrpc

lustre/ptlrpc/recov_thread.c

index 95b4151..45cda95 100644 (file)
@@ -33,8 +33,8 @@
 #endif
 
 #ifndef __KERNEL__
-#include <portals/list.h>
-#include <liblustre.h>
+# include <portals/list.h>
+# include <liblustre.h>
 #endif
 #include <linux/kp30.h>
 #include <linux/fs.h>
@@ -47,7 +47,6 @@
 #include <portals/list.h>
 #include "ptlrpc_internal.h"
 
-
 static struct llog_commit_master lustre_lcm;
 static struct llog_commit_master *lcm = &lustre_lcm;
 
@@ -126,15 +125,12 @@ static int log_commit_thread(void *arg)
         struct llog_commit_daemon *lcd;
         struct llog_commit_data *llcd, *n;
         unsigned long flags;
+        ENTRY;
 
         OBD_ALLOC(lcd, sizeof(*lcd));
         if (lcd == NULL)
                 RETURN(-ENOMEM);
 
-        INIT_LIST_HEAD(&lcd->lcd_lcm_list);
-        INIT_LIST_HEAD(&lcd->lcd_llcd_list);
-        lcd->lcd_lcm = lcm;
-
         lock_kernel();
         ptlrpc_daemonize(); /* thread never needs to do IO */
 
@@ -144,10 +140,16 @@ static int log_commit_thread(void *arg)
         SIGNAL_MASK_UNLOCK(current, flags);
 
         spin_lock(&lcm->lcm_thread_lock);
-        THREAD_NAME(current->comm, "ll_log_commit_%d", lcm->lcm_thread_total++);
+        THREAD_NAME(current->comm, "ll_log_commit_%d",
+                    atomic_read(&lcm->lcm_thread_total));
+        atomic_inc(&lcm->lcm_thread_total);
         spin_unlock(&lcm->lcm_thread_lock);
         unlock_kernel();
 
+        INIT_LIST_HEAD(&lcd->lcd_lcm_list);
+        INIT_LIST_HEAD(&lcd->lcd_llcd_list);
+        lcd->lcd_lcm = lcm;
+
         CDEBUG(D_HA, "%s started\n", current->comm);
         do {
                 struct ptlrpc_request *request;
@@ -172,8 +174,7 @@ static int log_commit_thread(void *arg)
                                          lcm->lcm_flags & LLOG_LCM_FL_EXIT);
 
                 /* If we are the last available thread, start a new one in case
-                 * we get blocked on an RPC (nobody else will start a new one).
-                 */
+                 * we get blocked on an RPC (nobody else will start a new one)*/
                 spin_lock(&lcm->lcm_thread_lock);
                 atomic_dec(&lcm->lcm_thread_numidle);
                 list_move(&lcd->lcd_lcm_list, &lcm->lcm_thread_busy);
@@ -192,7 +193,7 @@ static int log_commit_thread(void *arg)
                 }
 
                 if (atomic_read(&lcm->lcm_thread_numidle) <= 1 &&
-                    lcm->lcm_thread_total < lcm->lcm_thread_max) {
+                    atomic_read(&lcm->lcm_thread_total) < lcm->lcm_thread_max) {
                         rc = llog_start_commit_thread();
                         if (rc < 0)
                                 CERROR("error starting thread: rc %d\n", rc);
@@ -200,8 +201,7 @@ static int log_commit_thread(void *arg)
 
                 /* Move all of the pending cancels from the same OST off of
                  * the list, so we don't get multiple threads blocked and/or
-                 * doing upcalls on the same OST in case of failure.
-                 */
+                 * doing upcalls on the same OST in case of failure. */
                 spin_lock(&lcm->lcm_llcd_lock);
                 if (!list_empty(sending_list)) {
                         list_move_tail(sending_list->next,
@@ -289,7 +289,7 @@ static int log_commit_thread(void *arg)
         /* If we are force exiting, just drop all of the cookies. */
         if (lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE) {
                 spin_lock(&lcm->lcm_llcd_lock);
-                list_splice(&lcm->lcm_llcd_pending,&lcd->lcd_llcd_list);
+                list_splice(&lcm->lcm_llcd_pending, &lcd->lcd_llcd_list);
                 list_splice(&lcm->lcm_llcd_resend, &lcd->lcd_llcd_list);
                 list_splice(&lcm->lcm_llcd_free, &lcd->lcd_llcd_list);
                 spin_unlock(&lcm->lcm_llcd_lock);
@@ -298,8 +298,14 @@ static int log_commit_thread(void *arg)
                         llcd_put(llcd);
         }
 
-        CDEBUG(D_HA, "%s exiting\n", current->comm);
         OBD_FREE(lcd, sizeof(*lcd));
+
+        spin_lock(&lcm->lcm_thread_lock);
+        atomic_dec(&lcm->lcm_thread_total);
+        spin_unlock(&lcm->lcm_thread_lock);
+        wake_up(&lcm->lcm_waitq);
+
+        CDEBUG(D_HA, "%s exiting\n", current->comm);
         return 0;
 }
 
@@ -308,10 +314,13 @@ int llog_start_commit_thread(void)
         int rc;
         ENTRY;
 
+        if (atomic_read(&lcm->lcm_thread_total) >= lcm->lcm_thread_max)
+                RETURN(0);
+
         rc = kernel_thread(log_commit_thread, lcm, CLONE_VM | CLONE_FILES);
         if (rc < 0) {
-                CERROR("error starting thread #%d: %d\n", lcm->lcm_thread_total,
-                       rc);
+                CERROR("error starting thread #%d: %d\n",
+                       atomic_read(&lcm->lcm_thread_total), rc);
                 RETURN(rc);
         }
 
@@ -332,10 +341,18 @@ int llog_init_commit_master(void)
         spin_lock_init(&lcm->lcm_llcd_lock);
         atomic_set(&lcm->lcm_llcd_numfree, 0);
         lcm->lcm_llcd_minfree = 0;
+        lcm->lcm_thread_max = 5;
         return 0;
 }
 
-int llog_cleanup_commit_master(void)
+int llog_cleanup_commit_master(int force)
 {
+        lcm->lcm_flags |= LLOG_LCM_FL_EXIT;
+        if (force)
+                lcm->lcm_flags |= LLOG_LCM_FL_EXIT_FORCE;
+        wake_up(&lcm->lcm_waitq);
+
+        wait_event_interruptible(lcm->lcm_waitq,
+                                 atomic_read(&lcm->lcm_thread_total) == 0);
         return 0;
 }