Whamcloud - gitweb
LU-3540 lod: update recovery thread
[fs/lustre-release.git] / lustre / lod / lod_dev.c
index 88d5800..7c7a1cb 100644 (file)
@@ -276,6 +276,66 @@ struct lod_recovery_data {
        __u32                   lrd_idx;
 };
 
+
+/**
+ * process update recovery record
+ *
+ * Add the update recovery recode to the update recovery list in
+ * lod_recovery_data. Then the recovery thread (target_recovery_thread)
+ * will redo these updates.
+ *
+ * \param[in]env       execution environment
+ * \param[in]llh       log handle of update record
+ * \param[in]rec       update record to be replayed
+ * \param[in]data      update recovery data which holds the necessary
+ *                      arguments for recovery (see struct lod_recovery_data)
+ *
+ * \retval             0 if the record is processed successfully.
+ * \retval             negative errno if the record processing fails.
+ */
+static int lod_process_recovery_updates(const struct lu_env *env,
+                                       struct llog_handle *llh,
+                                       struct llog_rec_hdr *rec,
+                                       void *data)
+{
+       struct lod_recovery_data        *lrd = data;
+       struct llog_cookie      *cookie = &lod_env_info(env)->lti_cookie;
+       struct lu_target                *lut;
+       __u32                           index = 0;
+       ENTRY;
+
+       if (lrd->lrd_ltd == NULL) {
+               int rc;
+
+               rc = lodname2mdt_index(lod2obd(lrd->lrd_lod)->obd_name, &index);
+               if (rc != 0)
+                       return rc;
+       } else {
+               index = lrd->lrd_ltd->ltd_index;
+       }
+
+       if (rec->lrh_len !=
+               llog_update_record_size((struct llog_update_record *)rec)) {
+               CERROR("%s broken update record! index %u "DOSTID":%u : rc = %d\n",
+                      lod2obd(lrd->lrd_lod)->obd_name, index,
+                      POSTID(&llh->lgh_id.lgl_oi), rec->lrh_index, -EIO);
+               return -EIO;
+       }
+
+       cookie->lgc_lgl = llh->lgh_id;
+       cookie->lgc_index = rec->lrh_index;
+       cookie->lgc_subsys = LLOG_UPDATELOG_ORIG_CTXT;
+
+       CDEBUG(D_HA, "%s: process recovery updates "DOSTID":%u\n",
+              lod2obd(lrd->lrd_lod)->obd_name,
+              POSTID(&llh->lgh_id.lgl_oi), rec->lrh_index);
+       lut = lod2lu_dev(lrd->lrd_lod)->ld_site->ls_tgt;
+
+       return insert_update_records_to_replay_list(lut->lut_tdtd,
+                                       (struct llog_update_record *)rec,
+                                       cookie, index);
+}
+
 /**
  * recovery thread for update log
  *
@@ -293,6 +353,7 @@ static int lod_sub_recovery_thread(void *arg)
        struct lod_device               *lod = lrd->lrd_lod;
        struct dt_device                *dt;
        struct ptlrpc_thread            *thread = lrd->lrd_thread;
+       struct llog_ctxt                *ctxt;
        struct lu_env                   env;
        int                             rc;
        ENTRY;
@@ -317,7 +378,53 @@ static int lod_sub_recovery_thread(void *arg)
        if (rc != 0)
                GOTO(out, rc);
 
-       /* XXX do recovery in the following patches */
+       /* Process the recovery record */
+       ctxt = llog_get_context(dt->dd_lu_dev.ld_obd, LLOG_UPDATELOG_ORIG_CTXT);
+       LASSERT(ctxt != NULL);
+       LASSERT(ctxt->loc_handle != NULL);
+
+       rc = llog_cat_process(&env, ctxt->loc_handle,
+                             lod_process_recovery_updates, lrd, 0, 0);
+       llog_ctxt_put(ctxt);
+
+       if (rc < 0) {
+               CERROR("%s getting update log failed: rc = %d\n",
+                      dt->dd_lu_dev.ld_obd->obd_name, rc);
+               GOTO(out, rc);
+       }
+
+       CDEBUG(D_HA, "%s retrieve update log: rc = %d\n",
+              dt->dd_lu_dev.ld_obd->obd_name, rc);
+
+       if (lrd->lrd_ltd == NULL)
+               lod->lod_child_got_update_log = 1;
+       else
+               lrd->lrd_ltd->ltd_got_update_log = 1;
+
+       if (lod->lod_child_got_update_log) {
+               struct lod_tgt_descs    *ltd = &lod->lod_mdt_descs;
+               struct lod_tgt_desc     *tgt = NULL;
+               bool                    all_got_log = true;
+               int                     i;
+
+               cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) {
+                       tgt = LTD_TGT(ltd, i);
+                       if (!tgt->ltd_got_update_log) {
+                               all_got_log = false;
+                               break;
+                       }
+               }
+
+               if (all_got_log) {
+                       struct lu_target *lut;
+
+                       lut = lod2lu_dev(lod)->ld_site->ls_tgt;
+                       CDEBUG(D_HA, "%s got update logs from all MDTs.\n",
+                              lut->lut_obd->obd_name);
+                       lut->lut_tdtd->tdtd_replay_ready = 1;
+                       wake_up(&lut->lut_obd->obd_next_transno_waitq);
+               }
+       }
 
 out:
        OBD_FREE_PTR(lrd);
@@ -443,18 +550,21 @@ int lod_sub_init_llog(const struct lu_env *env, struct lod_device *lod,
        struct l_wait_info              lwi = { 0 };
        struct lod_tgt_desc             *sub_ltd = NULL;
        __u32                           index;
+       __u32                           master_index;
        int                             rc;
        ENTRY;
 
+       rc = lodname2mdt_index(lod2obd(lod)->obd_name, &master_index);
+       if (rc != 0)
+               RETURN(rc);
+
        OBD_ALLOC_PTR(lrd);
        if (lrd == NULL)
                RETURN(-ENOMEM);
 
        if (lod->lod_child == dt) {
                thread = &lod->lod_child_recovery_thread;
-               rc = lodname2mdt_index(lod2obd(lod)->obd_name, &index);
-               if (rc != 0)
-                       GOTO(free_lrd, rc);
+               index = master_index;
        } else {
                struct lod_tgt_descs    *ltd = &lod->lod_mdt_descs;
                struct lod_tgt_desc     *tgt = NULL;
@@ -476,6 +586,8 @@ int lod_sub_init_llog(const struct lu_env *env, struct lod_device *lod,
                thread = sub_ltd->ltd_recovery_thread;
        }
 
+       CDEBUG(D_INFO, "%s init sub log %s\n", lod2obd(lod)->obd_name,
+              dt->dd_lu_dev.ld_obd->obd_name);
        lrd->lrd_lod = lod;
        lrd->lrd_ltd = sub_ltd;
        lrd->lrd_thread = thread;
@@ -493,8 +605,8 @@ int lod_sub_init_llog(const struct lu_env *env, struct lod_device *lod,
        }
 
        /* Start the recovery thread */
-       task = kthread_run(lod_sub_recovery_thread, lrd, "lod_recov_%04x",
-                          index);
+       task = kthread_run(lod_sub_recovery_thread, lrd, "lod%04x_rec%04x",
+                          master_index, index);
        if (IS_ERR(task)) {
                rc = PTR_ERR(task);
                CERROR("%s: cannot start recovery thread: rc = %d\n",
@@ -588,6 +700,12 @@ static int lod_prepare_distribute_txn(const struct lu_env *env,
                RETURN(rc);
        }
 
+       tdtd->tdtd_dt = &lod->lod_dt_dev;
+       INIT_LIST_HEAD(&tdtd->tdtd_replay_list);
+       spin_lock_init(&tdtd->tdtd_replay_list_lock);
+       tdtd->tdtd_replay_handler = distribute_txn_replay_handle;
+       tdtd->tdtd_replay_ready = 0;
+
        lut->lut_tdtd = tdtd;
 
        RETURN(0);