From: Wang Di Date: Thu, 21 Aug 2014 22:39:19 +0000 (-0700) Subject: LU-3536 lod: cancel update log after all committed X-Git-Tag: 2.7.55~30 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=cd764a5462697261a9a6b1e6c6858c75d969bae1 LU-3536 lod: cancel update log after all committed Cancel the log cookies after all of the updates have been committed to disk. In DNE2, a distributed operation writes update logs on all of the involved MDTs; once the updates are committed on all of those MDTs, the update records are cancelled. If one of these MDTs restarts during the operation, it will replay the updates based on these update records. A distributed operation is represented as a distributed transaction (top_multiple_thandle), and all distributed transactions are linked into a list that is attached to lu_target (see struct target_distribute_txn_data). After a distributed thandle is committed, the commit thread (distribute_txn_commit_thread) updates the committed batchid; once that update is committed, it cancels the update records. Change-Id: I3023f95515b2c15307479f7f85c3f13926d4bcc9 Signed-off-by: Wang Di Signed-off-by: Alex Zhuravlev Reviewed-on: http://review.whamcloud.com/11572 Tested-by: Jenkins Tested-by: Maloo Reviewed-by: Lai Siyao Reviewed-by: Oleg Drokin --- diff --git a/lustre/fid/fid_handler.c b/lustre/fid/fid_handler.c index fe94281..1046b3c 100644 --- a/lustre/fid/fid_handler.c +++ b/lustre/fid/fid_handler.c @@ -258,17 +258,27 @@ static int range_alloc_set(const struct lu_env *env, RETURN(rc); } -static int __seq_server_alloc_meta(struct lu_server_seq *seq, - struct lu_seq_range *out, - const struct lu_env *env) +/** + * Check if the sequence server has sequence avaible + * + * Check if the sequence server has sequence avaible, if not, then + * allocating super sequence from sequence manager (MDT0). 
+ * + * \param[in] env execution environment + * \param[in] seq server sequence + * + * \retval negative errno if allocating new sequence fails + * \retval 0 if there is enough sequence or allocating + * new sequence succeeds + */ +int seq_server_check_and_alloc_super(const struct lu_env *env, + struct lu_server_seq *seq) { struct lu_seq_range *space = &seq->lss_space; int rc = 0; ENTRY; - LASSERT(range_is_sane(space)); - /* Check if available space ends and allocate new super seq */ if (range_is_exhausted(space)) { if (!seq->lss_cli) { @@ -279,8 +289,8 @@ static int __seq_server_alloc_meta(struct lu_server_seq *seq, rc = seq_client_alloc_super(seq->lss_cli, env); if (rc) { - CERROR("%s: Can't allocate super-sequence, rc %d\n", - seq->lss_name, rc); + CDEBUG(D_HA, "%s: Can't allocate super-sequence:" + " rc %d\n", seq->lss_name, rc); RETURN(rc); } @@ -298,6 +308,31 @@ static int __seq_server_alloc_meta(struct lu_server_seq *seq, } } + if (range_is_zero(&seq->lss_lowater_set)) + __seq_set_init(env, seq); + + RETURN(rc); +} +EXPORT_SYMBOL(seq_server_check_and_alloc_super); + +static int __seq_server_alloc_meta(struct lu_server_seq *seq, + struct lu_seq_range *out, + const struct lu_env *env) +{ + struct lu_seq_range *space = &seq->lss_space; + int rc = 0; + + ENTRY; + + LASSERT(range_is_sane(space)); + + rc = seq_server_check_and_alloc_super(env, seq); + if (rc < 0) { + CERROR("%s: Allocated super-sequence failed: rc = %d\n", + seq->lss_name, rc); + RETURN(rc); + } + rc = range_alloc_set(env, out, seq); if (rc != 0) { CERROR("%s: Allocated meta-sequence failed: rc = %d\n", @@ -324,6 +359,7 @@ int seq_server_alloc_meta(struct lu_server_seq *seq, RETURN(rc); } +EXPORT_SYMBOL(seq_server_alloc_meta); static int seq_server_handle(struct lu_site *site, const struct lu_env *env, diff --git a/lustre/fld/fld_handler.c b/lustre/fld/fld_handler.c index 417f103..b38f306 100644 --- a/lustre/fld/fld_handler.c +++ b/lustre/fld/fld_handler.c @@ -474,6 +474,7 @@ int 
fld_server_init(const struct lu_env *env, struct lu_server_fld *fld, fld->lsf_control_exp = NULL; fld->lsf_seq_lookup = fld_server_lookup; + fld->lsf_seq_lookup = fld_server_lookup; RETURN(0); out_index: fld_index_fini(env, fld); diff --git a/lustre/include/dt_object.h b/lustre/include/dt_object.h index e5f782c..4921873 100644 --- a/lustre/include/dt_object.h +++ b/lustre/include/dt_object.h @@ -104,10 +104,13 @@ typedef void (*dt_cb_t)(struct lu_env *env, struct thandle *th, #define TRANS_COMMIT_CB_MAGIC 0xa0a00a0a #define MAX_COMMIT_CB_STR_LEN 32 +#define DCB_TRANS_NOT_COMMITTED 0x1 struct dt_txn_commit_cb { struct list_head dcb_linkage; dt_cb_t dcb_func; + void *dcb_data; __u32 dcb_magic; + __u32 dcb_flags; char dcb_name[MAX_COMMIT_CB_STR_LEN]; }; @@ -1794,9 +1797,13 @@ struct thandle { /** the dt device on which the transactions are executed */ struct dt_device *th_dev; - /* In some callback function, it needs to access the top_th directly */ - struct thandle *th_top; - + /* point to the top thandle, XXX this is a bit hacky right now, + * but normal device trans callback triggered by the bottom + * device (OSP/OSD == sub thandle layer) needs to get the + * top_thandle (see dt_txn_hook_start/stop()), so we put the + * top thandle here for now, will fix it when we have better + * callback mechanism */ + struct thandle *th_top; /** context for this transaction, tag is LCT_TX_HANDLE */ struct lu_context th_ctx; @@ -1810,7 +1817,9 @@ struct thandle { /** whether we need sync commit */ unsigned int th_sync:1, /* local transation, no need to inform other layers */ - th_local:1; + th_local:1, + /* Whether we need wait the transaction to be submitted */ + th_wait_submit:1; }; /** diff --git a/lustre/include/lu_target.h b/lustre/include/lu_target.h index d0580c2..2e56849 100644 --- a/lustre/include/lu_target.h +++ b/lustre/include/lu_target.h @@ -43,10 +43,32 @@ #include #include +struct target_distribute_txn_data { + /* Distribution ID is used to identify updates log on 
different + * MDTs for one operation */ + spinlock_t tdtd_batchid_lock; + __u64 tdtd_batchid; + struct lu_target *tdtd_lut; + struct dt_object *tdtd_batchid_obj; + + /* Committed batchid for distribute transaction */ + __u64 tdtd_committed_batchid; + + /* List for distribute transaction */ + struct list_head tdtd_list; + + /* Threads to manage distribute transaction */ + wait_queue_head_t tdtd_commit_thread_waitq; + atomic_t tdtd_refcount; +}; + struct lu_target { struct obd_device *lut_obd; struct dt_device *lut_bottom; + struct target_distribute_txn_data *lut_tdtd; + struct ptlrpc_thread lut_tdtd_commit_thread; + /* supported opcodes and handlers for this target */ struct tgt_opc_slice *lut_slice; __u32 lut_reply_fail_id; @@ -333,6 +355,15 @@ int tgt_server_data_update(const struct lu_env *env, struct lu_target *tg, int tgt_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tg, loff_t off); +/* target/update_trans.c */ +int distribute_txn_init(const struct lu_env *env, + struct lu_target *lut, + struct target_distribute_txn_data *tdtd, + __u32 index); +void distribute_txn_fini(const struct lu_env *env, + struct target_distribute_txn_data *tdtd); + + enum { ESERIOUS = 0x0001000 }; diff --git a/lustre/include/lustre_fid.h b/lustre/include/lustre_fid.h index f325849..e5ef46a 100644 --- a/lustre/include/lustre_fid.h +++ b/lustre/include/lustre_fid.h @@ -239,6 +239,7 @@ enum local_oid { /* This definition is obsolete * SLAVE_LLOG_CATALOGS_OID = 4124UL, */ + BATCHID_COMMITTED_OID = 4125UL, }; static inline void lu_local_obj_fid(struct lu_fid *fid, __u32 oid) @@ -501,6 +502,8 @@ int seq_server_set_cli(const struct lu_env *env, struct lu_server_seq *seq, struct lu_client_seq *cli); +int seq_server_check_and_alloc_super(const struct lu_env *env, + struct lu_server_seq *seq); /* Client methods */ int seq_client_init(struct lu_client_seq *seq, struct obd_export *exp, diff --git a/lustre/include/lustre_update.h b/lustre/include/lustre_update.h index 
4cb87e0..8a42fb8 100644 --- a/lustre/include/lustre_update.h +++ b/lustre/include/lustre_update.h @@ -333,36 +333,50 @@ struct thandle_update_records { }; #define TOP_THANDLE_MAGIC 0x20140917 +struct top_multiple_thandle { + struct dt_device *tmt_master_sub_dt; + atomic_t tmt_refcount; + /* Other sub transactions will be listed here. */ + struct list_head tmt_sub_thandle_list; + + struct list_head tmt_commit_list; + /* All of update records will packed here */ + struct thandle_update_records *tmt_update_records; + + __u64 tmt_batchid; + int tmt_result; + __u32 tmt_magic; + __u32 tmt_committed:1; +}; + /* {top,sub}_thandle are used to manage distributed transactions which * include updates on several nodes. A top_handle represents the * whole operation, and sub_thandle represents updates on each node. */ struct top_thandle { struct thandle tt_super; - __u32 tt_magic; - atomic_t tt_refcount; /* The master sub transaction. */ struct thandle *tt_master_sub_thandle; - /* Other sub thandle will be listed here. 
*/ - struct list_head tt_sub_thandle_list; - - /* All of update records will packed here */ - struct thandle_update_records *tt_update_records; + struct top_multiple_thandle *tt_multiple_thandle; }; +/* Sub thandle is used to track multiple sub thandles under one parent + * thandle */ struct sub_thandle { - /* point to the osd/osp_thandle */ struct thandle *st_sub_th; + struct dt_device *st_dt; + struct list_head st_list; + struct llog_cookie st_cookie; + struct dt_txn_commit_cb st_commit_dcb; + int st_result; /* linked to top_thandle */ struct list_head st_sub_list; /* If this sub thandle is committed */ - bool st_committed:1, - st_record_update:1; + bool st_committed:1; }; - /* target/out_lib.c */ int out_update_pack(const struct lu_env *env, struct object_update *update, size_t max_update_size, enum update_type op, @@ -433,14 +447,25 @@ thandle_get_sub(const struct lu_env *env, struct thandle *th, struct thandle * top_trans_create(const struct lu_env *env, struct dt_device *master_dev); - int top_trans_start(const struct lu_env *env, struct dt_device *master_dev, struct thandle *th); - int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev, struct thandle *th); +void top_multiple_thandle_destroy(struct top_multiple_thandle *tmt); + +static inline void top_multiple_thandle_get(struct top_multiple_thandle *tmt) +{ + atomic_inc(&tmt->tmt_refcount); +} + +static inline void top_multiple_thandle_put(struct top_multiple_thandle *tmt) +{ + if (atomic_dec_and_test(&tmt->tmt_refcount)) + top_multiple_thandle_destroy(tmt); +} -void top_thandle_destroy(struct top_thandle *top_th); +struct sub_thandle *lookup_sub_thandle(struct top_multiple_thandle *tmt, + struct dt_device *dt_dev); /* update_records.c */ int update_records_create_pack(const struct lu_env *env, @@ -549,16 +574,13 @@ int tur_update_records_extend(struct thandle_update_records *tur, size_t new_size); int tur_update_params_extend(struct thandle_update_records *tur, size_t new_size); -int 
check_and_prepare_update_record(const struct lu_env *env, - struct thandle *th); -int merge_params_updates_buf(const struct lu_env *env, - struct thandle_update_records *tur); int tur_update_extend(struct thandle_update_records *tur, size_t new_op_size, size_t new_param_size); #define update_record_pack(name, th, ...) \ ({ \ struct top_thandle *top_th; \ + struct top_multiple_thandle *tmt; \ struct thandle_update_records *tur; \ struct llog_update_record *lur; \ size_t avail_param_size; \ @@ -567,7 +589,8 @@ int tur_update_extend(struct thandle_update_records *tur, \ while (1) { \ top_th = container_of(th, struct top_thandle, tt_super);\ - tur = top_th->tt_update_records; \ + tmt = top_th->tt_multiple_thandle; \ + tur = tmt->tmt_update_records; \ lur = tur->tur_update_records; \ avail_param_size = tur->tur_update_params_buf_size - \ update_params_size(tur->tur_update_params, \ diff --git a/lustre/lod/lod_dev.c b/lustre/lod/lod_dev.c index cd7a97d..88d5800 100644 --- a/lustre/lod/lod_dev.c +++ b/lustre/lod/lod_dev.c @@ -153,6 +153,9 @@ int lod_fld_lookup(const struct lu_env *env, struct lod_device *lod, } server_fld = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_server_fld; + if (server_fld == NULL) + RETURN(-EIO); + fld_range_set_type(&range, *type); rc = fld_server_lookup(env, server_fld, fid_seq(fid), &range); if (rc != 0) @@ -170,6 +173,8 @@ int lod_fld_lookup(const struct lu_env *env, struct lod_device *lod, /* Slab for OSD object allocation */ struct kmem_cache *lod_object_kmem; +/* Slab for dt_txn_callback */ +struct kmem_cache *lod_txn_callback_kmem; static struct lu_kmem_descr lod_caches[] = { { .ckd_cache = &lod_object_kmem, @@ -177,6 +182,11 @@ static struct lu_kmem_descr lod_caches[] = { .ckd_size = sizeof(struct lod_object) }, { + .ckd_cache = &lod_txn_callback_kmem, + .ckd_name = "lod_txn_callback", + .ckd_size = sizeof(struct dt_txn_callback) + }, + { .ckd_cache = NULL } }; @@ -212,7 +222,7 @@ static struct lu_object *lod_object_alloc(const struct lu_env 
*env, } /** - * Cleanup table of target's descriptors. + * Process the config log for all sub device. * * The function goes through all the targets in the given table * and apply given configuration command on to the targets. @@ -318,6 +328,44 @@ out: } /** + * finish sub llog context + * + * Stop update recovery thread for the sub device, then cleanup the + * correspondent llog ctxt. + * + * \param[in] env execution environment + * \param[in] lod lod device to do update recovery + * \param[in] thread recovery thread on this sub device + */ +void lod_sub_fini_llog(const struct lu_env *env, + struct dt_device *dt, struct ptlrpc_thread *thread) +{ + struct obd_device *obd; + struct llog_ctxt *ctxt; + ENTRY; + + obd = dt->dd_lu_dev.ld_obd; + CDEBUG(D_INFO, "%s: finish sub llog\n", obd->obd_name); + /* Stop recovery thread first */ + if (thread != NULL && thread->t_flags & SVC_RUNNING) { + thread->t_flags = SVC_STOPPING; + wake_up(&thread->t_ctl_waitq); + wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_STOPPED); + } + + ctxt = llog_get_context(obd, LLOG_UPDATELOG_ORIG_CTXT); + if (ctxt == NULL) + RETURN_EXIT; + + if (ctxt->loc_handle != NULL) + llog_cat_close(env, ctxt->loc_handle); + + llog_cleanup(env, ctxt); + + RETURN_EXIT; +} + +/** * Extract MDT target index from a device name. 
* * a helper function to extract index from the given device name @@ -405,16 +453,13 @@ int lod_sub_init_llog(const struct lu_env *env, struct lod_device *lod, if (lod->lod_child == dt) { thread = &lod->lod_child_recovery_thread; rc = lodname2mdt_index(lod2obd(lod)->obd_name, &index); - if (rc != 0) { - OBD_FREE_PTR(lrd); - RETURN(rc); - } + if (rc != 0) + GOTO(free_lrd, rc); } else { struct lod_tgt_descs *ltd = &lod->lod_mdt_descs; struct lod_tgt_desc *tgt = NULL; unsigned int i; - mutex_lock(<d->ltd_mutex); cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) { tgt = LTD_TGT(ltd, i); if (tgt->ltd_tgt == dt) { @@ -423,13 +468,12 @@ int lod_sub_init_llog(const struct lu_env *env, struct lod_device *lod, break; } } - mutex_unlock(<d->ltd_mutex); - OBD_ALLOC_PTR(tgt->ltd_recovery_thread); - if (tgt->ltd_recovery_thread == NULL) { - OBD_FREE_PTR(lrd); - RETURN(-ENOMEM); - } - thread = tgt->ltd_recovery_thread; + LASSERT(sub_ltd != NULL); + OBD_ALLOC_PTR(sub_ltd->ltd_recovery_thread); + if (sub_ltd->ltd_recovery_thread == NULL) + GOTO(free_lrd, rc = -ENOMEM); + + thread = sub_ltd->ltd_recovery_thread; } lrd->lrd_lod = lod; @@ -445,8 +489,7 @@ int lod_sub_init_llog(const struct lu_env *env, struct lod_device *lod, if (rc < 0) { CERROR("%s: cannot setup updatelog llog: rc = %d\n", obd->obd_name, rc); - OBD_FREE_PTR(lrd); - RETURN(rc); + GOTO(free_thread, rc); } /* Start the recovery thread */ @@ -454,7 +497,6 @@ int lod_sub_init_llog(const struct lu_env *env, struct lod_device *lod, index); if (IS_ERR(task)) { rc = PTR_ERR(task); - OBD_FREE_PTR(lrd); CERROR("%s: cannot start recovery thread: rc = %d\n", obd->obd_name, rc); GOTO(out_llog, rc); @@ -462,46 +504,18 @@ int lod_sub_init_llog(const struct lu_env *env, struct lod_device *lod, l_wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_RUNNING || thread->t_flags & SVC_STOPPED, &lwi); -out_llog: - if (rc != 0) - lod_sub_fini_llog(env, dt, thread); - - RETURN(rc); -} -/** - * finish sub llog context - * - * Stop update recovery 
thread for the sub device, then cleanup the - * correspondent llog ctxt. - * - * \param[in] env execution environment - * \param[in] dt dt device(lod) to do update recovery - * \param[in] thread recovery thread on this sub device - */ -void lod_sub_fini_llog(const struct lu_env *env, - struct dt_device *dt, struct ptlrpc_thread *thread) -{ - struct obd_device *obd; - struct llog_ctxt *ctxt; - - CDEBUG(D_INFO, "%s: finish sub llog\n", dt->dd_lu_dev.ld_obd->obd_name); - /* Stop recovery thread first */ - if (thread != NULL && thread->t_flags & SVC_RUNNING) { - thread->t_flags = SVC_STOPPING; - wake_up(&thread->t_ctl_waitq); - wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_STOPPED); + RETURN(0); +out_llog: + lod_sub_fini_llog(env, dt, thread); +free_thread: + if (lod->lod_child != dt) { + OBD_FREE_PTR(sub_ltd->ltd_recovery_thread); + sub_ltd->ltd_recovery_thread = NULL; } - - obd = dt->dd_lu_dev.ld_obd; - ctxt = llog_get_context(obd, LLOG_UPDATELOG_ORIG_CTXT); - if (ctxt == NULL) - return; - - if (ctxt->loc_handle != NULL) - llog_cat_close(env, ctxt->loc_handle); - - llog_cleanup(env, ctxt); +free_lrd: + OBD_FREE_PTR(lrd); + RETURN(rc); } /** @@ -512,15 +526,16 @@ void lod_sub_fini_llog(const struct lu_env *env, * \param[in] env execution environment * \param[in] lod lod device to do update recovery */ -void lod_sub_fini_all_llogs(const struct lu_env *env, struct lod_device *lod) +static void lod_sub_fini_all_llogs(const struct lu_env *env, + struct lod_device *lod) { struct lod_tgt_descs *ltd = &lod->lod_mdt_descs; unsigned int i; /* Stop the update log commit cancel threads and finish master * llog ctxt */ - lod_sub_fini_llog(env, lod->lod_child, &lod->lod_child_recovery_thread); - + lod_sub_fini_llog(env, lod->lod_child, + &lod->lod_child_recovery_thread); lod_getref(ltd); cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) { struct lod_tgt_desc *tgt; @@ -538,6 +553,71 @@ void lod_sub_fini_all_llogs(const struct lu_env *env, struct lod_device *lod) } /** + * Prepare 
distribute txn + * + * Prepare distribute txn structure for LOD + * + * \param[in] env execution environment + * \param[in] lod_device LOD device + * + * \retval 0 if preparation succeeds. + * \retval negative errno if preparation fails. + */ +static int lod_prepare_distribute_txn(const struct lu_env *env, + struct lod_device *lod) +{ + struct target_distribute_txn_data *tdtd; + struct lu_target *lut; + int rc; + ENTRY; + + /* Init update recovery data */ + OBD_ALLOC_PTR(tdtd); + if (tdtd == NULL) + RETURN(-ENOMEM); + + lut = lod2lu_dev(lod)->ld_site->ls_tgt; + + rc = distribute_txn_init(env, lut, tdtd, + lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id); + + if (rc < 0) { + CERROR("%s: cannot init distribute txn: rc = %d\n", + lod2obd(lod)->obd_name, rc); + OBD_FREE_PTR(tdtd); + RETURN(rc); + } + + lut->lut_tdtd = tdtd; + + RETURN(0); +} + +/** + * Finish distribute txn + * + * Release the resource holding by distribute txn, i.e. stop distribute + * txn thread. + * + * \param[in] env execution environment + * \param[in] lod lod device + */ +static void lod_fini_distribute_txn(const struct lu_env *env, + struct lod_device *lod) +{ + struct lu_target *lut; + + lut = lod2lu_dev(lod)->ld_site->ls_tgt; + if (lut->lut_tdtd == NULL) + return; + + distribute_txn_fini(env, lut->lut_tdtd); + + OBD_FREE_PTR(lut->lut_tdtd); + lut->lut_tdtd = NULL; +} + +/** * Implementation of lu_device_operations::ldo_process_config() for LOD * * The method is called by the configuration subsystem during setup, @@ -645,6 +725,7 @@ static int lod_process_config(const struct lu_env *env, CDEBUG(D_HA, "%s: can't process %u: %d\n", lod2obd(lod)->obd_name, lcfg->lcfg_command, rc); + lod_fini_distribute_txn(env, lod); lod_sub_fini_all_llogs(env, lod); break; } @@ -791,10 +872,10 @@ static int lod_prepare(const struct lu_env *env, struct lu_device *pdev, if (IS_ERR(root)) RETURN(PTR_ERR(root)); + /* Create update log object */ index = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id; 
lu_update_log_fid(fid, index); - /* Create update log object */ dto = local_file_find_or_create_with_fid(env, lod->lod_child, fid, root, lod_update_log_name, @@ -815,8 +896,12 @@ static int lod_prepare(const struct lu_env *env, struct lu_device *pdev, lu_object_put(env, &dto->do_lu); + rc = lod_prepare_distribute_txn(env, lod); + if (rc != 0) + GOTO(out_put, rc); + rc = lod_sub_init_llogs(env, lod); - if (rc < 0) + if (rc != 0) GOTO(out_put, rc); out_put: @@ -1203,7 +1288,7 @@ static struct lu_device *lod_device_free(const struct lu_env *env, struct lu_device *next = &lod->lod_child->dd_lu_dev; ENTRY; - LASSERT(atomic_read(&lu->ld_ref) == 0); + LASSERTF(atomic_read(&lu->ld_ref) == 0, "lu is %p\n", lu); dt_device_fini(&lod->lod_dt_dev); OBD_FREE_PTR(lod); RETURN(next); diff --git a/lustre/lod/lod_internal.h b/lustre/lod/lod_internal.h index c99b074..85c5007 100644 --- a/lustre/lod/lod_internal.h +++ b/lustre/lod/lod_internal.h @@ -402,6 +402,8 @@ int lod_sub_init_llog(const struct lu_env *env, struct lod_device *lod, struct dt_device *dt); void lod_sub_fini_llog(const struct lu_env *env, struct dt_device *dt, struct ptlrpc_thread *thread); +int lodname2mdt_index(char *lodname, __u32 *mdt_index); + /* lod_lov.c */ void lod_getref(struct lod_tgt_descs *ltd); void lod_putref(struct lod_device *lod, struct lod_tgt_descs *ltd); diff --git a/lustre/lod/lod_sub_object.c b/lustre/lod/lod_sub_object.c index d6f1f32..05eb8a4 100644 --- a/lustre/lod/lod_sub_object.c +++ b/lustre/lod/lod_sub_object.c @@ -74,7 +74,7 @@ struct thandle *lod_sub_get_thandle(const struct lu_env *env, RETURN(th); tth = container_of(th, struct top_thandle, tt_super); - LASSERT(tth->tt_magic == TOP_THANDLE_MAGIC); + /* local object must be mdt object, Note: during ost object * creation, FID is not assigned until osp_object_create(), * so if the FID of sub_obj is zero, it means OST object. 
*/ @@ -82,8 +82,9 @@ struct thandle *lod_sub_get_thandle(const struct lu_env *env, fid_is_zero(lu_object_fid(&sub_obj->do_lu))) { /* local MDT object */ if (fid_is_sane(lu_object_fid(&sub_obj->do_lu)) && - tth->tt_update_records != NULL && - record_update != NULL) + tth->tt_multiple_thandle != NULL && + record_update != NULL && + th->th_result == 0) *record_update = true; RETURN(tth->tt_master_sub_thandle); @@ -97,7 +98,8 @@ struct thandle *lod_sub_get_thandle(const struct lu_env *env, if (type == LU_SEQ_RANGE_OST) RETURN(tth->tt_master_sub_thandle); - if (tth->tt_update_records != NULL && record_update != NULL) + if (tth->tt_multiple_thandle != NULL && record_update != NULL && + th->th_result == 0) *record_update = true; sub_th = thandle_get_sub(env, th, sub_obj); @@ -881,33 +883,69 @@ int lod_sub_prep_llog(const struct lu_env *env, struct lod_device *lod, struct lu_fid *fid = <i->lti_fid; struct obd_device *obd; int rc; + bool need_put = false; ENTRY; lu_update_log_fid(fid, index); - fid_to_logid(fid, &cid->lci_logid); + + rc = lodname2mdt_index(lod2obd(lod)->obd_name, (__u32 *)&index); + if (rc < 0) + RETURN(rc); + + rc = llog_osd_get_cat_list(env, dt, index, 1, cid, fid); + if (rc != 0) { + CERROR("%s: can't get id from catalogs: rc = %d\n", + lod2obd(lod)->obd_name, rc); + RETURN(rc); + } obd = dt->dd_lu_dev.ld_obd; ctxt = llog_get_context(obd, LLOG_UPDATELOG_ORIG_CTXT); LASSERT(ctxt != NULL); ctxt->loc_flags |= LLOG_CTXT_FLAG_NORMAL_FID; - rc = llog_open(env, ctxt, &lgh, &cid->lci_logid, NULL, - LLOG_OPEN_EXISTS); - if (rc < 0) { - llog_ctxt_put(ctxt); - RETURN(rc); + if (likely(logid_id(&cid->lci_logid) != 0)) { + rc = llog_open(env, ctxt, &lgh, &cid->lci_logid, NULL, + LLOG_OPEN_EXISTS); + + /* re-create llog if it is missing */ + if (rc == -ENOENT) + logid_set_id(&cid->lci_logid, 0); + else if (rc < 0) + GOTO(out_put, rc); + } + + if (unlikely(logid_id(&cid->lci_logid) == 0)) { + rc = llog_open_create(env, ctxt, &lgh, NULL, NULL); + if (rc < 0) + 
GOTO(out_put, rc); + cid->lci_logid = lgh->lgh_id; + need_put = true; } LASSERT(lgh != NULL); ctxt->loc_handle = lgh; rc = llog_cat_init_and_process(env, lgh); + if (rc != 0) + GOTO(out_close, rc); + + if (need_put) { + rc = llog_osd_put_cat_list(env, dt, index, 1, cid, fid); + if (rc != 0) + GOTO(out_close, rc); + } + + CDEBUG(D_INFO, "%s: Init llog for %d - catid "DOSTID":%x\n", + obd->obd_name, index, POSTID(&cid->lci_logid.lgl_oi), + cid->lci_logid.lgl_ogen); +out_close: if (rc != 0) { llog_cat_close(env, ctxt->loc_handle); ctxt->loc_handle = NULL; } - +out_put: llog_ctxt_put(ctxt); - RETURN(rc); } + diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index caa9145..b90f15e 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -3505,20 +3505,26 @@ static int mdt_register_lwp_callback(void *data) LASSERT(mdt_seq_site(mdt)->ss_node_id != 0); - if (!likely(fld->lsf_new)) - RETURN(0); - rc = lu_env_init(&env, LCT_MD_THREAD); - if (rc) { + if (rc < 0) { CERROR("%s: cannot init env: rc = %d\n", mdt_obd_name(mdt), rc); RETURN(rc); } - rc = fld_update_from_controller(&env, fld); - if (rc != 0) { - CERROR("%s: cannot update controller: rc = %d\n", - mdt_obd_name(mdt), rc); + /* Allocate new sequence now to avoid creating local transaction + * in the normal transaction process */ + rc = seq_server_check_and_alloc_super(&env, + mdt_seq_site(mdt)->ss_server_seq); + if (rc < 0) GOTO(out, rc); + + if (fld->lsf_new) { + rc = fld_update_from_controller(&env, fld); + if (rc != 0) { + CERROR("%s: cannot update controller: rc = %d\n", + mdt_obd_name(mdt), rc); + GOTO(out, rc); + } } out: lu_env_fini(&env); diff --git a/lustre/obdclass/llog.c b/lustre/obdclass/llog.c index 9346f18..4e3ae10 100644 --- a/lustre/obdclass/llog.c +++ b/lustre/obdclass/llog.c @@ -791,6 +791,7 @@ int llog_open_create(const struct lu_env *env, struct llog_ctxt *ctxt, if (IS_ERR(th)) GOTO(out, rc = PTR_ERR(th)); + th->th_wait_submit = 1; rc = llog_declare_create(env, *res, 
th); if (rc == 0) { rc = dt_trans_start_local(env, d, th); @@ -863,6 +864,7 @@ int llog_write(const struct lu_env *env, struct llog_handle *loghandle, if (rc) GOTO(out_trans, rc); + th->th_wait_submit = 1; rc = dt_trans_start_local(env, dt, th); if (rc) GOTO(out_trans, rc); diff --git a/lustre/obdclass/llog_cat.c b/lustre/obdclass/llog_cat.c index c8c2da3..bc5c126 100644 --- a/lustre/obdclass/llog_cat.c +++ b/lustre/obdclass/llog_cat.c @@ -72,6 +72,14 @@ static int llog_cat_new_log(const struct lu_env *env, if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED)) RETURN(-ENOSPC); + if (loghandle->lgh_hdr != NULL) { + /* If llog object is remote and creation is failed, lgh_hdr + * might be left over here, free it first */ + LASSERT(!llog_exist(loghandle)); + OBD_FREE_PTR(loghandle->lgh_hdr); + loghandle->lgh_hdr = NULL; + } + rc = llog_create(env, loghandle, th); /* if llog is already created, no need to initialize it */ if (rc == -EEXIST) { diff --git a/lustre/obdclass/llog_osd.c b/lustre/obdclass/llog_osd.c index 9aa79de..f0e904b 100644 --- a/lustre/obdclass/llog_osd.c +++ b/lustre/obdclass/llog_osd.c @@ -1377,12 +1377,15 @@ static int llog_osd_close(const struct lu_env *env, struct llog_handle *handle) LASSERT(handle->lgh_obj); - lu_object_put(env, &handle->lgh_obj->do_lu); - - if (handle->lgh_ctxt->loc_flags & - LLOG_CTXT_FLAG_NORMAL_FID) + if (handle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) { + /* Remove the object from the cache, otherwise it may + * hold LOD being released during cleanup process */ + lu_object_put_nocache(env, &handle->lgh_obj->do_lu); + LASSERT(handle->private_data == NULL); RETURN(rc); - + } else { + lu_object_put(env, &handle->lgh_obj->do_lu); + } los = handle->private_data; LASSERT(los); dt_los_put(los); @@ -1511,6 +1514,8 @@ static int llog_osd_destroy(const struct lu_env *env, if (rc) GOTO(out_trans, rc); + th->th_wait_submit = 1; + dt_write_lock(env, o, 0); if (dt_object_exists(o)) { if (name) { @@ -1722,6 +1727,12 @@ int 
llog_osd_get_cat_list(const struct lu_env *env, struct dt_device *d, lgi->lgi_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR; lgi->lgi_dof.dof_type = dt_mode_to_dft(S_IFREG); + th->th_wait_submit = 1; + /* Make the llog object creation synchronization, so + * it will be reliable to the reference, especially + * for remote reference */ + th->th_sync = 1; + rc = dt_declare_create(env, o, &lgi->lgi_attr, NULL, &lgi->lgi_dof, th); if (rc) @@ -1854,6 +1865,8 @@ int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d, if (rc) GOTO(out_trans, rc); + th->th_wait_submit = 1; + rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); if (rc) CDEBUG(D_INODE, "can't write CATALOGS at index %d: rc = %d\n", diff --git a/lustre/osd-ldiskfs/osd_handler.c b/lustre/osd-ldiskfs/osd_handler.c index bcdf461..9c8ea63 100644 --- a/lustre/osd-ldiskfs/osd_handler.c +++ b/lustre/osd-ldiskfs/osd_handler.c @@ -1026,7 +1026,7 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt, qtrans = oh->ot_quota_trans; oh->ot_quota_trans = NULL; - if (oh->ot_handle != NULL) { + if (oh->ot_handle != NULL) { handle_t *hdl = oh->ot_handle; /* @@ -1039,6 +1039,7 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt, LASSERT(oti->oti_txns == 1); oti->oti_txns--; + rc = dt_txn_hook_stop(env, th); if (rc != 0) CERROR("%s: failed in transaction hook: rc = %d\n", @@ -5549,10 +5550,18 @@ static int osd_fid_init(const struct lu_env *env, struct osd_device *osd) rc = seq_client_init(osd->od_cl_seq, NULL, LUSTRE_SEQ_METADATA, osd->od_svname, ss->ss_server_seq); - if (rc != 0) { OBD_FREE_PTR(osd->od_cl_seq); osd->od_cl_seq = NULL; + RETURN(rc); + } + + if (ss->ss_node_id == 0) { + /* If the OSD on the sequence controller(MDT0), then allocate + * sequence here, otherwise allocate sequence after connected + * to MDT0 (see mdt_register_lwp_callback()). 
*/ + rc = seq_server_alloc_meta(osd->od_cl_seq->lcs_srv, + &osd->od_cl_seq->lcs_space, env); } RETURN(rc); diff --git a/lustre/osd-ldiskfs/osd_oi.c b/lustre/osd-ldiskfs/osd_oi.c index 9926dd1..a6fed15 100644 --- a/lustre/osd-ldiskfs/osd_oi.c +++ b/lustre/osd-ldiskfs/osd_oi.c @@ -514,6 +514,9 @@ int fid_is_on_ost(struct osd_thread_info *info, struct osd_device *osd, if (!(flags & OI_CHECK_FLD)) RETURN(0); + if (osd_seq_site(osd)->ss_server_fld == NULL) + RETURN(0); + rc = osd_fld_lookup(info->oti_env, osd, fid_seq(fid), range); if (rc != 0) { if (rc != -ENOENT) diff --git a/lustre/osd-ldiskfs/osd_scrub.c b/lustre/osd-ldiskfs/osd_scrub.c index 047e6568..76895d3 100644 --- a/lustre/osd-ldiskfs/osd_scrub.c +++ b/lustre/osd-ldiskfs/osd_scrub.c @@ -1654,6 +1654,10 @@ static const struct osd_lf_map osd_lf_maps[] = { { "LAST_GROUP", { FID_SEQ_LOCAL_FILE, OFD_LAST_GROUP_OID, 0 }, OLF_SHOW_NAME, sizeof("LAST_GROUP") - 1, NULL, NULL }, + /* committed batchid for cross-MDT operation */ + { "BATCHID", { FID_SEQ_LOCAL_FILE, BATCHID_COMMITTED_OID, 0 }, + OLF_SHOW_NAME, sizeof("BATCHID") - 1, NULL, NULL }, + /* lost+found */ { "lost+found", { FID_SEQ_LOCAL_FILE, OSD_LPF_OID, 0 }, OLF_SCAN_SUBITEMS, sizeof("lost+found") - 1, diff --git a/lustre/osp/osp_dev.c b/lustre/osp/osp_dev.c index 588032d..6c3ad4f 100644 --- a/lustre/osp/osp_dev.c +++ b/lustre/osp/osp_dev.c @@ -791,6 +791,7 @@ const struct dt_device_operations osp_dt_ops = { .dt_trans_create = osp_trans_create, .dt_trans_start = osp_trans_start, .dt_trans_stop = osp_trans_stop, + .dt_trans_cb_add = osp_trans_cb_add, }; /** diff --git a/lustre/osp/osp_internal.h b/lustre/osp/osp_internal.h index 27a8643..4198f45 100644 --- a/lustre/osp/osp_internal.h +++ b/lustre/osp/osp_internal.h @@ -302,6 +302,8 @@ struct osp_thandle { /* OSP will use this thandle to update last oid*/ struct thandle *ot_storage_th; + struct list_head ot_dcb_list; + atomic_t ot_refcount; }; static inline struct osp_thandle * @@ -639,6 +641,7 @@ int 
osp_xattr_del(const struct lu_env *env, struct dt_object *dt, int osp_trans_stop(const struct lu_env *env, struct dt_device *dt, struct thandle *th); +int osp_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb); struct dt_it *osp_it_init(const struct lu_env *env, struct dt_object *dt, __u32 attr); diff --git a/lustre/osp/osp_md_object.c b/lustre/osp/osp_md_object.c index 11272e1..caa7ee9 100644 --- a/lustre/osp/osp_md_object.c +++ b/lustre/osp/osp_md_object.c @@ -1203,7 +1203,10 @@ static ssize_t osp_md_read(const struct lu_env *env, struct dt_object *dt, memcpy(rbuf->lb_buf, orr->orr_data, orr->orr_size); - GOTO(out, rc = orr->orr_size); + CDEBUG(D_INFO, "%s: read "DFID" pos "LPU64" len %u\n", + osp->opd_obd->obd_name, PFID(lu_object_fid(&dt->do_lu)), + *pos, orr->orr_size); + GOTO(out, rc = (int)orr->orr_size); out: if (req != NULL) ptlrpc_req_finished(req); diff --git a/lustre/osp/osp_object.c b/lustre/osp/osp_object.c index 026fdb3..bd51a8e 100644 --- a/lustre/osp/osp_object.c +++ b/lustre/osp/osp_object.c @@ -1552,6 +1552,7 @@ static int osp_object_destroy(const struct lu_env *env, struct dt_object *dt, int rc = 0; ENTRY; + o->opo_non_exist = 1; LASSERT(!osp->opd_connect_mdt); diff --git a/lustre/osp/osp_trans.c b/lustre/osp/osp_trans.c index d0a8c39..a3aa8d3 100644 --- a/lustre/osp/osp_trans.c +++ b/lustre/osp/osp_trans.c @@ -522,6 +522,17 @@ int osp_trans_update_request_create(struct thandle *th) return 0; } +static void osp_thandle_get(struct osp_thandle *oth) +{ + atomic_inc(&oth->ot_refcount); +} + +static void osp_thandle_put(struct osp_thandle *oth) +{ + if (atomic_dec_and_test(&oth->ot_refcount)) + OBD_FREE_PTR(oth); +} + /** * The OSP layer dt_device_operations::dt_trans_create() interface * to create a transaction. 
@@ -563,6 +574,9 @@ struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d) th->th_dev = d; th->th_tags = LCT_TX_HANDLE; + atomic_set(&oth->ot_refcount, 1); + INIT_LIST_HEAD(&oth->ot_dcb_list); + RETURN(th); } @@ -572,22 +586,22 @@ struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d) * Prepare OUT update ptlrpc request, and the request usually includes * all of updates (stored in \param ureq) from one operation. * - * \param[in] env execution environment - * \param[in] imp import on which ptlrpc request will be sent - * \param[in] ureq hold all of updates which will be packed into the req - * \param[in] reqp request to be created + * \param[in] env execution environment + * \param[in] imp import on which ptlrpc request will be sent + * \param[in] ureq hold all of updates which will be packed into the req + * \param[in] reqp request to be created * - * \retval 0 if preparation succeeds. - * \retval negative errno if preparation fails. + * \retval 0 if preparation succeeds. + * \retval negative errno if preparation fails. */ int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp, const struct object_update_request *ureq, struct ptlrpc_request **reqp) { - struct ptlrpc_request *req; - struct object_update_request *tmp; - int ureq_len; - int rc; + struct ptlrpc_request *req; + struct object_update_request *tmp; + int ureq_len; + int rc; ENTRY; object_update_request_dump(ureq, D_INFO); @@ -606,7 +620,7 @@ int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp, } req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY, - RCL_SERVER, OUT_UPDATE_REPLY_SIZE); + RCL_SERVER, OUT_UPDATE_REPLY_SIZE); tmp = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE); memcpy(tmp, ureq, ureq_len); @@ -620,18 +634,18 @@ int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp, } /** - * Send update RPC. - * - * Send update request to the remote MDT synchronously. 
- * - * \param[in] env execution environment - * \param[in] imp import on which ptlrpc request will be sent - * \param[in] dt_update hold all of updates which will be packed into the req - * \param[in] reqp request to be created - * - * \retval 0 if RPC succeeds. - * \retval negative errno if RPC fails. - */ +* Send update RPC. +* +* Send update request to the remote MDT synchronously. +* +* \param[in] env execution environment +* \param[in] imp import on which ptlrpc request will be sent +* \param[in] dt_update hold all of updates which will be packed into the req +* \param[in] reqp request to be created +* +* \retval 0 if RPC succeeds. +* \retval negative errno if RPC fails. +*/ int osp_remote_sync(const struct lu_env *env, struct osp_device *osp, struct dt_update_request *dt_update, struct ptlrpc_request **reqp) @@ -672,6 +686,76 @@ int osp_remote_sync(const struct lu_env *env, struct osp_device *osp, } /** + * Add commit callback to transaction. + * + * Add commit callback to the osp thandle, which will be called + * when the thandle is committed remotely. + * + * \param[in] th the thandle + * \param[in] dcb commit callback structure + * + * \retval only return 0 for now. 
+ */ +int osp_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb) +{ + struct osp_thandle *oth = thandle_to_osp_thandle(th); + + LASSERT(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC); + LASSERT(dcb->dcb_func != NULL); + list_add(&dcb->dcb_linkage, &oth->ot_dcb_list); + + return 0; +} + +static void osp_trans_commit_cb(struct osp_thandle *oth, int result) +{ + struct dt_txn_commit_cb *dcb; + struct dt_txn_commit_cb *tmp; + + LASSERT(atomic_read(&oth->ot_refcount) > 0); + /* call per-transaction callbacks if any */ + list_for_each_entry_safe(dcb, tmp, &oth->ot_dcb_list, + dcb_linkage) { + LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC, + "commit callback entry: magic=%x name='%s'\n", + dcb->dcb_magic, dcb->dcb_name); + list_del_init(&dcb->dcb_linkage); + dcb->dcb_func(NULL, &oth->ot_super, dcb, result); + } +} + +static void osp_request_commit_cb(struct ptlrpc_request *req) +{ + struct thandle *th = req->rq_cb_data; + struct osp_thandle *oth = thandle_to_osp_thandle(th); + __u64 last_committed_transno = 0; + int result = req->rq_status; + ENTRY; + + if (lustre_msg_get_last_committed(req->rq_repmsg)) + last_committed_transno = + lustre_msg_get_last_committed(req->rq_repmsg); + + if (last_committed_transno < + req->rq_import->imp_peer_committed_transno) + last_committed_transno = + req->rq_import->imp_peer_committed_transno; + + CDEBUG(D_HA, "trans no "LPU64" committed transno "LPU64"\n", + req->rq_transno, last_committed_transno); + + /* If the transaction is not really committed, mark result = 1 */ + if (req->rq_transno != 0 && + (req->rq_transno > last_committed_transno) && result == 0) + result = 1; + + osp_trans_commit_cb(oth, result); + req->rq_committed = 1; + osp_thandle_put(oth); + EXIT; +} + +/** * Trigger the request for remote updates. 
* * If th_sync is set, then the request will be sent synchronously, @@ -706,7 +790,8 @@ static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp, req->rq_interpret_reply = osp_update_interpret; args = ptlrpc_req_async_args(req); args->oaua_update = dt_update; - if (is_only_remote_trans(th) && !th->th_sync) { + + if (!th->th_wait_submit && is_only_remote_trans(th) && !th->th_sync) { args->oaua_flow_control = true; if (!osp->opd_connect_mdt) { @@ -719,10 +804,30 @@ static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp, ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1); } else { - osp_get_rpc_lock(osp); + struct osp_thandle *oth = thandle_to_osp_thandle(th); + struct lu_device *top_device; + + /* If the transaction is created during the MDT recovery + * process, it is a recovery update, so we need to let + * OSP send it anyway without checking the recovery + * status: the other target may be recovering at the + * same time, and waiting here for the import to be + * recovered might cause deadlock */ + top_device = osp->opd_dt_dev.dd_lu_dev.ld_site->ls_top_dev; + if (top_device->ld_obd->obd_recovering) + req->rq_allow_replay = 1; + args->oaua_flow_control = false; + req->rq_commit_cb = osp_request_commit_cb; + req->rq_cb_data = th; + osp_thandle_get(oth); /* for commit callback */ + osp_get_rpc_lock(osp); rc = ptlrpc_queue_wait(req); osp_put_rpc_lock(osp); + if (req->rq_transno == 0 && !req->rq_committed) + osp_thandle_put(oth); + else + oth->ot_dur = NULL; ptlrpc_req_finished(req); } @@ -842,17 +947,17 @@ int osp_trans_stop(const struct lu_env *env, struct dt_device *dt, } dt_update = oth->ot_dur; - if (dt_update == NULL) + if (dt_update == NULL || th->th_result != 0) { + rc = th->th_result; GOTO(out, rc); + } LASSERT(dt_update != LP_POISON); /* If there are no updates, destroy dt_update and thandle */ if (dt_update->dur_buf.ub_req == NULL || - dt_update->dur_buf.ub_req->ourq_count == 0) { - 
dt_update_request_destroy(dt_update); + dt_update->dur_buf.ub_req->ourq_count == 0) GOTO(out, rc); - } if (is_only_remote_trans(th) && !th->th_sync) { struct osp_device *osp = dt2osp_dev(th->th_dev); @@ -879,24 +984,29 @@ int osp_trans_stop(const struct lu_env *env, struct dt_device *dt, out: /* If RPC is triggered successfully, dt_update will be freed in * osp_update_interpreter() */ - if (rc != 0 && dt_update != NULL && sent == 0) { + if (sent == 0) { struct osp_update_callback *ouc; struct osp_update_callback *next; - list_for_each_entry_safe(ouc, next, &dt_update->dur_cb_items, - ouc_list) { - list_del_init(&ouc->ouc_list); - if (ouc->ouc_interpreter != NULL) - ouc->ouc_interpreter(env, NULL, NULL, - ouc->ouc_obj, - ouc->ouc_data, 0, rc); - osp_update_callback_fini(env, ouc); + if (dt_update != NULL) { + list_for_each_entry_safe(ouc, next, + &dt_update->dur_cb_items, + ouc_list) { + list_del_init(&ouc->ouc_list); + if (ouc->ouc_interpreter != NULL) + ouc->ouc_interpreter(env, NULL, NULL, + ouc->ouc_obj, + ouc->ouc_data, 0, + rc); + osp_update_callback_fini(env, ouc); + } } - + osp_trans_commit_cb(oth, rc); dt_update_request_destroy(dt_update); + oth->ot_dur = NULL; } - OBD_FREE_PTR(oth); + osp_thandle_put(oth); RETURN(rc); } diff --git a/lustre/ptlrpc/wiretest.c b/lustre/ptlrpc/wiretest.c index 7e85579..6ee0dc4 100644 --- a/lustre/ptlrpc/wiretest.c +++ b/lustre/ptlrpc/wiretest.c @@ -532,6 +532,8 @@ void lustre_assert_wire_constants(void) (long long)FID_SEQ_LAYOUT_RBTREE); LASSERTF(FID_SEQ_UPDATE_LOG == 0x0000000200000009ULL, "found 0x%.16llxULL\n", (long long)FID_SEQ_UPDATE_LOG); + LASSERTF(FID_SEQ_UPDATE_LOG_DIR == 0x000000020000000aULL, "found 0x%.16llxULL\n", + (long long)FID_SEQ_UPDATE_LOG_DIR); LASSERTF(FID_SEQ_NORMAL == 0x0000000200000400ULL, "found 0x%.16llxULL\n", (long long)FID_SEQ_NORMAL); LASSERTF(FID_SEQ_LOV_DEFAULT == 0xffffffffffffffffULL, "found 0x%.16llxULL\n", diff --git a/lustre/target/tgt_internal.h b/lustre/target/tgt_internal.h index 
919fa1e..3b52e33 100644 --- a/lustre/target/tgt_internal.h +++ b/lustre/target/tgt_internal.h @@ -225,6 +225,8 @@ int tgt_txn_stop_cb(const struct lu_env *env, struct thandle *th, void update_records_dump(const struct update_records *records, unsigned int mask, bool dump_updates); +int check_and_prepare_update_record(const struct lu_env *env, + struct thandle_update_records *tur); struct update_thread_info { struct lu_attr uti_attr; diff --git a/lustre/target/update_records.c b/lustre/target/update_records.c index fd1fce7..632e071 100644 --- a/lustre/target/update_records.c +++ b/lustre/target/update_records.c @@ -870,31 +870,11 @@ EXPORT_SYMBOL(tur_update_params_extend); * \retval negative errno if updates recording fails. */ int check_and_prepare_update_record(const struct lu_env *env, - struct thandle *th) + struct thandle_update_records *tur) { - struct thandle_update_records *tur; struct llog_update_record *lur; - struct top_thandle *top_th; - struct sub_thandle *lst; - int rc; - bool record_update = false; - ENTRY; - - top_th = container_of(th, struct top_thandle, tt_super); - /* Check if it needs to record updates for this transaction */ - list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) { - if (lst->st_record_update) { - record_update = true; - break; - } - } - if (!record_update) - RETURN(0); - - if (top_th->tt_update_records != NULL) - RETURN(0); + int rc; - tur = &update_env_info(env)->uti_tur; if (tur->tur_update_records == NULL) { rc = tur_update_records_create(tur); if (rc < 0) @@ -917,61 +897,8 @@ int check_and_prepare_update_record(const struct lu_env *env, tur->tur_update_param_count = 0; - top_th->tt_update_records = tur; - RETURN(0); } -EXPORT_SYMBOL(check_and_prepare_update_record); - -/** - * Merge params into the update records - * - * Merge params and ops into the update records attached to top th. 
- * During transaction execution phase, parameters and update ops - * are collected in two different buffers (see lod_updates_pack()), - * then in transaction stop, it needs to be merged them in one - * buffer, then being written into the update log. - * - * \param[in] env execution environment - * \param[in] tur thandle update records whose updates and - * parameters are merged - * - * \retval 0 if merging succeeds. - * \retval negaitive errno if merging fails. - */ -int merge_params_updates_buf(const struct lu_env *env, - struct thandle_update_records *tur) -{ - struct llog_update_record *lur; - struct update_params *params; - size_t params_size; - size_t record_size; - - if (tur->tur_update_records == NULL || - tur->tur_update_params == NULL) - return 0; - - lur = tur->tur_update_records; - /* Extends the update records buffer if needed */ - params_size = update_params_size(tur->tur_update_params, - tur->tur_update_param_count); - record_size = llog_update_record_size(lur); - if (cfs_size_round(record_size + params_size) > - tur->tur_update_records_buf_size) { - int rc; - - rc = tur_update_records_extend(tur, record_size + params_size); - if (rc < 0) - return rc; - lur = tur->tur_update_records; - } - - params = update_records_get_params(&lur->lur_update_rec); - memcpy(params, tur->tur_update_params, params_size); - lur->lur_update_rec.ur_param_count = tur->tur_update_param_count; - return 0; -} -EXPORT_SYMBOL(merge_params_updates_buf); static void update_key_fini(const struct lu_context *ctx, struct lu_context_key *key, void *data) diff --git a/lustre/target/update_trans.c b/lustre/target/update_trans.c index f56aa7c..7ff50cf 100644 --- a/lustre/target/update_trans.c +++ b/lustre/target/update_trans.c @@ -49,6 +49,7 @@ #define DEBUG_SUBSYSTEM S_CLASS +#include #include #include #include @@ -56,6 +57,39 @@ #include #include +#include +/** + * Dump top mulitple thandle + * + * Dump top multiple thandle and all of its sub thandle to the debug log. 
+ * + * \param[in] tmt top_multiple_thandle to be dumped + * \param[in] mask debug mask + */ +static void top_multiple_thandle_dump(struct top_multiple_thandle *tmt, + __u32 mask) +{ + struct sub_thandle *st; + + LASSERT(tmt->tmt_magic == TOP_THANDLE_MAGIC); + CDEBUG(mask, "%s tmt %p refcount %d committed %d result %d " + "batchid "LPU64"\n", + tmt->tmt_master_sub_dt ? + tmt->tmt_master_sub_dt->dd_lu_dev.ld_obd->obd_name : + "NULL", + tmt, atomic_read(&tmt->tmt_refcount), tmt->tmt_committed, + tmt->tmt_result, tmt->tmt_batchid); + + list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) { + CDEBUG(mask, "st %p obd %s committed %d sub_th %p " + " cookie "DOSTID": %u\n", + st, st->st_dt->dd_lu_dev.ld_obd->obd_name, + st->st_committed, st->st_sub_th, + POSTID(&st->st_cookie.lgc_lgl.lgl_oi), + st->st_cookie.lgc_index); + } +} + /** * Declare write update to sub device * @@ -64,17 +98,17 @@ * * \param[in] env execution environment * \param[in] record update records being written - * \param[in] lst sub transaction handle + * \param[in] sub_th sub transaction handle * * \retval 0 if writing succeeds * \retval negative errno if writing fails */ static int sub_declare_updates_write(const struct lu_env *env, struct llog_update_record *record, - struct sub_thandle *lst) + struct thandle *sub_th) { struct llog_ctxt *ctxt; - struct dt_device *dt = lst->st_sub_th->th_dev; + struct dt_device *dt = sub_th->th_dev; int rc; /* If ctxt is NULL, it means not need to write update, @@ -90,7 +124,7 @@ static int sub_declare_updates_write(const struct lu_env *env, } rc = llog_declare_add(env, ctxt->loc_handle, &record->lur_hdr, - lst->st_sub_th); + sub_th); llog_ctxt_put(ctxt); @@ -100,23 +134,27 @@ static int sub_declare_updates_write(const struct lu_env *env, /** * write update to sub device * - * Write updates llog records to the sub device during distribute - * transaction. + * Write llog update record to the sub device during distribute + * transaction. 
If it succeeds, llog cookie of the record will be + * returned by @cookie. * * \param[in] env execution environment * \param[in] record update records being written - * \param[in] lst sub transaction handle + * \param[in] sub_th sub transaction handle + * \param[out] cookie llog cookie of the update record. * * \retval 1 if writing succeeds * \retval negative errno if writing fails */ static int sub_updates_write(const struct lu_env *env, struct llog_update_record *record, - struct sub_thandle *lst) + struct thandle *sub_th, + struct llog_cookie *cookie) { + struct dt_device *dt = sub_th->th_dev; struct llog_ctxt *ctxt; - struct dt_device *dt = lst->st_sub_th->th_dev; int rc; + ENTRY; ctxt = llog_get_context(dt->dd_lu_dev.ld_obd, LLOG_UPDATELOG_ORIG_CTXT); @@ -126,72 +164,229 @@ static int sub_updates_write(const struct lu_env *env, * in error handler path */ if (ctxt->loc_handle == NULL) { llog_ctxt_put(ctxt); - return 0; + RETURN(0); } + /* Since the cross-MDT updates will includes both local + * and remote updates, the update ops count must > 1 */ + LASSERT(record->lur_update_rec.ur_update_count > 1); LASSERTF(record->lur_hdr.lrh_len == llog_update_record_size(record), "lrh_len %u record_size %zu\n", record->lur_hdr.lrh_len, llog_update_record_size(record)); rc = llog_add(env, ctxt->loc_handle, &record->lur_hdr, - NULL, lst->st_sub_th); - + cookie, sub_th); llog_ctxt_put(ctxt); - return rc; + CDEBUG(D_INFO, "%s: Add update log "DOSTID":%u.\n", + dt->dd_lu_dev.ld_obd->obd_name, + POSTID(&cookie->lgc_lgl.lgl_oi), cookie->lgc_index); + + if (rc > 0) + rc = 0; + + RETURN(rc); } /** - * write update transaction + * Prepare the update records. * - * Check if there are updates being recorded in this transaction, - * it will write the record into the disk. + * Merge params and ops into the update records, then initializing + * the update buffer. 
+ * + * During transaction execution phase, parameters and update ops + * are collected in two different buffers (see lod_updates_pack()), + * during transaction stop, it needs to be merged in one buffer, + * so it will be written in the update log. * * \param[in] env execution environment - * \param[in] top_th top transaction handle + * \param[in] tmt top_multiple_thandle for distribute txn * - * \retval 0 if writing succeeds - * \retval negative errno if writing fails + * \retval 0 if merging succeeds. + * \retval negaitive errno if merging fails. */ -static int top_updates_write(const struct lu_env *env, - struct top_thandle *top_th) +static int prepare_writing_updates(const struct lu_env *env, + struct top_multiple_thandle *tmt) { - struct thandle_update_records *tur; - struct llog_update_record *lur; - struct sub_thandle *lst; - int rc; - ENTRY; + struct thandle_update_records *tur = tmt->tmt_update_records; + struct llog_update_record *lur; + struct update_params *params; + size_t params_size; + size_t update_size; - if (top_th->tt_update_records == NULL) - RETURN(0); + if (tur == NULL || tur->tur_update_records == NULL || + tur->tur_update_params == NULL) + return 0; - tur = top_th->tt_update_records; + lur = tur->tur_update_records; + /* Extends the update records buffer if needed */ + params_size = update_params_size(tur->tur_update_params, + tur->tur_update_param_count); + LASSERT(lur->lur_update_rec.ur_param_count == 0); + update_size = llog_update_record_size(lur); + if (cfs_size_round(update_size + params_size) > + tur->tur_update_records_buf_size) { + int rc; - /* merge the parameters and updates into one buffer */ - rc = merge_params_updates_buf(env, tur); - if (rc < 0) - RETURN(rc); + rc = tur_update_records_extend(tur, + cfs_size_round(update_size + params_size)); + if (rc < 0) + return rc; - lur = tur->tur_update_records; - /* Dump updates to debug log */ - update_records_dump(&lur->lur_update_rec, D_INFO, true); + lur = tur->tur_update_records; + 
} + + params = update_records_get_params(&lur->lur_update_rec); + memcpy(params, tur->tur_update_params, params_size); + lur->lur_update_rec.ur_param_count = tur->tur_update_param_count; + lur->lur_update_rec.ur_batchid = tmt->tmt_batchid; /* Init update record header */ lur->lur_hdr.lrh_len = llog_update_record_size(lur); lur->lur_hdr.lrh_type = UPDATE_REC; - list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) { - if (!lst->st_record_update) - continue; - rc = sub_updates_write(env, lur, lst); - if (rc < 0) - break; + /* Dump updates for debugging purpose */ + update_records_dump(&lur->lur_update_rec, D_INFO, true); + + return 0; +} + +static inline int +distribute_txn_commit_thread_running(struct lu_target *lut) +{ + return lut->lut_tdtd_commit_thread.t_flags & SVC_RUNNING; +} + +static inline int +distribute_txn_commit_thread_stopped(struct lu_target *lut) +{ + return lut->lut_tdtd_commit_thread.t_flags & SVC_STOPPED; +} + +/** + * Top thandle commit callback + * + * This callback will be called when all of sub transactions are committed. + * + * \param[in] th top thandle to be committed. 
+ */ +static void top_trans_committed_cb(struct top_multiple_thandle *tmt) +{ + struct lu_target *lut; + ENTRY; + + LASSERT(atomic_read(&tmt->tmt_refcount) > 0); + + top_multiple_thandle_dump(tmt, D_HA); + tmt->tmt_committed = 1; + lut = dt2lu_dev(tmt->tmt_master_sub_dt)->ld_site->ls_tgt; + if (distribute_txn_commit_thread_running(lut)) + wake_up(&lut->lut_tdtd->tdtd_commit_thread_waitq); + RETURN_EXIT; +} + +struct sub_thandle *lookup_sub_thandle(struct top_multiple_thandle *tmt, + struct dt_device *dt_dev) +{ + struct sub_thandle *st; + + list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) { + if (st->st_dt == dt_dev) + return st; } + return NULL; +} +EXPORT_SYMBOL(lookup_sub_thandle); - if (rc > 0) - rc = 0; +struct sub_thandle *create_sub_thandle(struct top_multiple_thandle *tmt, + struct dt_device *dt_dev) +{ + struct sub_thandle *st; - RETURN(rc); + OBD_ALLOC_PTR(st); + if (st == NULL) + return ERR_PTR(-ENOMEM); + + INIT_LIST_HEAD(&st->st_sub_list); + st->st_dt = dt_dev; + + list_add(&st->st_sub_list, &tmt->tmt_sub_thandle_list); + return st; +} + +/** + * Create sub thandle + * + * Create transaction handle for sub_thandle + * + * \param[in] env execution environment + * \param[in] top_th top thandle + * \param[in] st sub_thandle + * + * \retval 0 if creation succeeds. + * \retval negative errno if creation fails. + */ +int sub_thandle_trans_create(const struct lu_env *env, + struct top_thandle *top_th, + struct sub_thandle *st) +{ + struct thandle *sub_th; + + sub_th = dt_trans_create(env, st->st_dt); + if (IS_ERR(sub_th)) + return PTR_ERR(sub_th); + + sub_th->th_top = &top_th->tt_super; + st->st_sub_th = sub_th; + return 0; +} + +/** + * sub thandle commit callback + * + * Mark the sub thandle to be committed and if all sub thandle are committed + * notify the top thandle. + * + * \param[in] sub_th sub thandle being committed. 
+ */ +static void sub_trans_commit_cb(struct lu_env *env, struct thandle *sub_th, + struct dt_txn_commit_cb *cb, int err) +{ + struct sub_thandle *st; + struct top_multiple_thandle *tmt = cb->dcb_data; + bool all_committed = true; + ENTRY; + + /* Check if all sub thandles are committed */ + list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) { + if (st->st_sub_th == sub_th) { + st->st_committed = 1; + st->st_result = err; + } + if (!st->st_committed) + all_committed = false; + } + + if (tmt->tmt_result == 0) + tmt->tmt_result = err; + + if (all_committed) + top_trans_committed_cb(tmt); + + top_multiple_thandle_dump(tmt, D_INFO); + top_multiple_thandle_put(tmt); + RETURN_EXIT; +} + +static void sub_thandle_register_commit_cb(struct sub_thandle *st, + struct top_multiple_thandle *tmt) +{ + LASSERT(st->st_sub_th != NULL); + top_multiple_thandle_get(tmt); + st->st_commit_dcb.dcb_func = sub_trans_commit_cb; + st->st_commit_dcb.dcb_data = tmt; + INIT_LIST_HEAD(&st->st_commit_dcb.dcb_linkage); + dt_trans_cb_add(st->st_sub_th, &st->st_commit_dcb); } /** @@ -216,25 +411,162 @@ top_trans_create(const struct lu_env *env, struct dt_device *master_dev) if (top_th == NULL) return ERR_PTR(-ENOMEM); - child_th = dt_trans_create(env, master_dev); - if (IS_ERR(child_th)) { - OBD_FREE_PTR(top_th); - return child_th; - } + top_th->tt_super.th_top = &top_th->tt_super; - top_th->tt_magic = TOP_THANDLE_MAGIC; - top_th->tt_master_sub_thandle = child_th; - child_th->th_top = &top_th->tt_super; + if (master_dev != NULL) { + child_th = dt_trans_create(env, master_dev); + if (IS_ERR(child_th)) { + OBD_FREE_PTR(top_th); + return child_th; + } - top_th->tt_update_records = NULL; - top_th->tt_super.th_top = &top_th->tt_super; - INIT_LIST_HEAD(&top_th->tt_sub_thandle_list); + child_th->th_top = &top_th->tt_super; + child_th->th_wait_submit = 1; + top_th->tt_master_sub_thandle = child_th; + top_th->tt_super.th_tags |= child_th->th_tags; + } return &top_th->tt_super; } 
EXPORT_SYMBOL(top_trans_create); /** + * Declare write update transaction + * + * Check if there are updates being recorded in this transaction, + * it will write the record into the disk. + * + * \param[in] env execution environment + * \param[in] tmt top multiple transaction handle + * + * \retval 0 if writing succeeds + * \retval negative errno if writing fails + */ +static int declare_updates_write(const struct lu_env *env, + struct top_multiple_thandle *tmt) +{ + struct llog_update_record *record; + struct sub_thandle *st; + int rc = 0; + + record = tmt->tmt_update_records->tur_update_records; + /* Declare update write for all other target */ + list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) { + if (st->st_sub_th == NULL) + continue; + + rc = sub_declare_updates_write(env, record, st->st_sub_th); + if (rc < 0) + break; + } + + return rc; +} + +/** + * Assign batchid to the distribute transaction. + * + * Assign batchid to the distribute transaction + * + * \param[in] tmt distribute transaction + * + */ +static void distribute_txn_assign_batchid(struct top_multiple_thandle *new) +{ + struct target_distribute_txn_data *tdtd; + struct dt_device *dt = new->tmt_master_sub_dt; + + LASSERT(dt != NULL); + tdtd = dt2lu_dev(dt)->ld_site->ls_tgt->lut_tdtd; + spin_lock(&tdtd->tdtd_batchid_lock); + new->tmt_batchid = tdtd->tdtd_batchid++; + list_add_tail(&new->tmt_commit_list, &tdtd->tdtd_list); + spin_unlock(&tdtd->tdtd_batchid_lock); + top_multiple_thandle_get(new); + top_multiple_thandle_dump(new, D_INFO); +} + +/** + * Insert distribute transaction to the distribute txn list. + * + * Insert distribute transaction to the distribute txn list. + * + * \param[in] new the distribute txn to be inserted. 
+ */ +void distribute_txn_insert_by_batchid(struct top_multiple_thandle *new) +{ + struct dt_device *dt = new->tmt_master_sub_dt; + struct top_multiple_thandle *tmt; + struct target_distribute_txn_data *tdtd; + bool at_head = false; + + LASSERT(dt != NULL); + tdtd = dt2lu_dev(dt)->ld_site->ls_tgt->lut_tdtd; + + spin_lock(&tdtd->tdtd_batchid_lock); + list_for_each_entry_reverse(tmt, &tdtd->tdtd_list, tmt_commit_list) { + if (new->tmt_batchid > tmt->tmt_batchid) { + list_add(&new->tmt_commit_list, &tmt->tmt_commit_list); + break; + } + } + if (list_empty(&new->tmt_commit_list)) { + at_head = true; + list_add(&new->tmt_commit_list, &tdtd->tdtd_list); + } + spin_unlock(&tdtd->tdtd_batchid_lock); + top_multiple_thandle_get(new); + top_multiple_thandle_dump(new, D_INFO); + if (new->tmt_committed && at_head) + wake_up(&tdtd->tdtd_commit_thread_waitq); +} + +/** + * Prepare cross-MDT operation. + * + * Create the update record buffer to record updates for cross-MDT operation, + * add master sub transaction to tt_sub_trans_list, and declare the update + * writes. + * + * During updates packing, all of parameters will be packed in + * tur_update_params, and updates will be packed in tur_update_records. + * Then in transaction stop, parameters and updates will be merged + * into one updates buffer. + * + * And also master thandle will be added to the sub_th list, so it will be + * easy to track the commit status. + * + * \param[in] env execution environment + * \param[in] th top transaction handle + * + * \retval 0 if preparation succeeds. + * \retval negative errno if preparation fails. 
+ */ +static int prepare_multiple_node_trans(const struct lu_env *env, + struct top_multiple_thandle *tmt) +{ + struct thandle_update_records *tur; + int rc; + ENTRY; + + /* Prepare the update buffer for recording updates */ + if (tmt->tmt_update_records != NULL) + RETURN(0); + + tur = &update_env_info(env)->uti_tur; + rc = check_and_prepare_update_record(env, tur); + if (rc < 0) + RETURN(rc); + + tmt->tmt_update_records = tur; + + distribute_txn_assign_batchid(tmt); + rc = declare_updates_write(env, tmt); + + RETURN(rc); +} + +/** * start the top transaction. * * Start all of its sub transactions, then start master sub transaction. @@ -251,43 +583,81 @@ int top_trans_start(const struct lu_env *env, struct dt_device *master_dev, { struct top_thandle *top_th = container_of(th, struct top_thandle, tt_super); - struct sub_thandle *lst; - int rc; + struct sub_thandle *st; + struct top_multiple_thandle *tmt = top_th->tt_multiple_thandle; + int rc = 0; + ENTRY; - LASSERT(top_th->tt_magic == TOP_THANDLE_MAGIC); - rc = check_and_prepare_update_record(env, th); + /* Walk through all of sub transaction to see if it needs to + * record updates for this transaction */ + if (tmt == NULL) { + rc = dt_trans_start(env, top_th->tt_master_sub_thandle->th_dev, + top_th->tt_master_sub_thandle); + RETURN(rc); + } + + rc = prepare_multiple_node_trans(env, tmt); if (rc < 0) - return rc; - /* Check if needs to write updates */ - list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) { - struct llog_update_record *record; + RETURN(rc); - if (!lst->st_record_update) + list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) { + if (st->st_sub_th == NULL) continue; - - record = top_th->tt_update_records->tur_update_records; - rc = sub_declare_updates_write(env, record, lst); + st->st_sub_th->th_sync = th->th_sync; + st->st_sub_th->th_local = th->th_local; + st->st_sub_th->th_tags = th->th_tags; + rc = dt_trans_start(env, st->st_sub_th->th_dev, + st->st_sub_th); if (rc != 
0) - return rc; - } + RETURN(rc); - list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) { - lst->st_sub_th->th_sync = th->th_sync; - lst->st_sub_th->th_local = th->th_local; - rc = dt_trans_start(env, lst->st_sub_th->th_dev, - lst->st_sub_th); - if (rc != 0) - return rc; + sub_thandle_register_commit_cb(st, tmt); } - top_th->tt_master_sub_thandle->th_local = th->th_local; - top_th->tt_master_sub_thandle->th_sync = th->th_sync; - - return dt_trans_start(env, master_dev, top_th->tt_master_sub_thandle); + RETURN(rc); } EXPORT_SYMBOL(top_trans_start); /** + * Check whether we need to write the updates record + * + * Check if the updates for the top_thandle need to be written + * to all targets. The updates are written only if the transaction + * succeeds and the updates count is >= 2. + * + * \param[in] top_th top thandle. + * + * \retval true if it needs to write updates + * \retval false if it does not need to write updates + **/ +static bool top_check_write_updates(struct top_thandle *top_th) +{ + struct top_multiple_thandle *tmt; + struct thandle_update_records *tur; + + /* Do not write updates to records if the transaction fails */ + if (top_th->tt_super.th_result != 0) + return false; + + tmt = top_th->tt_multiple_thandle; + if (tmt == NULL) + return false; + + tur = tmt->tmt_update_records; + if (tur == NULL) + return false; + + /* Bogus update records: a cross-MDT operation should + * include both local and remote updates, so the updates + * count should be >= 2 */ + if (tur->tur_update_records == NULL || + tur->tur_update_records->lur_update_rec.ur_update_count <= 1) + return false; + + return true; +} + +/** * Stop the top transaction. 
* * Stop the transaction on the master device first, then stop transactions @@ -305,55 +675,198 @@ int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev, { struct top_thandle *top_th = container_of(th, struct top_thandle, tt_super); - struct thandle_update_records *tur = top_th->tt_update_records; - struct sub_thandle *lst; - int rc; + struct sub_thandle *st; + struct sub_thandle *master_st; + struct top_multiple_thandle *tmt; + struct thandle_update_records *tur; + bool write_updates = false; + int rc = 0; ENTRY; - /* Note: we always need walk through all of sub_transaction to do - * transaction stop to release the resource here */ - if (tur != NULL && th->th_result == 0) { - rc = top_updates_write(env, top_th); + if (likely(top_th->tt_multiple_thandle == NULL)) { + rc = dt_trans_stop(env, master_dev, + top_th->tt_master_sub_thandle); + OBD_FREE_PTR(top_th); + RETURN(rc); + } + + tmt = top_th->tt_multiple_thandle; + tur = tmt->tmt_update_records; + + /* Note: we need stop the master thandle first, then the stop + * callback will fill the master transno in the update logs, + * then these update logs will be sent to other MDTs */ + /* get the master sub thandle */ + master_st = lookup_sub_thandle(tmt, tmt->tmt_master_sub_dt); + if (master_st == NULL) { + top_multiple_thandle_dump(tmt, D_ERROR); + if (th->th_result == 0) + LBUG(); + } + + write_updates = top_check_write_updates(top_th); + + /* Step 1: write the updates log on Master MDT */ + if (master_st != NULL && master_st->st_sub_th != NULL && + write_updates) { + struct llog_update_record *lur; + + /* Merge the parameters and updates into one buffer */ + rc = prepare_writing_updates(env, tmt); if (rc < 0) { - CERROR("%s: cannot write updates: rc = %d\n", + CERROR("%s: cannot prepare updates: rc = %d\n", master_dev->dd_lu_dev.ld_obd->obd_name, rc); - /* Still need call dt_trans_stop to release resources - * holding by the transaction */ + th->th_result = rc; + GOTO(stop_master_trans, rc); + } 
+ + lur = tur->tur_update_records; + /* Write updates to the master MDT */ + rc = sub_updates_write(env, lur, top_th->tt_master_sub_thandle, + &master_st->st_cookie); + + /* Cleanup the common parameters in the update records, + * master transno callback might add more parameters. + * and we need merge the update records again in the + * following */ + if (tur->tur_update_params != NULL) + lur->lur_update_rec.ur_param_count = 0; + + if (rc < 0) { + CERROR("%s: write updates failed: rc = %d\n", + master_dev->dd_lu_dev.ld_obd->obd_name, rc); + th->th_result = rc; + GOTO(stop_master_trans, rc); } - top_th->tt_update_records = NULL; } - LASSERT(top_th->tt_magic == TOP_THANDLE_MAGIC); +stop_master_trans: + /* Step 2: Stop the transaction on the master MDT, and fill the + * master transno in the update logs to other MDT. */ + if (master_st != NULL && master_st->st_sub_th != NULL) { + master_st->st_sub_th->th_local = th->th_local; + master_st->st_sub_th->th_sync = th->th_sync; + master_st->st_sub_th->th_tags = th->th_tags; + master_st->st_sub_th->th_result = th->th_result; + rc = dt_trans_stop(env, master_st->st_dt, master_st->st_sub_th); + if (rc < 0) { + th->th_result = rc; + GOTO(stop_other_trans, rc); + } else if (tur != NULL && tur->tur_update_records != NULL) { + struct llog_update_record *lur; + + lur = tur->tur_update_records; + if (lur->lur_update_rec.ur_master_transno == 0) + /* Update master transno after master stop + * callback */ + lur->lur_update_rec.ur_master_transno = + tgt_th_info(env)->tti_transno; + } + } - top_th->tt_master_sub_thandle->th_local = th->th_local; - top_th->tt_master_sub_thandle->th_sync = th->th_sync; - top_th->tt_master_sub_thandle->th_result = th->th_result; - /* To avoid sending RPC while holding thandle, it always stop local - * transaction first, then other sub thandle */ - rc = dt_trans_stop(env, master_dev, top_th->tt_master_sub_thandle); + /* Step 3: write updates to other MDTs */ + if (write_updates) { + struct 
llog_update_record *lur; - list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) { - int rc2; + /* Stop callback of master will add more updates and also update + * master transno, so merge the parameters and updates into one + * buffer again */ + rc = prepare_writing_updates(env, tmt); + if (rc < 0) { + CERROR("%s: prepare updates failed: rc = %d\n", + master_dev->dd_lu_dev.ld_obd->obd_name, rc); + th->th_result = rc; + GOTO(stop_other_trans, rc); + } + lur = tur->tur_update_records; + list_for_each_entry(st, &tmt->tmt_sub_thandle_list, + st_sub_list) { + if (st->st_sub_th == NULL || st == master_st || + st->st_sub_th->th_result < 0) + continue; - if (rc != 0) - lst->st_sub_th->th_result = rc; - else - lst->st_sub_th->th_result = th->th_result; - lst->st_sub_th->th_sync = th->th_sync; - lst->st_sub_th->th_local = th->th_local; - rc2 = dt_trans_stop(env, lst->st_sub_th->th_dev, - lst->st_sub_th); - if (unlikely(rc2 < 0 && rc == 0)) - rc = rc2; + rc = sub_updates_write(env, lur, st->st_sub_th, + &st->st_cookie); + if (rc < 0) + break; + } } - top_thandle_destroy(top_th); +stop_other_trans: + /* Step 4: Stop the transaction on other MDTs */ + list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) { + if (st == master_st || st->st_sub_th == NULL) + continue; + + st->st_sub_th->th_sync = th->th_sync; + st->st_sub_th->th_local = th->th_local; + st->st_sub_th->th_tags = th->th_tags; + st->st_sub_th->th_result = th->th_result; + rc = dt_trans_stop(env, st->st_sub_th->th_dev, + st->st_sub_th); + if (unlikely(rc < 0 && th->th_result == 0)) + th->th_result = rc; + } + tmt->tmt_result = rc; + /* Balance for the refcount in top_trans_create, Note: if it is NOT + * multiple node transaction, the top transaction will be destroyed. 
*/ + top_multiple_thandle_put(tmt); + OBD_FREE_PTR(top_th); RETURN(rc); } EXPORT_SYMBOL(top_trans_stop); /** + * Create top_multiple_thandle for top_thandle + * + * Create top_mutilple_thandle to manage the mutiple node transaction + * for top_thandle, and it also needs to add master sub thandle to the + * sub trans list now. + * + * \param[in] env execution environment + * \param[in] top_th the top thandle + * + * \retval 0 if creation succeeds + * \retval negative errno if creation fails + */ +static int top_trans_create_tmt(const struct lu_env *env, + struct top_thandle *top_th) +{ + struct top_multiple_thandle *tmt; + + OBD_ALLOC_PTR(tmt); + if (tmt == NULL) + return -ENOMEM; + + tmt->tmt_magic = TOP_THANDLE_MAGIC; + INIT_LIST_HEAD(&tmt->tmt_sub_thandle_list); + INIT_LIST_HEAD(&tmt->tmt_commit_list); + atomic_set(&tmt->tmt_refcount, 1); + + top_th->tt_multiple_thandle = tmt; + + return 0; +} + +static struct sub_thandle * +create_sub_thandle_with_thandle(struct top_thandle *top_th, + struct thandle *sub_th) +{ + struct sub_thandle *st; + + /* create and init sub th to the top trans list */ + st = create_sub_thandle(top_th->tt_multiple_thandle, + sub_th->th_dev); + if (IS_ERR(st)) + return st; + + st->st_sub_th = sub_th; + sub_th->th_top = &top_th->tt_super; + return st; +} + +/** * Get sub thandle. * * Get sub thandle from the top thandle according to the sub dt_device. 
@@ -369,66 +882,568 @@ struct thandle *thandle_get_sub_by_dt(const struct lu_env *env, struct thandle *th, struct dt_device *sub_dt) { - struct sub_thandle *lst; + struct sub_thandle *st = NULL; struct top_thandle *top_th; - struct thandle *sub_th; + struct thandle *sub_th = NULL; + int rc = 0; ENTRY; top_th = container_of(th, struct top_thandle, tt_super); - LASSERT(top_th->tt_magic == TOP_THANDLE_MAGIC); - LASSERT(top_th->tt_master_sub_thandle != NULL); + if (likely(sub_dt == top_th->tt_master_sub_thandle->th_dev)) RETURN(top_th->tt_master_sub_thandle); - /* Find or create the transaction in tt_trans_list, since there is - * always only one thread access the list, so no need lock here */ - list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) { - if (lst->st_sub_th->th_dev == sub_dt) - RETURN(lst->st_sub_th); + if (top_th->tt_multiple_thandle != NULL) { + st = lookup_sub_thandle(top_th->tt_multiple_thandle, sub_dt); + if (st != NULL) + RETURN(st->st_sub_th); } sub_th = dt_trans_create(env, sub_dt); if (IS_ERR(sub_th)) RETURN(sub_th); - /* XXX all of mixed transaction (see struct th_handle) will - * be synchronized until async update is done */ - th->th_sync = 1; + /* Create top_multiple_thandle if necessary */ + if (top_th->tt_multiple_thandle == NULL) { + struct top_multiple_thandle *tmt; - sub_th->th_top = th; - OBD_ALLOC_PTR(lst); - if (lst == NULL) { - dt_trans_stop(env, sub_dt, sub_th); - RETURN(ERR_PTR(-ENOMEM)); + rc = top_trans_create_tmt(env, top_th); + if (rc < 0) + GOTO(stop_trans, rc); + + tmt = top_th->tt_multiple_thandle; + + /* Add master sub th to the top trans list */ + tmt->tmt_master_sub_dt = + top_th->tt_master_sub_thandle->th_dev; + st = create_sub_thandle_with_thandle(top_th, + top_th->tt_master_sub_thandle); + if (IS_ERR(st)) + GOTO(stop_trans, rc = PTR_ERR(st)); + top_th->tt_master_sub_thandle->th_sync = 1; + top_th->tt_super.th_sync = 1; } - INIT_LIST_HEAD(&lst->st_sub_list); - lst->st_sub_th = sub_th; - 
list_add(&lst->st_sub_list, &top_th->tt_sub_thandle_list); - lst->st_record_update = 1; + /* create and init sub th to the top trans list */ + st = create_sub_thandle_with_thandle(top_th, sub_th); + st->st_sub_th->th_wait_submit = 1; +stop_trans: + if (rc < 0) { + if (st != NULL) + OBD_FREE_PTR(st); + sub_th->th_result = rc; + dt_trans_stop(env, sub_dt, sub_th); + sub_th = ERR_PTR(rc); + } RETURN(sub_th); } EXPORT_SYMBOL(thandle_get_sub_by_dt); /** - * Top thandle destroy + * Top multiple thandle destroy * - * Destroy the top thandle and all of its sub thandle. + * Destroy multiple thandle and all its sub thandle. * - * \param[in] top_th top thandle to be destroyed. + * \param[in] tmt top_multiple_thandle to be destroyed. */ -void top_thandle_destroy(struct top_thandle *top_th) +void top_multiple_thandle_destroy(struct top_multiple_thandle *tmt) { struct sub_thandle *st; struct sub_thandle *tmp; - LASSERT(top_th->tt_magic == TOP_THANDLE_MAGIC); - list_for_each_entry_safe(st, tmp, &top_th->tt_sub_thandle_list, + LASSERT(tmt->tmt_magic == TOP_THANDLE_MAGIC); + list_for_each_entry_safe(st, tmp, &tmt->tmt_sub_thandle_list, st_sub_list) { list_del(&st->st_sub_list); OBD_FREE_PTR(st); } - OBD_FREE_PTR(top_th); + OBD_FREE_PTR(tmt); +} +EXPORT_SYMBOL(top_multiple_thandle_destroy); + +/** + * Cancel the update log on MDTs + * + * Cancel the update log on MDTs then destroy the thandle. + * + * \param[in] env execution environment + * \param[in] tmt the top multiple thandle whose updates records + * will be cancelled. + * + * \retval 0 if cancellation succeeds. + * \retval negative errno if cancellation fails. 
+ */ +static int distribute_txn_cancel_records(const struct lu_env *env, + struct top_multiple_thandle *tmt) +{ + struct sub_thandle *st; + ENTRY; + + top_multiple_thandle_dump(tmt, D_INFO); + /* Cancel update logs on other MDTs */ + list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) { + struct llog_ctxt *ctxt; + struct obd_device *obd; + struct llog_cookie *cookie; + int rc; + + cookie = &st->st_cookie; + if (fid_is_zero(&cookie->lgc_lgl.lgl_oi.oi_fid)) + continue; + + obd = st->st_dt->dd_lu_dev.ld_obd; + ctxt = llog_get_context(obd, LLOG_UPDATELOG_ORIG_CTXT); + LASSERT(ctxt); + + rc = llog_cat_cancel_records(env, ctxt->loc_handle, 1, + cookie); + + llog_ctxt_put(ctxt); + CDEBUG(D_HA, "%s: batchid %llu cancel update log "DOSTID + ".%u : rc = %d\n", obd->obd_name, tmt->tmt_batchid, + POSTID(&cookie->lgc_lgl.lgl_oi), cookie->lgc_index, rc); + } + + RETURN(0); +} + +/** + * Check if there are committed transaction + * + * Check if there are committed transaction in the distribute transaction + * list, then cancel the update records for those committed transaction. + * Because the distribute transaction in the list are sorted by batchid, + * and cancellation will be done by batchid order, so we only check the first + * the transaction(with lowest batchid) in the list. 
+ * + * \param[in] lod lod device where cancel thread is + * + * \retval true if it is ready + * \retval false if it is not ready + */ +static bool tdtd_ready_for_cancel_log(struct target_distribute_txn_data *tdtd) +{ + struct top_multiple_thandle *tmt = NULL; + struct obd_device *obd = tdtd->tdtd_lut->lut_obd; + bool ready = false; + + spin_lock(&tdtd->tdtd_batchid_lock); + if (!list_empty(&tdtd->tdtd_list)) { + tmt = list_entry(tdtd->tdtd_list.next, + struct top_multiple_thandle, tmt_commit_list); + if (tmt->tmt_committed && + (!obd->obd_recovering || (obd->obd_recovering && + tmt->tmt_batchid <= tdtd->tdtd_committed_batchid))) + ready = true; + } + spin_unlock(&tdtd->tdtd_batchid_lock); + + return ready; +} + +struct distribute_txn_bid_data { + struct dt_txn_commit_cb dtbd_cb; + struct target_distribute_txn_data *dtbd_tdtd; + __u64 dtbd_batchid; +}; + +/** + * callback of updating commit batchid + * + * Updating commit batchid then wake up the commit thread to cancel the + * records. 
+ * + * \param[in]env execution environment + * \param[in]th thandle to updating commit batchid + * \param[in]cb commit callback + * \param[in]err result of thandle + */ +static void distribute_txn_batchid_cb(struct lu_env *env, + struct thandle *th, + struct dt_txn_commit_cb *cb, + int err) +{ + struct distribute_txn_bid_data *dtbd = NULL; + struct target_distribute_txn_data *tdtd; + + dtbd = container_of0(cb, struct distribute_txn_bid_data, dtbd_cb); + tdtd = dtbd->dtbd_tdtd; + + CDEBUG(D_HA, "%s: %llu batchid updated\n", + tdtd->tdtd_lut->lut_obd->obd_name, dtbd->dtbd_batchid); + spin_lock(&tdtd->tdtd_batchid_lock); + if (dtbd->dtbd_batchid > tdtd->tdtd_committed_batchid && + !tdtd->tdtd_lut->lut_obd->obd_no_transno) + tdtd->tdtd_committed_batchid = dtbd->dtbd_batchid; + spin_unlock(&tdtd->tdtd_batchid_lock); + atomic_dec(&tdtd->tdtd_refcount); + wake_up(&tdtd->tdtd_commit_thread_waitq); + + OBD_FREE_PTR(dtbd); +} + +/** + * Update the commit batchid in disk + * + * Update commit batchid in the disk, after this is committed, it can start + * to cancel the update records. + * + * \param[in] env execution environment + * \param[in] tdtd distribute transaction structure + * \param[in] batchid commit batchid to be updated + * + * \retval 0 if update succeeds. + * \retval negative errno if update fails. 
+ */ +static int +distribute_txn_commit_batchid_update(const struct lu_env *env, + struct target_distribute_txn_data *tdtd, + __u64 batchid) +{ + struct distribute_txn_bid_data *dtbd = NULL; + struct thandle *th; + struct lu_buf buf; + __u64 tmp; + __u64 off; + int rc; + ENTRY; + + OBD_ALLOC_PTR(dtbd); + if (dtbd == NULL) + RETURN(-ENOMEM); + dtbd->dtbd_batchid = batchid; + dtbd->dtbd_tdtd = tdtd; + dtbd->dtbd_cb.dcb_func = distribute_txn_batchid_cb; + atomic_inc(&tdtd->tdtd_refcount); + + th = dt_trans_create(env, tdtd->tdtd_lut->lut_bottom); + if (IS_ERR(th)) { + OBD_FREE_PTR(dtbd); + RETURN(PTR_ERR(th)); + } + + tmp = cpu_to_le64(batchid); + buf.lb_buf = &tmp; + buf.lb_len = sizeof(tmp); + off = 0; + + rc = dt_declare_record_write(env, tdtd->tdtd_batchid_obj, &buf, off, + th); + if (rc < 0) + GOTO(stop, rc); + + rc = dt_trans_start_local(env, tdtd->tdtd_lut->lut_bottom, th); + if (rc < 0) + GOTO(stop, rc); + + dt_trans_cb_add(th, &dtbd->dtbd_cb); + if (rc < 0) + GOTO(stop, rc); + + rc = dt_record_write(env, tdtd->tdtd_batchid_obj, &buf, + &off, th); + + CDEBUG(D_INFO, "%s: update batchid "LPU64": rc = %d\n", + tdtd->tdtd_lut->lut_obd->obd_name, batchid, rc); + +stop: + dt_trans_stop(env, tdtd->tdtd_lut->lut_bottom, th); + if (rc < 0) + OBD_FREE_PTR(dtbd); + RETURN(rc); +} + +/** + * Init commit batchid for distribute transaction. + * + * Initialize the batchid object and get commit batchid from the object. + * + * \param[in] env execution environment + * \param[in] tdtd distribute transaction whose batchid is initialized. + * + * \retval 0 if initialization succeeds. + * \retval negative errno if initialization fails. 
+ **/ +static int +distribute_txn_commit_batchid_init(const struct lu_env *env, + struct target_distribute_txn_data *tdtd) +{ + struct tgt_thread_info *tti = tgt_th_info(env); + struct lu_target *lut = tdtd->tdtd_lut; + struct lu_attr *attr = &tti->tti_attr; + struct lu_fid *fid = &tti->tti_fid1; + struct dt_object_format *dof = &tti->tti_u.update.tti_update_dof; + struct dt_object *dt_obj = NULL; + struct lu_buf buf; + __u64 tmp; + __u64 off; + int rc; + ENTRY; + + memset(attr, 0, sizeof(*attr)); + attr->la_valid = LA_MODE; + attr->la_mode = S_IFREG | S_IRUGO | S_IWUSR; + dof->dof_type = dt_mode_to_dft(S_IFREG); + + lu_local_obj_fid(fid, BATCHID_COMMITTED_OID); + + dt_obj = dt_find_or_create(env, lut->lut_bottom, fid, dof, + attr); + if (IS_ERR(dt_obj)) + GOTO(out_put, rc = PTR_ERR(dt_obj)); + + tdtd->tdtd_batchid_obj = dt_obj; + + buf.lb_buf = &tmp; + buf.lb_len = sizeof(tmp); + off = 0; + rc = dt_read(env, dt_obj, &buf, &off); + if (rc < 0 || (rc < buf.lb_len && rc > 0)) { + CERROR("%s can't read last committed batchid: rc = %d\n", + tdtd->tdtd_lut->lut_obd->obd_name, rc); + if (rc > 0) + rc = -EINVAL; + GOTO(out_put, rc); + } else if (rc == buf.lb_len) { + tdtd->tdtd_committed_batchid = le64_to_cpu(tmp); + CDEBUG(D_HA, "%s: committed batchid %llu\n", + tdtd->tdtd_lut->lut_obd->obd_name, + tdtd->tdtd_committed_batchid); + rc = 0; + } + +out_put: + if (rc < 0 && dt_obj != NULL) { + lu_object_put(env, &dt_obj->do_lu); + tdtd->tdtd_batchid_obj = NULL; + } + return rc; +} + +/** + * manage the distribute transaction thread + * + * Distribute transaction are linked to the list, and once the distribute + * transaction is committed, it will update the last committed batchid first, + * after it is committed, it will cancel the records. + * + * \param[in] _arg argument for commit thread + * + * \retval 0 if thread is running successfully + * \retval negative errno if the thread can not be run. 
+ */ +static int distribute_txn_commit_thread(void *_arg) +{ + struct target_distribute_txn_data *tdtd = _arg; + struct lu_target *lut = tdtd->tdtd_lut; + struct ptlrpc_thread *thread = &lut->lut_tdtd_commit_thread; + struct l_wait_info lwi = { 0 }; + struct lu_env env; + struct list_head list; + int rc; + struct top_multiple_thandle *tmt; + struct top_multiple_thandle *tmp; + __u64 batchid = 0, committed; + + ENTRY; + + rc = lu_env_init(&env, LCT_LOCAL | LCT_MD_THREAD); + if (rc != 0) + RETURN(rc); + + spin_lock(&tdtd->tdtd_batchid_lock); + thread->t_flags = SVC_RUNNING; + spin_unlock(&tdtd->tdtd_batchid_lock); + wake_up(&thread->t_ctl_waitq); + INIT_LIST_HEAD(&list); + + CDEBUG(D_HA, "%s: start commit thread committed batchid "LPU64"\n", + tdtd->tdtd_lut->lut_obd->obd_name, + tdtd->tdtd_committed_batchid); + + while (distribute_txn_commit_thread_running(lut)) { + spin_lock(&tdtd->tdtd_batchid_lock); + list_for_each_entry_safe(tmt, tmp, &tdtd->tdtd_list, + tmt_commit_list) { + if (tmt->tmt_committed == 0) + break; + + /* Note: right now, replay is based on master MDT + * transno, but cancellation is based on batchid. + * so we do not try to cancel the update log until + * the recoverying is done, unless the update records + * batchid < committed_batchid. */ + if (tmt->tmt_batchid <= tdtd->tdtd_committed_batchid) { + list_move_tail(&tmt->tmt_commit_list, &list); + } else if (!tdtd->tdtd_lut->lut_obd->obd_recovering) { + LASSERTF(tmt->tmt_batchid >= batchid, + "tmt %p tmt_batchid: "LPU64", batchid " + LPU64"\n", tmt, tmt->tmt_batchid, + batchid); + /* There are three types of distribution + * transaction result + * + * 1. If tmt_result < 0, it means the + * distribution transaction fails, which should + * be rare, because once declare phase succeeds, + * the operation should succeeds anyway. Note in + * this case, we will still update batchid so + * cancellation would be stopped. + * + * 2. 
If tmt_result == 0, it means the + * distribution transaction succeeds, and we + * will update batchid. + * + * 3. If tmt_result > 0, it means distribute + * transaction is not yet committed on every + * node, but we need release this tmt before + * that, which usuually happens during umount. + */ + if (tmt->tmt_result <= 0) + batchid = tmt->tmt_batchid; + list_move_tail(&tmt->tmt_commit_list, &list); + } + } + spin_unlock(&tdtd->tdtd_batchid_lock); + + CDEBUG(D_HA, "%s: batchid: "LPU64" committed batchid " + LPU64"\n", tdtd->tdtd_lut->lut_obd->obd_name, batchid, + tdtd->tdtd_committed_batchid); + /* update globally committed on a storage */ + if (batchid > tdtd->tdtd_committed_batchid) { + distribute_txn_commit_batchid_update(&env, tdtd, + batchid); + spin_lock(&tdtd->tdtd_batchid_lock); + if (batchid > tdtd->tdtd_batchid) { + /* This might happen during recovery, + * batchid is initialized as last transno, + * and the batchid in the update records + * on other MDTs might be bigger than + * the batchid, so we need update it to + * avoid duplicate batchid. */ + CDEBUG(D_HA, "%s update batchid from "LPU64 + " to "LPU64"\n", + tdtd->tdtd_lut->lut_obd->obd_name, + tdtd->tdtd_batchid, batchid); + tdtd->tdtd_batchid = batchid; + } + spin_unlock(&tdtd->tdtd_batchid_lock); + } + /* cancel the records for committed batchid's */ + /* XXX: should we postpone cancel's till the end of recovery? 
*/ + committed = tdtd->tdtd_committed_batchid; + list_for_each_entry_safe(tmt, tmp, &list, tmt_commit_list) { + if (tmt->tmt_batchid > committed) + break; + list_del_init(&tmt->tmt_commit_list); + if (tmt->tmt_result <= 0) + distribute_txn_cancel_records(&env, tmt); + top_multiple_thandle_put(tmt); + } + + l_wait_event(tdtd->tdtd_commit_thread_waitq, + !distribute_txn_commit_thread_running(lut) || + committed < tdtd->tdtd_committed_batchid || + tdtd_ready_for_cancel_log(tdtd), &lwi); + }; + + l_wait_event(tdtd->tdtd_commit_thread_waitq, + atomic_read(&tdtd->tdtd_refcount) == 0, &lwi); + + spin_lock(&tdtd->tdtd_batchid_lock); + list_for_each_entry_safe(tmt, tmp, &tdtd->tdtd_list, + tmt_commit_list) + list_move_tail(&tmt->tmt_commit_list, &list); + spin_unlock(&tdtd->tdtd_batchid_lock); + + CDEBUG(D_INFO, "%s stopping distribute txn commit thread.\n", + tdtd->tdtd_lut->lut_obd->obd_name); + list_for_each_entry_safe(tmt, tmp, &list, tmt_commit_list) { + list_del_init(&tmt->tmt_commit_list); + top_multiple_thandle_dump(tmt, D_HA); + top_multiple_thandle_put(tmt); + } + + thread->t_flags = SVC_STOPPED; + lu_env_fini(&env); + wake_up(&thread->t_ctl_waitq); + + RETURN(0); +} + +/** + * Start llog cancel thread + * + * Start llog cancel(master/slave) thread on LOD + * + * \param[in]lclt cancel log thread to be started. + * + * \retval 0 if the thread is started successfully. + * \retval negative errno if the thread is not being + * started. 
+ */ +int distribute_txn_init(const struct lu_env *env, + struct lu_target *lut, + struct target_distribute_txn_data *tdtd, + __u32 index) +{ + struct task_struct *task; + struct l_wait_info lwi = { 0 }; + int rc; + ENTRY; + + spin_lock_init(&tdtd->tdtd_batchid_lock); + INIT_LIST_HEAD(&tdtd->tdtd_list); + + tdtd->tdtd_batchid = lut->lut_last_transno + 1; + + init_waitqueue_head(&lut->lut_tdtd_commit_thread.t_ctl_waitq); + init_waitqueue_head(&tdtd->tdtd_commit_thread_waitq); + atomic_set(&tdtd->tdtd_refcount, 0); + + tdtd->tdtd_lut = lut; + rc = distribute_txn_commit_batchid_init(env, tdtd); + if (rc != 0) + RETURN(rc); + + task = kthread_run(distribute_txn_commit_thread, tdtd, "tdtd-%u", + index); + if (IS_ERR(task)) + RETURN(PTR_ERR(task)); + + l_wait_event(lut->lut_tdtd_commit_thread.t_ctl_waitq, + distribute_txn_commit_thread_running(lut) || + distribute_txn_commit_thread_stopped(lut), &lwi); + RETURN(0); +} +EXPORT_SYMBOL(distribute_txn_init); + +/** + * Stop llog cancel thread + * + * Stop llog cancel(master/slave) thread on LOD and also destory + * all of transaction in the list. + * + * \param[in]lclt cancel log thread to be stopped. 
+ */ +void distribute_txn_fini(const struct lu_env *env, + struct target_distribute_txn_data *tdtd) +{ + struct lu_target *lut = tdtd->tdtd_lut; + + /* Stop cancel thread */ + if (lut == NULL || !distribute_txn_commit_thread_running(lut)) + return; + + spin_lock(&tdtd->tdtd_batchid_lock); + lut->lut_tdtd_commit_thread.t_flags = SVC_STOPPING; + spin_unlock(&tdtd->tdtd_batchid_lock); + wake_up(&tdtd->tdtd_commit_thread_waitq); + wait_event(lut->lut_tdtd_commit_thread.t_ctl_waitq, + lut->lut_tdtd_commit_thread.t_flags & SVC_STOPPED); + + if (tdtd->tdtd_batchid_obj != NULL) + lu_object_put(env, &tdtd->tdtd_batchid_obj->do_lu); } -EXPORT_SYMBOL(top_thandle_destroy); +EXPORT_SYMBOL(distribute_txn_fini); diff --git a/lustre/utils/wirecheck.c b/lustre/utils/wirecheck.c index 0adf066..2e997c7 100644 --- a/lustre/utils/wirecheck.c +++ b/lustre/utils/wirecheck.c @@ -252,6 +252,7 @@ check_ost_id(void) CHECK_VALUE_64X(FID_SEQ_ROOT); CHECK_VALUE_64X(FID_SEQ_LAYOUT_RBTREE); CHECK_VALUE_64X(FID_SEQ_UPDATE_LOG); + CHECK_VALUE_64X(FID_SEQ_UPDATE_LOG_DIR); CHECK_VALUE_64X(FID_SEQ_NORMAL); CHECK_VALUE_64X(FID_SEQ_LOV_DEFAULT); diff --git a/lustre/utils/wiretest.c b/lustre/utils/wiretest.c index 92a44f1..2d347c7 100644 --- a/lustre/utils/wiretest.c +++ b/lustre/utils/wiretest.c @@ -65,7 +65,6 @@ void lustre_assert_wire_constants(void) * running on Linux q 2.6.32-431.5.1.el6.lustre.x86_64 #1 SMP Wed Feb 12 11:01:08 CST 2014 x8 * with gcc version 4.4.7 20120313 (Red Hat 4.4.7-3) (GCC) */ - /* Constants... 
*/ LASSERTF(PTL_RPC_MSG_REQUEST == 4711, "found %lld\n", (long long)PTL_RPC_MSG_REQUEST); @@ -541,6 +540,8 @@ void lustre_assert_wire_constants(void) (long long)FID_SEQ_LAYOUT_RBTREE); LASSERTF(FID_SEQ_UPDATE_LOG == 0x0000000200000009ULL, "found 0x%.16llxULL\n", (long long)FID_SEQ_UPDATE_LOG); + LASSERTF(FID_SEQ_UPDATE_LOG_DIR == 0x000000020000000aULL, "found 0x%.16llxULL\n", + (long long)FID_SEQ_UPDATE_LOG_DIR); LASSERTF(FID_SEQ_NORMAL == 0x0000000200000400ULL, "found 0x%.16llxULL\n", (long long)FID_SEQ_NORMAL); LASSERTF(FID_SEQ_LOV_DEFAULT == 0xffffffffffffffffULL, "found 0x%.16llxULL\n",