struct distribute_txn_replay_req *dtrq = NULL;
list_for_each_entry(tmp, &tdtd->tdtd_replay_list, dtrq_list) {
- if (tmp->dtrq_lur->lur_update_rec.ur_batchid == batchid) {
+ if (tmp->dtrq_batchid == batchid) {
dtrq = tmp;
break;
}
{
struct distribute_txn_replay_req *iter;
+ /* Check if the dtrq has been added to the list */
+ iter = dtrq_lookup(tdtd, new->dtrq_batchid);
+ if (iter != NULL)
+ return -EEXIST;
+
list_for_each_entry_reverse(iter, &tdtd->tdtd_replay_list, dtrq_list) {
- if (iter->dtrq_lur->lur_update_rec.ur_master_transno >
- new->dtrq_lur->lur_update_rec.ur_master_transno)
+ if (iter->dtrq_master_transno > new->dtrq_master_transno)
continue;
/* If there are mulitple replay req with same transno, then
* sort them with batchid */
- if (iter->dtrq_lur->lur_update_rec.ur_master_transno ==
- new->dtrq_lur->lur_update_rec.ur_master_transno &&
- iter->dtrq_lur->lur_update_rec.ur_batchid ==
- new->dtrq_lur->lur_update_rec.ur_batchid)
- return -EEXIST;
-
- /* If there are mulitple replay req with same transno, then
- * sort them with batchid */
- if (iter->dtrq_lur->lur_update_rec.ur_master_transno ==
- new->dtrq_lur->lur_update_rec.ur_master_transno &&
- iter->dtrq_lur->lur_update_rec.ur_batchid >
- new->dtrq_lur->lur_update_rec.ur_batchid)
+ if (iter->dtrq_master_transno == new->dtrq_master_transno &&
+ iter->dtrq_batchid > new->dtrq_batchid)
continue;
list_add(&new->dtrq_list, &iter->dtrq_list);
* \retval NULL if the creation fails.
*/
static struct distribute_txn_replay_req *
-dtrq_create(struct llog_update_record *lur)
+dtrq_create(struct target_distribute_txn_data *tdtd,
+ struct llog_update_record *lur)
{
struct distribute_txn_replay_req *new;
memcpy(new->dtrq_lur, lur, new->dtrq_lur_size);
+ /* If the transno in the update record is 0, it means the
+ * update are from master MDT, and it will use the master
+ * last committed transno as its master transno. Later, if
+ * the update records are gotten from slave MDTs, then these
+ * transno will be replaced.
+ * See insert_update_records_to_replay_list(). */
+ if (lur->lur_update_rec.ur_master_transno == 0) {
+ new->dtrq_lur->lur_update_rec.ur_master_transno =
+ tdtd->tdtd_lut->lut_obd->obd_last_committed;
+ new->dtrq_master_transno =
+ tdtd->tdtd_lut->lut_obd->obd_last_committed;
+ } else {
+ new->dtrq_master_transno =
+ lur->lur_update_rec.ur_master_transno;
+ }
+
+ new->dtrq_batchid = lur->lur_update_rec.ur_batchid;
+
spin_lock_init(&new->dtrq_sub_list_lock);
INIT_LIST_HEAD(&new->dtrq_sub_list);
INIT_LIST_HEAD(&new->dtrq_list);
__u32 mdt_index)
{
struct distribute_txn_replay_req *dtrq;
- struct update_records *record = &lur->lur_update_rec;
+ struct update_records *record = &lur->lur_update_rec;
+ bool replace_record = false;
int rc = 0;
ENTRY;
" mdt_index %u\n", tdtd->tdtd_lut->lut_obd->obd_name,
record->ur_batchid, record->ur_master_transno, mdt_index);
+ /* Update batchid if necessary */
+ spin_lock(&tdtd->tdtd_batchid_lock);
+ if (record->ur_batchid >= tdtd->tdtd_batchid) {
+ CDEBUG(D_HA, "%s update batchid from "LPU64 " to "LPU64"\n",
+ tdtd->tdtd_lut->lut_obd->obd_name,
+ tdtd->tdtd_batchid, record->ur_batchid);
+ tdtd->tdtd_batchid = record->ur_batchid + 1;
+ }
+ spin_unlock(&tdtd->tdtd_batchid_lock);
+
again:
- /* First try to build the replay update request with the records */
spin_lock(&tdtd->tdtd_replay_list_lock);
+ /* First try to build the replay update request with the records */
dtrq = dtrq_lookup(tdtd, record->ur_batchid);
- spin_unlock(&tdtd->tdtd_replay_list_lock);
if (dtrq == NULL) {
- /* If the transno in the update record is 0, it means the
- * update are from master MDT, and we will use the master
- * last committed transno as its batchid. Note: if it got
- * the records from the slave later, it needs to update
- * the batchid by the transno in slave update log (see below) */
- dtrq = dtrq_create(lur);
+ spin_unlock(&tdtd->tdtd_replay_list_lock);
+ dtrq = dtrq_create(tdtd, lur);
if (IS_ERR(dtrq))
RETURN(PTR_ERR(dtrq));
- if (record->ur_master_transno == 0)
- dtrq->dtrq_lur->lur_update_rec.ur_master_transno =
- tdtd->tdtd_lut->lut_last_transno;
spin_lock(&tdtd->tdtd_replay_list_lock);
rc = dtrq_insert(tdtd, dtrq);
- spin_unlock(&tdtd->tdtd_replay_list_lock);
- if (rc == -EEXIST) {
- /* Some one else already add the record */
+ if (rc < 0) {
+ spin_unlock(&tdtd->tdtd_replay_list_lock);
dtrq_destroy(dtrq);
- goto again;
+ if (rc == -EEXIST)
+ goto again;
+ return rc;
}
} else {
- struct update_records *dtrq_rec;
-
/* If the master transno in update header is not
- * matched with the one in the record, then it means
- * the dtrq is originally created by master record,
- * and we need update master transno and reposition
- * the dtrq(by master transno). */
- dtrq_rec = &dtrq->dtrq_lur->lur_update_rec;
+ * matched with the one in the record, then it means
+ * the dtrq is originally created by master record,
+ * so we need update master transno and reposition
+ * the dtrq(by master transno) in the list and also
+ * replace update record */
if (record->ur_master_transno != 0 &&
- dtrq_rec->ur_master_transno != record->ur_master_transno) {
- dtrq_rec->ur_master_transno = record->ur_master_transno;
- spin_lock(&tdtd->tdtd_replay_list_lock);
+ dtrq->dtrq_master_transno != record->ur_master_transno &&
+ dtrq->dtrq_lur != NULL) {
list_del_init(&dtrq->dtrq_list);
+ dtrq->dtrq_lur->lur_update_rec.ur_master_transno =
+ record->ur_master_transno;
+
+ dtrq->dtrq_master_transno = record->ur_master_transno;
+ replace_record = true;
+ /* try to insert again */
rc = dtrq_insert(tdtd, dtrq);
- spin_unlock(&tdtd->tdtd_replay_list_lock);
- if (rc < 0)
+ if (rc < 0) {
+ spin_unlock(&tdtd->tdtd_replay_list_lock);
+ dtrq_destroy(dtrq);
return rc;
+ }
}
+ }
+ spin_unlock(&tdtd->tdtd_replay_list_lock);
- /* This is a partial update records, let's try to append
- * the record to the current replay request */
- if (record->ur_flags & UPDATE_RECORD_CONTINUE)
- rc = dtrq_append_updates(dtrq, record);
+ /* Because there should be only thread access the update record, so
+ * we do not need lock here */
+ if (replace_record) {
+ /* Replace the update record and master transno */
+ OBD_FREE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
+ dtrq->dtrq_lur = NULL;
+ dtrq->dtrq_lur_size = llog_update_record_size(lur);
+ OBD_ALLOC_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
+ if (dtrq->dtrq_lur == NULL)
+ return -ENOMEM;
+
+ memcpy(dtrq->dtrq_lur, lur, dtrq->dtrq_lur_size);
}
+ /* This is a partial update records, let's try to append
+ * the record to the current replay request */
+ if (record->ur_flags & UPDATE_RECORD_CONTINUE)
+ rc = dtrq_append_updates(dtrq, record);
+
/* Then create and add sub update request */
rc = dtrq_sub_create_and_insert(dtrq, cookie, mdt_index);
if (!list_empty(&tdtd->tdtd_replay_list)) {
dtrq = list_entry(tdtd->tdtd_replay_list.next,
struct distribute_txn_replay_req, dtrq_list);
- transno = dtrq->dtrq_lur->lur_update_rec.ur_master_transno;
+ transno = dtrq->dtrq_master_transno;
}
spin_unlock(&tdtd->tdtd_replay_list_lock);
/* check if the distribute transaction has been committed */
tmt = top_th->tt_multiple_thandle;
tmt->tmt_master_sub_dt = tdtd->tdtd_lut->lut_bottom;
- tmt->tmt_batchid = records->ur_batchid;
- tgt_th_info(env)->tti_transno = records->ur_master_transno;
+ tmt->tmt_batchid = dtrq->dtrq_batchid;
+ tgt_th_info(env)->tti_transno = dtrq->dtrq_master_transno;
if (tmt->tmt_batchid <= tdtd->tdtd_committed_batchid)
tmt->tmt_committed = 1;