+#include <tgt_internal.h>
+/**
+ * Dump top multiple thandle
+ *
+ * Print the state of the top multiple thandle to the debug log, followed
+ * by one line for each of its sub thandles and one line for every llog
+ * cookie linked on a sub thandle.
+ *
+ * \param[in] tmt	top_multiple_thandle to be dumped
+ * \param[in] mask	debug mask the messages are logged with
+ */
+static void top_multiple_thandle_dump(struct top_multiple_thandle *tmt,
+				      __u32 mask)
+{
+	struct sub_thandle *sub;
+
+	LASSERT(tmt->tmt_magic == TOP_THANDLE_MAGIC);
+
+	/* The master sub device may be unset, in which case "NULL" is shown */
+	CDEBUG(mask, "%s tmt %p refcount %d committed %d result %d batchid %llu\n",
+	       tmt->tmt_master_sub_dt != NULL ?
+	       tmt->tmt_master_sub_dt->dd_lu_dev.ld_obd->obd_name : "NULL",
+	       tmt, atomic_read(&tmt->tmt_refcount), tmt->tmt_committed,
+	       tmt->tmt_result, tmt->tmt_batchid);
+
+	list_for_each_entry(sub, &tmt->tmt_sub_thandle_list, st_sub_list) {
+		struct sub_thandle_cookie *cookie;
+
+		CDEBUG(mask, "st %p obd %s committed %d started %d stopped %d "
+		       "result %d sub_th %p\n",
+		       sub, sub->st_dt->dd_lu_dev.ld_obd->obd_name,
+		       sub->st_committed, sub->st_started, sub->st_stopped,
+		       sub->st_result, sub->st_sub_th);
+
+		/* llog cookies recorded on this sub thandle */
+		list_for_each_entry(cookie, &sub->st_cookie_list, stc_list)
+			CDEBUG(mask, " cookie "DFID".%u\n",
+			       PFID(&cookie->stc_cookie.lgc_lgl.lgl_oi.oi_fid),
+			       cookie->stc_cookie.lgc_index);
+	}
+}
+
+/**
+ * Declare write update to sub device
+ *
+ * Declare writing the update llog record to the sub device during a
+ * distribute transaction. The record is declared once, and then once
+ * more for every loc_chunk_size step needed to bring the remaining size
+ * down to at most one chunk.
+ *
+ * \param[in] env		execution environment
+ * \param[in] record		update records being written
+ * \param[in] sub_th		sub transaction handle
+ * \param[in] record_size	total update record size
+ *
+ * \retval			0 if there is no update llog context or
+ *				handle to declare against
+ * \retval			result of the last llog_declare_add(),
+ *				negative errno if declaring fails
+ */
+static int sub_declare_updates_write(const struct lu_env *env,
+				     struct llog_update_record *record,
+				     struct thandle *sub_th, size_t record_size)
+{
+	struct dt_device *dev = sub_th->th_dev;
+	struct llog_ctxt *ctxt;
+	int remaining;
+	int rc;
+
+	/* If ctxt is NULL, it means there is no need to write the update,
+	 * for example if the OSP is used to connect to an OST */
+	ctxt = llog_get_context(dev->dd_lu_dev.ld_obd,
+				LLOG_UPDATELOG_ORIG_CTXT);
+
+	/* Not ready to record updates yet. */
+	if (ctxt == NULL || ctxt->loc_handle == NULL) {
+		llog_ctxt_put(ctxt);
+		return 0;
+	}
+
+	rc = llog_declare_add(env, ctxt->loc_handle, &record->lur_hdr, sub_th);
+	if (rc < 0)
+		GOTO(out_put, rc);
+
+	/* One more declaration for each chunk beyond the first one */
+	for (remaining = record_size; remaining > ctxt->loc_chunk_size;
+	     remaining -= ctxt->loc_chunk_size) {
+		rc = llog_declare_add(env, ctxt->loc_handle,
+				      &record->lur_hdr, sub_th);
+		if (rc < 0)
+			GOTO(out_put, rc);
+	}
+
+out_put:
+	llog_ctxt_put(ctxt);
+
+	return rc;
+}
+
+/**
+ * write update to sub device
+ *
+ * Write llog update record to the sub device during distribute
+ * transaction. A record that fits into one llog chunk is added with a
+ * single llog_add(); otherwise it is split into several records, each
+ * no larger than the chunk size. For every llog_add() that returns a
+ * positive value, the llog cookie it filled in is linked into
+ * sub_th->st_cookie_list.
+ *
+ * \param[in] env execution environment
+ * \param[in] record update records being written
+ * \param[in] sub_th sub thandle; its st_sub_th is the transaction the
+ * record is added with and its st_cookie_list collects the cookies
+ *
+ * \retval 0 if writing succeeds, or if there is no update llog
+ * context or handle to write to
+ * \retval negative errno if writing fails (-ENOMEM if a cookie or the
+ * chunk buffer cannot be allocated)
+ */
+static int sub_updates_write(const struct lu_env *env,
+ struct llog_update_record *record,
+ struct sub_thandle *sub_th)
+{
+ struct dt_device *dt = sub_th->st_dt;
+ struct llog_ctxt *ctxt;
+ struct llog_update_record *lur = NULL;
+ __u32 update_count = 0;
+ __u32 param_count = 0;
+ __u32 last_update_count = 0;
+ __u32 last_param_count = 0;
+ char *start;
+ char *cur;
+ char *next;
+ struct sub_thandle_cookie *stc;
+ size_t reclen;
+ bool eof = false;
+ int rc;
+ ENTRY;
+
+ ctxt = llog_get_context(dt->dd_lu_dev.ld_obd,
+ LLOG_UPDATELOG_ORIG_CTXT);
+ /* If ctxt == NULL, then it means updates on OST (only happens
+ * during migration), and we do not track those updates for now */
+ /* If ctxt->loc_handle == NULL, then it does not need to record
+ * update, usually happens in error handler path */
+ if (ctxt == NULL || ctxt->loc_handle == NULL) {
+ llog_ctxt_put(ctxt);
+ RETURN(0);
+ }
+
+ /* Since the cross-MDT updates will include both local
+ * and remote updates, the update ops count must > 1 */
+ LASSERT(record->lur_update_rec.ur_update_count > 1);
+ LASSERTF(record->lur_hdr.lrh_len == llog_update_record_size(record),
+ "lrh_len %u record_size %zu\n", record->lur_hdr.lrh_len,
+ llog_update_record_size(record));
+
+ /*
+ * If the record fits into one llog chunk, add it with a single
+ * llog_add() and return. NB the padding should >= LLOG_MIN_REC_SIZE.
+ *
+ * So check padding length is either >= LLOG_MIN_REC_SIZE or is 0
+ * (record length just matches the chunk size).
+ */
+
+ reclen = record->lur_hdr.lrh_len;
+ if (reclen + LLOG_MIN_REC_SIZE <= ctxt->loc_chunk_size ||
+ reclen == ctxt->loc_chunk_size) {
+ OBD_ALLOC_PTR(stc);
+ if (stc == NULL)
+ GOTO(llog_put, rc = -ENOMEM);
+ INIT_LIST_HEAD(&stc->stc_list);
+
+ rc = llog_add(env, ctxt->loc_handle, &record->lur_hdr,
+ &stc->stc_cookie, sub_th->st_sub_th);
+
+ CDEBUG(D_INFO, "%s: Add update log "DFID".%u: rc = %d\n",
+ dt->dd_lu_dev.ld_obd->obd_name,
+ PFID(&stc->stc_cookie.lgc_lgl.lgl_oi.oi_fid),
+ stc->stc_cookie.lgc_index, rc);
+
+ if (rc > 0) { /* keep the cookie on the sub thandle */
+ list_add(&stc->stc_list, &sub_th->st_cookie_list);
+ rc = 0;
+ } else {
+ OBD_FREE_PTR(stc);
+ }
+
+ GOTO(llog_put, rc);
+ }
+
+ /* Split the records into chunk_size update record.
+ *
+ * lur is a chunk-sized scratch record whose header is copied from
+ * the original record. The loop below walks the update ops first
+ * and then the params: cur is the end of the entries accepted so
+ * far and next is the end of the entry being considered. While the
+ * record built from [start, next) still fits into a chunk (same
+ * padding rule as above) the entry is accepted. Otherwise, or once
+ * all ops and params have been walked (eof), the entries in
+ * [start, cur) are copied into lur and added to the llog, and the
+ * walk continues with start = cur. After the first record has been
+ * added UPDATE_RECORD_CONTINUE is set in lur, so every following
+ * record carries that flag. */
+ OBD_ALLOC_LARGE(lur, ctxt->loc_chunk_size);
+ if (lur == NULL)
+ GOTO(llog_put, rc = -ENOMEM);
+
+ memcpy(lur, &record->lur_hdr, sizeof(record->lur_hdr));
+ lur->lur_update_rec.ur_update_count = 0;
+ lur->lur_update_rec.ur_param_count = 0;
+ start = (char *)&record->lur_update_rec.ur_ops;
+ cur = next = start;
+ do {
+ if (update_count < record->lur_update_rec.ur_update_count)
+ next = (char *)update_op_next_op(
+ (struct update_op *)cur);
+ else if (param_count < record->lur_update_rec.ur_param_count)
+ next = (char *)update_param_next_param(
+ (struct update_param *)cur);
+ else
+ eof = true;
+
+ reclen = __llog_update_record_size(
+ __update_records_size(next - start));
+ if ((reclen + LLOG_MIN_REC_SIZE <= ctxt->loc_chunk_size ||
+ reclen == ctxt->loc_chunk_size) &&
+ !eof) {
+ cur = next;
+
+ if (update_count <
+ record->lur_update_rec.ur_update_count)
+ update_count++;
+ else if (param_count <
+ record->lur_update_rec.ur_param_count)
+ param_count++;
+ continue;
+ }
+
+ lur->lur_update_rec.ur_update_count = update_count -
+ last_update_count;
+ lur->lur_update_rec.ur_param_count = param_count -
+ last_param_count;
+ memcpy(&lur->lur_update_rec.ur_ops, start, cur - start);
+ lur->lur_hdr.lrh_len = llog_update_record_size(lur);
+
+ LASSERT(lur->lur_hdr.lrh_len ==
+ __llog_update_record_size(
+ __update_records_size(cur - start)));
+ LASSERT(lur->lur_hdr.lrh_len <= ctxt->loc_chunk_size);
+
+ update_records_dump(&lur->lur_update_rec, D_INFO, true);
+
+ OBD_ALLOC_PTR(stc);
+ if (stc == NULL)
+ GOTO(llog_put, rc = -ENOMEM);
+ INIT_LIST_HEAD(&stc->stc_list);
+
+ rc = llog_add(env, ctxt->loc_handle, &lur->lur_hdr,
+ &stc->stc_cookie, sub_th->st_sub_th);
+
+ CDEBUG(D_INFO, "%s: Add update log "DFID".%u: rc = %d\n",
+ dt->dd_lu_dev.ld_obd->obd_name,
+ PFID(&stc->stc_cookie.lgc_lgl.lgl_oi.oi_fid),
+ stc->stc_cookie.lgc_index, rc);
+
+ if (rc > 0) { /* keep the cookie on the sub thandle */
+ list_add(&stc->stc_list, &sub_th->st_cookie_list);
+ rc = 0;
+ } else {
+ OBD_FREE_PTR(stc);
+ GOTO(llog_put, rc);
+ }
+
+ last_update_count = update_count;
+ last_param_count = param_count;
+ start = cur;
+ lur->lur_update_rec.ur_update_count = 0;
+ lur->lur_update_rec.ur_param_count = 0;
+ lur->lur_update_rec.ur_flags |= UPDATE_RECORD_CONTINUE;
+ } while (!eof);
+
+llog_put:
+ if (lur != NULL)
+ OBD_FREE_LARGE(lur, ctxt->loc_chunk_size);
+ llog_ctxt_put(ctxt);
+
+ RETURN(rc);
+}
+
+/**
+ * Prepare the update records.
+ *
+ * Merge params and ops into the update records, then initialize the
+ * update record header.
+ *
+ * During the transaction execution phase, parameters and update ops
+ * are collected in two different buffers (see lod_updates_pack()).
+ * At transaction stop they need to be merged into one buffer, so that
+ * it can be written to the update log.
+ *
+ * \param[in] env	execution environment
+ * \param[in] tmt	top_multiple_thandle for distribute txn
+ *
+ * \retval		0 if merging succeeds, or if there are no update
+ *			records or params to merge.
+ * \retval		negative errno if merging fails.
+ */
+static int prepare_writing_updates(const struct lu_env *env,
+				   struct top_multiple_thandle *tmt)
+{
+	struct thandle_update_records *tur = tmt->tmt_update_records;
+	struct llog_update_record *lur;
+	struct update_params *dst;
+	size_t params_len;
+	size_t needed;
+	int rc;
+
+	/* Nothing to do without both an update records and a params buffer */
+	if (tur == NULL)
+		return 0;
+	if (tur->tur_update_records == NULL || tur->tur_update_params == NULL)
+		return 0;
+
+	lur = tur->tur_update_records;
+	params_len = update_params_size(tur->tur_update_params,
+					tur->tur_update_param_count);
+	/* The params must not have been merged into the record yet */
+	LASSERT(lur->lur_update_rec.ur_param_count == 0);
+
+	/* Extend the update records buffer if ops plus params do not fit */
+	needed = cfs_size_round(llog_update_record_size(lur) + params_len);
+	if (needed > tur->tur_update_records_buf_size) {
+		rc = tur_update_records_extend(tur, needed);
+		if (rc < 0)
+			return rc;
+
+		/* re-read the records pointer after extending the buffer */
+		lur = tur->tur_update_records;
+	}
+
+	dst = update_records_get_params(&lur->lur_update_rec);
+	memcpy(dst, tur->tur_update_params, params_len);
+
+	lur->lur_update_rec.ur_param_count = tur->tur_update_param_count;
+	lur->lur_update_rec.ur_batchid = tmt->tmt_batchid;
+
+	/* Init update record header, now that the params are counted */
+	lur->lur_hdr.lrh_len = llog_update_record_size(lur);
+	lur->lur_hdr.lrh_type = UPDATE_REC;
+
+	/* Dump updates for debugging purpose */
+	update_records_dump(&lur->lur_update_rec, D_INFO, true);
+
+	return 0;
+}
+
+/**
+ * Top thandle commit callback
+ *
+ * Called from sub_trans_commit_cb_internal() once every sub thandle of
+ * \a tmt is marked committed. Marks the top thandle committed and wakes
+ * up the commit task of the target's distribute txn data, if there is one.
+ *
+ * \param[in] tmt	top_multiple_thandle that is now committed.
+ */
+static void top_trans_committed_cb(struct top_multiple_thandle *tmt)
+{
+	struct lu_target *tgt;
+	ENTRY;
+
+	LASSERT(atomic_read(&tmt->tmt_refcount) > 0);
+
+	/* The dump is taken before tmt_committed is set */
+	top_multiple_thandle_dump(tmt, D_HA);
+	tmt->tmt_committed = 1;
+
+	tgt = dt2lu_dev(tmt->tmt_master_sub_dt)->ld_site->ls_tgt;
+	if (tgt->lut_tdtd != NULL && tgt->lut_tdtd->tdtd_commit_task != NULL)
+		wake_up_process(tgt->lut_tdtd->tdtd_commit_task);
+
+	RETURN_EXIT;
+}
+
+/**
+ * Look up a sub thandle by device
+ *
+ * Search the sub thandle list of \a tmt for the entry whose st_dt is
+ * \a dt_dev.
+ *
+ * \param[in] tmt	top_multiple_thandle whose sub thandles are searched
+ * \param[in] dt_dev	device the sub thandle belongs to
+ *
+ * \retval		the first matching sub thandle
+ * \retval		NULL if no sub thandle is attached to \a dt_dev
+ */
+struct sub_thandle *lookup_sub_thandle(struct top_multiple_thandle *tmt,
+				       struct dt_device *dt_dev)
+{
+	struct sub_thandle *found = NULL;
+	struct sub_thandle *cur;
+
+	list_for_each_entry(cur, &tmt->tmt_sub_thandle_list, st_sub_list) {
+		if (cur->st_dt != dt_dev)
+			continue;
+
+		found = cur;
+		break;
+	}
+
+	return found;
+}
+EXPORT_SYMBOL(lookup_sub_thandle);
+
+/**
+ * Create a sub thandle for a device
+ *
+ * Allocate a sub thandle for \a dt_dev, initialize its list heads and
+ * add it to the head of the sub thandle list of \a tmt.
+ *
+ * \param[in] tmt	top_multiple_thandle the sub thandle is added to
+ * \param[in] dt_dev	device the sub thandle is created for
+ *
+ * \retval		the new sub thandle if allocation succeeds
+ * \retval		ERR_PTR(-ENOMEM) if allocation fails
+ */
+struct sub_thandle *create_sub_thandle(struct top_multiple_thandle *tmt,
+				       struct dt_device *dt_dev)
+{
+	struct sub_thandle *st;
+
+	OBD_ALLOC_PTR(st);
+	/* Plain return: this function has no ENTRY, so RETURN() would log
+	 * a "leaving" trace without the matching "entered" one. */
+	if (st == NULL)
+		return ERR_PTR(-ENOMEM);
+
+	INIT_LIST_HEAD(&st->st_sub_list);
+	INIT_LIST_HEAD(&st->st_cookie_list);
+	st->st_dt = dt_dev;
+
+	list_add(&st->st_sub_list, &tmt->tmt_sub_thandle_list);
+	return st;
+}
+
+/**
+ * Handle the commit of one sub thandle
+ *
+ * Mark every sub thandle of \a tmt that uses \a sub_th as committed and
+ * record \a err as its result. If all sub thandles are committed,
+ * top_trans_committed_cb() is called. Finally one reference on \a tmt is
+ * dropped with top_multiple_thandle_put().
+ *
+ * \param[in] tmt	top_multiple_thandle the sub thandle belongs to
+ * \param[in] sub_th	sub transaction handle being committed
+ * \param[in] err	result of the sub transaction
+ */
+static void sub_trans_commit_cb_internal(struct top_multiple_thandle *tmt,
+					 struct thandle *sub_th, int err)
+{
+	struct sub_thandle *st;
+	bool all_committed = true;
+	/* pairs with the RETURN_EXIT at the end of the function */
+	ENTRY;
+
+	/* Check if all sub thandles are committed */
+	spin_lock(&tmt->tmt_sub_lock);
+	list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) {
+		if (st->st_sub_th == sub_th) {
+			st->st_committed = 1;
+			st->st_result = err;
+		}
+		if (!st->st_committed)
+			all_committed = false;
+	}
+	spin_unlock(&tmt->tmt_sub_lock);
+
+	/* Only the first non-zero result is kept on the top thandle */
+	if (tmt->tmt_result == 0)
+		tmt->tmt_result = err;
+
+	if (all_committed)
+		top_trans_committed_cb(tmt);
+
+	top_multiple_thandle_dump(tmt, D_INFO);
+	top_multiple_thandle_put(tmt);
+	RETURN_EXIT;
+}
+
+/**
+ * sub thandle commit callback
+ *
+ * Commit callback installed by sub_thandle_register_commit_cb(). It hands
+ * the top_multiple_thandle stored in \a cb over to
+ * sub_trans_commit_cb_internal(), which marks the sub thandle committed
+ * and notifies the top thandle once all sub thandles are committed.
+ *
+ * \param[in] env	execution environment
+ * \param[in] sub_th	sub thandle being committed
+ * \param[in] cb	commit callback, dcb_data is the top_multiple_thandle
+ * \param[in] err	trans result
+ */
+static void sub_trans_commit_cb(struct lu_env *env,
+				struct thandle *sub_th,
+				struct dt_txn_commit_cb *cb, int err)
+{
+	sub_trans_commit_cb_internal(cb->dcb_data, sub_th, err);
+}
+
+/**
+ * Register the commit callback of a sub thandle
+ *
+ * Take a reference on \a tmt, then add sub_trans_commit_cb() with \a tmt
+ * as its data to the sub transaction of \a st.
+ *
+ * \param[in] st	sub thandle, st_sub_th must already be set
+ * \param[in] tmt	top_multiple_thandle passed to the callback
+ */
+static void sub_thandle_register_commit_cb(struct sub_thandle *st,
+					   struct top_multiple_thandle *tmt)
+{
+	struct dt_txn_commit_cb *dcb = &st->st_commit_dcb;
+
+	LASSERT(st->st_sub_th != NULL);
+
+	/* sub_trans_commit_cb_internal() puts a reference on tmt */
+	top_multiple_thandle_get(tmt);
+
+	dcb->dcb_func = sub_trans_commit_cb;
+	dcb->dcb_data = tmt;
+	INIT_LIST_HEAD(&dcb->dcb_linkage);
+	dt_trans_cb_add(st->st_sub_th, dcb);
+}
+
+/**
+ * Sub thandle stop call back
+ *
+ * After a sub thandle is stopped, this callback marks the first not yet
+ * stopped sub thandle on the same device as stopped, records its result
+ * and wakes up the waiters on the stop wait queue of the top thandle.
+ *
+ * \param[in] env	execution environment
+ * \param[in] sub_th	sub thandle that was stopped
+ * \param[in] cb	stop callback, dcb_data is the top_multiple_thandle
+ * \param[in] err	result of sub trans
+ */
+static void sub_trans_stop_cb(struct lu_env *env,
+			      struct thandle *sub_th,
+			      struct dt_txn_commit_cb *cb, int err)
+{
+	struct top_multiple_thandle *tmt = cb->dcb_data;
+	struct sub_thandle *cur;
+	ENTRY;
+
+	spin_lock(&tmt->tmt_sub_lock);
+	list_for_each_entry(cur, &tmt->tmt_sub_thandle_list, st_sub_list) {
+		/* skip entries already stopped or on another device */
+		if (!cur->st_stopped && cur->st_dt == sub_th->th_dev) {
+			cur->st_stopped = 1;
+			cur->st_result = err;
+			break;
+		}
+	}
+	spin_unlock(&tmt->tmt_sub_lock);
+
+	wake_up(&tmt->tmt_stop_waitq);
+	RETURN_EXIT;
+}
+
+/**
+ * Register the stop callback of a sub thandle
+ *
+ * Add sub_trans_stop_cb(), flagged DCB_TRANS_STOP and with \a tmt as its
+ * data, to the sub transaction of \a st.
+ *
+ * \param[in] st	sub thandle whose st_sub_th gets the callback
+ * \param[in] tmt	top_multiple_thandle passed to the callback
+ */
+static void sub_thandle_register_stop_cb(struct sub_thandle *st,
+					 struct top_multiple_thandle *tmt)
+{
+	struct dt_txn_commit_cb *dcb = &st->st_stop_dcb;
+
+	dcb->dcb_func = sub_trans_stop_cb;
+	dcb->dcb_data = tmt;
+	dcb->dcb_flags = DCB_TRANS_STOP;
+	INIT_LIST_HEAD(&dcb->dcb_linkage);
+
+	dt_trans_cb_add(st->st_sub_th, dcb);
+}
+
+/**
+ * Create sub thandle
+ *
+ * Create the transaction handle for \a st on its device, link it to the
+ * top thandle, set th_wait_submit on it and register the stop callback.
+ *
+ * \param[in] env	execution environment
+ * \param[in] top_th	top thandle
+ * \param[in] st	sub_thandle
+ *
+ * \retval		0 if creation succeeds.
+ * \retval		negative errno if creation fails.
+ */
+int sub_thandle_trans_create(const struct lu_env *env,
+			     struct top_thandle *top_th,
+			     struct sub_thandle *st)
+{
+	struct thandle *handle = dt_trans_create(env, st->st_dt);
+
+	if (IS_ERR(handle))
+		return PTR_ERR(handle);
+
+	handle->th_top = &top_th->tt_super;
+	handle->th_wait_submit = 1;
+
+	/* st_sub_th must be set before the stop callback is registered,
+	 * since the registration adds the callback to st->st_sub_th */
+	st->st_sub_th = handle;
+	sub_thandle_register_stop_cb(st, top_th->tt_multiple_thandle);
+
+	return 0;
+}
+