Whamcloud - gitweb
LU-6602 update: split update llog record
[fs/lustre-release.git] / lustre / target / update_recovery.c
index 2fd87d7..cad0183 100644 (file)
@@ -193,6 +193,36 @@ dtrq_sub_lookup(struct distribute_txn_replay_req *dtrq, __u32 mdt_index)
 }
 
 /**
+ * Try to add cookie to sub distribute txn request
+ *
+ * Check if the update log cookie has been added to the request, if not,
+ * add it to the dtrqs_cookie_list.
+ *
+ * \param[in] dtrqs    sub replay req where cookies to be added.
+ * \param[in] cookie   cookie to be added.
+ *
+ * \retval             0 if the cookie is adding succeeds.
+ * \retval             negative errno if adding fails.
+ */
+static int dtrq_sub_add_cookie(struct distribute_txn_replay_req_sub *dtrqs,
+                              struct llog_cookie *cookie)
+{
+       struct sub_thandle_cookie *new;
+
+       OBD_ALLOC_PTR(new);
+       if (new == NULL)
+               return -ENOMEM;
+
+       INIT_LIST_HEAD(&new->stc_list);
+       new->stc_cookie = *cookie;
+       /* Note: only single thread will access one sub_request each time,
+        * so no need lock here */
+       list_add(&new->stc_list, &dtrqs->dtrqs_cookie_list);
+
+       return 0;
+}
+
+/**
  * Insert distribute txn sub req replay
  *
  * Allocate sub replay req and insert distribute txn replay list.
@@ -209,31 +239,102 @@ dtrq_sub_create_and_insert(struct distribute_txn_replay_req *dtrq,
                           struct llog_cookie *cookie,
                           __u32 mdt_index)
 {
-       struct distribute_txn_replay_req_sub *dtrqs = NULL;
-       struct distribute_txn_replay_req_sub *new;
+       struct distribute_txn_replay_req_sub    *dtrqs = NULL;
+       struct distribute_txn_replay_req_sub    *new;
+       int                                     rc;
        ENTRY;
 
        spin_lock(&dtrq->dtrq_sub_list_lock);
        dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
        spin_unlock(&dtrq->dtrq_sub_list_lock);
-       if (dtrqs != NULL)
+       if (dtrqs != NULL) {
+               rc = dtrq_sub_add_cookie(dtrqs, cookie);
                RETURN(0);
+       }
 
        OBD_ALLOC_PTR(new);
        if (new == NULL)
                RETURN(-ENOMEM);
 
        INIT_LIST_HEAD(&new->dtrqs_list);
+       INIT_LIST_HEAD(&new->dtrqs_cookie_list);
        new->dtrqs_mdt_index = mdt_index;
-       new->dtrqs_llog_cookie = *cookie;
        spin_lock(&dtrq->dtrq_sub_list_lock);
        dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
-       if (dtrqs == NULL)
+       if (dtrqs == NULL) {
                list_add(&new->dtrqs_list, &dtrq->dtrq_sub_list);
-       else
+               dtrqs = new;
+       } else {
                OBD_FREE_PTR(new);
+       }
        spin_unlock(&dtrq->dtrq_sub_list_lock);
 
+       rc = dtrq_sub_add_cookie(dtrqs, cookie);
+
+       RETURN(rc);
+}
+
+/**
+ * append updates to the current replay updates
+ *
+ * Append more updates to the existent replay update. And this is only
+ * used when combining mulitple updates into one large updates during
+ * replay.
+ *
+ * \param[in] dtrq     the update replay request where the new update
+ *                      records will be added.
+ * \param[in] lur      the new update record.
+ *
+ * \retval             0 if appending succeeds.
+ * \retval             negative errno if appending fails.
+ */
+static int dtrq_append_updates(struct distribute_txn_replay_req *dtrq,
+                              struct update_records *record)
+{
+       struct llog_update_record *new_lur;
+       size_t lur_size = dtrq->dtrq_lur_size;
+       void *ptr;
+       ENTRY;
+
+       /* Because several threads might retrieve the same records from
+        * different targets, and we only need one copy of records. So
+        * we will check if the records is in the next one, if not, just
+        * skip it */
+       spin_lock(&dtrq->dtrq_sub_list_lock);
+       if (dtrq->dtrq_lur->lur_update_rec.ur_index + 1 != record->ur_index) {
+               spin_unlock(&dtrq->dtrq_sub_list_lock);
+               RETURN(0);
+       }
+       dtrq->dtrq_lur->lur_update_rec.ur_index++;
+       spin_unlock(&dtrq->dtrq_sub_list_lock);
+
+       lur_size += update_records_size(record);
+       OBD_ALLOC_LARGE(new_lur, lur_size);
+       if (new_lur == NULL) {
+               spin_lock(&dtrq->dtrq_sub_list_lock);
+               dtrq->dtrq_lur->lur_update_rec.ur_index--;
+               spin_unlock(&dtrq->dtrq_sub_list_lock);
+               RETURN(-ENOMEM);
+       }
+
+       /* Copy the old and new records to the new allocated buffer */
+       memcpy(new_lur, dtrq->dtrq_lur, dtrq->dtrq_lur_size);
+       ptr = (char *)&new_lur->lur_update_rec +
+               update_records_size(&new_lur->lur_update_rec);
+       memcpy(ptr, &record->ur_ops,
+              update_records_size(record) -
+              offsetof(struct update_records, ur_ops));
+
+       new_lur->lur_update_rec.ur_update_count += record->ur_update_count;
+       new_lur->lur_update_rec.ur_param_count += record->ur_param_count;
+       new_lur->lur_hdr.lrh_len = llog_update_record_size(new_lur);
+
+       /* Replace the records */
+       OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
+       dtrq->dtrq_lur = new_lur;
+       dtrq->dtrq_lur_size = lur_size;
+       dtrq->dtrq_lur->lur_update_rec.ur_flags = record->ur_flags;
+       update_records_dump(&new_lur->lur_update_rec, D_INFO, true);
        RETURN(0);
 }
 
@@ -266,7 +367,8 @@ insert_update_records_to_replay_list(struct target_distribute_txn_data *tdtd,
        CDEBUG(D_HA, "%s: insert record batchid = "LPU64" transno = "LPU64
               " mdt_index %u\n", tdtd->tdtd_lut->lut_obd->obd_name,
               record->ur_batchid, record->ur_master_transno, mdt_index);
-again:
+
+       /* First try to build the replay update request with the records */
        spin_lock(&tdtd->tdtd_replay_list_lock);
        dtrq = dtrq_lookup(tdtd, record->ur_batchid);
        spin_unlock(&tdtd->tdtd_replay_list_lock);
@@ -286,27 +388,38 @@ again:
                spin_lock(&tdtd->tdtd_replay_list_lock);
                rc = dtrq_insert(tdtd, dtrq);
                spin_unlock(&tdtd->tdtd_replay_list_lock);
-       } else if (record->ur_master_transno != 0 &&
-                  dtrq->dtrq_lur->lur_update_rec.ur_master_transno !=
-                  record->ur_master_transno) {
-               /* If the master transno in update header is not matched with
-                * the one in the record, then it means the dtrq is originally
-                * created by master record, and we need update master transno
-                * and reposition the dtrq(by master transno). */
-               dtrq->dtrq_lur->lur_update_rec.ur_master_transno =
-                                               record->ur_master_transno;
-               list_del_init(&dtrq->dtrq_list);
-               spin_lock(&tdtd->tdtd_replay_list_lock);
-               rc = dtrq_insert(tdtd, dtrq);
-               spin_unlock(&tdtd->tdtd_replay_list_lock);
-       }
+               if (rc == -EEXIST) {
+                       /* Some one else already add the record */
+                       dtrq_destroy(dtrq);
+                       rc = 0;
+               }
+       } else {
+               struct update_records *dtrq_rec;
+
+               /* If the master transno in update header is not
+                * matched with the one in the record, then it means
+                * the dtrq is originally created by master record,
+                * and we need update master transno and reposition
+                * the dtrq(by master transno). */
+               dtrq_rec = &dtrq->dtrq_lur->lur_update_rec;
+               if (record->ur_master_transno != 0 &&
+                   dtrq_rec->ur_master_transno != record->ur_master_transno) {
+                       dtrq_rec->ur_master_transno = record->ur_master_transno;
+                       spin_lock(&tdtd->tdtd_replay_list_lock);
+                       list_del_init(&dtrq->dtrq_list);
+                       rc = dtrq_insert(tdtd, dtrq);
+                       spin_unlock(&tdtd->tdtd_replay_list_lock);
+                       if (rc < 0)
+                               return rc;
+               }
 
-       if (rc == -EEXIST) {
-               dtrq_destroy(dtrq);
-               rc = 0;
-               goto again;
+               /* This is a partial update records, let's try to append
+                * the record to the current replay request */
+               if (record->ur_flags & UPDATE_RECORD_CONTINUE)
+                       rc = dtrq_append_updates(dtrq, record);
        }
 
+       /* Then create and add sub update request */
        rc = dtrq_sub_create_and_insert(dtrq, cookie, mdt_index);
 
        RETURN(rc);
@@ -350,7 +463,15 @@ void dtrq_destroy(struct distribute_txn_replay_req *dtrq)
        LASSERT(list_empty(&dtrq->dtrq_list));
        spin_lock(&dtrq->dtrq_sub_list_lock);
        list_for_each_entry_safe(dtrqs, tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
+               struct sub_thandle_cookie *stc;
+               struct sub_thandle_cookie *tmp;
+
                list_del(&dtrqs->dtrqs_list);
+               list_for_each_entry_safe(stc, tmp, &dtrqs->dtrqs_cookie_list,
+                                        stc_list) {
+                       list_del(&stc->stc_list);
+                       OBD_FREE_PTR(stc);
+               }
                OBD_FREE_PTR(dtrqs);
        }
        spin_unlock(&dtrq->dtrq_sub_list_lock);
@@ -496,8 +617,15 @@ static int update_is_committed(const struct lu_env *env,
        dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
        if (dtrqs != NULL || top_th->tt_multiple_thandle->tmt_committed) {
                st->st_committed = 1;
-               if (dtrqs != NULL)
-                       st->st_cookie = dtrqs->dtrqs_llog_cookie;
+               if (dtrqs != NULL) {
+                       struct sub_thandle_cookie *stc;
+                       struct sub_thandle_cookie *tmp;
+
+                       list_for_each_entry_safe(stc, tmp,
+                                                &dtrqs->dtrqs_cookie_list,
+                                                stc_list)
+                               list_move(&stc->stc_list, &st->st_cookie_list);
+               }
                RETURN(0);
        }
 
@@ -886,6 +1014,9 @@ static int update_recovery_exec(const struct lu_env *env,
                struct dt_device        *sub_dt;
                struct sub_thandle      *st;
 
+               if (op->uop_type == OUT_NOOP)
+                       continue;
+
                dt_obj = dt_locate(env, tdtd->tdtd_dt, fid);
                if (IS_ERR(dt_obj)) {
                        rc = PTR_ERR(dt_obj);