* GPL HEADER END
*/
/*
- * Copyright (c) 2014, Intel Corporation.
+ * Copyright (c) 2015, 2017, Intel Corporation.
*/
/*
#define DEBUG_SUBSYSTEM S_CLASS
#include <lu_target.h>
-#include <md_object.h>
+#include <lustre_obdo.h>
#include <lustre_update.h>
+#include <lustre_swab.h>
+#include <md_object.h>
#include <obd.h>
#include <obd_class.h>
+
#include "tgt_internal.h"
/**
int rc = 0;
ENTRY;
- CDEBUG(D_HA, "%s: insert record batchid = "LPU64" transno = "LPU64
+ CDEBUG(D_HA, "%s: insert record batchid = %llu transno = %llu"
" mdt_index %u\n", tdtd->tdtd_lut->lut_obd->obd_name,
record->ur_batchid, record->ur_master_transno, mdt_index);
/* Update batchid if necessary */
spin_lock(&tdtd->tdtd_batchid_lock);
if (record->ur_batchid >= tdtd->tdtd_batchid) {
- CDEBUG(D_HA, "%s update batchid from "LPU64 " to "LPU64"\n",
+ CDEBUG(D_HA, "%s update batchid from %llu" " to %llu\n",
tdtd->tdtd_lut->lut_obd->obd_name,
tdtd->tdtd_batchid, record->ur_batchid);
tdtd->tdtd_batchid = record->ur_batchid + 1;
* we do not need lock here */
if (replace_record) {
/* Replace the update record and master transno */
- OBD_FREE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
+ OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
dtrq->dtrq_lur = NULL;
dtrq->dtrq_lur_size = llog_update_record_size(lur);
OBD_ALLOC_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
struct distribute_txn_replay_req_sub *tmp;
LASSERT(list_empty(&dtrq->dtrq_list));
+ CDEBUG(D_HA, "destroy x%llu t%llu\n", dtrq->dtrq_xid,
+ dtrq->dtrq_master_transno);
spin_lock(&dtrq->dtrq_sub_list_lock);
list_for_each_entry_safe(dtrqs, tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
struct sub_thandle_cookie *stc;
list_del_init(&dtrq->dtrq_list);
dtrq_destroy(dtrq);
}
+ list_for_each_entry_safe(dtrq, tmp, &tdtd->tdtd_replay_finish_list,
+ dtrq_list) {
+ list_del_init(&dtrq->dtrq_list);
+ dtrq_destroy(dtrq);
+ }
spin_unlock(&tdtd->tdtd_replay_list_lock);
}
EXPORT_SYMBOL(dtrq_list_destroy);
}
spin_unlock(&tdtd->tdtd_replay_list_lock);
- CDEBUG(D_HA, "%s: Next update transno "LPU64"\n",
+ CDEBUG(D_HA, "%s: Next update transno %llu\n",
tdtd->tdtd_lut->lut_obd->obd_name, transno);
return transno;
}
EXPORT_SYMBOL(distribute_txn_get_next_transno);
+struct distribute_txn_replay_req *
+distribute_txn_lookup_finish_list(struct target_distribute_txn_data *tdtd,
+ __u64 transno)
+{
+ struct distribute_txn_replay_req *dtrq = NULL;
+ struct distribute_txn_replay_req *iter;
+
+ spin_lock(&tdtd->tdtd_replay_list_lock);
+ list_for_each_entry(iter, &tdtd->tdtd_replay_finish_list, dtrq_list) {
+ if (iter->dtrq_master_transno == transno) {
+ dtrq = iter;
+ break;
+ }
+ }
+ spin_unlock(&tdtd->tdtd_replay_list_lock);
+ return dtrq;
+}
+
+bool is_req_replayed_by_update(struct ptlrpc_request *req)
+{
+ struct lu_target *tgt = class_exp2tgt(req->rq_export);
+ struct distribute_txn_replay_req *dtrq;
+
+ if (tgt->lut_tdtd == NULL)
+ return false;
+
+ dtrq = distribute_txn_lookup_finish_list(tgt->lut_tdtd,
+ lustre_msg_get_transno(req->rq_reqmsg));
+ if (dtrq == NULL)
+ return false;
+
+ return true;
+}
+EXPORT_SYMBOL(is_req_replayed_by_update);
+
/**
* Check if the update of one object is committed
*
struct target_distribute_txn_data *tdtd,
struct thandle *th,
struct thandle *master_th,
+ struct distribute_txn_replay_req *dtrq,
struct tx_arg *ta_arg)
{
struct tgt_session_info *tsi;
lrd->lrd_result = le32_to_cpu(lrd->lrd_result);
lrd->lrd_client_gen = le32_to_cpu(lrd->lrd_client_gen);
+ CDEBUG(D_HA, "xid=%llu transno=%llu\n", lrd->lrd_xid, lrd->lrd_transno);
if (lrd->lrd_transno != tgt_th_info(env)->tti_transno)
return;
tsi->tsi_opdata = lrd->lrd_data;
tsi->tsi_result = lrd->lrd_result;
tsi->tsi_client_gen = lrd->lrd_client_gen;
+ dtrq->dtrq_xid = lrd->lrd_xid;
top_th = container_of(th, struct top_thandle, tt_super);
top_th->tt_master_sub_thandle = master_th;
cfs_hash_putref(hash);
dt_obj = dt_locate(env, tdtd->tdtd_dt, fid);
if (IS_ERR(dt_obj)) {
rc = PTR_ERR(dt_obj);
+ if (rc == -EREMCHG)
+ LCONSOLE_WARN("%.16s: hit invalid OI mapping "
+ "for "DFID" during recovering, "
+ "that may because auto scrub is "
+ "disabled on related MDT, and "
+ "will cause recovery failure. "
+ "Please enable auto scrub and "
+ "retry the recovery.\n",
+ tdtd->tdtd_lut->lut_obd->obd_name,
+ PFID(fid));
+
break;
}
sub_dt_obj = dt_object_child(dt_obj);
break;
}
next:
- lu_object_put(env, &dt_obj->do_lu);
+ dt_object_put(env, dt_obj);
if (rc < 0)
break;
}
* tgt_last_rcvd_update() can be called correctly */
if (rc == 0 && dt_obj == tdtd->tdtd_lut->lut_reply_data)
update_recovery_update_ses(env, tdtd, th,
- st->st_sub_th, ta_arg);
+ st->st_sub_th, dtrq, ta_arg);
if (unlikely(rc < 0)) {
CDEBUG(D_HA, "error during execution of #%u from"
rc = top_trans_stop(env, tdtd->tdtd_dt, th);
for (i = 0; i < ta->ta_argno; i++) {
if (ta->ta_args[i]->object != NULL) {
- lu_object_put(env, &ta->ta_args[i]->object->do_lu);
+ dt_object_put(env, ta->ta_args[i]->object);
ta->ta_args[i]->object = NULL;
}
}