/* In-memory descriptor for a log object or log catalog */
struct llog_handle {
struct rw_semaphore lgh_lock;
- struct mutex lgh_hdr_mutex; /* protect lgh_hdr data */
+ spinlock_t lgh_hdr_lock; /* protect lgh_hdr data */
struct llog_logid lgh_id; /* id of this log */
struct llog_log_hdr *lgh_hdr; /* may be vmalloc'd */
size_t lgh_hdr_size;
(rec->lrh_len - sizeof(struct llog_rec_hdr) - \
sizeof(struct llog_rec_tail))
+/* Locate the struct llog_rec_tail stored in the last bytes of record @rec,
+ * i.e. at (rec + rec->lrh_len - sizeof(struct llog_rec_tail)).
+ * The argument is parenthesized so any pointer expression can be passed.
+ */
+#define REC_TAIL(rec) \
+ ((struct llog_rec_tail *)((char *)(rec) + (rec)->lrh_len - \
+ sizeof(struct llog_rec_tail)))
+
struct llog_logid_rec {
struct llog_rec_hdr lid_hdr;
struct llog_logid lid_id;
LLOG_F_RM_ON_ERR = 0x400,
LLOG_F_MAX_AGE = 0x800,
LLOG_F_EXT_X_NID_BE = 0x1000,
+ LLOG_F_UNLCK_SEM = 0x2000,
/* Note: Flags covered by LLOG_F_EXT_MASK will be inherited from
* catlog to plain log, so do not add LLOG_F_IS_FIXSIZE here,
LLOG_F_EXT_MASK = LLOG_F_EXT_JOBID | LLOG_F_EXT_EXTRA_FLAGS |
LLOG_F_EXT_X_UIDGID | LLOG_F_EXT_X_NID |
LLOG_F_EXT_X_OMODE | LLOG_F_EXT_X_XATTR |
- LLOG_F_EXT_X_NID_BE,
+ LLOG_F_EXT_X_NID_BE | LLOG_F_UNLCK_SEM,
};
/* On-disk header structure of each log object, stored in little endian order */
if (rc)
GOTO(out_cleanup, rc);
- rc = llog_init_handle(env, ctxt->loc_handle, LLOG_F_IS_CAT, NULL);
+ rc = llog_init_handle(env, ctxt->loc_handle, LLOG_F_IS_CAT |
+ LLOG_F_UNLCK_SEM, NULL);
if (rc)
GOTO(out_close, rc);
return rc;
}
+/* The locking here is a bit tricky. For a CHANGELOG_REC the function
+ * drops loghandle->lgh_lock for performance reasons. Every dt_write()
+ * uses its own offset, so it is safe.
+ * For other records the general function is called and it does not drop
+ * the semaphore. The callers are changelog catalog records and initialisation
+ * records. llog_cat_new_log->llog_write_rec->mdd_changelog_write_rec()
+ *
+ * Since dt_record_write() could be reordered, rec1|rec2|0x0|rec4 could be
+ * in memory, so a reader should care about it. When the th is committed it
+ * is impossible to have a hole, since reordered records have the same th.
+ */
int mdd_changelog_write_rec(const struct lu_env *env,
struct llog_handle *loghandle,
struct llog_rec_hdr *r,
int idx, struct thandle *th)
{
int rc;
+ static struct thandle *saved_th;
+
+ CDEBUG(D_TRACE, "Adding rec %u type %u to "DFID" flags %x count %d\n",
+ idx, r->lrh_type, PLOGID(&loghandle->lgh_id),
+ loghandle->lgh_hdr->llh_flags, loghandle->lgh_hdr->llh_count);
if (r->lrh_type == CHANGELOG_REC) {
struct mdd_device *mdd;
struct llog_changelog_rec *rec;
+ size_t left;
+ __u32 chunk_size = loghandle->lgh_hdr->llh_hdr.lrh_len;
+ struct dt_object *o = loghandle->lgh_obj;
+ loff_t offset;
+ struct lu_buf lgi_buf;
+
+ left = chunk_size - (loghandle->lgh_cur_offset &
+ (chunk_size - 1));
mdd = lu2mdd_dev(loghandle->lgh_ctxt->loc_obd->obd_lu_dev);
rec = container_of(r, struct llog_changelog_rec, cr_hdr);
+ /* Don't use padding records because they require a slot in the
+ * header, so the previous result of checking llog_is_full(loghandle)
+ * would be invalid; leave zeroes at the end of the block instead.
+ * A reader has to take care of it.
+ */
+ if (left != 0 && left < r->lrh_len)
+ loghandle->lgh_cur_offset += left;
+
+ offset = loghandle->lgh_cur_offset;
+ loghandle->lgh_cur_offset += r->lrh_len;
+ r->lrh_index = ++loghandle->lgh_last_idx;
+
spin_lock(&mdd->mdd_cl.mc_lock);
- rec->cr.cr_index = mdd->mdd_cl.mc_index + 1;
+ rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
spin_unlock(&mdd->mdd_cl.mc_lock);
- rc = llog_osd_ops.lop_write_rec(env, loghandle, r,
- cookie, idx, th);
+ /* drop the loghandle semaphore for parallel writes */
+ up_write(&loghandle->lgh_lock);
- /*
- * if current llog is full, we will generate a new
- * llog, and since it's actually not an error, let's
- * avoid increasing index so that userspace apps
- * should not see a gap in the changelog sequence
+ REC_TAIL(r)->lrt_len = r->lrh_len;
+ REC_TAIL(r)->lrt_index = r->lrh_index;
+
+ lgi_buf.lb_len = rec->cr_hdr.lrh_len;
+ lgi_buf.lb_buf = rec;
+
+ rc = dt_record_write(env, o, &lgi_buf, &offset, th);
+
+ if (rc) {
+ CERROR("%s: failed to write changelog record file "DFID" rec idx %u off %llu chnlg idx %llu: rc = %d\n",
+ loghandle->lgh_ctxt->loc_obd->obd_name,
+ PFID(lu_object_fid(&o->do_lu)), r->lrh_index,
+ offset, rec->cr.cr_index, rc);
+ return rc;
+ }
+
+ /* Mark the index in the bitmap after a successful write, increment
+ * the count, and update lrt_index with the last index. Use
+ * lgh_hdr_lock for synchronization with llog_cancel.
*/
- if (!(rc == -ENOSPC && llog_is_full(loghandle))) {
- spin_lock(&mdd->mdd_cl.mc_lock);
- ++mdd->mdd_cl.mc_index;
- spin_unlock(&mdd->mdd_cl.mc_lock);
+ spin_lock(&loghandle->lgh_hdr_lock);
+ rc = __test_and_set_bit_le(r->lrh_index,
+ LLOG_HDR_BITMAP(loghandle->lgh_hdr));
+ LASSERTF(!rc,
+ "%s: index %u already set in llog bitmap "DFID"\n",
+ loghandle->lgh_ctxt->loc_obd->obd_name,
+ r->lrh_index, PLOGID(&loghandle->lgh_id));
+ loghandle->lgh_hdr->llh_count++;
+ if (LLOG_HDR_TAIL(loghandle->lgh_hdr)->lrt_index < r->lrh_index)
+ LLOG_HDR_TAIL(loghandle->lgh_hdr)->lrt_index =
+ r->lrh_index;
+ spin_unlock(&loghandle->lgh_hdr_lock);
+
+ if (unlikely(th != saved_th)) {
+ CDEBUG(D_OTHER, "%s: wrote rec %u "DFID" count %d\n",
+ loghandle->lgh_ctxt->loc_obd->obd_name,
+ r->lrh_index, PLOGID(&loghandle->lgh_id),
+ loghandle->lgh_hdr->llh_count);
+ saved_th = th;
}
+ lgi_buf.lb_len = loghandle->lgh_hdr_size;
+ lgi_buf.lb_buf = loghandle->lgh_hdr;
+ offset = 0;
+ CDEBUG(D_TRACE, "%s: writing header "DFID"\n",
+ loghandle->lgh_ctxt->loc_obd->obd_name,
+ PLOGID(&loghandle->lgh_id));
+ /* Full header write; it is a local write. For a mapped bh
+ * it is memcpy() only. Probably it could be delayed as work.
+ */
+ rc = dt_record_write(env, o, &lgi_buf, &offset, th);
} else {
rc = llog_osd_ops.lop_write_rec(env, loghandle, r,
cookie, idx, th);
}
+ if (rc < 0)
+ CERROR("%s: failed to write changelog record file "DFID" count %d offset %llu: rc = %d\n",
+ loghandle->lgh_ctxt->loc_obd->obd_name,
+ PLOGID(&loghandle->lgh_id),
+ loghandle->lgh_hdr->llh_count, loghandle->lgh_cur_offset,
+ rc);
return rc;
}
return NULL;
init_rwsem(&loghandle->lgh_lock);
- mutex_init(&loghandle->lgh_hdr_mutex);
+ spin_lock_init(&loghandle->lgh_hdr_lock);
init_rwsem(&loghandle->lgh_last_sem);
INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
refcount_set(&loghandle->lgh_refcount, 1);
down_write(&loghandle->lgh_lock);
/* clear bitmap */
- mutex_lock(&loghandle->lgh_hdr_mutex);
+ spin_lock(&loghandle->lgh_hdr_lock);
for (i = 0; i < num; ++i) {
if (index[i] == 0) {
+ spin_unlock(&loghandle->lgh_hdr_lock);
CERROR("Can't cancel index 0 which is header\n");
GOTO(out_unlock, rc = -EINVAL);
}
if (!__test_and_clear_bit_le(index[i], LLOG_HDR_BITMAP(llh))) {
+ spin_unlock(&loghandle->lgh_hdr_lock);
CDEBUG(D_OTHER, "Catalog index %u already clear?\n",
index[i]);
GOTO(out_unlock, rc = -ENOENT);
}
loghandle->lgh_hdr->llh_count -= num;
subtract_count = true;
+ spin_unlock(&loghandle->lgh_hdr_lock);
/* Since llog_process_thread use lgi_cookie, it`s better to save them
* and restore after using
out_unlock:
if (rc < 0) {
/* restore bitmap while holding a mutex */
+ spin_lock(&loghandle->lgh_hdr_lock);
if (subtract_count) {
loghandle->lgh_hdr->llh_count += num;
subtract_count = false;
}
for (i = i - 1; i >= 0; i--)
set_bit_le(index[i], LLOG_HDR_BITMAP(llh));
+ spin_unlock(&loghandle->lgh_hdr_lock);
}
- mutex_unlock(&loghandle->lgh_hdr_mutex);
up_write(&loghandle->lgh_lock);
out_trans:
rc1 = dt_trans_stop(env, dt, th);
if (rc == 0)
rc = rc1;
if (rc1 < 0) {
- mutex_lock(&loghandle->lgh_hdr_mutex);
+ spin_lock(&loghandle->lgh_hdr_lock);
if (subtract_count)
loghandle->lgh_hdr->llh_count += num;
for (i = i - 1; i >= 0; i--)
set_bit_le(index[i], LLOG_HDR_BITMAP(llh));
- mutex_unlock(&loghandle->lgh_hdr_mutex);
+ spin_unlock(&loghandle->lgh_hdr_lock);
}
RETURN(rc);
}
set_bit_le(0, LLOG_HDR_BITMAP(llh));
LLOG_HDR_TAIL(llh)->lrt_len = llh->llh_hdr.lrh_len;
LLOG_HDR_TAIL(llh)->lrt_index = llh->llh_hdr.lrh_index;
+ handle->lgh_cur_offset = llh->llh_hdr.lrh_len;
rc = 0;
}
RETURN(rc);
EXPORT_SYMBOL(llog_init_handle);
#define LLOG_ERROR_REC(lgh, rec, format, a...) \
- CERROR("%s: "DFID" rec type=%x idx=%u len=%u, " format "\n" , \
+ CDEBUG(D_OTHER, "%s: "DFID" rec type=%x idx=%u len=%u, " format "\n", \
loghandle2name(lgh), PLOGID(&lgh->lgh_id), (rec)->lrh_type, \
(rec)->lrh_index, (rec)->lrh_len, ##a)
if ((rec->lrh_type & LLOG_OP_MASK) != LLOG_OP_MAGIC)
LLOG_ERROR_REC(llh, rec, "magic is bad");
- else if (rec->lrh_len == 0 || rec->lrh_len > chunk_size)
+ else if (rec->lrh_len == 0 || rec->lrh_len > chunk_size ||
+ rec->lrh_len < LLOG_MIN_REC_SIZE)
LLOG_ERROR_REC(llh, rec, "bad record len, chunk size is %d",
chunk_size);
else if (rec->lrh_index > llog_max_idx(llh->lgh_hdr))
while (rc == 0) {
struct llog_rec_hdr *rec;
off_t chunk_offset = 0;
+ off_t last_chunk_offset = 0;
unsigned int buf_offset = 0;
int lh_last_idx;
int synced_idx = 0;
* The absolute offset of the current chunk is calculated
* from cur_offset value and stored in chunk_offset variable.
*/
+ last_chunk_offset = chunk_offset;
if ((cur_offset & (chunk_size - 1)) != 0)
chunk_offset = cur_offset & ~(chunk_size - 1);
else
chunk_offset = cur_offset - chunk_size;
+ /* When rereading a chunk with zeroes at the end, it could
+ * happen that the index was found in the next chunk. Start
+ * processing from the beginning.
+ */
+ if (last_chunk_offset != chunk_offset)
+ buf_offset = 0;
+
/* NB: when rec->lrh_len is accessed it is already swabbed
* since it is used at the "end" of the loop and the rec
* swabbing is done at the beginning of the loop. */
for (rec = (struct llog_rec_hdr *)(buf + buf_offset);
- (char *)rec < buf + chunk_size;
+ (char *)rec <= buf + chunk_size - LLOG_MIN_REC_SIZE;
rec = llog_rec_hdr_next(rec)) {
- CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n",
- rec, rec->lrh_type);
-
if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
lustre_swab_llog_rec(rec);
- CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
- rec->lrh_type, rec->lrh_index);
+ CDEBUG(D_OTHER, "processing rec 0x%px type=%#x idx=%d\n",
+ rec, rec->lrh_type, rec->lrh_index);
/* start with first rec if block was skipped */
if (!index) {
rec->lrh_index, rec->lrh_len,
(int)(buf + chunk_size - (char *)rec));
- /* lgh_cur_offset is used only at llog_test_3 */
- loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
- chunk_offset;
+ /* lgh_cur_offset is used only at llog_test_3 and
+ * changelog
+ */
+ if (unlikely(loghandle->lgh_ctxt->loc_idx ==
+ LLOG_TEST_ORIG_CTXT))
+ loghandle->lgh_cur_offset = (char *)rec -
+ (char *)buf + chunk_offset;
/* if needed, process the callback on this record */
if (!llog_is_index_skipable(index, llh, cd)) {
up_write(&cathandle->lgh_lock);
llog_close(env, loghandle);
}
+ CERROR("%s: initialization error: rc = %d\n",
+ loghandle2name(cathandle), rc);
RETURN(rc);
}
}
dt_attr_set(env, loghandle->lgh_obj, &lgi->lgi_attr, th);
}
}
-
- up_write(&loghandle->lgh_lock);
+ /* llog_write_rec could unlock a semaphore */
+ if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_UNLCK_SEM))
+ up_write(&loghandle->lgh_lock);
if (rc == -ENOBUFS) {
if (retried++ == 0)
lrt->lrt_len = rec->lrh_len;
lrt->lrt_index = rec->lrh_index;
- /* the lgh_hdr_mutex protects llog header data from concurrent
+ /* the lgh_hdr_lock protects llog header data from concurrent
* update/cancel, the llh_count and llh_bitmap are protected */
- mutex_lock(&loghandle->lgh_hdr_mutex);
- if (__test_and_set_bit_le(index, LLOG_HDR_BITMAP(llh))) {
- CERROR("%s: index %u already set in llog bitmap "DFID"\n",
- o->do_lu.lo_dev->ld_obd->obd_name, index,
- PFID(lu_object_fid(&o->do_lu)));
- mutex_unlock(&loghandle->lgh_hdr_mutex);
- LBUG(); /* should never happen */
- }
+ spin_lock(&loghandle->lgh_hdr_lock);
+ rc = __test_and_set_bit_le(index, LLOG_HDR_BITMAP(llh));
+ LASSERTF(!rc,
+ "%s: index %u already set in llog bitmap "DFID"\n",
+ o->do_lu.lo_dev->ld_obd->obd_name, index,
+ PFID(lu_object_fid(&o->do_lu)));
llh->llh_count++;
if (!(llh->llh_flags & LLOG_F_IS_FIXSIZE)) {
else if (reclen < llh->llh_size)
llh->llh_size = reclen;
}
+ spin_unlock(&loghandle->lgh_hdr_lock);
/*
* readers (e.g. llog_osd_read_header()) must not find
out_unlock:
/* unlock here for remote object */
- mutex_unlock(&loghandle->lgh_hdr_mutex);
if (rc) {
dt_write_unlock(env, o);
GOTO(out, rc);
RETURN(rc);
out:
/* cleanup llog for error case */
- mutex_lock(&loghandle->lgh_hdr_mutex);
+ spin_lock(&loghandle->lgh_hdr_lock);
clear_bit_le(index, LLOG_HDR_BITMAP(llh));
llh->llh_count--;
- mutex_unlock(&loghandle->lgh_hdr_mutex);
+ spin_unlock(&loghandle->lgh_hdr_lock);
/* restore llog last_idx */
if (dt_object_remote(o)) {
struct changelog_rec *rec = (struct changelog_rec *)(hdr + 1);
enum changelog_rec_extra_flags xflag = CLFE_INVALID;
- if (flags & CLF_EXTRA_FLAGS &&
- rec->cr_flags & CLF_EXTRA_FLAGS) {
- xflag = changelog_rec_extra_flags(rec)->cr_extra_flags &
- extra_flags;
- }
-
if (unlikely(hdr->lrh_len == 0)) {
/* It is corruption case, we cannot know the next rec,
* jump to the last one directly to avoid dead loop. */
break;
}
+
+ if (flags & CLF_EXTRA_FLAGS &&
+ rec->cr_flags & CLF_EXTRA_FLAGS) {
+ xflag = changelog_rec_extra_flags(rec)->cr_extra_flags &
+ extra_flags;
+ }
+
/* Fill up the changelog record with everything the kernel
* version supports.
*/
rec = buf;
if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
lustre_swab_llog_rec(rec);
+
+ /* caller handles bad records if any */
+ if (llog_verify_record(loghandle, rec))
+ GOTO(out, rc = 0);
+
tail = (struct llog_rec_tail *)((char *)buf + rc -
sizeof(struct llog_rec_tail));
+
+ while ((tail->lrt_index == 0 || tail->lrt_len == 0) &&
+ (void *) tail > buf) {
+ /* looks like zeroes at the end of the block */
+ /* search for the real record, assuming 4-byte alignment */
+ tail = (struct llog_rec_tail *)(((char *)tail) - 4);
+ };
+
tail_len = tail->lrt_len;
/* base on tail_len do swab */
if (tail_len > chunk_size) {
last_rec = (struct llog_rec_hdr *)((char *)tail - tail_len +
sizeof(struct llog_rec_tail));
- /* caller handles bad records if any */
- if (llog_verify_record(loghandle, rec))
- GOTO(out, rc = 0);
-
if (LLOG_REC_HDR_NEEDS_SWABBING(last_rec))
lustre_swab_llog_rec(last_rec);
if (obj->oo_destroyed)
GOTO(out, rc = -ENOENT);
- if (fid_is_llog(lu_object_fid(&dt->do_lu))) {
+ /* XXX: disable the optimization as it's not compatible
+ * with indexed llog and with several threads writing
+ * to the same block */
+ if (fid_is_llog(lu_object_fid(&dt->do_lu)) && 0) {
osd_write_llog_header(obj, buf, pos, oh);
} else {
osd_dmu_write(osd, obj->oo_dn, offset, (uint64_t)buf->lb_len,
hdr = req_capsule_server_get(&req->rq_pill, &RMF_LLOG_LOG_HDR);
*hdr = *loghandle->lgh_hdr;
+ CDEBUG(D_OTHER, "%s: red llog header "DFID" count %d lgh_last_idx %d llh_cat_idx %d\n",
+ ctxt->loc_obd->obd_name, PLOGID(&loghandle->lgh_id),
+ hdr->llh_count, LLOG_HDR_TAIL(hdr)->lrt_index, hdr->llh_cat_idx);
EXIT;
out_close:
llog_origin_close(req->rq_svc_thread->t_env, loghandle);
stack_trap "echo $rl > /sys/module/libcfs/parameters/libcfs_console_ratelimit" EXIT
test_mkdir -c 1 -i 0 $DIR/$tdir || error "Failed to create directory"
+ do_nodes $(comma_list $(osts_nodes)) $LCTL set_param \
+ seq.*OST*-super.width=$DATA_SEQ_MAX_WIDTH
+
changelog_chmask "ALL" || error "changelog_chmask failed"
changelog_register || error "changelog_register failed"
# Check changelog entries
lastread=$(__test_135_reader $fd $cl_user) || exit $?
+
! kill -0 $files_pid 2>/dev/null ||
error "creation thread is running. Is changelog reader stuck?"
if (( count > 100 )); then
debugsave
- do_nodes $(comma_list $(all_nodes)) $LCTL set_param -n debug=0
+ do_nodes $(comma_list $(all_nodes)) $LCTL set_param -n debug=ha
fi
$LUSTRE/tests/createmany $*
rc=$?