+ struct cl_io *io;
+ struct cl_object *clobj = NULL;
+ struct cl_page **pvec;
+ struct osc_page *opg;
+ long count = 0;
+ int maxscan = 0;
+ int index = 0;
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(atomic_long_read(&cli->cl_lru_in_list) >= 0);
+ if (atomic_long_read(&cli->cl_lru_in_list) == 0 || target <= 0)
+ RETURN(0);
+
+ CDEBUG(D_CACHE, "%s: shrinkers: %d, force: %d\n",
+ cli_name(cli), atomic_read(&cli->cl_lru_shrinkers), force);
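+ /* In the non-forced case only one shrinker may run at a time: take the
+ * cl_lru_shrinkers slot and back off with -EBUSY if another shrinker
+ * already holds it. A forced shrink always proceeds and just counts
+ * itself. */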
+ if (!force) {
+ if (atomic_read(&cli->cl_lru_shrinkers) > 0)
+ RETURN(-EBUSY);
+
+ if (atomic_inc_return(&cli->cl_lru_shrinkers) > 1) {
+ atomic_dec(&cli->cl_lru_shrinkers);
+ RETURN(-EBUSY);
+ }
+ } else {
+ atomic_inc(&cli->cl_lru_shrinkers);
+ }
+
+ pvec = (struct cl_page **)osc_env_info(env)->oti_pvec;
+ io = &osc_env_info(env)->oti_io;
+
+ spin_lock(&cli->cl_lru_list_lock);
+ if (force)
+ cli->cl_lru_reclaim++;
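+ /* scan at most twice the target number of pages (and no more than are
+ * currently on the LRU) so that busy pages cannot keep this loop
+ * running indefinitely */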
+ maxscan = min(target << 1, atomic_long_read(&cli->cl_lru_in_list));
+ while (!list_empty(&cli->cl_lru_list)) {
+ struct cl_page *page;
+ bool will_free = false;
+
+ if (!force && atomic_read(&cli->cl_lru_shrinkers) > 1)
+ break;
+
+ if (--maxscan < 0)
+ break;
+
+ opg = list_entry(cli->cl_lru_list.next, struct osc_page,
+ ops_lru);
+ page = opg->ops_cl.cpl_page;
+ if (lru_page_busy(cli, page)) {
+ list_move_tail(&opg->ops_lru, &cli->cl_lru_list);
+ continue;
+ }
+
+ LASSERT(page->cp_obj != NULL);
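+ /* The page belongs to a different object than the previous one: drop
+ * the LRU lock, flush the pages already collected for the old object
+ * and finish its IO, then set up a CIT_MISC IO against the new object.
+ * The iteration spent switching objects is given back to maxscan. */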
+ if (clobj != page->cp_obj) {
+ struct cl_object *tmp = page->cp_obj;
+
+ cl_object_get(tmp);
+ spin_unlock(&cli->cl_lru_list_lock);
+
+ if (clobj != NULL) {
+ discard_pagevec(env, io, pvec, index);
+ index = 0;
+
+ cl_io_fini(env, io);
+ cl_object_put(env, clobj);
+ clobj = NULL;
+ }
+
+ clobj = tmp;
+ io->ci_obj = clobj;
+ io->ci_ignore_layout = 1;
+ rc = cl_io_init(env, io, CIT_MISC, clobj);
+
+ spin_lock(&cli->cl_lru_list_lock);
+
+ if (rc != 0)
+ break;
+
+ ++maxscan;
+ continue;
+ }
+
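+ /* try to own the page so it can be discarded; if it became busy in the
+ * meantime, disown it and leave it on the LRU */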
+ if (cl_page_own_try(env, io, page) == 0) {
+ if (!lru_page_busy(cli, page)) {
+ /* remove it from the LRU list early to
+ * reduce lock contention */
+ __osc_lru_del(cli, opg);
+ opg->ops_in_lru = 0; /* will be discarded */
+
+ cl_page_get(page);
+ will_free = true;
+ } else {
+ cl_page_disown(env, io, page);
+ }
+ }
+
+ if (!will_free) {
+ list_move_tail(&opg->ops_lru, &cli->cl_lru_list);
+ continue;
+ }
+
+ /* Don't discard or free the page while cl_lru_list_lock is held */
+ pvec[index++] = page;
+ if (unlikely(index == OTI_PVEC_SIZE)) {
+ spin_unlock(&cli->cl_lru_list_lock);
+ discard_pagevec(env, io, pvec, index);
+ index = 0;
+
+ spin_lock(&cli->cl_lru_list_lock);
+ }
+
+ if (++count >= target)
+ break;
+ }
+ spin_unlock(&cli->cl_lru_list_lock);
+
+ if (clobj != NULL) {
+ discard_pagevec(env, io, pvec, index);
+
+ cl_io_fini(env, io);
+ cl_object_put(env, clobj);
+ }
+
+ atomic_dec(&cli->cl_lru_shrinkers);
+ if (count > 0) {
+ atomic_long_add(count, cli->cl_lru_left);
+ wake_up_all(&osc_lru_waitq);
+ }
+ RETURN(count > 0 ? count : rc);
+}
+
+/**
+ * Reclaim LRU pages by an IO thread. The caller wants to reclaim at least
+ * \@npages of LRU slots. For performance reasons it is better to drop LRU
+ * pages in batches, so the actual number reclaimed is adjusted to at least
+ * max_pages_per_rpc.
+ */
+static long osc_lru_reclaim(struct client_obd *cli, unsigned long npages)
+{
+ struct lu_env *env;
+ struct cl_client_cache *cache = cli->cl_cache;
+ int max_scans;
+ __u16 refcheck;
+ long rc = 0;
+ ENTRY;
+
+ LASSERT(cache != NULL);
+
+ env = cl_env_get(&refcheck);
+ if (IS_ERR(env))
+ RETURN(rc);
+
+ npages = max_t(int, npages, cli->cl_max_pages_per_rpc);
+ CDEBUG(D_CACHE, "%s: start to reclaim %ld pages from LRU\n",
+ cli_name(cli), npages);
+ rc = osc_lru_shrink(env, cli, npages, true);
+ if (rc >= npages) {
+ CDEBUG(D_CACHE, "%s: reclaimed %ld/%ld pages from LRU\n",
+ cli_name(cli), rc, npages);
+ if (osc_cache_too_much(cli) > 0)
+ ptlrpcd_queue_work(cli->cl_lru_work);
+ GOTO(out, rc);
+ } else if (rc > 0) {
+ npages -= rc;
+ }
+
+ CDEBUG(D_CACHE, "%s: cli %p no free slots, pages: %ld/%ld, want: %ld\n",
+ cli_name(cli), cli, atomic_long_read(&cli->cl_lru_in_list),
+ atomic_long_read(&cli->cl_lru_busy), npages);
+
+ /* Reclaim LRU slots from other client_obds because this one cannot
+ * free enough from its own LRU. This should rarely happen. */
+ spin_lock(&cache->ccc_lru_lock);
+ LASSERT(!list_empty(&cache->ccc_lru));
+
+ cache->ccc_lru_shrinkers++;
+ list_move_tail(&cli->cl_lru_osc, &cache->ccc_lru);
+
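+ /* walk the other client_obds sharing this cache, shrinking any that
+ * currently cache too many LRU pages, until enough slots are freed */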
+ max_scans = atomic_read(&cache->ccc_users) - 2;
+ while (--max_scans > 0 && !list_empty(&cache->ccc_lru)) {
+ cli = list_entry(cache->ccc_lru.next, struct client_obd,
+ cl_lru_osc);
+
+ CDEBUG(D_CACHE, "%s: cli %p LRU pages: %ld, busy: %ld.\n",
+ cli_name(cli), cli,
+ atomic_long_read(&cli->cl_lru_in_list),
+ atomic_long_read(&cli->cl_lru_busy));
+
+ list_move_tail(&cli->cl_lru_osc, &cache->ccc_lru);
+ if (osc_cache_too_much(cli) > 0) {
+ spin_unlock(&cache->ccc_lru_lock);
+
+ rc = osc_lru_shrink(env, cli, npages, true);
+ spin_lock(&cache->ccc_lru_lock);
+ if (rc >= npages)
+ break;
+ if (rc > 0)
+ npages -= rc;
+ }
+ }
+ spin_unlock(&cache->ccc_lru_lock);
+
+out:
+ cl_env_put(env, &refcheck);
+ CDEBUG(D_CACHE, "%s: cli %p freed %ld pages.\n",
+ cli_name(cli), cli, rc);
+ return rc;
+}
+
+/**
+ * osc_lru_alloc() is called to allocate an LRU slot for a cl_page.
+ *
+ * Usually the LRU slots are reserved up front in osc_io_iter_rw_init(), and
+ * this function only consumes that per-IO reservation. Only when LRU slots
+ * are in extreme shortage does an IO arrive here without enough reserved
+ * slots, in which case a slot is allocated directly from cl_lru_left,
+ * reclaiming or waiting if necessary.
+ */
+static int osc_lru_alloc(const struct lu_env *env, struct client_obd *cli,
+ struct osc_page *opg)
+{
+ struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
+ struct osc_io *oio = osc_env_io(env);
+ int rc = 0;
+ ENTRY;
+
+ if (cli->cl_cache == NULL) /* shall not be in LRU */
+ RETURN(0);
+
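+ /* fast path: consume one slot from the per-IO reservation made in
+ * osc_io_iter_rw_init() */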
+ if (oio->oi_lru_reserved > 0) {
+ --oio->oi_lru_reserved;
+ goto out;
+ }
+
+ LASSERT(atomic_long_read(cli->cl_lru_left) >= 0);
+ while (!atomic_long_add_unless(cli->cl_lru_left, -1, 0)) {
+ /* ran out of LRU slots, try to drop some ourselves */
+ rc = osc_lru_reclaim(cli, 1);
+ if (rc < 0)
+ break;
+ if (rc > 0)
+ continue;
+
+ cond_resched();
+ rc = l_wait_event(osc_lru_waitq,
+ atomic_long_read(cli->cl_lru_left) > 0,
+ &lwi);
+ if (rc < 0)
+ break;
+ }
+
+out:
+ if (rc >= 0) {
+ atomic_long_inc(&cli->cl_lru_busy);
+ opg->ops_in_lru = 1;
+ rc = 0;
+ }
+
+ RETURN(rc);
+}
+
+/**
+ * osc_lru_reserve() is called to reserve enough LRU slots for I/O.
+ *
+ * The benefit of doing this is to reduce contention on the atomic counter
+ * cl_lru_left by changing it from per-page access to per-IO access.
+ */
+unsigned long osc_lru_reserve(struct client_obd *cli, unsigned long npages)
+{
+ unsigned long reserved = 0;
+ unsigned long max_pages;
+ unsigned long c;
+
+ /* reserve at most a full RPC window so that a single thread does not
+ * accidentally consume too many LRU slots */
+ max_pages = cli->cl_max_pages_per_rpc * cli->cl_max_rpcs_in_flight;
+ if (npages > max_pages)
+ npages = max_pages;
+
+ c = atomic_long_read(cli->cl_lru_left);
+ if (c < npages && osc_lru_reclaim(cli, npages) > 0)
+ c = atomic_long_read(cli->cl_lru_left);
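+ /* grab npages slots atomically; retry the cmpxchg as long as enough
+ * slots remain, otherwise give up and return 0 reserved */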
+ while (c >= npages) {
+ if (c == atomic_long_cmpxchg(cli->cl_lru_left, c, c - npages)) {
+ reserved = npages;
+ break;
+ }
+ c = atomic_long_read(cli->cl_lru_left);
+ }
+ if (atomic_long_read(cli->cl_lru_left) < max_pages) {
+ /* If there aren't enough pages in the per-OSC LRU then
+ * wake up the LRU thread to try and clear out space, so
+ * we don't block if pages are being dirtied quickly. */
+ CDEBUG(D_CACHE, "%s: queue LRU, left: %lu/%ld.\n",
+ cli_name(cli), atomic_long_read(cli->cl_lru_left),
+ max_pages);
+ (void)ptlrpcd_queue_work(cli->cl_lru_work);
+ }
+
+ return reserved;
+}
+
+/**
+ * osc_lru_unreserve() is called to unreserve LRU slots.
+ *
+ * LRU slots reserved by osc_lru_reserve() may be left unused for several
+ * reasons, such as the page already existing in the cache or an I/O error.
+ * Those leftover reservations should be released by calling this function.
+ */
+void osc_lru_unreserve(struct client_obd *cli, unsigned long npages)
+{
+ atomic_long_add(npages, cli->cl_lru_left);
+ wake_up_all(&osc_lru_waitq);
+}
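+
+/*
+ * Illustrative pairing of osc_lru_reserve()/osc_lru_unreserve() above --
+ * a sketch of how a caller (e.g. an IO iterator init/fini path) might use
+ * them, not the exact upstream call sites; "used" below is a placeholder:
+ *
+ *     reserved = osc_lru_reserve(cli, npages);
+ *     ... each page then consumes one slot via osc_lru_alloc() ...
+ *     osc_lru_unreserve(cli, reserved - used);
+ */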
+
+/**
+ * Atomic operations are expensive. We accumulate the accounting for pages in
+ * the same page zone and update the zone statistics once per run of pages,
+ * rather than once per page. In practice this works well because the pages
+ * in a single RPC are likely to come from the same page zone.
+ */
+static inline void unstable_page_accounting(struct ptlrpc_bulk_desc *desc,
+ struct osc_brw_async_args *aa,
+ int factor)
+{
+ int page_count;
+ void *zone = NULL;
+ int count = 0;
+ int i;
+
+ if (desc != NULL) {
+ LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
+ page_count = desc->bd_iov_count;
+ } else {
+ page_count = aa->aa_page_count;
+ }
+
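+ /* walk the pages, batching the NR_UNSTABLE_NFS update for each run of
+ * consecutive pages that fall in the same zone */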
+ for (i = 0; i < page_count; i++) {
+ void *pz;
+ if (desc)
+ pz = page_zone(BD_GET_KIOV(desc, i).kiov_page);
+ else
+ pz = page_zone(aa->aa_ppga[i]->pg);
+
+ if (likely(pz == zone)) {
+ ++count;
+ continue;
+ }
+
+ if (count > 0) {
+ mod_zone_page_state(zone, NR_UNSTABLE_NFS,
+ factor * count);
+ count = 0;
+ }
+ zone = pz;
+ ++count;
+ }
+ if (count > 0)
+ mod_zone_page_state(zone, NR_UNSTABLE_NFS, factor * count);
+}
+
+static inline void add_unstable_page_accounting(struct ptlrpc_bulk_desc *desc,
+ struct osc_brw_async_args *aa)
+{
+ unstable_page_accounting(desc, aa, 1);
+}
+
+static inline void dec_unstable_page_accounting(struct ptlrpc_bulk_desc *desc,
+ struct osc_brw_async_args *aa)
+{
+ unstable_page_accounting(desc, aa, -1);
+}
+
+/**
+ * Performs "unstable" page accounting. This function balances the
+ * increment operations performed in osc_inc_unstable_pages. It is
+ * registered as the RPC request callback, and is executed when the
+ * bulk RPC is committed on the server. Thus at this point, the pages
+ * involved in the bulk transfer are no longer considered unstable.
+ *
+ * If this function is called, the request should have been committed
+ * or req::rq_unstable must have been set; either way it implies that the
+ * unstable statistic has already been added.
+ */
+void osc_dec_unstable_pages(struct ptlrpc_request *req)
+{
+ struct ptlrpc_bulk_desc *desc = req->rq_bulk;
+ struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
+ struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+ int page_count;
+ long unstable_count;