Whamcloud - gitweb
LU-7861 osp: replace the hot spinlock with atomic trackers 11/19211/5
authorAlex Zhuravlev <alexey.zhuravlev@intel.com>
Fri, 17 Jun 2016 15:25:04 +0000 (23:25 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Wed, 22 Jun 2016 02:54:15 +0000 (02:54 +0000)
so that background scanning doesn't race with MDS_REINT_UNLINK

Change-Id: I4006be4298ffa841ca4d131ac37c201ff8ea729f
Signed-off-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-on: http://review.whamcloud.com/19211
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: James Simmons <uja.ornl@yahoo.com>
Reviewed-by: Mike Pershin <mike.pershin@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/osp/lproc_osp.c
lustre/osp/osp_dev.c
lustre/osp/osp_internal.h
lustre/osp/osp_precreate.c
lustre/osp/osp_sync.c

index 6dc9287..58fcb2d 100644 (file)
@@ -113,7 +113,7 @@ static int osp_syn_in_flight_seq_show(struct seq_file *m, void *data)
        if (osp == NULL)
                return -EINVAL;
 
-       seq_printf(m, "%u\n", osp->opd_syn_rpc_in_flight);
+       seq_printf(m, "%u\n", atomic_read(&osp->opd_syn_rpc_in_flight));
        return 0;
 }
 LPROC_SEQ_FOPS_RO(osp_syn_in_flight);
@@ -134,7 +134,7 @@ static int osp_syn_in_prog_seq_show(struct seq_file *m, void *data)
        if (osp == NULL)
                return -EINVAL;
 
-       seq_printf(m, "%u\n", osp->opd_syn_rpc_in_progress);
+       seq_printf(m, "%u\n", atomic_read(&osp->opd_syn_rpc_in_progress));
        return 0;
 }
 LPROC_SEQ_FOPS_RO(osp_syn_in_prog);
@@ -155,7 +155,7 @@ static int osp_syn_changes_seq_show(struct seq_file *m, void *data)
        if (osp == NULL)
                return -EINVAL;
 
-       seq_printf(m, "%lu\n", osp->opd_syn_changes);
+       seq_printf(m, "%u\n", atomic_read(&osp->opd_syn_changes));
        return 0;
 }
 
@@ -639,8 +639,9 @@ static int osp_destroys_in_flight_seq_show(struct seq_file *m, void *data)
        if (osp == NULL)
                return -EINVAL;
 
-       seq_printf(m, "%lu\n",
-                  osp->opd_syn_rpc_in_progress + osp->opd_syn_changes);
+       seq_printf(m, "%u\n",
+                  atomic_read(&osp->opd_syn_rpc_in_progress) +
+                  atomic_read(&osp->opd_syn_changes));
        return 0;
 }
 LPROC_SEQ_FOPS_RO(osp_destroys_in_flight);
index 329da7b..9dab59c 100644 (file)
@@ -844,26 +844,27 @@ static int osp_sync(const struct lu_env *env, struct dt_device *dev)
        atomic_inc(&d->opd_syn_barrier);
 
        CDEBUG(D_CACHE, "%s: %u in flight\n", d->opd_obd->obd_name,
-              d->opd_syn_rpc_in_flight);
+              atomic_read(&d->opd_syn_rpc_in_flight));
 
        /* wait till all-in-flight are replied, so executed by the target */
        /* XXX: this is used by LFSCK at the moment, which doesn't require
         *      all the changes to be committed, but in general it'd be
         *      better to wait till commit */
-       while (d->opd_syn_rpc_in_flight > 0) {
+       while (atomic_read(&d->opd_syn_rpc_in_flight) > 0) {
 
-               old = d->opd_syn_rpc_in_flight;
+               old = atomic_read(&d->opd_syn_rpc_in_flight);
 
                expire = cfs_time_shift(obd_timeout);
                lwi = LWI_TIMEOUT(expire - cfs_time_current(),
                                  osp_sync_timeout, d);
                l_wait_event(d->opd_syn_barrier_waitq,
-                               d->opd_syn_rpc_in_flight == 0, &lwi);
+                            atomic_read(&d->opd_syn_rpc_in_flight) == 0,
+                            &lwi);
 
-               if (d->opd_syn_rpc_in_flight == 0)
+               if (atomic_read(&d->opd_syn_rpc_in_flight) == 0)
                        break;
 
-               if (d->opd_syn_rpc_in_flight != old) {
+               if (atomic_read(&d->opd_syn_rpc_in_flight) != old) {
                        /* some progress have been made */
                        continue;
                }
index 3714dc2..64c727d 100644 (file)
@@ -194,7 +194,7 @@ struct osp_device {
        /* unique generation, to recognize start of new records in the llog */
        struct llog_gen                  opd_syn_generation;
        /* number of changes to sync, used to wake up sync thread */
-       unsigned long                    opd_syn_changes;
+       atomic_t                         opd_syn_changes;
        /* processing of changes from previous mount is done? */
        int                              opd_syn_prev_done;
        /* found records */
@@ -207,10 +207,10 @@ struct osp_device {
        /* number of changes being under sync */
        int                              opd_syn_sync_in_progress;
        /* number of RPCs in flight - flow control */
-       int                              opd_syn_rpc_in_flight;
+       atomic_t                         opd_syn_rpc_in_flight;
        int                              opd_syn_max_rpc_in_flight;
        /* number of RPC in processing (including non-committed by OST) */
-       int                              opd_syn_rpc_in_progress;
+       atomic_t                         opd_syn_rpc_in_progress;
        int                              opd_syn_max_rpc_in_progress;
        /* osd api's commit cb control structure */
        struct dt_txn_callback           opd_syn_txn_cb;
@@ -220,7 +220,7 @@ struct osp_device {
         * last_committed */
        __u64                            opd_syn_last_committed_id;
        /* last processed (taken from llog) id */
-       __u64                            opd_syn_last_processed_id;
+       volatile __u64                   opd_syn_last_processed_id;
        struct osp_id_tracker           *opd_syn_tracker;
        struct list_head                 opd_syn_ontrack;
        /* stop processing new requests until barrier=0 */
index 30f3164..e341faf 100644 (file)
@@ -973,8 +973,9 @@ void osp_pre_update_status(struct osp_device *d, int rc)
                                       msfs->os_bfree, used, msfs->os_bavail,
                                       d->opd_pre_status, rc);
                        CDEBUG(D_INFO,
-                              "non-committed changes: %lu, in progress: %u\n",
-                              d->opd_syn_changes, d->opd_syn_rpc_in_progress);
+                              "non-committed changes: %u, in progress: %u\n",
+                              atomic_read(&d->opd_syn_changes),
+                              atomic_read(&d->opd_syn_rpc_in_progress));
                } else if (old == -ENOSPC) {
                        d->opd_pre_status = 0;
                        spin_lock(&d->opd_pre_lock);
@@ -1243,7 +1244,8 @@ static int osp_precreate_ready_condition(const struct lu_env *env,
                return 1;
 
        /* ready if OST reported no space and no destroys in progress */
-       if (d->opd_syn_changes + d->opd_syn_rpc_in_progress == 0 &&
+       if (atomic_read(&d->opd_syn_changes) +
+           atomic_read(&d->opd_syn_rpc_in_progress) == 0 &&
            d->opd_pre_status == -ENOSPC)
                return 1;
 
@@ -1268,11 +1270,12 @@ static int osp_precreate_timeout_condition(void *data)
        struct osp_device *d = data;
 
        CDEBUG(D_HA, "%s: slow creates, last="DFID", next="DFID", "
-             "reserved="LPU64", syn_changes=%lu, "
+             "reserved="LPU64", syn_changes=%u, "
              "syn_rpc_in_progress=%d, status=%d\n",
              d->opd_obd->obd_name, PFID(&d->opd_pre_last_created_fid),
              PFID(&d->opd_pre_used_fid), d->opd_pre_reserved,
-             d->opd_syn_changes, d->opd_syn_rpc_in_progress,
+             atomic_read(&d->opd_syn_changes),
+             atomic_read(&d->opd_syn_rpc_in_progress),
              d->opd_pre_status);
 
        return 1;
@@ -1361,16 +1364,16 @@ int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d)
                 * wait till that is done - some space might be released
                 */
                if (unlikely(rc == -ENOSPC)) {
-                       if (d->opd_syn_changes) {
+                       if (atomic_read(&d->opd_syn_changes)) {
                                /* force local commit to release space */
                                dt_commit_async(env, d->opd_storage);
                        }
-                       if (d->opd_syn_rpc_in_progress) {
+                       if (atomic_read(&d->opd_syn_rpc_in_progress)) {
                                /* just wait till destroys are done */
                                /* see l_wait_even() few lines below */
                        }
-                       if (d->opd_syn_changes +
-                           d->opd_syn_rpc_in_progress == 0) {
+                       if (atomic_read(&d->opd_syn_changes) +
+                           atomic_read(&d->opd_syn_rpc_in_progress) == 0) {
                                /* no hope for free space */
                                break;
                        }
index 582296f..2fcc9e5 100644 (file)
@@ -188,7 +188,8 @@ static inline int osp_sync_inflight_conflict(struct osp_device *d,
 
 static inline int osp_sync_low_in_progress(struct osp_device *d)
 {
-       return d->opd_syn_rpc_in_progress < d->opd_syn_max_rpc_in_progress;
+       return atomic_read(&d->opd_syn_rpc_in_progress) <
+               d->opd_syn_max_rpc_in_progress;
 }
 
 /**
@@ -201,7 +202,8 @@ static inline int osp_sync_low_in_progress(struct osp_device *d)
  */
 static inline int osp_sync_low_in_flight(struct osp_device *d)
 {
-       return d->opd_syn_rpc_in_flight < d->opd_syn_max_rpc_in_flight;
+       return atomic_read(&d->opd_syn_rpc_in_flight) <
+               d->opd_syn_max_rpc_in_flight;
 }
 
 /**
@@ -288,7 +290,7 @@ static inline int osp_sync_can_process_new(struct osp_device *d,
                return 0;
        if (d->opd_syn_prev_done == 0)
                return 1;
-       if (d->opd_syn_changes == 0)
+       if (atomic_read(&d->opd_syn_changes) == 0)
                return 0;
        if (rec == NULL ||
            osp_sync_correct_id(d, rec) <= d->opd_syn_last_committed_id)
@@ -437,9 +439,7 @@ static int osp_sync_add_rec(const struct lu_env *env, struct osp_device *d,
                       POSTID(&osi->osi_cookie.lgc_lgl.lgl_oi),
                       (unsigned long)osi->osi_cookie.lgc_lgl.lgl_ogen,
                       (unsigned long)osi->osi_cookie.lgc_index, rc);
-               spin_lock(&d->opd_syn_lock);
-               d->opd_syn_changes++;
-               spin_unlock(&d->opd_syn_lock);
+               atomic_inc(&d->opd_syn_changes);
        }
        /* return 0 always here, error case just cause no llog record */
        RETURN(0);
@@ -577,10 +577,8 @@ static int osp_sync_interpret(const struct lu_env *env,
                        /* this is the last time we see the request
                         * if transno is not zero, then commit cb
                         * will be called at some point */
-                       LASSERT(d->opd_syn_rpc_in_progress > 0);
-                       spin_lock(&d->opd_syn_lock);
-                       d->opd_syn_rpc_in_progress--;
-                       spin_unlock(&d->opd_syn_lock);
+                       LASSERT(atomic_read(&d->opd_syn_rpc_in_progress) > 0);
+                       atomic_dec(&d->opd_syn_rpc_in_progress);
                }
 
                wake_up(&d->opd_syn_waitq);
@@ -594,16 +592,16 @@ static int osp_sync_interpret(const struct lu_env *env,
                osp_statfs_need_now(d);
        }
 
-       LASSERT(d->opd_syn_rpc_in_flight > 0);
        spin_lock(&d->opd_syn_lock);
-       d->opd_syn_rpc_in_flight--;
        list_del_init(&jra->jra_inflight_link);
        spin_unlock(&d->opd_syn_lock);
+       LASSERT(atomic_read(&d->opd_syn_rpc_in_flight) > 0);
+       atomic_dec(&d->opd_syn_rpc_in_flight);
        if (unlikely(atomic_read(&d->opd_syn_barrier) > 0))
                wake_up(&d->opd_syn_barrier_waitq);
        CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
-              d->opd_obd->obd_name, d->opd_syn_rpc_in_flight,
-              d->opd_syn_rpc_in_progress);
+              d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_flight),
+              atomic_read(&d->opd_syn_rpc_in_progress));
 
        osp_sync_check_for_work(d);
 
@@ -623,7 +621,8 @@ static void osp_sync_send_new_rpc(struct osp_device *d,
 {
        struct osp_job_req_args *jra;
 
-       LASSERT(d->opd_syn_rpc_in_flight <= d->opd_syn_max_rpc_in_flight);
+       LASSERT(atomic_read(&d->opd_syn_rpc_in_flight) <=
+               d->opd_syn_max_rpc_in_flight);
 
        jra = ptlrpc_req_async_args(req);
        jra->jra_magic = OSP_JOB_MAGIC;
@@ -904,10 +903,8 @@ static void osp_sync_process_record(const struct lu_env *env,
 
        /* notice we increment counters before sending RPC, to be consistent
         * in RPC interpret callback which may happen very quickly */
-       spin_lock(&d->opd_syn_lock);
-       d->opd_syn_rpc_in_flight++;
-       d->opd_syn_rpc_in_progress++;
-       spin_unlock(&d->opd_syn_lock);
+       atomic_inc(&d->opd_syn_rpc_in_flight);
+       atomic_inc(&d->opd_syn_rpc_in_progress);
 
        switch (rec->lrh_type) {
        /* case MDS_UNLINK_REC is kept for compatibility */
@@ -928,33 +925,35 @@ static void osp_sync_process_record(const struct lu_env *env,
                break;
        }
 
-       spin_lock(&d->opd_syn_lock);
-
        /* For all kinds of records, not matter successful or not,
         * we should decrease changes and bump last_processed_id.
         */
        if (d->opd_syn_prev_done) {
                __u64 correct_id = osp_sync_correct_id(d, rec);
-               LASSERT(d->opd_syn_changes > 0);
+               LASSERT(atomic_read(&d->opd_syn_changes) > 0);
                LASSERT(correct_id <= d->opd_syn_last_committed_id);
                /* NOTE: it's possible to meet same id if
                 * OST stores few stripes of same file
                 */
-               if (correct_id > d->opd_syn_last_processed_id) {
-                       d->opd_syn_last_processed_id = correct_id;
-                       wake_up(&d->opd_syn_barrier_waitq);
+               while (1) {
+                       /* another thread may be trying to set new value */
+                       rmb();
+                       if (correct_id > d->opd_syn_last_processed_id) {
+                               d->opd_syn_last_processed_id = correct_id;
+                               wake_up(&d->opd_syn_barrier_waitq);
+                       } else
+                               break;
                }
-               d->opd_syn_changes--;
+               atomic_dec(&d->opd_syn_changes);
        }
        if (rc != 0) {
-               d->opd_syn_rpc_in_flight--;
-               d->opd_syn_rpc_in_progress--;
+               atomic_dec(&d->opd_syn_rpc_in_flight);
+               atomic_dec(&d->opd_syn_rpc_in_progress);
        }
-       CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
-              d->opd_obd->obd_name, d->opd_syn_rpc_in_flight,
-              d->opd_syn_rpc_in_progress);
 
-       spin_unlock(&d->opd_syn_lock);
+       CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
+              d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_flight),
+              atomic_read(&d->opd_syn_rpc_in_progress));
 
        /* Delete the invalid record */
        if (rc == 1) {
@@ -1059,13 +1058,11 @@ static void osp_sync_process_committed(const struct lu_env *env,
 
        llog_ctxt_put(ctxt);
 
-       LASSERT(d->opd_syn_rpc_in_progress >= done);
-       spin_lock(&d->opd_syn_lock);
-       d->opd_syn_rpc_in_progress -= done;
-       spin_unlock(&d->opd_syn_lock);
+       LASSERT(atomic_read(&d->opd_syn_rpc_in_progress) >= done);
+       atomic_sub(done, &d->opd_syn_rpc_in_progress);
        CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
-              d->opd_obd->obd_name, d->opd_syn_rpc_in_flight,
-              d->opd_syn_rpc_in_progress);
+              d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_flight),
+              atomic_read(&d->opd_syn_rpc_in_progress));
 
        osp_sync_check_for_work(d);
 
@@ -1116,11 +1113,11 @@ static int osp_sync_process_queues(const struct lu_env *env,
                if (osp_sync_can_process_new(d, rec)) {
                        if (llh == NULL) {
                                /* ask llog for another record */
-                               CDEBUG(D_HA, "%lu changes, %u in progress,"
+                               CDEBUG(D_HA, "%u changes, %u in progress,"
                                       " %u in flight\n",
-                                      d->opd_syn_changes,
-                                      d->opd_syn_rpc_in_progress,
-                                      d->opd_syn_rpc_in_flight);
+                                      atomic_read(&d->opd_syn_changes),
+                                      atomic_read(&d->opd_syn_rpc_in_progress),
+                                      atomic_read(&d->opd_syn_rpc_in_flight));
                                return 0;
                        }
                        osp_sync_process_record(env, d, llh, rec);
@@ -1199,30 +1196,33 @@ static int osp_sync_thread(void *_arg)
 
        rc = llog_cat_process(&env, llh, osp_sync_process_queues, d, 0, 0);
        LASSERTF(rc == 0 || rc == LLOG_PROC_BREAK,
-                "%lu changes, %u in progress, %u in flight: %d\n",
-                d->opd_syn_changes, d->opd_syn_rpc_in_progress,
-                d->opd_syn_rpc_in_flight, rc);
+                "%u changes, %u in progress, %u in flight: %d\n",
+                atomic_read(&d->opd_syn_changes),
+                atomic_read(&d->opd_syn_rpc_in_progress),
+                atomic_read(&d->opd_syn_rpc_in_flight), rc);
 
        /* we don't expect llog_process_thread() to exit till umount */
        LASSERTF(thread->t_flags != SVC_RUNNING,
-                "%lu changes, %u in progress, %u in flight\n",
-                d->opd_syn_changes, d->opd_syn_rpc_in_progress,
-                d->opd_syn_rpc_in_flight);
+                "%u changes, %u in progress, %u in flight\n",
+                atomic_read(&d->opd_syn_changes),
+                atomic_read(&d->opd_syn_rpc_in_progress),
+                atomic_read(&d->opd_syn_rpc_in_flight));
 
        /* wait till all the requests are completed */
        count = 0;
-       while (d->opd_syn_rpc_in_progress > 0) {
+       while (atomic_read(&d->opd_syn_rpc_in_progress) > 0) {
                osp_sync_process_committed(&env, d);
 
                lwi = LWI_TIMEOUT(cfs_time_seconds(5), NULL, NULL);
                rc = l_wait_event(d->opd_syn_waitq,
-                                 d->opd_syn_rpc_in_progress == 0,
+                                 atomic_read(&d->opd_syn_rpc_in_progress) == 0,
                                  &lwi);
                if (rc == -ETIMEDOUT)
                        count++;
                LASSERTF(count < 10, "%s: %d %d %sempty\n",
-                        d->opd_obd->obd_name, d->opd_syn_rpc_in_progress,
-                        d->opd_syn_rpc_in_flight,
+                        d->opd_obd->obd_name,
+                        atomic_read(&d->opd_syn_rpc_in_progress),
+                        atomic_read(&d->opd_syn_rpc_in_flight),
                         list_empty(&d->opd_syn_committed_there) ? "" : "!");
 
        }
@@ -1232,10 +1232,10 @@ static int osp_sync_thread(void *_arg)
        if (rc)
                CERROR("can't cleanup llog: %d\n", rc);
 out:
-       LASSERTF(d->opd_syn_rpc_in_progress == 0,
+       LASSERTF(atomic_read(&d->opd_syn_rpc_in_progress) == 0,
                 "%s: %d %d %sempty\n",
-                d->opd_obd->obd_name, d->opd_syn_rpc_in_progress,
-                d->opd_syn_rpc_in_flight,
+                d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_progress),
+                atomic_read(&d->opd_syn_rpc_in_flight),
                 list_empty(&d->opd_syn_committed_there) ? "" : "!");
 
        thread->t_flags = SVC_STOPPED;