__u32 lrd_idx;
};
+
+/**
+ * process update recovery record
+ *
+ * Add the update recovery recode to the update recovery list in
+ * lod_recovery_data. Then the recovery thread (target_recovery_thread)
+ * will redo these updates.
+ *
+ * \param[in]env execution environment
+ * \param[in]llh log handle of update record
+ * \param[in]rec update record to be replayed
+ * \param[in]data update recovery data which holds the necessary
+ * arguments for recovery (see struct lod_recovery_data)
+ *
+ * \retval 0 if the record is processed successfully.
+ * \retval negative errno if the record processing fails.
+ */
+static int lod_process_recovery_updates(const struct lu_env *env,
+ struct llog_handle *llh,
+ struct llog_rec_hdr *rec,
+ void *data)
+{
+ struct lod_recovery_data *lrd = data;
+ struct llog_cookie *cookie = &lod_env_info(env)->lti_cookie;
+ struct lu_target *lut;
+ __u32 index = 0;
+ ENTRY;
+
+ if (lrd->lrd_ltd == NULL) {
+ int rc;
+
+ rc = lodname2mdt_index(lod2obd(lrd->lrd_lod)->obd_name, &index);
+ if (rc != 0)
+ return rc;
+ } else {
+ index = lrd->lrd_ltd->ltd_index;
+ }
+
+ if (rec->lrh_len !=
+ llog_update_record_size((struct llog_update_record *)rec)) {
+ CERROR("%s broken update record! index %u "DOSTID":%u : rc = %d\n",
+ lod2obd(lrd->lrd_lod)->obd_name, index,
+ POSTID(&llh->lgh_id.lgl_oi), rec->lrh_index, -EIO);
+ return -EIO;
+ }
+
+ cookie->lgc_lgl = llh->lgh_id;
+ cookie->lgc_index = rec->lrh_index;
+ cookie->lgc_subsys = LLOG_UPDATELOG_ORIG_CTXT;
+
+ CDEBUG(D_HA, "%s: process recovery updates "DOSTID":%u\n",
+ lod2obd(lrd->lrd_lod)->obd_name,
+ POSTID(&llh->lgh_id.lgl_oi), rec->lrh_index);
+ lut = lod2lu_dev(lrd->lrd_lod)->ld_site->ls_tgt;
+
+ return insert_update_records_to_replay_list(lut->lut_tdtd,
+ (struct llog_update_record *)rec,
+ cookie, index);
+}
+
/**
* recovery thread for update log
*
struct lod_device *lod = lrd->lrd_lod;
struct dt_device *dt;
struct ptlrpc_thread *thread = lrd->lrd_thread;
+ struct llog_ctxt *ctxt;
struct lu_env env;
int rc;
ENTRY;
if (rc != 0)
GOTO(out, rc);
- /* XXX do recovery in the following patches */
+ /* Process the recovery record */
+ ctxt = llog_get_context(dt->dd_lu_dev.ld_obd, LLOG_UPDATELOG_ORIG_CTXT);
+ LASSERT(ctxt != NULL);
+ LASSERT(ctxt->loc_handle != NULL);
+
+ rc = llog_cat_process(&env, ctxt->loc_handle,
+ lod_process_recovery_updates, lrd, 0, 0);
+ llog_ctxt_put(ctxt);
+
+ if (rc < 0) {
+ CERROR("%s getting update log failed: rc = %d\n",
+ dt->dd_lu_dev.ld_obd->obd_name, rc);
+ GOTO(out, rc);
+ }
+
+ CDEBUG(D_HA, "%s retrieve update log: rc = %d\n",
+ dt->dd_lu_dev.ld_obd->obd_name, rc);
+
+ if (lrd->lrd_ltd == NULL)
+ lod->lod_child_got_update_log = 1;
+ else
+ lrd->lrd_ltd->ltd_got_update_log = 1;
+
+ if (lod->lod_child_got_update_log) {
+ struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
+ struct lod_tgt_desc *tgt = NULL;
+ bool all_got_log = true;
+ int i;
+
+ cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) {
+ tgt = LTD_TGT(ltd, i);
+ if (!tgt->ltd_got_update_log) {
+ all_got_log = false;
+ break;
+ }
+ }
+
+ if (all_got_log) {
+ struct lu_target *lut;
+
+ lut = lod2lu_dev(lod)->ld_site->ls_tgt;
+ CDEBUG(D_HA, "%s got update logs from all MDTs.\n",
+ lut->lut_obd->obd_name);
+ lut->lut_tdtd->tdtd_replay_ready = 1;
+ wake_up(&lut->lut_obd->obd_next_transno_waitq);
+ }
+ }
out:
OBD_FREE_PTR(lrd);
struct l_wait_info lwi = { 0 };
struct lod_tgt_desc *sub_ltd = NULL;
__u32 index;
+ __u32 master_index;
int rc;
ENTRY;
+ rc = lodname2mdt_index(lod2obd(lod)->obd_name, &master_index);
+ if (rc != 0)
+ RETURN(rc);
+
OBD_ALLOC_PTR(lrd);
if (lrd == NULL)
RETURN(-ENOMEM);
if (lod->lod_child == dt) {
thread = &lod->lod_child_recovery_thread;
- rc = lodname2mdt_index(lod2obd(lod)->obd_name, &index);
- if (rc != 0)
- GOTO(free_lrd, rc);
+ index = master_index;
} else {
struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
struct lod_tgt_desc *tgt = NULL;
thread = sub_ltd->ltd_recovery_thread;
}
+ CDEBUG(D_INFO, "%s init sub log %s\n", lod2obd(lod)->obd_name,
+ dt->dd_lu_dev.ld_obd->obd_name);
lrd->lrd_lod = lod;
lrd->lrd_ltd = sub_ltd;
lrd->lrd_thread = thread;
}
/* Start the recovery thread */
- task = kthread_run(lod_sub_recovery_thread, lrd, "lod_recov_%04x",
- index);
+ task = kthread_run(lod_sub_recovery_thread, lrd, "lod%04x_rec%04x",
+ master_index, index);
if (IS_ERR(task)) {
rc = PTR_ERR(task);
CERROR("%s: cannot start recovery thread: rc = %d\n",
RETURN(rc);
}
+ tdtd->tdtd_dt = &lod->lod_dt_dev;
+ INIT_LIST_HEAD(&tdtd->tdtd_replay_list);
+ spin_lock_init(&tdtd->tdtd_replay_list_lock);
+ tdtd->tdtd_replay_handler = distribute_txn_replay_handle;
+ tdtd->tdtd_replay_ready = 0;
+
lut->lut_tdtd = tdtd;
RETURN(0);