From 72d47edfa5b5068c5883e867ab4d199f7ba63af3 Mon Sep 17 00:00:00 2001 From: tianying Date: Thu, 13 Nov 2003 05:57:06 +0000 Subject: [PATCH] b:2215 - OSTs fetch unlink llog records from MDS post replay 1. add lop_connect and lop_precleanup to llog_operations 2. rename llog_obd_ctxt to llog_ctxt; llog_commit_data to llog_canceld_ctxt 3. split out llog functions in llog_client.c and llog_server.c and remove llogd.c 4. add one test-59 to sanity.sh to verify cancellation of llog records async 5. fix calling of mds_cleanup_orphans, add test-34 to replay-single.sh 6. fix some codes about llog --- lustre/ldlm/ldlm_lib.c | 14 ++- lustre/lov/lov_log.c | 58 +++++++-- lustre/mds/mds_internal.h | 8 +- lustre/mds/mds_log.c | 63 ++++------ lustre/mds/mds_unlink_open.c | 8 +- lustre/obdclass/llog_obd.c | 37 ++++-- lustre/obdfilter/filter_internal.h | 5 +- lustre/obdfilter/filter_log.c | 60 +++++++++- lustre/ptlrpc/llog_client.c | 213 +++++++++++++++++++++++++++++++++ lustre/ptlrpc/llog_net.c | 79 ++++++++++-- lustre/ptlrpc/llog_server.c | 239 +++++++++++++++++++++++++++++++++++++ lustre/tests/replay-single.sh | 127 ++++++++++++-------- 12 files changed, 776 insertions(+), 135 deletions(-) create mode 100644 lustre/ptlrpc/llog_client.c create mode 100644 lustre/ptlrpc/llog_server.c diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index 8551c86..106fcb6 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -640,7 +640,10 @@ void target_abort_recovery(void *data) /* when recovery was abort, cleanup orphans for mds */ if (OBT(obd) && OBP(obd, postrecov)) { rc = OBP(obd, postrecov)(obd); - CERROR("Cleanup %d orphans after recovery was abort!\n", rc); + if (rc >= 0) + CERROR("Cleanup %d orphans after recovery was aborted\n", rc); + else + CERROR("postrecov failed %d\n", rc); } abort_delayed_replies(obd); @@ -922,11 +925,14 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc) obd->obd_name); obd->obd_recovering = 0; - /* when recovering finished, cleanup orphans for mds */ + /* when recovering finished, cleanup orphans for mds */ if (OBT(obd) && OBP(obd, postrecov)) { rc2 = OBP(obd, postrecov)(obd); - CERROR("%s: all clients recovered, %d MDS orphans " - "deleted\n", obd->obd_name, rc2); + if (rc2 >= 0) + CERROR("%s: all clients recovered, %d MDS orphans " + "deleted\n", obd->obd_name, rc2); + else + CERROR("postrecov failed %d\n", rc2); } list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) { diff --git a/lustre/lov/lov_log.c b/lustre/lov/lov_log.c index b11ccf1..6ef7497 100644 --- a/lustre/lov/lov_log.c +++ b/lustre/lov/lov_log.c @@ -53,7 +53,7 @@ #include "lov_internal.h" #if 0 -static int lov_logop_cleanup(struct llog_obd_ctxt *ctxt) +static int lov_logop_cleanup(struct llog_ctxt *ctxt) { struct lov_obd *lov = &ctxt->loc_obd->u.lov; int i, rc = 0; @@ -61,7 +61,7 @@ static int lov_logop_cleanup(struct llog_obd_ctxt *ctxt) ENTRY; for (i = 0; i < lov->desc.ld_tgt_count; i++) { struct obd_device *child = lov->tgts[i].ltd_exp->exp_obd; - struct llog_obd_ctxt *cctxt = child->obd_llog_ctxt[ctxt->loc_idx]; + struct llog_ctxt *cctxt = llog_get_context(child, ctxt->loc_idx); rc = llog_cleanup(cctxt); if (rc) { CERROR("error lov_llog_open %d\n", i); @@ -77,30 +77,64 @@ static int lov_logop_cleanup(struct llog_obd_ctxt *ctxt) * we need to keep cookies in stripe order, even if some are NULL, so that * the right cookies are passed back to the right OSTs at the client side. * Unset cookies should be all-zero (which will never occur naturally). */ -static int lov_llog_origin_add(struct llog_obd_ctxt *ctxt, +static int lov_llog_origin_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, struct lov_stripe_md *lsm, struct llog_cookie *logcookies, int numcookies) { struct obd_device *obd = ctxt->loc_obd; struct lov_obd *lov = &obd->u.lov; struct lov_oinfo *loi; + struct llog_unlink_rec *lur; int i, rc = 0; ENTRY; + OBD_ALLOC(lur, sizeof(*lur)); + if (!lur) + RETURN(-ENOMEM); + lur->lur_hdr.lrh_len = lur->lur_tail.lrt_len = sizeof(*lur); + lur->lur_hdr.lrh_type = MDS_UNLINK_REC; + LASSERT(logcookies && numcookies >= lsm->lsm_stripe_count); for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) { struct obd_device *child = lov->tgts[loi->loi_ost_idx].ltd_exp->exp_obd; - struct llog_obd_ctxt *cctxt = child->obd_llog_ctxt[ctxt->loc_idx]; - rc += llog_add(cctxt, rec, NULL, logcookies + rc, numcookies - rc); + struct llog_ctxt *cctxt = llog_get_context(child, ctxt->loc_idx); + + lur->lur_oid = loi->loi_id; + lur->lur_ogen = loi->loi_gr; + rc += llog_add(cctxt, &lur->lur_hdr, NULL, logcookies + rc, numcookies - rc); + + } + OBD_FREE(lur, sizeof(*lur)); + + RETURN(rc); +} + +static int lov_llog_origin_connect(struct llog_ctxt *ctxt, int count, + struct llog_logid *logid, + struct llog_ctxt_gen *gen) +{ + struct obd_device *obd = ctxt->loc_obd; + struct lov_obd *lov = &obd->u.lov; + int i, rc = 0; + ENTRY; + LASSERT(lov->desc.ld_tgt_count == count); + for (i = 0; i < lov->desc.ld_tgt_count; i++) { + struct obd_device *child = lov->tgts[i].ltd_exp->exp_obd; + struct llog_ctxt *cctxt = llog_get_context(child, ctxt->loc_idx); + rc = llog_connect(cctxt, 1, logid, gen); + if (rc) { + CERROR("error osc_llog_connect %d\n", i); + break; + } } RETURN(rc); } /* the replicators commit callback */ -static int lov_llog_repl_cancel(struct llog_obd_ctxt *ctxt, struct lov_stripe_md *lsm, +static int lov_llog_repl_cancel(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm, int count, struct llog_cookie *cookies, int flags) { struct lov_obd *lov; @@ -116,10 +150,9 @@ static int lov_llog_repl_cancel(struct llog_obd_ctxt *ctxt, struct lov_stripe_md lov = &obd->u.lov; for (i = 0; i < count; i++, cookies++, loi++) { struct obd_device *child = lov->tgts[loi->loi_ost_idx].ltd_exp->exp_obd; - struct llog_obd_ctxt *cctxt = child->obd_llog_ctxt[ctxt->loc_idx]; + struct llog_ctxt *cctxt = llog_get_context(child, ctxt->loc_idx); int err; - err = llog_cancel(cctxt, NULL, 1, cookies, flags); if (err && lov->tgts[loi->loi_ost_idx].active) { CERROR("error: objid "LPX64" subobj "LPX64 @@ -133,12 +166,11 @@ static int lov_llog_repl_cancel(struct llog_obd_ctxt *ctxt, struct lov_stripe_md } static struct llog_operations lov_unlink_orig_logops = { -// lop_cleanup: lov_logop_cleanup, lop_add: lov_llog_origin_add, + lop_connect: lov_llog_origin_connect }; static struct llog_operations lov_size_repl_logops = { -// lop_cleanup: lov_logop_cleanup, lop_cancel: lov_llog_repl_cancel }; @@ -150,7 +182,7 @@ int lov_llog_init(struct obd_device *obd, struct obd_device *tgt, int i, rc = 0; ENTRY; - rc = llog_setup(obd, LLOG_UNLINK_ORIG_CTXT, tgt, 0, NULL, + rc = llog_setup(obd, LLOG_UNLINK_ORIG_CTXT, tgt, 0, NULL, &lov_unlink_orig_logops); if (rc) RETURN(rc); @@ -178,11 +210,11 @@ int lov_llog_finish(struct obd_device *obd, int count) int i, rc = 0; ENTRY; - rc = llog_cleanup(obd->obd_llog_ctxt[LLOG_UNLINK_ORIG_CTXT]); + rc = llog_cleanup(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT)); if (rc) RETURN(rc); - rc = llog_cleanup(obd->obd_llog_ctxt[LLOG_SIZE_REPL_CTXT]); + rc = llog_cleanup(llog_get_context(obd, LLOG_SIZE_REPL_CTXT)); if (rc) RETURN(rc); diff --git a/lustre/mds/mds_internal.h b/lustre/mds/mds_internal.h index 0a0dccc..bb2f1e2 100644 --- a/lustre/mds/mds_internal.h +++ b/lustre/mds/mds_internal.h @@ -35,10 +35,10 @@ int mds_cleanup_orphans(struct obd_device *obd); /* mds/mds_log.c */ -int mds_log_op_unlink(struct obd_device *obd, struct inode *inode, struct lustre_msg *repmsg, - int offset); -int mds_llog_init(struct obd_device *obd, struct obd_device *tgt, - int count, struct llog_logid *logid); +int mds_log_op_unlink(struct obd_device *obd, struct inode *inode, + struct lustre_msg *repmsg, int offset); +int mds_llog_init(struct obd_device *obd, struct obd_device *tgt, int count, + struct llog_logid *logid); int mds_llog_finish(struct obd_device *obd, int count); /* mds/mds_lov.c */ diff --git a/lustre/mds/mds_log.c b/lustre/mds/mds_log.c index 6e30673..a9b02ee 100644 --- a/lustre/mds/mds_log.c +++ b/lustre/mds/mds_log.c @@ -37,46 +37,46 @@ #include "mds_internal.h" -#if 0 -static int mds_logop_cleanup(struct llog_obd_ctxt *ctxt) +static int mds_llog_origin_add(struct llog_ctxt *ctxt, + struct llog_rec_hdr *rec, struct lov_stripe_md *lsm, + struct llog_cookie *logcookies, int numcookies) { struct obd_device *obd = ctxt->loc_obd; struct obd_device *lov_obd = obd->u.mds.mds_osc_obd; - struct llog_obd_ctxt *lctxt; + struct llog_ctxt *lctxt; int rc; ENTRY; - lctxt = lov_obd->obd_llog_ctxt[ctxt->loc_idx]; - rc = llog_cleanup(lctxt); + lctxt = llog_get_context(lov_obd, ctxt->loc_idx); + rc = llog_add(lctxt, rec, lsm, logcookies, numcookies); RETURN(rc); } -#endif -static int mds_llog_origin_add(struct llog_obd_ctxt *ctxt, - struct llog_rec_hdr *rec, struct lov_stripe_md *lsm, - struct llog_cookie *logcookies, int numcookies) +static int mds_llog_origin_connect(struct llog_ctxt *ctxt, int count, + struct llog_logid *logid, + struct llog_ctxt_gen *gen) { struct obd_device *obd = ctxt->loc_obd; struct obd_device *lov_obd = obd->u.mds.mds_osc_obd; - struct llog_obd_ctxt *lctxt; + struct llog_ctxt *lctxt; int rc; ENTRY; - lctxt = lov_obd->obd_llog_ctxt[ctxt->loc_idx]; - rc = llog_add(lctxt, rec, lsm, logcookies, numcookies); + lctxt = llog_get_context(lov_obd, ctxt->loc_idx); + rc = llog_connect(lctxt, count, logid, gen); RETURN(rc); } -static int mds_llog_repl_cancel(struct llog_obd_ctxt *ctxt, struct lov_stripe_md *lsm, +static int mds_llog_repl_cancel(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm, int count, struct llog_cookie *cookies, int flags) { struct obd_device *obd = ctxt->loc_obd; struct obd_device *lov_obd = obd->u.mds.mds_osc_obd; - struct llog_obd_ctxt *lctxt; + struct llog_ctxt *lctxt; int rc; ENTRY; - lctxt = lov_obd->obd_llog_ctxt[ctxt->loc_idx]; + lctxt = llog_get_context(lov_obd, ctxt->loc_idx); rc = llog_cancel(lctxt, lsm, count, cookies,flags); RETURN(rc); } @@ -86,9 +86,8 @@ int mds_log_op_unlink(struct obd_device *obd, struct inode *inode, { struct mds_obd *mds = &obd->u.mds; struct lov_stripe_md *lsm = NULL; - struct llog_unlink_rec *lur; #ifdef ENABLE_ORPHANS - struct llog_obd_ctxt *ctxt; + struct llog_ctxt *ctxt; #endif int rc; ENTRY; @@ -102,35 +101,23 @@ int mds_log_op_unlink(struct obd_device *obd, struct inode *inode, if (rc < 0) RETURN(rc); - OBD_ALLOC(lur, sizeof(*lur)); - if (!lur) - RETURN(-ENOMEM); - lur->lur_hdr.lrh_len = lur->lur_tail.lrt_len = sizeof(*lur); - lur->lur_hdr.lrh_type = MDS_UNLINK_REC; - lur->lur_oid = inode->i_ino; - lur->lur_ogen = inode->i_generation; - #ifdef ENABLE_ORPHANS - ctxt = obd->obd_llog_ctxt[LLOG_UNLINK_ORIG_CTXT]; - rc = llog_add(ctxt, &lur->lur_hdr, - lsm, lustre_msg_buf(repmsg, offset + 1, 0), - repmsg->buflens[offset + 1] / - sizeof(struct llog_cookie)); + ctxt = llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT); + rc = llog_add(ctxt, NULL, lsm, lustre_msg_buf(repmsg, offset + 1, 0), + repmsg->buflens[offset + 1] / sizeof(struct llog_cookie)); #endif obd_free_memmd(mds->mds_osc_exp, &lsm); - OBD_FREE(lur, sizeof(*lur)); RETURN(rc); } static struct llog_operations mds_unlink_orig_logops = { - //lop_cleanup: mds_logop_cleanup, lop_add: mds_llog_origin_add, + lop_connect: mds_llog_origin_connect, }; static struct llog_operations mds_size_repl_logops = { - //lop_cleanup: mds_logop_cleanup, lop_cancel: mds_llog_repl_cancel }; @@ -141,11 +128,13 @@ int mds_llog_init(struct obd_device *obd, struct obd_device *tgt, int rc; ENTRY; - rc = llog_setup(obd, LLOG_UNLINK_ORIG_CTXT, tgt, 0, NULL, &mds_unlink_orig_logops); + rc = llog_setup(obd, LLOG_UNLINK_ORIG_CTXT, tgt, 0, NULL, + &mds_unlink_orig_logops); if (rc) RETURN(rc); - rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, 0, NULL, &mds_size_repl_logops); + rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, 0, NULL, + &mds_size_repl_logops); if (rc) RETURN(rc); @@ -162,11 +151,11 @@ int mds_llog_finish(struct obd_device *obd, int count) int rc; ENTRY; - rc = llog_cleanup(obd->obd_llog_ctxt[LLOG_UNLINK_ORIG_CTXT]); + rc = llog_cleanup(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT)); if (rc) RETURN(rc); - rc = llog_cleanup(obd->obd_llog_ctxt[LLOG_SIZE_REPL_CTXT]); + rc = llog_cleanup(llog_get_context(obd, LLOG_SIZE_REPL_CTXT)); if (rc) RETURN(rc); diff --git a/lustre/mds/mds_unlink_open.c b/lustre/mds/mds_unlink_open.c index 0695127..6f56f2d 100644 --- a/lustre/mds/mds_unlink_open.c +++ b/lustre/mds/mds_unlink_open.c @@ -240,8 +240,6 @@ int mds_cleanup_orphans(struct obd_device *obd) int rc = 0, rc2 = 0, item = 0; ENTRY; - RETURN(0); - push_ctxt(&saved, &obd->obd_ctxt, NULL); dget(mds->mds_pending_dir); mntget(mds->mds_vfsmnt); @@ -275,7 +273,7 @@ int mds_cleanup_orphans(struct obd_device *obd) GOTO(err_out, rc2 = PTR_ERR(dchild)); } if (!dchild->d_inode) { - CDEBUG(D_ERROR, "orphan %s has been deleted\n", + CDEBUG(D_ERROR, "orphan %s has been removed\n", ptr->d_name); GOTO(next, rc2 = 0); } @@ -288,12 +286,10 @@ int mds_cleanup_orphans(struct obd_device *obd) GOTO(next, rc2 = 0); } - CDEBUG(D_ERROR, "start to remove %s from mds and ost!\n", - ptr->d_name); rc2 = mds_unlink(obd, dchild, child_inode, pending_dir); if (rc2 == 0) { item ++; - CDEBUG(D_ERROR, "removed orphan %s object successfully!\n", + CDEBUG(D_ERROR, "removed orphan %s from MDS and OST\n", ptr->d_name); } else { l_dput(dchild); diff --git a/lustre/obdclass/llog_obd.c b/lustre/obdclass/llog_obd.c index 2210f50..c2ba42f 100644 --- a/lustre/obdclass/llog_obd.c +++ b/lustre/obdclass/llog_obd.c @@ -25,7 +25,7 @@ int llog_setup(struct obd_device *obd, int index, struct obd_device *disk_obd, int count, struct llog_logid *logid, struct llog_operations *op) { int rc = 0; - struct llog_obd_ctxt *ctxt; + struct llog_ctxt *ctxt; ENTRY; if (index < 0 || index >= LLOG_MAX_CTXTS) @@ -51,11 +51,12 @@ int llog_setup(struct obd_device *obd, int index, struct obd_device *disk_obd, } EXPORT_SYMBOL(llog_setup); -int llog_cleanup(struct llog_obd_ctxt *ctxt) +int llog_cleanup(struct llog_ctxt *ctxt) { int rc = 0; ENTRY; + down(&ctxt->loc_sem); LASSERT(ctxt); if (CTXTP(ctxt, cleanup)) @@ -64,13 +65,28 @@ int llog_cleanup(struct llog_obd_ctxt *ctxt) ctxt->loc_obd->obd_llog_ctxt[ctxt->loc_idx] = NULL; class_export_put(ctxt->loc_exp); ctxt->loc_exp = NULL; + up(&ctxt->loc_sem); OBD_FREE(ctxt, sizeof(*ctxt)); RETURN(rc); } EXPORT_SYMBOL(llog_cleanup); -int llog_add(struct llog_obd_ctxt *ctxt, +int llog_precleanup(struct llog_ctxt *ctxt) +{ + ENTRY; + + if (!ctxt) + RETURN(0); + + if (ctxt->loc_llcd) + llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW); + + RETURN(0); +} +EXPORT_SYMBOL(llog_precleanup); + +int llog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, struct lov_stripe_md *lsm, struct llog_cookie *logcookies, int numcookies) { @@ -78,14 +94,16 @@ int llog_add(struct llog_obd_ctxt *ctxt, ENTRY; LASSERT(ctxt); + down(&ctxt->loc_sem); CTXT_CHECK_OP(ctxt, add, -EOPNOTSUPP); rc = CTXTP(ctxt, add)(ctxt, rec, lsm, logcookies, numcookies); + up(&ctxt->loc_sem); RETURN(rc); } EXPORT_SYMBOL(llog_add); -int llog_cancel(struct llog_obd_ctxt *ctxt, struct lov_stripe_md *lsm, +int llog_cancel(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm, int count, struct llog_cookie *cookies, int flags) { int rc; @@ -104,7 +122,7 @@ EXPORT_SYMBOL(llog_cancel); int llog_obd_origin_setup(struct obd_device *obd, int index, struct obd_device *disk_obd, int count, struct llog_logid *logid) { - struct llog_obd_ctxt *ctxt; + struct llog_ctxt *ctxt; struct llog_handle *handle; struct obd_run_ctxt saved; int rc; @@ -115,8 +133,9 @@ int llog_obd_origin_setup(struct obd_device *obd, int index, struct obd_device * LASSERT(count == 1); - LASSERT(obd->obd_llog_ctxt[index]); - ctxt = obd->obd_llog_ctxt[index]; + ctxt = llog_get_context(obd, index); + LASSERT(ctxt); + log_gen_init(ctxt); if (logid->lgl_oid) rc = llog_create(ctxt, &handle, logid, NULL); @@ -141,7 +160,7 @@ int llog_obd_origin_setup(struct obd_device *obd, int index, struct obd_device * } EXPORT_SYMBOL(llog_obd_origin_setup); -int llog_obd_origin_cleanup(struct llog_obd_ctxt *ctxt) +int llog_obd_origin_cleanup(struct llog_ctxt *ctxt) { if (!ctxt) return 0; @@ -155,7 +174,7 @@ EXPORT_SYMBOL(llog_obd_origin_cleanup); /* add for obdfilter/sz and mds/unlink */ -int llog_obd_origin_add(struct llog_obd_ctxt *ctxt, +int llog_obd_origin_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, struct lov_stripe_md *lsm, struct llog_cookie *logcookies, int numcookies) { diff --git a/lustre/obdfilter/filter_internal.h b/lustre/obdfilter/filter_internal.h index 32fa3fb..d5bb1ea 100644 --- a/lustre/obdfilter/filter_internal.h +++ b/lustre/obdfilter/filter_internal.h @@ -141,10 +141,11 @@ int filter_log_sz_change(struct llog_handle *cathandle, __u32 io_epoch, struct llog_cookie *logcookie, struct inode *inode); -int filter_get_catalog(struct obd_device *); +//int filter_get_catalog(struct obd_device *); void filter_cancel_cookies_cb(struct obd_device *obd, __u64 transno, void *cb_data, int error); - +int filter_recov_log_unlink_cb(struct llog_handle *llh, + struct llog_rec_hdr *rec, void *data); /* filter_san.c */ int filter_san_setup(struct obd_device *obd, obd_count len, void *buf); diff --git a/lustre/obdfilter/filter_log.c b/lustre/obdfilter/filter_log.c index 8f526b6..341ad6c 100644 --- a/lustre/obdfilter/filter_log.c +++ b/lustre/obdfilter/filter_log.c @@ -97,7 +97,63 @@ void filter_cancel_cookies_cb(struct obd_device *obd, __u64 transno, void *cb_data, int error) { struct llog_cookie *cookie = cb_data; - llog_obd_repl_cancel(obd->obd_llog_ctxt[LLOG_UNLINK_REPL_CTXT], - NULL, 1, cookie, OBD_LLOG_FL_SENDNOW); + llog_cancel(llog_get_context(obd, cookie->lgc_subsys + 1), + NULL, 1, cookie, 0); + //NULL, 1, cookie, OBD_LLOG_FL_SENDNOW); OBD_FREE(cb_data, sizeof(struct llog_cookie)); } + +/* Callback for processing the unlink log record received from MDS by + * llog_client_api. + */ +int filter_recov_log_unlink_cb(struct llog_handle *llh, + struct llog_rec_hdr *rec, void *data) +{ + struct llog_ctxt *ctxt = llh->lgh_ctxt; + struct obd_device *obd = ctxt->loc_obd; + struct obd_export *exp = obd->obd_self_export; + struct llog_cookie cookie; + struct llog_unlink_rec *lur; + struct obdo *oa; + struct obd_trans_info oti = { 0 }; + obd_id oid; + int rc = 0; + ENTRY; + + if (!le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN) { + CERROR("log is not plain\n"); + RETURN(-EINVAL); + } + if (rec->lrh_type != MDS_UNLINK_REC) { + CERROR("log record type error\n"); + RETURN(-EINVAL); + } + + cookie.lgc_lgl = llh->lgh_id; + cookie.lgc_subsys = LLOG_UNLINK_ORIG_CTXT; + cookie.lgc_index = le32_to_cpu(rec->lrh_index); + + lur = (struct llog_unlink_rec *)rec; + + oa = obdo_alloc(); + if (oa == NULL) + RETURN(-ENOMEM); + oa->o_valid |= OBD_MD_FLCOOKIE; + oa->o_id = lur->lur_oid; + oa->o_gr = lur->lur_ogen; + memcpy(obdo_logcookie(oa), &cookie, sizeof(cookie)); + oid = oa->o_id; + + rc = obd_destroy(exp, oa, NULL, &oti); + obdo_free(oa); + if (rc == -ENOENT) { + CERROR("object already removed: send cookie\n"); + llog_cancel(ctxt, NULL, 1, &cookie, 0); + RETURN(0); + } + + if (rc == 0) + CERROR("object: "LPU64" in record destroyed successful\n", oid); + + RETURN(rc); +} diff --git a/lustre/ptlrpc/llog_client.c b/lustre/ptlrpc/llog_client.c new file mode 100644 index 0000000..996dca8 --- /dev/null +++ b/lustre/ptlrpc/llog_client.c @@ -0,0 +1,213 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (C) 2001-2003 Cluster File Systems, Inc. + * Author: Andreas Dilger + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + * remote api for llog - client side + * + */ + +#define DEBUG_SUBSYSTEM S_LOG + +#ifndef EXPORT_SYMTAB +#define EXPORT_SYMTAB +#endif + +#include +#include +#include +#include +#include + +/* This is a callback from the llog_* functions. + * Assumes caller has already pushed us into the kernel context. */ +static int llog_client_create(struct llog_ctxt *ctxt, struct llog_handle **res, + struct llog_logid *logid, char *name) +{ + struct obd_import *imp; + struct llogd_body req_body; + struct llogd_body *body; + struct llog_handle *handle; + struct ptlrpc_request *req = NULL; + int size[2] = {sizeof(req_body)}; + char *tmp[2] = {(char*) &req_body}; + int bufcount = 1; + int repsize[] = {sizeof (req_body)}; + int rc; + ENTRY; + + LASSERT(ctxt->loc_imp); + imp = ctxt->loc_imp; + + handle = llog_alloc_handle(); + if (handle == NULL) + RETURN(-ENOMEM); + *res = handle; + + memset(&req_body, 0, sizeof(req_body)); + if (logid) + req_body.lgd_logid = *logid; + req_body.lgd_ctxt_idx = ctxt->loc_idx - 1; + + if (name) { + size[bufcount] = strlen(name) + 1; + tmp[bufcount] = name; + bufcount++; + } + + req = ptlrpc_prep_req(imp, LLOG_ORIGIN_HANDLE_CREATE, bufcount, size, tmp); + if (!req) + GOTO(err_free, rc = -ENOMEM); + + req->rq_replen = lustre_msg_size(1, repsize); + rc = ptlrpc_queue_wait(req); + if (rc) + GOTO(err_free, rc); + + body = lustre_swab_repbuf(req, 0, sizeof(*body), + lustre_swab_llogd_body); + if (body == NULL) { + CERROR ("Can't unpack llogd_body\n"); + GOTO(err_free, rc =-EFAULT); + } + + handle->lgh_id = body->lgd_logid; + handle->lgh_ctxt = ctxt; + +out: + if (req) + ptlrpc_req_finished(req); + RETURN(rc); + +err_free: + llog_free_handle(handle); + goto out; +} + + +static int llog_client_next_block(struct llog_handle *loghandle, + int *cur_idx, int next_idx, + __u64 *cur_offset, void *buf, int len) +{ + struct obd_import *imp = loghandle->lgh_ctxt->loc_imp; + struct ptlrpc_request *req = NULL; + struct llogd_body *body; + void * ptr; + int size = sizeof(*body); + int repsize[2] = {sizeof (*body)}; + int rc; + ENTRY; + + req = ptlrpc_prep_req(imp, LLOG_ORIGIN_HANDLE_NEXT_BLOCK, 1, &size, NULL); + if (!req) + GOTO(out, rc = -ENOMEM); + + body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body)); + body->lgd_logid = loghandle->lgh_id; + body->lgd_ctxt_idx = loghandle->lgh_ctxt->loc_idx - 1; + body->lgd_llh_flags = loghandle->lgh_hdr->llh_flags; + body->lgd_cur_offset = *cur_offset; + body->lgd_index = next_idx; + body->lgd_saved_index = *cur_idx; + body->lgd_len = len; + repsize[1] = len; + + req->rq_replen = lustre_msg_size(2, repsize); + rc = ptlrpc_queue_wait(req); + if (rc) + GOTO(out, rc); + + body = lustre_swab_repbuf(req, 0, sizeof(*body), + lustre_swab_llogd_body); + if (body == NULL) { + CERROR ("Can't unpack llogd_body\n"); + GOTO(out, rc =-EFAULT); + } + + ptr = lustre_msg_buf(req->rq_repmsg, 1, len); + if (ptr == NULL) { + CERROR ("Can't unpack bitmap\n"); + GOTO(out, rc =-EFAULT); + } + + *cur_idx = body->lgd_saved_index; + *cur_offset = body->lgd_cur_offset; + + memcpy(buf, ptr, len); + +out: + if (req) + ptlrpc_req_finished(req); + RETURN(rc); +} + + +static int llog_client_read_header(struct llog_handle *handle) +{ + struct obd_import *imp = handle->lgh_ctxt->loc_imp; + struct ptlrpc_request *req = NULL; + struct llogd_body *body; + struct llog_log_hdr *hdr; + int size = sizeof(*body); + int repsize = sizeof (*hdr); + int rc; + ENTRY; + + req = ptlrpc_prep_req(imp, LLOG_ORIGIN_HANDLE_READ_HEADER, 1, &size, NULL); + if (!req) + GOTO(out, rc = -ENOMEM); + + body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body)); + body->lgd_logid = handle->lgh_id; + body->lgd_ctxt_idx = handle->lgh_ctxt->loc_idx - 1; + body->lgd_llh_flags = handle->lgh_hdr->llh_flags; + + req->rq_replen = lustre_msg_size(1, &repsize); + rc = ptlrpc_queue_wait(req); + if (rc) + GOTO(out, rc); + + hdr = lustre_swab_repbuf(req, 0, sizeof(*hdr), + lustre_swab_llog_hdr); + if (hdr == NULL) { + CERROR ("Can't unpack llog_hdr\n"); + GOTO(out, rc =-EFAULT); + } + memcpy(handle->lgh_hdr, hdr, sizeof (*hdr)); + +out: + if (req) + ptlrpc_req_finished(req); + RETURN(rc); +} + +static int llog_client_close(struct llog_handle *handle) +{ + int rc = 0; + + RETURN(rc); +} + + +struct llog_operations llog_client_ops = { + lop_next_block: llog_client_next_block, + lop_read_header: llog_client_read_header, + lop_create: llog_client_create, + lop_close: llog_client_close, +}; diff --git a/lustre/ptlrpc/llog_net.c b/lustre/ptlrpc/llog_net.c index c553aa8..880af3c 100644 --- a/lustre/ptlrpc/llog_net.c +++ b/lustre/ptlrpc/llog_net.c @@ -40,11 +40,12 @@ #ifdef ENABLE_ORPHANS -int llog_origin_handle_cancel(struct llog_obd_ctxt *ctxt, - struct ptlrpc_request *req) +int llog_origin_handle_cancel(struct ptlrpc_request *req) { - struct obd_device *obd = ctxt->loc_exp->exp_obd; + struct obd_device *obd = req->rq_export->exp_obd; + struct obd_device *disk_obd; struct llog_cookie *logcookies; + struct llog_ctxt *ctxt; int num_cookies, rc = 0; struct obd_run_ctxt saved; struct llog_handle *cathandle; @@ -57,21 +58,84 @@ int llog_origin_handle_cancel(struct llog_obd_ctxt *ctxt, RETURN(-EFAULT); } + ctxt = llog_get_context(obd, logcookies->lgc_subsys); + if (ctxt == NULL) { + CERROR("llog subsys not setup or already cleanup\n"); + RETURN(-ENOENT); + } + down(&ctxt->loc_sem); + disk_obd = ctxt->loc_exp->exp_obd; cathandle = ctxt->loc_handle; LASSERT(cathandle); - push_ctxt(&saved, &obd->obd_ctxt, NULL); + push_ctxt(&saved, &disk_obd->obd_ctxt, NULL); rc = llog_cat_cancel_records(cathandle, num_cookies, logcookies); if (rc) CERROR("cancel %d llog-records failed: %d\n", num_cookies, rc); - pop_ctxt(&saved, &obd->obd_ctxt, NULL); + else + CERROR("cancel %d llog-records successful\n", num_cookies); + + pop_ctxt(&saved, &disk_obd->obd_ctxt, NULL); + up(&ctxt->loc_sem); RETURN(rc); } EXPORT_SYMBOL(llog_origin_handle_cancel); #endif + +int llog_origin_connect(struct llog_ctxt *ctxt, int count, + struct llog_logid *logid, + struct llog_ctxt_gen *gen) +{ + struct obd_import *imp; + struct ptlrpc_request *request; + struct llogd_conn_body *req_body; + int size = sizeof(struct llogd_conn_body); + int rc; + ENTRY; + + LASSERT(ctxt->loc_imp); + imp = ctxt->loc_imp; -int llog_receptor_accept(struct llog_obd_ctxt *ctxt, struct obd_import *imp) + request = ptlrpc_prep_req(imp, LLOG_ORIGIN_CONNECT, 1, &size, NULL); + if (!request) + RETURN(-ENOMEM); + + req_body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof(*req_body)); + + req_body->lgdc_gen = ctxt->loc_gen; + req_body->lgdc_logid = ctxt->loc_handle->lgh_id; + req_body->lgdc_ctxt_idx = ctxt->loc_idx + 1; + request->rq_replen = lustre_msg_size(0, NULL); + + rc = ptlrpc_queue_wait(request); + ptlrpc_req_finished(request); + + RETURN(rc); +} +EXPORT_SYMBOL(llog_origin_connect); + +int llog_handle_connect(struct ptlrpc_request *req) +{ + struct obd_device *obd = req->rq_export->exp_obd; + struct llogd_conn_body *req_body; + struct llog_ctxt *ctxt; + int rc; + ENTRY; + + req_body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*req_body)); + + ctxt = llog_get_context(obd, req_body->lgdc_ctxt_idx); + rc = llog_connect(ctxt, 1, &req_body->lgdc_logid, + &req_body->lgdc_gen); + if (rc != 0) + CERROR("failed at llog_relp_connect\n"); + + RETURN(rc); +} +EXPORT_SYMBOL(llog_handle_connect); + +int llog_receptor_accept(struct llog_ctxt *ctxt, struct obd_import *imp) { ENTRY; LASSERT(ctxt); @@ -80,7 +144,7 @@ int llog_receptor_accept(struct llog_obd_ctxt *ctxt, struct obd_import *imp) } EXPORT_SYMBOL(llog_receptor_accept); -int llog_initiator_connect(struct llog_obd_ctxt *ctxt) +int llog_initiator_connect(struct llog_ctxt *ctxt) { ENTRY; LASSERT(ctxt); @@ -88,4 +152,3 @@ int llog_initiator_connect(struct llog_obd_ctxt *ctxt) RETURN(0); } EXPORT_SYMBOL(llog_initiator_connect); - diff --git a/lustre/ptlrpc/llog_server.c b/lustre/ptlrpc/llog_server.c new file mode 100644 index 0000000..6b117d0 --- /dev/null +++ b/lustre/ptlrpc/llog_server.c @@ -0,0 +1,239 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (C) 2001-2003 Cluster File Systems, Inc. + * Author: Andreas Dilger + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + * remote api for llog - server side + * + */ + +#define DEBUG_SUBSYSTEM S_LOG + +#ifndef EXPORT_SYMTAB +#define EXPORT_SYMTAB +#endif + +#include +#include +#include +#include +#include + +int llog_origin_handle_create(struct ptlrpc_request *req) +{ + struct obd_export *exp = req->rq_export; + struct obd_device *obd = exp->exp_obd; + struct obd_device *disk_obd; + struct llog_handle *loghandle; + struct llogd_body *body; + struct obd_run_ctxt saved; + struct llog_logid *logid = NULL; + struct llog_ctxt *ctxt; + char * name = NULL; + int size = sizeof (*body); + int rc, rc2; + ENTRY; + + body = lustre_swab_reqbuf(req, 0, sizeof(*body), + lustre_swab_llogd_body); + if (body == NULL) { + CERROR ("Can't unpack llogd_body\n"); + GOTO(out, rc =-EFAULT); + } + + if (body->lgd_logid.lgl_oid > 0) + logid = &body->lgd_logid; + + if (req->rq_reqmsg->bufcount > 1) { + name = lustre_msg_string(req->rq_reqmsg, 1, 0); + if (name == NULL) { + CERROR("Can't unpack name\n"); + GOTO(out, rc = -EFAULT); + } + } + + ctxt = llog_get_context(obd, body->lgd_ctxt_idx); + LASSERT(ctxt != NULL); + disk_obd = ctxt->loc_exp->exp_obd; + push_ctxt(&saved, &disk_obd->obd_ctxt, NULL); + + rc = llog_create(ctxt, &loghandle, logid, name); + if (rc) + GOTO(out_pop, rc); + + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); + if (rc) + GOTO(out_close, rc = -ENOMEM); + + body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body)); + body->lgd_logid = loghandle->lgh_id; + +out_close: + rc2 = llog_close(loghandle); + if (!rc) + rc = rc2; +out_pop: + pop_ctxt(&saved, &disk_obd->obd_ctxt, NULL); +out: + RETURN(rc); +} + +int llog_origin_handle_next_block(struct ptlrpc_request *req) +{ + struct obd_export *exp = req->rq_export; + struct obd_device *obd = exp->exp_obd; + struct obd_device *disk_obd; + struct llog_handle *loghandle; + struct llogd_body *body; + struct obd_run_ctxt saved; + struct llog_ctxt *ctxt; + __u32 flags; + __u8 *buf; + void * ptr; + int size[] = {sizeof (*body), + LLOG_CHUNK_SIZE}; + int rc, rc2; + ENTRY; + + body = lustre_swab_reqbuf(req, 0, sizeof(*body), + lustre_swab_llogd_body); + if (body == NULL) { + CERROR ("Can't unpack llogd_body\n"); + GOTO(out, rc =-EFAULT); + } + + OBD_ALLOC(buf, LLOG_CHUNK_SIZE); + if (!buf) + GOTO(out, rc = -ENOMEM); + + ctxt = llog_get_context(obd, body->lgd_ctxt_idx); + LASSERT(ctxt != NULL); + disk_obd = ctxt->loc_exp->exp_obd; + push_ctxt(&saved, &disk_obd->obd_ctxt, NULL); + + rc = llog_create(ctxt, &loghandle, &body->lgd_logid, NULL); + if (rc) + GOTO(out_pop, rc); + + flags = body->lgd_llh_flags; + rc = llog_init_handle(loghandle, flags, NULL); + if (rc) + GOTO(out_close, rc); + + memset(buf, 0, LLOG_CHUNK_SIZE); + rc = llog_next_block(loghandle, &body->lgd_saved_index, + body->lgd_index, + &body->lgd_cur_offset, buf, LLOG_CHUNK_SIZE); + if (rc) + GOTO(out_close, rc); + + + rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg); + if (rc) + GOTO(out_close, rc = -ENOMEM); + + ptr = lustre_msg_buf(req->rq_repmsg, 0, sizeof (body)); + memcpy(ptr, body, sizeof(*body)); + + ptr = lustre_msg_buf(req->rq_repmsg, 1, LLOG_CHUNK_SIZE); + memcpy(ptr, buf, LLOG_CHUNK_SIZE); + +out_close: + rc2 = llog_close(loghandle); + if (!rc) + rc = rc2; + +out_pop: + pop_ctxt(&saved, &disk_obd->obd_ctxt, NULL); + OBD_FREE(buf, LLOG_CHUNK_SIZE); +out: + RETURN(rc); +} + +int llog_origin_handle_read_header(struct ptlrpc_request *req) +{ + struct obd_export *exp = req->rq_export; + struct obd_device *obd = exp->exp_obd; + struct obd_device *disk_obd; + struct llog_handle *loghandle; + struct llogd_body *body; + struct llog_log_hdr *hdr; + struct obd_run_ctxt saved; + struct llog_ctxt *ctxt; + __u32 flags; + __u8 *buf; + int size[] = {sizeof (*hdr)}; + int rc, rc2; + ENTRY; + + body = lustre_swab_reqbuf(req, 0, sizeof(*body), + lustre_swab_llogd_body); + if (body == NULL) { + CERROR ("Can't unpack llogd_body\n"); + GOTO(out, rc =-EFAULT); + } + + OBD_ALLOC(buf, LLOG_CHUNK_SIZE); + if (!buf) + GOTO(out, rc = -ENOMEM); + + ctxt = llog_get_context(obd, body->lgd_ctxt_idx); + LASSERT(ctxt != NULL); + disk_obd = ctxt->loc_exp->exp_obd; + push_ctxt(&saved, &disk_obd->obd_ctxt, NULL); + + rc = llog_create(ctxt, &loghandle, &body->lgd_logid, NULL); + if (rc) + GOTO(out_pop, rc); + + /* init_handle reads the header */ + flags = body->lgd_llh_flags; + rc = llog_init_handle(loghandle, flags, NULL); + if (rc) + GOTO(out_close, rc); + + + rc = lustre_pack_msg(1, size, NULL, &req->rq_replen, &req->rq_repmsg); + if (rc) + GOTO(out_close, rc = -ENOMEM); + + hdr = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*hdr)); + memcpy(hdr, loghandle->lgh_hdr, sizeof(*hdr)); + +out_close: + rc2 = llog_close(loghandle); + if (!rc) + rc = rc2; + +out_pop: + pop_ctxt(&saved, &disk_obd->obd_ctxt, NULL); + OBD_FREE(buf, LLOG_CHUNK_SIZE); + +out: + RETURN(rc); +} + +int llog_origin_handle_close(struct ptlrpc_request *req) +{ + int rc; + + rc = 0; + + RETURN(rc); +} diff --git a/lustre/tests/replay-single.sh b/lustre/tests/replay-single.sh index 2627ab4..4b1cdda 100755 --- a/lustre/tests/replay-single.sh +++ b/lustre/tests/replay-single.sh @@ -169,6 +169,8 @@ test_5() { grep -q "tag-$i" $DIR/$tfile-$i || error "f1c-$i" done rm -rf $DIR/$tfile-* + sleep 5 + # waiting for commitment of removal } run_test 5 "|x| 220 open(O_CREAT)" @@ -180,6 +182,8 @@ test_6() { fail mds $CHECKSTAT -t dir $DIR/$tdir || return 1 $CHECKSTAT -t file $DIR/$tdir/$tfile || return 2 + sleep 2 + # waiting for log process thread } run_test 6 "mkdir + contained create" @@ -405,8 +409,8 @@ test_20() { fail mds kill -USR1 $pid - wait $pid || return 2 - [ -e $DIR/$tfile ] && return 3 + wait $pid || return 1 + [ -e $DIR/$tfile ] && return 2 return 0 } run_test 20 "|X| open(O_CREAT), unlink, replay, close (test mds_cleanup_orphans)" @@ -440,8 +444,8 @@ test_22() { fail mds kill -USR1 $pid - wait $pid || return 2 - [ -e $DIR/$tfile ] && return 3 + wait $pid || return 1 + [ -e $DIR/$tfile ] && return 2 return 0 } run_test 22 "open(O_CREAT), |X| unlink, replay, close (test mds_cleanup_orphans)" @@ -475,8 +479,8 @@ test_24() { fail mds rm -f $DIR/$tfile kill -USR1 $pid - wait $pid || return 2 - [ -e $DIR/$tfile ] && return 3 + wait $pid || return 1 + [ -e $DIR/$tfile ] && return 2 return 0 } run_test 24 "open(O_CREAT), replay, unlink, close (test mds_cleanup_orphans)" @@ -491,134 +495,140 @@ test_25() { replay_barrier mds fail mds kill -USR1 $pid - wait $pid || return 2 - [ -e $DIR/$tfile ] && return 3 + wait $pid || return 1 + [ -e $DIR/$tfile ] && return 2 return 0 } run_test 25 "open(O_CREAT), unlink, replay, close (test mds_cleanup_orphans)" test_26() { replay_barrier mds - multiop $DIR/$tfile O_tSc & - pid=$! + multiop $DIR/$tfile-1 O_tSc & + pid1=$! multiop $DIR/$tfile-2 O_tSc & pid2=$! # give multiop a chance to open sleep 1 - rm -f $DIR/$tfile + rm -f $DIR/$tfile-1 rm -f $DIR/$tfile-2 kill -USR1 $pid2 - wait $pid2 || return 4 + wait $pid2 || return 1 fail mds - kill -USR1 $pid - wait $pid || return 2 - [ -e $DIR/$tfile ] && return 3 + kill -USR1 $pid1 + wait $pid1 || return 2 + [ -e $DIR/$tfile-1 ] && return 3 + [ -e $DIR/$tfile-2 ] && return 4 return 0 } run_test 26 "|X| open(O_CREAT), unlink two, close one, replay, close one (test mds_cleanup_orphans)" test_27() { replay_barrier mds - multiop $DIR/$tfile O_tSc & - pid=$! + multiop $DIR/$tfile-1 O_tSc & + pid1=$! multiop $DIR/$tfile-2 O_tSc & pid2=$! # give multiop a chance to open sleep 1 - rm -f $DIR/$tfile + rm -f $DIR/$tfile-1 rm -f $DIR/$tfile-2 fail mds - kill -USR1 $pid - wait $pid || return 2 + kill -USR1 $pid1 + wait $pid1 || return 1 kill -USR1 $pid2 - wait $pid2 || return 4 - [ -e $DIR/$tfile ] && return 3 + wait $pid2 || return 2 + [ -e $DIR/$tfile-1 ] && return 3 + [ -e $DIR/$tfile-2 ] && return 4 return 0 } run_test 27 "|X| open(O_CREAT), unlink two, replay, close two (test mds_cleanup_orphans)" test_28() { - multiop $DIR/$tfile O_tSc & - pid=$! + multiop $DIR/$tfile-1 O_tSc & + pid1=$! multiop $DIR/$tfile-2 O_tSc & pid2=$! # give multiop a chance to open sleep 1 replay_barrier mds - rm -f $DIR/$tfile + rm -f $DIR/$tfile-1 rm -f $DIR/$tfile-2 kill -USR1 $pid2 - wait $pid2 || return 4 + wait $pid2 || return 1 fail mds - kill -USR1 $pid - wait $pid || return 2 - [ -e $DIR/$tfile ] && return 3 + kill -USR1 $pid1 + wait $pid1 || return 2 + [ -e $DIR/$tfile-1 ] && return 3 + [ -e $DIR/$tfile-2 ] && return 4 return 0 } run_test 28 "open(O_CREAT), |X| unlink two, close one, replay, close one (test mds_cleanup_orphans)" test_29() { - multiop $DIR/$tfile O_tSc & - pid=$! + multiop $DIR/$tfile-1 O_tSc & + pid1=$! multiop $DIR/$tfile-2 O_tSc & pid2=$! # give multiop a chance to open sleep 1 replay_barrier mds - rm -f $DIR/$tfile + rm -f $DIR/$tfile-1 rm -f $DIR/$tfile-2 fail mds - kill -USR1 $pid - wait $pid || return 2 + kill -USR1 $pid1 + wait $pid1 || return 1 kill -USR1 $pid2 - wait $pid2 || return 4 - [ -e $DIR/$tfile ] && return 3 + wait $pid2 || return 2 + [ -e $DIR/$tfile-1 ] && return 3 + [ -e $DIR/$tfile-2 ] && return 4 return 0 } run_test 29 "open(O_CREAT), |X| unlink two, replay, close two (test mds_cleanup_orphans)" test_30() { - multiop $DIR/$tfile O_tSc & - pid=$! + multiop $DIR/$tfile-1 O_tSc & + pid1=$! multiop $DIR/$tfile-2 O_tSc & pid2=$! # give multiop a chance to open sleep 1 - rm -f $DIR/$tfile + rm -f $DIR/$tfile-1 rm -f $DIR/$tfile-2 replay_barrier mds fail mds - kill -USR1 $pid - wait $pid || return 2 + kill -USR1 $pid1 + wait $pid1 || return 1 kill -USR1 $pid2 - wait $pid2 || return 4 - [ -e $DIR/$tfile ] && return 3 + wait $pid2 || return 2 + [ -e $DIR/$tfile-1 ] && return 3 + [ -e $DIR/$tfile-2 ] && return 4 return 0 } run_test 30 "open(O_CREAT) two, unlink two, replay, close two (test mds_cleanup_orphans)" test_31() { - multiop $DIR/$tfile O_tSc & - pid=$! + multiop $DIR/$tfile-1 O_tSc & + pid1=$! multiop $DIR/$tfile-2 O_tSc & pid2=$! # give multiop a chance to open sleep 1 - rm -f $DIR/$tfile + rm -f $DIR/$tfile-1 replay_barrier mds rm -f $DIR/$tfile-2 fail mds - kill -USR1 $pid - wait $pid || return 2 + kill -USR1 $pid1 + wait $pid1 || return 1 kill -USR1 $pid2 - wait $pid2 || return 4 - [ -e $DIR/$tfile ] && return 3 + wait $pid2 || return 2 + [ -e $DIR/$tfile-1 ] && return 3 + [ -e $DIR/$tfile-2 ] && return 4 return 0 } run_test 31 "open(O_CREAT) two, unlink one, |X| unlink one, close two (test mds_cleanup_orphans)" @@ -650,5 +660,22 @@ test_33() { } run_test 33 "abort recovery before client does replay" +test_34() { + multiop $DIR/$tfile O_c & + pid=$! + # give multiop a chance to open + sleep 1 + rm -f $DIR/$tfile + + replay_barrier mds + fail_abort mds + kill -USR1 $pid + [ -e $DIR/$tfile ] && return 1 + sleep 5 + # wait for commitment of removal + return 0 +} +run_test 34 "abort recovery before client does replay (test mds_cleanup_orphans)" + equals_msg test complete, cleaning up cleanup -- 1.8.3.1