X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fmds%2Fmds_log.c;h=70aac6ebdce3397098c62b1696c394d45520d1e5;hb=cad6a6dedbe139e48173a9f491a3457de71026e0;hp=705d7300be1c93ac94410090751dcb5dfb487b38;hpb=b0f15edd90807569acdc50bb973a0e80c87ea78e;p=fs%2Flustre-release.git diff --git a/lustre/mds/mds_log.c b/lustre/mds/mds_log.c index 705d730..70aac6e 100644 --- a/lustre/mds/mds_log.c +++ b/lustre/mds/mds_log.c @@ -26,7 +26,7 @@ * GPL HEADER END */ /* - * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. */ /* @@ -60,7 +60,7 @@ static int mds_llog_origin_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, struct llog_cookie *logcookies, int numcookies) { struct obd_device *obd = ctxt->loc_obd; - struct obd_device *lov_obd = obd->u.mds.mds_osc_obd; + struct obd_device *lov_obd = obd->u.mds.mds_lov_obd; struct llog_ctxt *lctxt; int rc; ENTRY; @@ -72,28 +72,33 @@ static int mds_llog_origin_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, RETURN(rc); } -static int mds_llog_origin_connect(struct llog_ctxt *ctxt, int count, +static int mds_llog_origin_connect(struct llog_ctxt *ctxt, struct llog_logid *logid, struct llog_gen *gen, struct obd_uuid *uuid) { struct obd_device *obd = ctxt->loc_obd; - struct obd_device *lov_obd = obd->u.mds.mds_osc_obd; + struct obd_device *lov_obd = obd->u.mds.mds_lov_obd; struct llog_ctxt *lctxt; int rc; ENTRY; lctxt = llog_get_context(lov_obd, ctxt->loc_idx); - rc = llog_connect(lctxt, count, logid, gen, uuid); + rc = llog_connect(lctxt, logid, gen, uuid); llog_ctxt_put(lctxt); RETURN(rc); } +static struct llog_operations mds_ost_orig_logops = { + lop_add: mds_llog_origin_add, + lop_connect: mds_llog_origin_connect, +}; + static int mds_llog_repl_cancel(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm, int count, struct llog_cookie *cookies, int flags) { struct obd_device *obd = ctxt->loc_obd; - struct obd_device *lov_obd = obd->u.mds.mds_osc_obd; + struct obd_device *lov_obd = obd->u.mds.mds_lov_obd; struct llog_ctxt *lctxt; int rc; ENTRY; @@ -104,36 +109,112 @@ static int mds_llog_repl_cancel(struct llog_ctxt *ctxt, struct lov_stripe_md *ls RETURN(rc); } -static struct llog_operations mds_ost_orig_logops = { - lop_add: mds_llog_origin_add, - lop_connect: mds_llog_origin_connect, -}; - static struct llog_operations mds_size_repl_logops = { lop_cancel: mds_llog_repl_cancel, }; +static struct llog_operations changelog_orig_logops; + +static int llog_changelog_cancel_cb(struct llog_handle *llh, + struct llog_rec_hdr *hdr, void *data) +{ + struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr; + struct llog_cookie cookie; + long long endrec = *(long long *)data; + int rc; + ENTRY; + + /* This is always a (sub)log, not the catalog */ + LASSERT(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN); + + if (rec->cr.cr_index > endrec) + /* records are in order, so we're done */ + RETURN(LLOG_PROC_BREAK); + + cookie.lgc_lgl = llh->lgh_id; + cookie.lgc_index = hdr->lrh_index; + + /* cancel them one at a time. I suppose we could store up the cookies + and cancel them all at once; probably more efficient, but this is + done as a user call, so who cares... */ + rc = llog_cat_cancel_records(llh->u.phd.phd_cat_handle, 1, &cookie); + RETURN(rc < 0 ? rc : 0); +} + +static int llog_changelog_cancel(struct llog_ctxt *ctxt, + struct lov_stripe_md *lsm, int count, + struct llog_cookie *cookies, int flags) +{ + struct llog_handle *cathandle = ctxt->loc_handle; + int rc; + ENTRY; + + /* This should only be called with the catalog handle */ + LASSERT(cathandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT); + + rc = llog_cat_process(cathandle, llog_changelog_cancel_cb, + (void *)cookies, 0, 0); + if (rc >= 0) + /* 0 or 1 means we're done */ + rc = 0; + else + CERROR("cancel idx %u of catalog "LPX64" rc=%d\n", + cathandle->lgh_last_idx, cathandle->lgh_id.lgl_oid, rc); + + RETURN(rc); +} + +int mds_changelog_llog_init(struct obd_device *obd, struct obd_device *tgt) +{ + int rc; + + /* see osc_llog_init */ + changelog_orig_logops = llog_lvfs_ops; + changelog_orig_logops.lop_setup = llog_obd_origin_setup; + changelog_orig_logops.lop_cleanup = llog_obd_origin_cleanup; + changelog_orig_logops.lop_add = llog_obd_origin_add; + changelog_orig_logops.lop_cancel = llog_changelog_cancel; + + rc = llog_setup_named(obd, &obd->obd_olg, LLOG_CHANGELOG_ORIG_CTXT, + tgt, 1, NULL, CHANGELOG_CATALOG, + &changelog_orig_logops); + if (rc) { + CERROR("changelog llog setup failed %d\n", rc); + RETURN(rc); + } + + rc = llog_setup_named(obd, &obd->obd_olg, LLOG_CHANGELOG_USER_ORIG_CTXT, + tgt, 1, NULL, CHANGELOG_USERS, + &changelog_orig_logops); + if (rc) { + CERROR("changelog users llog setup failed %d\n", rc); + RETURN(rc); + } + + RETURN(rc); +} +EXPORT_SYMBOL(mds_changelog_llog_init); + int mds_llog_init(struct obd_device *obd, struct obd_llog_group *olg, - struct obd_device *tgt, int count, struct llog_catid *logid, - struct obd_uuid *uuid) + struct obd_device *disk_obd, int *index) { - struct obd_device *lov_obd = obd->u.mds.mds_osc_obd; + struct obd_device *lov_obd = obd->u.mds.mds_lov_obd; struct llog_ctxt *ctxt; int rc; ENTRY; LASSERT(olg == &obd->obd_olg); - rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 0, NULL, - &mds_ost_orig_logops); + rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, disk_obd, + 0, NULL, &mds_ost_orig_logops); if (rc) RETURN(rc); - rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 0, NULL, - &mds_size_repl_logops); + rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, disk_obd, + 0, NULL, &mds_size_repl_logops); if (rc) GOTO(err_llog, rc); - rc = obd_llog_init(lov_obd, &lov_obd->obd_olg, tgt, count, logid, uuid); + rc = obd_llog_init(lov_obd, &lov_obd->obd_olg, disk_obd, index); if (rc) { CERROR("lov_llog_init err %d\n", rc); GOTO(err_cleanup, rc); @@ -167,5 +248,79 @@ int mds_llog_finish(struct obd_device *obd, int count) if (!rc) rc = rc2; + ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT); + if (ctxt) + rc2 = llog_cleanup(ctxt); + if (!rc) + rc = rc2; + + ctxt = llog_get_context(obd, LLOG_CHANGELOG_USER_ORIG_CTXT); + if (ctxt) + rc2 = llog_cleanup(ctxt); + if (!rc) + rc = rc2; + RETURN(rc); } + +static int mds_llog_add_unlink(struct obd_device *obd, + struct lov_stripe_md *lsm, obd_count count, + struct llog_cookie *logcookie, int cookies) +{ + struct llog_unlink_rec *lur; + struct llog_ctxt *ctxt; + int rc; + + /* first prepare unlink log record */ + OBD_ALLOC_PTR(lur); + if (!lur) + RETURN(rc = -ENOMEM); + lur->lur_hdr.lrh_len = lur->lur_tail.lrt_len = sizeof(*lur); + lur->lur_hdr.lrh_type = MDS_UNLINK_REC; + lur->lur_count = count; + + ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT); + rc = llog_add(ctxt, &lur->lur_hdr, lsm, logcookie, cookies); + llog_ctxt_put(ctxt); + + OBD_FREE_PTR(lur); + RETURN(rc); +} + +int mds_log_op_unlink(struct obd_device *obd, + struct lov_mds_md *lmm, int lmm_size, + struct llog_cookie *logcookies, int cookies_size) +{ + struct mds_obd *mds = &obd->u.mds; + struct lov_stripe_md *lsm = NULL; + int rc; + ENTRY; + + if (IS_ERR(mds->mds_lov_obd)) + RETURN(PTR_ERR(mds->mds_lov_obd)); + + rc = obd_unpackmd(mds->mds_lov_exp, &lsm, lmm, lmm_size); + if (rc < 0) + RETURN(rc); + rc = mds_llog_add_unlink(obd, lsm, 0, logcookies, + cookies_size / sizeof(struct llog_cookie)); + obd_free_memmd(mds->mds_lov_exp, &lsm); + RETURN(rc); +} +EXPORT_SYMBOL(mds_log_op_unlink); + +int mds_log_op_orphan(struct obd_device *obd, struct lov_stripe_md *lsm, + obd_count count) +{ + struct mds_obd *mds = &obd->u.mds; + struct llog_cookie logcookie; + int rc; + ENTRY; + + if (IS_ERR(mds->mds_lov_obd)) + RETURN(PTR_ERR(mds->mds_lov_obd)); + + rc = mds_llog_add_unlink(obd, lsm, count - 1, &logcookie, 1); + RETURN(rc); +} +