From 9126d8159d5d5b61a600e7427d0c173084a710e6 Mon Sep 17 00:00:00 2001 From: Alex Zhuravlev Date: Tue, 15 Sep 2015 12:36:35 +0300 Subject: [PATCH] LU-6634 llog: destroy plain llog if init fails llog_cat_add_rec() should destroy the plain llog in the same transaction if initialization of that failed. also, llog_osd_write_rec() should check the object still exists as it's possible that another thread failed to initialize and destroyed the llog. Change-Id: I7b823d34b32b5caaf0cc17b4cfe278a07a78ec15 Signed-off-by: Alex Zhuravlev Reviewed-on: http://review.whamcloud.com/16427 Tested-by: Jenkins Tested-by: Maloo Reviewed-by: wangdi Reviewed-by: Andreas Dilger Reviewed-by: Oleg Drokin --- lustre/include/lustre_log.h | 5 ++++ lustre/include/obd_support.h | 1 + lustre/obdclass/llog_cat.c | 71 ++++++++++++++++++++++++++++---------------- lustre/obdclass/llog_osd.c | 47 ++++++++++++++++++----------- lustre/osd-zfs/osd_object.c | 12 +++++++- lustre/tests/sanity.sh | 9 ++++++ 6 files changed, 101 insertions(+), 44 deletions(-) diff --git a/lustre/include/lustre_log.h b/lustre/include/lustre_log.h index 6922954..7d26af2 100644 --- a/lustre/include/lustre_log.h +++ b/lustre/include/lustre_log.h @@ -497,6 +497,11 @@ static inline int llog_connect(struct llog_ctxt *ctxt, RETURN(rc); } +static inline int llog_is_full(struct llog_handle *llh) +{ + return llh->lgh_last_idx >= LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr) - 1; +} + struct llog_cfg_rec { struct llog_rec_hdr lcr_hdr; struct lustre_cfg lcr_cfg; diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h index 5d897fd..0f58fb0 100644 --- a/lustre/include/obd_support.h +++ b/lustre/include/obd_support.h @@ -247,6 +247,7 @@ extern char obd_jobid_var[]; #define OBD_FAIL_MDS_STALE_DIR_LAYOUT 0x158 #define OBD_FAIL_MDS_REINT_MULTI_NET 0x159 #define OBD_FAIL_MDS_REINT_MULTI_NET_REP 0x15a +#define OBD_FAIL_MDS_LLOG_CREATE_FAILED2 0x15b /* layout lock */ #define OBD_FAIL_MDS_NO_LL_GETATTR 0x170 diff --git a/lustre/obdclass/llog_cat.c b/lustre/obdclass/llog_cat.c index afa7653..65b2d0b 100644 --- a/lustre/obdclass/llog_cat.c +++ b/lustre/obdclass/llog_cat.c @@ -167,7 +167,7 @@ static int llog_cat_new_log(const struct lu_env *env, rc = llog_write_rec(env, cathandle, &rec->lid_hdr, &loghandle->u.phd.phd_cookie, LLOG_NEXT_IDX, th); if (rc < 0) - GOTO(out, rc); + GOTO(out_destroy, rc); CDEBUG(D_OTHER, "new plain log "DOSTID":%x for index %u of catalog" DOSTID"\n", POSTID(&loghandle->lgh_id.lgl_oi), @@ -180,6 +180,17 @@ out: dt_trans_stop(env, dt, handle); RETURN(0); + +out_destroy: + /* to signal llog_cat_close() it shouldn't try to destroy the llog, + * we want to destroy it in this transaction, otherwise the object + * becomes an orphan */ + loghandle->lgh_hdr->llh_flags &= ~LLOG_F_ZAP_WHEN_EMPTY; + /* this is to mimic full log, so another llog_cat_current_log() + * can skip it and ask for another onet */ + loghandle->lgh_last_idx = LLOG_HDR_BITMAP_SIZE(llh) + 1; + llog_trans_destroy(env, loghandle, th); + RETURN(rc); } /* Open an existent log handle and add it to the open list. @@ -318,6 +329,12 @@ static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle, struct llog_handle *loghandle = NULL; ENTRY; + + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED2)) { + down_write_nested(&cathandle->lgh_lock, LLOGH_CAT); + GOTO(next, loghandle); + } + down_read_nested(&cathandle->lgh_lock, LLOGH_CAT); loghandle = cathandle->u.chd.chd_current_log; if (loghandle) { @@ -325,8 +342,7 @@ static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle, down_write_nested(&loghandle->lgh_lock, LLOGH_LOG); llh = loghandle->lgh_hdr; - if (llh == NULL || - loghandle->lgh_last_idx < LLOG_HDR_BITMAP_SIZE(llh) - 1) { + if (llh == NULL || !llog_is_full(loghandle)) { up_read(&cathandle->lgh_lock); RETURN(loghandle); } else { @@ -346,7 +362,7 @@ static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle, down_write_nested(&loghandle->lgh_lock, LLOGH_LOG); llh = loghandle->lgh_hdr; LASSERT(llh); - if (loghandle->lgh_last_idx < LLOG_HDR_BITMAP_SIZE(llh) - 1) { + if (!llog_is_full(loghandle)) { up_write(&cathandle->lgh_lock); RETURN(loghandle); } else { @@ -354,6 +370,7 @@ static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle, } } +next: CDEBUG(D_INODE, "use next log\n"); loghandle = cathandle->u.chd.chd_next_log; @@ -375,10 +392,12 @@ int llog_cat_add_rec(const struct lu_env *env, struct llog_handle *cathandle, struct thandle *th) { struct llog_handle *loghandle; - int rc; - ENTRY; + int rc, retried = 0; + ENTRY; LASSERT(rec->lrh_len <= cathandle->lgh_ctxt->loc_chunk_size); + +retry: loghandle = llog_cat_current_log(cathandle, th); LASSERT(!IS_ERR(loghandle)); @@ -387,33 +406,35 @@ int llog_cat_add_rec(const struct lu_env *env, struct llog_handle *cathandle, rc = llog_cat_new_log(env, cathandle, loghandle, th); if (rc < 0) { up_write(&loghandle->lgh_lock); + /* nobody should be trying to use this llog */ + down_write(&cathandle->lgh_lock); + if (cathandle->u.chd.chd_current_log == loghandle) + cathandle->u.chd.chd_current_log = NULL; + up_write(&cathandle->lgh_lock); RETURN(rc); } } /* now let's try to add the record */ rc = llog_write_rec(env, loghandle, rec, reccookie, LLOG_NEXT_IDX, th); - if (rc < 0) + if (rc < 0) { CDEBUG_LIMIT(rc == -ENOSPC ? D_HA : D_ERROR, "llog_write_rec %d: lh=%p\n", rc, loghandle); + /* -ENOSPC is returned if no empty records left + * and when it's lack of space on the stogage. + * there is no point to try again if it's the second + * case. many callers (like llog test) expect ENOSPC, + * so we preserve this error code, but look for the + * actual cause here */ + if (rc == -ENOSPC && llog_is_full(loghandle)) + rc = -ENOBUFS; + } up_write(&loghandle->lgh_lock); - if (rc == -ENOSPC) { - /* try to use next log */ - loghandle = llog_cat_current_log(cathandle, th); - LASSERT(!IS_ERR(loghandle)); - /* new llog can be created concurrently */ - if (!llog_exist(loghandle)) { - rc = llog_cat_new_log(env, cathandle, loghandle, th); - if (rc < 0) { - up_write(&loghandle->lgh_lock); - RETURN(rc); - } - } - /* now let's try to add the record */ - rc = llog_write_rec(env, loghandle, rec, reccookie, - LLOG_NEXT_IDX, th); - if (rc < 0) - CERROR("llog_write_rec %d: lh=%p\n", rc, loghandle); - up_write(&loghandle->lgh_lock); + + if (rc == -ENOBUFS) { + if (retried++ == 0) + GOTO(retry, rc); + CERROR("%s: error on 2nd llog: rc = %d\n", + cathandle->lgh_ctxt->loc_obd->obd_name, rc); } RETURN(rc); diff --git a/lustre/obdclass/llog_osd.c b/lustre/obdclass/llog_osd.c index fb8fba4..e7ae395 100644 --- a/lustre/obdclass/llog_osd.c +++ b/lustre/obdclass/llog_osd.c @@ -111,6 +111,23 @@ static int llog_osd_create_new_object(const struct lu_env *env, } /** + * Implementation of the llog_operations::lop_exist + * + * This function checks that llog exists on storage. + * + * \param[in] handle llog handle of the current llog + * + * \retval true if llog object exists and is not just destroyed + * \retval false if llog doesn't exist or just destroyed + */ +static int llog_osd_exist(struct llog_handle *handle) +{ + LASSERT(handle->lgh_obj); + return dt_object_exists(handle->lgh_obj) && + !lu_object_is_dying(handle->lgh_obj->do_lu.lo_header); +} + +/** * Write a padding record to the llog * * This function writes a padding record to the end of llog. That may @@ -378,6 +395,9 @@ static int llog_osd_write_rec(const struct lu_env *env, CDEBUG(D_OTHER, "new record %x to "DFID"\n", rec->lrh_type, PFID(lu_object_fid(&o->do_lu))); + if (!llog_osd_exist(loghandle)) + RETURN(-ENOENT); + /* record length should not bigger than */ if (reclen > loghandle->lgh_hdr->llh_hdr.lrh_len) RETURN(-E2BIG); @@ -521,6 +541,14 @@ static int llog_osd_write_rec(const struct lu_env *env, * process them page-at-a-time if needed. If it will cross a chunk * boundary, write in a fake (but referenced) entry to pad the chunk. */ + + + /* simulate ENOSPC when new plain llog is being added to the + * catalog */ + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED2) && + llh->llh_flags & LLOG_F_IS_CAT) + RETURN(-ENOSPC); + LASSERT(lgi->lgi_attr.la_valid & LA_SIZE); lgi->lgi_off = lgi->lgi_attr.la_size; left = chunk_size - (lgi->lgi_off & (chunk_size - 1)); @@ -535,7 +563,7 @@ static int llog_osd_write_rec(const struct lu_env *env, } /* if it's the last idx in log file, then return -ENOSPC * or wrap around if a catalog */ - if ((loghandle->lgh_last_idx >= LLOG_HDR_BITMAP_SIZE(llh) - 1) || + if (llog_is_full(loghandle) || unlikely(llh->llh_flags & LLOG_F_IS_CAT && OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS) && loghandle->lgh_last_idx >= cfs_fail_val)) { @@ -1223,23 +1251,6 @@ out: } /** - * Implementation of the llog_operations::lop_exist - * - * This function checks that llog exists on storage. - * - * \param[in] handle llog handle of the current llog - * - * \retval true if llog object exists and is not just destroyed - * \retval false if llog doesn't exist or just destroyed - */ -static int llog_osd_exist(struct llog_handle *handle) -{ - LASSERT(handle->lgh_obj); - return (dt_object_exists(handle->lgh_obj) && - !lu_object_is_dying(handle->lgh_obj->do_lu.lo_header)); -} - -/** * Get dir for regular fid log object * * Get directory for regular fid log object, and these regular fid log diff --git a/lustre/osd-zfs/osd_object.c b/lustre/osd-zfs/osd_object.c index 39b31ba..8ef92c0 100644 --- a/lustre/osd-zfs/osd_object.c +++ b/lustre/osd-zfs/osd_object.c @@ -583,7 +583,17 @@ static int osd_object_destroy(const struct lu_env *env, obj->oo_attr.la_gid, rc); oid = obj->oo_db->db_object; - if (obj->oo_destroy == OSD_DESTROY_SYNC) { + if (unlikely(obj->oo_destroy == OSD_DESTROY_NONE)) { + /* this may happen if the destroy wasn't declared + * e.g. when the object is created and then destroyed + * in the same transaction - we don't need additional + * space for destroy specifically */ + LASSERT(obj->oo_attr.la_size <= osd_sync_destroy_max_size); + rc = -dmu_object_free(osd->od_os, oid, oh->ot_tx); + if (rc) + CERROR("%s: failed to free %s "LPU64": rc = %d\n", + osd->od_svname, buf, oid, rc); + } else if (obj->oo_destroy == OSD_DESTROY_SYNC) { rc = -dmu_object_free(osd->od_os, oid, oh->ot_tx); if (rc) CERROR("%s: failed to free %s "LPU64": rc = %d\n", diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index 043a801..f43e434 100755 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -5277,6 +5277,15 @@ test_60d() { } run_test 60d "test printk console message masking" +test_60e() { + [ $PARALLEL == "yes" ] && skip "skip parallel run" && return + touch $DIR/$tfile +#define OBD_FAIL_MDS_LLOG_CREATE_FAILED2 0x15b + do_facet mds1 lctl set_param fail_loc=0x15b + rm $DIR/$tfile +} +run_test 60e "no space while new llog is being created" + test_61() { [ $PARALLEL == "yes" ] && skip "skip parallel run" && return f="$DIR/f61" -- 1.8.3.1