+ struct cl_env_nest nest;
+ struct lu_env *env;
+ struct cl_client_cache *cache = cli->cl_cache;
+ int max_scans;
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(cache != NULL);
+ LASSERT(!list_empty(&cache->ccc_lru));
+
+ env = cl_env_nested_get(&nest);
+ if (IS_ERR(env))
+ RETURN(rc);
+
+ rc = osc_lru_shrink(env, cli, osc_cache_too_much(cli), false);
+ if (rc != 0) {
+ if (rc == -EBUSY)
+ rc = 0;
+
+ CDEBUG(D_CACHE, "%s: Free %d pages from own LRU: %p.\n",
+ cli->cl_import->imp_obd->obd_name, rc, cli);
+ GOTO(out, rc);
+ }
+
+ CDEBUG(D_CACHE, "%s: cli %p no free slots, pages: %d, busy: %d.\n",
+ cli->cl_import->imp_obd->obd_name, cli,
+ atomic_read(&cli->cl_lru_in_list),
+ atomic_read(&cli->cl_lru_busy));
+
+ /* Reclaim LRU slots from other client_obd as it can't free enough
+ * from its own. This should rarely happen. */
+ spin_lock(&cache->ccc_lru_lock);
+ cache->ccc_lru_shrinkers++;
+ list_move_tail(&cli->cl_lru_osc, &cache->ccc_lru);
+
+ max_scans = atomic_read(&cache->ccc_users);
+ while (--max_scans > 0 && !list_empty(&cache->ccc_lru)) {
+ cli = list_entry(cache->ccc_lru.next, struct client_obd,
+ cl_lru_osc);
+
+ CDEBUG(D_CACHE, "%s: cli %p LRU pages: %d, busy: %d.\n",
+ cli->cl_import->imp_obd->obd_name, cli,
+ atomic_read(&cli->cl_lru_in_list),
+ atomic_read(&cli->cl_lru_busy));
+
+ list_move_tail(&cli->cl_lru_osc, &cache->ccc_lru);
+ if (osc_cache_too_much(cli) > 0) {
+ spin_unlock(&cache->ccc_lru_lock);
+
+ rc = osc_lru_shrink(env, cli, osc_cache_too_much(cli),
+ true);
+ spin_lock(&cache->ccc_lru_lock);
+ if (rc != 0)
+ break;
+ }
+ }
+ spin_unlock(&cache->ccc_lru_lock);
+
+out:
+ cl_env_nested_put(&nest, env);
+ CDEBUG(D_CACHE, "%s: cli %p freed %d pages.\n",
+ cli->cl_import->imp_obd->obd_name, cli, rc);
+ return rc;
+}
+
+/**
+ * osc_lru_reserve() reserves one LRU slot for a cl_page.
+ *
+ * Usually the LRU slots are reserved in batch in osc_io_iter_rw_init(),
+ * which should have set aside enough slots for the whole IO; this per-page
+ * path only has to reclaim or wait when LRU slots are in extreme shortage.
+ *
+ * \retval 0 on success (slot reserved, or object is not LRU-cached)
+ * \retval negative errno if reclaim failed or the wait was interrupted
+ */
+static int osc_lru_reserve(const struct lu_env *env, struct osc_object *obj,
+			   struct osc_page *opg)
+{
+	struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
+	struct osc_io *oio = osc_env_io(env);
+	struct client_obd *cli = osc_cli(obj);
+	int rc = 0;
+	ENTRY;
+
+	if (cli->cl_cache == NULL) /* shall not be in LRU */
+		RETURN(0);
+
+	/* Fast path: consume one slot from the batch pre-reserved for this
+	 * IO by the iterator init. */
+	if (oio->oi_lru_reserved > 0) {
+		--oio->oi_lru_reserved;
+		goto out;
+	}
+
+	LASSERT(atomic_read(cli->cl_lru_left) >= 0);
+	/* Atomically take a slot unless the counter is already zero; on
+	 * failure, try to make room and retry. */
+	while (!atomic_add_unless(cli->cl_lru_left, -1, 0)) {
+
+		/* run out of LRU spaces, try to drop some by itself */
+		rc = osc_lru_reclaim(cli);
+		if (rc < 0)
+			break;
+		if (rc > 0)
+			continue; /* reclaimed some pages; retry the grab */
+
+		/* Nothing reclaimable right now: yield, then sleep
+		 * (interruptibly) until slots are released elsewhere. */
+		cond_resched();
+		rc = l_wait_event(osc_lru_waitq,
+				  atomic_read(cli->cl_lru_left) > 0,
+				  &lwi);
+		if (rc < 0)
+			break;
+	}
+
+out:
+	if (rc >= 0) {
+		/* Slot secured: account this page as busy (pinned) in the
+		 * per-client LRU and mark it as LRU-managed. */
+		atomic_inc(&cli->cl_lru_busy);
+		opg->ops_in_lru = 1;
+		rc = 0;
+	}
+
+	RETURN(rc);
+}
+
+/**
+ * Batch per-zone "unstable" page accounting for a bulk descriptor.
+ *
+ * Atomic zone-counter updates are expensive, so instead of updating the
+ * counter once per page we accumulate a run of consecutive pages that
+ * belong to the same zone and flush the whole run with a single
+ * mod_zone_page_state() call.  In practice pages of one RPC tend to come
+ * from the same zone, so this usually collapses to one update.
+ *
+ * \param desc    bulk descriptor whose kiov pages are being accounted
+ * \param factor  +1 to add pages to NR_UNSTABLE_NFS, -1 to subtract
+ */
+static inline void unstable_page_accounting(struct ptlrpc_bulk_desc *desc,
+					    int factor)
+{
+	obd_count total = desc->bd_iov_count;
+	void *cur_zone = NULL;
+	int batched = 0;
+	int i;
+
+	for (i = 0; i < total; i++) {
+		void *this_zone = page_zone(desc->bd_iov[i].kiov_page);
+
+		if (this_zone != cur_zone) {
+			/* Zone changed: flush the accumulated run first. */
+			if (batched > 0)
+				mod_zone_page_state(cur_zone, NR_UNSTABLE_NFS,
+						    factor * batched);
+			cur_zone = this_zone;
+			batched = 0;
+		}
+		batched++;
+	}
+
+	/* Flush the final run (if the descriptor had any pages at all). */
+	if (batched > 0)
+		mod_zone_page_state(cur_zone, NR_UNSTABLE_NFS,
+				    factor * batched);
+}