#include <lustre_fsfilt.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
-#include <lustre_commit_confd.h>
#include <libcfs/list.h>
#include <lustre_disk.h>
#include <lustre_quota.h>
int rc;
ENTRY;
- OBD_ALLOC(filter->fo_lcm, sizeof(struct llog_commit_master));
+ filter->fo_lcm = llog_recov_thread_init(obd->obd_name);
if (!filter->fo_lcm)
RETURN(-ENOMEM);
- rc = llog_init_commit_master((struct llog_commit_master *)
- filter->fo_lcm);
- if (rc)
- GOTO(cleanup, rc);
-
filter_mds_ost_repl_logops = llog_client_ops;
filter_mds_ost_repl_logops.lop_cancel = llog_obd_repl_cancel;
- filter_mds_ost_repl_logops.lop_connect = llog_repl_connect;
+ filter_mds_ost_repl_logops.lop_connect = llog_obd_repl_connect;
filter_mds_ost_repl_logops.lop_sync = llog_obd_repl_sync;
rc = llog_setup(obd, LLOG_MDS_OST_REPL_CTXT, tgt, 0, NULL,
&filter_mds_ost_repl_logops);
if (rc)
- GOTO(cleanup, rc);
+ GOTO(cleanup_lcm, rc);
/* FIXME - assign unlink_cb for filter's recovery */
ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT);
ctxt->llog_proc_cb = filter_recov_log_mds_ost_cb;
- ctxt->loc_lcm = obd->u.filter.fo_lcm;
- rc = llog_start_commit_thread(ctxt->loc_lcm);
+ ctxt->loc_lcm = filter->fo_lcm;
llog_ctxt_put(ctxt);
- if (rc)
- GOTO(cleanup, rc);
rc = llog_setup(obd, LLOG_SIZE_ORIG_CTXT, tgt, 0, NULL,
&filter_size_orig_logops);
-
-cleanup:
- if (rc) {
- llog_cleanup_commit_master(filter->fo_lcm, 0);
- OBD_FREE(filter->fo_lcm, sizeof(struct llog_commit_master));
- filter->fo_lcm = NULL;
- }
+ if (rc)
+ GOTO(cleanup_ctxt, rc);
RETURN(rc);
+cleanup_ctxt:
+ ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT);
+ if (ctxt)
+ llog_cleanup(ctxt);
+cleanup_lcm:
+ llog_recov_thread_fini(filter->fo_lcm, 1);
+ filter->fo_lcm = NULL;
+ return rc;
}
static int filter_llog_finish(struct obd_device *obd, int count)
{
+ struct filter_obd *filter = &obd->u.filter;
struct llog_ctxt *ctxt;
int rc = 0, rc2 = 0;
ENTRY;
- if (obd->u.filter.fo_lcm) {
- llog_cleanup_commit_master((struct llog_commit_master *)
- obd->u.filter.fo_lcm, 1);
- OBD_FREE(obd->u.filter.fo_lcm,
- sizeof(struct llog_commit_master));
- obd->u.filter.fo_lcm = NULL;
- }
-
ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT);
- if (ctxt)
+ if (ctxt) {
+ /*
+ * Balance class_import_get() called in llog_receptor_accept().
+ * This is safe to do here, as llog is already synchronized and
+ * its import may go.
+ */
+ mutex_down(&ctxt->loc_sem);
+ if (ctxt->loc_imp) {
+ class_import_put(ctxt->loc_imp);
+ ctxt->loc_imp = NULL;
+ }
+ mutex_up(&ctxt->loc_sem);
rc = llog_cleanup(ctxt);
-
+ }
ctxt = llog_get_context(obd, LLOG_SIZE_ORIG_CTXT);
if (ctxt)
rc2 = llog_cleanup(ctxt);
if (!rc)
rc = rc2;
+ if (filter->fo_lcm) {
+ llog_recov_thread_fini(filter->fo_lcm, obd->obd_force);
+ filter->fo_lcm = NULL;
+ }
+
RETURN(rc);
}