*/
struct osp_id_tracker {
spinlock_t otr_lock;
- __u32 otr_next_id;
- __u32 otr_committed_id;
+ __u64 otr_next_id;
+ __u64 otr_committed_id;
/* callback is register once per diskfs -- that's the whole point */
struct dt_txn_callback otr_tx_cb;
/* single node can run many clusters */
/* osd api's commit cb control structure */
struct dt_txn_callback opd_syn_txn_cb;
/* last used change number -- semantically similar to transno */
- unsigned long opd_syn_last_used_id;
+ __u64 opd_syn_last_used_id;
/* last committed change number -- semantically similar to
* last_committed */
- unsigned long opd_syn_last_committed_id;
+ __u64 opd_syn_last_committed_id;
/* last processed (taken from llog) id */
- unsigned long opd_syn_last_processed_id;
+ __u64 opd_syn_last_processed_id;
struct osp_id_tracker *opd_syn_tracker;
struct list_head opd_syn_ontrack;
/* stop processing new requests until barrier=0 */
}
struct osp_txn_info {
- __u32 oti_current_id;
+ __u64 oti_current_id;
};
extern struct lu_context_key osp_txn_key;
static int osp_sync_id_traction_init(struct osp_device *d);
static void osp_sync_id_traction_fini(struct osp_device *d);
-static __u32 osp_sync_id_get(struct osp_device *d, __u32 id);
+static __u64 osp_sync_id_get(struct osp_device *d, __u64 id);
static void osp_sync_remove_from_tracker(struct osp_device *d);
/*
osp_sync_check_for_work(d);
}
+static inline __u64 osp_sync_correct_id(struct osp_device *d,
+ struct llog_rec_hdr *rec)
+{
+ /*
+ * llog use cyclic store with 32 bit lrh_id
+ * so overflow lrh_id is possible. Range between
+ * last_processed and last_committed is less than
+ * 64745 ^ 2 and less than 2^32 - 1
+ */
+ __u64 correct_id = d->opd_syn_last_committed_id;
+
+ if ((correct_id & 0xffffffffULL) < rec->lrh_id)
+ correct_id -= 0x100000000ULL;
+
+ correct_id &= ~0xffffffffULL;
+ correct_id |= rec->lrh_id;
+
+ return correct_id;
+}
/**
* Check and return ready-for-new status.
*
return 1;
if (d->opd_syn_changes == 0)
return 0;
- if (rec == NULL || rec->lrh_id <= d->opd_syn_last_committed_id)
+ if (rec == NULL ||
+ osp_sync_correct_id(d, rec) <= d->opd_syn_last_committed_id)
return 1;
return 0;
}
LASSERT(txn);
txn->oti_current_id = osp_sync_id_get(d, txn->oti_current_id);
- osi->osi_hdr.lrh_id = txn->oti_current_id;
-
+ osi->osi_hdr.lrh_id = (txn->oti_current_id & 0xffffffffULL);
ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
if (ctxt == NULL)
RETURN(-ENOMEM);
* we should decrease changes and bump last_processed_id.
*/
if (d->opd_syn_prev_done) {
+ __u64 correct_id = osp_sync_correct_id(d, rec);
LASSERT(d->opd_syn_changes > 0);
- LASSERT(rec->lrh_id <= d->opd_syn_last_committed_id);
+ LASSERT(correct_id <= d->opd_syn_last_committed_id);
/* NOTE: it's possible to meet same id if
* OST stores few stripes of same file
*/
- if (rec->lrh_id > d->opd_syn_last_processed_id) {
- d->opd_syn_last_processed_id = rec->lrh_id;
+ if (correct_id > d->opd_syn_last_processed_id) {
+ d->opd_syn_last_processed_id = correct_id;
wake_up(&d->opd_syn_barrier_waitq);
}
d->opd_syn_changes--;
spin_lock(&tr->otr_lock);
if (likely(txn->oti_current_id > tr->otr_committed_id)) {
- CDEBUG(D_OTHER, "committed: %u -> %u\n",
+ CDEBUG(D_OTHER, "committed: "LPU64" -> "LPU64"\n",
tr->otr_committed_id, txn->oti_current_id);
tr->otr_committed_id = txn->oti_current_id;
* Generates a new ID using the tracker associated with the given OSP device
* \a d, if the given ID \a id is non-zero. Unconditially adds OSP device to
* the wakeup list, so OSP won't miss when a transaction using the ID is
- * committed. Notice ID is 32bit, but llog doesn't support >2^32 records anyway.
+ * committed.
*
* \param[in] d OSP device
* \param[in] id 0 or ID generated previously
*
* \retval ID the caller should use
*/
-static __u32 osp_sync_id_get(struct osp_device *d, __u32 id)
+static __u64 osp_sync_id_get(struct osp_device *d, __u64 id)
{
struct osp_id_tracker *tr;
/* XXX: we can improve this introducing per-cpu preallocated ids? */
spin_lock(&tr->otr_lock);
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_TRACK_OVERFLOW))
+ tr->otr_next_id = 0xfffffff0;
+
if (unlikely(tr->otr_next_id <= d->opd_syn_last_used_id)) {
spin_unlock(&tr->otr_lock);
- CERROR("%s: next %u, last synced %lu\n",
+ CERROR("%s: next "LPU64", last synced "LPU64"\n",
d->opd_obd->obd_name, tr->otr_next_id,
d->opd_syn_last_used_id);
LBUG();
if (list_empty(&d->opd_syn_ontrack))
list_add(&d->opd_syn_ontrack, &tr->otr_wakeup_list);
spin_unlock(&tr->otr_lock);
- CDEBUG(D_OTHER, "new id %u\n", (unsigned) id);
+ CDEBUG(D_OTHER, "new id "LPU64"\n", id);
return id;
}