+/**
+ * osc_lru_reserve() is called to reserve enough LRU slots for I/O.
+ *
+ * The benefit of doing this is to reduce contention against atomic counter
+ * cl_lru_left by changing it from per-page access to per-IO access.
+ */
+unsigned long osc_lru_reserve(struct client_obd *cli, unsigned long npages)
+{
+ unsigned long reserved = 0;
+ unsigned long max_pages;
+ unsigned long c;
+
+ /* reserve a full RPC window at most to avoid that a thread accidentally
+ * consumes too many LRU slots */
+ max_pages = cli->cl_max_pages_per_rpc * cli->cl_max_rpcs_in_flight;
+ if (npages > max_pages)
+ npages = max_pages;
+
+ c = atomic_long_read(cli->cl_lru_left);
+ if (c < npages && osc_lru_reclaim(cli, npages) > 0)
+ c = atomic_long_read(cli->cl_lru_left);
+ while (c >= npages) {
+ if (c == atomic_long_cmpxchg(cli->cl_lru_left, c, c - npages)) {
+ reserved = npages;
+ break;
+ }
+ c = atomic_long_read(cli->cl_lru_left);
+ }
+ if (atomic_long_read(cli->cl_lru_left) < max_pages) {
+ /* If there aren't enough pages in the per-OSC LRU then
+ * wake up the LRU thread to try and clear out space, so
+ * we don't block if pages are being dirtied quickly. */
+ CDEBUG(D_CACHE, "%s: queue LRU, left: %lu/%ld.\n",
+ cli_name(cli), atomic_long_read(cli->cl_lru_left),
+ max_pages);
+ (void)ptlrpcd_queue_work(cli->cl_lru_work);
+ }
+
+ return reserved;
+}
+
+/**
+ * osc_lru_unreserve() is called to unreserve LRU slots.
+ *
+ * LRU slots reserved by osc_lru_reserve() may have entries left due to several
+ * reasons such as page already existing or I/O error. Those reserved slots
+ * should be freed by calling this function.
+ */
+void osc_lru_unreserve(struct client_obd *cli, unsigned long npages)
+{
+ atomic_long_add(npages, cli->cl_lru_left);
+ wake_up_all(&osc_lru_waitq);
+}
+
+/**
+ * Atomic operations are expensive. We accumulate the accounting for the
+ * same page zone to get better performance.
+ * In practice this can work pretty good because the pages in the same RPC
+ * are likely from the same page zone.
+ */
+static inline void unstable_page_accounting(struct ptlrpc_bulk_desc *desc,
+ struct osc_brw_async_args *aa,
+ int factor)
+{
+ int page_count;
+ void *zone = NULL;
+ int count = 0;
+ int i;
+
+ if (desc != NULL) {
+ LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
+ page_count = desc->bd_iov_count;
+ } else {
+ page_count = aa->aa_page_count;
+ }
+
+ for (i = 0; i < page_count; i++) {
+ void *pz;
+ if (desc)
+ pz = page_zone(BD_GET_KIOV(desc, i).kiov_page);
+ else
+ pz = page_zone(aa->aa_ppga[i]->pg);
+
+ if (likely(pz == zone)) {
+ ++count;
+ continue;
+ }
+
+ if (count > 0) {
+ mod_zone_page_state(zone, NR_UNSTABLE_NFS,
+ factor * count);
+ count = 0;
+ }
+ zone = pz;
+ ++count;
+ }
+ if (count > 0)
+ mod_zone_page_state(zone, NR_UNSTABLE_NFS, factor * count);
+}
+
+static inline void add_unstable_page_accounting(struct ptlrpc_bulk_desc *desc,
+ struct osc_brw_async_args *aa)
+{
+ unstable_page_accounting(desc, aa, 1);
+}
+
+static inline void dec_unstable_page_accounting(struct ptlrpc_bulk_desc *desc,
+ struct osc_brw_async_args *aa)
+{
+ unstable_page_accounting(desc, aa, -1);
+}
+
+/**
+ * Performs "unstable" page accounting. This function balances the
+ * increment operations performed in osc_inc_unstable_pages. It is
+ * registered as the RPC request callback, and is executed when the
+ * bulk RPC is committed on the server. Thus at this point, the pages
+ * involved in the bulk transfer are no longer considered unstable.
+ *
+ * If this function is called, the request should have been committed
+ * or req:rq_unstable must have been set; it implies that the unstable
+ * statistic have been added.
+ */
+void osc_dec_unstable_pages(struct ptlrpc_request *req)
+{
+ struct ptlrpc_bulk_desc *desc = req->rq_bulk;
+ struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
+ struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+ int page_count;
+ long unstable_count;
+
+ if (desc)
+ page_count = desc->bd_iov_count;
+ else
+ page_count = aa->aa_page_count;
+
+ LASSERT(page_count >= 0);
+
+ dec_unstable_page_accounting(desc, aa);
+
+ unstable_count = atomic_long_sub_return(page_count,
+ &cli->cl_unstable_count);
+ LASSERT(unstable_count >= 0);
+
+ unstable_count = atomic_long_sub_return(page_count,
+ &cli->cl_cache->ccc_unstable_nr);
+ LASSERT(unstable_count >= 0);
+ if (unstable_count == 0)
+ wake_up_all(&cli->cl_cache->ccc_unstable_waitq);
+
+ if (waitqueue_active(&osc_lru_waitq))
+ (void)ptlrpcd_queue_work(cli->cl_lru_work);
+}
+
+/**
+ * "unstable" page accounting. See: osc_dec_unstable_pages.
+ */
+void osc_inc_unstable_pages(struct ptlrpc_request *req)
+{
+ struct ptlrpc_bulk_desc *desc = req->rq_bulk;
+ struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
+ struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+ long page_count;
+
+ /* No unstable page tracking */
+ if (cli->cl_cache == NULL || !cli->cl_cache->ccc_unstable_check)
+ return;
+
+ if (desc)
+ page_count = desc->bd_iov_count;
+ else
+ page_count = aa->aa_page_count;
+
+ add_unstable_page_accounting(desc, aa);
+ atomic_long_add(page_count, &cli->cl_unstable_count);
+ atomic_long_add(page_count, &cli->cl_cache->ccc_unstable_nr);
+
+ /* If the request has already been committed (i.e. brw_commit
+ * called via rq_commit_cb), we need to undo the unstable page
+ * increments we just performed because rq_commit_cb wont be
+ * called again. */
+ spin_lock(&req->rq_lock);
+ if (unlikely(req->rq_committed)) {
+ spin_unlock(&req->rq_lock);
+
+ osc_dec_unstable_pages(req);
+ } else {
+ req->rq_unstable = 1;
+ spin_unlock(&req->rq_lock);
+ }
+}
+
+/**
+ * Check if it piggybacks SOFT_SYNC flag to OST from this OSC.
+ * This function will be called by every BRW RPC so it's critical
+ * to make this function fast.
+ */
+bool osc_over_unstable_soft_limit(struct client_obd *cli)
+{
+ long unstable_nr, osc_unstable_count;
+
+ /* Can't check cli->cl_unstable_count, therefore, no soft limit */
+ if (cli->cl_cache == NULL || !cli->cl_cache->ccc_unstable_check)
+ return false;
+
+ osc_unstable_count = atomic_long_read(&cli->cl_unstable_count);
+ unstable_nr = atomic_long_read(&cli->cl_cache->ccc_unstable_nr);
+
+ CDEBUG(D_CACHE,
+ "%s: cli: %p unstable pages: %lu, osc unstable pages: %lu\n",
+ cli_name(cli), cli, unstable_nr, osc_unstable_count);
+
+ /* If the LRU slots are in shortage - 25% remaining AND this OSC
+ * has one full RPC window of unstable pages, it's a good chance
+ * to piggyback a SOFT_SYNC flag.
+ * Please notice that the OST won't take immediate response for the
+ * SOFT_SYNC request so active OSCs will have more chance to carry
+ * the flag, this is reasonable. */
+ return unstable_nr > cli->cl_cache->ccc_lru_max >> 2 &&
+ osc_unstable_count > cli->cl_max_pages_per_rpc *
+ cli->cl_max_rpcs_in_flight;
+}
+
+/**
+ * Return how many LRU pages in the cache of all OSC devices
+ *
+ * \retval return # of cached LRU pages times reclaimation tendency
+ * \retval SHRINK_STOP if it cannot do any scanning in this time
+ */
+unsigned long osc_cache_shrink_count(struct shrinker *sk,
+ struct shrink_control *sc)
+{
+ struct client_obd *cli;
+ unsigned long cached = 0;
+
+ spin_lock(&osc_shrink_lock);
+ list_for_each_entry(cli, &osc_shrink_list, cl_shrink_list)
+ cached += atomic_long_read(&cli->cl_lru_in_list);
+ spin_unlock(&osc_shrink_lock);
+
+ return (cached * sysctl_vfs_cache_pressure) / 100;
+}
+
+/**
+ * Scan and try to reclaim sc->nr_to_scan cached LRU pages
+ *
+ * \retval number of cached LRU pages reclaimed
+ * \retval SHRINK_STOP if it cannot do any scanning in this time
+ *
+ * Linux kernel will loop calling this shrinker scan routine with
+ * sc->nr_to_scan = SHRINK_BATCH(128 for now) until kernel got enough memory.
+ *
+ * If sc->nr_to_scan is 0, the VM is querying the cache size, we don't need
+ * to scan and try to reclaim LRU pages, just return 0 and
+ * osc_cache_shrink_count() will report the LRU page number.
+ */
+unsigned long osc_cache_shrink_scan(struct shrinker *sk,
+ struct shrink_control *sc)
+{
+ struct client_obd *cli;
+ struct client_obd *stop_anchor = NULL;
+ struct lu_env *env;
+ long shrank = 0;
+ int rc;
+ __u16 refcheck;
+
+ if (sc->nr_to_scan == 0)
+ return 0;
+
+ if (!(sc->gfp_mask & __GFP_FS))
+ return SHRINK_STOP;
+
+ env = cl_env_get(&refcheck);
+ if (IS_ERR(env))
+ return SHRINK_STOP;
+
+ spin_lock(&osc_shrink_lock);
+ while (!list_empty(&osc_shrink_list)) {
+ cli = list_entry(osc_shrink_list.next, struct client_obd,
+ cl_shrink_list);
+
+ if (stop_anchor == NULL)
+ stop_anchor = cli;
+ else if (cli == stop_anchor)
+ break;
+
+ list_move_tail(&cli->cl_shrink_list, &osc_shrink_list);
+ spin_unlock(&osc_shrink_lock);
+
+ /* shrink no more than max_pages_per_rpc for an OSC */
+ rc = osc_lru_shrink(env, cli, (sc->nr_to_scan - shrank) >
+ cli->cl_max_pages_per_rpc ?
+ cli->cl_max_pages_per_rpc :
+ sc->nr_to_scan - shrank, true);
+ if (rc > 0)
+ shrank += rc;
+
+ if (shrank >= sc->nr_to_scan)
+ goto out;
+
+ spin_lock(&osc_shrink_lock);
+ }
+ spin_unlock(&osc_shrink_lock);
+
+out:
+ cl_env_put(env, &refcheck);
+
+ return shrank;
+}
+