struct sub_thandle *st;
LASSERT(tmt->tmt_magic == TOP_THANDLE_MAGIC);
- CDEBUG(mask, "%s tmt %p refcount %d committed %d result %d"
- "batchid "LPU64"\n",
+ CDEBUG(mask, "%s tmt %p refcount %d committed %d result %d batchid %llu\n",
tmt->tmt_master_sub_dt ?
tmt->tmt_master_sub_dt->dd_lu_dev.ld_obd->obd_name :
"NULL",
list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) {
struct sub_thandle_cookie *stc;
- CDEBUG(mask, "st %p obd %s committed %d sub_th %p\n",
+ CDEBUG(mask, "st %p obd %s committed %d stopped %d sub_th %p\n",
st, st->st_dt->dd_lu_dev.ld_obd->obd_name,
- st->st_committed, st->st_sub_th);
+ st->st_committed, st->st_stopped, st->st_sub_th);
list_for_each_entry(stc, &st->st_cookie_list, stc_list) {
CDEBUG(mask, " cookie "DOSTID": %u\n",
struct llog_update_record *record,
struct sub_thandle *sub_th)
{
- struct dt_device *dt = sub_th->st_dt;
- struct llog_ctxt *ctxt;
- int rc;
+ struct dt_device *dt = sub_th->st_dt;
+ struct llog_ctxt *ctxt;
struct llog_update_record *lur = NULL;
- struct update_params *params = NULL;
- __u32 update_count = 0;
- __u32 param_count = 0;
- __u32 last_update_count = 0;
- __u32 last_param_count = 0;
- void *src;
- void *start;
- void *next;
+ __u32 update_count = 0;
+ __u32 param_count = 0;
+ __u32 last_update_count = 0;
+ __u32 last_param_count = 0;
+ char *start;
+ char *cur;
+ char *next;
struct sub_thandle_cookie *stc;
+ size_t reclen;
+ bool eof = false;
+ int rc;
ENTRY;
ctxt = llog_get_context(dt->dd_lu_dev.ld_obd,
memcpy(lur, &record->lur_hdr, sizeof(record->lur_hdr));
lur->lur_update_rec.ur_update_count = 0;
lur->lur_update_rec.ur_param_count = 0;
- src = &record->lur_update_rec.ur_ops;
- start = next = src;
- lur->lur_hdr.lrh_len = llog_update_record_size(lur);
- params = update_records_get_params(&record->lur_update_rec);
+ start = (char *)&record->lur_update_rec.ur_ops;
+ cur = next = start;
do {
- size_t rec_len;
-
- if (update_count < record->lur_update_rec.ur_update_count) {
- next = update_op_next_op((struct update_op *)src);
- } else {
- if (param_count == 0)
- next = update_records_get_params(
- &record->lur_update_rec);
- else
- next = (char *)src +
- object_update_param_size(
- (struct object_update_param *)src);
+ if (update_count < record->lur_update_rec.ur_update_count)
+ next = (char *)update_op_next_op(
+ (struct update_op *)cur);
+ else if (param_count < record->lur_update_rec.ur_param_count)
+ next = (char *)update_param_next_param(
+ (struct update_param *)cur);
+ else
+ eof = true;
+
+ /*
+ * If its size > llog chunk_size, then write current chunk to
+ * the update llog, NB the padding should >= LLOG_MIN_REC_SIZE.
+ *
+ * So check padding length is either >= LLOG_MIN_REC_SIZE or is
+ * 0 (record length just matches the chunk size).
+ */
+ reclen = __llog_update_record_size(
+ __update_records_size(next - start));
+ if ((reclen + LLOG_MIN_REC_SIZE <= ctxt->loc_chunk_size ||
+ reclen == ctxt->loc_chunk_size) &&
+ !eof) {
+ cur = next;
+
+ if (update_count <
+ record->lur_update_rec.ur_update_count)
+ update_count++;
+ else if (param_count <
+ record->lur_update_rec.ur_param_count)
+ param_count++;
+ continue;
}
- rec_len = cfs_size_round((unsigned long)(next - src));
- /* If its size > llog chunk_size, then write current chunk to
- * the update llog. */
- if (lur->lur_hdr.lrh_len + rec_len + LLOG_MIN_REC_SIZE >
- ctxt->loc_chunk_size ||
- param_count == record->lur_update_rec.ur_param_count) {
- lur->lur_update_rec.ur_update_count =
- update_count > last_update_count ?
- update_count - last_update_count : 0;
- lur->lur_update_rec.ur_param_count = param_count -
- last_param_count;
-
- memcpy(&lur->lur_update_rec.ur_ops, start,
- (unsigned long)(src - start));
- if (last_update_count != 0)
- lur->lur_update_rec.ur_flags |=
- UPDATE_RECORD_CONTINUE;
-
- update_records_dump(&lur->lur_update_rec, D_INFO, true);
- lur->lur_hdr.lrh_len = llog_update_record_size(lur);
- LASSERT(lur->lur_hdr.lrh_len <= ctxt->loc_chunk_size);
-
- OBD_ALLOC_PTR(stc);
- if (stc == NULL)
- GOTO(llog_put, rc = -ENOMEM);
- INIT_LIST_HEAD(&stc->stc_list);
-
- rc = llog_add(env, ctxt->loc_handle,
- &lur->lur_hdr,
- &stc->stc_cookie, sub_th->st_sub_th);
-
- CDEBUG(D_INFO, "%s: Add update log "DOSTID":%u"
- " rc = %d\n", dt->dd_lu_dev.ld_obd->obd_name,
- POSTID(&stc->stc_cookie.lgc_lgl.lgl_oi),
- stc->stc_cookie.lgc_index, rc);
-
- if (rc > 0) {
- list_add(&stc->stc_list,
- &sub_th->st_cookie_list);
- rc = 0;
- } else {
- OBD_FREE_PTR(stc);
- GOTO(llog_put, rc);
- }
+ lur->lur_update_rec.ur_update_count = update_count -
+ last_update_count;
+ lur->lur_update_rec.ur_param_count = param_count -
+ last_param_count;
+ memcpy(&lur->lur_update_rec.ur_ops, start, cur - start);
+ lur->lur_hdr.lrh_len = llog_update_record_size(lur);
- last_update_count = update_count;
- last_param_count = param_count;
- start = src;
- lur->lur_update_rec.ur_update_count = 0;
- lur->lur_update_rec.ur_param_count = 0;
- lur->lur_hdr.lrh_len = llog_update_record_size(lur);
+ LASSERT(lur->lur_hdr.lrh_len ==
+ __llog_update_record_size(
+ __update_records_size(cur - start)));
+ LASSERT(lur->lur_hdr.lrh_len <= ctxt->loc_chunk_size);
+
+ update_records_dump(&lur->lur_update_rec, D_INFO, true);
+
+ OBD_ALLOC_PTR(stc);
+ if (stc == NULL)
+ GOTO(llog_put, rc = -ENOMEM);
+ INIT_LIST_HEAD(&stc->stc_list);
+
+ rc = llog_add(env, ctxt->loc_handle, &lur->lur_hdr,
+ &stc->stc_cookie, sub_th->st_sub_th);
+
+ CDEBUG(D_INFO, "%s: Add update log "DOSTID":%u rc = %d\n",
+ dt->dd_lu_dev.ld_obd->obd_name,
+ POSTID(&stc->stc_cookie.lgc_lgl.lgl_oi),
+ stc->stc_cookie.lgc_index, rc);
+
+ if (rc > 0) {
+ list_add(&stc->stc_list, &sub_th->st_cookie_list);
+ rc = 0;
+ } else {
+ OBD_FREE_PTR(stc);
+ GOTO(llog_put, rc);
}
- src = next;
- lur->lur_hdr.lrh_len += cfs_size_round(rec_len);
- if (update_count < record->lur_update_rec.ur_update_count)
- update_count++;
- else if (param_count < record->lur_update_rec.ur_param_count)
- param_count++;
- else
- break;
- } while (1);
+ last_update_count = update_count;
+ last_param_count = param_count;
+ start = cur;
+ lur->lur_update_rec.ur_update_count = 0;
+ lur->lur_update_rec.ur_param_count = 0;
+ lur->lur_update_rec.ur_flags |= UPDATE_RECORD_CONTINUE;
+ } while (!eof);
llog_put:
if (lur != NULL)
rc = dt_record_write(env, tdtd->tdtd_batchid_obj, &buf,
&off, th);
- CDEBUG(D_INFO, "%s: update batchid "LPU64": rc = %d\n",
+ CDEBUG(D_INFO, "%s: update batchid %llu: rc = %d\n",
tdtd->tdtd_lut->lut_obd->obd_name, batchid, rc);
stop:
wake_up(&thread->t_ctl_waitq);
INIT_LIST_HEAD(&list);
- CDEBUG(D_HA, "%s: start commit thread committed batchid "LPU64"\n",
+ CDEBUG(D_HA, "%s: start commit thread committed batchid %llu\n",
tdtd->tdtd_lut->lut_obd->obd_name,
tdtd->tdtd_committed_batchid);
list_move_tail(&tmt->tmt_commit_list, &list);
} else if (!tdtd->tdtd_lut->lut_obd->obd_recovering) {
LASSERTF(tmt->tmt_batchid >= batchid,
- "tmt %p tmt_batchid: "LPU64", batchid "
- LPU64"\n", tmt, tmt->tmt_batchid,
+ "tmt %p tmt_batchid: %llu, batchid "
+ "%llu\n", tmt, tmt->tmt_batchid,
batchid);
/* There are three types of distribution
* transaction result
}
spin_unlock(&tdtd->tdtd_batchid_lock);
- CDEBUG(D_HA, "%s: batchid: "LPU64" committed batchid "
- LPU64"\n", tdtd->tdtd_lut->lut_obd->obd_name, batchid,
+ CDEBUG(D_HA, "%s: batchid: %llu committed batchid "
+ "%llu\n", tdtd->tdtd_lut->lut_obd->obd_name, batchid,
tdtd->tdtd_committed_batchid);
/* update globally committed on a storage */
if (batchid > tdtd->tdtd_committed_batchid) {
init_waitqueue_head(&lut->lut_tdtd_commit_thread.t_ctl_waitq);
init_waitqueue_head(&tdtd->tdtd_commit_thread_waitq);
+ init_waitqueue_head(&tdtd->tdtd_recovery_threads_waitq);
atomic_set(&tdtd->tdtd_refcount, 0);
+ atomic_set(&tdtd->tdtd_recovery_threads_count, 0);
tdtd->tdtd_lut = lut;
rc = distribute_txn_commit_batchid_init(env, tdtd);