X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Ftarget%2Fupdate_trans.c;h=00bc03442d676b2d153aaf308d9f2e237ed0e57b;hb=861c70ff6e3371c752d53fcb71cdc54fe74cd3c2;hp=09d9d896881f5aa9234d6c1be291fb4ebab6abff;hpb=c233f39a015943b3df8ddb555765a81b85b31083;p=fs%2Flustre-release.git diff --git a/lustre/target/update_trans.c b/lustre/target/update_trans.c index 09d9d89..00bc034 100644 --- a/lustre/target/update_trans.c +++ b/lustre/target/update_trans.c @@ -49,12 +49,532 @@ #define DEBUG_SUBSYSTEM S_CLASS +#include #include +#include #include #include #include #include +#include +/** + * Dump top mulitple thandle + * + * Dump top multiple thandle and all of its sub thandle to the debug log. + * + * \param[in]mask debug mask + * \param[in]top_th top_thandle to be dumped + */ +static void top_multiple_thandle_dump(struct top_multiple_thandle *tmt, + __u32 mask) +{ + struct sub_thandle *st; + + LASSERT(tmt->tmt_magic == TOP_THANDLE_MAGIC); + CDEBUG(mask, "%s tmt %p refcount %d committed %d result %d" + "batchid "LPU64"\n", + tmt->tmt_master_sub_dt ? + tmt->tmt_master_sub_dt->dd_lu_dev.ld_obd->obd_name : + "NULL", + tmt, atomic_read(&tmt->tmt_refcount), tmt->tmt_committed, + tmt->tmt_result, tmt->tmt_batchid); + + list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) { + struct sub_thandle_cookie *stc; + + CDEBUG(mask, "st %p obd %s committed %d sub_th %p\n", + st, st->st_dt->dd_lu_dev.ld_obd->obd_name, + st->st_committed, st->st_sub_th); + + list_for_each_entry(stc, &st->st_cookie_list, stc_list) { + CDEBUG(mask, " cookie "DOSTID": %u\n", + POSTID(&stc->stc_cookie.lgc_lgl.lgl_oi), + stc->stc_cookie.lgc_index); + } + } +} + +/** + * Declare write update to sub device + * + * Declare Write updates llog records to the sub device during distribute + * transaction. + * + * \param[in] env execution environment + * \param[in] record update records being written + * \param[in] sub_th sub transaction handle + * \param[in] record_size total update record size + * + * \retval 0 if writing succeeds + * \retval negative errno if writing fails + */ +static int sub_declare_updates_write(const struct lu_env *env, + struct llog_update_record *record, + struct thandle *sub_th, size_t record_size) +{ + struct llog_ctxt *ctxt; + struct dt_device *dt = sub_th->th_dev; + int left = record_size; + int rc; + + /* If ctxt is NULL, it means not need to write update, + * for example if the the OSP is used to connect to OST */ + ctxt = llog_get_context(dt->dd_lu_dev.ld_obd, + LLOG_UPDATELOG_ORIG_CTXT); + + /* Not ready to record updates yet. */ + if (ctxt == NULL || ctxt->loc_handle == NULL) { + llog_ctxt_put(ctxt); + return 0; + } + + rc = llog_declare_add(env, ctxt->loc_handle, + &record->lur_hdr, sub_th); + if (rc < 0) + GOTO(out_put, rc); + + while (left > ctxt->loc_chunk_size) { + rc = llog_declare_add(env, ctxt->loc_handle, + &record->lur_hdr, sub_th); + if (rc < 0) + GOTO(out_put, rc); + + left -= ctxt->loc_chunk_size; + } + +out_put: + llog_ctxt_put(ctxt); + + return rc; +} + +/** + * write update to sub device + * + * Write llog update record to the sub device during distribute + * transaction. If it succeeds, llog cookie of the record will be + * returned by @cookie. + * + * \param[in] env execution environment + * \param[in] record update records being written + * \param[in] sub_th sub transaction handle + * \param[out] cookie llog cookie of the update record. 
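+ *			(each cookie is queued on the sub thandle's st_cookie_list)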
+ * + * \retval 1 if writing succeeds + * \retval negative errno if writing fails + */ +static int sub_updates_write(const struct lu_env *env, + struct llog_update_record *record, + struct sub_thandle *sub_th) +{ + struct dt_device *dt = sub_th->st_dt; + struct llog_ctxt *ctxt; + int rc; + struct llog_update_record *lur = NULL; + struct update_params *params = NULL; + __u32 update_count = 0; + __u32 param_count = 0; + __u32 last_update_count = 0; + __u32 last_param_count = 0; + void *src; + void *start; + void *next; + struct sub_thandle_cookie *stc; + ENTRY; + + ctxt = llog_get_context(dt->dd_lu_dev.ld_obd, + LLOG_UPDATELOG_ORIG_CTXT); + /* If ctxt == NULL, then it means updates on OST (only happens + * during migration), and we do not track those updates for now */ + /* If ctxt->loc_handle == NULL, then it does not need to record + * update, usually happens in error handler path */ + if (ctxt == NULL || ctxt->loc_handle == NULL) { + llog_ctxt_put(ctxt); + RETURN(0); + } + + /* Since the cross-MDT updates will includes both local + * and remote updates, the update ops count must > 1 */ + LASSERT(record->lur_update_rec.ur_update_count > 1); + LASSERTF(record->lur_hdr.lrh_len == llog_update_record_size(record), + "lrh_len %u record_size %zu\n", record->lur_hdr.lrh_len, + llog_update_record_size(record)); + + if (likely(record->lur_hdr.lrh_len <= ctxt->loc_chunk_size)) { + OBD_ALLOC_PTR(stc); + if (stc == NULL) + GOTO(llog_put, rc = -ENOMEM); + INIT_LIST_HEAD(&stc->stc_list); + + rc = llog_add(env, ctxt->loc_handle, &record->lur_hdr, + &stc->stc_cookie, sub_th->st_sub_th); + + CDEBUG(D_INFO, "%s: Add update log "DOSTID":%u: rc = %d\n", + dt->dd_lu_dev.ld_obd->obd_name, + POSTID(&stc->stc_cookie.lgc_lgl.lgl_oi), + stc->stc_cookie.lgc_index, rc); + + if (rc > 0) { + list_add(&stc->stc_list, &sub_th->st_cookie_list); + rc = 0; + } else { + OBD_FREE_PTR(stc); + } + + GOTO(llog_put, rc); + } + + /* Split the records into chunk_size update record */ + OBD_ALLOC_LARGE(lur, ctxt->loc_chunk_size); + if (lur == NULL) + GOTO(llog_put, rc = -ENOMEM); + + memcpy(lur, &record->lur_hdr, sizeof(record->lur_hdr)); + lur->lur_update_rec.ur_update_count = 0; + lur->lur_update_rec.ur_param_count = 0; + src = &record->lur_update_rec.ur_ops; + start = next = src; + lur->lur_hdr.lrh_len = llog_update_record_size(lur); + params = update_records_get_params(&record->lur_update_rec); + do { + size_t rec_len; + + if (update_count < record->lur_update_rec.ur_update_count) { + next = update_op_next_op((struct update_op *)src); + } else { + if (param_count == 0) + next = update_records_get_params( + &record->lur_update_rec); + else + next = (char *)src + + object_update_param_size( + (struct object_update_param *)src); + } + + rec_len = cfs_size_round((unsigned long)(next - src)); + /* If its size > llog chunk_size, then write current chunk to + * the update llog. */ + if (lur->lur_hdr.lrh_len + rec_len + LLOG_MIN_REC_SIZE > + ctxt->loc_chunk_size || + param_count == record->lur_update_rec.ur_param_count) { + lur->lur_update_rec.ur_update_count = + update_count > last_update_count ? 
+ update_count - last_update_count : 0; + lur->lur_update_rec.ur_param_count = param_count - + last_param_count; + + memcpy(&lur->lur_update_rec.ur_ops, start, + (unsigned long)(src - start)); + if (last_update_count != 0) + lur->lur_update_rec.ur_flags |= + UPDATE_RECORD_CONTINUE; + + update_records_dump(&lur->lur_update_rec, D_INFO, true); + lur->lur_hdr.lrh_len = llog_update_record_size(lur); + LASSERT(lur->lur_hdr.lrh_len <= ctxt->loc_chunk_size); + + OBD_ALLOC_PTR(stc); + if (stc == NULL) + GOTO(llog_put, rc = -ENOMEM); + INIT_LIST_HEAD(&stc->stc_list); + + rc = llog_add(env, ctxt->loc_handle, + &lur->lur_hdr, + &stc->stc_cookie, sub_th->st_sub_th); + + CDEBUG(D_INFO, "%s: Add update log "DOSTID":%u" + " rc = %d\n", dt->dd_lu_dev.ld_obd->obd_name, + POSTID(&stc->stc_cookie.lgc_lgl.lgl_oi), + stc->stc_cookie.lgc_index, rc); + + if (rc > 0) { + list_add(&stc->stc_list, + &sub_th->st_cookie_list); + rc = 0; + } else { + OBD_FREE_PTR(stc); + GOTO(llog_put, rc); + } + + last_update_count = update_count; + last_param_count = param_count; + start = src; + lur->lur_update_rec.ur_update_count = 0; + lur->lur_update_rec.ur_param_count = 0; + lur->lur_hdr.lrh_len = llog_update_record_size(lur); + } + + src = next; + lur->lur_hdr.lrh_len += cfs_size_round(rec_len); + if (update_count < record->lur_update_rec.ur_update_count) + update_count++; + else if (param_count < record->lur_update_rec.ur_param_count) + param_count++; + else + break; + } while (1); + +llog_put: + if (lur != NULL) + OBD_FREE_LARGE(lur, ctxt->loc_chunk_size); + llog_ctxt_put(ctxt); + + RETURN(rc); +} + +/** + * Prepare the update records. + * + * Merge params and ops into the update records, then initializing + * the update buffer. + * + * During transaction execution phase, parameters and update ops + * are collected in two different buffers (see lod_updates_pack()), + * during transaction stop, it needs to be merged in one buffer, + * so it will be written in the update log. + * + * \param[in] env execution environment + * \param[in] tmt top_multiple_thandle for distribute txn + * + * \retval 0 if merging succeeds. + * \retval negaitive errno if merging fails. 
+ */ +static int prepare_writing_updates(const struct lu_env *env, + struct top_multiple_thandle *tmt) +{ + struct thandle_update_records *tur = tmt->tmt_update_records; + struct llog_update_record *lur; + struct update_params *params; + size_t params_size; + size_t update_size; + + if (tur == NULL || tur->tur_update_records == NULL || + tur->tur_update_params == NULL) + return 0; + + lur = tur->tur_update_records; + /* Extends the update records buffer if needed */ + params_size = update_params_size(tur->tur_update_params, + tur->tur_update_param_count); + LASSERT(lur->lur_update_rec.ur_param_count == 0); + update_size = llog_update_record_size(lur); + if (cfs_size_round(update_size + params_size) > + tur->tur_update_records_buf_size) { + int rc; + + rc = tur_update_records_extend(tur, + cfs_size_round(update_size + params_size)); + if (rc < 0) + return rc; + + lur = tur->tur_update_records; + } + + params = update_records_get_params(&lur->lur_update_rec); + memcpy(params, tur->tur_update_params, params_size); + + lur->lur_update_rec.ur_param_count = tur->tur_update_param_count; + lur->lur_update_rec.ur_batchid = tmt->tmt_batchid; + /* Init update record header */ + lur->lur_hdr.lrh_len = llog_update_record_size(lur); + lur->lur_hdr.lrh_type = UPDATE_REC; + + /* Dump updates for debugging purpose */ + update_records_dump(&lur->lur_update_rec, D_INFO, true); + + return 0; +} + +static inline int +distribute_txn_commit_thread_running(struct lu_target *lut) +{ + return lut->lut_tdtd_commit_thread.t_flags & SVC_RUNNING; +} + +static inline int +distribute_txn_commit_thread_stopped(struct lu_target *lut) +{ + return lut->lut_tdtd_commit_thread.t_flags & SVC_STOPPED; +} + +/** + * Top thandle commit callback + * + * This callback will be called when all of sub transactions are committed. + * + * \param[in] th top thandle to be committed. + */ +static void top_trans_committed_cb(struct top_multiple_thandle *tmt) +{ + struct lu_target *lut; + ENTRY; + + LASSERT(atomic_read(&tmt->tmt_refcount) > 0); + + top_multiple_thandle_dump(tmt, D_HA); + tmt->tmt_committed = 1; + lut = dt2lu_dev(tmt->tmt_master_sub_dt)->ld_site->ls_tgt; + if (distribute_txn_commit_thread_running(lut)) + wake_up(&lut->lut_tdtd->tdtd_commit_thread_waitq); + RETURN_EXIT; +} + +struct sub_thandle *lookup_sub_thandle(struct top_multiple_thandle *tmt, + struct dt_device *dt_dev) +{ + struct sub_thandle *st; + + list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) { + if (st->st_dt == dt_dev) + return st; + } + return NULL; +} +EXPORT_SYMBOL(lookup_sub_thandle); + +struct sub_thandle *create_sub_thandle(struct top_multiple_thandle *tmt, + struct dt_device *dt_dev) +{ + struct sub_thandle *st; + + OBD_ALLOC_PTR(st); + if (st == NULL) + RETURN(ERR_PTR(-ENOMEM)); + + INIT_LIST_HEAD(&st->st_sub_list); + INIT_LIST_HEAD(&st->st_cookie_list); + st->st_dt = dt_dev; + + list_add(&st->st_sub_list, &tmt->tmt_sub_thandle_list); + return st; +} + +/** + * sub thandle commit callback + * + * Mark the sub thandle to be committed and if all sub thandle are committed + * notify the top thandle. 
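+ * It also drops the top_multiple_thandle reference taken when the commit
+ * callback was registered.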
+ * + * \param[in] env execution environment + * \param[in] sub_th sub thandle being committed + * \param[in] cb commit callback + * \param[in] err trans result + */ +static void sub_trans_commit_cb(struct lu_env *env, + struct thandle *sub_th, + struct dt_txn_commit_cb *cb, int err) +{ + struct sub_thandle *st; + struct top_multiple_thandle *tmt = cb->dcb_data; + bool all_committed = true; + ENTRY; + + /* Check if all sub thandles are committed */ + spin_lock(&tmt->tmt_sub_lock); + list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) { + if (st->st_sub_th == sub_th) { + st->st_committed = 1; + st->st_result = err; + } + if (!st->st_committed) + all_committed = false; + } + spin_unlock(&tmt->tmt_sub_lock); + + if (tmt->tmt_result == 0) + tmt->tmt_result = err; + + if (all_committed) + top_trans_committed_cb(tmt); + + top_multiple_thandle_dump(tmt, D_INFO); + top_multiple_thandle_put(tmt); + RETURN_EXIT; +} + +static void sub_thandle_register_commit_cb(struct sub_thandle *st, + struct top_multiple_thandle *tmt) +{ + LASSERT(st->st_sub_th != NULL); + top_multiple_thandle_get(tmt); + st->st_commit_dcb.dcb_func = sub_trans_commit_cb; + st->st_commit_dcb.dcb_data = tmt; + INIT_LIST_HEAD(&st->st_commit_dcb.dcb_linkage); + dt_trans_cb_add(st->st_sub_th, &st->st_commit_dcb); +} + +/** + * Sub thandle stop call back + * + * After sub thandle is stopped, it will call this callback to notify + * the top thandle. + * + * \param[in] th sub thandle to be stopped + * \param[in] rc result of sub trans + */ +static void sub_trans_stop_cb(struct lu_env *env, + struct thandle *sub_th, + struct dt_txn_commit_cb *cb, int err) +{ + struct sub_thandle *st; + struct top_multiple_thandle *tmt = cb->dcb_data; + ENTRY; + + list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) { + if (st->st_stopped) + continue; + + if (st->st_dt == sub_th->th_dev) { + st->st_stopped = 1; + st->st_result = err; + break; + } + } + + wake_up(&tmt->tmt_stop_waitq); + RETURN_EXIT; +} + +static void sub_thandle_register_stop_cb(struct sub_thandle *st, + struct top_multiple_thandle *tmt) +{ + st->st_stop_dcb.dcb_func = sub_trans_stop_cb; + st->st_stop_dcb.dcb_data = tmt; + st->st_stop_dcb.dcb_flags = DCB_TRANS_STOP; + INIT_LIST_HEAD(&st->st_stop_dcb.dcb_linkage); + dt_trans_cb_add(st->st_sub_th, &st->st_stop_dcb); +} + +/** + * Create sub thandle + * + * Create transaction handle for sub_thandle + * + * \param[in] env execution environment + * \param[in] th top thandle + * \param[in] st sub_thandle + * + * \retval 0 if creation succeeds. + * \retval negative errno if creation fails. + */ +int sub_thandle_trans_create(const struct lu_env *env, + struct top_thandle *top_th, + struct sub_thandle *st) +{ + struct thandle *sub_th; + + sub_th = dt_trans_create(env, st->st_dt); + if (IS_ERR(sub_th)) + return PTR_ERR(sub_th); + + sub_th->th_top = &top_th->tt_super; + st->st_sub_th = sub_th; + + sub_th->th_wait_submit = 1; + return 0; +} + /** * Create the top transaction. 
* @@ -77,25 +597,160 @@ top_trans_create(const struct lu_env *env, struct dt_device *master_dev) if (top_th == NULL) return ERR_PTR(-ENOMEM); - child_th = dt_trans_create(env, master_dev); - if (IS_ERR(child_th)) { - OBD_FREE_PTR(top_th); - return child_th; - } + top_th->tt_super.th_top = &top_th->tt_super; - top_th->tt_magic = TOP_THANDLE_MAGIC; - top_th->tt_master_sub_thandle = child_th; - child_th->th_top = &top_th->tt_super; + if (master_dev != NULL) { + child_th = dt_trans_create(env, master_dev); + if (IS_ERR(child_th)) { + OBD_FREE_PTR(top_th); + return child_th; + } - top_th->tt_update_records = NULL; - top_th->tt_super.th_top = &top_th->tt_super; - INIT_LIST_HEAD(&top_th->tt_sub_thandle_list); + child_th->th_top = &top_th->tt_super; + child_th->th_wait_submit = 1; + top_th->tt_master_sub_thandle = child_th; + top_th->tt_super.th_tags |= child_th->th_tags; + } return &top_th->tt_super; } EXPORT_SYMBOL(top_trans_create); /** + * Declare write update transaction + * + * Check if there are updates being recorded in this transaction, + * it will write the record into the disk. + * + * \param[in] env execution environment + * \param[in] tmt top multiple transaction handle + * + * \retval 0 if writing succeeds + * \retval negative errno if writing fails + */ +static int declare_updates_write(const struct lu_env *env, + struct top_multiple_thandle *tmt) +{ + struct llog_update_record *record; + struct sub_thandle *st; + int rc = 0; + + record = tmt->tmt_update_records->tur_update_records; + /* Declare update write for all other target */ + list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) { + if (st->st_sub_th == NULL) + continue; + + rc = sub_declare_updates_write(env, record, st->st_sub_th, + tmt->tmt_record_size); + if (rc < 0) + break; + } + + return rc; +} + +/** + * Assign batchid to the distribute transaction. + * + * Assign batchid to the distribute transaction + * + * \param[in] tmt distribute transaction + */ +static void distribute_txn_assign_batchid(struct top_multiple_thandle *new) +{ + struct target_distribute_txn_data *tdtd; + struct dt_device *dt = new->tmt_master_sub_dt; + + LASSERT(dt != NULL); + tdtd = dt2lu_dev(dt)->ld_site->ls_tgt->lut_tdtd; + spin_lock(&tdtd->tdtd_batchid_lock); + new->tmt_batchid = tdtd->tdtd_batchid++; + list_add_tail(&new->tmt_commit_list, &tdtd->tdtd_list); + spin_unlock(&tdtd->tdtd_batchid_lock); + top_multiple_thandle_get(new); + top_multiple_thandle_dump(new, D_INFO); +} + +/** + * Insert distribute transaction to the distribute txn list. + * + * Insert distribute transaction to the distribute txn list. + * + * \param[in] new the distribute txn to be inserted. 
+ */ +void distribute_txn_insert_by_batchid(struct top_multiple_thandle *new) +{ + struct dt_device *dt = new->tmt_master_sub_dt; + struct top_multiple_thandle *tmt; + struct target_distribute_txn_data *tdtd; + bool at_head = false; + + LASSERT(dt != NULL); + tdtd = dt2lu_dev(dt)->ld_site->ls_tgt->lut_tdtd; + + spin_lock(&tdtd->tdtd_batchid_lock); + list_for_each_entry_reverse(tmt, &tdtd->tdtd_list, tmt_commit_list) { + if (new->tmt_batchid > tmt->tmt_batchid) { + list_add(&new->tmt_commit_list, &tmt->tmt_commit_list); + break; + } + } + if (list_empty(&new->tmt_commit_list)) { + at_head = true; + list_add(&new->tmt_commit_list, &tdtd->tdtd_list); + } + spin_unlock(&tdtd->tdtd_batchid_lock); + top_multiple_thandle_get(new); + top_multiple_thandle_dump(new, D_INFO); + if (new->tmt_committed && at_head) + wake_up(&tdtd->tdtd_commit_thread_waitq); +} + +/** + * Prepare cross-MDT operation. + * + * Create the update record buffer to record updates for cross-MDT operation, + * add master sub transaction to tt_sub_trans_list, and declare the update + * writes. + * + * During updates packing, all of parameters will be packed in + * tur_update_params, and updates will be packed in tur_update_records. + * Then in transaction stop, parameters and updates will be merged + * into one updates buffer. + * + * And also master thandle will be added to the sub_th list, so it will be + * easy to track the commit status. + * + * \param[in] env execution environment + * \param[in] th top transaction handle + * + * \retval 0 if preparation succeeds. + * \retval negative errno if preparation fails. + */ +static int prepare_multiple_node_trans(const struct lu_env *env, + struct top_multiple_thandle *tmt) +{ + struct thandle_update_records *tur; + int rc; + ENTRY; + + if (tmt->tmt_update_records == NULL) { + tur = &update_env_info(env)->uti_tur; + rc = check_and_prepare_update_record(env, tur); + if (rc < 0) + RETURN(rc); + + tmt->tmt_update_records = tur; + distribute_txn_assign_batchid(tmt); + } + + rc = declare_updates_write(env, tmt); + + RETURN(rc); +} + +/** * start the top transaction. * * Start all of its sub transactions, then start master sub transaction. 
@@ -112,29 +767,132 @@ int top_trans_start(const struct lu_env *env, struct dt_device *master_dev, { struct top_thandle *top_th = container_of(th, struct top_thandle, tt_super); - struct sub_thandle *lst; - int rc; + struct sub_thandle *st; + struct top_multiple_thandle *tmt = top_th->tt_multiple_thandle; + int rc = 0; + ENTRY; + + if (tmt == NULL) { + rc = dt_trans_start(env, top_th->tt_master_sub_thandle->th_dev, + top_th->tt_master_sub_thandle); + RETURN(rc); + } - rc = check_and_prepare_update_record(env, th); + tmt = top_th->tt_multiple_thandle; + rc = prepare_multiple_node_trans(env, tmt); if (rc < 0) - return rc; - - LASSERT(top_th->tt_magic == TOP_THANDLE_MAGIC); - list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) { - lst->st_sub_th->th_sync = th->th_sync; - lst->st_sub_th->th_local = th->th_local; - rc = dt_trans_start(env, lst->st_sub_th->th_dev, - lst->st_sub_th); + RETURN(rc); + + list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) { + if (st->st_sub_th == NULL) + continue; + if (th->th_sync) + st->st_sub_th->th_sync = th->th_sync; + st->st_sub_th->th_local = th->th_local; + st->st_sub_th->th_tags = th->th_tags; + rc = dt_trans_start(env, st->st_sub_th->th_dev, + st->st_sub_th); if (rc != 0) - return rc; + GOTO(out, rc); + + sub_thandle_register_stop_cb(st, tmt); + sub_thandle_register_commit_cb(st, tmt); } +out: + th->th_result = rc; + RETURN(rc); +} +EXPORT_SYMBOL(top_trans_start); + +/** + * Check whether we need write updates record + * + * Check if the updates for the top_thandle needs to be writen + * to all targets. Only if the transaction succeeds and the updates + * number > 2, it will write the updates, + * + * \params [in] top_th top thandle. + * + * \retval true if it needs to write updates + * \retval false if it does not need to write updates + **/ +static bool top_check_write_updates(struct top_thandle *top_th) +{ + struct top_multiple_thandle *tmt; + struct thandle_update_records *tur; + + /* Do not write updates to records if the transaction fails */ + if (top_th->tt_super.th_result != 0) + return false; + + tmt = top_th->tt_multiple_thandle; + if (tmt == NULL) + return false; + + tur = tmt->tmt_update_records; + if (tur == NULL) + return false; - top_th->tt_master_sub_thandle->th_local = th->th_local; - top_th->tt_master_sub_thandle->th_sync = th->th_sync; + /* Hmm, false update records, since the cross-MDT operation + * should includes both local and remote updates, so the + * updates count should >= 2 */ + if (tur->tur_update_records == NULL || + tur->tur_update_records->lur_update_rec.ur_update_count <= 1) + return false; - return dt_trans_start(env, master_dev, top_th->tt_master_sub_thandle); + return true; +} + +/** + * Check if top transaction is stopped + * + * Check if top transaction is stopped, only if all sub transaction + * is stopped, then the top transaction is stopped. + * + * \param [in] top_th top thandle + * + * \retval true if the top transaction is stopped. + * \retval false if the top transaction is not stopped. 
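+ * As a side effect, the first non-zero sub thandle result is copied into
+ * the top thandle result.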
+ */ +static bool top_trans_is_stopped(struct top_thandle *top_th) +{ + struct top_multiple_thandle *tmt; + struct sub_thandle *st; + bool all_stopped = true; + + tmt = top_th->tt_multiple_thandle; + list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) { + if (!st->st_stopped && st->st_sub_th != NULL) { + all_stopped = false; + break; + } + + if (st->st_result != 0 && + top_th->tt_super.th_result == 0) + top_th->tt_super.th_result = st->st_result; + } + + return all_stopped; +} + +/** + * Wait result of top transaction + * + * Wait until all sub transaction get its result. + * + * \param [in] top_th top thandle. + * + * \retval the result of top thandle. + */ +static int top_trans_wait_result(struct top_thandle *top_th) +{ + struct l_wait_info lwi = {0}; + + l_wait_event(top_th->tt_multiple_thandle->tmt_stop_waitq, + top_trans_is_stopped(top_th), &lwi); + + RETURN(top_th->tt_super.th_result); } -EXPORT_SYMBOL(top_trans_start); /** * Stop the top transaction. @@ -152,55 +910,203 @@ EXPORT_SYMBOL(top_trans_start); int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev, struct thandle *th) { - struct sub_thandle *lst; struct top_thandle *top_th = container_of(th, struct top_thandle, tt_super); - struct thandle_update_records *tur = top_th->tt_update_records; - int rc; + struct sub_thandle *st; + struct sub_thandle *master_st; + struct top_multiple_thandle *tmt; + struct thandle_update_records *tur; + bool write_updates = false; + int rc = 0; ENTRY; - /* Note: we always need walk through all of sub_transaction to do - * transaction stop to release the resource here */ - if (tur != NULL) { - rc = merge_params_updates_buf(env, tur); - if (rc == 0) { - struct update_records *record; + if (likely(top_th->tt_multiple_thandle == NULL)) { + LASSERT(master_dev != NULL); + rc = dt_trans_stop(env, master_dev, + top_th->tt_master_sub_thandle); + OBD_FREE_PTR(top_th); + RETURN(rc); + } + + tmt = top_th->tt_multiple_thandle; + tur = tmt->tmt_update_records; + + /* Note: we need stop the master thandle first, then the stop + * callback will fill the master transno in the update logs, + * then these update logs will be sent to other MDTs */ + /* get the master sub thandle */ + master_st = lookup_sub_thandle(tmt, tmt->tmt_master_sub_dt); + write_updates = top_check_write_updates(top_th); - record = &tur->tur_update_records->lur_update_rec; - update_records_dump(record, D_INFO, false); + /* Step 1: write the updates log on Master MDT */ + if (master_st != NULL && master_st->st_sub_th != NULL && + write_updates) { + struct llog_update_record *lur; + + /* Merge the parameters and updates into one buffer */ + rc = prepare_writing_updates(env, tmt); + if (rc < 0) { + CERROR("%s: cannot prepare updates: rc = %d\n", + master_dev->dd_lu_dev.ld_obd->obd_name, rc); + th->th_result = rc; + GOTO(stop_master_trans, rc); + } + + lur = tur->tur_update_records; + /* Write updates to the master MDT */ + rc = sub_updates_write(env, lur, master_st); + + /* Cleanup the common parameters in the update records, + * master transno callback might add more parameters. 
+ * and we need merge the update records again in the + * following */ + if (tur->tur_update_params != NULL) + lur->lur_update_rec.ur_param_count = 0; + + if (rc < 0) { + CERROR("%s: write updates failed: rc = %d\n", + master_dev->dd_lu_dev.ld_obd->obd_name, rc); + th->th_result = rc; + GOTO(stop_master_trans, rc); + } + } + +stop_master_trans: + /* Step 2: Stop the transaction on the master MDT, and fill the + * master transno in the update logs to other MDT. */ + if (master_st != NULL && master_st->st_sub_th != NULL) { + master_st->st_sub_th->th_local = th->th_local; + if (th->th_sync) + master_st->st_sub_th->th_sync = th->th_sync; + master_st->st_sub_th->th_tags = th->th_tags; + master_st->st_sub_th->th_result = th->th_result; + rc = dt_trans_stop(env, master_st->st_dt, master_st->st_sub_th); + if (rc < 0) { + th->th_result = rc; + GOTO(stop_other_trans, rc); + } else if (tur != NULL && tur->tur_update_records != NULL) { + struct llog_update_record *lur; + + lur = tur->tur_update_records; + if (lur->lur_update_rec.ur_master_transno == 0) + /* Update master transno after master stop + * callback */ + lur->lur_update_rec.ur_master_transno = + tgt_th_info(env)->tti_transno; } - top_th->tt_update_records = NULL; } - LASSERT(top_th->tt_magic == TOP_THANDLE_MAGIC); + /* Step 3: write updates to other MDTs */ + if (write_updates) { + struct llog_update_record *lur; - top_th->tt_master_sub_thandle->th_local = th->th_local; - top_th->tt_master_sub_thandle->th_sync = th->th_sync; + /* Stop callback of master will add more updates and also update + * master transno, so merge the parameters and updates into one + * buffer again */ + rc = prepare_writing_updates(env, tmt); + if (rc < 0) { + CERROR("%s: prepare updates failed: rc = %d\n", + master_dev->dd_lu_dev.ld_obd->obd_name, rc); + th->th_result = rc; + GOTO(stop_other_trans, rc); + } + lur = tur->tur_update_records; + list_for_each_entry(st, &tmt->tmt_sub_thandle_list, + st_sub_list) { + if (st->st_sub_th == NULL || st == master_st || + st->st_sub_th->th_result < 0) + continue; - /* To avoid sending RPC while holding thandle, it always stop local - * transaction first, then other sub thandle */ - rc = dt_trans_stop(env, master_dev, top_th->tt_master_sub_thandle); + rc = sub_updates_write(env, lur, st); + if (rc < 0) { + th->th_result = rc; + break; + } + } + } - list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) { - int rc2; +stop_other_trans: + /* Step 4: Stop the transaction on other MDTs */ + list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) { + if (st == master_st || st->st_sub_th == NULL) + continue; - if (rc != 0) - lst->st_sub_th->th_result = rc; - lst->st_sub_th->th_sync = th->th_sync; - lst->st_sub_th->th_local = th->th_local; - rc2 = dt_trans_stop(env, lst->st_sub_th->th_dev, - lst->st_sub_th); - if (unlikely(rc2 < 0 && rc == 0)) - rc = rc2; + if (th->th_sync) + st->st_sub_th->th_sync = th->th_sync; + st->st_sub_th->th_local = th->th_local; + st->st_sub_th->th_tags = th->th_tags; + st->st_sub_th->th_result = th->th_result; + rc = dt_trans_stop(env, st->st_sub_th->th_dev, + st->st_sub_th); + if (unlikely(rc < 0 && th->th_result == 0)) + th->th_result = rc; } - top_thandle_destroy(top_th); + rc = top_trans_wait_result(top_th); + + tmt->tmt_result = rc; + /* Balance for the refcount in top_trans_create, Note: if it is NOT + * multiple node transaction, the top transaction will be destroyed. 
*/ + top_multiple_thandle_put(tmt); + OBD_FREE_PTR(top_th); RETURN(rc); } EXPORT_SYMBOL(top_trans_stop); /** + * Create top_multiple_thandle for top_thandle + * + * Create top_mutilple_thandle to manage the mutiple node transaction + * for top_thandle, and it also needs to add master sub thandle to the + * sub trans list now. + * + * \param[in] env execution environment + * \param[in] top_th the top thandle + * + * \retval 0 if creation succeeds + * \retval negative errno if creation fails + */ +int top_trans_create_tmt(const struct lu_env *env, + struct top_thandle *top_th) +{ + struct top_multiple_thandle *tmt; + + OBD_ALLOC_PTR(tmt); + if (tmt == NULL) + return -ENOMEM; + + tmt->tmt_magic = TOP_THANDLE_MAGIC; + INIT_LIST_HEAD(&tmt->tmt_sub_thandle_list); + INIT_LIST_HEAD(&tmt->tmt_commit_list); + atomic_set(&tmt->tmt_refcount, 1); + spin_lock_init(&tmt->tmt_sub_lock); + init_waitqueue_head(&tmt->tmt_stop_waitq); + + top_th->tt_multiple_thandle = tmt; + + return 0; +} + +static struct sub_thandle * +create_sub_thandle_with_thandle(struct top_thandle *top_th, + struct thandle *sub_th) +{ + struct sub_thandle *st; + + /* create and init sub th to the top trans list */ + st = create_sub_thandle(top_th->tt_multiple_thandle, + sub_th->th_dev); + if (IS_ERR(st)) + return st; + + st->st_sub_th = sub_th; + + sub_th->th_top = &top_th->tt_super; + return st; +} + +/** * Get sub thandle. * * Get sub thandle from the top thandle according to the sub dt_device. @@ -216,66 +1122,583 @@ struct thandle *thandle_get_sub_by_dt(const struct lu_env *env, struct thandle *th, struct dt_device *sub_dt) { - struct sub_thandle *lst; + struct sub_thandle *st = NULL; + struct sub_thandle *master_st = NULL; struct top_thandle *top_th; - struct thandle *sub_th; + struct thandle *sub_th = NULL; + int rc = 0; ENTRY; top_th = container_of(th, struct top_thandle, tt_super); - LASSERT(top_th->tt_magic == TOP_THANDLE_MAGIC); - LASSERT(top_th->tt_master_sub_thandle != NULL); + if (likely(sub_dt == top_th->tt_master_sub_thandle->th_dev)) RETURN(top_th->tt_master_sub_thandle); - /* Find or create the transaction in tt_trans_list, since there is - * always only one thread access the list, so no need lock here */ - list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) { - if (lst->st_sub_th->th_dev == sub_dt) - RETURN(lst->st_sub_th); + if (top_th->tt_multiple_thandle != NULL) { + st = lookup_sub_thandle(top_th->tt_multiple_thandle, sub_dt); + if (st != NULL) + RETURN(st->st_sub_th); } sub_th = dt_trans_create(env, sub_dt); if (IS_ERR(sub_th)) RETURN(sub_th); - /* XXX all of mixed transaction (see struct th_handle) will - * be synchronized until async update is done */ - th->th_sync = 1; + /* Create top_multiple_thandle if necessary */ + if (top_th->tt_multiple_thandle == NULL) { + struct top_multiple_thandle *tmt; - sub_th->th_top = th; - OBD_ALLOC_PTR(lst); - if (lst == NULL) { - dt_trans_stop(env, sub_dt, sub_th); - RETURN(ERR_PTR(-ENOMEM)); + rc = top_trans_create_tmt(env, top_th); + if (rc < 0) + GOTO(stop_trans, rc); + + tmt = top_th->tt_multiple_thandle; + + /* Add master sub th to the top trans list */ + tmt->tmt_master_sub_dt = + top_th->tt_master_sub_thandle->th_dev; + master_st = create_sub_thandle_with_thandle(top_th, + top_th->tt_master_sub_thandle); + if (IS_ERR(master_st)) { + rc = PTR_ERR(master_st); + master_st = NULL; + GOTO(stop_trans, rc); + } } - INIT_LIST_HEAD(&lst->st_sub_list); - lst->st_sub_th = sub_th; - list_add(&lst->st_sub_list, &top_th->tt_sub_thandle_list); - lst->st_record_update = 
1; + /* create and init sub th to the top trans list */ + st = create_sub_thandle_with_thandle(top_th, sub_th); + if (IS_ERR(st)) { + rc = PTR_ERR(st); + st = NULL; + GOTO(stop_trans, rc); + } + st->st_sub_th->th_wait_submit = 1; +stop_trans: + if (rc < 0) { + if (master_st != NULL) { + list_del(&master_st->st_sub_list); + OBD_FREE_PTR(master_st); + } + sub_th->th_result = rc; + dt_trans_stop(env, sub_dt, sub_th); + sub_th = ERR_PTR(rc); + } RETURN(sub_th); } EXPORT_SYMBOL(thandle_get_sub_by_dt); /** - * Top thandle destroy + * Top multiple thandle destroy * - * Destroy the top thandle and all of its sub thandle. + * Destroy multiple thandle and all its sub thandle. * - * \param[in] top_th top thandle to be destroyed. + * \param[in] tmt top_multiple_thandle to be destroyed. */ -void top_thandle_destroy(struct top_thandle *top_th) +void top_multiple_thandle_destroy(struct top_multiple_thandle *tmt) { struct sub_thandle *st; struct sub_thandle *tmp; - LASSERT(top_th->tt_magic == TOP_THANDLE_MAGIC); - list_for_each_entry_safe(st, tmp, &top_th->tt_sub_thandle_list, + LASSERT(tmt->tmt_magic == TOP_THANDLE_MAGIC); + list_for_each_entry_safe(st, tmp, &tmt->tmt_sub_thandle_list, st_sub_list) { + struct sub_thandle_cookie *stc; + struct sub_thandle_cookie *tmp; + list_del(&st->st_sub_list); + list_for_each_entry_safe(stc, tmp, &st->st_cookie_list, + stc_list) { + list_del(&stc->stc_list); + OBD_FREE_PTR(stc); + } OBD_FREE_PTR(st); } - OBD_FREE_PTR(top_th); + OBD_FREE_PTR(tmt); +} +EXPORT_SYMBOL(top_multiple_thandle_destroy); + +/** + * Cancel the update log on MDTs + * + * Cancel the update log on MDTs then destroy the thandle. + * + * \param[in] env execution environment + * \param[in] tmt the top multiple thandle whose updates records + * will be cancelled. + * + * \retval 0 if cancellation succeeds. + * \retval negative errno if cancellation fails. + */ +static int distribute_txn_cancel_records(const struct lu_env *env, + struct top_multiple_thandle *tmt) +{ + struct sub_thandle *st; + ENTRY; + + top_multiple_thandle_dump(tmt, D_INFO); + /* Cancel update logs on other MDTs */ + list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) { + struct llog_ctxt *ctxt; + struct obd_device *obd; + struct llog_cookie *cookie; + struct sub_thandle_cookie *stc; + int rc; + + obd = st->st_dt->dd_lu_dev.ld_obd; + ctxt = llog_get_context(obd, LLOG_UPDATELOG_ORIG_CTXT); + if (ctxt == NULL) + continue; + list_for_each_entry(stc, &st->st_cookie_list, stc_list) { + cookie = &stc->stc_cookie; + if (fid_is_zero(&cookie->lgc_lgl.lgl_oi.oi_fid)) + continue; + + rc = llog_cat_cancel_records(env, ctxt->loc_handle, 1, + cookie); + CDEBUG(D_HA, "%s: batchid %llu cancel update log " + DOSTID ".%u : rc = %d\n", obd->obd_name, + tmt->tmt_batchid, + POSTID(&cookie->lgc_lgl.lgl_oi), + cookie->lgc_index, rc); + } + + llog_ctxt_put(ctxt); + } + + RETURN(0); +} + +/** + * Check if there are committed transaction + * + * Check if there are committed transaction in the distribute transaction + * list, then cancel the update records for those committed transaction. + * Because the distribute transaction in the list are sorted by batchid, + * and cancellation will be done by batchid order, so we only check the first + * the transaction(with lowest batchid) in the list. 
+ * + * \param[in] lod lod device where cancel thread is + * + * \retval true if it is ready + * \retval false if it is not ready + */ +static bool tdtd_ready_for_cancel_log(struct target_distribute_txn_data *tdtd) +{ + struct top_multiple_thandle *tmt = NULL; + struct obd_device *obd = tdtd->tdtd_lut->lut_obd; + bool ready = false; + + spin_lock(&tdtd->tdtd_batchid_lock); + if (!list_empty(&tdtd->tdtd_list)) { + tmt = list_entry(tdtd->tdtd_list.next, + struct top_multiple_thandle, tmt_commit_list); + if (tmt->tmt_committed && + (!obd->obd_recovering || (obd->obd_recovering && + tmt->tmt_batchid <= tdtd->tdtd_committed_batchid))) + ready = true; + } + spin_unlock(&tdtd->tdtd_batchid_lock); + + return ready; +} + +struct distribute_txn_bid_data { + struct dt_txn_commit_cb dtbd_cb; + struct target_distribute_txn_data *dtbd_tdtd; + __u64 dtbd_batchid; +}; + +/** + * callback of updating commit batchid + * + * Updating commit batchid then wake up the commit thread to cancel the + * records. + * + * \param[in]env execution environment + * \param[in]th thandle to updating commit batchid + * \param[in]cb commit callback + * \param[in]err result of thandle + */ +static void distribute_txn_batchid_cb(struct lu_env *env, + struct thandle *th, + struct dt_txn_commit_cb *cb, + int err) +{ + struct distribute_txn_bid_data *dtbd = NULL; + struct target_distribute_txn_data *tdtd; + + dtbd = container_of0(cb, struct distribute_txn_bid_data, dtbd_cb); + tdtd = dtbd->dtbd_tdtd; + + CDEBUG(D_HA, "%s: %llu batchid updated\n", + tdtd->tdtd_lut->lut_obd->obd_name, dtbd->dtbd_batchid); + spin_lock(&tdtd->tdtd_batchid_lock); + if (dtbd->dtbd_batchid > tdtd->tdtd_committed_batchid && + !tdtd->tdtd_lut->lut_obd->obd_no_transno) + tdtd->tdtd_committed_batchid = dtbd->dtbd_batchid; + spin_unlock(&tdtd->tdtd_batchid_lock); + atomic_dec(&tdtd->tdtd_refcount); + wake_up(&tdtd->tdtd_commit_thread_waitq); + + OBD_FREE_PTR(dtbd); +} + +/** + * Update the commit batchid in disk + * + * Update commit batchid in the disk, after this is committed, it can start + * to cancel the update records. + * + * \param[in] env execution environment + * \param[in] tdtd distribute transaction structure + * \param[in] batchid commit batchid to be updated + * + * \retval 0 if update succeeds. + * \retval negative errno if update fails. 
+ */ +static int +distribute_txn_commit_batchid_update(const struct lu_env *env, + struct target_distribute_txn_data *tdtd, + __u64 batchid) +{ + struct distribute_txn_bid_data *dtbd = NULL; + struct thandle *th; + struct lu_buf buf; + __u64 tmp; + __u64 off; + int rc; + ENTRY; + + OBD_ALLOC_PTR(dtbd); + if (dtbd == NULL) + RETURN(-ENOMEM); + dtbd->dtbd_batchid = batchid; + dtbd->dtbd_tdtd = tdtd; + dtbd->dtbd_cb.dcb_func = distribute_txn_batchid_cb; + atomic_inc(&tdtd->tdtd_refcount); + + th = dt_trans_create(env, tdtd->tdtd_lut->lut_bottom); + if (IS_ERR(th)) { + OBD_FREE_PTR(dtbd); + RETURN(PTR_ERR(th)); + } + + tmp = cpu_to_le64(batchid); + buf.lb_buf = &tmp; + buf.lb_len = sizeof(tmp); + off = 0; + + rc = dt_declare_record_write(env, tdtd->tdtd_batchid_obj, &buf, off, + th); + if (rc < 0) + GOTO(stop, rc); + + rc = dt_trans_start_local(env, tdtd->tdtd_lut->lut_bottom, th); + if (rc < 0) + GOTO(stop, rc); + + rc = dt_trans_cb_add(th, &dtbd->dtbd_cb); + if (rc < 0) + GOTO(stop, rc); + + rc = dt_record_write(env, tdtd->tdtd_batchid_obj, &buf, + &off, th); + + CDEBUG(D_INFO, "%s: update batchid "LPU64": rc = %d\n", + tdtd->tdtd_lut->lut_obd->obd_name, batchid, rc); + +stop: + dt_trans_stop(env, tdtd->tdtd_lut->lut_bottom, th); + if (rc < 0) + OBD_FREE_PTR(dtbd); + RETURN(rc); +} + +/** + * Init commit batchid for distribute transaction. + * + * Initialize the batchid object and get commit batchid from the object. + * + * \param[in] env execution environment + * \param[in] tdtd distribute transaction whose batchid is initialized. + * + * \retval 0 if initialization succeeds. + * \retval negative errno if initialization fails. + **/ +static int +distribute_txn_commit_batchid_init(const struct lu_env *env, + struct target_distribute_txn_data *tdtd) +{ + struct tgt_thread_info *tti = tgt_th_info(env); + struct lu_target *lut = tdtd->tdtd_lut; + struct lu_attr *attr = &tti->tti_attr; + struct lu_fid *fid = &tti->tti_fid1; + struct dt_object_format *dof = &tti->tti_u.update.tti_update_dof; + struct dt_object *dt_obj = NULL; + struct lu_buf buf; + __u64 tmp; + __u64 off; + int rc; + ENTRY; + + memset(attr, 0, sizeof(*attr)); + attr->la_valid = LA_MODE; + attr->la_mode = S_IFREG | S_IRUGO | S_IWUSR; + dof->dof_type = dt_mode_to_dft(S_IFREG); + + lu_local_obj_fid(fid, BATCHID_COMMITTED_OID); + + dt_obj = dt_find_or_create(env, lut->lut_bottom, fid, dof, + attr); + if (IS_ERR(dt_obj)) { + rc = PTR_ERR(dt_obj); + dt_obj = NULL; + GOTO(out_put, rc); + } + + tdtd->tdtd_batchid_obj = dt_obj; + + buf.lb_buf = &tmp; + buf.lb_len = sizeof(tmp); + off = 0; + rc = dt_read(env, dt_obj, &buf, &off); + if (rc < 0 || (rc < buf.lb_len && rc > 0)) { + CERROR("%s can't read last committed batchid: rc = %d\n", + tdtd->tdtd_lut->lut_obd->obd_name, rc); + if (rc > 0) + rc = -EINVAL; + GOTO(out_put, rc); + } else if (rc == buf.lb_len) { + tdtd->tdtd_committed_batchid = le64_to_cpu(tmp); + CDEBUG(D_HA, "%s: committed batchid %llu\n", + tdtd->tdtd_lut->lut_obd->obd_name, + tdtd->tdtd_committed_batchid); + rc = 0; + } + +out_put: + if (rc < 0 && dt_obj != NULL) { + lu_object_put(env, &dt_obj->do_lu); + tdtd->tdtd_batchid_obj = NULL; + } + return rc; +} + +/** + * manage the distribute transaction thread + * + * Distribute transaction are linked to the list, and once the distribute + * transaction is committed, it will update the last committed batchid first, + * after it is committed, it will cancel the records. 
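+ * While the target is still recovering, only records whose batchid is not
+ * greater than the committed batchid are cancelled.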
+ * + * \param[in] _arg argument for commit thread + * + * \retval 0 if thread is running successfully + * \retval negative errno if the thread can not be run. + */ +static int distribute_txn_commit_thread(void *_arg) +{ + struct target_distribute_txn_data *tdtd = _arg; + struct lu_target *lut = tdtd->tdtd_lut; + struct ptlrpc_thread *thread = &lut->lut_tdtd_commit_thread; + struct l_wait_info lwi = { 0 }; + struct lu_env env; + struct list_head list; + int rc; + struct top_multiple_thandle *tmt; + struct top_multiple_thandle *tmp; + __u64 batchid = 0, committed; + + ENTRY; + + rc = lu_env_init(&env, LCT_LOCAL | LCT_MD_THREAD); + if (rc != 0) + RETURN(rc); + + spin_lock(&tdtd->tdtd_batchid_lock); + thread->t_flags = SVC_RUNNING; + spin_unlock(&tdtd->tdtd_batchid_lock); + wake_up(&thread->t_ctl_waitq); + INIT_LIST_HEAD(&list); + + CDEBUG(D_HA, "%s: start commit thread committed batchid "LPU64"\n", + tdtd->tdtd_lut->lut_obd->obd_name, + tdtd->tdtd_committed_batchid); + + while (distribute_txn_commit_thread_running(lut)) { + spin_lock(&tdtd->tdtd_batchid_lock); + list_for_each_entry_safe(tmt, tmp, &tdtd->tdtd_list, + tmt_commit_list) { + if (tmt->tmt_committed == 0) + break; + + /* Note: right now, replay is based on master MDT + * transno, but cancellation is based on batchid. + * so we do not try to cancel the update log until + * the recoverying is done, unless the update records + * batchid < committed_batchid. */ + if (tmt->tmt_batchid <= tdtd->tdtd_committed_batchid) { + list_move_tail(&tmt->tmt_commit_list, &list); + } else if (!tdtd->tdtd_lut->lut_obd->obd_recovering) { + LASSERTF(tmt->tmt_batchid >= batchid, + "tmt %p tmt_batchid: "LPU64", batchid " + LPU64"\n", tmt, tmt->tmt_batchid, + batchid); + /* There are three types of distribution + * transaction result + * + * 1. If tmt_result < 0, it means the + * distribution transaction fails, which should + * be rare, because once declare phase succeeds, + * the operation should succeeds anyway. Note in + * this case, we will still update batchid so + * cancellation would be stopped. + * + * 2. If tmt_result == 0, it means the + * distribution transaction succeeds, and we + * will update batchid. + * + * 3. If tmt_result > 0, it means distribute + * transaction is not yet committed on every + * node, but we need release this tmt before + * that, which usuually happens during umount. + */ + if (tmt->tmt_result <= 0) + batchid = tmt->tmt_batchid; + list_move_tail(&tmt->tmt_commit_list, &list); + } + } + spin_unlock(&tdtd->tdtd_batchid_lock); + + CDEBUG(D_HA, "%s: batchid: "LPU64" committed batchid " + LPU64"\n", tdtd->tdtd_lut->lut_obd->obd_name, batchid, + tdtd->tdtd_committed_batchid); + /* update globally committed on a storage */ + if (batchid > tdtd->tdtd_committed_batchid) { + rc = distribute_txn_commit_batchid_update(&env, tdtd, + batchid); + if (rc == 0) + batchid = 0; + } + /* cancel the records for committed batchid's */ + /* XXX: should we postpone cancel's till the end of recovery? 
*/ + committed = tdtd->tdtd_committed_batchid; + list_for_each_entry_safe(tmt, tmp, &list, tmt_commit_list) { + if (tmt->tmt_batchid > committed) + break; + list_del_init(&tmt->tmt_commit_list); + if (tmt->tmt_result <= 0) + distribute_txn_cancel_records(&env, tmt); + top_multiple_thandle_put(tmt); + } + + l_wait_event(tdtd->tdtd_commit_thread_waitq, + !distribute_txn_commit_thread_running(lut) || + committed < tdtd->tdtd_committed_batchid || + tdtd_ready_for_cancel_log(tdtd), &lwi); + }; + + l_wait_event(tdtd->tdtd_commit_thread_waitq, + atomic_read(&tdtd->tdtd_refcount) == 0, &lwi); + + spin_lock(&tdtd->tdtd_batchid_lock); + list_for_each_entry_safe(tmt, tmp, &tdtd->tdtd_list, + tmt_commit_list) + list_move_tail(&tmt->tmt_commit_list, &list); + spin_unlock(&tdtd->tdtd_batchid_lock); + + CDEBUG(D_INFO, "%s stopping distribute txn commit thread.\n", + tdtd->tdtd_lut->lut_obd->obd_name); + list_for_each_entry_safe(tmt, tmp, &list, tmt_commit_list) { + list_del_init(&tmt->tmt_commit_list); + top_multiple_thandle_dump(tmt, D_HA); + top_multiple_thandle_put(tmt); + } + + thread->t_flags = SVC_STOPPED; + lu_env_fini(&env); + wake_up(&thread->t_ctl_waitq); + + RETURN(0); +} + +/** + * Start llog cancel thread + * + * Start llog cancel(master/slave) thread on LOD + * + * \param[in]lclt cancel log thread to be started. + * + * \retval 0 if the thread is started successfully. + * \retval negative errno if the thread is not being + * started. + */ +int distribute_txn_init(const struct lu_env *env, + struct lu_target *lut, + struct target_distribute_txn_data *tdtd, + __u32 index) +{ + struct task_struct *task; + struct l_wait_info lwi = { 0 }; + int rc; + ENTRY; + + spin_lock_init(&tdtd->tdtd_batchid_lock); + INIT_LIST_HEAD(&tdtd->tdtd_list); + + tdtd->tdtd_batchid = lut->lut_last_transno + 1; + + init_waitqueue_head(&lut->lut_tdtd_commit_thread.t_ctl_waitq); + init_waitqueue_head(&tdtd->tdtd_commit_thread_waitq); + atomic_set(&tdtd->tdtd_refcount, 0); + + tdtd->tdtd_lut = lut; + rc = distribute_txn_commit_batchid_init(env, tdtd); + if (rc != 0) + RETURN(rc); + + task = kthread_run(distribute_txn_commit_thread, tdtd, "tdtd-%u", + index); + if (IS_ERR(task)) + RETURN(PTR_ERR(task)); + + l_wait_event(lut->lut_tdtd_commit_thread.t_ctl_waitq, + distribute_txn_commit_thread_running(lut) || + distribute_txn_commit_thread_stopped(lut), &lwi); + RETURN(0); +} +EXPORT_SYMBOL(distribute_txn_init); + +/** + * Stop llog cancel thread + * + * Stop llog cancel(master/slave) thread on LOD and also destory + * all of transaction in the list. + * + * \param[in]lclt cancel log thread to be stopped. + */ +void distribute_txn_fini(const struct lu_env *env, + struct target_distribute_txn_data *tdtd) +{ + struct lu_target *lut = tdtd->tdtd_lut; + + /* Stop cancel thread */ + if (lut == NULL || !distribute_txn_commit_thread_running(lut)) + return; + + spin_lock(&tdtd->tdtd_batchid_lock); + lut->lut_tdtd_commit_thread.t_flags = SVC_STOPPING; + spin_unlock(&tdtd->tdtd_batchid_lock); + wake_up(&tdtd->tdtd_commit_thread_waitq); + wait_event(lut->lut_tdtd_commit_thread.t_ctl_waitq, + lut->lut_tdtd_commit_thread.t_flags & SVC_STOPPED); + + dtrq_list_destroy(tdtd); + if (tdtd->tdtd_batchid_obj != NULL) { + lu_object_put(env, &tdtd->tdtd_batchid_obj->do_lu); + tdtd->tdtd_batchid_obj = NULL; + } } -EXPORT_SYMBOL(top_thandle_destroy); +EXPORT_SYMBOL(distribute_txn_fini);
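
For orientation, the call pattern an upper layer (e.g. LOD) is expected to follow with the interfaces above is sketched below. This is a minimal, hypothetical sketch only: example_cross_mdt_op(), master_dev and remote_dev are placeholder names, the declare and execution phases are elided, and error handling is reduced to the minimum. Only top_trans_create(), thandle_get_sub_by_dt(), top_trans_start() and top_trans_stop() are the interfaces introduced or reworked by this patch; everything else is assumed context.

#include <libcfs/libcfs.h>
#include <dt_object.h>

/*
 * Hypothetical caller sketch (not part of this patch).  It assumes the
 * dt/thandle declarations from dt_object.h and the usual libcfs macros.
 */
static int example_cross_mdt_op(const struct lu_env *env,
                                struct dt_device *master_dev,
                                struct dt_device *remote_dev)
{
        struct thandle *th;
        struct thandle *sub_th;
        int rc;

        /* Allocate the top thandle together with the master sub thandle. */
        th = top_trans_create(env, master_dev);
        if (IS_ERR(th))
                return PTR_ERR(th);

        /* The first lookup of a non-master device creates the
         * top_multiple_thandle and links both sub thandles into it. */
        sub_th = thandle_get_sub_by_dt(env, th, remote_dev);
        if (IS_ERR(sub_th))
                GOTO(stop, rc = PTR_ERR(sub_th));

        /* ... dt_declare_*() calls against th and sub_th go here; they also
         * pack the update records written to the update llog at stop time. */

        /* Start every sub transaction and register the stop/commit
         * callbacks consumed by the distribute txn commit thread. */
        rc = top_trans_start(env, master_dev, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* ... execution phase: dt_*() calls on the involved objects ... */

stop:
        /* Record the result so top_trans_stop() knows whether to write the
         * update llog, then write the llog on the master, stop the master
         * sub thandle, replicate the record to the other MDTs and stop
         * their sub thandles, waiting for all of them. */
        th->th_result = rc;
        rc = top_trans_stop(env, master_dev, th);

        return rc;
}

As in the existing single-node path, top_trans_stop() is still the call that releases the top thandle, so it must be issued even when thandle_get_sub_by_dt() or top_trans_start() fails.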