* GPL HEADER END
*/
/*
- * Copyright (c) 2014, Intel Corporation.
+ * Copyright (c) 2014, 2015, Intel Corporation.
*/
/*
#define DEBUG_SUBSYSTEM S_CLASS
#include <lu_target.h>
-#include <md_object.h>
+#include <lustre_obdo.h>
#include <lustre_update.h>
+#include <lustre_swab.h>
+#include <md_object.h>
#include <obd.h>
#include <obd_class.h>
+
#include "tgt_internal.h"
/**
* we do not need lock here */
if (replace_record) {
/* Replace the update record and master transno */
- OBD_FREE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
+ OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
dtrq->dtrq_lur = NULL;
dtrq->dtrq_lur_size = llog_update_record_size(lur);
OBD_ALLOC_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
list_del_init(&dtrq->dtrq_list);
dtrq_destroy(dtrq);
}
+ list_for_each_entry_safe(dtrq, tmp, &tdtd->tdtd_replay_finish_list,
+ dtrq_list) {
+ list_del_init(&dtrq->dtrq_list);
+ dtrq_destroy(dtrq);
+ }
spin_unlock(&tdtd->tdtd_replay_list_lock);
}
EXPORT_SYMBOL(dtrq_list_destroy);
}
EXPORT_SYMBOL(distribute_txn_get_next_transno);
+struct distribute_txn_replay_req *
+distribute_txn_lookup_finish_list(struct target_distribute_txn_data *tdtd,
+ __u64 xid)
+{
+ struct distribute_txn_replay_req *dtrq = NULL;
+ struct distribute_txn_replay_req *iter;
+
+ spin_lock(&tdtd->tdtd_replay_list_lock);
+ list_for_each_entry(iter, &tdtd->tdtd_replay_finish_list, dtrq_list) {
+ if (iter->dtrq_xid == xid) {
+ dtrq = iter;
+ break;
+ }
+ }
+ spin_unlock(&tdtd->tdtd_replay_list_lock);
+ return dtrq;
+}
+
+bool is_req_replayed_by_update(struct ptlrpc_request *req)
+{
+ struct lu_target *tgt = class_exp2tgt(req->rq_export);
+ struct distribute_txn_replay_req *dtrq;
+
+ if (tgt->lut_tdtd == NULL)
+ return false;
+
+ dtrq = distribute_txn_lookup_finish_list(tgt->lut_tdtd, req->rq_xid);
+ if (dtrq == NULL)
+ return false;
+
+ return true;
+}
+EXPORT_SYMBOL(is_req_replayed_by_update);
+
/**
* Check if the update of one object is committed
*
}
/**
+ * Update session information
+ *
+ * Update session information so tgt_txn_stop_cb()->tgt_last_rcvd_update()
+ * can be called correctly during update replay.
+ *
+ * \param[in] env execution environment.
+ * \param[in] tdtd distribute data structure of the recovering tgt.
+ * \param[in] th thandle of this update replay.
+ * \param[in] master_th master sub thandle.
+ * \param[in] ta_arg the tx arg structure to hold the update for updating
+ * reply data.
+ */
+static void update_recovery_update_ses(struct lu_env *env,
+ struct target_distribute_txn_data *tdtd,
+ struct thandle *th,
+ struct thandle *master_th,
+ struct distribute_txn_replay_req *dtrq,
+ struct tx_arg *ta_arg)
+{
+ struct tgt_session_info *tsi;
+ struct lu_target *lut = tdtd->tdtd_lut;
+ struct obd_export *export;
+ struct cfs_hash *hash;
+ struct top_thandle *top_th;
+ struct lsd_reply_data *lrd;
+ size_t size;
+
+ tsi = tgt_ses_info(env);
+ if (tsi->tsi_exp != NULL)
+ return;
+
+ size = ta_arg->u.write.buf.lb_len;
+ lrd = ta_arg->u.write.buf.lb_buf;
+ if (size != sizeof(*lrd) || lrd == NULL)
+ return;
+
+ lrd->lrd_transno = le64_to_cpu(lrd->lrd_transno);
+ lrd->lrd_xid = le64_to_cpu(lrd->lrd_xid);
+ lrd->lrd_data = le64_to_cpu(lrd->lrd_data);
+ lrd->lrd_result = le32_to_cpu(lrd->lrd_result);
+ lrd->lrd_client_gen = le32_to_cpu(lrd->lrd_client_gen);
+
+ if (lrd->lrd_transno != tgt_th_info(env)->tti_transno)
+ return;
+
+ hash = cfs_hash_getref(lut->lut_obd->obd_gen_hash);
+ if (hash == NULL)
+ return;
+
+ export = cfs_hash_lookup(hash, &lrd->lrd_client_gen);
+ if (export == NULL) {
+ cfs_hash_putref(hash);
+ return;
+ }
+
+ tsi->tsi_exp = export;
+ tsi->tsi_xid = lrd->lrd_xid;
+ tsi->tsi_opdata = lrd->lrd_data;
+ tsi->tsi_result = lrd->lrd_result;
+ tsi->tsi_client_gen = lrd->lrd_client_gen;
+ dtrq->dtrq_xid = lrd->lrd_xid;
+ top_th = container_of(th, struct top_thandle, tt_super);
+ top_th->tt_master_sub_thandle = master_th;
+ cfs_hash_putref(hash);
+}
+
+/**
* Execute updates in the update replay records
*
* Declare distribute txn replay by update records and add the updates
if (rc < 0)
GOTO(stop_trans, rc);
+ th->th_dev = tdtd->tdtd_dt;
ta->ta_handle = th;
/* check if the distribute transaction has been committed */
if (rc < 0)
GOTO(stop_trans, rc);
- /* If no updates are needed to be replayed, then
- * mark this records as committed, so commit thread
- * distribute_txn_commit_thread() will delete the
- * record */
+ /* If no updates are needed to be replayed, then mark this records as
+ * committed, so commit thread distribute_txn_commit_thread() will
+ * delete the record */
if (ta->ta_argno == 0)
tmt->tmt_committed = 1;
LASSERT(tmt->tmt_committed == 0);
sub_dt = lu2dt_dev(dt_obj->do_lu.lo_dev);
st = lookup_sub_thandle(tmt, sub_dt);
+
LASSERT(st != NULL);
LASSERT(st->st_sub_th != NULL);
rc = ta->ta_args[i]->exec_fn(env, st->st_sub_th,
ta->ta_args[i]);
+
+ /* If the update is to update the reply data, then
+ * we need set the session information, so
+ * tgt_last_rcvd_update() can be called correctly */
+ if (rc == 0 && dt_obj == tdtd->tdtd_lut->lut_reply_data)
+ update_recovery_update_ses(env, tdtd, th,
+ st->st_sub_th, dtrq, ta_arg);
+
if (unlikely(rc < 0)) {
CDEBUG(D_HA, "error during execution of #%u from"
" %s:%d: rc = %d\n", i, ta->ta_args[i]->file,
if (tur != NULL)
tur->tur_update_records = NULL;
+
+ if (tgt_ses_info(env)->tsi_exp != NULL) {
+ class_export_put(tgt_ses_info(env)->tsi_exp);
+ tgt_ses_info(env)->tsi_exp = NULL;
+ }
exit_session:
lu_context_exit(&session_env);
lu_context_fini(&session_env);