X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;ds=sidebyside;f=lustre%2Fmds%2Fmds_log.c;h=dd7a7ca6f19c9210b826a15e99196a096805eff1;hb=2c74bfcb7a06addb42e79c1f561afb0acfaaa7d0;hp=0e8447ffc16f4879df325b440d0a361cd4ff6b08;hpb=837eab06232794ccf5dd523f778debc9253dad1c;p=fs%2Flustre-release.git diff --git a/lustre/mds/mds_log.c b/lustre/mds/mds_log.c index 0e8447f..dd7a7ca 100644 --- a/lustre/mds/mds_log.c +++ b/lustre/mds/mds_log.c @@ -72,7 +72,7 @@ static int mds_llog_origin_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, RETURN(rc); } -static int mds_llog_origin_connect(struct llog_ctxt *ctxt, int count, +static int mds_llog_origin_connect(struct llog_ctxt *ctxt, struct llog_logid *logid, struct llog_gen *gen, struct obd_uuid *uuid) @@ -84,11 +84,16 @@ static int mds_llog_origin_connect(struct llog_ctxt *ctxt, int count, ENTRY; lctxt = llog_get_context(lov_obd, ctxt->loc_idx); - rc = llog_connect(lctxt, count, logid, gen, uuid); + rc = llog_connect(lctxt, logid, gen, uuid); llog_ctxt_put(lctxt); RETURN(rc); } +static struct llog_operations mds_ost_orig_logops = { + lop_add: mds_llog_origin_add, + lop_connect: mds_llog_origin_connect, +}; + static int mds_llog_repl_cancel(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm, int count, struct llog_cookie *cookies, int flags) { @@ -104,39 +109,128 @@ static int mds_llog_repl_cancel(struct llog_ctxt *ctxt, struct lov_stripe_md *ls RETURN(rc); } -static struct llog_operations mds_ost_orig_logops = { - lop_add: mds_llog_origin_add, - lop_connect: mds_llog_origin_connect, -}; - static struct llog_operations mds_size_repl_logops = { lop_cancel: mds_llog_repl_cancel, }; +static struct llog_operations changelog_orig_logops; + +static int llog_changelog_cancel_cb(struct llog_handle *llh, + struct llog_rec_hdr *hdr, void *data) +{ + struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr; + struct llog_cookie cookie; + long long endrec = *(long long *)data; + int rc; + ENTRY; + + /* This is always a (sub)log, not the catalog */ + LASSERT(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN); + + if (rec->cr_index > endrec) + /* records are in order, so we're done */ + RETURN(LLOG_PROC_BREAK); + + cookie.lgc_lgl = llh->lgh_id; + cookie.lgc_index = hdr->lrh_index; + + /* cancel them one at a time. I suppose we could store up the cookies + and cancel them all at once; probably more efficient, but this is + done as a user call, so who cares... */ + rc = llog_cat_cancel_records(llh->u.phd.phd_cat_handle, 1, &cookie); + RETURN(rc < 0 ? rc : 0); +} + +static int llog_changelog_cancel(struct llog_ctxt *ctxt, + struct lov_stripe_md *lsm, int count, + struct llog_cookie *cookies, int flags) +{ + struct llog_handle *cathandle = ctxt->loc_handle; + int rc; + ENTRY; + + /* This should only be called with the catalog handle */ + LASSERT(cathandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT); + + rc = llog_cat_process(cathandle, llog_changelog_cancel_cb, + (void *)cookies, 0, 0); + if (rc >= 0) + /* 0 or 1 means we're done */ + rc = 0; + else + CERROR("cancel idx %u of catalog "LPX64" rc=%d\n", + cathandle->lgh_last_idx, cathandle->lgh_id.lgl_oid, rc); + + RETURN(rc); +} + +int mds_changelog_llog_init(struct obd_device *obd, struct obd_device *tgt) +{ + int rc; + + /* see osc_llog_init */ + changelog_orig_logops = llog_lvfs_ops; + changelog_orig_logops.lop_setup = llog_obd_origin_setup; + changelog_orig_logops.lop_cleanup = llog_obd_origin_cleanup; + changelog_orig_logops.lop_add = llog_obd_origin_add; + changelog_orig_logops.lop_cancel = llog_changelog_cancel; + + rc = llog_setup_named(obd, &obd->obd_olg, LLOG_CHANGELOG_ORIG_CTXT, + tgt, 1, NULL, CHANGELOG_CATALOG, + &changelog_orig_logops); + if (rc) { + CERROR("changelog llog setup failed %d\n", rc); + RETURN(rc); + } + + rc = llog_setup_named(obd, &obd->obd_olg, LLOG_CHANGELOG_USER_ORIG_CTXT, + tgt, 1, NULL, CHANGELOG_USERS, + &changelog_orig_logops); + if (rc) { + CERROR("changelog users llog setup failed %d\n", rc); + RETURN(rc); + } + + RETURN(rc); +} +EXPORT_SYMBOL(mds_changelog_llog_init); + int mds_llog_init(struct obd_device *obd, struct obd_llog_group *olg, - struct obd_device *tgt, int count, struct llog_catid *logid, + struct obd_device *tgt, int count, struct llog_catid *logid, struct obd_uuid *uuid) { struct obd_device *lov_obd = obd->u.mds.mds_osc_obd; + struct llog_ctxt *ctxt; int rc; ENTRY; LASSERT(olg == &obd->obd_olg); - rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 0, NULL, - &mds_ost_orig_logops); + rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, + 0, NULL, &mds_ost_orig_logops); if (rc) RETURN(rc); - rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 0, NULL, - &mds_size_repl_logops); + rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, + 0, NULL, &mds_size_repl_logops); if (rc) - RETURN(rc); + GOTO(err_llog, rc); rc = obd_llog_init(lov_obd, &lov_obd->obd_olg, tgt, count, logid, uuid); - if (rc) + if (rc) { CERROR("lov_llog_init err %d\n", rc); + GOTO(err_cleanup, rc); + } RETURN(rc); +err_cleanup: + ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT); + if (ctxt) + llog_cleanup(ctxt); +err_llog: + ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT); + if (ctxt) + llog_cleanup(ctxt); + return rc; } int mds_llog_finish(struct obd_device *obd, int count) @@ -155,5 +249,17 @@ int mds_llog_finish(struct obd_device *obd, int count) if (!rc) rc = rc2; + ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT); + if (ctxt) + rc2 = llog_cleanup(ctxt); + if (!rc) + rc = rc2; + + ctxt = llog_get_context(obd, LLOG_CHANGELOG_USER_ORIG_CTXT); + if (ctxt) + rc2 = llog_cleanup(ctxt); + if (!rc) + rc = rc2; + RETURN(rc); }