obd->obd_lvfs_ctxt.fs = get_ds();
obd->obd_lvfs_ctxt.cb_ops = filter_lvfs_ops;
- sema_init(&filter->fo_init_lock, 1);
+ init_mutex(&filter->fo_init_lock);
filter->fo_committed_group = 0;
rc = filter_prep(obd);
return rc;
}
+static int filter_group_llog_finish(struct obd_llog_group *olg)
+{
+ struct llog_ctxt *ctxt;
+ int rc = 0, rc2 = 0;
+ ENTRY;
+
+ ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
+ if (ctxt)
+ rc = llog_cleanup(ctxt);
+
+ ctxt = llog_group_get_ctxt(olg, LLOG_SIZE_ORIG_CTXT);
+ if (ctxt)
+ rc2 = llog_cleanup(ctxt);
+ if (!rc)
+ rc = rc2;
+
+ RETURN(rc);
+}
+
static struct llog_operations filter_mds_ost_repl_logops /* initialized below*/;
static struct llog_operations filter_size_orig_logops = {
lop_setup: llog_obd_origin_setup,
lop_add: llog_obd_origin_add
};
-static int filter_llog_init(struct obd_device *obd, int group,
- struct obd_device *tgt, int count,
- struct llog_catid *catid,
- struct obd_uuid *uuid)
+static int filter_olg_init(struct obd_device *obd, struct obd_llog_group *olg,
+ struct obd_device *tgt)
+{
+ int rc;
+
+ rc = llog_setup(obd, olg, LLOG_MDS_OST_REPL_CTXT, tgt, 0, NULL,
+ &filter_mds_ost_repl_logops);
+ if (rc)
+ GOTO(cleanup, rc);
+
+ rc = llog_setup(obd, olg, LLOG_SIZE_ORIG_CTXT, tgt, 0, NULL,
+ &filter_size_orig_logops);
+ if (rc)
+ GOTO(cleanup, rc);
+cleanup:
+ if (rc)
+ filter_group_llog_finish(olg);
+ RETURN(rc);
+}
+
+static int
+filter_default_olg_init(struct obd_device *obd, struct obd_llog_group *olg,
+ struct obd_device *tgt)
{
struct filter_obd *filter = &obd->u.filter;
- struct obd_llog_group *olg;
struct llog_ctxt *ctxt;
int rc;
- ENTRY;
- olg = filter_find_olg(obd, group);
- if (IS_ERR(olg))
- RETURN(PTR_ERR(olg));
-
- if (group == OBD_LLOG_GROUP) {
- LASSERT(filter->fo_lcm == NULL);
- OBD_ALLOC(filter->fo_lcm, sizeof(struct llog_commit_master));
- if (!filter->fo_lcm)
- RETURN(-ENOMEM);
+ LASSERT(filter->fo_lcm == NULL);
+ OBD_ALLOC(filter->fo_lcm, sizeof(struct llog_commit_master));
+ if (!filter->fo_lcm)
+ RETURN(-ENOMEM);
- rc = llog_init_commit_master((struct llog_commit_master *)
- filter->fo_lcm);
- if (rc)
- GOTO(cleanup, rc);
+ rc = llog_init_commit_master((struct llog_commit_master *)
+ filter->fo_lcm);
+ if (rc)
+ GOTO(cleanup, rc);
filter_mds_ost_repl_logops = llog_client_ops;
filter_mds_ost_repl_logops.lop_cancel = llog_obd_repl_cancel;
filter_mds_ost_repl_logops.lop_connect = llog_repl_connect;
filter_mds_ost_repl_logops.lop_sync = llog_obd_repl_sync;
- } else {
- LASSERT(filter->fo_lcm != NULL);
- }
- rc = llog_setup(obd, olg, LLOG_MDS_OST_REPL_CTXT, tgt, 0, NULL,
- &filter_mds_ost_repl_logops);
+
+ rc = filter_olg_init(obd, olg, tgt);
if (rc)
GOTO(cleanup, rc);
- /* FIXME - assign unlink_cb for filter's recovery */
- LASSERT(olg);
ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
-
LASSERT(ctxt != NULL);
ctxt->llog_proc_cb = filter_recov_log_mds_ost_cb;
ctxt->loc_lcm = obd->u.filter.fo_lcm;
+ /* Only start log commit thread for default llog ctxt*/
rc = llog_start_commit_thread(ctxt->loc_lcm);
llog_ctxt_put(ctxt);
if (rc)
GOTO(cleanup, rc);
-
- rc = llog_setup(obd, olg, LLOG_SIZE_ORIG_CTXT, tgt, 0, NULL,
- &filter_size_orig_logops);
-
cleanup:
if (rc) {
llog_cleanup_commit_master(filter->fo_lcm, 0);
}
RETURN(rc);
}
-
-static int filter_group_llog_finish(struct obd_llog_group *olg)
+
+static int filter_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
+ struct obd_device *tgt, int count,
+ struct llog_catid *catid,
+ struct obd_uuid *uuid)
{
+ struct filter_obd *filter = &obd->u.filter;
struct llog_ctxt *ctxt;
- int rc = 0, rc2 = 0;
+ int rc;
ENTRY;
- ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
- if (ctxt)
- rc = llog_cleanup(ctxt);
+ LASSERT(olg != NULL);
+ if (olg == &obd->obd_olg)
+ return filter_default_olg_init(obd, olg, tgt);
- ctxt = llog_group_get_ctxt(olg, LLOG_SIZE_ORIG_CTXT);
- if (ctxt)
- rc2 = llog_cleanup(ctxt);
- if (!rc)
- rc = rc2;
+ /* For other group llog, which will be initialized in
+ * llog_connect(after default_olg_init, so fo_lcm must
+ * already be initialized */
+ LASSERT(filter->fo_lcm != NULL);
+ rc = filter_olg_init(obd, olg, tgt);
+ if (rc)
+ RETURN(rc);
+ ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
+ LASSERT(ctxt != NULL);
+ ctxt->llog_proc_cb = filter_recov_log_mds_ost_cb;
+ ctxt->loc_lcm = obd->u.filter.fo_lcm;
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
RETURN(rc);
}
+static struct obd_llog_group *
+filter_find_olg_internal(struct filter_obd *filter, int group)
+{
+ struct obd_llog_group *olg;
+
+ LASSERT_SPIN_LOCKED(&filter->fo_llog_list_lock);
+ list_for_each_entry(olg, &filter->fo_llog_list, olg_list) {
+ if (olg->olg_group == group)
+ RETURN(olg);
+ }
+ RETURN(NULL);
+}
+
struct obd_llog_group *filter_find_olg(struct obd_device *obd, int group)
{
- struct obd_llog_group *olg, *nolg;
+ struct obd_llog_group *olg = NULL;
+ struct filter_obd *filter;
+
+ filter = &obd->u.filter;
+
+ if (group == OBD_LLOG_GROUP)
+ RETURN(&obd->obd_olg);
+
+ spin_lock(&filter->fo_llog_list_lock);
+ olg = filter_find_olg_internal(filter, group);
+ spin_unlock(&filter->fo_llog_list_lock);
+
+ RETURN(olg);
+}
+
+struct obd_llog_group *filter_find_create_olg(struct obd_device *obd, int group)
+{
+ struct obd_llog_group *olg = NULL;
struct filter_obd *filter;
int rc;
RETURN(&obd->obd_olg);
spin_lock(&filter->fo_llog_list_lock);
- list_for_each_entry(olg, &filter->fo_llog_list, olg_list) {
- if (olg->olg_group == group) {
- spin_unlock(&filter->fo_llog_list_lock);
- RETURN(olg);
+ olg = filter_find_olg_internal(filter, group);
+ if (olg) {
+ if (olg->olg_initializing) {
+ GOTO(out_unlock, olg = ERR_PTR(-EBUSY));
+ } else {
+ GOTO(out_unlock, olg);
}
}
- spin_unlock(&filter->fo_llog_list_lock);
-
OBD_ALLOC_PTR(olg);
if (olg == NULL)
- RETURN(ERR_PTR(-ENOMEM));
+ GOTO(out_unlock, olg = ERR_PTR(-ENOMEM));
llog_group_init(olg, group);
- spin_lock(&filter->fo_llog_list_lock);
- list_for_each_entry(nolg, &filter->fo_llog_list, olg_list) {
- LASSERT(nolg->olg_group != group);
- }
list_add(&olg->olg_list, &filter->fo_llog_list);
+ olg->olg_initializing = 1;
spin_unlock(&filter->fo_llog_list_lock);
rc = llog_cat_initialize(obd, olg, 1, NULL);
if (rc) {
- spin_lock(&filter->fo_llog_list_lock);
- list_del(&olg->olg_list);
- spin_unlock(&filter->fo_llog_list_lock);
- OBD_FREE_PTR(olg);
- RETURN(ERR_PTR(rc));
+ spin_lock(&filter->fo_llog_list_lock);
+ list_del(&olg->olg_list);
+ spin_unlock(&filter->fo_llog_list_lock);
+ OBD_FREE_PTR(olg);
+ GOTO(out, olg = ERR_PTR(-ENOMEM));
}
+ spin_lock(&filter->fo_llog_list_lock);
+ olg->olg_initializing = 0;
+ spin_unlock(&filter->fo_llog_list_lock);
CDEBUG(D_OTHER, "%s: new llog group %u (0x%p)\n",
- obd->obd_name, group, olg);
-
+ obd->obd_name, group, olg);
+out:
RETURN(olg);
+
+out_unlock:
+ spin_unlock(&filter->fo_llog_list_lock);
+ GOTO(out, olg);
}
static int filter_llog_connect(struct obd_export *exp,
(unsigned) body->lgdc_logid.lgl_ogen);
olg = filter_find_olg(obd, body->lgdc_logid.lgl_ogr);
- if (IS_ERR(olg))
- RETURN(PTR_ERR(olg));
+ if (!olg) {
+ CERROR(" %s: can not find olg of group %d\n",
+ obd->obd_name, (int)body->lgdc_logid.lgl_ogr);
+ RETURN(-ENOENT);
+ }
llog_group_set_export(olg, exp);
ctxt = llog_group_get_ctxt(olg, body->lgdc_ctxt_idx);
static int filter_llog_preclean (struct obd_device *obd)
{
- struct obd_llog_group *olg;
+ struct obd_llog_group *olg, *tmp;
struct filter_obd *filter;
+ struct list_head remove_list;
int rc = 0;
ENTRY;
CERROR("failed to cleanup llogging subsystem\n");
filter = &obd->u.filter;
+ CFS_INIT_LIST_HEAD(&remove_list);
+
spin_lock(&filter->fo_llog_list_lock);
while (!list_empty(&filter->fo_llog_list)) {
olg = list_entry(filter->fo_llog_list.next,
struct obd_llog_group, olg_list);
list_del(&olg->olg_list);
- spin_unlock(&filter->fo_llog_list_lock);
-
+ list_add(&olg->olg_list, &remove_list);
+ }
+ spin_unlock(&filter->fo_llog_list_lock);
+
+ list_for_each_entry_safe(olg, tmp, &remove_list, olg_list) {
+ list_del_init(&olg->olg_list);
rc = filter_group_llog_finish(olg);
if (rc)
CERROR("failed to cleanup llogging subsystem for %u\n",
olg->olg_group);
OBD_FREE_PTR(olg);
- spin_lock(&filter->fo_llog_list_lock);
}
- spin_unlock(&filter->fo_llog_list_lock);
RETURN(rc);
}
struct obd_llog_group *olg;
fcc = obdo_logcookie(oa);
olg = filter_find_olg(obd, oa->o_gr);
- if (IS_ERR(olg))
- GOTO(cleanup, rc = PTR_ERR(olg));
+ if (!olg) {
+ CERROR(" %s: can not find olg of group %d\n",
+ obd->obd_name, (int)oa->o_gr);
+ GOTO(cleanup, rc = PTR_ERR(olg));
+ }
llog_group_set_export(olg, exp);
ctxt = llog_group_get_ctxt(olg, fcc->lgc_subsys + 1);
group = (int)(*(__u32 *)val);
LASSERT(group >= FILTER_GROUP_MDS0);
- olg = filter_find_olg(obd, group);
+ olg = filter_find_create_olg(obd, group);
if (IS_ERR(olg))
RETURN(PTR_ERR(olg));
- llog_group_set_export(olg, exp);
+ llog_group_set_export(olg, exp);
+
ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
LASSERTF(ctxt != NULL, "ctxt is null\n"),