*/
/*
* This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
*
* lustre/obdclass/llog.c
*
#include <obd_support.h>
#include <obd_class.h>
#include "llog_internal.h"
+
/*
* Allocate a new log or catalog handle
* Used inside llog_open().
int rc = 0;
if (refcount_dec_and_test(&loghandle->lgh_refcount)) {
- struct llog_operations *lop;
+ const struct llog_operations *lop;
rc = llog_handle2ops(loghandle, &lop);
if (!rc) {
struct llog_handle *handle,
struct thandle *th)
{
- struct llog_operations *lop;
+ const struct llog_operations *lop;
int rc;
ENTRY;
int llog_trans_destroy(const struct lu_env *env, struct llog_handle *handle,
struct thandle *th)
{
- struct llog_operations *lop;
+ const struct llog_operations *lop;
int rc;
ENTRY;
int llog_destroy(const struct lu_env *env, struct llog_handle *handle)
{
- struct llog_operations *lop;
- struct dt_device *dt;
- struct thandle *th;
+ const struct llog_operations *lop;
+ struct dt_device *dt;
+ struct thandle *th;
int rc;
ENTRY;
* be accessed anymore, let's return 0 for now, and
* the orphan will be handled by LFSCK. */
CERROR("%s: can't destroy empty llog "DFID": rc = %d\n",
- loghandle->lgh_ctxt->loc_obd->obd_name,
+ loghandle2name(loghandle),
PFID(&loghandle->lgh_id.lgl_oi.oi_fid), rc);
GOTO(out_unlock, rc = 0);
}
}
out_unlock:
+ if (rc < 0) {
+ /* restore bitmap while holding a mutex */
+ if (subtract_count) {
+ loghandle->lgh_hdr->llh_count += num;
+ subtract_count = false;
+ }
+ for (i = i - 1; i >= 0; i--)
+ set_bit_le(index[i], LLOG_HDR_BITMAP(llh));
+ }
mutex_unlock(&loghandle->lgh_hdr_mutex);
up_write(&loghandle->lgh_lock);
out_trans:
rc1 = dt_trans_stop(env, dt, th);
if (rc == 0)
rc = rc1;
- if (rc < 0) {
+ if (rc1 < 0) {
mutex_lock(&loghandle->lgh_hdr_mutex);
if (subtract_count)
loghandle->lgh_hdr->llh_count += num;
int llog_read_header(const struct lu_env *env, struct llog_handle *handle,
const struct obd_uuid *uuid)
{
- struct llog_operations *lop;
+ const struct llog_operations *lop;
int rc;
ENTRY;
(llh->llh_flags & LLOG_F_IS_CAT &&
flags & LLOG_F_IS_PLAIN))) {
CERROR("%s: llog type is %s but initializing %s\n",
- handle->lgh_ctxt->loc_obd->obd_name,
+ loghandle2name(handle),
llh->llh_flags & LLOG_F_IS_CAT ?
"catalog" : "plain",
flags & LLOG_F_IS_CAT ? "catalog" : "plain");
if (unlikely(uuid &&
!obd_uuid_equals(uuid, &llh->llh_tgtuuid))) {
CERROR("%s: llog uuid mismatch: %s/%s\n",
- handle->lgh_ctxt->loc_obd->obd_name,
+ loghandle2name(handle),
(char *)uuid->uuid,
(char *)llh->llh_tgtuuid.uuid);
GOTO(out, rc = -EEXIST);
llh->llh_flags |= LLOG_F_IS_FIXSIZE;
} else if (!(flags & LLOG_F_IS_PLAIN)) {
CERROR("%s: unknown flags: %#x (expected %#x or %#x)\n",
- handle->lgh_ctxt->loc_obd->obd_name,
- flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
+ loghandle2name(handle), flags, LLOG_F_IS_CAT,
+ LLOG_F_IS_PLAIN);
rc = -EINVAL;
}
llh->llh_flags |= fmt;
}
EXPORT_SYMBOL(llog_init_handle);
+int llog_verify_record(const struct llog_handle *llh, struct llog_rec_hdr *rec)
+{
+ int chunk_size = llh->lgh_hdr->llh_hdr.lrh_len;
+
+ if (rec->lrh_len == 0 || rec->lrh_len > chunk_size) {
+ CERROR("%s: record is too large: %d > %d\n",
+ loghandle2name(llh), rec->lrh_len, chunk_size);
+ return -EINVAL;
+ }
+ if (rec->lrh_index >= LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr)) {
+ CERROR("%s: index is too high: %d\n",
+ loghandle2name(llh), rec->lrh_index);
+ return -EINVAL;
+ }
+ if ((rec->lrh_type & LLOG_OP_MASK) != LLOG_OP_MAGIC) {
+ CERROR("%s: magic %x is bad\n",
+ loghandle2name(llh), rec->lrh_type);
+ return -EINVAL;
+ }
+
+ return 0;
+}
+EXPORT_SYMBOL(llog_verify_record);
+
static int llog_process_thread(void *arg)
{
struct llog_process_info *lpi = arg;
int saved_index = 0;
int last_called_index = 0;
bool repeated = false;
+ bool refresh_idx = false;
ENTRY;
struct llog_rec_hdr *rec;
off_t chunk_offset = 0;
unsigned int buf_offset = 0;
- bool partial_chunk;
int lh_last_idx;
int synced_idx = 0;
CDEBUG(D_OTHER, "cur_offset %llu, chunk_offset %llu,"
" buf_offset %u, rc = %d\n", cur_offset,
(__u64)chunk_offset, buf_offset, rc);
+ if (rc == -ESTALE)
+ GOTO(out, rc = 0);
/* we`ve tried to reread the chunk, but there is no
* new records */
if (rc == -EIO && repeated && (chunk_offset + buf_offset) ==
* The absolute offset of the current chunk is calculated
* from cur_offset value and stored in chunk_offset variable.
*/
- if ((cur_offset & (chunk_size - 1)) != 0) {
- partial_chunk = true;
+ if ((cur_offset & (chunk_size - 1)) != 0)
chunk_offset = cur_offset & ~(chunk_size - 1);
- } else {
- partial_chunk = false;
+ else
chunk_offset = cur_offset - chunk_size;
- }
/* NB: when rec->lrh_len is accessed it is already swabbed
* since it is used at the "end" of the loop and the rec
* could be less than index. So we detect last index
* for processing as index == lh_last_idx+1. But when
* catalog is wrapped and full lgh_last_idx=llh_cat_idx,
- * the first processing index is llh_cat_idx+1.
+ * the first processing index is llh_cat_idx+1.The
+ * exception is !(lgh_last_idx == llh_cat_idx &&
+ * index == llh_cat_idx + 1), and after simplification
+ * it turns to
+ * lh_last_idx != LLOG_HDR_TAIL(llh)->lrt_index
+ * This exception is working for catalog only.
*/
if ((index == lh_last_idx && synced_idx != index) ||
(index == (lh_last_idx + 1) &&
- !(index == (llh->llh_cat_idx + 1) &&
- (llh->llh_flags & LLOG_F_IS_CAT))) ||
+ lh_last_idx != LLOG_HDR_TAIL(llh)->lrt_index) ||
(rec->lrh_index == 0 && !repeated)) {
/* save offset inside buffer for the re-read */
repeated = false;
- if (rec->lrh_len == 0 || rec->lrh_len > chunk_size) {
- CWARN("%s: invalid length %d in llog "DFID
- "record for index %d/%d\n",
- loghandle->lgh_ctxt->loc_obd->obd_name,
- rec->lrh_len,
+ rc = llog_verify_record(loghandle, rec);
+ if (rc) {
+ CERROR("%s: invalid record in llog "DFID
+ " record for index %d/%d: rc = %d\n",
+ loghandle2name(loghandle),
PFID(&loghandle->lgh_id.lgl_oi.oi_fid),
- rec->lrh_index, index);
-
- GOTO(out, rc = -EINVAL);
+ rec->lrh_index, index, rc);
+ /*
+ * the block seem to be corrupted, let's try
+ * with the next one. reset rc to go to the
+ * next chunk.
+ */
+ refresh_idx = true;
+ index = 0;
+ GOTO(repeat, rc = 0);
}
if (rec->lrh_index < index) {
}
if (rec->lrh_index != index) {
- CERROR("%s: "DFID" Invalid record: index %u"
- " but expected %u\n",
- loghandle->lgh_ctxt->loc_obd->obd_name,
- PFID(&loghandle->lgh_id.lgl_oi.oi_fid),
- rec->lrh_index, index);
- GOTO(out, rc = -ERANGE);
+ /*
+ * the last time we couldn't parse the block due
+ * to corruption, thus has no idea about the
+ * next index, take it from the block, once.
+ */
+ if (refresh_idx) {
+ refresh_idx = false;
+ index = rec->lrh_index;
+ } else {
+ CERROR("%s: "DFID" Invalid record: index"
+ " %u but expected %u\n",
+ loghandle2name(loghandle),
+ PFID(&loghandle->lgh_id.lgl_oi.oi_fid),
+ rec->lrh_index, index);
+ GOTO(out, rc = -ERANGE);
+ }
}
CDEBUG(D_OTHER,
__u64 tmp_off;
int tmp_idx;
- CDEBUG(D_OTHER, "index: %d, lh_last_idx: %d "
+ CDEBUG((llh->llh_flags & LLOG_F_IS_CAT ?
+ D_HA : D_OTHER),
+ "index: %d, lh_last_idx: %d "
"synced_idx: %d lgh_last_idx: %d\n",
index, lh_last_idx, synced_idx,
loghandle->lgh_last_idx);
rc = llog_cancel_rec(lpi->lpi_env,
loghandle,
rec->lrh_index);
+ /* Allow parallel cancelling, ENOENT
+ * means record was canceled at another
+ * processing thread or callback
+ */
+ if (rc == -ENOENT)
+ rc = 0;
}
if (rc)
GOTO(out, rc);
}
out:
+ CDEBUG(D_HA, "stop processing %s "DOSTID":%x index %d count %d\n",
+ ((llh->llh_flags & LLOG_F_IS_CAT) ? "catalog" : "plain"),
+ POSTID(&loghandle->lgh_id.lgl_oi), loghandle->lgh_id.lgl_ogen,
+ index, llh->llh_count);
+
if (cd != NULL)
cd->lpcd_last_idx = last_called_index;
* retry until the umount or abort recovery, see
* lod_sub_recovery_thread() */
CERROR("%s retry remote llog process\n",
- loghandle->lgh_ctxt->loc_obd->obd_name);
+ loghandle2name(loghandle));
rc = -EAGAIN;
} else {
/* something bad happened to the processing of a local
* discard any remaining bits in the header */
CERROR("%s: Local llog found corrupted #"DOSTID":%x"
" %s index %d count %d\n",
- loghandle->lgh_ctxt->loc_obd->obd_name,
+ loghandle2name(loghandle),
POSTID(&loghandle->lgh_id.lgl_oi),
loghandle->lgh_id.lgl_ogen,
((llh->llh_flags & LLOG_F_IS_CAT) ? "catalog" :
}
task_unlock(lpi->lpi_reftask);
- unshare_fs_struct();
-
/* client env has no keys, tags is just 0 */
rc = lu_env_init(&env, LCT_LOCAL | LCT_MG_THREAD);
if (rc)
struct llog_process_info *lpi;
struct llog_process_data *d = data;
struct llog_process_cat_data *cd = catdata;
- int rc;
+ __u32 flags = loghandle->lgh_hdr->llh_flags;
+ int rc;
ENTRY;
lpi->lpi_catdata = catdata;
CDEBUG(D_OTHER, "Processing "DFID" flags 0x%03x startcat %d startidx %d first_idx %d last_idx %d\n",
- PFID(&loghandle->lgh_id.lgl_oi.oi_fid),
- loghandle->lgh_hdr->llh_flags, d ? d->lpd_startcat : -1,
- d ? d->lpd_startidx : -1, cd ? cd->lpcd_first_idx : -1,
- cd ? cd->lpcd_last_idx : -1);
+ PFID(&loghandle->lgh_id.lgl_oi.oi_fid), flags,
+ (flags & LLOG_F_IS_CAT) && d ? d->lpd_startcat : -1,
+ (flags & LLOG_F_IS_CAT) && d ? d->lpd_startidx : -1,
+ cd ? cd->lpcd_first_idx : -1, cd ? cd->lpcd_last_idx : -1);
if (fork) {
struct task_struct *task;
if (IS_ERR(task)) {
rc = PTR_ERR(task);
CERROR("%s: cannot start thread: rc = %d\n",
- loghandle->lgh_ctxt->loc_obd->obd_name, rc);
+ loghandle2name(loghandle), rc);
GOTO(out_lpi, rc);
}
wait_for_completion(&lpi->lpi_completion);
*/
int llog_exist(struct llog_handle *loghandle)
{
- struct llog_operations *lop;
- int rc;
+ const struct llog_operations *lop;
+ int rc;
ENTRY;
struct llog_handle *loghandle, struct thandle *th)
{
const struct cred *old_cred;
- struct llog_operations *lop;
+ const struct llog_operations *lop;
int rc;
ENTRY;
struct thandle *th)
{
const struct cred *old_cred;
- struct llog_operations *lop;
+ const struct llog_operations *lop;
int rc;
ENTRY;
struct thandle *th)
{
const struct cred *old_cred;
- struct llog_operations *lop;
+ const struct llog_operations *lop;
int rc;
ENTRY;
int idx, struct thandle *th)
{
const struct cred *old_cred;
- struct llog_operations *lop;
+ const struct llog_operations *lop;
int rc, buflen;
ENTRY;
RETURN(-EPROTO);
} else if (th == NULL) {
CERROR("%s: missed transaction handle\n",
- handle->lgh_obj->do_lu.lo_dev->ld_obd->obd_name);
+ loghandle2name(handle));
RETURN(-EPROTO);
} else if (handle->lgh_hdr == NULL) {
CERROR("%s: loghandle %p with no header\n",
- handle->lgh_obj->do_lu.lo_dev->ld_obd->obd_name,
- handle);
+ loghandle2name(handle), handle);
RETURN(-EPROTO);
}
rc = llh->lgh_obj->do_ops->do_attr_get(env, llh->lgh_obj, &la);
if (rc) {
CERROR("%s: attr_get failed for "DFID": rc = %d\n",
- llh->lgh_ctxt->loc_obd->obd_name,
- PFID(&llh->lgh_id.lgl_oi.oi_fid), rc);
+ loghandle2name(llh), PFID(&llh->lgh_id.lgl_oi.oi_fid),
+ rc);
return 0;
}