Whamcloud - gitweb
LU-12635 build: Support for gcc -Wimplicit-fallthrough
[fs/lustre-release.git] / lustre / target / update_recovery.c
index 5f121d2..b483a26 100644 (file)
@@ -20,7 +20,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright (c) 2014, Intel Corporation.
+ * Copyright (c) 2015, 2017, Intel Corporation.
  */
 
 /*
 #define DEBUG_SUBSYSTEM S_CLASS
 
 #include <lu_target.h>
-#include <md_object.h>
+#include <lustre_obdo.h>
 #include <lustre_update.h>
+#include <lustre_swab.h>
+#include <md_object.h>
 #include <obd.h>
 #include <obd_class.h>
+
 #include "tgt_internal.h"
 
 /**
@@ -378,14 +381,14 @@ insert_update_records_to_replay_list(struct target_distribute_txn_data *tdtd,
        int rc = 0;
        ENTRY;
 
-       CDEBUG(D_HA, "%s: insert record batchid = "LPU64" transno = "LPU64
+       CDEBUG(D_HA, "%s: insert record batchid = %llu transno = %llu"
               " mdt_index %u\n", tdtd->tdtd_lut->lut_obd->obd_name,
               record->ur_batchid, record->ur_master_transno, mdt_index);
 
        /* Update batchid if necessary */
        spin_lock(&tdtd->tdtd_batchid_lock);
        if (record->ur_batchid >= tdtd->tdtd_batchid) {
-               CDEBUG(D_HA, "%s update batchid from "LPU64 " to "LPU64"\n",
+               CDEBUG(D_HA, "%s update batchid from %llu" " to %llu\n",
                       tdtd->tdtd_lut->lut_obd->obd_name,
                       tdtd->tdtd_batchid, record->ur_batchid);
                tdtd->tdtd_batchid = record->ur_batchid + 1;
@@ -442,7 +445,7 @@ again:
         * we do not need lock here */
        if (replace_record) {
                /* Replace the update record and master transno */
-               OBD_FREE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
+               OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
                dtrq->dtrq_lur = NULL;
                dtrq->dtrq_lur_size = llog_update_record_size(lur);
                OBD_ALLOC_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
@@ -499,6 +502,8 @@ void dtrq_destroy(struct distribute_txn_replay_req *dtrq)
        struct distribute_txn_replay_req_sub    *tmp;
 
        LASSERT(list_empty(&dtrq->dtrq_list));
+       CDEBUG(D_HA, "destroy x%llu t%llu\n", dtrq->dtrq_xid,
+              dtrq->dtrq_master_transno);
        spin_lock(&dtrq->dtrq_sub_list_lock);
        list_for_each_entry_safe(dtrqs, tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
                struct sub_thandle_cookie *stc;
@@ -539,6 +544,11 @@ void dtrq_list_destroy(struct target_distribute_txn_data *tdtd)
                list_del_init(&dtrq->dtrq_list);
                dtrq_destroy(dtrq);
        }
+       list_for_each_entry_safe(dtrq, tmp, &tdtd->tdtd_replay_finish_list,
+                                dtrq_list) {
+               list_del_init(&dtrq->dtrq_list);
+               dtrq_destroy(dtrq);
+       }
        spin_unlock(&tdtd->tdtd_replay_list_lock);
 }
 EXPORT_SYMBOL(dtrq_list_destroy);
@@ -591,12 +601,47 @@ __u64 distribute_txn_get_next_transno(struct target_distribute_txn_data *tdtd)
        }
        spin_unlock(&tdtd->tdtd_replay_list_lock);
 
-       CDEBUG(D_HA, "%s: Next update transno "LPU64"\n",
+       CDEBUG(D_HA, "%s: Next update transno %llu\n",
               tdtd->tdtd_lut->lut_obd->obd_name, transno);
        return transno;
 }
 EXPORT_SYMBOL(distribute_txn_get_next_transno);
 
+struct distribute_txn_replay_req *
+distribute_txn_lookup_finish_list(struct target_distribute_txn_data *tdtd,
+                                 __u64 transno)
+{
+       struct distribute_txn_replay_req *dtrq = NULL;
+       struct distribute_txn_replay_req *iter;
+
+       spin_lock(&tdtd->tdtd_replay_list_lock);
+       list_for_each_entry(iter, &tdtd->tdtd_replay_finish_list, dtrq_list) {
+               if (iter->dtrq_master_transno == transno) {
+                       dtrq = iter;
+                       break;
+               }
+       }
+       spin_unlock(&tdtd->tdtd_replay_list_lock);
+       return dtrq;
+}
+
+bool is_req_replayed_by_update(struct ptlrpc_request *req)
+{
+       struct lu_target *tgt = class_exp2tgt(req->rq_export);
+       struct distribute_txn_replay_req *dtrq;
+
+       if (tgt->lut_tdtd == NULL)
+               return false;
+
+       dtrq = distribute_txn_lookup_finish_list(tgt->lut_tdtd,
+                                       lustre_msg_get_transno(req->rq_reqmsg));
+       if (dtrq == NULL)
+               return false;
+
+       return true;
+}
+EXPORT_SYMBOL(is_req_replayed_by_update);
+
 /**
  * Check if the update of one object is committed
  *
@@ -1009,6 +1054,74 @@ static int update_recovery_xattr_del(const struct lu_env *env,
 }
 
 /**
+ * Update session information
+ *
+ * Update session information so tgt_txn_stop_cb()->tgt_last_rcvd_update()
+ * can be called correctly during update replay.
+ *
+ * \param[in] env      execution environment.
+ * \param[in] tdtd     distribute data structure of the recovering tgt.
+ * \param[in] th       thandle of this update replay.
+ * \param[in] master_th        master sub thandle.
+ * \param[in] ta_arg   the tx arg structure to hold the update for updating
+ *                      reply data.
+ */
+static void update_recovery_update_ses(struct lu_env *env,
+                                     struct target_distribute_txn_data *tdtd,
+                                     struct thandle *th,
+                                     struct thandle *master_th,
+                                     struct distribute_txn_replay_req *dtrq,
+                                     struct tx_arg *ta_arg)
+{
+       struct tgt_session_info *tsi;
+       struct lu_target        *lut = tdtd->tdtd_lut;
+       struct obd_export       *export;
+       struct cfs_hash         *hash;
+       struct top_thandle      *top_th;
+       struct lsd_reply_data   *lrd;
+       size_t                  size;
+
+       tsi = tgt_ses_info(env);
+       if (tsi->tsi_exp != NULL)
+               return;
+
+       size = ta_arg->u.write.buf.lb_len;
+       lrd = ta_arg->u.write.buf.lb_buf;
+       if (size != sizeof(*lrd) || lrd == NULL)
+               return;
+
+       lrd->lrd_transno         = le64_to_cpu(lrd->lrd_transno);
+       lrd->lrd_xid             = le64_to_cpu(lrd->lrd_xid);
+       lrd->lrd_data            = le64_to_cpu(lrd->lrd_data);
+       lrd->lrd_result          = le32_to_cpu(lrd->lrd_result);
+       lrd->lrd_client_gen      = le32_to_cpu(lrd->lrd_client_gen);
+
+       CDEBUG(D_HA, "xid=%llu transno=%llu\n", lrd->lrd_xid, lrd->lrd_transno);
+       if (lrd->lrd_transno != tgt_th_info(env)->tti_transno)
+               return;
+
+       hash = cfs_hash_getref(lut->lut_obd->obd_gen_hash);
+       if (hash == NULL)
+               return;
+
+       export = cfs_hash_lookup(hash, &lrd->lrd_client_gen);
+       if (export == NULL) {
+               cfs_hash_putref(hash);
+               return;
+       }
+
+       tsi->tsi_exp = export;
+       tsi->tsi_xid = lrd->lrd_xid;
+       tsi->tsi_opdata = lrd->lrd_data;
+       tsi->tsi_result = lrd->lrd_result;
+       tsi->tsi_client_gen = lrd->lrd_client_gen;
+       dtrq->dtrq_xid = lrd->lrd_xid;
+       top_th = container_of(th, struct top_thandle, tt_super);
+       top_th->tt_master_sub_thandle = master_th;
+       cfs_hash_putref(hash);
+}
+
+/**
  * Execute updates in the update replay records
  *
  * Declare distribute txn replay by update records and add the updates
@@ -1058,6 +1171,17 @@ static int update_recovery_exec(const struct lu_env *env,
                dt_obj = dt_locate(env, tdtd->tdtd_dt, fid);
                if (IS_ERR(dt_obj)) {
                        rc = PTR_ERR(dt_obj);
+                       if (rc == -EREMCHG)
+                               LCONSOLE_WARN("%.16s: hit invalid OI mapping "
+                                             "for "DFID" during recovering, "
+                                             "that may because auto scrub is "
+                                             "disabled on related MDT, and "
+                                             "will cause recovery failure. "
+                                             "Please enable auto scrub and "
+                                             "retry the recovery.\n",
+                                             tdtd->tdtd_lut->lut_obd->obd_name,
+                                             PFID(fid));
+
                        break;
                }
                sub_dt_obj = dt_object_child(dt_obj);
@@ -1145,7 +1269,7 @@ static int update_recovery_exec(const struct lu_env *env,
                        break;
                }
 next:
-               lu_object_put(env, &dt_obj->do_lu);
+               dt_object_put(env, dt_obj);
                if (rc < 0)
                        break;
        }
@@ -1213,6 +1337,7 @@ int distribute_txn_replay_handle(struct lu_env *env,
        if (rc < 0)
                GOTO(stop_trans, rc);
 
+       th->th_dev = tdtd->tdtd_dt;
        ta->ta_handle = th;
 
        /* check if the distribute transaction has been committed */
@@ -1228,10 +1353,9 @@ int distribute_txn_replay_handle(struct lu_env *env,
        if (rc < 0)
                GOTO(stop_trans, rc);
 
-       /* If no updates are needed to be replayed, then
-        * mark this records as committed, so commit thread
-        * distribute_txn_commit_thread() will delete the
-        * record */
+       /* If no updates are needed to be replayed, then mark this records as
+        * committed, so commit thread distribute_txn_commit_thread() will
+        * delete the record */
        if (ta->ta_argno == 0)
                tmt->tmt_committed = 1;
 
@@ -1259,10 +1383,19 @@ int distribute_txn_replay_handle(struct lu_env *env,
                LASSERT(tmt->tmt_committed == 0);
                sub_dt = lu2dt_dev(dt_obj->do_lu.lo_dev);
                st = lookup_sub_thandle(tmt, sub_dt);
+
                LASSERT(st != NULL);
                LASSERT(st->st_sub_th != NULL);
                rc = ta->ta_args[i]->exec_fn(env, st->st_sub_th,
                                             ta->ta_args[i]);
+
+               /* If the update is to update the reply data, then
+                * we need set the session information, so
+                * tgt_last_rcvd_update() can be called correctly */
+               if (rc == 0 && dt_obj == tdtd->tdtd_lut->lut_reply_data)
+                       update_recovery_update_ses(env, tdtd, th,
+                                                  st->st_sub_th, dtrq, ta_arg);
+
                if (unlikely(rc < 0)) {
                        CDEBUG(D_HA, "error during execution of #%u from"
                               " %s:%d: rc = %d\n", i, ta->ta_args[i]->file,
@@ -1298,13 +1431,18 @@ stop_trans:
        rc = top_trans_stop(env, tdtd->tdtd_dt, th);
        for (i = 0; i < ta->ta_argno; i++) {
                if (ta->ta_args[i]->object != NULL) {
-                       lu_object_put(env, &ta->ta_args[i]->object->do_lu);
+                       dt_object_put(env, ta->ta_args[i]->object);
                        ta->ta_args[i]->object = NULL;
                }
        }
 
        if (tur != NULL)
                tur->tur_update_records = NULL;
+
+       if (tgt_ses_info(env)->tsi_exp != NULL) {
+               class_export_put(tgt_ses_info(env)->tsi_exp);
+               tgt_ses_info(env)->tsi_exp = NULL;
+       }
 exit_session:
        lu_context_exit(&session_env);
        lu_context_fini(&session_env);