#endif
#ifndef __KERNEL__
-#include <portals/list.h>
-#include <liblustre.h>
+# include <portals/list.h>
+# include <liblustre.h>
#endif
#include <linux/kp30.h>
#include <linux/fs.h>
#include <portals/list.h>
#include "ptlrpc_internal.h"
-
static struct llog_commit_master lustre_lcm;
static struct llog_commit_master *lcm = &lustre_lcm;
static int log_commit_thread(void *arg)
{
        struct llog_commit_daemon *lcd;
struct llog_commit_data *llcd, *n;
unsigned long flags;
+ ENTRY;
OBD_ALLOC(lcd, sizeof(*lcd));
if (lcd == NULL)
RETURN(-ENOMEM);
- INIT_LIST_HEAD(&lcd->lcd_lcm_list);
- INIT_LIST_HEAD(&lcd->lcd_llcd_list);
- lcd->lcd_lcm = lcm;
-
lock_kernel();
ptlrpc_daemonize(); /* thread never needs to do IO */
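        /* Block all signals; the daemon is driven only through lcm_flags
         * and its wait queue, never by signal delivery. */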
        SIGNAL_MASK_LOCK(current, flags);
        sigfillset(&current->blocked);
        RECALC_SIGPENDING;
        SIGNAL_MASK_UNLOCK(current, flags);
spin_lock(&lcm->lcm_thread_lock);
- THREAD_NAME(current->comm, "ll_log_commit_%d", lcm->lcm_thread_total++);
+ THREAD_NAME(current->comm, "ll_log_commit_%d",
+ atomic_read(&lcm->lcm_thread_total));
+ atomic_inc(&lcm->lcm_thread_total);
spin_unlock(&lcm->lcm_thread_lock);
unlock_kernel();
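+ /* Per-daemon state: a link on the master's idle/busy thread lists
+  * and a private list for the llog cancel data being processed. */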
+ INIT_LIST_HEAD(&lcd->lcd_lcm_list);
+ INIT_LIST_HEAD(&lcd->lcd_llcd_list);
+ lcd->lcd_lcm = lcm;
+
CDEBUG(D_HA, "%s started\n", current->comm);
do {
struct ptlrpc_request *request;
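                /* Sleep until cancel cookies land on the pending list or
                 * until cleanup sets LLOG_LCM_FL_EXIT. */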
                wait_event_interruptible(lcm->lcm_waitq,
                                         !list_empty(&lcm->lcm_llcd_pending) ||
                                         lcm->lcm_flags & LLOG_LCM_FL_EXIT);
/* If we are the last available thread, start a new one in case
- * we get blocked on an RPC (nobody else will start a new one).
- */
+ * we get blocked on an RPC (nobody else will start a new one)*/
spin_lock(&lcm->lcm_thread_lock);
atomic_dec(&lcm->lcm_thread_numidle);
                list_move(&lcd->lcd_lcm_list, &lcm->lcm_thread_busy);
                spin_unlock(&lcm->lcm_thread_lock);
}
if (atomic_read(&lcm->lcm_thread_numidle) <= 1 &&
- lcm->lcm_thread_total < lcm->lcm_thread_max) {
+ atomic_read(&lcm->lcm_thread_total) < lcm->lcm_thread_max) {
rc = llog_start_commit_thread();
if (rc < 0)
CERROR("error starting thread: rc %d\n", rc);
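                        /* Failing to add a helper is non-fatal; this
                         * daemon keeps draining the queue by itself. */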
/* Move all of the pending cancels from the same OST off of
* the list, so we don't get multiple threads blocked and/or
- * doing upcalls on the same OST in case of failure.
- */
+ * doing upcalls on the same OST in case of failure. */
spin_lock(&lcm->lcm_llcd_lock);
if (!list_empty(sending_list)) {
                        list_move_tail(sending_list->next,
                                       &lcd->lcd_llcd_list);
/* If we are force exiting, just drop all of the cookies. */
if (lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE) {
spin_lock(&lcm->lcm_llcd_lock);
- list_splice(&lcm->lcm_llcd_pending,&lcd->lcd_llcd_list);
+ list_splice(&lcm->lcm_llcd_pending, &lcd->lcd_llcd_list);
list_splice(&lcm->lcm_llcd_resend, &lcd->lcd_llcd_list);
list_splice(&lcm->lcm_llcd_free, &lcd->lcd_llcd_list);
spin_unlock(&lcm->lcm_llcd_lock);
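                        /* Everything spliced onto the private list is
                         * released below without being sent. */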
                        list_for_each_entry_safe(llcd, n, &lcd->lcd_llcd_list,
                                                 llcd_list)
                                llcd_put(llcd);
}
- CDEBUG(D_HA, "%s exiting\n", current->comm);
OBD_FREE(lcd, sizeof(*lcd));
+
+ spin_lock(&lcm->lcm_thread_lock);
+ atomic_dec(&lcm->lcm_thread_total);
+ spin_unlock(&lcm->lcm_thread_lock);
+ wake_up(&lcm->lcm_waitq);
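+ /* Pairs with the wait in llog_cleanup_commit_master(): the last
+  * daemon to exit drops lcm_thread_total to zero and wakes it. */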
+
+ CDEBUG(D_HA, "%s exiting\n", current->comm);
return 0;
}
int llog_start_commit_thread(void)
{
        int rc;
ENTRY;
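+ /* Respect the pool cap; returning 0 keeps the opportunistic
+  * "spawn a spare thread" callers free of special cases. */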
+ if (atomic_read(&lcm->lcm_thread_total) >= lcm->lcm_thread_max)
+ RETURN(0);
+
rc = kernel_thread(log_commit_thread, lcm, CLONE_VM | CLONE_FILES);
if (rc < 0) {
- CERROR("error starting thread #%d: %d\n", lcm->lcm_thread_total,
- rc);
+ CERROR("error starting thread #%d: %d\n",
+ atomic_read(&lcm->lcm_thread_total), rc);
RETURN(rc);
}
        RETURN(0);
}

int llog_init_commit_master(void)
{
        spin_lock_init(&lcm->lcm_llcd_lock);
atomic_set(&lcm->lcm_llcd_numfree, 0);
lcm->lcm_llcd_minfree = 0;
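+ /* Cap the daemon pool; log_commit_thread() spawns helpers up to
+  * this limit when the last idle thread goes busy. */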
+ lcm->lcm_thread_max = 5;
return 0;
}
-int llog_cleanup_commit_master(void)
+int llog_cleanup_commit_master(int force)
{
+ lcm->lcm_flags |= LLOG_LCM_FL_EXIT;
+ if (force)
+ lcm->lcm_flags |= LLOG_LCM_FL_EXIT_FORCE;
+ wake_up(&lcm->lcm_waitq);
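+ /* Every daemon sees LLOG_LCM_FL_EXIT on wakeup; with "force" set
+  * they drop pending cookies instead of trying to send them first. */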
+
+ wait_event_interruptible(lcm->lcm_waitq,
+ atomic_read(&lcm->lcm_thread_total) == 0);
return 0;
}