*/
static void llcd_free(struct llog_canceld_ctxt *llcd)
{
+ LASSERT(atomic_read(&llcd_count) > 0);
OBD_SLAB_FREE(llcd, llcd_cache, llcd->llcd_size);
atomic_dec(&llcd_count);
}
*/
static void llcd_put(struct llog_ctxt *ctxt)
{
+ struct llog_commit_master *lcm;
struct llog_canceld_ctxt *llcd;
+ lcm = ctxt->loc_lcm;
llcd = llcd_detach(ctxt);
if (llcd)
llcd_free(llcd);
+
+ if (atomic_read(&lcm->lcm_count) == 0)
+ cfs_waitq_signal(&lcm->lcm_waitq);
}
/**
return rc;
}
-static atomic_t llog_tcount = ATOMIC_INIT(0);
-
/**
* Start recovery thread which actually deals llcd sending. This
* is all ptlrpc standard thread based so there is not much of work
RETURN(rc);
}
lcm->lcm_set = lcm->lcm_pc.pc_set;
- atomic_inc(&llog_tcount);
-
RETURN(rc);
}
EXPORT_SYMBOL(llog_recov_thread_start);
*/
void llog_recov_thread_stop(struct llog_commit_master *lcm, int force)
{
+ struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
ENTRY;
-
+
/**
* Let all know that we're stopping. This will also make
* llcd_send() refuse any new llcds.
* for processing now.
*/
ptlrpcd_stop(&lcm->lcm_pc, force);
-
+
/*
- * No llcds on this @lcm should left.
+ * Wait for llcd number == 0. Note that this is an infinite wait;
+ * all other parts should make sure that no llcd is lost.
*/
- LASSERTF(atomic_read(&lcm->lcm_count) == 0,
- "Busy llcds found on lcm %p - (%d)\n",
- lcm, atomic_read(&lcm->lcm_count));
+ l_wait_event(lcm->lcm_waitq,
+ atomic_read(&lcm->lcm_count) == 0, &lwi);
EXIT;
}
EXPORT_SYMBOL(llog_recov_thread_stop);
RETURN(NULL);
/*
- * Try to create threads with unique names and user id.
+ * Try to create threads with unique names.
*/
snprintf(lcm->lcm_name, sizeof(lcm->lcm_name),
- "ll_log_commit_%s_%02d", name,
- atomic_read(&llog_tcount));
+ "ll_log_commit_%s", name);
strncpy(lcm->lcm_name, name, sizeof(lcm->lcm_name));
+ cfs_waitq_init(&lcm->lcm_waitq);
atomic_set(&lcm->lcm_count, 0);
rc = llog_recov_thread_start(lcm);
if (rc) {
}
EXPORT_SYMBOL(llog_recov_thread_fini);
-static int llog_obd_repl_generic(struct llog_ctxt *ctxt,
- void *handle, void *arg)
+static int llog_recov_thread_replay(struct llog_ctxt *ctxt,
+ void *cb, void *arg)
{
struct obd_device *obd = ctxt->loc_obd;
struct llog_process_cat_args *lpca;
if (!lpca)
RETURN(-ENOMEM);
- lpca->lpca_cb = handle;
+ lpca->lpca_cb = cb;
lpca->lpca_arg = arg;
/*
RETURN(rc);
}
-int llog_obd_repl_connect(struct llog_ctxt *ctxt, int count,
+int llog_obd_repl_connect(struct llog_ctxt *ctxt, int count,
struct llog_logid *logid, struct llog_gen *gen,
struct obd_uuid *uuid)
{
- struct llog_canceld_ctxt *llcd;
int rc;
ENTRY;
- mutex_down(&ctxt->loc_sem);
-
/*
- * Send back cached llcd before recovery from llog if we have any.
+ * Send back cached llcd from llog before recovery if we have any.
+ * This is a no-op if nothing cached is found there.
*/
- if (ctxt->loc_llcd) {
- CWARN("Llcd %p:%p is not empty\n", ctxt->loc_llcd, ctxt);
- mutex_up(&ctxt->loc_sem);
- llog_sync(ctxt, NULL);
- mutex_down(&ctxt->loc_sem);
- }
-
- llcd = llcd_get(ctxt);
- if (!llcd) {
- mutex_up(&ctxt->loc_sem);
- RETURN(-ENOMEM);
- }
+ llog_sync(ctxt, NULL);
+ /*
+ * Start recovery in a separate thread.
+ */
+ mutex_down(&ctxt->loc_sem);
ctxt->loc_gen = *gen;
-
- rc = llog_obd_repl_generic(ctxt, ctxt->llog_proc_cb, logid);
- if (rc != 0) {
- llcd_put(ctxt);
- CERROR("Error recovery process: %d\n", rc);
- }
+ rc = llog_recov_thread_replay(ctxt, ctxt->llog_proc_cb, logid);
mutex_up(&ctxt->loc_sem);
+
RETURN(rc);
}
EXPORT_SYMBOL(llog_obd_repl_connect);
struct lov_stripe_md *lsm, int count,
struct llog_cookie *cookies, int flags)
{
+ struct llog_commit_master *lcm;
struct llog_canceld_ctxt *llcd;
int rc = 0;
ENTRY;
LASSERT(ctxt != NULL);
mutex_down(&ctxt->loc_sem);
+ lcm = ctxt->loc_lcm;
/*
* Let's check if we have all structures alive. We also check for
GOTO(out, rc = -ENODEV);
}
- if (test_bit(LLOG_LCM_FL_EXIT, &ctxt->loc_lcm->lcm_flags)) {
+ if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags)) {
CDEBUG(D_RPCTRACE, "Commit thread is stopping for ctxt %p\n",
ctxt);
GOTO(out, rc = -ENODEV);
llcd = llcd_get(ctxt);
if (!llcd)
GOTO(out, rc = -ENOMEM);
+ /*
+ * Allocation succeeded; check the stop flag again
+ * so that we can bail out as soon as possible.
+ */
+ if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags))
+ GOTO(out, rc = -ENODEV);
}
/*
llcd = llcd_get(ctxt);
if (!llcd)
GOTO(out, rc = -ENOMEM);
+ /*
+ * Allocation succeeded; check the stop flag again
+ * so that we can bail out as soon as possible.
+ */
+ if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags))
+ GOTO(out, rc = -ENODEV);
}
/*
}
EXIT;
out:
+ if (rc)
+ llcd_put(ctxt);
mutex_up(&ctxt->loc_sem);
return rc;
}
int rc = 0;
ENTRY;
+ mutex_down(&ctxt->loc_sem);
if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) {
CDEBUG(D_RPCTRACE, "Reverse import disconnect\n");
/*
* Check for llcd which might be left attached to @ctxt.
* Let's kill it.
*/
- mutex_down(&ctxt->loc_sem);
llcd_put(ctxt);
mutex_up(&ctxt->loc_sem);
} else {
+ mutex_up(&ctxt->loc_sem);
rc = llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW);
}
RETURN(rc);
*/
void llog_recov_fini(void)
{
- int count;
-
/*
* Kill llcd cache when thread is stopped and we're sure no
* llcd in use left.
* In 2.6.22 cfs_mem_cache_destroy() will not return error
* for busy resources. Let's check it another way.
*/
- count = atomic_read(&llcd_count);
- LASSERTF(count == 0, "Can't destroy llcd cache! Number of "
- "busy llcds: %d\n", count);
+ LASSERTF(atomic_read(&llcd_count) == 0,
+ "Can't destroy llcd cache! Number of "
+ "busy llcds: %d\n", atomic_read(&llcd_count));
cfs_mem_cache_destroy(llcd_cache);
llcd_cache = NULL;
}