struct osp_device *d = dt2osp_dev(dev);
cfs_time_t expire;
struct l_wait_info lwi = { 0 };
- unsigned long id, old;
- int rc = 0;
- unsigned long start = cfs_time_current();
+ int recs, rc = 0;
+ unsigned long start = cfs_time_current();
+ __u64 old;
+
ENTRY;
/* No Sync between MDTs yet. */
if (d->opd_connect_mdt)
RETURN(0);
+ recs = atomic_read(&d->opd_sync_changes);
+ old = atomic64_read(&d->opd_sync_processed_recs);
+
+ osp_sync_force(env, dt2osp_dev(dev));
+
if (unlikely(d->opd_imp_active == 0))
RETURN(-ENOTCONN);
- id = d->opd_sync_last_used_id;
down_write(&d->opd_async_updates_rwsem);
CDEBUG(D_OTHER, "%s: async updates %d\n", d->opd_obd->obd_name,
if (rc != 0)
GOTO(out, rc);
- CDEBUG(D_CACHE, "%s: id: used %lu, processed %llu\n",
- d->opd_obd->obd_name, id, d->opd_sync_last_processed_id);
-
- /* wait till all-in-line are processed */
- while (d->opd_sync_last_processed_id < id) {
-
- old = d->opd_sync_last_processed_id;
+ CDEBUG(D_CACHE, "%s: processed %lu\n", d->opd_obd->obd_name,
+ atomic64_read(&d->opd_sync_processed_recs));
+ while (atomic64_read(&d->opd_sync_processed_recs) < old + recs) {
+ __u64 last = atomic64_read(&d->opd_sync_processed_recs);
/* make sure the connection is fine */
expire = cfs_time_shift(obd_timeout);
lwi = LWI_TIMEOUT(expire - cfs_time_current(),
osp_sync_timeout, d);
l_wait_event(d->opd_sync_barrier_waitq,
- d->opd_sync_last_processed_id >= id,
- &lwi);
+ atomic64_read(&d->opd_sync_processed_recs)
+ >= old + recs, &lwi);
- if (d->opd_sync_last_processed_id >= id)
+ if (atomic64_read(&d->opd_sync_processed_recs) >= old + recs)
break;
- if (d->opd_sync_last_processed_id != old) {
+ if (atomic64_read(&d->opd_sync_processed_recs) != last) {
/* some progress have been made,
* keep trying... */
continue;
osp->opd_async_requests = NULL;
}
- if (osp->opd_storage_exp)
+ if (osp->opd_storage_exp) {
+ /* wait for the commit callbacks to complete */
+ wait_event(osp->opd_sync_waitq,
+ atomic_read(&osp->opd_commits_registered) == 0);
obd_disconnect(osp->opd_storage_exp);
+ }
if (osp->opd_symlink)
lprocfs_remove(&osp->opd_symlink);
LU_KEY_INIT_FINI(osp_txn, struct osp_txn_info);
struct lu_context_key osp_txn_key = {
- .lct_tags = LCT_OSP_THREAD | LCT_TX_HANDLE,
+ .lct_tags = LCT_OSP_THREAD,
.lct_init = osp_txn_key_init,
.lct_fini = osp_txn_key_fini
};
#include <lustre_update.h>
#include "osp_internal.h"
-static int osp_sync_id_traction_init(struct osp_device *d);
-static void osp_sync_id_traction_fini(struct osp_device *d);
-static __u64 osp_sync_id_get(struct osp_device *d, __u64 id);
-static void osp_sync_remove_from_tracker(struct osp_device *d);
-
/*
* this is a components of OSP implementing synchronization between MDS and OST
* it llogs all interesting changes (currently it's uig/gid change and object
*/
static inline int osp_sync_has_new_job(struct osp_device *d)
{
- return ((d->opd_sync_last_processed_id < d->opd_sync_last_used_id) &&
- (d->opd_sync_last_processed_id < d->opd_sync_last_committed_id))
- || (d->opd_sync_prev_done == 0);
+ return atomic_read(&d->opd_sync_changes) > 0 ||
+ d->opd_sync_prev_done == 0;
}
static inline int osp_sync_in_flight_conflict(struct osp_device *d,
return correct_id;
}
+
/**
* Check and return ready-for-new status.
*
return 1;
if (atomic_read(&d->opd_sync_changes) == 0)
return 0;
- if (rec == NULL ||
- osp_sync_correct_id(d, rec) <= d->opd_sync_last_committed_id)
+ if (rec == NULL)
+ return 1;
+ /* notice "<" not "<=" */
+ if (osp_sync_correct_id(d, rec) < d->opd_sync_last_committed_id)
return 1;
return 0;
}
LBUG();
}
- /* we want ->dt_trans_start() to allocate per-thandle structure */
- storage_th->th_tags |= LCT_OSP_THREAD;
-
ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
LASSERT(ctxt);
RETURN(rc);
}
+/*
+ * Rate-limited wrapper around osp_sync_add_commit_cb(): register a commit
+ * callback on \a th only when the deadline stored in
+ * d->opd_sync_next_commit_cb has been reached, i.e. roughly once per
+ * second as the deadline is re-armed with cfs_time_shift(1).
+ *
+ * The deadline is first tested without the lock and then re-tested under
+ * d->opd_sync_lock. Note that the deadline is re-armed every time the lock
+ * is taken, whether or not this caller ends up adding the callback.
+ *
+ * \param[in] env execution environment, passed to osp_sync_add_commit_cb()
+ * \param[in] d OSP device holding the deadline and opd_sync_lock
+ * \param[in] th transaction handle, passed to osp_sync_add_commit_cb()
+ *
+ * \retval 0 if the deadline has not been reached (nothing added)
+ * \retval otherwise the return value of osp_sync_add_commit_cb()
+ */
+int osp_sync_add_commit_cb_1s(const struct lu_env *env, struct osp_device *d,
+ struct thandle *th)
+{
+ int add = 0;
+
+ /* fast path: unlocked check, bail out while the deadline
+ * has not been reached yet */
+ if (cfs_time_before(cfs_time_current(), d->opd_sync_next_commit_cb))
+ return 0;
+
+ spin_lock(&d->opd_sync_lock);
+ if (cfs_time_aftereq(cfs_time_current(), d->opd_sync_next_commit_cb))
+ add = 1;
+ d->opd_sync_next_commit_cb = cfs_time_shift(1);
+ spin_unlock(&d->opd_sync_lock);
+
+ if (add == 0)
+ return 0;
+ return osp_sync_add_commit_cb(env, d, th);
+}
+
+
/**
* Generate a llog record for a given change.
*
{
struct osp_thread_info *osi = osp_env_info(env);
struct llog_ctxt *ctxt;
- struct osp_txn_info *txn;
struct thandle *storage_th;
int rc;
LBUG();
}
- txn = osp_txn_info(&storage_th->th_ctx);
- LASSERT(txn);
+ /* we keep the same id, but increment it when the callback
+ * is registered, so that all records up to the one taken
+ * by the callback are subject to processing */
+ spin_lock(&d->opd_sync_lock);
+ osi->osi_hdr.lrh_id = d->opd_sync_last_used_id;
+ spin_unlock(&d->opd_sync_lock);
- txn->oti_current_id = osp_sync_id_get(d, txn->oti_current_id);
- osi->osi_hdr.lrh_id = (txn->oti_current_id & 0xffffffffULL);
ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
if (ctxt == NULL)
RETURN(-ENOMEM);
osi->osi_cookie.lgc_index, rc);
atomic_inc(&d->opd_sync_changes);
}
+
+ rc = osp_sync_add_commit_cb_1s(env, d, th);
+
/* return 0 always here, error case just cause no llog record */
RETURN(0);
}
* we should decrease changes and bump last_processed_id.
*/
if (d->opd_sync_prev_done) {
- __u64 correct_id = osp_sync_correct_id(d, rec);
LASSERT(atomic_read(&d->opd_sync_changes) > 0);
- LASSERT(correct_id <= d->opd_sync_last_committed_id);
- /* NOTE: it's possible to meet same id if
- * OST stores few stripes of same file
- */
- while (1) {
- /* another thread may be trying to set new value */
- rmb();
- if (correct_id > d->opd_sync_last_processed_id) {
- d->opd_sync_last_processed_id = correct_id;
- wake_up(&d->opd_sync_barrier_waitq);
- } else
- break;
- }
atomic_dec(&d->opd_sync_changes);
+ wake_up(&d->opd_sync_barrier_waitq);
}
+ atomic64_inc(&d->opd_sync_processed_recs);
if (rc != 0) {
atomic_dec(&d->opd_sync_rpcs_in_flight);
atomic_dec(&d->opd_sync_rpcs_in_progress);
rec = NULL;
}
- if (d->opd_sync_last_processed_id == d->opd_sync_last_used_id)
- osp_sync_remove_from_tracker(d);
-
l_wait_event(d->opd_sync_waitq,
!osp_sync_running(d) ||
osp_sync_can_process_new(d, rec) ||
if (d->opd_storage->dd_rdonly)
RETURN(0);
- rc = osp_sync_id_traction_init(d);
- if (rc)
- RETURN(rc);
-
/*
* initialize llog storing changes
*/
err_llog:
osp_sync_llog_fini(env, d);
err_id:
- osp_sync_id_traction_fini(d);
return rc;
}
wait_event(thread->t_ctl_waitq, thread_is_stopped(thread));
}
- /*
- * unregister transaction callbacks only when sync thread
- * has finished operations with llog
- */
- osp_sync_id_traction_fini(d);
-
RETURN(0);
}
-static DEFINE_MUTEX(osp_id_tracker_sem);
-static struct list_head osp_id_tracker_list =
- LIST_HEAD_INIT(osp_id_tracker_list);
+struct osp_last_committed_cb {
+ struct dt_txn_commit_cb ospc_cb;
+ struct osp_device *ospc_dev;
+ __u64 ospc_transno;
+};
-/**
- * OSD commit callback.
- *
- * The function is used as a local OSD commit callback to track the highest
- * committed llog record id. see osp_sync_id_traction_init() for the details.
- *
- * \param[in] th local transaction handle committed
- * \param[in] cookie commit callback data (our private structure)
- */
-static void osp_sync_tracker_commit_cb(struct thandle *th, void *cookie)
+void osp_sync_local_commit_cb(struct lu_env *env, struct thandle *th,
+ struct dt_txn_commit_cb *dcb, int err)
{
- struct osp_id_tracker *tr = cookie;
- struct osp_device *d;
- struct osp_txn_info *txn;
+ struct osp_last_committed_cb *cb;
+ struct osp_device *d;
- LASSERT(tr);
+ cb = container_of0(dcb, struct osp_last_committed_cb, ospc_cb);
+ d = cb->ospc_dev;
- txn = osp_txn_info(&th->th_ctx);
- if (txn == NULL || txn->oti_current_id < tr->otr_committed_id)
- return;
+ CDEBUG(D_HA, "%s: %llu committed\n", d->opd_obd->obd_name,
+ cb->ospc_transno);
- spin_lock(&tr->otr_lock);
- if (likely(txn->oti_current_id > tr->otr_committed_id)) {
- CDEBUG(D_OTHER, "committed: %llu -> %llu\n",
- tr->otr_committed_id, txn->oti_current_id);
- tr->otr_committed_id = txn->oti_current_id;
+ spin_lock(&d->opd_sync_lock);
+ if (cb->ospc_transno > d->opd_sync_last_committed_id)
+ d->opd_sync_last_committed_id = cb->ospc_transno;
+ spin_unlock(&d->opd_sync_lock);
- list_for_each_entry(d, &tr->otr_wakeup_list,
- opd_sync_ontrack) {
- d->opd_sync_last_committed_id = tr->otr_committed_id;
- wake_up(&d->opd_sync_waitq);
- }
- }
- spin_unlock(&tr->otr_lock);
+ osp_sync_check_for_work(d);
+ lu_device_put(osp2lu_dev(d));
+ if (atomic_dec_and_test(&d->opd_commits_registered))
+ wake_up(&d->opd_sync_waitq);
+
+ OBD_FREE_PTR(cb);
}
-/**
- * Initialize commit tracking mechanism.
- *
- * Some setups may have thousands of OSTs and each will be represented by OSP.
- * Meaning order of magnitute many more changes to apply every second. In order
- * to keep the number of commit callbacks low this mechanism was introduced.
- * The mechanism is very similar to transno used by MDT service: it's an single
- * ID stream which can be assigned by any OSP to its llog records. The tricky
- * part is that ID is stored in per-transaction data and re-used by all the OSPs
- * involved in that transaction. Then all these OSPs are woken up utilizing a single OSD commit callback.
- *
- * The function initializes the data used by the tracker described above.
- * A singler tracker per OSD device is created.
- *
- * \param[in] d OSP device
- *
- * \retval 0 on success
- * \retval negative negated errno on error
- */
-static int osp_sync_id_traction_init(struct osp_device *d)
+int osp_sync_add_commit_cb(const struct lu_env *env, struct osp_device *d,
+ struct thandle *th)
{
- struct osp_id_tracker *tr, *found = NULL;
- int rc = 0;
+ struct osp_last_committed_cb *cb;
+ struct dt_txn_commit_cb *dcb;
+ int rc = 0;
+
+ OBD_ALLOC_PTR(cb);
+ if (cb == NULL)
+ return -ENOMEM;
+ cb->ospc_dev = d;
+ dcb = &cb->ospc_cb;
+ dcb->dcb_func = osp_sync_local_commit_cb;
+ spin_lock(&d->opd_sync_lock);
+ cb->ospc_transno = ++d->opd_sync_last_used_id;
+ spin_unlock(&d->opd_sync_lock);
- LASSERT(d);
- LASSERT(d->opd_storage);
- LASSERT(d->opd_sync_tracker == NULL);
- INIT_LIST_HEAD(&d->opd_sync_ontrack);
-
- mutex_lock(&osp_id_tracker_sem);
- list_for_each_entry(tr, &osp_id_tracker_list, otr_list) {
- if (tr->otr_dev == d->opd_storage) {
- LASSERT(atomic_read(&tr->otr_refcount));
- atomic_inc(&tr->otr_refcount);
- d->opd_sync_tracker = tr;
- found = tr;
- break;
- }
- }
+ rc = dt_trans_cb_add(th, dcb);
+ CDEBUG(D_HA, "%s: add commit cb at %llu, next at %llu, rc = %d\n",
+ d->opd_obd->obd_name, (unsigned long long) cfs_time_current(),
+ (unsigned long long) d->opd_sync_next_commit_cb, rc);
- if (found == NULL) {
- rc = -ENOMEM;
- OBD_ALLOC_PTR(tr);
- if (tr) {
- d->opd_sync_tracker = tr;
- spin_lock_init(&tr->otr_lock);
- tr->otr_dev = d->opd_storage;
- tr->otr_next_id = 1;
- tr->otr_committed_id = 0;
- atomic_set(&tr->otr_refcount, 1);
- INIT_LIST_HEAD(&tr->otr_wakeup_list);
- list_add(&tr->otr_list, &osp_id_tracker_list);
- tr->otr_tx_cb.dtc_txn_commit =
- osp_sync_tracker_commit_cb;
- tr->otr_tx_cb.dtc_cookie = tr;
- tr->otr_tx_cb.dtc_tag = LCT_MD_THREAD;
- dt_txn_callback_add(d->opd_storage, &tr->otr_tx_cb);
- rc = 0;
- }
- }
- mutex_unlock(&osp_id_tracker_sem);
+ if (likely(rc == 0)) {
+ lu_device_get(osp2lu_dev(d));
+ atomic_inc(&d->opd_commits_registered);
+ } else
+ OBD_FREE_PTR(cb);
return rc;
}
-/**
- * Release commit tracker.
- *
- * Decrease a refcounter on the tracker used by the given OSP device \a d.
- * If no more users left, then the tracker is released.
- *
- * \param[in] d OSP device
+/*
+ * Generate an empty local transaction on d->opd_storage, hook the commit
+ * callback into it with osp_sync_add_commit_cb(), then force a transaction
+ * commit with dt_commit_async().
+ *
+ * Failures are logged and otherwise ignored; the function returns nothing.
+ * The transaction handle is stopped even when it could not be started, so
+ * the handle obtained from dt_trans_create() is never leaked.
+ *
+ * \param[in] env execution environment
+ * \param[in] d OSP device
*/
-static void osp_sync_id_traction_fini(struct osp_device *d)
+void osp_sync_force(const struct lu_env *env, struct osp_device *d)
{
- struct osp_id_tracker *tr;
-
- ENTRY;
+ struct thandle *th;
+ int rc;
- LASSERT(d);
- tr = d->opd_sync_tracker;
- if (tr == NULL) {
- EXIT;
+ th = dt_trans_create(env, d->opd_storage);
+ if (IS_ERR(th)) {
+ CERROR("%s: can't sync\n", d->opd_obd->obd_name);
return;
}
-
- osp_sync_remove_from_tracker(d);
-
- mutex_lock(&osp_id_tracker_sem);
- if (atomic_dec_and_test(&tr->otr_refcount)) {
- dt_txn_callback_del(d->opd_storage, &tr->otr_tx_cb);
- LASSERT(list_empty(&tr->otr_wakeup_list));
- list_del(&tr->otr_list);
- OBD_FREE_PTR(tr);
- d->opd_sync_tracker = NULL;
- }
- mutex_unlock(&osp_id_tracker_sem);
-
- EXIT;
-}
-
-/**
- * Generate a new ID on a tracker.
- *
- * Generates a new ID using the tracker associated with the given OSP device
- * \a d, if the given ID \a id is non-zero. Unconditially adds OSP device to
- * the wakeup list, so OSP won't miss when a transaction using the ID is
- * committed.
- *
- * \param[in] d OSP device
- * \param[in] id 0 or ID generated previously
- *
- * \retval ID the caller should use
- */
-static __u64 osp_sync_id_get(struct osp_device *d, __u64 id)
-{
- struct osp_id_tracker *tr;
-
- tr = d->opd_sync_tracker;
- LASSERT(tr);
-
- /* XXX: we can improve this introducing per-cpu preallocated ids? */
- spin_lock(&tr->otr_lock);
- if (OBD_FAIL_CHECK(OBD_FAIL_MDS_TRACK_OVERFLOW))
- tr->otr_next_id = 0xfffffff0;
-
- if (unlikely(tr->otr_next_id <= d->opd_sync_last_used_id)) {
- spin_unlock(&tr->otr_lock);
- CERROR("%s: next %llu, last synced %llu\n",
- d->opd_obd->obd_name, tr->otr_next_id,
- d->opd_sync_last_used_id);
- LBUG();
+ rc = dt_trans_start_local(env, d->opd_storage, th);
+ if (rc == 0) {
+ CDEBUG(D_OTHER, "%s: sync forced, %d changes\n",
+ d->opd_obd->obd_name,
+ atomic_read(&d->opd_sync_changes));
+ rc = osp_sync_add_commit_cb(env, d, th);
+ if (rc != 0)
+ CERROR("%s: can't add commit callback: rc = %d\n",
+ d->opd_obd->obd_name, rc);
+ } else {
+ CERROR("%s: can't start sync transaction: rc = %d\n",
+ d->opd_obd->obd_name, rc);
}
- if (id == 0)
- id = tr->otr_next_id++;
- if (id > d->opd_sync_last_used_id)
- d->opd_sync_last_used_id = id;
- if (list_empty(&d->opd_sync_ontrack))
- list_add(&d->opd_sync_ontrack, &tr->otr_wakeup_list);
- spin_unlock(&tr->otr_lock);
- CDEBUG(D_OTHER, "new id %llu\n", id);
-
- return id;
+ /* stop unconditionally: a handle that failed to start must still
+ * be released, otherwise it is leaked */
+ dt_trans_stop(env, d->opd_storage, th);
+ dt_commit_async(env, d->opd_storage);
}
-
-/**
- * Stop to propagate commit status to OSP.
- *
- * If the OSP does not have any llog records she's waiting to commit, then
- * it is possible to unsubscribe from wakeups from the tracking using this
- * method.
- *
- * \param[in] d OSP device not willing to wakeup
- */
-static void osp_sync_remove_from_tracker(struct osp_device *d)
-{
- struct osp_id_tracker *tr;
-
- tr = d->opd_sync_tracker;
- LASSERT(tr);
-
- if (list_empty(&d->opd_sync_ontrack))
- return;
-
- spin_lock(&tr->otr_lock);
- list_del_init(&d->opd_sync_ontrack);
- spin_unlock(&tr->otr_lock);
-}
-