Whamcloud - gitweb
LU-7934 osp: fix tr->otr_next_id overflow
[fs/lustre-release.git] / lustre / osp / osp_sync.c
index 2c74443..582296f 100644 (file)
@@ -50,7 +50,7 @@
 
 static int osp_sync_id_traction_init(struct osp_device *d);
 static void osp_sync_id_traction_fini(struct osp_device *d);
-static __u32 osp_sync_id_get(struct osp_device *d, __u32 id);
+static __u64 osp_sync_id_get(struct osp_device *d, __u64 id);
 static void osp_sync_remove_from_tracker(struct osp_device *d);
 
 /*
@@ -238,6 +238,25 @@ void __osp_sync_check_for_work(struct osp_device *d)
        osp_sync_check_for_work(d);
 }
 
+static inline __u64 osp_sync_correct_id(struct osp_device *d,
+                                       struct llog_rec_hdr *rec)
+{
+       /*
+        * llog use cyclic store with 32 bit lrh_id
+        * so overflow lrh_id is possible. Range between
+        * last_processed and last_committed is less than
+        * 64745 ^ 2 and less than 2^32 - 1
+        */
+       __u64 correct_id = d->opd_syn_last_committed_id;
+
+       if ((correct_id & 0xffffffffULL) < rec->lrh_id)
+               correct_id -= 0x100000000ULL;
+
+       correct_id &= ~0xffffffffULL;
+       correct_id |= rec->lrh_id;
+
+       return correct_id;
+}
 /**
  * Check and return ready-for-new status.
  *
@@ -271,7 +290,8 @@ static inline int osp_sync_can_process_new(struct osp_device *d,
                return 1;
        if (d->opd_syn_changes == 0)
                return 0;
-       if (rec == NULL || rec->lrh_id <= d->opd_syn_last_committed_id)
+       if (rec == NULL ||
+           osp_sync_correct_id(d, rec) <= d->opd_syn_last_committed_id)
                return 1;
        return 0;
 }
@@ -402,8 +422,7 @@ static int osp_sync_add_rec(const struct lu_env *env, struct osp_device *d,
        LASSERT(txn);
 
        txn->oti_current_id = osp_sync_id_get(d, txn->oti_current_id);
-       osi->osi_hdr.lrh_id = txn->oti_current_id;
-
+       osi->osi_hdr.lrh_id = (txn->oti_current_id & 0xffffffffULL);
        ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
        if (ctxt == NULL)
                RETURN(-ENOMEM);
@@ -915,13 +934,14 @@ static void osp_sync_process_record(const struct lu_env *env,
         * we should decrease changes and bump last_processed_id.
         */
        if (d->opd_syn_prev_done) {
+               __u64 correct_id = osp_sync_correct_id(d, rec);
                LASSERT(d->opd_syn_changes > 0);
-               LASSERT(rec->lrh_id <= d->opd_syn_last_committed_id);
+               LASSERT(correct_id <= d->opd_syn_last_committed_id);
                /* NOTE: it's possible to meet same id if
                 * OST stores few stripes of same file
                 */
-               if (rec->lrh_id > d->opd_syn_last_processed_id) {
-                       d->opd_syn_last_processed_id = rec->lrh_id;
+               if (correct_id > d->opd_syn_last_processed_id) {
+                       d->opd_syn_last_processed_id = correct_id;
                        wake_up(&d->opd_syn_barrier_waitq);
                }
                d->opd_syn_changes--;
@@ -1487,7 +1507,7 @@ static void osp_sync_tracker_commit_cb(struct thandle *th, void *cookie)
 
        spin_lock(&tr->otr_lock);
        if (likely(txn->oti_current_id > tr->otr_committed_id)) {
-               CDEBUG(D_OTHER, "committed: %u -> %u\n",
+               CDEBUG(D_OTHER, "committed: "LPU64" -> "LPU64"\n",
                       tr->otr_committed_id, txn->oti_current_id);
                tr->otr_committed_id = txn->oti_current_id;
 
@@ -1607,14 +1627,14 @@ static void osp_sync_id_traction_fini(struct osp_device *d)
  * Generates a new ID using the tracker associated with the given OSP device
  * \a d, if the given ID \a id is non-zero. Unconditially adds OSP device to
  * the wakeup list, so OSP won't miss when a transaction using the ID is
- * committed. Notice ID is 32bit, but llog doesn't support >2^32 records anyway.
+ * committed.
  *
  * \param[in] d                OSP device
  * \param[in] id       0 or ID generated previously
  *
  * \retval             ID the caller should use
  */
-static __u32 osp_sync_id_get(struct osp_device *d, __u32 id)
+static __u64 osp_sync_id_get(struct osp_device *d, __u64 id)
 {
        struct osp_id_tracker *tr;
 
@@ -1623,9 +1643,12 @@ static __u32 osp_sync_id_get(struct osp_device *d, __u32 id)
 
        /* XXX: we can improve this introducing per-cpu preallocated ids? */
        spin_lock(&tr->otr_lock);
+       if (OBD_FAIL_CHECK(OBD_FAIL_MDS_TRACK_OVERFLOW))
+               tr->otr_next_id = 0xfffffff0;
+
        if (unlikely(tr->otr_next_id <= d->opd_syn_last_used_id)) {
                spin_unlock(&tr->otr_lock);
-               CERROR("%s: next %u, last synced %lu\n",
+               CERROR("%s: next "LPU64", last synced "LPU64"\n",
                       d->opd_obd->obd_name, tr->otr_next_id,
                       d->opd_syn_last_used_id);
                LBUG();
@@ -1638,7 +1661,7 @@ static __u32 osp_sync_id_get(struct osp_device *d, __u32 id)
        if (list_empty(&d->opd_syn_ontrack))
                list_add(&d->opd_syn_ontrack, &tr->otr_wakeup_list);
        spin_unlock(&tr->otr_lock);
-       CDEBUG(D_OTHER, "new id %u\n", (unsigned) id);
+       CDEBUG(D_OTHER, "new id "LPU64"\n", id);
 
        return id;
 }