From a96be7018d90585dce16ce4b126485ce41629cd2 Mon Sep 17 00:00:00 2001 From: wang di Date: Sat, 25 Jul 2015 09:21:46 -0700 Subject: [PATCH] LU-6846 llog: combine cancel rec and destroy Merge cancel llog record and llog object destroy in a single transaction. Otherwise it will cause the race between cancel record and llog object destroy, especally when deleting remote record, i.e. if delete record RPC arrives after destroy, then delete record will not find the object, cause LBUG. Signed-off-by: wang di Change-Id: I33773eeb859fffa871ce01689c5bf1b4196a6658 Reviewed-on: http://review.whamcloud.com/15730 Tested-by: Jenkins Reviewed-by: Mike Pershin Tested-by: Maloo Reviewed-by: James Simmons Reviewed-by: Oleg Drokin --- lustre/include/lustre_log.h | 26 ++------ lustre/obdclass/llog.c | 155 ++++++++++++++++++++++++++++++++------------ lustre/obdclass/llog_osd.c | 140 ++++++++++++++++++++++----------------- lustre/osp/osp_md_object.c | 33 +--------- lustre/ptlrpc/llog_client.c | 3 +- 5 files changed, 206 insertions(+), 151 deletions(-) diff --git a/lustre/include/lustre_log.h b/lustre/include/lustre_log.h index 458cc2e..393ce28 100644 --- a/lustre/include/lustre_log.h +++ b/lustre/include/lustre_log.h @@ -218,8 +218,10 @@ int llog_catalog_list(const struct lu_env *env, struct dt_device *d, int llog_initiator_connect(struct llog_ctxt *ctxt); struct llog_operations { + int (*lop_declare_destroy)(const struct lu_env *env, + struct llog_handle *handle, struct thandle *th); int (*lop_destroy)(const struct lu_env *env, - struct llog_handle *handle); + struct llog_handle *handle, struct thandle *th); int (*lop_next_block)(const struct lu_env *env, struct llog_handle *h, int *curr_idx, int next_idx, __u64 *offset, void *buf, int len); @@ -467,24 +469,6 @@ static inline int llog_ctxt_null(struct obd_device *obd, int index) return (llog_group_ctxt_null(&obd->obd_olg, index)); } -static inline int llog_destroy(const struct lu_env *env, - struct llog_handle *handle) -{ - struct llog_operations *lop; - int rc; - - ENTRY; - - rc = llog_handle2ops(handle, &lop); - if (rc) - RETURN(rc); - if (lop->lop_destroy == NULL) - RETURN(-EOPNOTSUPP); - - rc = lop->lop_destroy(env, handle); - RETURN(rc); -} - static inline int llog_next_block(const struct lu_env *env, struct llog_handle *loghandle, int *cur_idx, int next_idx, __u64 *cur_offset, void *buf, @@ -564,6 +548,10 @@ int llog_declare_create(const struct lu_env *env, struct llog_handle *loghandle, struct thandle *th); int llog_create(const struct lu_env *env, struct llog_handle *handle, struct thandle *th); +int llog_trans_destroy(const struct lu_env *env, struct llog_handle *handle, + struct thandle *th); +int llog_destroy(const struct lu_env *env, struct llog_handle *handle); + int llog_declare_write_rec(const struct lu_env *env, struct llog_handle *handle, struct llog_rec_hdr *rec, int idx, diff --git a/lustre/obdclass/llog.c b/lustre/obdclass/llog.c index 334f752..3aeea1d 100644 --- a/lustre/obdclass/llog.c +++ b/lustre/obdclass/llog.c @@ -104,8 +104,98 @@ void llog_handle_put(struct llog_handle *loghandle) llog_free_handle(loghandle); } -static int llog_cancel_rec_internal(const struct lu_env *env, - struct llog_handle *loghandle, int index) +static int llog_declare_destroy(const struct lu_env *env, + struct llog_handle *handle, + struct thandle *th) +{ + struct llog_operations *lop; + int rc; + + ENTRY; + + rc = llog_handle2ops(handle, &lop); + if (rc) + RETURN(rc); + if (lop->lop_declare_destroy == NULL) + RETURN(-EOPNOTSUPP); + + rc = lop->lop_declare_destroy(env, handle, th); + + RETURN(rc); +} + +int llog_trans_destroy(const struct lu_env *env, struct llog_handle *handle, + struct thandle *th) +{ + struct llog_operations *lop; + int rc; + ENTRY; + + rc = llog_handle2ops(handle, &lop); + if (rc < 0) + RETURN(rc); + if (lop->lop_destroy == NULL) + RETURN(-EOPNOTSUPP); + + LASSERT(handle->lgh_obj != NULL); + if (!dt_object_exists(handle->lgh_obj)) + RETURN(0); + + rc = lop->lop_destroy(env, handle, th); + + RETURN(rc); +} + +int llog_destroy(const struct lu_env *env, struct llog_handle *handle) +{ + struct llog_operations *lop; + struct dt_device *dt; + struct thandle *th; + int rc; + + ENTRY; + + rc = llog_handle2ops(handle, &lop); + if (rc < 0) + RETURN(rc); + if (lop->lop_destroy == NULL) + RETURN(-EOPNOTSUPP); + + if (handle->lgh_obj == NULL) { + /* if lgh_obj == NULL, then it is from client side destroy */ + rc = lop->lop_destroy(env, handle, NULL); + RETURN(rc); + } + + if (!dt_object_exists(handle->lgh_obj)) + RETURN(0); + + dt = lu2dt_dev(handle->lgh_obj->do_lu.lo_dev); + + th = dt_trans_create(env, dt); + if (IS_ERR(th)) + RETURN(PTR_ERR(th)); + + rc = llog_declare_destroy(env, handle, th); + if (rc != 0) + GOTO(out_trans, rc); + + rc = dt_trans_start_local(env, dt, th); + if (rc < 0) + GOTO(out_trans, rc); + + rc = lop->lop_destroy(env, handle, th); + +out_trans: + dt_trans_stop(env, dt, th); + + RETURN(rc); +} +EXPORT_SYMBOL(llog_destroy); + +/* returns negative on error; 0 if success; 1 if success & log destroyed */ +int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle, + int index) { struct dt_device *dt; struct llog_log_hdr *llh = loghandle->lgh_hdr; @@ -114,8 +204,16 @@ static int llog_cancel_rec_internal(const struct lu_env *env, ENTRY; - LASSERT(loghandle); - LASSERT(loghandle->lgh_ctxt); + CDEBUG(D_RPCTRACE, "Canceling %d in log "DOSTID"\n", index, + POSTID(&loghandle->lgh_id.lgl_oi)); + + if (index == 0) { + CERROR("Can't cancel index 0 which is header\n"); + RETURN(-EINVAL); + } + + LASSERT(loghandle != NULL); + LASSERT(loghandle->lgh_ctxt != NULL); LASSERT(loghandle->lgh_obj != NULL); dt = lu2dt_dev(loghandle->lgh_obj->do_lu.lo_dev); @@ -128,6 +226,9 @@ static int llog_cancel_rec_internal(const struct lu_env *env, if (rc < 0) GOTO(out_trans, rc); + if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY)) + rc = llog_declare_destroy(env, loghandle, th); + th->th_wait_submit = 1; rc = dt_trans_start_local(env, dt, th); if (rc < 0) @@ -147,44 +248,11 @@ static int llog_cancel_rec_internal(const struct lu_env *env, loghandle->lgh_hdr->llh_count--; else ext2_set_bit(index, LLOG_HDR_BITMAP(llh)); -out_unlock: - mutex_unlock(&loghandle->lgh_hdr_mutex); - up_write(&loghandle->lgh_lock); -out_trans: - dt_trans_stop(env, dt, th); - RETURN(rc); -} - -/* returns negative on error; 0 if success; 1 if success & log destroyed */ -int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle, - int index) -{ - struct llog_log_hdr *llh = loghandle->lgh_hdr; - int rc = 0; - ENTRY; - - CDEBUG(D_RPCTRACE, "Canceling %d in log "DOSTID"\n", - index, POSTID(&loghandle->lgh_id.lgl_oi)); - - if (index == 0) { - CERROR("Can't cancel index 0 which is header\n"); - RETURN(-EINVAL); - } - - rc = llog_cancel_rec_internal(env, loghandle, index); - if (rc < 0) { - CERROR("%s: fail to write header for llog #"DOSTID - "#%08x: rc = %d\n", - loghandle->lgh_ctxt->loc_obd->obd_name, - POSTID(&loghandle->lgh_id.lgl_oi), - loghandle->lgh_id.lgl_ogen, rc); - RETURN(rc); - } if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) && (llh->llh_count == 1) && (loghandle->lgh_last_idx == LLOG_HDR_BITMAP_SIZE(llh) - 1)) { - rc = llog_destroy(env, loghandle); + rc = llog_trans_destroy(env, loghandle, th); if (rc < 0) { /* Sigh, can not destroy the final plain llog, but * the bitmap has been clearly, so the record can not @@ -195,12 +263,17 @@ int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle, loghandle->lgh_ctxt->loc_obd->obd_name, POSTID(&loghandle->lgh_id.lgl_oi), loghandle->lgh_id.lgl_ogen, rc); - RETURN(0); + GOTO(out_unlock, rc = 0); } - RETURN(LLOG_DEL_PLAIN); + rc = LLOG_DEL_PLAIN; } - RETURN(0); +out_unlock: + mutex_unlock(&loghandle->lgh_hdr_mutex); + up_write(&loghandle->lgh_lock); +out_trans: + dt_trans_stop(env, dt, th); + RETURN(rc); } static int llog_read_header(const struct lu_env *env, diff --git a/lustre/obdclass/llog_osd.c b/lustre/obdclass/llog_osd.c index 519fd27..4912db9 100644 --- a/lustre/obdclass/llog_osd.c +++ b/lustre/obdclass/llog_osd.c @@ -422,9 +422,6 @@ static int llog_osd_write_rec(const struct lu_env *env, if (idx == LLOG_HEADER_IDX) { /* llog header update */ - LASSERT(reclen >= sizeof(struct llog_log_hdr)); - LASSERT(rec == &llh->llh_hdr); - lgi->lgi_off = 0; lgi->lgi_buf.lb_len = reclen; lgi->lgi_buf.lb_buf = rec; @@ -1482,14 +1479,11 @@ llog_osd_regular_fid_del_name_entry(const struct lu_env *env, RETURN(rc); } - /** - * Implementation of the llog_operations::lop_destroy + * Implementation of the llog_operations::lop_declare_destroy * - * This function destroys the llog and deletes also entry in the + * This function declare destroys the llog and deletes also entry in the * llog directory in case of named llog. Llog should be opened prior that. - * Destroy method is not part of external transaction and does everything - * inside. * * \param[in] env execution environment * \param[in] loghandle llog handle of the current llog @@ -1497,14 +1491,12 @@ llog_osd_regular_fid_del_name_entry(const struct lu_env *env, * \retval 0 on successful destroy * \retval negative value on error */ -static int llog_osd_destroy(const struct lu_env *env, - struct llog_handle *loghandle) +static int llog_osd_declare_destroy(const struct lu_env *env, + struct llog_handle *loghandle, + struct thandle *th) { struct llog_ctxt *ctxt; struct dt_object *o, *llog_dir = NULL; - struct dt_device *d; - struct thandle *th; - char *name = NULL; int rc; ENTRY; @@ -1515,78 +1507,104 @@ static int llog_osd_destroy(const struct lu_env *env, o = loghandle->lgh_obj; LASSERT(o); - d = lu2dt_dev(o->do_lu.lo_dev); - LASSERT(d); - LASSERT(d == ctxt->loc_exp->exp_obd->obd_lvfs_ctxt.dt); - - th = dt_trans_create(env, d); - if (IS_ERR(th)) - RETURN(PTR_ERR(th)); - if (loghandle->lgh_name) { llog_dir = llog_osd_dir_get(env, ctxt); if (IS_ERR(llog_dir)) - GOTO(out_trans, rc = PTR_ERR(llog_dir)); + RETURN(PTR_ERR(llog_dir)); - name = loghandle->lgh_name; rc = dt_declare_delete(env, llog_dir, - (struct dt_key *)name, th); - if (rc) - GOTO(out_trans, rc); + (struct dt_key *)loghandle->lgh_name, + th); + if (rc < 0) + GOTO(out_put, rc); } rc = dt_declare_ref_del(env, o, th); if (rc < 0) - GOTO(out_trans, rc); + GOTO(out_put, rc); rc = dt_declare_destroy(env, o, th); - if (rc) - GOTO(out_trans, rc); + if (rc < 0) + GOTO(out_put, rc); if (loghandle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) { rc = llog_osd_regular_fid_del_name_entry(env, o, th, true); if (rc < 0) - GOTO(out_trans, rc); + GOTO(out_put, rc); } - rc = dt_trans_start_local(env, d, th); - if (rc) - GOTO(out_trans, rc); +out_put: + if (!(IS_ERR_OR_NULL(llog_dir))) + lu_object_put(env, &llog_dir->do_lu); - th->th_wait_submit = 1; + RETURN(rc); +} + + +/** + * Implementation of the llog_operations::lop_destroy + * + * This function destroys the llog and deletes also entry in the + * llog directory in case of named llog. Llog should be opened prior that. + * Destroy method is not part of external transaction and does everything + * inside. + * + * \param[in] env execution environment + * \param[in] loghandle llog handle of the current llog + * + * \retval 0 on successful destroy + * \retval negative value on error + */ +static int llog_osd_destroy(const struct lu_env *env, + struct llog_handle *loghandle, struct thandle *th) +{ + struct llog_ctxt *ctxt; + struct dt_object *o, *llog_dir = NULL; + int rc; + + ENTRY; + + ctxt = loghandle->lgh_ctxt; + LASSERT(ctxt != NULL); + + o = loghandle->lgh_obj; + LASSERT(o != NULL); dt_write_lock(env, o, 0); - if (dt_object_exists(o)) { - if (name) { - dt_read_lock(env, llog_dir, 0); - rc = dt_delete(env, llog_dir, - (struct dt_key *) name, - th); - dt_read_unlock(env, llog_dir); - if (rc) { - CERROR("%s: can't remove llog %s: rc = %d\n", - o->do_lu.lo_dev->ld_obd->obd_name, - name, rc); - GOTO(out_unlock, rc); - } - } - dt_ref_del(env, o, th); - rc = dt_destroy(env, o, th); - if (rc) - GOTO(out_unlock, rc); + if (!dt_object_exists(o)) + GOTO(out_unlock, rc = 0); - if (loghandle->lgh_ctxt->loc_flags & - LLOG_CTXT_FLAG_NORMAL_FID) { - rc = llog_osd_regular_fid_del_name_entry(env, o, th, - false); - if (rc < 0) - GOTO(out_unlock, rc); + if (loghandle->lgh_name) { + llog_dir = llog_osd_dir_get(env, ctxt); + if (IS_ERR(llog_dir)) + RETURN(PTR_ERR(llog_dir)); + + dt_read_lock(env, llog_dir, 0); + rc = dt_delete(env, llog_dir, + (struct dt_key *)loghandle->lgh_name, + th); + dt_read_unlock(env, llog_dir); + if (rc) { + CERROR("%s: can't remove llog %s: rc = %d\n", + o->do_lu.lo_dev->ld_obd->obd_name, + loghandle->lgh_name, rc); + GOTO(out_unlock, rc); } } + + dt_ref_del(env, o, th); + rc = dt_destroy(env, o, th); + if (rc < 0) + GOTO(out_unlock, rc); + + if (loghandle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) { + rc = llog_osd_regular_fid_del_name_entry(env, o, th, false); + if (rc < 0) + GOTO(out_unlock, rc); + } + out_unlock: dt_write_unlock(env, o); -out_trans: - dt_trans_stop(env, d, th); if (!(IS_ERR_OR_NULL(llog_dir))) lu_object_put(env, &llog_dir->do_lu); RETURN(rc); @@ -1683,6 +1701,7 @@ struct llog_operations llog_osd_ops = { .lop_next_block = llog_osd_next_block, .lop_prev_block = llog_osd_prev_block, .lop_read_header = llog_osd_read_header, + .lop_declare_destroy = llog_osd_declare_destroy, .lop_destroy = llog_osd_destroy, .lop_setup = llog_osd_setup, .lop_cleanup = llog_osd_cleanup, @@ -1700,6 +1719,7 @@ struct llog_operations llog_common_cat_ops = { .lop_next_block = llog_osd_next_block, .lop_prev_block = llog_osd_prev_block, .lop_read_header = llog_osd_read_header, + .lop_declare_destroy = llog_osd_declare_destroy, .lop_destroy = llog_osd_destroy, .lop_setup = llog_osd_setup, .lop_cleanup = llog_osd_cleanup, diff --git a/lustre/osp/osp_md_object.c b/lustre/osp/osp_md_object.c index 6738763..af49456 100644 --- a/lustre/osp/osp_md_object.c +++ b/lustre/osp/osp_md_object.c @@ -956,35 +956,6 @@ int osp_md_declare_object_destroy(const struct lu_env *env, } /** - * Interpreter call for object destroy - * - * Object destroy interpreter, which will be called after destroying - * the remote object to set flags and status. - * - * \param[in] env execution environment - * \param[in] reply update reply - * \param[in] req ptlrpc update request for destroying object - * \param[in] obj object to be destroyed - * \param[in] data data used in this function. - * \param[in] index index(position) of destroy update in the whole - * updates - * \param[in] rc update result on the remote MDT. - * - * \retval only return 0 for now - */ -static int osp_md_object_destroy_interpreter(const struct lu_env *env, - struct object_update_reply *reply, - struct ptlrpc_request *req, - struct osp_object *obj, - void *data, int index, int rc) -{ - /* not needed in cache any more */ - set_bit(LU_OBJECT_HEARD_BANSHEE, - &obj->opo_obj.do_lu.lo_header->loh_flags); - return 0; -} - -/** * Implement OSP layer dt_object_operations::do_destroy() interface. * * Pack the destroy update into the RPC buffer, which will be sent @@ -1019,8 +990,10 @@ int osp_md_object_destroy(const struct lu_env *env, struct dt_object *dt, if (rc != 0) RETURN(rc); + set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags); rc = osp_insert_update_callback(env, update, dt2osp_obj(dt), NULL, - osp_md_object_destroy_interpreter); + NULL); + RETURN(rc); } diff --git a/lustre/ptlrpc/llog_client.c b/lustre/ptlrpc/llog_client.c index e618714..4860542 100644 --- a/lustre/ptlrpc/llog_client.c +++ b/lustre/ptlrpc/llog_client.c @@ -141,7 +141,8 @@ out: } static int llog_client_destroy(const struct lu_env *env, - struct llog_handle *loghandle) + struct llog_handle *loghandle, + struct thandle *th) { struct obd_import *imp; struct ptlrpc_request *req = NULL; -- 1.8.3.1