Whamcloud - gitweb
LU-5108 osc: Performance tune for LRU 58/10458/18
authorJinshan Xiong <jinshan.xiong@intel.com>
Sat, 18 Jul 2015 13:10:09 +0000 (06:10 -0700)
committerOleg Drokin <oleg.drokin@intel.com>
Tue, 18 Aug 2015 11:13:20 +0000 (11:13 +0000)
Early launch page LRU work in osc_io_rw_iter_init();
Change the page LRU shrinking policy by OSC attributes;
Delete the contended lock osc_object::oo_seatbelt
Other tiny changes for LRU management

Signed-off-by: Jinshan Xiong <jinshan.xiong@intel.com>
Change-Id: I688c29a99a469ef74f929a0689596170c665b2ee
Reviewed-on: http://review.whamcloud.com/10458
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Bobi Jam <bobijam@hotmail.com>
Reviewed-by: Fan Yong <fan.yong@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/obd.h
lustre/osc/lproc_osc.c
lustre/osc/osc_cache.c
lustre/osc/osc_cl_internal.h
lustre/osc/osc_internal.h
lustre/osc/osc_io.c
lustre/osc/osc_object.c
lustre/osc/osc_page.c
lustre/osc/osc_quota.c
lustre/osc/osc_request.c

index 37987ca..19e6544 100644 (file)
@@ -244,16 +244,38 @@ struct client_obd {
        struct obd_histogram    cl_read_offset_hist;
        struct obd_histogram    cl_write_offset_hist;
 
-       /* lru for osc caching pages */
-       struct cl_client_cache  *cl_cache;
-       struct list_head         cl_lru_osc; /* member of cl_cache->ccc_lru */
-       atomic_long_t           *cl_lru_left;
-       atomic_long_t            cl_lru_busy;
-       atomic_long_t            cl_lru_in_list;
-       atomic_long_t            cl_unstable_count;
-       struct list_head         cl_lru_list; /* lru page list */
-       spinlock_t               cl_lru_list_lock; /* page list protector */
-       atomic_t                 cl_lru_shrinkers;
+       /** LRU for osc caching pages */
+       struct cl_client_cache  *cl_cache;
+       /** member of cl_cache->ccc_lru */
+       struct list_head         cl_lru_osc;
+       /** # of available LRU slots left in the per-OSC cache.
+        * Available LRU slots are shared by all OSCs of the same file system,
+        * therefore this is a pointer to cl_client_cache::ccc_lru_left. */
+       atomic_long_t           *cl_lru_left;
+       /** # of busy LRU pages. A page is considered busy if it's in writeback
+        * queue, or in transfer. Busy pages can't be discarded so they are not
+        * in LRU cache. */
+       atomic_long_t            cl_lru_busy;
+       /** # of LRU pages in the cache for this client_obd */
+       atomic_long_t            cl_lru_in_list;
+       /** # of threads shrinking LRU cache. To avoid contention, it's not
+        * allowed to have multiple threads shrinking LRU cache. */
+       atomic_t                 cl_lru_shrinkers;
+       /** The time when this LRU cache was last used. */
+       time_t                   cl_lru_last_used;
+       /** stats: how many reclaims have happened for this client_obd.
+        * reclaim vs. shrink - shrink is async, voluntary rebalancing;
+        * reclaim is sync, initiated by an IO thread when the LRU slots are
+        * in shortage. */
+       __u64                    cl_lru_reclaim;
+       /** List of LRU pages for this client_obd */
+       struct list_head         cl_lru_list;
+       /** Lock for LRU page list */
+       spinlock_t               cl_lru_list_lock;
+       /** # of unstable pages in this client_obd.
+        * An unstable page is a page state that WRITE RPC has finished but
+        * the transaction has NOT yet committed. */
+       atomic_long_t            cl_unstable_count;
 
        /* number of in flight destroy rpcs is limited to max_rpcs_in_flight */
        atomic_t                 cl_destroy_in_flight;
index 51bb482..dc372fd 100644 (file)
@@ -183,10 +183,12 @@ static int osc_cached_mb_seq_show(struct seq_file *m, void *v)
 
        rc = seq_printf(m,
                      "used_mb: %ld\n"
-                     "busy_cnt: %ld\n",
+                     "busy_cnt: %ld\n"
+                     "reclaim: "LPU64"\n",
                      (atomic_long_read(&cli->cl_lru_in_list) +
-                       atomic_long_read(&cli->cl_lru_busy)) >> shift,
-                     atomic_long_read(&cli->cl_lru_busy));
+                      atomic_long_read(&cli->cl_lru_busy)) >> shift,
+                     atomic_long_read(&cli->cl_lru_busy),
+                     cli->cl_lru_reclaim);
 
        return rc;
 }
index f4b3f96..a4ae79b 100644 (file)
@@ -916,7 +916,7 @@ static int osc_extent_wait(const struct lu_env *env, struct osc_extent *ext,
        if (rc == -ETIMEDOUT) {
                OSC_EXTENT_DUMP(D_ERROR, ext,
                        "%s: wait ext to %u timedout, recovery in progress?\n",
-                       osc_export(obj)->exp_obd->obd_name, state);
+                       cli_name(osc_cli(obj)), state);
 
                lwi = LWI_INTR(NULL, NULL);
                rc = l_wait_event(ext->oe_waitq, extent_wait_cb(ext, state),
@@ -1279,7 +1279,6 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
 {
        struct osc_page   *opg  = oap2osc_page(oap);
        struct cl_page    *page = oap2cl_page(oap);
-       struct osc_object *obj  = cl2osc(opg->ops_cl.cpl_obj);
        enum cl_req_type   crt;
        int srvlock;
 
@@ -1304,13 +1303,6 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
        /* Clear opg->ops_transfer_pinned before VM lock is released. */
        opg->ops_transfer_pinned = 0;
 
-       spin_lock(&obj->oo_seatbelt);
-       LASSERT(opg->ops_submitter != NULL);
-       LASSERT(!list_empty(&opg->ops_inflight));
-       list_del_init(&opg->ops_inflight);
-       opg->ops_submitter = NULL;
-       spin_unlock(&obj->oo_seatbelt);
-
        opg->ops_submit_time = 0;
        srvlock = oap->oap_brw_flags & OBD_BRW_SRVLOCK;
 
@@ -1340,10 +1332,10 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
 
 #define OSC_DUMP_GRANT(lvl, cli, fmt, args...) do {                    \
        struct client_obd *__tmp = (cli);                               \
-       CDEBUG(lvl, "%s: grant { dirty: %ld/%ld dirty_pages: %ld/%lu "  \
+       CDEBUG(lvl, "%s: grant { dirty: %lu/%lu dirty_pages: %ld/%lu "  \
               "dropped: %ld avail: %ld, reserved: %ld, flight: %d }"   \
               "lru {in list: %ld, left: %ld, waiters: %d }"fmt"\n",    \
-              __tmp->cl_import->imp_obd->obd_name,                     \
+              cli_name(__tmp),                                         \
               __tmp->cl_dirty_pages, __tmp->cl_dirty_max_pages,        \
               atomic_long_read(&obd_dirty_pages), obd_max_dirty_pages, \
               __tmp->cl_lost_grant, __tmp->cl_avail_grant,             \
@@ -1578,7 +1570,7 @@ static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
                osc_io_unplug_async(env, cli, NULL);
 
                CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n",
-                      cli->cl_import->imp_obd->obd_name, &ocw, oap);
+                      cli_name(cli), &ocw, oap);
 
                rc = l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
 
@@ -1623,7 +1615,7 @@ static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
        default:
                CDEBUG(D_CACHE, "%s: event for cache space @ %p never arrived "
                       "due to %d, fall back to sync i/o\n",
-                      cli->cl_import->imp_obd->obd_name, &ocw, rc);
+                      cli_name(cli), &ocw, rc);
                break;
        }
        EXIT;
@@ -2214,13 +2206,9 @@ static int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli,
                return 0;
 
        if (!async) {
-               /* disable osc_lru_shrink() temporarily to avoid
-                * potential stack overrun problem. LU-2859 */
-               atomic_inc(&cli->cl_lru_shrinkers);
                spin_lock(&cli->cl_loi_list_lock);
                osc_check_rpcs(env, cli);
                spin_unlock(&cli->cl_loi_list_lock);
-               atomic_dec(&cli->cl_lru_shrinkers);
        } else {
                CDEBUG(D_CACHE, "Queue writeback work for client %p.\n", cli);
                LASSERT(cli->cl_writeback_work != NULL);
@@ -2441,7 +2429,6 @@ int osc_teardown_async_page(const struct lu_env *env,
                            struct osc_object *obj, struct osc_page *ops)
 {
        struct osc_async_page *oap = &ops->ops_oap;
-       struct osc_extent     *ext = NULL;
        int rc = 0;
        ENTRY;
 
@@ -2450,12 +2437,15 @@ int osc_teardown_async_page(const struct lu_env *env,
        CDEBUG(D_INFO, "teardown oap %p page %p at index %lu.\n",
               oap, ops, osc_index(oap2osc(oap)));
 
-       osc_object_lock(obj);
        if (!list_empty(&oap->oap_rpc_item)) {
                CDEBUG(D_CACHE, "oap %p is not in cache.\n", oap);
                rc = -EBUSY;
        } else if (!list_empty(&oap->oap_pending_item)) {
+               struct osc_extent *ext = NULL;
+
+               osc_object_lock(obj);
                ext = osc_extent_lookup(obj, osc_index(oap2osc(oap)));
+               osc_object_unlock(obj);
                /* only truncated pages are allowed to be taken out.
                 * See osc_extent_truncate() and osc_cache_truncate_start()
                 * for details. */
@@ -2464,10 +2454,9 @@ int osc_teardown_async_page(const struct lu_env *env,
                                        osc_index(oap2osc(oap)));
                        rc = -EBUSY;
                }
+               if (ext != NULL)
+                       osc_extent_put(env, ext);
        }
-       osc_object_unlock(obj);
-       if (ext != NULL)
-               osc_extent_put(env, ext);
        RETURN(rc);
 }
 
index 9dc9cdf..49b2a95 100644 (file)
@@ -133,16 +133,6 @@ struct osc_object {
         /** Serialization object for osc_object::oo_debug_io. */
        struct mutex       oo_debug_mutex;
 #endif
-        /**
-         * List of pages in transfer.
-         */
-       struct list_head        oo_inflight[CRT_NR];
-       /**
-        * Lock, protecting osc_page::ops_inflight, because a seat-belt is
-        * locked during take-off and landing.
-        */
-       spinlock_t              oo_seatbelt;
-
        /**
         * used by the osc to keep track of what objects to build into rpcs.
         * Protected by client_obd->cli_loi_list_lock.
@@ -376,15 +366,6 @@ struct osc_page {
         */
        struct list_head        ops_lru;
        /**
-        * Linkage into a per-osc_object list of pages in flight. For
-        * debugging.
-        */
-       struct list_head        ops_inflight;
-       /**
-        * Thread that submitted this page for transfer. For debugging.
-        */
-       struct task_struct           *ops_submitter;
-       /**
         * Submit time - the time when the page is starting RPC. For debugging.
         */
        cfs_time_t            ops_submit_time;
index 985df64..f2f8951 100644 (file)
@@ -135,7 +135,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                  struct list_head *ext_list, int cmd);
 long osc_lru_shrink(const struct lu_env *env, struct client_obd *cli,
                   long target, bool force);
-long osc_lru_reclaim(struct client_obd *cli);
+long osc_lru_reclaim(struct client_obd *cli, unsigned long npages);
 
 extern spinlock_t osc_ast_guard;
 extern struct lu_kmem_descr osc_caches[];
@@ -166,6 +166,11 @@ static inline unsigned long rpcs_in_flight(struct client_obd *cli)
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
 }
 
+static inline char *cli_name(struct client_obd *cli)
+{
+       return cli->cl_import->imp_obd->obd_name;
+}
+
 #ifndef min_t
 #define min_t(type,x,y) \
         ({ type __x = (x); type __y = (y); __x < __y ? __x: __y; })
index 043a6de..23ef9f4 100644 (file)
@@ -360,8 +360,8 @@ static int osc_io_rw_iter_init(const struct lu_env *env,
        struct osc_object *osc = cl2osc(ios->cis_obj);
        struct client_obd *cli = osc_cli(osc);
        unsigned long c;
-       unsigned long npages;
        unsigned long max_pages;
+       unsigned long npages;
        ENTRY;
 
        if (cl_io_is_append(io))
@@ -376,7 +376,7 @@ static int osc_io_rw_iter_init(const struct lu_env *env,
                npages = max_pages;
 
        c = atomic_long_read(cli->cl_lru_left);
-       if (c < npages && osc_lru_reclaim(cli) > 0)
+       if (c < npages && osc_lru_reclaim(cli, npages) > 0)
                c = atomic_long_read(cli->cl_lru_left);
        while (c >= npages) {
                if (c == atomic_long_cmpxchg(cli->cl_lru_left, c, c - npages)) {
@@ -385,6 +385,15 @@ static int osc_io_rw_iter_init(const struct lu_env *env,
                }
                c = atomic_long_read(cli->cl_lru_left);
        }
+       if (atomic_long_read(cli->cl_lru_left) < max_pages) {
+               /* If there aren't enough pages in the per-OSC LRU then
+                * wake up the LRU thread to try and clear out space, so
+                * we don't block if pages are being dirtied quickly. */
+               CDEBUG(D_CACHE, "%s: queue LRU, left: %lu/%ld.\n",
+                      cli_name(cli), atomic_long_read(cli->cl_lru_left),
+                      max_pages);
+               (void)ptlrpcd_queue_work(cli->cl_lru_work);
+       }
 
        RETURN(0);
 }
index 2b9b50a..00d19e7 100644 (file)
@@ -75,16 +75,11 @@ static int osc_object_init(const struct lu_env *env, struct lu_object *obj,
 {
         struct osc_object           *osc   = lu2osc(obj);
         const struct cl_object_conf *cconf = lu2cl_conf(conf);
-        int i;
 
         osc->oo_oinfo = cconf->u.coc_oinfo;
 #ifdef CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK
        mutex_init(&osc->oo_debug_mutex);
 #endif
-       spin_lock_init(&osc->oo_seatbelt);
-        for (i = 0; i < CRT_NR; ++i)
-               INIT_LIST_HEAD(&osc->oo_inflight[i]);
-
        INIT_LIST_HEAD(&osc->oo_ready_item);
        INIT_LIST_HEAD(&osc->oo_hp_ready_item);
        INIT_LIST_HEAD(&osc->oo_write_item);
@@ -110,10 +105,6 @@ static int osc_object_init(const struct lu_env *env, struct lu_object *obj,
 static void osc_object_free(const struct lu_env *env, struct lu_object *obj)
 {
        struct osc_object *osc = lu2osc(obj);
-       int i;
-
-       for (i = 0; i < CRT_NR; ++i)
-               LASSERT(list_empty(&osc->oo_inflight[i]));
 
        LASSERT(list_empty(&osc->oo_ready_item));
        LASSERT(list_empty(&osc->oo_hp_ready_item));
index f07f462..d5d3ed7 100644 (file)
@@ -198,11 +198,6 @@ static void osc_page_transfer_add(const struct lu_env *env,
        /* ops_lru and ops_inflight share the same field, so take it from LRU
         * first and then use it as inflight. */
        osc_lru_use(osc_cli(obj), opg);
-
-       spin_lock(&obj->oo_seatbelt);
-       list_add(&opg->ops_inflight, &obj->oo_inflight[crt]);
-       opg->ops_submitter = current;
-       spin_unlock(&obj->oo_seatbelt);
 }
 
 int osc_page_cache_add(const struct lu_env *env,
@@ -257,7 +252,7 @@ static int osc_page_print(const struct lu_env *env,
        return (*printer)(env, cookie, LUSTRE_OSC_NAME"-page@%p %lu: "
                          "1< %#x %d %u %s %s > "
                          "2< "LPD64" %u %u %#x %#x | %p %p %p > "
-                         "3< %s %p %d %lu %d > "
+                         "3< %d %lu %d > "
                          "4< %d %d %d %lu %s | %s %s %s %s > "
                          "5< %s %s %s %s | %d %s | %d %s %s>\n",
                          opg, osc_index(opg),
@@ -270,10 +265,9 @@ static int osc_page_print(const struct lu_env *env,
                           oap->oap_obj_off, oap->oap_page_off, oap->oap_count,
                           oap->oap_async_flags, oap->oap_brw_flags,
                          oap->oap_request, oap->oap_cli, obj,
-                          /* 3 */
-                          osc_list(&opg->ops_inflight),
-                          opg->ops_submitter, opg->ops_transfer_pinned,
-                          osc_submit_duration(opg), opg->ops_srvlock,
+                         /* 3 */
+                         opg->ops_transfer_pinned,
+                         osc_submit_duration(opg), opg->ops_srvlock,
                           /* 4 */
                           cli->cl_r_in_flight, cli->cl_w_in_flight,
                           cli->cl_max_rpcs_in_flight,
@@ -314,14 +308,6 @@ static void osc_page_delete(const struct lu_env *env,
                LASSERT(0);
        }
 
-       spin_lock(&obj->oo_seatbelt);
-       if (opg->ops_submitter != NULL) {
-               LASSERT(!list_empty(&opg->ops_inflight));
-               list_del_init(&opg->ops_inflight);
-               opg->ops_submitter = NULL;
-       }
-       spin_unlock(&obj->oo_seatbelt);
-
        osc_lru_del(osc_cli(obj), opg);
 
        if (slice->cpl_page->cp_type == CPT_CACHEABLE) {
@@ -416,9 +402,6 @@ int osc_page_init(const struct lu_env *env, struct cl_object *obj,
 #ifdef CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK
        opg->ops_temp = !osc_page_protected(env, opg, CLM_READ, 1);
 #endif
-       /* ops_inflight and ops_lru are the same field, but it doesn't
-        * hurt to initialize it twice :-) */
-       INIT_LIST_HEAD(&opg->ops_inflight);
        INIT_LIST_HEAD(&opg->ops_lru);
 
        /* reserve an LRU space for this page */
@@ -483,17 +466,31 @@ void osc_page_submit(const struct lu_env *env, struct osc_page *opg,
  */
 
 static DECLARE_WAIT_QUEUE_HEAD(osc_lru_waitq);
-/* LRU pages are freed in batch mode. OSC should at least free this
- * number of pages to avoid running out of LRU budget, and.. */
-static const int lru_shrink_min = 2 << (20 - PAGE_CACHE_SHIFT); /* 2M */
-/* free this number at most otherwise it will take too long time to finsih. */
-static const int lru_shrink_max = 8 << (20 - PAGE_CACHE_SHIFT); /* 8M */
 
-/* Check if we can free LRU slots from this OSC. If there exists LRU waiters,
+/**
+ * LRU pages are freed in batch mode. OSC should at least free this
+ * number of pages to avoid running out of LRU slots.
+ */
+static inline int lru_shrink_min(struct client_obd *cli)
+{
+       return cli->cl_max_pages_per_rpc * 2;
+}
+
+/**
+ * Free this number at most, otherwise it will take too long to finish.
+ */
+static inline int lru_shrink_max(struct client_obd *cli)
+{
+       return cli->cl_max_pages_per_rpc * cli->cl_max_rpcs_in_flight;
+}
+
+/**
+ * Check if we can free LRU slots from this OSC. If there exists LRU waiters,
  * we should free slots aggressively. In this way, slots are freed in a steady
  * step to maintain fairness among OSCs.
  *
- * Return how many LRU pages should be freed. */
+ * Return how many LRU pages should be freed.
+ */
 static int osc_cache_too_much(struct client_obd *cli)
 {
        struct cl_client_cache *cache = cli->cl_cache;
@@ -505,15 +502,18 @@ static int osc_cache_too_much(struct client_obd *cli)
 
        /* if it's going to run out LRU slots, we should free some, but not
         * too much to maintain faireness among OSCs. */
-       if (atomic_long_read(cli->cl_lru_left) < cache->ccc_lru_max >> 4) {
+       if (atomic_long_read(cli->cl_lru_left) < cache->ccc_lru_max >> 2) {
                if (pages >= budget)
-                       return lru_shrink_max;
+                       return lru_shrink_max(cli);
                else if (pages >= budget / 2)
-                       return lru_shrink_min;
-#if 0
-       } else if (pages >= budget * 2)
-               return lru_shrink_min;
-#endif
+                       return lru_shrink_min(cli);
+       } else {
+               int duration = cfs_time_current_sec() - cli->cl_lru_last_used;
+
+               /* knock out pages by duration of no IO activity */
+               duration >>= 6; /* approximately 1 minute */
+               if (duration > 0 && pages >= budget / duration)
+                       return lru_shrink_min(cli);
        }
        return 0;
 }
@@ -521,11 +521,20 @@ static int osc_cache_too_much(struct client_obd *cli)
 int lru_queue_work(const struct lu_env *env, void *data)
 {
        struct client_obd *cli = data;
+       int count;
 
-       CDEBUG(D_CACHE, "Run LRU work for client obd %p.\n", cli);
+       CDEBUG(D_CACHE, "%s: run LRU work for client obd\n", cli_name(cli));
+       count = osc_cache_too_much(cli);
+       if (count > 0) {
+               int rc = osc_lru_shrink(env, cli, count, false);
 
-       if (osc_cache_too_much(cli))
-               osc_lru_shrink(env, cli, lru_shrink_max, true);
+               CDEBUG(D_CACHE, "%s: shrank %d/%d pages from client obd\n",
+                      cli_name(cli), rc, count);
+               if (rc >= count) {
+                       CDEBUG(D_CACHE, "%s: queue again\n", cli_name(cli));
+                       ptlrpcd_queue_work(cli->cl_lru_work);
+               }
+       }
 
        RETURN(0);
 }
@@ -552,10 +561,10 @@ void osc_lru_add_batch(struct client_obd *cli, struct list_head *plist)
                list_splice_tail(&lru, &cli->cl_lru_list);
                atomic_long_sub(npages, &cli->cl_lru_busy);
                atomic_long_add(npages, &cli->cl_lru_in_list);
+               cli->cl_lru_last_used = cfs_time_current_sec();
                spin_unlock(&cli->cl_lru_list_lock);
 
-               /* XXX: May set force to be true for better performance */
-               if (osc_cache_too_much(cli))
+               if (waitqueue_active(&osc_lru_waitq))
                        (void)ptlrpcd_queue_work(cli->cl_lru_work);
        }
 }
@@ -587,8 +596,10 @@ static void osc_lru_del(struct client_obd *cli, struct osc_page *opg)
                /* this is a great place to release more LRU pages if
                 * this osc occupies too many LRU pages and kernel is
                 * stealing one of them. */
-               if (!memory_pressure_get())
+               if (osc_cache_too_much(cli)) {
+                       CDEBUG(D_CACHE, "%s: queue LRU work\n", cli_name(cli));
                        (void)ptlrpcd_queue_work(cli->cl_lru_work);
+               }
                wake_up(&osc_lru_waitq);
        } else {
                LASSERT(list_empty(&opg->ops_lru));
@@ -619,6 +630,7 @@ static void discard_pagevec(const struct lu_env *env, struct cl_io *io,
                 struct cl_page *page = pvec[i];
 
                LASSERT(cl_page_is_owned(page, io));
+               cl_page_delete(env, page);
                cl_page_discard(env, io, page);
                cl_page_disown(env, io, page);
                 cl_page_put(env, page);
@@ -669,6 +681,8 @@ long osc_lru_shrink(const struct lu_env *env, struct client_obd *cli,
        if (atomic_long_read(&cli->cl_lru_in_list) == 0 || target <= 0)
                RETURN(0);
 
+       CDEBUG(D_CACHE, "%s: shrinkers: %d, force: %d\n",
+              cli_name(cli), atomic_read(&cli->cl_lru_shrinkers), force);
        if (!force) {
                if (atomic_read(&cli->cl_lru_shrinkers) > 0)
                        RETURN(-EBUSY);
@@ -685,11 +699,16 @@ long osc_lru_shrink(const struct lu_env *env, struct client_obd *cli,
        io = &osc_env_info(env)->oti_io;
 
        spin_lock(&cli->cl_lru_list_lock);
+       if (force)
+               cli->cl_lru_reclaim++;
        maxscan = min(target << 1, atomic_long_read(&cli->cl_lru_in_list));
        while (!list_empty(&cli->cl_lru_list)) {
                struct cl_page *page;
                bool will_free = false;
 
+               if (!force && atomic_read(&cli->cl_lru_shrinkers) > 1)
+                       break;
+
                if (--maxscan < 0)
                        break;
 
@@ -780,13 +799,19 @@ long osc_lru_shrink(const struct lu_env *env, struct client_obd *cli,
        RETURN(count > 0 ? count : rc);
 }
 
-long osc_lru_reclaim(struct client_obd *cli)
+/**
+ * Reclaim LRU pages by an IO thread. The caller wants to reclaim at least
+ * \@npages of LRU slots. For performance consideration, it's better to drop
+ * LRU pages in batch. Therefore, the actual number is adjusted at least
+ * max_pages_per_rpc.
+ */
+long osc_lru_reclaim(struct client_obd *cli, unsigned long npages)
 {
        struct cl_env_nest nest;
        struct lu_env *env;
        struct cl_client_cache *cache = cli->cl_cache;
-       long rc = 0;
        int max_scans;
+       long rc = 0;
        ENTRY;
 
        LASSERT(cache != NULL);
@@ -795,20 +820,23 @@ long osc_lru_reclaim(struct client_obd *cli)
        if (IS_ERR(env))
                RETURN(rc);
 
-       rc = osc_lru_shrink(env, cli, osc_cache_too_much(cli), false);
-       if (rc != 0) {
-               if (rc == -EBUSY)
-                       rc = 0;
-
-               CDEBUG(D_CACHE, "%s: Free %ld pages from own LRU: %p.\n",
-                       cli->cl_import->imp_obd->obd_name, rc, cli);
+       npages = max_t(int, npages, cli->cl_max_pages_per_rpc);
+       CDEBUG(D_CACHE, "%s: start to reclaim %ld pages from LRU\n",
+              cli_name(cli), npages);
+       rc = osc_lru_shrink(env, cli, npages, true);
+       if (rc >= npages) {
+               CDEBUG(D_CACHE, "%s: reclaimed %ld/%ld pages from LRU\n",
+                      cli_name(cli), rc, npages);
+               if (osc_cache_too_much(cli) > 0)
+                       ptlrpcd_queue_work(cli->cl_lru_work);
                GOTO(out, rc);
+       } else if (rc > 0) {
+               npages -= rc;
        }
 
-       CDEBUG(D_CACHE, "%s: cli %p no free slots, pages: %ld, busy: %ld.\n",
-               cli->cl_import->imp_obd->obd_name, cli,
-               atomic_long_read(&cli->cl_lru_in_list),
-               atomic_long_read(&cli->cl_lru_busy));
+       CDEBUG(D_CACHE, "%s: cli %p no free slots, pages: %ld/%ld, want: %ld\n",
+               cli_name(cli), cli, atomic_long_read(&cli->cl_lru_in_list),
+               atomic_long_read(&cli->cl_lru_busy), npages);
 
        /* Reclaim LRU slots from other client_obd as it can't free enough
         * from its own. This should rarely happen. */
@@ -824,7 +852,7 @@ long osc_lru_reclaim(struct client_obd *cli)
                                 cl_lru_osc);
 
                CDEBUG(D_CACHE, "%s: cli %p LRU pages: %ld, busy: %ld.\n",
-                       cli->cl_import->imp_obd->obd_name, cli,
+                       cli_name(cli), cli,
                        atomic_long_read(&cli->cl_lru_in_list),
                        atomic_long_read(&cli->cl_lru_busy));
 
@@ -832,11 +860,12 @@ long osc_lru_reclaim(struct client_obd *cli)
                if (osc_cache_too_much(cli) > 0) {
                        spin_unlock(&cache->ccc_lru_lock);
 
-                       rc = osc_lru_shrink(env, cli, osc_cache_too_much(cli),
-                                           true);
+                       rc = osc_lru_shrink(env, cli, npages, true);
                        spin_lock(&cache->ccc_lru_lock);
-                       if (rc != 0)
+                       if (rc >= npages)
                                break;
+                       if (rc > 0)
+                               npages -= rc;
                }
        }
        spin_unlock(&cache->ccc_lru_lock);
@@ -844,7 +873,7 @@ long osc_lru_reclaim(struct client_obd *cli)
 out:
        cl_env_nested_put(&nest, env);
        CDEBUG(D_CACHE, "%s: cli %p freed %ld pages.\n",
-               cli->cl_import->imp_obd->obd_name, cli, rc);
+               cli_name(cli), cli, rc);
        return rc;
 }
 
@@ -874,9 +903,8 @@ static int osc_lru_reserve(const struct lu_env *env, struct osc_object *obj,
 
        LASSERT(atomic_long_read(cli->cl_lru_left) >= 0);
        while (!atomic_long_add_unless(cli->cl_lru_left, -1, 0)) {
-
                /* run out of LRU spaces, try to drop some by itself */
-               rc = osc_lru_reclaim(cli);
+               rc = osc_lru_reclaim(cli, 1);
                if (rc < 0)
                        break;
                if (rc > 0)
@@ -977,7 +1005,7 @@ void osc_dec_unstable_pages(struct ptlrpc_request *req)
        if (unstable_count == 0)
                wake_up_all(&cli->cl_cache->ccc_unstable_waitq);
 
-       if (osc_cache_too_much(cli))
+       if (waitqueue_active(&osc_lru_waitq))
                (void)ptlrpcd_queue_work(cli->cl_lru_work);
 }
 
@@ -1031,8 +1059,7 @@ bool osc_over_unstable_soft_limit(struct client_obd *cli)
 
        CDEBUG(D_CACHE,
               "%s: cli: %p unstable pages: %lu, osc unstable pages: %lu\n",
-              cli->cl_import->imp_obd->obd_name, cli,
-              unstable_nr, osc_unstable_count);
+              cli_name(cli), cli, unstable_nr, osc_unstable_count);
 
        /* If the LRU slots are in shortage - 25% remaining AND this OSC
         * has one full RPC window of unstable pages, it's a good chance
index 1e14457..2446587 100644 (file)
@@ -110,7 +110,7 @@ int osc_quota_setdq(struct client_obd *cli, const unsigned int qid[],
                        }
 
                        CDEBUG(D_QUOTA, "%s: setdq to insert for %s %d (%d)\n",
-                              cli->cl_import->imp_obd->obd_name,
+                              cli_name(cli),
                               type == USRQUOTA ? "user" : "group",
                               qid[type], rc);
                } else {
@@ -125,7 +125,7 @@ int osc_quota_setdq(struct client_obd *cli, const unsigned int qid[],
                                OBD_SLAB_FREE_PTR(oqi, osc_quota_kmem);
 
                        CDEBUG(D_QUOTA, "%s: setdq to remove for %s %d (%p)\n",
-                              cli->cl_import->imp_obd->obd_name,
+                              cli_name(cli),
                               type == USRQUOTA ? "user" : "group",
                               qid[type], oqi);
                }
index c6ccb68..4760a1d 100644 (file)
@@ -583,13 +583,12 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
-                           (obd_max_dirty_pages + 1))) {
+                           (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() allowing the atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
-               CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
-                      cli->cl_import->imp_obd->obd_name,
-                      atomic_long_read(&obd_dirty_pages),
+               CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
+                      cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
@@ -776,21 +775,19 @@ static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
 
 static int osc_add_shrink_grant(struct client_obd *client)
 {
-        int rc;
+       int rc;
 
-        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
-                                       TIMEOUT_GRANT,
-                                       osc_grant_shrink_grant_cb, NULL,
-                                       &client->cl_grant_shrink_list);
-        if (rc) {
-                CERROR("add grant client %s error %d\n",
-                        client->cl_import->imp_obd->obd_name, rc);
-                return rc;
-        }
-        CDEBUG(D_CACHE, "add grant client %s \n",
-               client->cl_import->imp_obd->obd_name);
-        osc_update_next_shrink(client);
-        return 0;
+       rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
+                                      TIMEOUT_GRANT,
+                                      osc_grant_shrink_grant_cb, NULL,
+                                      &client->cl_grant_shrink_list);
+       if (rc) {
+               CERROR("add grant client %s error %d\n", cli_name(client), rc);
+               return rc;
+       }
+       CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
+       osc_update_next_shrink(client);
+       return 0;
 }
 
 static int osc_del_shrink_grant(struct client_obd *client)
@@ -819,7 +816,7 @@ static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
 
         if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
-                     cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
+                     cli_name(cli), cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
                /* workaround for servers which do not have the patch from
                 * LU-2679 */
@@ -831,8 +828,8 @@ static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
        spin_unlock(&cli->cl_loi_list_lock);
 
        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
-               "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
-               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
+              "chunk bits: %d.\n", cli_name(cli), cli->cl_avail_grant,
+              cli->cl_lost_grant, cli->cl_chunkbits);
 
        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))