From: Alex Zhuravlev Date: Fri, 19 Oct 2012 14:18:05 +0000 (+0400) Subject: LU-7251 osp: do not assign commit callback to every thandle X-Git-Tag: 2.10.55~19 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=0ba690a526be74c4cdffe7a7dd3031b4bd2b37d8 LU-7251 osp: do not assign commit callback to every thandle with OSP there is a risk of getting a lot of commit callbacks. say, 10K unlinks/sec on 4-striped files could result in 4*10K*5 = 200K commit callbacks. this patch implements another schema: every OSP registers its own callback every second. this should result in 4*5 commit callbacks in the same situation. in case of forced sync the commit callback is registered unconditionally. the patch removes th_tags and th_ctx from struct thandle as they are not used anymore. this eliminates 3 allocations from every transaction: (lu_object.c:1714:keys_init()) kmalloced 'ctx->lc_value': 320 (update_records.c:1217:update_key_init()) kmalloced 'value': 408 (osp_dev.c:1807:osp_txn_key_init()) kmalloced 'value': 4 Change-Id: I460d5eccb585b166423d84d5c142af2e27751d8b Signed-off-by: Alex Zhuravlev Reviewed-on: https://review.whamcloud.com/17270 Tested-by: Jenkins Tested-by: Maloo Reviewed-by: Andreas Dilger Reviewed-by: Lai Siyao --- diff --git a/lustre/include/dt_object.h b/lustre/include/dt_object.h index 436139f..98873e8 100644 --- a/lustre/include/dt_object.h +++ b/lustre/include/dt_object.h @@ -1885,11 +1885,6 @@ struct thandle { * top thandle here for now, will fix it when we have better * callback mechanism */ struct thandle *th_top; - /** context for this transaction, tag is LCT_TX_HANDLE */ - struct lu_context th_ctx; - - /** additional tags (layers can add in declare) */ - __u32 th_tags; /** the last operation result in this transaction. 
* this value is used in recovery */ diff --git a/lustre/include/lu_object.h b/lustre/include/lu_object.h index 344b933..af57228 100644 --- a/lustre/include/lu_object.h +++ b/lustre/include/lu_object.h @@ -998,10 +998,6 @@ enum lu_context_tag { */ LCT_DT_THREAD = 1 << 1, /** - * Context for transaction handle - */ - LCT_TX_HANDLE = 1 << 2, - /** * Thread on client */ LCT_CL_THREAD = 1 << 3, diff --git a/lustre/osd-ldiskfs/osd_handler.c b/lustre/osd-ldiskfs/osd_handler.c index c4e6d4c..87a07d4 100644 --- a/lustre/osd-ldiskfs/osd_handler.c +++ b/lustre/osd-ldiskfs/osd_handler.c @@ -1590,8 +1590,6 @@ static void osd_trans_commit_cb(struct super_block *sb, lu_device_put(lud); th->th_dev = NULL; - lu_context_exit(&th->th_ctx); - lu_context_fini(&th->th_ctx); OBD_FREE_PTR(oh); } @@ -1629,7 +1627,6 @@ static struct thandle *osd_trans_create(const struct lu_env *env, th = &oh->ot_super; th->th_dev = d; th->th_result = 0; - th->th_tags = LCT_TX_HANDLE; oh->ot_credits = 0; INIT_LIST_HEAD(&oh->ot_commit_dcb_list); INIT_LIST_HEAD(&oh->ot_stop_dcb_list); @@ -1763,8 +1760,6 @@ static int osd_trans_start(const struct lu_env *env, struct dt_device *d, if (!IS_ERR(jh)) { oh->ot_handle = jh; LASSERT(oti->oti_txns == 0); - lu_context_init(&th->th_ctx, th->th_tags); - lu_context_enter(&th->th_ctx); lu_device_get(&d->dd_lu_dev); lu_ref_add_at(&d->dd_lu_dev.ld_reference, &oh->ot_dev_link, diff --git a/lustre/osd-zfs/osd_handler.c b/lustre/osd-zfs/osd_handler.c index 974a0a2..fc6c701 100644 --- a/lustre/osd-zfs/osd_handler.c +++ b/lustre/osd-zfs/osd_handler.c @@ -166,8 +166,6 @@ static void osd_trans_commit_cb(void *cb_data, int error) lu_device_put(lud); th->th_dev = NULL; - lu_context_exit(&th->th_ctx); - lu_context_fini(&th->th_ctx); OBD_FREE_PTR(oh); EXIT; @@ -225,8 +223,6 @@ static int osd_trans_start(const struct lu_env *env, struct dt_device *d, /* add commit callback */ dmu_tx_callback_register(oh->ot_tx, osd_trans_commit_cb, oh); oh->ot_assigned = 1; - 
lu_context_init(&th->th_ctx, th->th_tags); - lu_context_enter(&th->th_ctx); lu_device_get(&d->dd_lu_dev); } @@ -360,7 +356,6 @@ static struct thandle *osd_trans_create(const struct lu_env *env, th = &oh->ot_super; th->th_dev = dt; th->th_result = 0; - th->th_tags = LCT_TX_HANDLE; RETURN(th); } diff --git a/lustre/osp/lproc_osp.c b/lustre/osp/lproc_osp.c index d88646b..a33513b 100644 --- a/lustre/osp/lproc_osp.c +++ b/lustre/osp/lproc_osp.c @@ -874,6 +874,26 @@ osp_reserved_mb_low_seq_write(struct file *file, const char __user *buffer, } LPROC_SEQ_FOPS(osp_reserved_mb_low); +static ssize_t +lprocfs_force_sync_seq_write(struct file *file, const char __user *buffer, + size_t count, loff_t *off) +{ + struct seq_file *m = file->private_data; + struct obd_device *dev = m->private; + struct dt_device *dt = lu2dt_dev(dev->obd_lu_dev); + struct lu_env env; + int rc; + + rc = lu_env_init(&env, LCT_LOCAL); + if (rc) + return rc; + rc = dt_sync(&env, dt); + lu_env_fini(&env); + + return rc == 0 ? count : rc; +} +LPROC_SEQ_FOPS_WR_ONLY(osp, force_sync); + static struct lprocfs_vars lprocfs_osp_obd_vars[] = { { .name = "ping", .fops = &osp_ping_fops, @@ -926,6 +946,8 @@ static struct lprocfs_vars lprocfs_osp_obd_vars[] = { .fops = &osp_reserved_mb_high_fops }, { .name = "reserved_mb_low", .fops = &osp_reserved_mb_low_fops }, + { .name = "force_sync", + .fops = &osp_force_sync_fops }, /* for compatibility reasons */ { .name = "destroys_in_flight", diff --git a/lustre/osp/osp_dev.c b/lustre/osp/osp_dev.c index 81a62c7..4c7f791 100644 --- a/lustre/osp/osp_dev.c +++ b/lustre/osp/osp_dev.c @@ -797,19 +797,24 @@ static int osp_sync(const struct lu_env *env, struct dt_device *dev) struct osp_device *d = dt2osp_dev(dev); cfs_time_t expire; struct l_wait_info lwi = { 0 }; - unsigned long id, old; - int rc = 0; - unsigned long start = cfs_time_current(); + int recs, rc = 0; + unsigned long start = cfs_time_current(); + __u64 old; + ENTRY; /* No Sync between MDTs yet. 
*/ if (d->opd_connect_mdt) RETURN(0); + recs = atomic_read(&d->opd_sync_changes); + old = atomic64_read(&d->opd_sync_processed_recs); + + osp_sync_force(env, dt2osp_dev(dev)); + if (unlikely(d->opd_imp_active == 0)) RETURN(-ENOTCONN); - id = d->opd_sync_last_used_id; down_write(&d->opd_async_updates_rwsem); CDEBUG(D_OTHER, "%s: async updates %d\n", d->opd_obd->obd_name, @@ -825,26 +830,23 @@ static int osp_sync(const struct lu_env *env, struct dt_device *dev) if (rc != 0) GOTO(out, rc); - CDEBUG(D_CACHE, "%s: id: used %lu, processed %llu\n", - d->opd_obd->obd_name, id, d->opd_sync_last_processed_id); - - /* wait till all-in-line are processed */ - while (d->opd_sync_last_processed_id < id) { - - old = d->opd_sync_last_processed_id; + CDEBUG(D_CACHE, "%s: processed %lu\n", d->opd_obd->obd_name, + atomic64_read(&d->opd_sync_processed_recs)); + while (atomic64_read(&d->opd_sync_processed_recs) < old + recs) { + __u64 last = atomic64_read(&d->opd_sync_processed_recs); /* make sure the connection is fine */ expire = cfs_time_shift(obd_timeout); lwi = LWI_TIMEOUT(expire - cfs_time_current(), osp_sync_timeout, d); l_wait_event(d->opd_sync_barrier_waitq, - d->opd_sync_last_processed_id >= id, - &lwi); + atomic64_read(&d->opd_sync_processed_recs) + >= old + recs, &lwi); - if (d->opd_sync_last_processed_id >= id) + if (atomic64_read(&d->opd_sync_processed_recs) >= old + recs) break; - if (d->opd_sync_last_processed_id != old) { + if (atomic64_read(&d->opd_sync_processed_recs) != last) { /* some progress have been made, * keep trying... 
*/ continue; @@ -1344,8 +1346,12 @@ static struct lu_device *osp_device_fini(const struct lu_env *env, osp->opd_async_requests = NULL; } - if (osp->opd_storage_exp) + if (osp->opd_storage_exp) { + /* wait for the commit callbacks to complete */ + wait_event(osp->opd_sync_waitq, + atomic_read(&osp->opd_commits_registered) == 0); obd_disconnect(osp->opd_storage_exp); + } if (osp->opd_symlink) lprocfs_remove(&osp->opd_symlink); @@ -1869,7 +1875,7 @@ struct lu_context_key osp_thread_key = { LU_KEY_INIT_FINI(osp_txn, struct osp_txn_info); struct lu_context_key osp_txn_key = { - .lct_tags = LCT_OSP_THREAD | LCT_TX_HANDLE, + .lct_tags = LCT_OSP_THREAD, .lct_init = osp_txn_key_init, .lct_fini = osp_txn_key_fini }; diff --git a/lustre/osp/osp_internal.h b/lustre/osp/osp_internal.h index 16c6423..74dc8f8 100644 --- a/lustre/osp/osp_internal.h +++ b/lustre/osp/osp_internal.h @@ -224,19 +224,20 @@ struct osp_device { /* osd api's commit cb control structure */ struct dt_txn_callback opd_sync_txn_cb; /* last used change number -- semantically similar to transno */ - __u64 opd_sync_last_used_id; + unsigned long opd_sync_last_used_id; /* last committed change number -- semantically similar to * last_committed */ __u64 opd_sync_last_committed_id; - /* last processed (taken from llog) id */ - volatile __u64 opd_sync_last_processed_id; /* last processed catalog index */ - int opd_sync_last_catalog_idx; - struct osp_id_tracker *opd_sync_tracker; - struct list_head opd_sync_ontrack; + int opd_sync_last_catalog_idx; + /* number of processed records */ + atomic64_t opd_sync_processed_recs; /* stop processing new requests until barrier=0 */ atomic_t opd_sync_barrier; wait_queue_head_t opd_sync_barrier_waitq; + /* last generated id */ + cfs_time_t opd_sync_next_commit_cb; + atomic_t opd_commits_registered; /* * statfs related fields: OSP maintains it on its own @@ -814,6 +815,11 @@ int osp_sync_add(const struct lu_env *env, struct osp_object *o, int osp_sync_init(const struct lu_env *env, 
struct osp_device *d); int osp_sync_fini(struct osp_device *d); void osp_sync_check_for_work(struct osp_device *osp); +void osp_sync_force(const struct lu_env *env, struct osp_device *d); +int osp_sync_add_commit_cb(const struct lu_env *env, struct osp_device *d, + struct thandle *th); +int osp_sync_add_commit_cb_1s(const struct lu_env *env, struct osp_device *d, + struct thandle *th); /* lwp_dev.c */ extern struct obd_ops lwp_obd_device_ops; diff --git a/lustre/osp/osp_precreate.c b/lustre/osp/osp_precreate.c index e3d2ee1..cc40e98 100644 --- a/lustre/osp/osp_precreate.c +++ b/lustre/osp/osp_precreate.c @@ -29,7 +29,7 @@ * This file is part of Lustre, http://www.lustre.org/ * Lustre is a trademark of Sun Microsystems, Inc. * - * lustre/osp/osp_sync.c + * lustre/osp/osp_precreate.c * * Lustre OST Proxy Device * @@ -168,7 +168,7 @@ out: * * \param[in] d OSP device */ -static int osp_statfs_update(struct osp_device *d) +static int osp_statfs_update(const struct lu_env *env, struct osp_device *d) { struct ptlrpc_request *req; struct obd_import *imp; @@ -208,6 +208,29 @@ static int osp_statfs_update(struct osp_device *d) ptlrpcd_add_req(req); + /* we still want to sync changes if no new changes are coming */ + if (cfs_time_before(cfs_time_current(), d->opd_sync_next_commit_cb)) + GOTO(out, rc); + + if (atomic_read(&d->opd_sync_changes)) { + struct thandle *th; + + th = dt_trans_create(env, d->opd_storage); + if (IS_ERR(th)) { + CERROR("%s: can't sync\n", d->opd_obd->obd_name); + GOTO(out, rc); + } + rc = dt_trans_start_local(env, d->opd_storage, th); + if (rc == 0) { + CDEBUG(D_OTHER, "%s: sync forced, %d changes\n", + d->opd_obd->obd_name, + atomic_read(&d->opd_sync_changes)); + osp_sync_add_commit_cb_1s(env, d, th); + dt_trans_stop(env, d->opd_storage, th); + } + } + +out: RETURN(0); } @@ -1193,7 +1216,7 @@ static int osp_precreate_thread(void *_arg) continue; } - if (osp_statfs_update(d)) { + if (osp_statfs_update(&env, d)) { l_wait_event(d->opd_pre_waitq, 
!osp_precreate_running(d), &lwi2); continue; @@ -1226,7 +1249,7 @@ static int osp_precreate_thread(void *_arg) break; if (osp_statfs_need_update(d)) - if (osp_statfs_update(d)) + if (osp_statfs_update(&env, d)) break; /* To avoid handling different seq in precreate/orphan @@ -1359,7 +1382,7 @@ int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d) { struct l_wait_info lwi; cfs_time_t expire = cfs_time_shift(obd_timeout); - int precreated, rc; + int precreated, rc, synced = 0; ENTRY; @@ -1418,9 +1441,11 @@ int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d) * wait till that is done - some space might be released */ if (unlikely(rc == -ENOSPC)) { - if (atomic_read(&d->opd_sync_changes)) { + if (atomic_read(&d->opd_sync_changes) && synced == 0) { /* force local commit to release space */ dt_commit_async(env, d->opd_storage); + osp_sync_force(env, d); + synced = 1; } if (atomic_read(&d->opd_sync_rpcs_in_progress)) { /* just wait till destroys are done */ diff --git a/lustre/osp/osp_sync.c b/lustre/osp/osp_sync.c index fed6202..80c11c2 100644 --- a/lustre/osp/osp_sync.c +++ b/lustre/osp/osp_sync.c @@ -44,11 +44,6 @@ #include #include "osp_internal.h" -static int osp_sync_id_traction_init(struct osp_device *d); -static void osp_sync_id_traction_fini(struct osp_device *d); -static __u64 osp_sync_id_get(struct osp_device *d, __u64 id); -static void osp_sync_remove_from_tracker(struct osp_device *d); - /* * this is a components of OSP implementing synchronization between MDS and OST * it llogs all interesting changes (currently it's uig/gid change and object @@ -128,9 +123,8 @@ static inline int osp_sync_stopped(struct osp_device *d) */ static inline int osp_sync_has_new_job(struct osp_device *d) { - return ((d->opd_sync_last_processed_id < d->opd_sync_last_used_id) && - (d->opd_sync_last_processed_id < d->opd_sync_last_committed_id)) - || (d->opd_sync_prev_done == 0); + return atomic_read(&d->opd_sync_changes) > 0 || + 
d->opd_sync_prev_done == 0; } static inline int osp_sync_in_flight_conflict(struct osp_device *d, @@ -259,6 +253,7 @@ static inline __u64 osp_sync_correct_id(struct osp_device *d, return correct_id; } + /** * Check and return ready-for-new status. * @@ -292,8 +287,10 @@ static inline int osp_sync_can_process_new(struct osp_device *d, return 1; if (atomic_read(&d->opd_sync_changes) == 0) return 0; - if (rec == NULL || - osp_sync_correct_id(d, rec) <= d->opd_sync_last_committed_id) + if (rec == NULL) + return 1; + /* notice "<" not "<=" */ + if (osp_sync_correct_id(d, rec) < d->opd_sync_last_committed_id) return 1; return 0; } @@ -342,9 +339,6 @@ int osp_sync_declare_add(const struct lu_env *env, struct osp_object *o, LBUG(); } - /* we want ->dt_trans_start() to allocate per-thandle structure */ - storage_th->th_tags |= LCT_OSP_THREAD; - ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT); LASSERT(ctxt); @@ -355,6 +349,28 @@ int osp_sync_declare_add(const struct lu_env *env, struct osp_object *o, RETURN(rc); } +/* add the commit callback every second */ +int osp_sync_add_commit_cb_1s(const struct lu_env *env, struct osp_device *d, + struct thandle *th) +{ + int add = 0; + + /* fast path */ + if (cfs_time_before(cfs_time_current(), d->opd_sync_next_commit_cb)) + return 0; + + spin_lock(&d->opd_sync_lock); + if (cfs_time_aftereq(cfs_time_current(), d->opd_sync_next_commit_cb)) + add = 1; + d->opd_sync_next_commit_cb = cfs_time_shift(1); + spin_unlock(&d->opd_sync_lock); + + if (add == 0) + return 0; + return osp_sync_add_commit_cb(env, d, th); +} + + /** * Generate a llog record for a given change. 
* @@ -383,7 +399,6 @@ static int osp_sync_add_rec(const struct lu_env *env, struct osp_device *d, { struct osp_thread_info *osi = osp_env_info(env); struct llog_ctxt *ctxt; - struct osp_txn_info *txn; struct thandle *storage_th; int rc; @@ -422,11 +437,13 @@ static int osp_sync_add_rec(const struct lu_env *env, struct osp_device *d, LBUG(); } - txn = osp_txn_info(&storage_th->th_ctx); - LASSERT(txn); + /* we keep the same id, but increment it when the callback + * is registered, so that all records upto the one taken + * by the callback are subject to processing */ + spin_lock(&d->opd_sync_lock); + osi->osi_hdr.lrh_id = d->opd_sync_last_used_id; + spin_unlock(&d->opd_sync_lock); - txn->oti_current_id = osp_sync_id_get(d, txn->oti_current_id); - osi->osi_hdr.lrh_id = (txn->oti_current_id & 0xffffffffULL); ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT); if (ctxt == NULL) RETURN(-ENOMEM); @@ -443,6 +460,9 @@ static int osp_sync_add_rec(const struct lu_env *env, struct osp_device *d, osi->osi_cookie.lgc_index, rc); atomic_inc(&d->opd_sync_changes); } + + rc = osp_sync_add_commit_cb_1s(env, d, th); + /* return 0 always here, error case just cause no llog record */ RETURN(0); } @@ -932,23 +952,11 @@ static void osp_sync_process_record(const struct lu_env *env, * we should decrease changes and bump last_processed_id. 
*/ if (d->opd_sync_prev_done) { - __u64 correct_id = osp_sync_correct_id(d, rec); LASSERT(atomic_read(&d->opd_sync_changes) > 0); - LASSERT(correct_id <= d->opd_sync_last_committed_id); - /* NOTE: it's possible to meet same id if - * OST stores few stripes of same file - */ - while (1) { - /* another thread may be trying to set new value */ - rmb(); - if (correct_id > d->opd_sync_last_processed_id) { - d->opd_sync_last_processed_id = correct_id; - wake_up(&d->opd_sync_barrier_waitq); - } else - break; - } atomic_dec(&d->opd_sync_changes); + wake_up(&d->opd_sync_barrier_waitq); } + atomic64_inc(&d->opd_sync_processed_recs); if (rc != 0) { atomic_dec(&d->opd_sync_rpcs_in_flight); atomic_dec(&d->opd_sync_rpcs_in_progress); @@ -1128,9 +1136,6 @@ static int osp_sync_process_queues(const struct lu_env *env, rec = NULL; } - if (d->opd_sync_last_processed_id == d->opd_sync_last_used_id) - osp_sync_remove_from_tracker(d); - l_wait_event(d->opd_sync_waitq, !osp_sync_running(d) || osp_sync_can_process_new(d, rec) || @@ -1464,10 +1469,6 @@ int osp_sync_init(const struct lu_env *env, struct osp_device *d) if (d->opd_storage->dd_rdonly) RETURN(0); - rc = osp_sync_id_traction_init(d); - if (rc) - RETURN(rc); - /* * initialize llog storing changes */ @@ -1497,7 +1498,6 @@ int osp_sync_init(const struct lu_env *env, struct osp_device *d) err_llog: osp_sync_llog_fini(env, d); err_id: - osp_sync_id_traction_fini(d); return rc; } @@ -1522,222 +1522,93 @@ int osp_sync_fini(struct osp_device *d) wait_event(thread->t_ctl_waitq, thread_is_stopped(thread)); } - /* - * unregister transaction callbacks only when sync thread - * has finished operations with llog - */ - osp_sync_id_traction_fini(d); - RETURN(0); } -static DEFINE_MUTEX(osp_id_tracker_sem); -static struct list_head osp_id_tracker_list = - LIST_HEAD_INIT(osp_id_tracker_list); +struct osp_last_committed_cb { + struct dt_txn_commit_cb ospc_cb; + struct osp_device *ospc_dev; + __u64 ospc_transno; +}; -/** - * OSD commit callback. 
- * - * The function is used as a local OSD commit callback to track the highest - * committed llog record id. see osp_sync_id_traction_init() for the details. - * - * \param[in] th local transaction handle committed - * \param[in] cookie commit callback data (our private structure) - */ -static void osp_sync_tracker_commit_cb(struct thandle *th, void *cookie) +void osp_sync_local_commit_cb(struct lu_env *env, struct thandle *th, + struct dt_txn_commit_cb *dcb, int err) { - struct osp_id_tracker *tr = cookie; - struct osp_device *d; - struct osp_txn_info *txn; + struct osp_last_committed_cb *cb; + struct osp_device *d; - LASSERT(tr); + cb = container_of0(dcb, struct osp_last_committed_cb, ospc_cb); + d = cb->ospc_dev; - txn = osp_txn_info(&th->th_ctx); - if (txn == NULL || txn->oti_current_id < tr->otr_committed_id) - return; + CDEBUG(D_HA, "%s: %llu committed\n", d->opd_obd->obd_name, + cb->ospc_transno); - spin_lock(&tr->otr_lock); - if (likely(txn->oti_current_id > tr->otr_committed_id)) { - CDEBUG(D_OTHER, "committed: %llu -> %llu\n", - tr->otr_committed_id, txn->oti_current_id); - tr->otr_committed_id = txn->oti_current_id; + spin_lock(&d->opd_sync_lock); + if (cb->ospc_transno > d->opd_sync_last_committed_id) + d->opd_sync_last_committed_id = cb->ospc_transno; + spin_unlock(&d->opd_sync_lock); - list_for_each_entry(d, &tr->otr_wakeup_list, - opd_sync_ontrack) { - d->opd_sync_last_committed_id = tr->otr_committed_id; - wake_up(&d->opd_sync_waitq); - } - } - spin_unlock(&tr->otr_lock); + osp_sync_check_for_work(d); + lu_device_put(osp2lu_dev(d)); + if (atomic_dec_and_test(&d->opd_commits_registered)) + wake_up(&d->opd_sync_waitq); + + OBD_FREE_PTR(cb); } -/** - * Initialize commit tracking mechanism. - * - * Some setups may have thousands of OSTs and each will be represented by OSP. - * Meaning order of magnitute many more changes to apply every second. In order - * to keep the number of commit callbacks low this mechanism was introduced. 
- * The mechanism is very similar to transno used by MDT service: it's an single - * ID stream which can be assigned by any OSP to its llog records. The tricky - * part is that ID is stored in per-transaction data and re-used by all the OSPs - * involved in that transaction. Then all these OSPs are woken up utilizing a single OSD commit callback. - * - * The function initializes the data used by the tracker described above. - * A singler tracker per OSD device is created. - * - * \param[in] d OSP device - * - * \retval 0 on success - * \retval negative negated errno on error - */ -static int osp_sync_id_traction_init(struct osp_device *d) +int osp_sync_add_commit_cb(const struct lu_env *env, struct osp_device *d, + struct thandle *th) { - struct osp_id_tracker *tr, *found = NULL; - int rc = 0; + struct osp_last_committed_cb *cb; + struct dt_txn_commit_cb *dcb; + int rc = 0; + + OBD_ALLOC_PTR(cb); + if (cb == NULL) + return -ENOMEM; + cb->ospc_dev = d; + dcb = &cb->ospc_cb; + dcb->dcb_func = osp_sync_local_commit_cb; + spin_lock(&d->opd_sync_lock); + cb->ospc_transno = ++d->opd_sync_last_used_id; + spin_unlock(&d->opd_sync_lock); - LASSERT(d); - LASSERT(d->opd_storage); - LASSERT(d->opd_sync_tracker == NULL); - INIT_LIST_HEAD(&d->opd_sync_ontrack); - - mutex_lock(&osp_id_tracker_sem); - list_for_each_entry(tr, &osp_id_tracker_list, otr_list) { - if (tr->otr_dev == d->opd_storage) { - LASSERT(atomic_read(&tr->otr_refcount)); - atomic_inc(&tr->otr_refcount); - d->opd_sync_tracker = tr; - found = tr; - break; - } - } + rc = dt_trans_cb_add(th, dcb); + CDEBUG(D_HA, "%s: add commit cb at %llu, next at %llu, rc = %d\n", + d->opd_obd->obd_name, (unsigned long long) cfs_time_current(), + (unsigned long long) d->opd_sync_next_commit_cb, rc); - if (found == NULL) { - rc = -ENOMEM; - OBD_ALLOC_PTR(tr); - if (tr) { - d->opd_sync_tracker = tr; - spin_lock_init(&tr->otr_lock); - tr->otr_dev = d->opd_storage; - tr->otr_next_id = 1; - tr->otr_committed_id = 0; - 
atomic_set(&tr->otr_refcount, 1); - INIT_LIST_HEAD(&tr->otr_wakeup_list); - list_add(&tr->otr_list, &osp_id_tracker_list); - tr->otr_tx_cb.dtc_txn_commit = - osp_sync_tracker_commit_cb; - tr->otr_tx_cb.dtc_cookie = tr; - tr->otr_tx_cb.dtc_tag = LCT_MD_THREAD; - dt_txn_callback_add(d->opd_storage, &tr->otr_tx_cb); - rc = 0; - } - } - mutex_unlock(&osp_id_tracker_sem); + if (likely(rc == 0)) { + lu_device_get(osp2lu_dev(d)); + atomic_inc(&d->opd_commits_registered); + } else + OBD_FREE_PTR(cb); return rc; } -/** - * Release commit tracker. - * - * Decrease a refcounter on the tracker used by the given OSP device \a d. - * If no more users left, then the tracker is released. - * - * \param[in] d OSP device +/* + * generate an empty transaction and hook the commit callback in + * then force transaction commit */ -static void osp_sync_id_traction_fini(struct osp_device *d) +void osp_sync_force(const struct lu_env *env, struct osp_device *d) { - struct osp_id_tracker *tr; - - ENTRY; + struct thandle *th; + int rc; - LASSERT(d); - tr = d->opd_sync_tracker; - if (tr == NULL) { - EXIT; + th = dt_trans_create(env, d->opd_storage); + if (IS_ERR(th)) { + CERROR("%s: can't sync\n", d->opd_obd->obd_name); return; } - - osp_sync_remove_from_tracker(d); - - mutex_lock(&osp_id_tracker_sem); - if (atomic_dec_and_test(&tr->otr_refcount)) { - dt_txn_callback_del(d->opd_storage, &tr->otr_tx_cb); - LASSERT(list_empty(&tr->otr_wakeup_list)); - list_del(&tr->otr_list); - OBD_FREE_PTR(tr); - d->opd_sync_tracker = NULL; - } - mutex_unlock(&osp_id_tracker_sem); - - EXIT; -} - -/** - * Generate a new ID on a tracker. - * - * Generates a new ID using the tracker associated with the given OSP device - * \a d, if the given ID \a id is non-zero. Unconditially adds OSP device to - * the wakeup list, so OSP won't miss when a transaction using the ID is - * committed. 
- * - * \param[in] d OSP device - * \param[in] id 0 or ID generated previously - * - * \retval ID the caller should use - */ -static __u64 osp_sync_id_get(struct osp_device *d, __u64 id) -{ - struct osp_id_tracker *tr; - - tr = d->opd_sync_tracker; - LASSERT(tr); - - /* XXX: we can improve this introducing per-cpu preallocated ids? */ - spin_lock(&tr->otr_lock); - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_TRACK_OVERFLOW)) - tr->otr_next_id = 0xfffffff0; - - if (unlikely(tr->otr_next_id <= d->opd_sync_last_used_id)) { - spin_unlock(&tr->otr_lock); - CERROR("%s: next %llu, last synced %llu\n", - d->opd_obd->obd_name, tr->otr_next_id, - d->opd_sync_last_used_id); - LBUG(); + rc = dt_trans_start_local(env, d->opd_storage, th); + if (rc == 0) { + CDEBUG(D_OTHER, "%s: sync forced, %d changes\n", + d->opd_obd->obd_name, + atomic_read(&d->opd_sync_changes)); + rc = osp_sync_add_commit_cb(env, d, th); + dt_trans_stop(env, d->opd_storage, th); } - if (id == 0) - id = tr->otr_next_id++; - if (id > d->opd_sync_last_used_id) - d->opd_sync_last_used_id = id; - if (list_empty(&d->opd_sync_ontrack)) - list_add(&d->opd_sync_ontrack, &tr->otr_wakeup_list); - spin_unlock(&tr->otr_lock); - CDEBUG(D_OTHER, "new id %llu\n", id); - - return id; + dt_commit_async(env, d->opd_storage); } - -/** - * Stop to propagate commit status to OSP. - * - * If the OSP does not have any llog records she's waiting to commit, then - * it is possible to unsubscribe from wakeups from the tracking using this - * method. 
- * - * \param[in] d OSP device not willing to wakeup - */ -static void osp_sync_remove_from_tracker(struct osp_device *d) -{ - struct osp_id_tracker *tr; - - tr = d->opd_sync_tracker; - LASSERT(tr); - - if (list_empty(&d->opd_sync_ontrack)) - return; - - spin_lock(&tr->otr_lock); - list_del_init(&d->opd_sync_ontrack); - spin_unlock(&tr->otr_lock); -} - diff --git a/lustre/osp/osp_trans.c b/lustre/osp/osp_trans.c index 498ed18..c28d13b 100644 --- a/lustre/osp/osp_trans.c +++ b/lustre/osp/osp_trans.c @@ -973,7 +973,6 @@ struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d) oth->ot_magic = OSP_THANDLE_MAGIC; th = &oth->ot_super; th->th_dev = d; - th->th_tags = LCT_TX_HANDLE; atomic_set(&oth->ot_refcount, 1); INIT_LIST_HEAD(&oth->ot_commit_dcb_list); diff --git a/lustre/target/update_records.c b/lustre/target/update_records.c index a64dc38..a36d554 100644 --- a/lustre/target/update_records.c +++ b/lustre/target/update_records.c @@ -1217,7 +1217,7 @@ static void update_key_fini(const struct lu_context *ctx, LU_KEY_INIT(update, struct update_thread_info); /* context key: update_thread_key */ LU_CONTEXT_KEY_DEFINE(update, LCT_MD_THREAD | LCT_MG_THREAD | - LCT_DT_THREAD | LCT_TX_HANDLE | LCT_LOCAL); + LCT_DT_THREAD | LCT_LOCAL); EXPORT_SYMBOL(update_thread_key); LU_KEY_INIT_GENERIC(update); diff --git a/lustre/target/update_trans.c b/lustre/target/update_trans.c index 76153b1..6c3e414 100644 --- a/lustre/target/update_trans.c +++ b/lustre/target/update_trans.c @@ -615,8 +615,6 @@ top_trans_create(const struct lu_env *env, struct dt_device *master_dev) child_th->th_top = &top_th->tt_super; child_th->th_wait_submit = 1; top_th->tt_master_sub_thandle = child_th; - - top_th->tt_super.th_tags |= child_th->th_tags; } return &top_th->tt_super; } @@ -795,7 +793,6 @@ int top_trans_start(const struct lu_env *env, struct dt_device *master_dev, top_th->tt_master_sub_thandle->th_sync = th->th_sync; if (th->th_local) top_th->tt_master_sub_thandle->th_local = 
th->th_local; - top_th->tt_master_sub_thandle->th_tags = th->th_tags; rc = dt_trans_start(env, top_th->tt_master_sub_thandle->th_dev, top_th->tt_master_sub_thandle); RETURN(rc); @@ -813,7 +810,6 @@ int top_trans_start(const struct lu_env *env, struct dt_device *master_dev, st->st_sub_th->th_sync = th->th_sync; if (th->th_local) st->st_sub_th->th_local = th->th_local; - st->st_sub_th->th_tags = th->th_tags; rc = dt_trans_start(env, st->st_sub_th->th_dev, st->st_sub_th); if (rc != 0) @@ -951,7 +947,6 @@ int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev, top_th->tt_master_sub_thandle->th_sync = th->th_sync; if (th->th_local) top_th->tt_master_sub_thandle->th_local = th->th_local; - top_th->tt_master_sub_thandle->th_tags = th->th_tags; rc = dt_trans_stop(env, master_dev, top_th->tt_master_sub_thandle); OBD_FREE_PTR(top_th); @@ -1011,7 +1006,6 @@ stop_master_trans: master_st->st_sub_th->th_local = th->th_local; if (th->th_sync) master_st->st_sub_th->th_sync = th->th_sync; - master_st->st_sub_th->th_tags = th->th_tags; master_st->st_sub_th->th_result = th->th_result; rc = dt_trans_stop(env, master_st->st_dt, master_st->st_sub_th); /* If it does not write_updates, then we call submit callback @@ -1075,7 +1069,6 @@ stop_other_trans: st->st_sub_th->th_sync = th->th_sync; if (th->th_local) st->st_sub_th->th_local = th->th_local; - st->st_sub_th->th_tags = th->th_tags; st->st_sub_th->th_result = th->th_result; rc = dt_trans_stop(env, st->st_sub_th->th_dev, st->st_sub_th); diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index a1063007..6acddd5 100755 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -13947,9 +13947,10 @@ test_239() { mkdir -p $DIR/$tdir createmany -o $DIR/$tdir/f- 5000 unlinkmany $DIR/$tdir/f- 5000 - do_nodes $list "lctl set_param -n osp*.*.sync_changes 1" - changes=$(do_nodes $list "lctl get_param -n osc.*MDT*.sync_changes \ - osc.*MDT*.sync_in_flight" | calc_sum) + [ $(lustre_version_code $SINGLEMDS) -gt 
$(version_code 2.10.53) ] && + do_nodes $list "lctl set_param -n osp.*.force_sync=1" + changes=$(do_nodes $list "lctl get_param -n osp.*MDT*.sync_changes \ + osp.*MDT*.sync_in_flight" | calc_sum) [ "$changes" -eq 0 ] || error "$changes not synced" } run_test 239 "osp_sync test" diff --git a/lustre/tests/test-framework.sh b/lustre/tests/test-framework.sh index ed94924..8774f8e 100755 --- a/lustre/tests/test-framework.sh +++ b/lustre/tests/test-framework.sh @@ -2501,7 +2501,7 @@ wait_update_facet() { sync_all_data() { do_nodes $(comma_list $(mdts_nodes)) \ - "lctl set_param -n osd*.*MDT*.force_sync=1" + "lctl set_param -n os[cd]*.*MDT*.force_sync=1" do_nodes $(comma_list $(osts_nodes)) \ "lctl set_param -n osd*.*OS*.force_sync=1" 2>&1 | grep -v 'Found no match' @@ -2542,7 +2542,7 @@ wait_delete_completed_mds() { mds2sync=$(comma_list $mds2sync) # sync MDS transactions - do_nodes $mds2sync "$LCTL set_param -n osd*.*MD*.force_sync 1" + do_nodes $mds2sync "$LCTL set_param -n os[cd]*.*MD*.force_sync 1" # wait till all changes are sent and commmitted by OSTs # for ldiskfs space is released upon execution, but DMU