/* * GPL HEADER START * * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License version 2 only, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License version 2 for more details (a copy is included * in the LICENSE file that accompanied this code). * * You should have received a copy of the GNU General Public License * version 2 along with this program; If not, see * http://www.gnu.org/licenses/gpl-2.0.html * * GPL HEADER END */ /* * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * * Copyright (c) 2012, 2017, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ * * lustre/obdclass/llog.c * * OST<->MDS recovery logging infrastructure. * Invariants in implementation: * - we do not share logs among different OST<->MDS connections, so that * if an OST or MDS fails it need only look at log(s) relevant to itself * * Author: Andreas Dilger * Author: Alex Zhuravlev * Author: Mikhail Pershin */ #define DEBUG_SUBSYSTEM S_LOG #include #include #include #include #include #include #include #include "llog_internal.h" /* * Allocate a new log or catalog handle * Used inside llog_open(). */ static struct llog_handle *llog_alloc_handle(void) { struct llog_handle *loghandle; OBD_ALLOC_PTR(loghandle); if (loghandle == NULL) return NULL; init_rwsem(&loghandle->lgh_lock); mutex_init(&loghandle->lgh_hdr_mutex); init_rwsem(&loghandle->lgh_last_sem); INIT_LIST_HEAD(&loghandle->u.phd.phd_entry); refcount_set(&loghandle->lgh_refcount, 1); return loghandle; } /* * Free llog handle and header data if exists. Used in llog_close() only */ static void llog_free_handle(struct llog_handle *loghandle) { LASSERT(loghandle != NULL); /* failed llog_init_handle */ if (loghandle->lgh_hdr == NULL) goto out; if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN) LASSERT(list_empty(&loghandle->u.phd.phd_entry)); else if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT) LASSERT(list_empty(&loghandle->u.chd.chd_head)); OBD_FREE_LARGE(loghandle->lgh_hdr, loghandle->lgh_hdr_size); out: OBD_FREE_PTR(loghandle); } struct llog_handle *llog_handle_get(struct llog_handle *loghandle) { if (refcount_inc_not_zero(&loghandle->lgh_refcount)) return loghandle; return NULL; } int llog_handle_put(const struct lu_env *env, struct llog_handle *loghandle) { int rc = 0; if (refcount_dec_and_test(&loghandle->lgh_refcount)) { const struct llog_operations *lop; rc = llog_handle2ops(loghandle, &lop); if (!rc) { if (lop->lop_close) rc = lop->lop_close(env, loghandle); else rc = -EOPNOTSUPP; } llog_free_handle(loghandle); } return rc; } static int llog_declare_destroy(const struct lu_env *env, struct llog_handle *handle, struct thandle *th) { const struct llog_operations *lop; int rc; ENTRY; rc = llog_handle2ops(handle, &lop); if (rc) RETURN(rc); if (lop->lop_declare_destroy == NULL) RETURN(-EOPNOTSUPP); rc = lop->lop_declare_destroy(env, handle, th); RETURN(rc); } int llog_trans_destroy(const struct lu_env *env, struct llog_handle *handle, struct thandle *th) { const struct llog_operations *lop; int rc; ENTRY; rc = llog_handle2ops(handle, &lop); if (rc < 0) RETURN(rc); if (lop->lop_destroy == NULL) RETURN(-EOPNOTSUPP); LASSERT(handle->lgh_obj != NULL); if (!llog_exist(handle)) RETURN(0); rc = lop->lop_destroy(env, handle, th); RETURN(rc); } int llog_destroy(const struct lu_env *env, struct llog_handle *handle) { const struct llog_operations *lop; struct dt_device *dt; struct thandle *th; int rc; ENTRY; rc = llog_handle2ops(handle, &lop); if (rc < 0) RETURN(rc); if (lop->lop_destroy == NULL) RETURN(-EOPNOTSUPP); if (handle->lgh_obj == NULL) { /* if lgh_obj == NULL, then it is from client side destroy */ rc = lop->lop_destroy(env, handle, NULL); RETURN(rc); } if (!llog_exist(handle)) RETURN(0); dt = lu2dt_dev(handle->lgh_obj->do_lu.lo_dev); if (unlikely(unlikely(dt->dd_rdonly))) RETURN(-EROFS); th = dt_trans_create(env, dt); if (IS_ERR(th)) RETURN(PTR_ERR(th)); th->th_wait_submit = 1; rc = llog_declare_destroy(env, handle, th); if (rc != 0) GOTO(out_trans, rc); rc = dt_trans_start_local(env, dt, th); if (rc < 0) GOTO(out_trans, rc); rc = lop->lop_destroy(env, handle, th); out_trans: dt_trans_stop(env, dt, th); RETURN(rc); } EXPORT_SYMBOL(llog_destroy); /* returns negative on error; 0 if success; 1 if success & log destroyed */ int llog_cancel_arr_rec(const struct lu_env *env, struct llog_handle *loghandle, int num, int *index) { struct llog_thread_info *lgi = llog_info(env); struct dt_device *dt; struct llog_log_hdr *llh; struct thandle *th; __u32 tmp_lgc_index; int rc, i = 0; int rc1; bool subtract_count = false; ENTRY; LASSERT(loghandle != NULL); LASSERT(loghandle->lgh_ctxt != NULL); LASSERT(loghandle->lgh_obj != NULL); llh = loghandle->lgh_hdr; CDEBUG(D_OTHER, "Canceling %d records, first %d in log "DFID"\n", num, index[0], PLOGID(&loghandle->lgh_id)); dt = lu2dt_dev(loghandle->lgh_obj->do_lu.lo_dev); if (unlikely(unlikely(dt->dd_rdonly))) RETURN(0); th = dt_trans_create(env, dt); if (IS_ERR(th)) RETURN(PTR_ERR(th)); rc = llog_declare_write_rec(env, loghandle, &llh->llh_hdr, 0, th); if (rc < 0) GOTO(out_trans, rc); if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY)) { rc = llog_declare_destroy(env, loghandle, th); if (rc < 0) GOTO(out_trans, rc); } th->th_wait_submit = 1; rc = dt_trans_start_local(env, dt, th); if (rc < 0) GOTO(out_trans, rc); down_write(&loghandle->lgh_lock); /* clear bitmap */ mutex_lock(&loghandle->lgh_hdr_mutex); for (i = 0; i < num; ++i) { if (index[i] == 0) { CERROR("Can't cancel index 0 which is header\n"); GOTO(out_unlock, rc = -EINVAL); } if (!__test_and_clear_bit_le(index[i], LLOG_HDR_BITMAP(llh))) { CDEBUG(D_OTHER, "Catalog index %u already clear?\n", index[i]); GOTO(out_unlock, rc = -ENOENT); } } loghandle->lgh_hdr->llh_count -= num; subtract_count = true; /* Since llog_process_thread use lgi_cookie, it`s better to save them * and restore after using */ tmp_lgc_index = lgi->lgi_cookie.lgc_index; /* Pass this index to llog_osd_write_rec(), which will use the index * to only update the necesary bitmap. */ lgi->lgi_cookie.lgc_index = index[0]; /* update header */ rc = llog_write_rec(env, loghandle, &llh->llh_hdr, (num != 1 ? NULL : &lgi->lgi_cookie), LLOG_HEADER_IDX, th); lgi->lgi_cookie.lgc_index = tmp_lgc_index; if (rc != 0) GOTO(out_unlock, rc); if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) && (llh->llh_count == 1) && ((loghandle->lgh_last_idx == llog_max_idx(llh)) || (loghandle->u.phd.phd_cat_handle != NULL && loghandle->u.phd.phd_cat_handle->u.chd.chd_current_log != loghandle))) { /* never try to destroy it again */ llh->llh_flags &= ~LLOG_F_ZAP_WHEN_EMPTY; rc = llog_trans_destroy(env, loghandle, th); if (rc < 0) { /* Sigh, can not destroy the final plain llog, but * the bitmap has been clearly, so the record can not * be accessed anymore, let's return 0 for now, and * the orphan will be handled by LFSCK. */ CERROR("%s: can't destroy empty llog "DFID": rc = %d\n", loghandle2name(loghandle), PLOGID(&loghandle->lgh_id), rc); GOTO(out_unlock, rc = 0); } rc = LLOG_DEL_PLAIN; } out_unlock: if (rc < 0) { /* restore bitmap while holding a mutex */ if (subtract_count) { loghandle->lgh_hdr->llh_count += num; subtract_count = false; } for (i = i - 1; i >= 0; i--) set_bit_le(index[i], LLOG_HDR_BITMAP(llh)); } mutex_unlock(&loghandle->lgh_hdr_mutex); up_write(&loghandle->lgh_lock); out_trans: rc1 = dt_trans_stop(env, dt, th); if (rc == 0) rc = rc1; if (rc1 < 0) { mutex_lock(&loghandle->lgh_hdr_mutex); if (subtract_count) loghandle->lgh_hdr->llh_count += num; for (i = i - 1; i >= 0; i--) set_bit_le(index[i], LLOG_HDR_BITMAP(llh)); mutex_unlock(&loghandle->lgh_hdr_mutex); } RETURN(rc); } int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle, int index) { return llog_cancel_arr_rec(env, loghandle, 1, &index); } int llog_read_header(const struct lu_env *env, struct llog_handle *handle, const struct obd_uuid *uuid) { const struct llog_operations *lop; int rc; ENTRY; rc = llog_handle2ops(handle, &lop); if (rc) RETURN(rc); if (lop->lop_read_header == NULL) RETURN(-EOPNOTSUPP); rc = lop->lop_read_header(env, handle); if (rc == LLOG_EEMPTY) { struct llog_log_hdr *llh = handle->lgh_hdr; /* lrh_len should be initialized in llog_init_handle */ handle->lgh_last_idx = 0; /* header is record with index 0 */ llh->llh_count = 1; /* for the header record */ llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC; LASSERT(handle->lgh_ctxt->loc_chunk_size >= LLOG_MIN_CHUNK_SIZE); llh->llh_hdr.lrh_len = handle->lgh_ctxt->loc_chunk_size; llh->llh_hdr.lrh_index = 0; llh->llh_timestamp = ktime_get_real_seconds(); if (uuid) memcpy(&llh->llh_tgtuuid, uuid, sizeof(llh->llh_tgtuuid)); llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap); /* Since update llog header might also call this function, * let's reset the bitmap to 0 here */ memset(LLOG_HDR_BITMAP(llh), 0, llh->llh_hdr.lrh_len - llh->llh_bitmap_offset - sizeof(llh->llh_tail)); set_bit_le(0, LLOG_HDR_BITMAP(llh)); LLOG_HDR_TAIL(llh)->lrt_len = llh->llh_hdr.lrh_len; LLOG_HDR_TAIL(llh)->lrt_index = llh->llh_hdr.lrh_index; rc = 0; } RETURN(rc); } EXPORT_SYMBOL(llog_read_header); int llog_init_handle(const struct lu_env *env, struct llog_handle *handle, int flags, struct obd_uuid *uuid) { struct llog_log_hdr *llh; enum llog_flag fmt = flags & LLOG_F_EXT_MASK; int rc; int chunk_size = handle->lgh_ctxt->loc_chunk_size; ENTRY; LASSERT(handle->lgh_hdr == NULL); LASSERT(chunk_size >= LLOG_MIN_CHUNK_SIZE); OBD_ALLOC_LARGE(llh, chunk_size); if (llh == NULL) RETURN(-ENOMEM); handle->lgh_hdr = llh; handle->lgh_hdr_size = chunk_size; /* first assign flags to use llog_client_ops */ llh->llh_flags = flags; rc = llog_read_header(env, handle, uuid); if (rc == 0) { if (unlikely((llh->llh_flags & LLOG_F_IS_PLAIN && flags & LLOG_F_IS_CAT) || (llh->llh_flags & LLOG_F_IS_CAT && flags & LLOG_F_IS_PLAIN))) { CERROR("%s: llog type is %s but initializing %s\n", loghandle2name(handle), llh->llh_flags & LLOG_F_IS_CAT ? "catalog" : "plain", flags & LLOG_F_IS_CAT ? "catalog" : "plain"); GOTO(out, rc = -EINVAL); } else if (llh->llh_flags & (LLOG_F_IS_PLAIN | LLOG_F_IS_CAT)) { /* * it is possible to open llog without specifying llog * type so it is taken from llh_flags */ flags = llh->llh_flags; } else { /* for some reason the llh_flags has no type set */ CERROR("llog type is not specified!\n"); GOTO(out, rc = -EINVAL); } if (unlikely(uuid && !obd_uuid_equals(uuid, &llh->llh_tgtuuid))) { CERROR("%s: llog uuid mismatch: %s/%s\n", loghandle2name(handle), (char *)uuid->uuid, (char *)llh->llh_tgtuuid.uuid); GOTO(out, rc = -EINVAL); } } if (flags & LLOG_F_IS_CAT) { LASSERT(list_empty(&handle->u.chd.chd_head)); INIT_LIST_HEAD(&handle->u.chd.chd_head); llh->llh_size = sizeof(struct llog_logid_rec); llh->llh_flags |= LLOG_F_IS_FIXSIZE; } else if (!(flags & LLOG_F_IS_PLAIN)) { CERROR("%s: unknown flags: %#x (expected %#x or %#x)\n", loghandle2name(handle), flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN); rc = -EINVAL; } llh->llh_flags |= fmt; out: if (rc) { OBD_FREE_LARGE(llh, chunk_size); handle->lgh_hdr = NULL; } RETURN(rc); } EXPORT_SYMBOL(llog_init_handle); #define LLOG_ERROR_REC(lgh, rec, format, a...) \ CERROR("%s: "DFID" rec type=%x idx=%u len=%u, " format "\n" , \ loghandle2name(lgh), PLOGID(&lgh->lgh_id), (rec)->lrh_type, \ (rec)->lrh_index, (rec)->lrh_len, ##a) int llog_verify_record(const struct llog_handle *llh, struct llog_rec_hdr *rec) { int chunk_size = llh->lgh_hdr->llh_hdr.lrh_len; if ((rec->lrh_type & LLOG_OP_MASK) != LLOG_OP_MAGIC) LLOG_ERROR_REC(llh, rec, "magic is bad"); else if (rec->lrh_len == 0 || rec->lrh_len > chunk_size) LLOG_ERROR_REC(llh, rec, "bad record len, chunk size is %d", chunk_size); else if (rec->lrh_index > llog_max_idx(llh->lgh_hdr)) LLOG_ERROR_REC(llh, rec, "index is too high"); else return 0; return -EINVAL; } EXPORT_SYMBOL(llog_verify_record); static inline bool llog_is_index_skipable(int idx, struct llog_log_hdr *llh, struct llog_process_cat_data *cd) { if (cd && (cd->lpcd_read_mode & LLOG_READ_MODE_RAW)) return false; return !test_bit_le(idx, LLOG_HDR_BITMAP(llh)); } static int llog_process_thread(void *arg) { struct llog_process_info *lpi = arg; struct llog_handle *loghandle = lpi->lpi_loghandle; struct llog_log_hdr *llh = loghandle->lgh_hdr; struct llog_process_cat_data *cd = lpi->lpi_catdata; struct llog_thread_info *lti; char *buf; size_t chunk_size; __u64 cur_offset; int rc = 0, index = 1, last_index; int saved_index = 0; int last_called_index = 0; bool repeated = false; ENTRY; if (llh == NULL) RETURN(-EINVAL); lti = lpi->lpi_env == NULL ? NULL : llog_info(lpi->lpi_env); cur_offset = chunk_size = llh->llh_hdr.lrh_len; /* expect chunk_size to be power of two */ LASSERT(is_power_of_2(chunk_size)); OBD_ALLOC_LARGE(buf, chunk_size); if (buf == NULL) { lpi->lpi_rc = -ENOMEM; RETURN(0); } last_index = llog_max_idx(llh); if (cd) { if (cd->lpcd_first_idx >= llog_max_idx(llh)) /* End of the indexes -> Nothing to do */ GOTO(out, rc = 0); index = cd->lpcd_first_idx + 1; last_called_index = cd->lpcd_first_idx; if (cd->lpcd_last_idx > 0 && cd->lpcd_last_idx <= llog_max_idx(llh)) last_index = cd->lpcd_last_idx; else if (cd->lpcd_read_mode & LLOG_READ_MODE_RAW) last_index = loghandle->lgh_last_idx; } while (rc == 0) { struct llog_rec_hdr *rec; off_t chunk_offset = 0; unsigned int buf_offset = 0; int lh_last_idx; int synced_idx = 0; /* skip records not set in bitmap */ while (index <= last_index && llog_is_index_skipable(index, llh, cd)) ++index; /* There are no indices prior the last_index */ if (index > last_index) break; CDEBUG(D_OTHER, "index: %d last_index %d\n", index, last_index); repeat: /* get the buf with our target record; avoid old garbage */ memset(buf, 0, chunk_size); /* the record index for outdated chunk data */ /* it is safe to process buffer until saved lgh_last_idx */ lh_last_idx = LLOG_HDR_TAIL(llh)->lrt_index; rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index, index, &cur_offset, buf, chunk_size); if (repeated && rc) CDEBUG(D_OTHER, "cur_offset %llu, chunk_offset %llu," " buf_offset %u, rc = %d\n", cur_offset, (__u64)chunk_offset, buf_offset, rc); if (rc == -ESTALE) GOTO(out, rc = 0); /* we`ve tried to reread the chunk, but there is no * new records */ if (repeated && (chunk_offset + buf_offset) == cur_offset && (rc == -EBADR || rc == -EIO)) GOTO(out, rc = 0); /* EOF while trying to skip to the next chunk */ if (!index && rc == -EBADR) GOTO(out, rc = 0); if (rc != 0) GOTO(out, rc); /* NB: after llog_next_block() call the cur_offset is the * offset of the next block after read one. * The absolute offset of the current chunk is calculated * from cur_offset value and stored in chunk_offset variable. */ if ((cur_offset & (chunk_size - 1)) != 0) chunk_offset = cur_offset & ~(chunk_size - 1); else chunk_offset = cur_offset - chunk_size; /* NB: when rec->lrh_len is accessed it is already swabbed * since it is used at the "end" of the loop and the rec * swabbing is done at the beginning of the loop. */ for (rec = (struct llog_rec_hdr *)(buf + buf_offset); (char *)rec < buf + chunk_size; rec = llog_rec_hdr_next(rec)) { CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n", rec, rec->lrh_type); if (LLOG_REC_HDR_NEEDS_SWABBING(rec)) lustre_swab_llog_rec(rec); CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n", rec->lrh_type, rec->lrh_index); /* start with first rec if block was skipped */ if (!index) { CDEBUG(D_OTHER, "%s: skipping to the index %u\n", loghandle2name(loghandle), rec->lrh_index); index = rec->lrh_index; } if (index == (synced_idx + 1) && synced_idx == LLOG_HDR_TAIL(llh)->lrt_index) GOTO(out, rc = 0); if (CFS_FAIL_PRECHECK(OBD_FAIL_LLOG_PROCESS_TIMEOUT) && cfs_fail_val == (unsigned int) (loghandle->lgh_id.lgl_oi.oi.oi_id & 0xFFFFFFFF)) { CFS_RACE(OBD_FAIL_LLOG_PROCESS_TIMEOUT); } /* the bitmap could be changed during processing * records from the chunk. For wrapped catalog * it means we can read deleted record and try to * process it. Check this case and reread the chunk. * It is safe to process to lh_last_idx, including * lh_last_idx if it was synced. We can not do <= * comparison, cause for wrapped catalog lgh_last_idx * could be less than index. So we detect last index * for processing as index == lh_last_idx+1. But when * catalog is wrapped and full lgh_last_idx=llh_cat_idx, * the first processing index is llh_cat_idx+1.The * exception is !(lgh_last_idx == llh_cat_idx && * index == llh_cat_idx + 1), and after simplification * it turns to * lh_last_idx != LLOG_HDR_TAIL(llh)->lrt_index * This exception is working for catalog only. * The last check is for the partial chunk boundary, * if it is reached then try to re-read for possible * new records once. */ if ((index == lh_last_idx && synced_idx != index) || (index == (lh_last_idx + 1) && lh_last_idx != LLOG_HDR_TAIL(llh)->lrt_index) || (((char *)rec - buf >= cur_offset - chunk_offset) && !repeated)) { /* save offset inside buffer for the re-read */ buf_offset = (char *)rec - (char *)buf; cur_offset = chunk_offset; repeated = true; /* We need to be sure lgh_last_idx * record was saved to disk */ down_read(&loghandle->lgh_last_sem); synced_idx = LLOG_HDR_TAIL(llh)->lrt_index; up_read(&loghandle->lgh_last_sem); CDEBUG(D_OTHER, "synced_idx: %d\n", synced_idx); goto repeat; } repeated = false; rc = llog_verify_record(loghandle, rec); if (rc) { CDEBUG(D_OTHER, "invalid record at index %d\n", index); /* * for fixed-sized llogs we can skip one record * by using llh_size from llog header. * Otherwise skip the next llog chunk. */ rc = 0; if (llh->llh_flags & LLOG_F_IS_FIXSIZE) { rec->lrh_len = llh->llh_size; goto next_rec; } /* make sure that is always next block */ cur_offset = chunk_offset + chunk_size; /* no goal to find, just next block to read */ index = 0; break; } if (rec->lrh_index < index) { CDEBUG(D_OTHER, "skipping lrh_index %d\n", rec->lrh_index); continue; } if (rec->lrh_index > index) { /* the record itself looks good, but we met a * gap which can be result of old bugs, just * keep going */ LLOG_ERROR_REC(loghandle, rec, "gap in index, expected %u", index); index = rec->lrh_index; } CDEBUG(D_OTHER, "lrh_index: %d lrh_len: %d (%d remains)\n", rec->lrh_index, rec->lrh_len, (int)(buf + chunk_size - (char *)rec)); /* lgh_cur_offset is used only at llog_test_3 */ loghandle->lgh_cur_offset = (char *)rec - (char *)buf + chunk_offset; /* if needed, process the callback on this record */ if (!llog_is_index_skipable(index, llh, cd)) { struct llog_cookie *lgc; __u64 tmp_off; int tmp_idx; CDEBUG((llh->llh_flags & LLOG_F_IS_CAT ? D_HA : D_OTHER), "index: %d, lh_last_idx: %d " "synced_idx: %d lgh_last_idx: %d\n", index, lh_last_idx, synced_idx, loghandle->lgh_last_idx); if (lti != NULL) { lgc = <i->lgi_cookie; /* store lu_env for recursive calls */ tmp_off = lgc->lgc_offset; tmp_idx = lgc->lgc_index; lgc->lgc_offset = (char *)rec - (char *)buf + chunk_offset; lgc->lgc_index = rec->lrh_index; } /* using lu_env for passing record offset to * llog_write through various callbacks */ rc = lpi->lpi_cb(lpi->lpi_env, loghandle, rec, lpi->lpi_cbdata); last_called_index = index; if (lti != NULL) { lgc->lgc_offset = tmp_off; lgc->lgc_index = tmp_idx; } if (rc == LLOG_PROC_BREAK || rc == LLOG_SKIP_PLAIN) { GOTO(out, rc); } else if (rc == LLOG_DEL_RECORD) { rc = llog_cancel_rec(lpi->lpi_env, loghandle, rec->lrh_index); /* Allow parallel cancelling, ENOENT * means record was canceled at another * processing thread or callback */ if (rc == -ENOENT) rc = 0; } if (rc) GOTO(out, rc); /* some stupid callbacks directly cancel records * and delete llog. Check it and stop * processing. */ if (loghandle->lgh_hdr == NULL || loghandle->lgh_hdr->llh_count == 1) GOTO(out, rc = 0); } next_rec: /* exit if the last index is reached */ if (index >= last_index) GOTO(out, rc = 0); ++index; } } out: CDEBUG(D_HA, "stop processing %s "DFID" index %d count %d\n", ((llh->llh_flags & LLOG_F_IS_CAT) ? "catalog" : "plain"), PLOGID(&loghandle->lgh_id), index, llh->llh_count); if (cd != NULL) cd->lpcd_last_idx = last_called_index; if (unlikely(rc == -EIO && loghandle->lgh_obj != NULL)) { if (dt_object_remote(loghandle->lgh_obj)) { /* If it is remote object, then -EIO might means * disconnection or eviction, let's return -EAGAIN, * so for update recovery log processing, it will * retry until the umount or abort recovery, see * lod_sub_recovery_thread() */ CERROR("%s retry remote llog process\n", loghandle2name(loghandle)); rc = -EAGAIN; } else { /* something bad happened to the processing of a local * llog file, probably I/O error or the log got * corrupted to be able to finally release the log we * discard any remaining bits in the header */ CERROR("%s: local llog is corrupted "DFID" %s index %d count %d\n", loghandle2name(loghandle), PLOGID(&loghandle->lgh_id), ((llh->llh_flags & LLOG_F_IS_CAT) ? "catalog" : "plain"), index, llh->llh_count); while (index <= last_index) { if (test_bit_le(index, LLOG_HDR_BITMAP(llh)) != 0) llog_cancel_rec(lpi->lpi_env, loghandle, index); index++; } rc = 0; } } OBD_FREE_LARGE(buf, chunk_size); lpi->lpi_rc = rc; return 0; } static int llog_process_thread_daemonize(void *arg) { struct llog_process_info *lpi = arg; struct lu_env env; int rc; struct nsproxy *new_ns, *curr_ns = current->nsproxy; task_lock(lpi->lpi_reftask); new_ns = lpi->lpi_reftask->nsproxy; if (curr_ns != new_ns) { get_nsproxy(new_ns); current->nsproxy = new_ns; /* XXX: we should call put_nsproxy() instead of * atomic_dec(&ns->count) directly. But put_nsproxy() cannot be * used outside of the kernel itself, because it calls * free_nsproxy() which is not exported by the kernel * (defined in kernel/nsproxy.c) */ if (curr_ns) nsproxy_dec(curr_ns); } task_unlock(lpi->lpi_reftask); unshare_fs_struct(); /* client env has no keys, tags is just 0 */ rc = lu_env_init(&env, LCT_LOCAL | LCT_MG_THREAD); if (rc) goto out; lpi->lpi_env = &env; rc = llog_process_thread(arg); lu_env_fini(&env); out: complete(&lpi->lpi_completion); return rc; } int llog_process_or_fork(const struct lu_env *env, struct llog_handle *loghandle, llog_cb_t cb, void *data, void *catdata, bool fork) { struct llog_process_info *lpi; struct llog_process_data *d = data; struct llog_process_cat_data *cd = catdata; __u32 flags = loghandle->lgh_hdr->llh_flags; int rc; ENTRY; OBD_ALLOC_PTR(lpi); if (lpi == NULL) { CERROR("cannot alloc pointer\n"); RETURN(-ENOMEM); } lpi->lpi_loghandle = loghandle; lpi->lpi_cb = cb; lpi->lpi_cbdata = data; lpi->lpi_catdata = catdata; CDEBUG(D_OTHER, "Processing "DFID" flags 0x%03x startcat %d startidx %d first_idx %d last_idx %d read_mode %d\n", PLOGID(&loghandle->lgh_id), flags, (flags & LLOG_F_IS_CAT) && d ? d->lpd_startcat : -1, (flags & LLOG_F_IS_CAT) && d ? d->lpd_startidx : -1, cd ? cd->lpcd_first_idx : -1, cd ? cd->lpcd_last_idx : -1, cd ? cd->lpcd_read_mode : -1); if (fork) { struct task_struct *task; /* The new thread can't use parent env, * init the new one in llog_process_thread_daemonize. */ lpi->lpi_env = NULL; init_completion(&lpi->lpi_completion); /* take reference to current, so that * llog_process_thread_daemonize() can use it to switch to * namespace associated with current */ lpi->lpi_reftask = current; task = kthread_run(llog_process_thread_daemonize, lpi, "llog_process_thread"); if (IS_ERR(task)) { rc = PTR_ERR(task); CERROR("%s: cannot start thread: rc = %d\n", loghandle2name(loghandle), rc); GOTO(out_lpi, rc); } wait_for_completion(&lpi->lpi_completion); } else { lpi->lpi_env = env; llog_process_thread(lpi); } rc = lpi->lpi_rc; out_lpi: OBD_FREE_PTR(lpi); RETURN(rc); } EXPORT_SYMBOL(llog_process_or_fork); int llog_process(const struct lu_env *env, struct llog_handle *loghandle, llog_cb_t cb, void *data, void *catdata) { int rc; rc = llog_process_or_fork(env, loghandle, cb, data, catdata, true); return rc == LLOG_DEL_PLAIN ? 0 : rc; } EXPORT_SYMBOL(llog_process); static inline const struct cred *llog_raise_resource(void) { struct cred *cred = NULL; if (cap_raised(current_cap(), CAP_SYS_RESOURCE)) return cred; cred = prepare_creds(); if (!cred) return cred; cap_raise(cred->cap_effective, CAP_SYS_RESOURCE); return override_creds(cred); } static inline void llog_restore_resource(const struct cred *old_cred) { if (old_cred) revert_creds(old_cred); } int llog_reverse_process(const struct lu_env *env, struct llog_handle *loghandle, llog_cb_t cb, void *data, void *catdata) { struct llog_log_hdr *llh = loghandle->lgh_hdr; struct llog_process_cat_data *cd = catdata; void *buf; int rc = 0, first_index = 1, index, idx; __u32 chunk_size = llh->llh_hdr.lrh_len; ENTRY; OBD_ALLOC_LARGE(buf, chunk_size); if (buf == NULL) RETURN(-ENOMEM); if (cd != NULL) first_index = cd->lpcd_first_idx + 1; if (cd != NULL && cd->lpcd_last_idx) index = cd->lpcd_last_idx; else index = llog_max_idx(llh); while (rc == 0) { struct llog_rec_hdr *rec; struct llog_rec_tail *tail; /* skip records not set in bitmap */ while (index >= first_index && llog_is_index_skipable(index, llh, cd)) --index; LASSERT(index >= first_index - 1); if (index == first_index - 1) break; /* get the buf with our target record; avoid old garbage */ memset(buf, 0, chunk_size); rc = llog_prev_block(env, loghandle, index, buf, chunk_size); if (rc) GOTO(out, rc); rec = buf; idx = rec->lrh_index; CDEBUG(D_RPCTRACE, "index %u : idx %u\n", index, idx); while (idx < index) { rec = (void *)rec + rec->lrh_len; if (LLOG_REC_HDR_NEEDS_SWABBING(rec)) lustre_swab_llog_rec(rec); idx ++; } LASSERT(idx == index); tail = (void *)rec + rec->lrh_len - sizeof(*tail); /* process records in buffer, starting where we found one */ while ((void *)tail > buf) { if (tail->lrt_index == 0) GOTO(out, rc = 0); /* no more records */ /* if needed, process the callback on this record */ if (!llog_is_index_skipable(index, llh, cd)) { rec = (void *)tail - tail->lrt_len + sizeof(*tail); rc = cb(env, loghandle, rec, data); if (rc == LLOG_PROC_BREAK || rc == LLOG_SKIP_PLAIN) { GOTO(out, rc); } else if (rc == LLOG_DEL_RECORD) { rc = llog_cancel_rec(env, loghandle, tail->lrt_index); } if (rc) GOTO(out, rc); } /* previous record, still in buffer? */ --index; if (index < first_index) GOTO(out, rc = 0); tail = (void *)tail - tail->lrt_len; } } out: if (buf != NULL) OBD_FREE_LARGE(buf, chunk_size); RETURN(rc); } EXPORT_SYMBOL(llog_reverse_process); /** * new llog API * * API functions: * llog_open - open llog, may not exist * llog_exist - check if llog exists * llog_close - close opened llog, pair for open, frees llog_handle * llog_declare_create - declare llog creation * llog_create - create new llog on disk, need transaction handle * llog_declare_write_rec - declaration of llog write * llog_write_rec - write llog record on disk, need transaction handle * llog_declare_add - declare llog catalog record addition * llog_add - add llog record in catalog, need transaction handle */ int llog_exist(struct llog_handle *loghandle) { const struct llog_operations *lop; int rc; ENTRY; rc = llog_handle2ops(loghandle, &lop); if (rc) RETURN(rc); if (lop->lop_exist == NULL) RETURN(-EOPNOTSUPP); rc = lop->lop_exist(loghandle); RETURN(rc); } EXPORT_SYMBOL(llog_exist); int llog_declare_create(const struct lu_env *env, struct llog_handle *loghandle, struct thandle *th) { const struct cred *old_cred; const struct llog_operations *lop; int rc; ENTRY; rc = llog_handle2ops(loghandle, &lop); if (rc) RETURN(rc); if (lop->lop_declare_create == NULL) RETURN(-EOPNOTSUPP); old_cred = llog_raise_resource(); rc = lop->lop_declare_create(env, loghandle, th); llog_restore_resource(old_cred); RETURN(rc); } int llog_create(const struct lu_env *env, struct llog_handle *handle, struct thandle *th) { const struct cred *old_cred; const struct llog_operations *lop; int rc; ENTRY; rc = llog_handle2ops(handle, &lop); if (rc) RETURN(rc); if (lop->lop_create == NULL) RETURN(-EOPNOTSUPP); old_cred = llog_raise_resource(); rc = lop->lop_create(env, handle, th); llog_restore_resource(old_cred); RETURN(rc); } int llog_declare_write_rec(const struct lu_env *env, struct llog_handle *handle, struct llog_rec_hdr *rec, int idx, struct thandle *th) { const struct cred *old_cred; const struct llog_operations *lop; int rc; ENTRY; rc = llog_handle2ops(handle, &lop); if (rc) RETURN(rc); LASSERT(lop); if (lop->lop_declare_write_rec == NULL) RETURN(-EOPNOTSUPP); old_cred = llog_raise_resource(); rc = lop->lop_declare_write_rec(env, handle, rec, idx, th); llog_restore_resource(old_cred); RETURN(rc); } int llog_write_rec(const struct lu_env *env, struct llog_handle *handle, struct llog_rec_hdr *rec, struct llog_cookie *logcookies, int idx, struct thandle *th) { const struct cred *old_cred; const struct llog_operations *lop; int rc, buflen; ENTRY; /* API sanity checks */ if (handle == NULL) { CERROR("loghandle is missed\n"); RETURN(-EPROTO); } else if (handle->lgh_obj == NULL) { CERROR("loghandle %p with NULL object\n", handle); RETURN(-EPROTO); } else if (th == NULL) { CERROR("%s: missed transaction handle\n", loghandle2name(handle)); RETURN(-EPROTO); } else if (handle->lgh_hdr == NULL) { CERROR("%s: loghandle %p with no header\n", loghandle2name(handle), handle); RETURN(-EPROTO); } rc = llog_handle2ops(handle, &lop); if (rc) RETURN(rc); if (lop->lop_write_rec == NULL) RETURN(-EOPNOTSUPP); buflen = rec->lrh_len; LASSERT(round_up(buflen, 8) == buflen); old_cred = llog_raise_resource(); rc = lop->lop_write_rec(env, handle, rec, logcookies, idx, th); llog_restore_resource(old_cred); RETURN(rc); } int llog_add(const struct lu_env *env, struct llog_handle *lgh, struct llog_rec_hdr *rec, struct llog_cookie *logcookies, struct thandle *th) { const struct cred *old_cred; int rc; ENTRY; if (lgh->lgh_logops->lop_add == NULL) RETURN(-EOPNOTSUPP); old_cred = llog_raise_resource(); rc = lgh->lgh_logops->lop_add(env, lgh, rec, logcookies, th); llog_restore_resource(old_cred); RETURN(rc); } EXPORT_SYMBOL(llog_add); int llog_declare_add(const struct lu_env *env, struct llog_handle *lgh, struct llog_rec_hdr *rec, struct thandle *th) { const struct cred *old_cred; int rc; ENTRY; if (lgh->lgh_logops->lop_declare_add == NULL) RETURN(-EOPNOTSUPP); old_cred = llog_raise_resource(); rc = lgh->lgh_logops->lop_declare_add(env, lgh, rec, th); llog_restore_resource(old_cred); RETURN(rc); } EXPORT_SYMBOL(llog_declare_add); /** * Helper function to open llog or create it if doesn't exist. * It hides all transaction handling from caller. */ int llog_open_create(const struct lu_env *env, struct llog_ctxt *ctxt, struct llog_handle **res, struct llog_logid *logid, char *name) { struct dt_device *d; struct thandle *th; int rc; ENTRY; rc = llog_open(env, ctxt, res, logid, name, LLOG_OPEN_NEW); if (rc) RETURN(rc); if (llog_exist(*res)) RETURN(0); LASSERT((*res)->lgh_obj != NULL); d = lu2dt_dev((*res)->lgh_obj->do_lu.lo_dev); if (unlikely(unlikely(d->dd_rdonly))) RETURN(-EROFS); th = dt_trans_create(env, d); if (IS_ERR(th)) GOTO(out, rc = PTR_ERR(th)); /* Create update llog object synchronously, which * happens during inialization process see * lod_sub_prep_llog(), to make sure the update * llog object is created before corss-MDT writing * updates into the llog object */ if (ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) th->th_sync = 1; th->th_wait_submit = 1; rc = llog_declare_create(env, *res, th); if (rc == 0) { rc = dt_trans_start_local(env, d, th); if (rc == 0) rc = llog_create(env, *res, th); } dt_trans_stop(env, d, th); out: if (rc) llog_close(env, *res); RETURN(rc); } EXPORT_SYMBOL(llog_open_create); /** * Helper function to delete existent llog. */ int llog_erase(const struct lu_env *env, struct llog_ctxt *ctxt, struct llog_logid *logid, char *name) { struct llog_handle *handle; int rc; ENTRY; /* nothing to erase */ if (name == NULL && logid == NULL) RETURN(0); rc = llog_open(env, ctxt, &handle, logid, name, LLOG_OPEN_EXISTS); if (rc < 0) RETURN(rc); rc = llog_destroy(env, handle); llog_close(env, handle); RETURN(rc); } EXPORT_SYMBOL(llog_erase); /* * Helper function for write record in llog. * It hides all transaction handling from caller. * Valid only with local llog. */ int llog_write(const struct lu_env *env, struct llog_handle *loghandle, struct llog_rec_hdr *rec, int idx) { struct dt_device *dt; struct thandle *th; bool need_cookie; int rc; ENTRY; LASSERT(loghandle); LASSERT(loghandle->lgh_ctxt); LASSERT(loghandle->lgh_obj != NULL); dt = lu2dt_dev(loghandle->lgh_obj->do_lu.lo_dev); if (unlikely(unlikely(dt->dd_rdonly))) RETURN(-EROFS); th = dt_trans_create(env, dt); if (IS_ERR(th)) RETURN(PTR_ERR(th)); rc = llog_declare_write_rec(env, loghandle, rec, idx, th); if (rc) GOTO(out_trans, rc); th->th_wait_submit = 1; rc = dt_trans_start_local(env, dt, th); if (rc) GOTO(out_trans, rc); need_cookie = !(idx == LLOG_HEADER_IDX || idx == LLOG_NEXT_IDX); down_write(&loghandle->lgh_lock); if (need_cookie) { struct llog_thread_info *lti = llog_info(env); /* cookie comes from llog_process_thread */ rc = llog_write_rec(env, loghandle, rec, <i->lgi_cookie, rec->lrh_index, th); /* upper layer didn`t pass cookie so change rc */ rc = (rc == 1 ? 0 : rc); } else { rc = llog_write_rec(env, loghandle, rec, NULL, idx, th); } up_write(&loghandle->lgh_lock); out_trans: dt_trans_stop(env, dt, th); RETURN(rc); } EXPORT_SYMBOL(llog_write); int llog_open(const struct lu_env *env, struct llog_ctxt *ctxt, struct llog_handle **lgh, struct llog_logid *logid, char *name, enum llog_open_param open_param) { const struct cred *old_cred; int rc; ENTRY; LASSERT(ctxt); LASSERT(ctxt->loc_logops); if (ctxt->loc_logops->lop_open == NULL) { *lgh = NULL; RETURN(-EOPNOTSUPP); } *lgh = llog_alloc_handle(); if (*lgh == NULL) RETURN(-ENOMEM); (*lgh)->lgh_ctxt = ctxt; (*lgh)->lgh_logops = ctxt->loc_logops; old_cred = llog_raise_resource(); rc = ctxt->loc_logops->lop_open(env, *lgh, logid, name, open_param); llog_restore_resource(old_cred); if (rc) { llog_free_handle(*lgh); *lgh = NULL; } RETURN(rc); } EXPORT_SYMBOL(llog_open); int llog_close(const struct lu_env *env, struct llog_handle *loghandle) { return llog_handle_put(env, loghandle); } EXPORT_SYMBOL(llog_close); /** * Helper function to get the llog size in records. It is used by MGS * mostly to check that config llog exists and contains data. * * \param[in] env execution environment * \param[in] ctxt llog context * \param[in] name llog name * * \retval true if there are records in llog besides a header * \retval false on error or llog without records */ int llog_is_empty(const struct lu_env *env, struct llog_ctxt *ctxt, char *name) { struct llog_handle *llh; int rc = 0; rc = llog_open(env, ctxt, &llh, NULL, name, LLOG_OPEN_EXISTS); if (rc < 0) { if (likely(rc == -ENOENT)) rc = 0; GOTO(out, rc); } rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL); if (rc) GOTO(out_close, rc); rc = llog_get_size(llh); out_close: llog_close(env, llh); out: /* The header is record 1, the llog is still considered as empty * if there is only header */ return (rc <= 1); } EXPORT_SYMBOL(llog_is_empty); /* this callback run in raw read mode (canceled record are processed) */ int llog_copy_handler(const struct lu_env *env, struct llog_handle *llh, struct llog_rec_hdr *rec, void *data) { struct llog_handle *copy_llh = data; int idx = rec->lrh_index; int rc; ENTRY; /* Append all records */ rc = llog_write(env, copy_llh, rec, LLOG_NEXT_IDX); /* Cancel the record if it is canceled on the source */ if (!rc && !test_bit_le(idx, LLOG_HDR_BITMAP(llh->lgh_hdr))) rc = llog_cancel_rec(env, copy_llh, copy_llh->lgh_last_idx); RETURN(rc); } /* backup plain llog */ int llog_backup(const struct lu_env *env, struct obd_device *obd, struct llog_ctxt *ctxt, struct llog_ctxt *bctxt, char *name, char *backup) { struct llog_handle *llh, *bllh; struct llog_process_cat_data cd = {0}; int rc; ENTRY; /* open original log */ rc = llog_open(env, ctxt, &llh, NULL, name, LLOG_OPEN_EXISTS); if (rc < 0) { /* the -ENOENT case is also reported to the caller * but silently so it should handle that if needed. */ if (rc != -ENOENT) CERROR("%s: failed to open log %s: rc = %d\n", obd->obd_name, name, rc); RETURN(rc); } rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL); if (rc) GOTO(out_close, rc); /* Make sure there's no old backup log */ rc = llog_erase(env, bctxt, NULL, backup); if (rc < 0 && rc != -ENOENT) GOTO(out_close, rc); /* open backup log */ rc = llog_open_create(env, bctxt, &bllh, NULL, backup); if (rc) { CERROR("%s: failed to open backup logfile %s: rc = %d\n", obd->obd_name, backup, rc); GOTO(out_close, rc); } /* check that backup llog is not the same object as original one */ if (llh->lgh_obj == bllh->lgh_obj) { CERROR("%s: backup llog %s to itself (%s), objects %p/%p\n", obd->obd_name, name, backup, llh->lgh_obj, bllh->lgh_obj); GOTO(out_backup, rc = -EEXIST); } rc = llog_init_handle(env, bllh, LLOG_F_IS_PLAIN, NULL); if (rc) GOTO(out_backup, rc); /* Read canceled records to have an exact copy */ cd.lpcd_read_mode = LLOG_READ_MODE_RAW; /* Copy log record by record */ rc = llog_process_or_fork(env, llh, llog_copy_handler, (void *)bllh, &cd, false); if (rc) CERROR("%s: failed to backup log %s: rc = %d\n", obd->obd_name, name, rc); out_backup: llog_close(env, bllh); out_close: llog_close(env, llh); RETURN(rc); } EXPORT_SYMBOL(llog_backup); /* Get size of llog */ __u64 llog_size(const struct lu_env *env, struct llog_handle *llh) { int rc; struct lu_attr la; rc = llh->lgh_obj->do_ops->do_attr_get(env, llh->lgh_obj, &la); if (rc) { CERROR("%s: attr_get failed for "DFID": rc = %d\n", loghandle2name(llh), PLOGID(&llh->lgh_id), rc); return 0; } return la.la_size; } EXPORT_SYMBOL(llog_size); /* set llog ctime to current, and set LLOG_F_RM_ON_ERR|LLOG_F_MAX_AGE flag in * log header. It will be reclaimed when expired (UPDATE_LOG_MAX_AGE old). */ int llog_retain(const struct lu_env *env, struct llog_handle *log) { struct dt_object *dto = log->lgh_obj; struct dt_device *dt = lu2dt_dev(log->lgh_obj->do_lu.lo_dev); struct lu_attr la = { 0 }; struct thandle *th; int rc; la.la_ctime = ktime_get_real_seconds(); la.la_valid = LA_CTIME; th = dt_trans_create(env, dt); if (IS_ERR(th)) return PTR_ERR(th); th->th_wait_submit = 1; log->lgh_hdr->llh_flags |= LLOG_F_MAX_AGE | LLOG_F_RM_ON_ERR; rc = llog_declare_write_rec(env, log, &log->lgh_hdr->llh_hdr, -1, th); if (rc) goto out_trans; rc = dt_declare_attr_set(env, dto, &la, th); if (rc) goto out_trans; rc = dt_trans_start_local(env, dt, th); if (rc) goto out_trans; rc = llog_write_rec(env, log, &log->lgh_hdr->llh_hdr, NULL, LLOG_HEADER_IDX, th); if (rc) goto out_trans; rc = dt_attr_set(env, dto, &la, th); out_trans: dt_trans_stop(env, dt, th); CDEBUG(D_OTHER, "retain log "DFID" rc = %d\n", PLOGID(&log->lgh_id), rc); return rc; } EXPORT_SYMBOL(llog_retain);