*/
/*
* This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
*
* lustre/obdclass/llog.c
*
#include <obd_support.h>
#include <obd_class.h>
#include "llog_internal.h"
+
/*
* Allocate a new log or catalog handle
* Used inside llog_open().
mutex_init(&loghandle->lgh_hdr_mutex);
init_rwsem(&loghandle->lgh_last_sem);
INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
- atomic_set(&loghandle->lgh_refcount, 1);
+ refcount_set(&loghandle->lgh_refcount, 1);
return loghandle;
}
OBD_FREE_PTR(loghandle);
}
-void llog_handle_get(struct llog_handle *loghandle)
+/*
+ * Try to take an additional reference on a log handle.
+ *
+ * Returns \a loghandle when the reference was taken, or NULL when
+ * refcount_inc_not_zero() refused to increment the counter; in that
+ * case the caller holds no reference and must not use the handle.
+ */
+struct llog_handle *llog_handle_get(struct llog_handle *loghandle)
{
- atomic_inc(&loghandle->lgh_refcount);
+	if (!refcount_inc_not_zero(&loghandle->lgh_refcount))
+		return NULL;
+
+	return loghandle;
}
-void llog_handle_put(struct llog_handle *loghandle)
+/*
+ * Drop one reference on a log handle.
+ *
+ * When this was the last reference the handle is closed through the
+ * lop_close() method of its operations table and then freed.
+ *
+ * Returns 0 when other references remain.  On the final put it returns
+ * the error from llog_handle2ops(), -EOPNOTSUPP when the operations
+ * table has no lop_close() method, or otherwise the result of
+ * lop_close().
+ */
+int llog_handle_put(const struct lu_env *env, struct llog_handle *loghandle)
{
- LASSERT(atomic_read(&loghandle->lgh_refcount) > 0);
- if (atomic_dec_and_test(&loghandle->lgh_refcount))
+ int rc = 0;
+
+ if (refcount_dec_and_test(&loghandle->lgh_refcount)) {
+ const struct llog_operations *lop;
+
+ rc = llog_handle2ops(loghandle, &lop);
+ if (!rc) {
+ if (lop->lop_close)
+ rc = lop->lop_close(env, loghandle);
+ else
+ rc = -EOPNOTSUPP;
+ }
+ /* the handle is freed even if closing failed or is unsupported */
llog_free_handle(loghandle);
+ }
+ return rc;
}
static int llog_declare_destroy(const struct lu_env *env,
struct llog_handle *handle,
struct thandle *th)
{
- struct llog_operations *lop;
+ const struct llog_operations *lop;
int rc;
ENTRY;
int llog_trans_destroy(const struct lu_env *env, struct llog_handle *handle,
struct thandle *th)
{
- struct llog_operations *lop;
+ const struct llog_operations *lop;
int rc;
ENTRY;
RETURN(-EOPNOTSUPP);
LASSERT(handle->lgh_obj != NULL);
- if (!dt_object_exists(handle->lgh_obj))
+ if (!llog_exist(handle))
RETURN(0);
rc = lop->lop_destroy(env, handle, th);
int llog_destroy(const struct lu_env *env, struct llog_handle *handle)
{
- struct llog_operations *lop;
- struct dt_device *dt;
- struct thandle *th;
+ const struct llog_operations *lop;
+ struct dt_device *dt;
+ struct thandle *th;
int rc;
ENTRY;
RETURN(rc);
}
- if (!dt_object_exists(handle->lgh_obj))
+ if (!llog_exist(handle))
RETURN(0);
dt = lu2dt_dev(handle->lgh_obj->do_lu.lo_dev);
CERROR("Can't cancel index 0 which is header\n");
GOTO(out_unlock, rc = -EINVAL);
}
- if (!ext2_clear_bit(index[i], LLOG_HDR_BITMAP(llh))) {
+ if (!__test_and_clear_bit_le(index[i], LLOG_HDR_BITMAP(llh))) {
CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n",
index[i]);
GOTO(out_unlock, rc = -ENOENT);
* be accessed anymore, let's return 0 for now, and
* the orphan will be handled by LFSCK. */
CERROR("%s: can't destroy empty llog "DFID": rc = %d\n",
- loghandle->lgh_ctxt->loc_obd->obd_name,
+ loghandle2name(loghandle),
PFID(&loghandle->lgh_id.lgl_oi.oi_fid), rc);
GOTO(out_unlock, rc = 0);
}
}
out_unlock:
+ if (rc < 0) {
+ /* restore bitmap while holding a mutex */
+ if (subtract_count) {
+ loghandle->lgh_hdr->llh_count += num;
+ subtract_count = false;
+ }
+ for (i = i - 1; i >= 0; i--)
+ set_bit_le(index[i], LLOG_HDR_BITMAP(llh));
+ }
mutex_unlock(&loghandle->lgh_hdr_mutex);
up_write(&loghandle->lgh_lock);
out_trans:
rc1 = dt_trans_stop(env, dt, th);
if (rc == 0)
rc = rc1;
- if (rc < 0) {
+ if (rc1 < 0) {
mutex_lock(&loghandle->lgh_hdr_mutex);
if (subtract_count)
loghandle->lgh_hdr->llh_count += num;
for (i = i - 1; i >= 0; i--)
- ext2_set_bit(index[i], LLOG_HDR_BITMAP(llh));
+ set_bit_le(index[i], LLOG_HDR_BITMAP(llh));
mutex_unlock(&loghandle->lgh_hdr_mutex);
}
RETURN(rc);
int llog_read_header(const struct lu_env *env, struct llog_handle *handle,
const struct obd_uuid *uuid)
{
- struct llog_operations *lop;
+ const struct llog_operations *lop;
int rc;
ENTRY;
memset(LLOG_HDR_BITMAP(llh), 0, llh->llh_hdr.lrh_len -
llh->llh_bitmap_offset -
sizeof(llh->llh_tail));
- ext2_set_bit(0, LLOG_HDR_BITMAP(llh));
+ set_bit_le(0, LLOG_HDR_BITMAP(llh));
LLOG_HDR_TAIL(llh)->lrt_len = llh->llh_hdr.lrh_len;
LLOG_HDR_TAIL(llh)->lrt_index = llh->llh_hdr.lrh_index;
rc = 0;
(llh->llh_flags & LLOG_F_IS_CAT &&
flags & LLOG_F_IS_PLAIN))) {
CERROR("%s: llog type is %s but initializing %s\n",
- handle->lgh_ctxt->loc_obd->obd_name,
+ loghandle2name(handle),
llh->llh_flags & LLOG_F_IS_CAT ?
"catalog" : "plain",
flags & LLOG_F_IS_CAT ? "catalog" : "plain");
if (unlikely(uuid &&
!obd_uuid_equals(uuid, &llh->llh_tgtuuid))) {
CERROR("%s: llog uuid mismatch: %s/%s\n",
- handle->lgh_ctxt->loc_obd->obd_name,
+ loghandle2name(handle),
(char *)uuid->uuid,
(char *)llh->llh_tgtuuid.uuid);
GOTO(out, rc = -EEXIST);
llh->llh_flags |= LLOG_F_IS_FIXSIZE;
} else if (!(flags & LLOG_F_IS_PLAIN)) {
CERROR("%s: unknown flags: %#x (expected %#x or %#x)\n",
- handle->lgh_ctxt->loc_obd->obd_name,
- flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
+ loghandle2name(handle), flags, LLOG_F_IS_CAT,
+ LLOG_F_IS_PLAIN);
rc = -EINVAL;
}
llh->llh_flags |= fmt;
}
EXPORT_SYMBOL(llog_init_handle);
+/*
+ * Sanity-check the header of a record read from an llog.
+ *
+ * The record length must be non-zero and must not exceed the length
+ * stored in the llog header (used here as the chunk size), the record
+ * index must be below LLOG_HDR_BITMAP_SIZE() of the llog header, and
+ * the record type must carry LLOG_OP_MAGIC under LLOG_OP_MASK.
+ *
+ * Returns 0 when all checks pass, -EINVAL (after logging the reason)
+ * otherwise.
+ */
+int llog_verify_record(const struct llog_handle *llh, struct llog_rec_hdr *rec)
+{
+	int chunk_size = llh->lgh_hdr->llh_hdr.lrh_len;
+
+	/*
+	 * A zero length is rejected separately so that it is not reported
+	 * as "too large", which would send the reader looking for the
+	 * wrong kind of corruption.
+	 */
+	if (rec->lrh_len == 0) {
+		CERROR("%s: record has zero length\n", loghandle2name(llh));
+		return -EINVAL;
+	}
+	if (rec->lrh_len > chunk_size) {
+		CERROR("%s: record is too large: %d > %d\n",
+		       loghandle2name(llh), rec->lrh_len, chunk_size);
+		return -EINVAL;
+	}
+	if (rec->lrh_index >= LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr)) {
+		CERROR("%s: index is too high: %d\n",
+		       loghandle2name(llh), rec->lrh_index);
+		return -EINVAL;
+	}
+	if ((rec->lrh_type & LLOG_OP_MASK) != LLOG_OP_MAGIC) {
+		CERROR("%s: magic %x is bad\n",
+		       loghandle2name(llh), rec->lrh_type);
+		return -EINVAL;
+	}
+
+	return 0;
+}
+EXPORT_SYMBOL(llog_verify_record);
+
+/*
+ * Decide whether the record at index \a idx can be skipped.
+ *
+ * When \a cd requests LLOG_READ_MODE_RAW nothing is skipped; otherwise
+ * a record is skipped when its bit is not set in the header bitmap.
+ */
+static inline bool llog_is_index_skipable(int idx, struct llog_log_hdr *llh,
+					  struct llog_process_cat_data *cd)
+{
+	bool raw_mode = cd && (cd->lpcd_read_mode & LLOG_READ_MODE_RAW);
+
+	/* the bitmap is consulted only when not reading in raw mode */
+	return !raw_mode && !test_bit_le(idx, LLOG_HDR_BITMAP(llh));
+}
+
static int llog_process_thread(void *arg)
{
struct llog_process_info *lpi = arg;
int saved_index = 0;
int last_called_index = 0;
bool repeated = false;
+ bool refresh_idx = false;
ENTRY;
last_called_index = cd->lpcd_first_idx;
index = cd->lpcd_first_idx + 1;
}
- if (cd != NULL && cd->lpcd_last_idx)
+ if (cd && cd->lpcd_last_idx)
last_index = cd->lpcd_last_idx;
+ else if (cd && (cd->lpcd_read_mode & LLOG_READ_MODE_RAW))
+ last_index = loghandle->lgh_last_idx;
else
last_index = LLOG_HDR_BITMAP_SIZE(llh) - 1;
struct llog_rec_hdr *rec;
off_t chunk_offset = 0;
unsigned int buf_offset = 0;
- bool partial_chunk;
int lh_last_idx;
int synced_idx = 0;
/* skip records not set in bitmap */
while (index <= last_index &&
- !ext2_test_bit(index, LLOG_HDR_BITMAP(llh)))
+ llog_is_index_skipable(index, llh, cd))
++index;
/* There are no indices prior the last_index */
CDEBUG(D_OTHER, "cur_offset %llu, chunk_offset %llu,"
" buf_offset %u, rc = %d\n", cur_offset,
(__u64)chunk_offset, buf_offset, rc);
+ if (rc == -ESTALE)
+ GOTO(out, rc = 0);
/* we`ve tried to reread the chunk, but there is no
* new records */
if (rc == -EIO && repeated && (chunk_offset + buf_offset) ==
* The absolute offset of the current chunk is calculated
* from cur_offset value and stored in chunk_offset variable.
*/
- if ((cur_offset & (chunk_size - 1)) != 0) {
- partial_chunk = true;
+ if ((cur_offset & (chunk_size - 1)) != 0)
chunk_offset = cur_offset & ~(chunk_size - 1);
- } else {
- partial_chunk = false;
+ else
chunk_offset = cur_offset - chunk_size;
- }
/* NB: when rec->lrh_len is accessed it is already swabbed
* since it is used at the "end" of the loop and the rec
* could be less than index. So we detect last index
* for processing as index == lh_last_idx+1. But when
* catalog is wrapped and full lgh_last_idx=llh_cat_idx,
- * the first processing index is llh_cat_idx+1.
+ * the first processing index is llh_cat_idx+1. The
+ * exception is !(lgh_last_idx == llh_cat_idx &&
+ * index == llh_cat_idx + 1), and after simplification
+ * it turns to
+ * lh_last_idx != LLOG_HDR_TAIL(llh)->lrt_index
+ * This exception is working for catalog only.
*/
if ((index == lh_last_idx && synced_idx != index) ||
(index == (lh_last_idx + 1) &&
- !(index == (llh->llh_cat_idx + 1) &&
- (llh->llh_flags & LLOG_F_IS_CAT))) ||
+ lh_last_idx != LLOG_HDR_TAIL(llh)->lrt_index) ||
(rec->lrh_index == 0 && !repeated)) {
/* save offset inside buffer for the re-read */
repeated = false;
- if (rec->lrh_len == 0 || rec->lrh_len > chunk_size) {
- CWARN("%s: invalid length %d in llog "DFID
- "record for index %d/%d\n",
- loghandle->lgh_ctxt->loc_obd->obd_name,
- rec->lrh_len,
+ rc = llog_verify_record(loghandle, rec);
+ if (rc) {
+ CERROR("%s: invalid record in llog "DFID
+ " record for index %d/%d: rc = %d\n",
+ loghandle2name(loghandle),
PFID(&loghandle->lgh_id.lgl_oi.oi_fid),
- rec->lrh_index, index);
-
- GOTO(out, rc = -EINVAL);
+ rec->lrh_index, index, rc);
+ /*
+ * the block seems to be corrupted, let's try
+ * with the next one. reset rc to go to the
+ * next chunk.
+ */
+ refresh_idx = true;
+ index = 0;
+ GOTO(repeat, rc = 0);
}
if (rec->lrh_index < index) {
continue;
}
- if (rec->lrh_index != index) {
- CERROR("%s: "DFID" Invalid record: index %u"
- " but expected %u\n",
- loghandle->lgh_ctxt->loc_obd->obd_name,
+ if (rec->lrh_index > index) {
+ /* the record itself looks good, but we met a
+ * gap which can be the result of old bugs, just
+ * keep going */
+ CERROR("%s: "DFID" index %u, expected %u\n",
+ loghandle2name(loghandle),
PFID(&loghandle->lgh_id.lgl_oi.oi_fid),
rec->lrh_index, index);
- GOTO(out, rc = -ERANGE);
+ index = rec->lrh_index;
}
CDEBUG(D_OTHER,
loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
chunk_offset;
- /* if set, process the callback on this record */
- if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh))) {
+ /* if needed, process the callback on this record */
+ if (!llog_is_index_skipable(index, llh, cd)) {
struct llog_cookie *lgc;
__u64 tmp_off;
int tmp_idx;
- CDEBUG(D_OTHER, "index: %d, lh_last_idx: %d "
+ CDEBUG((llh->llh_flags & LLOG_F_IS_CAT ?
+ D_HA : D_OTHER),
+ "index: %d, lh_last_idx: %d "
"synced_idx: %d lgh_last_idx: %d\n",
index, lh_last_idx, synced_idx,
loghandle->lgh_last_idx);
lgc->lgc_index = tmp_idx;
}
- if (rc == LLOG_PROC_BREAK) {
+ if (rc == LLOG_PROC_BREAK ||
+ rc == LLOG_SKIP_PLAIN) {
GOTO(out, rc);
} else if (rc == LLOG_DEL_RECORD) {
rc = llog_cancel_rec(lpi->lpi_env,
loghandle,
rec->lrh_index);
+ /* Allow parallel cancelling, ENOENT
+ * means record was canceled at another
+ * processing thread or callback
+ */
+ if (rc == -ENOENT)
+ rc = 0;
}
if (rc)
GOTO(out, rc);
}
out:
+ CDEBUG(D_HA, "stop processing %s "DOSTID":%x index %d count %d\n",
+ ((llh->llh_flags & LLOG_F_IS_CAT) ? "catalog" : "plain"),
+ POSTID(&loghandle->lgh_id.lgl_oi), loghandle->lgh_id.lgl_ogen,
+ index, llh->llh_count);
+
if (cd != NULL)
cd->lpcd_last_idx = last_called_index;
* retry until the umount or abort recovery, see
* lod_sub_recovery_thread() */
CERROR("%s retry remote llog process\n",
- loghandle->lgh_ctxt->loc_obd->obd_name);
+ loghandle2name(loghandle));
rc = -EAGAIN;
} else {
/* something bad happened to the processing of a local
* discard any remaining bits in the header */
CERROR("%s: Local llog found corrupted #"DOSTID":%x"
" %s index %d count %d\n",
- loghandle->lgh_ctxt->loc_obd->obd_name,
+ loghandle2name(loghandle),
POSTID(&loghandle->lgh_id.lgl_oi),
loghandle->lgh_id.lgl_ogen,
((llh->llh_flags & LLOG_F_IS_CAT) ? "catalog" :
"plain"), index, llh->llh_count);
while (index <= last_index) {
- if (ext2_test_bit(index,
+ if (test_bit_le(index,
LLOG_HDR_BITMAP(llh)) != 0)
llog_cancel_rec(lpi->lpi_env, loghandle,
index);
}
task_unlock(lpi->lpi_reftask);
- unshare_fs_struct();
-
/* client env has no keys, tags is just 0 */
rc = lu_env_init(&env, LCT_LOCAL | LCT_MG_THREAD);
if (rc)
struct llog_process_info *lpi;
struct llog_process_data *d = data;
struct llog_process_cat_data *cd = catdata;
- int rc;
+ __u32 flags = loghandle->lgh_hdr->llh_flags;
+ int rc;
ENTRY;
lpi->lpi_cbdata = data;
lpi->lpi_catdata = catdata;
- CDEBUG(D_OTHER, "Processing "DFID" flags 0x%03x startcat %d startidx %d first_idx %d last_idx %d\n",
- PFID(&loghandle->lgh_id.lgl_oi.oi_fid),
- loghandle->lgh_hdr->llh_flags, d ? d->lpd_startcat : -1,
- d ? d->lpd_startidx : -1, cd ? cd->lpcd_first_idx : -1,
- cd ? cd->lpcd_last_idx : -1);
+ CDEBUG(D_OTHER, "Processing "DFID" flags 0x%03x startcat %d startidx %d first_idx %d last_idx %d read_mode %d\n",
+ PFID(&loghandle->lgh_id.lgl_oi.oi_fid), flags,
+ (flags & LLOG_F_IS_CAT) && d ? d->lpd_startcat : -1,
+ (flags & LLOG_F_IS_CAT) && d ? d->lpd_startidx : -1,
+ cd ? cd->lpcd_first_idx : -1, cd ? cd->lpcd_last_idx : -1,
+ cd ? cd->lpcd_read_mode : -1);
if (fork) {
struct task_struct *task;
if (IS_ERR(task)) {
rc = PTR_ERR(task);
CERROR("%s: cannot start thread: rc = %d\n",
- loghandle->lgh_ctxt->loc_obd->obd_name, rc);
+ loghandle2name(loghandle), rc);
GOTO(out_lpi, rc);
}
wait_for_completion(&lpi->lpi_completion);
/* skip records not set in bitmap */
while (index >= first_index &&
- !ext2_test_bit(index, LLOG_HDR_BITMAP(llh)))
+ llog_is_index_skipable(index, llh, cd))
--index;
LASSERT(index >= first_index - 1);
if (tail->lrt_index == 0)
GOTO(out, rc = 0); /* no more records */
- /* if set, process the callback on this record */
- if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh))) {
+ /* if needed, process the callback on this record */
+ if (!llog_is_index_skipable(index, llh, cd)) {
rec = (void *)tail - tail->lrt_len +
sizeof(*tail);
rc = cb(env, loghandle, rec, data);
- if (rc == LLOG_PROC_BREAK) {
+ if (rc == LLOG_PROC_BREAK ||
+ rc == LLOG_SKIP_PLAIN) {
GOTO(out, rc);
} else if (rc == LLOG_DEL_RECORD) {
rc = llog_cancel_rec(env, loghandle,
*/
int llog_exist(struct llog_handle *loghandle)
{
- struct llog_operations *lop;
- int rc;
+ const struct llog_operations *lop;
+ int rc;
ENTRY;
struct llog_handle *loghandle, struct thandle *th)
{
const struct cred *old_cred;
- struct llog_operations *lop;
+ const struct llog_operations *lop;
int rc;
ENTRY;
struct thandle *th)
{
const struct cred *old_cred;
- struct llog_operations *lop;
+ const struct llog_operations *lop;
int rc;
ENTRY;
struct thandle *th)
{
const struct cred *old_cred;
- struct llog_operations *lop;
+ const struct llog_operations *lop;
int rc;
ENTRY;
int idx, struct thandle *th)
{
const struct cred *old_cred;
- struct llog_operations *lop;
+ const struct llog_operations *lop;
int rc, buflen;
ENTRY;
RETURN(-EPROTO);
} else if (th == NULL) {
CERROR("%s: missed transaction handle\n",
- handle->lgh_obj->do_lu.lo_dev->ld_obd->obd_name);
+ loghandle2name(handle));
RETURN(-EPROTO);
} else if (handle->lgh_hdr == NULL) {
CERROR("%s: loghandle %p with no header\n",
- handle->lgh_obj->do_lu.lo_dev->ld_obd->obd_name,
- handle);
+ loghandle2name(handle), handle);
RETURN(-EPROTO);
}
int llog_close(const struct lu_env *env, struct llog_handle *loghandle)
{
- struct llog_operations *lop;
- int rc;
-
- ENTRY;
-
- rc = llog_handle2ops(loghandle, &lop);
- if (rc)
- GOTO(out, rc);
- if (lop->lop_close == NULL)
- GOTO(out, rc = -EOPNOTSUPP);
- rc = lop->lop_close(env, loghandle);
-out:
- llog_handle_put(loghandle);
- RETURN(rc);
+ /*
+ * Closing only drops the caller's reference: llog_handle_put() calls
+ * the lop_close() method and frees the handle once the last reference
+ * is gone, and its return value is passed straight back.
+ */
+ return llog_handle_put(env, loghandle);
}
EXPORT_SYMBOL(llog_close);
}
EXPORT_SYMBOL(llog_is_empty);
+/*
+ * Record callback that copies one record from \a llh into the llog
+ * handle passed via \a data.
+ *
+ * This callback runs in raw read mode, so canceled records are
+ * processed as well: every record is appended to the copy, and a record
+ * whose bit is clear in the source header bitmap is canceled in the
+ * copy right after being written.
+ *
+ * Returns 0 on success, or the error from llog_write() or
+ * llog_cancel_rec().
+ */
int llog_copy_handler(const struct lu_env *env, struct llog_handle *llh,
struct llog_rec_hdr *rec, void *data)
{
- struct llog_handle *copy_llh = data;
+	struct llog_handle *copy_llh = data;
+	int src_idx = rec->lrh_index;
+	int rc;
+
+	ENTRY;
/* Append all records */
- return llog_write(env, copy_llh, rec, LLOG_NEXT_IDX);
+	rc = llog_write(env, copy_llh, rec, LLOG_NEXT_IDX);
+	if (rc)
+		RETURN(rc);
+
+	/* Mirror the source: cancel the copy if the source bit is clear */
+	if (!test_bit_le(src_idx, LLOG_HDR_BITMAP(llh->lgh_hdr)))
+		rc = llog_cancel_rec(env, copy_llh, copy_llh->lgh_last_idx);
+
+	RETURN(rc);
}
/* backup plain llog */
struct llog_ctxt *ctxt, struct llog_ctxt *bctxt,
char *name, char *backup)
{
- struct llog_handle *llh, *bllh;
- int rc;
+ struct llog_handle *llh, *bllh;
+ struct llog_process_cat_data cd = {0};
+ int rc;
ENTRY;
if (rc)
GOTO(out_backup, rc);
+ /* Read canceled records to have an exact copy */
+ cd.lpcd_read_mode = LLOG_READ_MODE_RAW;
/* Copy log record by record */
rc = llog_process_or_fork(env, llh, llog_copy_handler, (void *)bllh,
- NULL, false);
+ &cd, false);
if (rc)
CERROR("%s: failed to backup log %s: rc = %d\n",
obd->obd_name, name, rc);
rc = llh->lgh_obj->do_ops->do_attr_get(env, llh->lgh_obj, &la);
if (rc) {
CERROR("%s: attr_get failed for "DFID": rc = %d\n",
- llh->lgh_ctxt->loc_obd->obd_name,
- PFID(&llh->lgh_id.lgl_oi.oi_fid), rc);
+ loghandle2name(llh), PFID(&llh->lgh_id.lgl_oi.oi_fid),
+ rc);
return 0;
}