From 5908965847d5535fc5def6621922e5ed00051e46 Mon Sep 17 00:00:00 2001 From: Alex Zhuravlev Date: Fri, 17 Jun 2016 23:25:04 +0800 Subject: [PATCH] LU-7861 osp: replace the hot spinlock with atomic trackers so that background scanning doesn't race with MDS_REINT_UNLINK Change-Id: I4006be4298ffa841ca4d131ac37c201ff8ea729f Signed-off-by: Alex Zhuravlev Reviewed-on: http://review.whamcloud.com/19211 Tested-by: Jenkins Tested-by: Maloo Reviewed-by: James Simmons Reviewed-by: Mike Pershin Reviewed-by: Oleg Drokin --- lustre/osp/lproc_osp.c | 11 ++--- lustre/osp/osp_dev.c | 13 +++--- lustre/osp/osp_internal.h | 8 ++-- lustre/osp/osp_precreate.c | 21 +++++---- lustre/osp/osp_sync.c | 110 ++++++++++++++++++++++----------------------- 5 files changed, 84 insertions(+), 79 deletions(-) diff --git a/lustre/osp/lproc_osp.c b/lustre/osp/lproc_osp.c index 6dc9287..58fcb2d 100644 --- a/lustre/osp/lproc_osp.c +++ b/lustre/osp/lproc_osp.c @@ -113,7 +113,7 @@ static int osp_syn_in_flight_seq_show(struct seq_file *m, void *data) if (osp == NULL) return -EINVAL; - seq_printf(m, "%u\n", osp->opd_syn_rpc_in_flight); + seq_printf(m, "%u\n", atomic_read(&osp->opd_syn_rpc_in_flight)); return 0; } LPROC_SEQ_FOPS_RO(osp_syn_in_flight); @@ -134,7 +134,7 @@ static int osp_syn_in_prog_seq_show(struct seq_file *m, void *data) if (osp == NULL) return -EINVAL; - seq_printf(m, "%u\n", osp->opd_syn_rpc_in_progress); + seq_printf(m, "%u\n", atomic_read(&osp->opd_syn_rpc_in_progress)); return 0; } LPROC_SEQ_FOPS_RO(osp_syn_in_prog); @@ -155,7 +155,7 @@ static int osp_syn_changes_seq_show(struct seq_file *m, void *data) if (osp == NULL) return -EINVAL; - seq_printf(m, "%lu\n", osp->opd_syn_changes); + seq_printf(m, "%u\n", atomic_read(&osp->opd_syn_changes)); return 0; } @@ -639,8 +639,9 @@ static int osp_destroys_in_flight_seq_show(struct seq_file *m, void *data) if (osp == NULL) return -EINVAL; - seq_printf(m, "%lu\n", - osp->opd_syn_rpc_in_progress + osp->opd_syn_changes); + seq_printf(m, "%u\n", + atomic_read(&osp->opd_syn_rpc_in_progress) + + atomic_read(&osp->opd_syn_changes)); return 0; } LPROC_SEQ_FOPS_RO(osp_destroys_in_flight); diff --git a/lustre/osp/osp_dev.c b/lustre/osp/osp_dev.c index 329da7b..9dab59c 100644 --- a/lustre/osp/osp_dev.c +++ b/lustre/osp/osp_dev.c @@ -844,26 +844,27 @@ static int osp_sync(const struct lu_env *env, struct dt_device *dev) atomic_inc(&d->opd_syn_barrier); CDEBUG(D_CACHE, "%s: %u in flight\n", d->opd_obd->obd_name, - d->opd_syn_rpc_in_flight); + atomic_read(&d->opd_syn_rpc_in_flight)); /* wait till all-in-flight are replied, so executed by the target */ /* XXX: this is used by LFSCK at the moment, which doesn't require * all the changes to be committed, but in general it'd be * better to wait till commit */ - while (d->opd_syn_rpc_in_flight > 0) { + while (atomic_read(&d->opd_syn_rpc_in_flight) > 0) { - old = d->opd_syn_rpc_in_flight; + old = atomic_read(&d->opd_syn_rpc_in_flight); expire = cfs_time_shift(obd_timeout); lwi = LWI_TIMEOUT(expire - cfs_time_current(), osp_sync_timeout, d); l_wait_event(d->opd_syn_barrier_waitq, - d->opd_syn_rpc_in_flight == 0, &lwi); + atomic_read(&d->opd_syn_rpc_in_flight) == 0, + &lwi); - if (d->opd_syn_rpc_in_flight == 0) + if (atomic_read(&d->opd_syn_rpc_in_flight) == 0) break; - if (d->opd_syn_rpc_in_flight != old) { + if (atomic_read(&d->opd_syn_rpc_in_flight) != old) { /* some progress have been made */ continue; } diff --git a/lustre/osp/osp_internal.h b/lustre/osp/osp_internal.h index 3714dc2..64c727d 100644 --- a/lustre/osp/osp_internal.h +++ b/lustre/osp/osp_internal.h @@ -194,7 +194,7 @@ struct osp_device { /* unique generation, to recognize start of new records in the llog */ struct llog_gen opd_syn_generation; /* number of changes to sync, used to wake up sync thread */ - unsigned long opd_syn_changes; + atomic_t opd_syn_changes; /* processing of changes from previous mount is done? */ int opd_syn_prev_done; /* found records */ @@ -207,10 +207,10 @@ struct osp_device { /* number of changes being under sync */ int opd_syn_sync_in_progress; /* number of RPCs in flight - flow control */ - int opd_syn_rpc_in_flight; + atomic_t opd_syn_rpc_in_flight; int opd_syn_max_rpc_in_flight; /* number of RPC in processing (including non-committed by OST) */ - int opd_syn_rpc_in_progress; + atomic_t opd_syn_rpc_in_progress; int opd_syn_max_rpc_in_progress; /* osd api's commit cb control structure */ struct dt_txn_callback opd_syn_txn_cb; @@ -220,7 +220,7 @@ struct osp_device { * last_committed */ __u64 opd_syn_last_committed_id; /* last processed (taken from llog) id */ - __u64 opd_syn_last_processed_id; + volatile __u64 opd_syn_last_processed_id; struct osp_id_tracker *opd_syn_tracker; struct list_head opd_syn_ontrack; /* stop processing new requests until barrier=0 */ diff --git a/lustre/osp/osp_precreate.c b/lustre/osp/osp_precreate.c index 30f3164..e341faf 100644 --- a/lustre/osp/osp_precreate.c +++ b/lustre/osp/osp_precreate.c @@ -973,8 +973,9 @@ void osp_pre_update_status(struct osp_device *d, int rc) msfs->os_bfree, used, msfs->os_bavail, d->opd_pre_status, rc); CDEBUG(D_INFO, - "non-committed changes: %lu, in progress: %u\n", - d->opd_syn_changes, d->opd_syn_rpc_in_progress); + "non-committed changes: %u, in progress: %u\n", + atomic_read(&d->opd_syn_changes), + atomic_read(&d->opd_syn_rpc_in_progress)); } else if (old == -ENOSPC) { d->opd_pre_status = 0; spin_lock(&d->opd_pre_lock); @@ -1243,7 +1244,8 @@ static int osp_precreate_ready_condition(const struct lu_env *env, return 1; /* ready if OST reported no space and no destroys in progress */ - if (d->opd_syn_changes + d->opd_syn_rpc_in_progress == 0 && + if (atomic_read(&d->opd_syn_changes) + + atomic_read(&d->opd_syn_rpc_in_progress) == 0 && d->opd_pre_status == -ENOSPC) return 1; @@ -1268,11 +1270,12 @@ static int osp_precreate_timeout_condition(void *data) struct osp_device *d = data; CDEBUG(D_HA, "%s: slow creates, last="DFID", next="DFID", " - "reserved="LPU64", syn_changes=%lu, " + "reserved="LPU64", syn_changes=%u, " "syn_rpc_in_progress=%d, status=%d\n", d->opd_obd->obd_name, PFID(&d->opd_pre_last_created_fid), PFID(&d->opd_pre_used_fid), d->opd_pre_reserved, - d->opd_syn_changes, d->opd_syn_rpc_in_progress, + atomic_read(&d->opd_syn_changes), + atomic_read(&d->opd_syn_rpc_in_progress), d->opd_pre_status); return 1; @@ -1361,16 +1364,16 @@ int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d) * wait till that is done - some space might be released */ if (unlikely(rc == -ENOSPC)) { - if (d->opd_syn_changes) { + if (atomic_read(&d->opd_syn_changes)) { /* force local commit to release space */ dt_commit_async(env, d->opd_storage); } - if (d->opd_syn_rpc_in_progress) { + if (atomic_read(&d->opd_syn_rpc_in_progress)) { /* just wait till destroys are done */ /* see l_wait_even() few lines below */ } - if (d->opd_syn_changes + - d->opd_syn_rpc_in_progress == 0) { + if (atomic_read(&d->opd_syn_changes) + + atomic_read(&d->opd_syn_rpc_in_progress) == 0) { /* no hope for free space */ break; } diff --git a/lustre/osp/osp_sync.c b/lustre/osp/osp_sync.c index 582296f..2fcc9e5 100644 --- a/lustre/osp/osp_sync.c +++ b/lustre/osp/osp_sync.c @@ -188,7 +188,8 @@ static inline int osp_sync_inflight_conflict(struct osp_device *d, static inline int osp_sync_low_in_progress(struct osp_device *d) { - return d->opd_syn_rpc_in_progress < d->opd_syn_max_rpc_in_progress; + return atomic_read(&d->opd_syn_rpc_in_progress) < + d->opd_syn_max_rpc_in_progress; } /** @@ -201,7 +202,8 @@ static inline int osp_sync_low_in_progress(struct osp_device *d) */ static inline int osp_sync_low_in_flight(struct osp_device *d) { - return d->opd_syn_rpc_in_flight < d->opd_syn_max_rpc_in_flight; + return atomic_read(&d->opd_syn_rpc_in_flight) < + d->opd_syn_max_rpc_in_flight; } /** @@ -288,7 +290,7 @@ static inline int osp_sync_can_process_new(struct osp_device *d, return 0; if (d->opd_syn_prev_done == 0) return 1; - if (d->opd_syn_changes == 0) + if (atomic_read(&d->opd_syn_changes) == 0) return 0; if (rec == NULL || osp_sync_correct_id(d, rec) <= d->opd_syn_last_committed_id) @@ -437,9 +439,7 @@ static int osp_sync_add_rec(const struct lu_env *env, struct osp_device *d, POSTID(&osi->osi_cookie.lgc_lgl.lgl_oi), (unsigned long)osi->osi_cookie.lgc_lgl.lgl_ogen, (unsigned long)osi->osi_cookie.lgc_index, rc); - spin_lock(&d->opd_syn_lock); - d->opd_syn_changes++; - spin_unlock(&d->opd_syn_lock); + atomic_inc(&d->opd_syn_changes); } /* return 0 always here, error case just cause no llog record */ RETURN(0); @@ -577,10 +577,8 @@ static int osp_sync_interpret(const struct lu_env *env, /* this is the last time we see the request * if transno is not zero, then commit cb * will be called at some point */ - LASSERT(d->opd_syn_rpc_in_progress > 0); - spin_lock(&d->opd_syn_lock); - d->opd_syn_rpc_in_progress--; - spin_unlock(&d->opd_syn_lock); + LASSERT(atomic_read(&d->opd_syn_rpc_in_progress) > 0); + atomic_dec(&d->opd_syn_rpc_in_progress); } wake_up(&d->opd_syn_waitq); @@ -594,16 +592,16 @@ static int osp_sync_interpret(const struct lu_env *env, osp_statfs_need_now(d); } - LASSERT(d->opd_syn_rpc_in_flight > 0); spin_lock(&d->opd_syn_lock); - d->opd_syn_rpc_in_flight--; list_del_init(&jra->jra_inflight_link); spin_unlock(&d->opd_syn_lock); + LASSERT(atomic_read(&d->opd_syn_rpc_in_flight) > 0); + atomic_dec(&d->opd_syn_rpc_in_flight); if (unlikely(atomic_read(&d->opd_syn_barrier) > 0)) wake_up(&d->opd_syn_barrier_waitq); CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n", - d->opd_obd->obd_name, d->opd_syn_rpc_in_flight, - d->opd_syn_rpc_in_progress); + d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_flight), + atomic_read(&d->opd_syn_rpc_in_progress)); osp_sync_check_for_work(d); @@ -623,7 +621,8 @@ static void osp_sync_send_new_rpc(struct osp_device *d, { struct osp_job_req_args *jra; - LASSERT(d->opd_syn_rpc_in_flight <= d->opd_syn_max_rpc_in_flight); + LASSERT(atomic_read(&d->opd_syn_rpc_in_flight) <= + d->opd_syn_max_rpc_in_flight); jra = ptlrpc_req_async_args(req); jra->jra_magic = OSP_JOB_MAGIC; @@ -904,10 +903,8 @@ static void osp_sync_process_record(const struct lu_env *env, /* notice we increment counters before sending RPC, to be consistent * in RPC interpret callback which may happen very quickly */ - spin_lock(&d->opd_syn_lock); - d->opd_syn_rpc_in_flight++; - d->opd_syn_rpc_in_progress++; - spin_unlock(&d->opd_syn_lock); + atomic_inc(&d->opd_syn_rpc_in_flight); + atomic_inc(&d->opd_syn_rpc_in_progress); switch (rec->lrh_type) { /* case MDS_UNLINK_REC is kept for compatibility */ @@ -928,33 +925,35 @@ static void osp_sync_process_record(const struct lu_env *env, break; } - spin_lock(&d->opd_syn_lock); - /* For all kinds of records, not matter successful or not, * we should decrease changes and bump last_processed_id. */ if (d->opd_syn_prev_done) { __u64 correct_id = osp_sync_correct_id(d, rec); - LASSERT(d->opd_syn_changes > 0); + LASSERT(atomic_read(&d->opd_syn_changes) > 0); LASSERT(correct_id <= d->opd_syn_last_committed_id); /* NOTE: it's possible to meet same id if * OST stores few stripes of same file */ - if (correct_id > d->opd_syn_last_processed_id) { - d->opd_syn_last_processed_id = correct_id; - wake_up(&d->opd_syn_barrier_waitq); + while (1) { + /* another thread may be trying to set new value */ + rmb(); + if (correct_id > d->opd_syn_last_processed_id) { + d->opd_syn_last_processed_id = correct_id; + wake_up(&d->opd_syn_barrier_waitq); + } else + break; } - d->opd_syn_changes--; + atomic_dec(&d->opd_syn_changes); } if (rc != 0) { - d->opd_syn_rpc_in_flight--; - d->opd_syn_rpc_in_progress--; + atomic_dec(&d->opd_syn_rpc_in_flight); + atomic_dec(&d->opd_syn_rpc_in_progress); } - CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n", - d->opd_obd->obd_name, d->opd_syn_rpc_in_flight, - d->opd_syn_rpc_in_progress); - spin_unlock(&d->opd_syn_lock); + CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n", + d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_flight), + atomic_read(&d->opd_syn_rpc_in_progress)); /* Delete the invalid record */ if (rc == 1) { @@ -1059,13 +1058,11 @@ static void osp_sync_process_committed(const struct lu_env *env, llog_ctxt_put(ctxt); - LASSERT(d->opd_syn_rpc_in_progress >= done); - spin_lock(&d->opd_syn_lock); - d->opd_syn_rpc_in_progress -= done; - spin_unlock(&d->opd_syn_lock); + LASSERT(atomic_read(&d->opd_syn_rpc_in_progress) >= done); + atomic_sub(done, &d->opd_syn_rpc_in_progress); CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n", - d->opd_obd->obd_name, d->opd_syn_rpc_in_flight, - d->opd_syn_rpc_in_progress); + d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_flight), + atomic_read(&d->opd_syn_rpc_in_progress)); osp_sync_check_for_work(d); @@ -1116,11 +1113,11 @@ static int osp_sync_process_queues(const struct lu_env *env, if (osp_sync_can_process_new(d, rec)) { if (llh == NULL) { /* ask llog for another record */ - CDEBUG(D_HA, "%lu changes, %u in progress," + CDEBUG(D_HA, "%u changes, %u in progress," " %u in flight\n", - d->opd_syn_changes, - d->opd_syn_rpc_in_progress, - d->opd_syn_rpc_in_flight); + atomic_read(&d->opd_syn_changes), + atomic_read(&d->opd_syn_rpc_in_progress), + atomic_read(&d->opd_syn_rpc_in_flight)); return 0; } osp_sync_process_record(env, d, llh, rec); @@ -1199,30 +1196,33 @@ static int osp_sync_thread(void *_arg) rc = llog_cat_process(&env, llh, osp_sync_process_queues, d, 0, 0); LASSERTF(rc == 0 || rc == LLOG_PROC_BREAK, - "%lu changes, %u in progress, %u in flight: %d\n", - d->opd_syn_changes, d->opd_syn_rpc_in_progress, - d->opd_syn_rpc_in_flight, rc); + "%u changes, %u in progress, %u in flight: %d\n", + atomic_read(&d->opd_syn_changes), + atomic_read(&d->opd_syn_rpc_in_progress), + atomic_read(&d->opd_syn_rpc_in_flight), rc); /* we don't expect llog_process_thread() to exit till umount */ LASSERTF(thread->t_flags != SVC_RUNNING, - "%lu changes, %u in progress, %u in flight\n", - d->opd_syn_changes, d->opd_syn_rpc_in_progress, - d->opd_syn_rpc_in_flight); + "%u changes, %u in progress, %u in flight\n", + atomic_read(&d->opd_syn_changes), + atomic_read(&d->opd_syn_rpc_in_progress), + atomic_read(&d->opd_syn_rpc_in_flight)); /* wait till all the requests are completed */ count = 0; - while (d->opd_syn_rpc_in_progress > 0) { + while (atomic_read(&d->opd_syn_rpc_in_progress) > 0) { osp_sync_process_committed(&env, d); lwi = LWI_TIMEOUT(cfs_time_seconds(5), NULL, NULL); rc = l_wait_event(d->opd_syn_waitq, - d->opd_syn_rpc_in_progress == 0, + atomic_read(&d->opd_syn_rpc_in_progress) == 0, &lwi); if (rc == -ETIMEDOUT) count++; LASSERTF(count < 10, "%s: %d %d %sempty\n", - d->opd_obd->obd_name, d->opd_syn_rpc_in_progress, - d->opd_syn_rpc_in_flight, + d->opd_obd->obd_name, + atomic_read(&d->opd_syn_rpc_in_progress), + atomic_read(&d->opd_syn_rpc_in_flight), list_empty(&d->opd_syn_committed_there) ? "" : "!"); } @@ -1232,10 +1232,10 @@ static int osp_sync_thread(void *_arg) if (rc) CERROR("can't cleanup llog: %d\n", rc); out: - LASSERTF(d->opd_syn_rpc_in_progress == 0, + LASSERTF(atomic_read(&d->opd_syn_rpc_in_progress) == 0, "%s: %d %d %sempty\n", - d->opd_obd->obd_name, d->opd_syn_rpc_in_progress, - d->opd_syn_rpc_in_flight, + d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_progress), + atomic_read(&d->opd_syn_rpc_in_flight), list_empty(&d->opd_syn_committed_there) ? "" : "!"); thread->t_flags = SVC_STOPPED; -- 1.8.3.1