+static struct llog_operations changelog_orig_logops;
+
+static int llog_changelog_cancel_cb(const struct lu_env *env,
+ struct llog_handle *llh,
+ struct llog_rec_hdr *hdr, void *data)
+{
+ struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr;
+ struct llog_cookie cookie;
+ long long endrec = *(long long *)data;
+ int rc, err;
+ struct obd_device *obd;
+ void *trans_h;
+ struct inode *inode;
+ ENTRY;
+
+ /* This is always a (sub)log, not the catalog */
+ LASSERT(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN);
+
+ if (rec->cr.cr_index > endrec)
+ /* records are in order, so we're done */
+ RETURN(LLOG_PROC_BREAK);
+
+ cookie.lgc_lgl = llh->lgh_id;
+ cookie.lgc_index = hdr->lrh_index;
+ obd = llh->lgh_ctxt->loc_exp->exp_obd;
+ inode = llh->lgh_file->f_dentry->d_inode;
+
+ /* XXX This is a workaround for the deadlock of changelog adding vs.
+ * changelog cancelling. Changelog adding always start transaction
+ * before acquiring the catlog lock (lgh_lock), whereas, changelog
+ * cancelling do start transaction after holding catlog lock.
+ *
+ * We start the transaction earlier here to keep the locking ordering:
+ * 'start transaction -> catlog lock'. LU-81. */
+ trans_h = fsfilt_start_log(obd, inode, FSFILT_OP_CANCEL_UNLINK,
+ NULL, 1);
+ if (IS_ERR(trans_h)) {
+ CERROR("fsfilt_start_log failed: %ld\n", PTR_ERR(trans_h));
+ RETURN(PTR_ERR(trans_h));
+ }
+
+ /* cancel them one at a time. I suppose we could store up the cookies
+ and cancel them all at once; probably more efficient, but this is
+ done as a user call, so who cares... */
+ rc = llog_cat_cancel_records(env, llh->u.phd.phd_cat_handle, 1,
+ &cookie);
+
+ err = fsfilt_commit(obd, inode, trans_h, 0);
+ if (err) {
+ CERROR("fsfilt_commit failed: %d\n", err);
+ rc = (rc >= 0) ? err : rc;
+ }
+
+ RETURN(rc < 0 ? rc : 0);
+}
+
+static int llog_changelog_cancel(const struct lu_env *env,
+ struct llog_ctxt *ctxt,
+ struct lov_stripe_md *lsm, int count,
+ struct llog_cookie *cookies, int flags)
+{
+ struct llog_handle *cathandle = ctxt->loc_handle;
+ int rc;
+ ENTRY;
+
+ /* This should only be called with the catalog handle */
+ LASSERT(cathandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT);
+
+ rc = llog_cat_process(env, cathandle, llog_changelog_cancel_cb,
+ (void *)cookies, 0, 0);
+ if (rc >= 0)
+ /* 0 or 1 means we're done */
+ rc = 0;
+ else
+ CERROR("cancel idx %u of catalog "LPX64" rc=%d\n",
+ cathandle->lgh_last_idx, cathandle->lgh_id.lgl_oid, rc);
+
+ RETURN(rc);
+}
+
+int mds_changelog_llog_init(struct obd_device *obd, struct obd_device *tgt)
+{
+ int rc;
+
+ /* see osc_llog_init */
+ changelog_orig_logops = llog_lvfs_ops;
+ changelog_orig_logops.lop_setup = llog_obd_origin_setup;
+ changelog_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
+ changelog_orig_logops.lop_add = llog_obd_origin_add;
+ changelog_orig_logops.lop_cancel = llog_changelog_cancel;
+
+ rc = llog_setup_named(obd, &obd->obd_olg, LLOG_CHANGELOG_ORIG_CTXT,
+ tgt, 1, NULL, CHANGELOG_CATALOG,
+ &changelog_orig_logops);
+ if (rc) {
+ CERROR("changelog llog setup failed %d\n", rc);
+ RETURN(rc);
+ }
+
+ rc = llog_setup_named(obd, &obd->obd_olg, LLOG_CHANGELOG_USER_ORIG_CTXT,
+ tgt, 1, NULL, CHANGELOG_USERS,
+ &changelog_orig_logops);
+ if (rc) {
+ CERROR("changelog users llog setup failed %d\n", rc);
+ RETURN(rc);
+ }
+
+ RETURN(rc);
+}
+EXPORT_SYMBOL(mds_changelog_llog_init);
+