}
/**
+ * Add a cookie to a sub distribute txn replay request
+ *
+ * Allocate a new sub_thandle_cookie entry, copy \a cookie into it and
+ * link the entry onto dtrqs_cookie_list.
+ *
+ * NOTE(review): nothing here checks whether the same cookie is already
+ * on the list; every successful call links a new entry. Confirm that
+ * callers never pass the same cookie twice.
+ *
+ * \param[in] dtrqs sub replay req where the cookie is to be added.
+ * \param[in] cookie cookie to be added (copied by value).
+ *
+ * \retval 0 if adding the cookie succeeds.
+ * \retval -ENOMEM if the list entry cannot be allocated.
+ */
+static int dtrq_sub_add_cookie(struct distribute_txn_replay_req_sub *dtrqs,
+ struct llog_cookie *cookie)
+{
+ struct sub_thandle_cookie *new;
+
+ OBD_ALLOC_PTR(new);
+ if (new == NULL)
+ return -ENOMEM;
+
+ INIT_LIST_HEAD(&new->stc_list);
+ new->stc_cookie = *cookie;
+ /* Note: only single thread will access one sub_request each time,
+ * so no lock is needed here */
+ list_add(&new->stc_list, &dtrqs->dtrqs_cookie_list);
+
+ return 0;
+}
+
+/**
* Insert distribute txn sub req replay
*
* Allocate sub replay req and insert distribute txn replay list.
struct llog_cookie *cookie,
__u32 mdt_index)
{
- struct distribute_txn_replay_req_sub *dtrqs = NULL;
- struct distribute_txn_replay_req_sub *new;
+ struct distribute_txn_replay_req_sub *dtrqs = NULL;
+ struct distribute_txn_replay_req_sub *new;
+ int rc;
ENTRY;
spin_lock(&dtrq->dtrq_sub_list_lock);
dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
spin_unlock(&dtrq->dtrq_sub_list_lock);
- if (dtrqs != NULL)
+ if (dtrqs != NULL) {
+ rc = dtrq_sub_add_cookie(dtrqs, cookie);
RETURN(0);
+ }
OBD_ALLOC_PTR(new);
if (new == NULL)
RETURN(-ENOMEM);
INIT_LIST_HEAD(&new->dtrqs_list);
+ INIT_LIST_HEAD(&new->dtrqs_cookie_list);
new->dtrqs_mdt_index = mdt_index;
- new->dtrqs_llog_cookie = *cookie;
spin_lock(&dtrq->dtrq_sub_list_lock);
dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
- if (dtrqs == NULL)
+ if (dtrqs == NULL) {
list_add(&new->dtrqs_list, &dtrq->dtrq_sub_list);
- else
+ dtrqs = new;
+ } else {
OBD_FREE_PTR(new);
+ }
spin_unlock(&dtrq->dtrq_sub_list_lock);
+ rc = dtrq_sub_add_cookie(dtrqs, cookie);
+
+ RETURN(rc);
+}
+
+/**
+ * Append updates to the current replay updates
+ *
+ * Append more updates to the existing replay update. This is only
+ * used when combining multiple updates into one large update during
+ * replay.
+ *
+ * The record is appended only when its ur_index is exactly one greater
+ * than the ur_index currently stored in dtrq->dtrq_lur; any other
+ * record is skipped and 0 is returned without touching the request.
+ * On success dtrq->dtrq_lur is replaced by a newly allocated, larger
+ * buffer and the old buffer is freed.
+ *
+ * \param[in] dtrq the update replay request where the new update
+ * records will be added.
+ * \param[in] record the new update record.
+ *
+ * \retval 0 if appending succeeds, or if the record is skipped.
+ * \retval -ENOMEM if the enlarged buffer cannot be allocated.
+ */
+static int dtrq_append_updates(struct distribute_txn_replay_req *dtrq,
+ struct update_records *record)
+{
+ struct llog_update_record *new_lur;
+ size_t lur_size = dtrq->dtrq_lur_size;
+ void *ptr;
+ ENTRY;
+
+ /* Several threads might retrieve the same records from
+ * different targets, and only one copy of the records is needed.
+ * So check, under dtrq_sub_list_lock, that this record's ur_index
+ * directly follows the current one; if not, just skip it.
+ * On a match the stored ur_index is advanced before the lock is
+ * dropped. */
+ spin_lock(&dtrq->dtrq_sub_list_lock);
+ if (dtrq->dtrq_lur->lur_update_rec.ur_index + 1 != record->ur_index) {
+ spin_unlock(&dtrq->dtrq_sub_list_lock);
+ RETURN(0);
+ }
+ dtrq->dtrq_lur->lur_update_rec.ur_index++;
+ spin_unlock(&dtrq->dtrq_sub_list_lock);
+
+ lur_size += update_records_size(record); /* old size + new record */
+ OBD_ALLOC_LARGE(new_lur, lur_size);
+ if (new_lur == NULL) {
+ spin_lock(&dtrq->dtrq_sub_list_lock);
+ dtrq->dtrq_lur->lur_update_rec.ur_index--; /* undo the bump above */
+ spin_unlock(&dtrq->dtrq_sub_list_lock);
+ RETURN(-ENOMEM);
+ }
+
+ /* Copy the old and new records to the newly allocated buffer.
+ * The old buffer is copied whole (dtrq_lur_size bytes). From the
+ * new record only the bytes from ur_ops onward are copied, and
+ * they are placed update_records_size() bytes past the start of
+ * the copied lur_update_rec. */
+ memcpy(new_lur, dtrq->dtrq_lur, dtrq->dtrq_lur_size);
+ ptr = (char *)&new_lur->lur_update_rec +
+ update_records_size(&new_lur->lur_update_rec);
+ memcpy(ptr, &record->ur_ops,
+ update_records_size(record) -
+ offsetof(struct update_records, ur_ops));
+
+ new_lur->lur_update_rec.ur_update_count += record->ur_update_count;
+ new_lur->lur_update_rec.ur_param_count += record->ur_param_count;
+ new_lur->lur_hdr.lrh_len = llog_update_record_size(new_lur);
+
+ /* Replace the records: free the old buffer, install the new one
+ * with its size, and take ur_flags from the appended record.
+ * NOTE(review): this swap runs without dtrq_sub_list_lock held;
+ * confirm nothing else reads dtrq_lur concurrently. */
+ OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
+ dtrq->dtrq_lur = new_lur;
+ dtrq->dtrq_lur_size = lur_size;
+ dtrq->dtrq_lur->lur_update_rec.ur_flags = record->ur_flags;
+ update_records_dump(&new_lur->lur_update_rec, D_INFO, true);
RETURN(0);
}
CDEBUG(D_HA, "%s: insert record batchid = "LPU64" transno = "LPU64
" mdt_index %u\n", tdtd->tdtd_lut->lut_obd->obd_name,
record->ur_batchid, record->ur_master_transno, mdt_index);
-again:
+
+ /* First try to build the replay update request with the records */
spin_lock(&tdtd->tdtd_replay_list_lock);
dtrq = dtrq_lookup(tdtd, record->ur_batchid);
spin_unlock(&tdtd->tdtd_replay_list_lock);
spin_lock(&tdtd->tdtd_replay_list_lock);
rc = dtrq_insert(tdtd, dtrq);
spin_unlock(&tdtd->tdtd_replay_list_lock);
- } else if (record->ur_master_transno != 0 &&
- dtrq->dtrq_lur->lur_update_rec.ur_master_transno !=
- record->ur_master_transno) {
- /* If the master transno in update header is not matched with
- * the one in the record, then it means the dtrq is originally
- * created by master record, and we need update master transno
- * and reposition the dtrq(by master transno). */
- dtrq->dtrq_lur->lur_update_rec.ur_master_transno =
- record->ur_master_transno;
- list_del_init(&dtrq->dtrq_list);
- spin_lock(&tdtd->tdtd_replay_list_lock);
- rc = dtrq_insert(tdtd, dtrq);
- spin_unlock(&tdtd->tdtd_replay_list_lock);
- }
+ if (rc == -EEXIST) {
+ /* Someone else has already added the record */
+ dtrq_destroy(dtrq);
+ rc = 0;
+ }
+ } else {
+ struct update_records *dtrq_rec;
+
+ /* If the master transno in update header is not
+ * matched with the one in the record, then it means
+ * the dtrq is originally created by master record,
+ * and we need update master transno and reposition
+ * the dtrq(by master transno). */
+ dtrq_rec = &dtrq->dtrq_lur->lur_update_rec;
+ if (record->ur_master_transno != 0 &&
+ dtrq_rec->ur_master_transno != record->ur_master_transno) {
+ dtrq_rec->ur_master_transno = record->ur_master_transno;
+ spin_lock(&tdtd->tdtd_replay_list_lock);
+ list_del_init(&dtrq->dtrq_list);
+ rc = dtrq_insert(tdtd, dtrq);
+ spin_unlock(&tdtd->tdtd_replay_list_lock);
+ if (rc < 0)
+ return rc;
+ }
- if (rc == -EEXIST) {
- dtrq_destroy(dtrq);
- rc = 0;
- goto again;
+ /* This is a partial update record; try to append it to
+ * the current replay request */
+ if (record->ur_flags & UPDATE_RECORD_CONTINUE)
+ rc = dtrq_append_updates(dtrq, record);
}
+ /* Then create and add sub update request */
rc = dtrq_sub_create_and_insert(dtrq, cookie, mdt_index);
RETURN(rc);
LASSERT(list_empty(&dtrq->dtrq_list));
spin_lock(&dtrq->dtrq_sub_list_lock);
list_for_each_entry_safe(dtrqs, tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
+ struct sub_thandle_cookie *stc;
+ struct sub_thandle_cookie *tmp;
+
list_del(&dtrqs->dtrqs_list);
+ list_for_each_entry_safe(stc, tmp, &dtrqs->dtrqs_cookie_list,
+ stc_list) {
+ list_del(&stc->stc_list);
+ OBD_FREE_PTR(stc);
+ }
OBD_FREE_PTR(dtrqs);
}
spin_unlock(&dtrq->dtrq_sub_list_lock);
dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
if (dtrqs != NULL || top_th->tt_multiple_thandle->tmt_committed) {
st->st_committed = 1;
- if (dtrqs != NULL)
- st->st_cookie = dtrqs->dtrqs_llog_cookie;
+ if (dtrqs != NULL) {
+ struct sub_thandle_cookie *stc;
+ struct sub_thandle_cookie *tmp;
+
+ list_for_each_entry_safe(stc, tmp,
+ &dtrqs->dtrqs_cookie_list,
+ stc_list)
+ list_move(&stc->stc_list, &st->st_cookie_list);
+ }
RETURN(0);
}
struct dt_device *sub_dt;
struct sub_thandle *st;
+ if (op->uop_type == OUT_NOOP)
+ continue;
+
dt_obj = dt_locate(env, tdtd->tdtd_dt, fid);
if (IS_ERR(dt_obj)) {
rc = PTR_ERR(dt_obj);