From 591f8771df00e1c3279019281e5f7d2e7c7e4877 Mon Sep 17 00:00:00 2001 From: Emoly Liu Date: Fri, 21 Aug 2015 23:48:58 +0800 Subject: [PATCH] LU-5297 osp: process unsuccessful osp sync records properly Unsuccessful records can be classifed into two types: failed and invalid. And they should be handled differently. This patch improves the handling process by the following fixes. First, correct the return values of osp_sync_new_xxx_job() to separate the record types: - 0 on success - negative on error - 1 on invalid record Second, process these two types of records differently: - When a failed record is processed, opd_syn_{changes,rpc_in_flight, rpc_in_progess} should be decreased, and opd_syn_last_processed_id should be bumped. - When an invalid record is processed, besides above process should be taken, this record should be deleted at the end. ("unknown record type is treated as invalid record".) Third, simplify the sending rec process in osp_sync_process_queues(), remove the unnecessary loop waiting and continue processing other records directly. Also, OBD_FAIL_OSP_CHECK_INVALID_REC and OBD_FAIL_OSP_CHECK_ENOMEM are defined and used in sanity.sh test_239a/b respectively to verify the fix. Signed-off-by: Emoly Liu Change-Id: I9c55f43f160a3d9e51892a2dc2f45a52f9b6f2c8 Reviewed-on: http://review.whamcloud.com/14925 Reviewed-by: Niu Yawei Tested-by: Jenkins Tested-by: Maloo Reviewed-by: Alex Zhuravlev Reviewed-by: Mike Pershin Reviewed-by: Oleg Drokin --- lustre/include/obd_support.h | 3 + lustre/osp/osp_sync.c | 132 +++++++++++++++++++++---------------------- lustre/tests/sanity.sh | 22 ++++++++ 3 files changed, 88 insertions(+), 69 deletions(-) diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h index e6d6741..dfc5dc2 100644 --- a/lustre/include/obd_support.h +++ b/lustre/include/obd_support.h @@ -597,6 +597,9 @@ extern char obd_jobid_var[]; #define OBD_FAIL_DT_DELETE 0x2017 #define OBD_FAIL_DT_LOOKUP 0x2018 +#define OBD_FAIL_OSP_CHECK_INVALID_REC 0x2100 +#define OBD_FAIL_OSP_CHECK_ENOMEM 0x2101 + /* Assign references to moved code to reduce code changes */ #define OBD_FAIL_PRECHECK(id) CFS_FAIL_PRECHECK(id) #define OBD_FAIL_CHECK(id) CFS_FAIL_CHECK(id) diff --git a/lustre/osp/osp_sync.c b/lustre/osp/osp_sync.c index 4b6d68a..6b67818 100644 --- a/lustre/osp/osp_sync.c +++ b/lustre/osp/osp_sync.c @@ -591,6 +591,10 @@ static struct ptlrpc_request *osp_sync_new_job(struct osp_device *d, /* Prepare the request */ imp = d->opd_obd->u.cli.cl_import; LASSERT(imp); + + if (OBD_FAIL_CHECK(OBD_FAIL_OSP_CHECK_ENOMEM)) + RETURN(ERR_PTR(-ENOMEM)); + req = ptlrpc_request_alloc(imp, format); if (req == NULL) RETURN(ERR_PTR(-ENOMEM)); @@ -633,6 +637,7 @@ static struct ptlrpc_request *osp_sync_new_job(struct osp_device *d, * \param[in] h llog record * * \retval 0 on success + * \retval 1 on invalid record * \retval negative negated errno on error */ static int osp_sync_new_setattr_job(struct osp_device *d, @@ -646,14 +651,15 @@ static int osp_sync_new_setattr_job(struct osp_device *d, ENTRY; LASSERT(h->lrh_type == MDS_SETATTR64_REC); + if (OBD_FAIL_CHECK(OBD_FAIL_OSP_CHECK_INVALID_REC)) + RETURN(1); /* lsr_valid can only be 0 or have OBD_MD_{FLUID,FLGID} set, * so no bits other than these should be set. */ if ((rec->lsr_valid & ~(OBD_MD_FLUID | OBD_MD_FLGID)) != 0) { CERROR("%s: invalid setattr record, lsr_valid:"LPU64"\n", d->opd_obd->obd_name, rec->lsr_valid); - /* return 0 so that sync thread can continue processing - * other records. */ - RETURN(0); + /* return 1 on invalid record */ + RETURN(1); } req = osp_sync_new_job(d, llh, h, OST_SETATTR, &RQF_OST_SETATTR); @@ -674,7 +680,7 @@ static int osp_sync_new_setattr_job(struct osp_device *d, body->oa.o_valid |= rec->lsr_valid; osp_sync_send_new_rpc(d, req); - RETURN(1); + RETURN(0); } /** @@ -717,7 +723,7 @@ static int osp_sync_new_unlink_job(struct osp_device *d, body->oa.o_valid |= OBD_MD_FLOBJCOUNT; osp_sync_send_new_rpc(d, req); - RETURN(1); + RETURN(0); } /** @@ -731,7 +737,6 @@ static int osp_sync_new_unlink_job(struct osp_device *d, * use OUT for OST as well, this will allow batching and better code * unification. * - * \param[in] env LU environment provided by the caller * \param[in] d OSP device * \param[in] llh llog handle where the record is stored * \param[in] h llog record @@ -739,8 +744,7 @@ static int osp_sync_new_unlink_job(struct osp_device *d, * \retval 0 on success * \retval negative negated errno on error */ -static int osp_sync_new_unlink64_job(const struct lu_env *env, - struct osp_device *d, +static int osp_sync_new_unlink64_job(struct osp_device *d, struct llog_handle *llh, struct llog_rec_hdr *h) { @@ -766,7 +770,7 @@ static int osp_sync_new_unlink64_job(const struct lu_env *env, body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID | OBD_MD_FLOBJCOUNT; osp_sync_send_new_rpc(d, req); - RETURN(1); + RETURN(0); } /** @@ -784,18 +788,18 @@ static int osp_sync_new_unlink64_job(const struct lu_env *env, * \param[in] d OSP device * \param[in] llh llog handle where the record is stored * \param[in] rec llog record - * - * \retval 0 on success - * \retval negative negated errno on error */ -static int osp_sync_process_record(const struct lu_env *env, - struct osp_device *d, - struct llog_handle *llh, - struct llog_rec_hdr *rec) +static void osp_sync_process_record(const struct lu_env *env, + struct osp_device *d, + struct llog_handle *llh, + struct llog_rec_hdr *rec) { + struct llog_handle *cathandle = llh->u.phd.phd_cat_handle; struct llog_cookie cookie; int rc = 0; + ENTRY; + cookie.lgc_lgl = llh->lgh_id; cookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT; cookie.lgc_index = rec->lrh_index; @@ -812,10 +816,9 @@ static int osp_sync_process_record(const struct lu_env *env, } /* cancel any generation record */ - rc = llog_cat_cancel_records(env, llh->u.phd.phd_cat_handle, - 1, &cookie); + rc = llog_cat_cancel_records(env, cathandle, 1, &cookie); - return rc; + RETURN_EXIT; } /* @@ -836,7 +839,7 @@ static int osp_sync_process_record(const struct lu_env *env, rc = osp_sync_new_unlink_job(d, llh, rec); break; case MDS_UNLINK64_REC: - rc = osp_sync_new_unlink64_job(env, d, llh, rec); + rc = osp_sync_new_unlink64_job(d, llh, rec); break; case MDS_SETATTR64_REC: rc = osp_sync_new_setattr_job(d, llh, rec); @@ -844,41 +847,53 @@ static int osp_sync_process_record(const struct lu_env *env, default: CERROR("%s: unknown record type: %x\n", d->opd_obd->obd_name, rec->lrh_type); - /* we should continue processing */ + /* treat "unknown record type" as "invalid" */ + rc = 1; + break; } - /* rc > 0 means sync RPC being added to the queue */ - if (likely(rc > 0)) { - spin_lock(&d->opd_syn_lock); - if (d->opd_syn_prev_done) { - LASSERT(d->opd_syn_changes > 0); - LASSERT(rec->lrh_id <= d->opd_syn_last_committed_id); - /* - * NOTE: it's possible to meet same id if - * OST stores few stripes of same file - */ - if (rec->lrh_id > d->opd_syn_last_processed_id) { - d->opd_syn_last_processed_id = rec->lrh_id; - wake_up(&d->opd_syn_barrier_waitq); - } + spin_lock(&d->opd_syn_lock); - d->opd_syn_changes--; + /* For all kinds of records, not matter successful or not, + * we should decrease changes and bump last_processed_id. + */ + if (d->opd_syn_prev_done) { + LASSERT(d->opd_syn_changes > 0); + LASSERT(rec->lrh_id <= d->opd_syn_last_committed_id); + /* NOTE: it's possible to meet same id if + * OST stores few stripes of same file + */ + if (rec->lrh_id > d->opd_syn_last_processed_id) { + d->opd_syn_last_processed_id = rec->lrh_id; + wake_up(&d->opd_syn_barrier_waitq); } - CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n", - d->opd_obd->obd_name, d->opd_syn_rpc_in_flight, - d->opd_syn_rpc_in_progress); - spin_unlock(&d->opd_syn_lock); - rc = 0; - } else { - spin_lock(&d->opd_syn_lock); + d->opd_syn_changes--; + } + if (rc != 0) { d->opd_syn_rpc_in_flight--; d->opd_syn_rpc_in_progress--; - spin_unlock(&d->opd_syn_lock); + } + CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n", + d->opd_obd->obd_name, d->opd_syn_rpc_in_flight, + d->opd_syn_rpc_in_progress); + + spin_unlock(&d->opd_syn_lock); + + /* Delete the invalid record */ + if (rc == 1) { + rc = llog_cat_cancel_records(env, cathandle, 1, &cookie); + if (rc != 0) + CERROR("%s: can't delete invalid record: " + "fid = "DFID", rec_id = %u, rc = %d\n", + d->opd_obd->obd_name, + PFID(lu_object_fid(&cathandle->lgh_obj->do_lu)), + rec->lrh_id, rc); } - CDEBUG(D_HA, "found record %x, %d, idx %u, id %u: %d\n", - rec->lrh_type, rec->lrh_len, rec->lrh_index, rec->lrh_id, rc); - return rc; + CDEBUG(D_HA, "found record %x, %d, idx %u, id %u\n", + rec->lrh_type, rec->lrh_len, rec->lrh_index, rec->lrh_id); + + RETURN_EXIT; } /** @@ -1006,7 +1021,6 @@ static int osp_sync_process_queues(const struct lu_env *env, void *data) { struct osp_device *d = data; - int rc; do { struct l_wait_info lwi = { 0 }; @@ -1031,27 +1045,7 @@ static int osp_sync_process_queues(const struct lu_env *env, d->opd_syn_rpc_in_flight); return 0; } - - /* - * try to send, in case of disconnection, suspend - * processing till we can send this request - */ - do { - rc = osp_sync_process_record(env, d, llh, rec); - /* - * XXX: probably different handling is needed - * for some bugs, like immediate exit or if - * OSP gets inactive - */ - if (rc) { - CERROR("can't send: %d\n", rc); - l_wait_event(d->opd_syn_waitq, - !osp_sync_running(d) || - osp_sync_has_work(d), - &lwi); - } - } while (rc != 0 && osp_sync_running(d)); - + osp_sync_process_record(env, d, llh, rec); llh = NULL; rec = NULL; } diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index 4f78323..a0fc920 100644 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -13152,6 +13152,28 @@ test_239() { } run_test 239 "osp_sync test" +test_239a() { #LU-5297 + touch $DIR/$tfile + #define OBD_FAIL_OSP_CHECK_INVALID_REC 0x2100 + do_facet $SINGLEMDS $LCTL set_param fail_loc=0x2100 + chgrp $RUNAS_GID $DIR/$tfile + wait_delete_completed +} +run_test 239a "process invalid osp sync record correctly" + +test_239b() { #LU-5297 + touch $DIR/$tfile1 + #define OBD_FAIL_OSP_CHECK_ENOMEM 0x2101 + do_facet $SINGLEMDS $LCTL set_param fail_loc=0x2101 + chgrp $RUNAS_GID $DIR/$tfile1 + wait_delete_completed + do_facet $SINGLEMDS $LCTL set_param fail_loc=0 + touch $DIR/$tfile2 + chgrp $RUNAS_GID $DIR/$tfile2 + wait_delete_completed +} +run_test 239b "process osp sync record with ENOMEM error correctly" + test_240() { [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return -- 1.8.3.1