Whamcloud - gitweb
LU-7861 osp: replace the hot spinlock with atomic trackers
[fs/lustre-release.git] / lustre / osp / osp_sync.c
index 582296f..2fcc9e5 100644 (file)
@@ -188,7 +188,8 @@ static inline int osp_sync_inflight_conflict(struct osp_device *d,
 
 static inline int osp_sync_low_in_progress(struct osp_device *d)
 {
-       return d->opd_syn_rpc_in_progress < d->opd_syn_max_rpc_in_progress;
+       return atomic_read(&d->opd_syn_rpc_in_progress) <
+               d->opd_syn_max_rpc_in_progress;
 }
 
 /**
@@ -201,7 +202,8 @@ static inline int osp_sync_low_in_progress(struct osp_device *d)
  */
 static inline int osp_sync_low_in_flight(struct osp_device *d)
 {
-       return d->opd_syn_rpc_in_flight < d->opd_syn_max_rpc_in_flight;
+       return atomic_read(&d->opd_syn_rpc_in_flight) <
+               d->opd_syn_max_rpc_in_flight;
 }
 
 /**
@@ -288,7 +290,7 @@ static inline int osp_sync_can_process_new(struct osp_device *d,
                return 0;
        if (d->opd_syn_prev_done == 0)
                return 1;
-       if (d->opd_syn_changes == 0)
+       if (atomic_read(&d->opd_syn_changes) == 0)
                return 0;
        if (rec == NULL ||
            osp_sync_correct_id(d, rec) <= d->opd_syn_last_committed_id)
@@ -437,9 +439,7 @@ static int osp_sync_add_rec(const struct lu_env *env, struct osp_device *d,
                       POSTID(&osi->osi_cookie.lgc_lgl.lgl_oi),
                       (unsigned long)osi->osi_cookie.lgc_lgl.lgl_ogen,
                       (unsigned long)osi->osi_cookie.lgc_index, rc);
-               spin_lock(&d->opd_syn_lock);
-               d->opd_syn_changes++;
-               spin_unlock(&d->opd_syn_lock);
+               atomic_inc(&d->opd_syn_changes);
        }
        /* return 0 always here, error case just cause no llog record */
        RETURN(0);
@@ -577,10 +577,8 @@ static int osp_sync_interpret(const struct lu_env *env,
                        /* this is the last time we see the request
                         * if transno is not zero, then commit cb
                         * will be called at some point */
-                       LASSERT(d->opd_syn_rpc_in_progress > 0);
-                       spin_lock(&d->opd_syn_lock);
-                       d->opd_syn_rpc_in_progress--;
-                       spin_unlock(&d->opd_syn_lock);
+                       LASSERT(atomic_read(&d->opd_syn_rpc_in_progress) > 0);
+                       atomic_dec(&d->opd_syn_rpc_in_progress);
                }
 
                wake_up(&d->opd_syn_waitq);
@@ -594,16 +592,16 @@ static int osp_sync_interpret(const struct lu_env *env,
                osp_statfs_need_now(d);
        }
 
-       LASSERT(d->opd_syn_rpc_in_flight > 0);
        spin_lock(&d->opd_syn_lock);
-       d->opd_syn_rpc_in_flight--;
        list_del_init(&jra->jra_inflight_link);
        spin_unlock(&d->opd_syn_lock);
+       LASSERT(atomic_read(&d->opd_syn_rpc_in_flight) > 0);
+       atomic_dec(&d->opd_syn_rpc_in_flight);
        if (unlikely(atomic_read(&d->opd_syn_barrier) > 0))
                wake_up(&d->opd_syn_barrier_waitq);
        CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
-              d->opd_obd->obd_name, d->opd_syn_rpc_in_flight,
-              d->opd_syn_rpc_in_progress);
+              d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_flight),
+              atomic_read(&d->opd_syn_rpc_in_progress));
 
        osp_sync_check_for_work(d);
 
@@ -623,7 +621,8 @@ static void osp_sync_send_new_rpc(struct osp_device *d,
 {
        struct osp_job_req_args *jra;
 
-       LASSERT(d->opd_syn_rpc_in_flight <= d->opd_syn_max_rpc_in_flight);
+       LASSERT(atomic_read(&d->opd_syn_rpc_in_flight) <=
+               d->opd_syn_max_rpc_in_flight);
 
        jra = ptlrpc_req_async_args(req);
        jra->jra_magic = OSP_JOB_MAGIC;
@@ -904,10 +903,8 @@ static void osp_sync_process_record(const struct lu_env *env,
 
        /* notice we increment counters before sending RPC, to be consistent
         * in RPC interpret callback which may happen very quickly */
-       spin_lock(&d->opd_syn_lock);
-       d->opd_syn_rpc_in_flight++;
-       d->opd_syn_rpc_in_progress++;
-       spin_unlock(&d->opd_syn_lock);
+       atomic_inc(&d->opd_syn_rpc_in_flight);
+       atomic_inc(&d->opd_syn_rpc_in_progress);
 
        switch (rec->lrh_type) {
        /* case MDS_UNLINK_REC is kept for compatibility */
@@ -928,33 +925,35 @@ static void osp_sync_process_record(const struct lu_env *env,
                break;
        }
 
-       spin_lock(&d->opd_syn_lock);
-
        /* For all kinds of records, not matter successful or not,
         * we should decrease changes and bump last_processed_id.
         */
        if (d->opd_syn_prev_done) {
                __u64 correct_id = osp_sync_correct_id(d, rec);
-               LASSERT(d->opd_syn_changes > 0);
+               LASSERT(atomic_read(&d->opd_syn_changes) > 0);
                LASSERT(correct_id <= d->opd_syn_last_committed_id);
                /* NOTE: it's possible to meet same id if
                 * OST stores few stripes of same file
                 */
-               if (correct_id > d->opd_syn_last_processed_id) {
-                       d->opd_syn_last_processed_id = correct_id;
-                       wake_up(&d->opd_syn_barrier_waitq);
+               while (1) {
+                       /* another thread may be trying to set new value */
+                       rmb();
+                       if (correct_id > d->opd_syn_last_processed_id) {
+                               d->opd_syn_last_processed_id = correct_id;
+                               wake_up(&d->opd_syn_barrier_waitq);
+                       } else
+                               break;
                }
-               d->opd_syn_changes--;
+               atomic_dec(&d->opd_syn_changes);
        }
        if (rc != 0) {
-               d->opd_syn_rpc_in_flight--;
-               d->opd_syn_rpc_in_progress--;
+               atomic_dec(&d->opd_syn_rpc_in_flight);
+               atomic_dec(&d->opd_syn_rpc_in_progress);
        }
-       CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
-              d->opd_obd->obd_name, d->opd_syn_rpc_in_flight,
-              d->opd_syn_rpc_in_progress);
 
-       spin_unlock(&d->opd_syn_lock);
+       CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
+              d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_flight),
+              atomic_read(&d->opd_syn_rpc_in_progress));
 
        /* Delete the invalid record */
        if (rc == 1) {
@@ -1059,13 +1058,11 @@ static void osp_sync_process_committed(const struct lu_env *env,
 
        llog_ctxt_put(ctxt);
 
-       LASSERT(d->opd_syn_rpc_in_progress >= done);
-       spin_lock(&d->opd_syn_lock);
-       d->opd_syn_rpc_in_progress -= done;
-       spin_unlock(&d->opd_syn_lock);
+       LASSERT(atomic_read(&d->opd_syn_rpc_in_progress) >= done);
+       atomic_sub(done, &d->opd_syn_rpc_in_progress);
        CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
-              d->opd_obd->obd_name, d->opd_syn_rpc_in_flight,
-              d->opd_syn_rpc_in_progress);
+              d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_flight),
+              atomic_read(&d->opd_syn_rpc_in_progress));
 
        osp_sync_check_for_work(d);
 
@@ -1116,11 +1113,11 @@ static int osp_sync_process_queues(const struct lu_env *env,
                if (osp_sync_can_process_new(d, rec)) {
                        if (llh == NULL) {
                                /* ask llog for another record */
-                               CDEBUG(D_HA, "%lu changes, %u in progress,"
+                               CDEBUG(D_HA, "%u changes, %u in progress,"
                                       " %u in flight\n",
-                                      d->opd_syn_changes,
-                                      d->opd_syn_rpc_in_progress,
-                                      d->opd_syn_rpc_in_flight);
+                                      atomic_read(&d->opd_syn_changes),
+                                      atomic_read(&d->opd_syn_rpc_in_progress),
+                                      atomic_read(&d->opd_syn_rpc_in_flight));
                                return 0;
                        }
                        osp_sync_process_record(env, d, llh, rec);
@@ -1199,30 +1196,33 @@ static int osp_sync_thread(void *_arg)
 
        rc = llog_cat_process(&env, llh, osp_sync_process_queues, d, 0, 0);
        LASSERTF(rc == 0 || rc == LLOG_PROC_BREAK,
-                "%lu changes, %u in progress, %u in flight: %d\n",
-                d->opd_syn_changes, d->opd_syn_rpc_in_progress,
-                d->opd_syn_rpc_in_flight, rc);
+                "%u changes, %u in progress, %u in flight: %d\n",
+                atomic_read(&d->opd_syn_changes),
+                atomic_read(&d->opd_syn_rpc_in_progress),
+                atomic_read(&d->opd_syn_rpc_in_flight), rc);
 
        /* we don't expect llog_process_thread() to exit till umount */
        LASSERTF(thread->t_flags != SVC_RUNNING,
-                "%lu changes, %u in progress, %u in flight\n",
-                d->opd_syn_changes, d->opd_syn_rpc_in_progress,
-                d->opd_syn_rpc_in_flight);
+                "%u changes, %u in progress, %u in flight\n",
+                atomic_read(&d->opd_syn_changes),
+                atomic_read(&d->opd_syn_rpc_in_progress),
+                atomic_read(&d->opd_syn_rpc_in_flight));
 
        /* wait till all the requests are completed */
        count = 0;
-       while (d->opd_syn_rpc_in_progress > 0) {
+       while (atomic_read(&d->opd_syn_rpc_in_progress) > 0) {
                osp_sync_process_committed(&env, d);
 
                lwi = LWI_TIMEOUT(cfs_time_seconds(5), NULL, NULL);
                rc = l_wait_event(d->opd_syn_waitq,
-                                 d->opd_syn_rpc_in_progress == 0,
+                                 atomic_read(&d->opd_syn_rpc_in_progress) == 0,
                                  &lwi);
                if (rc == -ETIMEDOUT)
                        count++;
                LASSERTF(count < 10, "%s: %d %d %sempty\n",
-                        d->opd_obd->obd_name, d->opd_syn_rpc_in_progress,
-                        d->opd_syn_rpc_in_flight,
+                        d->opd_obd->obd_name,
+                        atomic_read(&d->opd_syn_rpc_in_progress),
+                        atomic_read(&d->opd_syn_rpc_in_flight),
                         list_empty(&d->opd_syn_committed_there) ? "" : "!");
 
        }
@@ -1232,10 +1232,10 @@ static int osp_sync_thread(void *_arg)
        if (rc)
                CERROR("can't cleanup llog: %d\n", rc);
 out:
-       LASSERTF(d->opd_syn_rpc_in_progress == 0,
+       LASSERTF(atomic_read(&d->opd_syn_rpc_in_progress) == 0,
                 "%s: %d %d %sempty\n",
-                d->opd_obd->obd_name, d->opd_syn_rpc_in_progress,
-                d->opd_syn_rpc_in_flight,
+                d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_progress),
+                atomic_read(&d->opd_syn_rpc_in_flight),
                 list_empty(&d->opd_syn_committed_there) ? "" : "!");
 
        thread->t_flags = SVC_STOPPED;