LU-1030 osc: move io data from lov_oinfo into osc_object
author	Jinshan Xiong <jinshan.xiong@whamcloud.com>
Mon, 20 Feb 2012 19:01:00 +0000 (11:01 -0800)
committer	Oleg Drokin <green@whamcloud.com>
Fri, 1 Jun 2012 21:07:19 +0000 (17:07 -0400)
Clean up the code and move the io-related data structures into
osc_object, so that it will be easier to manage pages with a
red-black tree.

Signed-off-by: Jinshan Xiong <jinshan.xiong@whamcloud.com>
Change-Id: I7cd8d6278bb70b221351d9db4193c679f93aeb21
Reviewed-on: http://review.whamcloud.com/2009
Tested-by: Hudson
Reviewed-by: Johann Lombardi <johann@whamcloud.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
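
In outline (a condensed view of the patch, not part of the commit text): the
read/write queues that used to live in every per-stripe lov_oinfo now hang off
each osc_object as struct osc_oap_pages, so the OSC reaches its pending pages
directly from the object it is doing IO on:

    struct osc_oap_pages {
            cfs_list_t      oop_pending;      /* pages queued for IO */
            cfs_list_t      oop_urgent;       /* pages that need an RPC now */
            int             oop_num_pending;
    };

    /* e.g. osc_oap_to_pending() in the new osc_cache.c picks the queue
     * straight from the object: */
    struct osc_oap_pages *lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
            &osc->oo_write_pages : &osc->oo_read_pages;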
lustre/include/obd.h
lustre/osc/Makefile.in
lustre/osc/autoMakefile.am
lustre/osc/osc_cache.c [new file with mode: 0644]
lustre/osc/osc_cl_internal.h
lustre/osc/osc_internal.h
lustre/osc/osc_io.c
lustre/osc/osc_object.c
lustre/osc/osc_page.c
lustre/osc/osc_request.c

diff --git a/lustre/include/obd.h b/lustre/include/obd.h
index da354e2..66b14f0 100644
 
 #define MAX_OBD_DEVICES 8192
 
-/* this is really local to the OSC */
-struct loi_oap_pages {
-        cfs_list_t              lop_pending;
-        cfs_list_t              lop_urgent;
-        cfs_list_t              lop_pending_group;
-        int                     lop_num_pending;
-};
-
 struct osc_async_rc {
         int     ar_rc;
         int     ar_force_sync;
@@ -91,14 +83,6 @@ struct lov_oinfo {                 /* per-stripe data structure */
         int loi_ost_idx;           /* OST stripe index in lov_tgt_desc->tgts */
         int loi_ost_gen;           /* generation of this loi_ost_idx */
 
-        /* used by the osc to keep track of what objects to build into rpcs */
-        struct loi_oap_pages loi_read_lop;
-        struct loi_oap_pages loi_write_lop;
-        cfs_list_t loi_ready_item;
-        cfs_list_t loi_hp_ready_item;
-        cfs_list_t loi_write_item;
-        cfs_list_t loi_read_item;
-
         unsigned long loi_kms_valid:1;
         __u64 loi_kms;             /* known minimum size */
         struct ost_lvb loi_lvb;
@@ -115,16 +99,6 @@ static inline void loi_kms_set(struct lov_oinfo *oinfo, __u64 kms)
 
 static inline void loi_init(struct lov_oinfo *loi)
 {
-        CFS_INIT_LIST_HEAD(&loi->loi_read_lop.lop_pending);
-        CFS_INIT_LIST_HEAD(&loi->loi_read_lop.lop_urgent);
-        CFS_INIT_LIST_HEAD(&loi->loi_read_lop.lop_pending_group);
-        CFS_INIT_LIST_HEAD(&loi->loi_write_lop.lop_pending);
-        CFS_INIT_LIST_HEAD(&loi->loi_write_lop.lop_urgent);
-        CFS_INIT_LIST_HEAD(&loi->loi_write_lop.lop_pending_group);
-        CFS_INIT_LIST_HEAD(&loi->loi_ready_item);
-        CFS_INIT_LIST_HEAD(&loi->loi_hp_ready_item);
-        CFS_INIT_LIST_HEAD(&loi->loi_write_item);
-        CFS_INIT_LIST_HEAD(&loi->loi_read_item);
 }
 
 struct lov_stripe_md {
@@ -501,6 +475,9 @@ struct client_obd {
          * Exact type of ->cl_loi_list_lock is defined in arch/obd.h together
          * with client_obd_list_{un,}lock() and
          * client_obd_list_lock_{init,done}() functions.
+        *
+        * NB by Jinshan: though the field names are still _loi_, it is
+        * actually osc_object{}s that are on these lists.
          */
         client_obd_lock_t        cl_loi_list_lock;
         cfs_list_t               cl_loi_ready_list;
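
The hunk above keeps the cl_loi_* list heads in client_obd, but per the new
note they now chain osc_objects through the oo_*_item fields added to
osc_object later in this patch. A condensed map of the linkage (all names
taken from the patch):

    /* client_obd (one per import)           osc_object (one per OST object)
     *
     * cl_loi_ready_list     <-- chains -->  oo_ready_item
     * cl_loi_hp_ready_list  <-- chains -->  oo_hp_ready_item
     * cl_loi_write_list     <-- chains -->  oo_write_item
     * cl_loi_read_list      <-- chains -->  oo_read_item
     */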
diff --git a/lustre/osc/Makefile.in b/lustre/osc/Makefile.in
index 34d6c6e..13a8517 100644
@@ -1,5 +1,5 @@
 MODULES := osc
-osc-objs := osc_request.o lproc_osc.o osc_create.o osc_dev.o osc_object.o osc_page.o osc_lock.o osc_io.o osc_quota.o
+osc-objs := osc_request.o lproc_osc.o osc_create.o osc_dev.o osc_object.o osc_page.o osc_lock.o osc_io.o osc_quota.o osc_cache.o
 
 EXTRA_DIST = $(osc-objs:%.o=%.c) osc_internal.h osc_cl_internal.h
 
diff --git a/lustre/osc/autoMakefile.am b/lustre/osc/autoMakefile.am
index 1a94992..e93e90d 100644
@@ -38,7 +38,7 @@
 
 if LIBLUSTRE
 noinst_LIBRARIES = libosc.a
-libosc_a_SOURCES = osc_request.c osc_create.c osc_internal.h osc_cl_internal.h osc_dev.c osc_object.c osc_page.c osc_lock.c osc_io.c osc_quota.c
+libosc_a_SOURCES = osc_request.c osc_create.c osc_internal.h osc_cl_internal.h osc_dev.c osc_object.c osc_page.c osc_lock.c osc_io.c osc_quota.c osc_cache.c
 
 libosc_a_CPPFLAGS = $(LLCPPFLAGS)
 libosc_a_CFLAGS = $(LLCFLAGS)
@@ -61,7 +61,8 @@ osc_SOURCES := \
         osc_lock.c   \
         osc_io.c     \
         osc_request.c \
-        osc_quota.c
+       osc_quota.c  \
+       osc_cache.c
 
 osc_CFLAGS := $(EXTRA_KCFLAGS)
 osc_LDFLAGS := $(EXTRA_KLDFLAGS)
diff --git a/lustre/osc/osc_cache.c b/lustre/osc/osc_cache.c
new file mode 100644
index 0000000..9f12ad6
--- /dev/null
+++ b/lustre/osc/osc_cache.c
@@ -0,0 +1,1236 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Use is subject to license terms.
+ *
+ * Copyright (c) 2011 Whamcloud, Inc.
+ *
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * osc cache management.
+ *
+ * Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
+ */
+
+#define DEBUG_SUBSYSTEM S_OSC
+
+#include "osc_cl_internal.h"
+#include "osc_internal.h"
+
+static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
+                          struct osc_async_page *oap);
+static int osc_enter_cache_try(const struct lu_env *env, struct client_obd *cli,
+                              struct osc_async_page *oap, int transient);
+static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
+                          int sent);
+
+/** \addtogroup osc
+ *  @{
+ */
+
+#define OSC_IO_DEBUG(OSC, STR, args...)                                \
+       CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,          \
+              !cfs_list_empty(&(OSC)->oo_ready_item) ||                \
+              !cfs_list_empty(&(OSC)->oo_hp_ready_item),               \
+              (OSC)->oo_write_pages.oop_num_pending,                   \
+              !cfs_list_empty(&(OSC)->oo_write_pages.oop_urgent),      \
+              (OSC)->oo_read_pages.oop_num_pending,                    \
+              !cfs_list_empty(&(OSC)->oo_read_pages.oop_urgent),       \
+              args)
+
+static inline struct osc_page *oap2osc_page(struct osc_async_page *oap)
+{
+       return (struct osc_page *)container_of(oap, struct osc_page, ops_oap);
+}
+
+static int osc_make_ready(const struct lu_env *env, struct osc_async_page *oap,
+                         int cmd)
+{
+       struct osc_page *opg  = oap2osc_page(oap);
+       struct cl_page  *page = cl_page_top(opg->ops_cl.cpl_page);
+       int result;
+
+       LASSERT(cmd == OBD_BRW_WRITE); /* no cached reads */
+
+       ENTRY;
+       result = cl_page_make_ready(env, page, CRT_WRITE);
+       if (result == 0)
+               opg->ops_submit_time = cfs_time_current();
+       RETURN(result);
+}
+
+static int osc_refresh_count(const struct lu_env *env,
+                            struct osc_async_page *oap, int cmd)
+{
+       struct osc_page  *opg = oap2osc_page(oap);
+       struct cl_page   *page;
+       struct cl_object *obj;
+       struct cl_attr   *attr = &osc_env_info(env)->oti_attr;
+
+       int result;
+       loff_t kms;
+
+       /* readpage queues with _COUNT_STABLE, shouldn't get here. */
+       LASSERT(!(cmd & OBD_BRW_READ));
+       LASSERT(opg != NULL);
+       page = opg->ops_cl.cpl_page;
+       obj = opg->ops_cl.cpl_obj;
+
+       cl_object_attr_lock(obj);
+       result = cl_object_attr_get(env, obj, attr);
+       cl_object_attr_unlock(obj);
+       if (result < 0)
+               return result;
+       kms = attr->cat_kms;
+       if (cl_offset(obj, page->cp_index) >= kms)
+               /* catch race with truncate */
+               return 0;
+       else if (cl_offset(obj, page->cp_index + 1) > kms)
+               /* catch sub-page write at end of file */
+               return kms % CFS_PAGE_SIZE;
+       else
+               return CFS_PAGE_SIZE;
+}
+
+static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
+                         int cmd, struct obdo *oa, int rc)
+{
+       struct osc_page   *opg  = oap2osc_page(oap);
+       struct cl_page    *page = cl_page_top(opg->ops_cl.cpl_page);
+       struct osc_object *obj  = cl2osc(opg->ops_cl.cpl_obj);
+       enum cl_req_type   crt;
+       int srvlock;
+
+       ENTRY;
+
+       cmd &= ~OBD_BRW_NOQUOTA;
+       LASSERT(equi(page->cp_state == CPS_PAGEIN,  cmd == OBD_BRW_READ));
+       LASSERT(equi(page->cp_state == CPS_PAGEOUT, cmd == OBD_BRW_WRITE));
+       LASSERT(opg->ops_transfer_pinned);
+
+       /*
+        * page->cp_req can be NULL if io submission failed before
+        * cl_req was allocated.
+        */
+       if (page->cp_req != NULL)
+               cl_req_page_done(env, page);
+       LASSERT(page->cp_req == NULL);
+
+       /* As the transfer for this page is being done, clear the flags */
+       cfs_spin_lock(&oap->oap_lock);
+       oap->oap_async_flags = 0;
+       cfs_spin_unlock(&oap->oap_lock);
+
+       crt = cmd == OBD_BRW_READ ? CRT_READ : CRT_WRITE;
+       /* Clear opg->ops_transfer_pinned before VM lock is released. */
+       opg->ops_transfer_pinned = 0;
+
+       cfs_spin_lock(&obj->oo_seatbelt);
+       LASSERT(opg->ops_submitter != NULL);
+       LASSERT(!cfs_list_empty(&opg->ops_inflight));
+       cfs_list_del_init(&opg->ops_inflight);
+       cfs_spin_unlock(&obj->oo_seatbelt);
+
+       opg->ops_submit_time = 0;
+       srvlock = oap->oap_brw_flags & OBD_BRW_SRVLOCK;
+
+       cl_page_completion(env, page, crt, rc);
+
+       /* statistics */
+       if (rc == 0 && srvlock) {
+               struct lu_device *ld    = opg->ops_cl.cpl_obj->co_lu.lo_dev;
+               struct osc_stats *stats = &lu2osc_dev(ld)->od_stats;
+               int bytes = oap->oap_count;
+
+               if (crt == CRT_READ)
+                       stats->os_lockless_reads += bytes;
+               else
+                       stats->os_lockless_writes += bytes;
+       }
+
+       /*
+        * This has to be the last operation with the page, as locks are
+        * released in cl_page_completion() and nothing except for the
+        * reference counter protects page from concurrent reclaim.
+        */
+       lu_ref_del(&page->cp_reference, "transfer", page);
+       /*
+        * As page->cp_obj is pinned by a reference from page->cp_req, it is
+        * safe to call cl_page_put() without risking object destruction in a
+        * non-blocking context.
+        */
+       cl_page_put(env, page);
+       RETURN(0);
+}
+
+/* caller must hold loi_list_lock */
+static void osc_consume_write_grant(struct client_obd *cli,
+                                   struct brw_page *pga)
+{
+       LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
+       LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
+       cfs_atomic_inc(&obd_dirty_pages);
+       cli->cl_dirty += CFS_PAGE_SIZE;
+       cli->cl_avail_grant -= CFS_PAGE_SIZE;
+       pga->flag |= OBD_BRW_FROM_GRANT;
+       CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
+              CFS_PAGE_SIZE, pga, pga->pg);
+       LASSERT(cli->cl_avail_grant >= 0);
+       osc_update_next_shrink(cli);
+}
+
+/* the companion to osc_consume_write_grant, called when a brw has completed.
+ * must be called with the loi lock held. */
+static void osc_release_write_grant(struct client_obd *cli,
+                                   struct brw_page *pga, int sent)
+{
+       int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
+       ENTRY;
+
+       LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
+       if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
+               EXIT;
+               return;
+       }
+
+       pga->flag &= ~OBD_BRW_FROM_GRANT;
+       cfs_atomic_dec(&obd_dirty_pages);
+       cli->cl_dirty -= CFS_PAGE_SIZE;
+       if (pga->flag & OBD_BRW_NOCACHE) {
+               pga->flag &= ~OBD_BRW_NOCACHE;
+               cfs_atomic_dec(&obd_dirty_transit_pages);
+               cli->cl_dirty_transit -= CFS_PAGE_SIZE;
+       }
+       if (!sent) {
+               /* Reclaim grant from truncated pages. This is used to solve
+                * the write-truncate problem where all grant is gone (to
+                * lost_grant).
+                * For a vfs write this problem can be easily solved by a sync
+                * write, however, this is not an option for page_mkwrite()
+                * because grant has to be allocated before a page becomes
+                * dirty. */
+               if (cli->cl_avail_grant < PTLRPC_MAX_BRW_SIZE)
+                       cli->cl_avail_grant += CFS_PAGE_SIZE;
+               else
+                       cli->cl_lost_grant += CFS_PAGE_SIZE;
+               CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
+                      cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
+       } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
+               /* For short writes we shouldn't count parts of pages that
+                * span a whole block on the OST side, or our accounting goes
+                * wrong.  Should match the code in filter_grant_check. */
+               int offset = pga->off & ~CFS_PAGE_MASK;
+               int count = pga->count + (offset & (blocksize - 1));
+               int end = (offset + pga->count) & (blocksize - 1);
+               if (end)
+                       count += blocksize - end;
+
+               cli->cl_lost_grant += CFS_PAGE_SIZE - count;
+               CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
+                      CFS_PAGE_SIZE - count, cli->cl_lost_grant,
+                      cli->cl_avail_grant, cli->cl_dirty);
+       }
+
+       EXIT;
+}
+
+/* The companion to osc_enter_cache(), called when @oap is no longer part of
+ * the dirty accounting.  Writeback completes or truncate happens before
+ * writing starts.  Must be called with the loi lock held. */
+static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
+                          int sent)
+{
+       osc_release_write_grant(cli, &oap->oap_brw_page, sent);
+}
+
+/**
+ * Non-blocking version of osc_enter_cache() that consumes grant only when it
+ * is available.
+ */
+static int osc_enter_cache_try(const struct lu_env *env, struct client_obd *cli,
+                              struct osc_async_page *oap, int transient)
+{
+       int has_grant;
+
+       has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
+       if (has_grant) {
+               osc_consume_write_grant(cli, &oap->oap_brw_page);
+               if (transient) {
+                       cli->cl_dirty_transit += CFS_PAGE_SIZE;
+                       cfs_atomic_inc(&obd_dirty_transit_pages);
+                       oap->oap_brw_flags |= OBD_BRW_NOCACHE;
+               }
+       }
+       return has_grant;
+}
+
+/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
+ * grant or cache space. */
+static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
+                          struct osc_async_page *oap)
+{
+       struct osc_object *osc = oap->oap_obj;
+       struct lov_oinfo  *loi = osc->oo_oinfo;
+       struct osc_cache_waiter ocw;
+       struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
+       int rc = -EDQUOT;
+       ENTRY;
+
+       CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
+              "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
+              cli->cl_dirty_max, obd_max_dirty_pages,
+              cli->cl_lost_grant, cli->cl_avail_grant);
+
+       /* force the caller to try sync io.  this can jump the list
+        * of queued writes and create a discontiguous rpc stream */
+       if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
+           cli->cl_dirty_max < CFS_PAGE_SIZE     ||
+           cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync)
+               RETURN(-EDQUOT);
+
+       /* Hopefully normal case - cache space and write credits available */
+       if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
+           cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
+           osc_enter_cache_try(env, cli, oap, 0))
+               RETURN(0);
+
+       /* We can get here for two reasons: too many dirty pages in cache, or
+        * we have run out of grant. In both cases we should write dirty pages
+        * out. Adding a cache waiter will trigger urgent write-out no matter
+        * what RPC size will be.
+        * The exit condition is no available grant and no dirty pages cached,
+        * which really means there is no space on the OST. */
+       cfs_waitq_init(&ocw.ocw_waitq);
+       ocw.ocw_oap = oap;
+       while (cli->cl_dirty > 0) {
+               cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
+               ocw.ocw_rc = 0;
+
+               osc_io_unplug(env, cli, osc, PDL_POLICY_ROUND);
+               client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+               CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n",
+                      cli->cl_import->imp_obd->obd_name, &ocw, oap);
+
+               rc = l_wait_event(ocw.ocw_waitq, cfs_list_empty(&ocw.ocw_entry),
+                                 &lwi);
+
+               client_obd_list_lock(&cli->cl_loi_list_lock);
+               cfs_list_del_init(&ocw.ocw_entry);
+               if (rc < 0)
+                       break;
+
+               rc = ocw.ocw_rc;
+               if (rc != -EDQUOT)
+                       break;
+       }
+
+       RETURN(rc);
+}
+
+/* caller must hold loi_list_lock */
+void osc_wake_cache_waiters(struct client_obd *cli)
+{
+       cfs_list_t *l, *tmp;
+       struct osc_cache_waiter *ocw;
+
+       ENTRY;
+       cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
+               /* if we can't dirty more, we must wait until some is written */
+               if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
+                  (cfs_atomic_read(&obd_dirty_pages) + 1 >
+                   obd_max_dirty_pages)) {
+                       CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
+                              "osc max %ld, sys max %d\n", cli->cl_dirty,
+                              cli->cl_dirty_max, obd_max_dirty_pages);
+                       return;
+               }
+
+               /* if the cache is still dirty but no grant is left, wait for
+                * pending RPCs that may yet return grant before doing sync
+                * writes */
+               if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
+                       CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
+                              cli->cl_w_in_flight);
+                       return;
+               }
+
+               ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
+               cfs_list_del_init(&ocw->ocw_entry);
+               if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
+                       /* no more RPCs in flight to return grant, do sync IO */
+                       ocw->ocw_rc = -EDQUOT;
+                       CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
+               } else {
+                       osc_consume_write_grant(cli,
+                                               &ocw->ocw_oap->oap_brw_page);
+               }
+
+               CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld\n",
+                      ocw, ocw->ocw_oap, cli->cl_avail_grant);
+
+               cfs_waitq_signal(&ocw->ocw_waitq);
+       }
+
+       EXIT;
+}
+
+static int osc_max_rpc_in_flight(struct client_obd *cli, struct osc_object *osc)
+{
+       struct osc_async_page *oap;
+       int hprpc = 0;
+
+       if (!cfs_list_empty(&osc->oo_write_pages.oop_urgent)) {
+               oap = cfs_list_entry(osc->oo_write_pages.oop_urgent.next,
+                                    struct osc_async_page, oap_urgent_item);
+               hprpc = !!(oap->oap_async_flags & ASYNC_HP);
+       }
+
+       if (!hprpc && !cfs_list_empty(&osc->oo_read_pages.oop_urgent)) {
+               oap = cfs_list_entry(osc->oo_read_pages.oop_urgent.next,
+                                    struct osc_async_page, oap_urgent_item);
+               hprpc = !!(oap->oap_async_flags & ASYNC_HP);
+       }
+
+       return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
+}
+
+/* This maintains the lists of pending pages to read/write for a given object
+ * (lop).  This is used by osc_check_rpcs->osc_next_obj() and osc_list_maint()
+ * to quickly find objects that are ready to send an RPC. */
+static int osc_makes_rpc(struct client_obd *cli, struct osc_object *osc,
+                        int cmd)
+{
+       struct osc_oap_pages *lop;
+       ENTRY;
+
+       if (cmd & OBD_BRW_WRITE) {
+               lop = &osc->oo_write_pages;
+       } else {
+               lop = &osc->oo_read_pages;
+       }
+
+       if (lop->oop_num_pending == 0)
+               RETURN(0);
+
+       /* if we have an invalid import we want to drain the queued pages
+        * by forcing them through rpcs that immediately fail and complete
+        * the pages.  recovery relies on this to empty the queued pages
+        * before canceling the locks and evicting down the llite pages */
+       if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
+               RETURN(1);
+
+       /* stream rpcs in queue order as long as there is an urgent page
+        * queued.  this is our cheap solution for good batching in the case
+        * where writepage marks some random page in the middle of the file
+        * as urgent because of, say, memory pressure */
+       if (!cfs_list_empty(&lop->oop_urgent)) {
+               CDEBUG(D_CACHE, "urgent request forcing RPC\n");
+               RETURN(1);
+       }
+
+       if (cmd & OBD_BRW_WRITE) {
+               /* trigger a write rpc stream as long as there are dirtiers
+                * waiting for space.  as they're waiting, they're not going to
+                * create more pages to coalesce with what's waiting.. */
+               if (!cfs_list_empty(&cli->cl_cache_waiters)) {
+                       CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
+                       RETURN(1);
+               }
+       }
+       if (lop->oop_num_pending >= cli->cl_max_pages_per_rpc)
+               RETURN(1);
+
+       RETURN(0);
+}
+
+static void lop_update_pending(struct client_obd *cli,
+                              struct osc_oap_pages *lop, int cmd, int delta)
+{
+       lop->oop_num_pending += delta;
+       if (cmd & OBD_BRW_WRITE)
+               cli->cl_pending_w_pages += delta;
+       else
+               cli->cl_pending_r_pages += delta;
+}
+
+static int osc_makes_hprpc(struct osc_oap_pages *lop)
+{
+       struct osc_async_page *oap;
+       ENTRY;
+
+       if (cfs_list_empty(&lop->oop_urgent))
+               RETURN(0);
+
+       oap = cfs_list_entry(lop->oop_urgent.next,
+                        struct osc_async_page, oap_urgent_item);
+
+       if (oap->oap_async_flags & ASYNC_HP) {
+               CDEBUG(D_CACHE, "hp request forcing RPC\n");
+               RETURN(1);
+       }
+
+       RETURN(0);
+}
+
+static void on_list(cfs_list_t *item, cfs_list_t *list, int should_be_on)
+{
+       if (cfs_list_empty(item) && should_be_on)
+               cfs_list_add_tail(item, list);
+       else if (!cfs_list_empty(item) && !should_be_on)
+               cfs_list_del_init(item);
+}
+
+/* maintain the osc's cli list membership invariants so that osc_send_oap_rpc
+ * can find pages to build into rpcs quickly */
+static void osc_list_maint(struct client_obd *cli, struct osc_object *osc)
+{
+       if (osc_makes_hprpc(&osc->oo_write_pages) ||
+           osc_makes_hprpc(&osc->oo_read_pages)) {
+               /* HP rpc */
+               on_list(&osc->oo_ready_item, &cli->cl_loi_ready_list, 0);
+               on_list(&osc->oo_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
+       } else {
+               on_list(&osc->oo_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
+               on_list(&osc->oo_ready_item, &cli->cl_loi_ready_list,
+                       osc_makes_rpc(cli, osc, OBD_BRW_WRITE) ||
+                       osc_makes_rpc(cli, osc, OBD_BRW_READ));
+       }
+
+       on_list(&osc->oo_write_item, &cli->cl_loi_write_list,
+               osc->oo_write_pages.oop_num_pending);
+
+       on_list(&osc->oo_read_item, &cli->cl_loi_read_list,
+               osc->oo_read_pages.oop_num_pending);
+}
+
+/* this is trying to propagate async writeback errors back up to the
+ * application.  As an async write fails we record the error code for later if
+ * the app does an fsync.  As long as errors persist we force future rpcs to be
+ * sync so that the app can get a sync error and break the cycle of queueing
+ * pages for which writeback will fail. */
+static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
+                          int rc)
+{
+       if (rc) {
+               if (!ar->ar_rc)
+                       ar->ar_rc = rc;
+
+               ar->ar_force_sync = 1;
+               ar->ar_min_xid = ptlrpc_sample_next_xid();
+               return;
+
+       }
+
+       if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
+               ar->ar_force_sync = 0;
+}
+
+static void osc_oap_to_pending(struct osc_async_page *oap)
+{
+       struct osc_object    *osc = oap->oap_obj;
+       struct osc_oap_pages *lop;
+
+       if (oap->oap_cmd & OBD_BRW_WRITE)
+               lop = &osc->oo_write_pages;
+       else
+               lop = &osc->oo_read_pages;
+
+       if (oap->oap_async_flags & ASYNC_HP)
+               cfs_list_add(&oap->oap_urgent_item, &lop->oop_urgent);
+       else if (oap->oap_async_flags & ASYNC_URGENT)
+               cfs_list_add_tail(&oap->oap_urgent_item, &lop->oop_urgent);
+       cfs_list_add_tail(&oap->oap_pending_item, &lop->oop_pending);
+       lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
+}
+
+/* this must be called holding the loi list lock to give coverage to exit_cache,
+ * async_flag maintenance, and oap_request */
+void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
+                      struct obdo *oa, struct osc_async_page *oap,
+                      int sent, int rc)
+{
+       struct osc_object *osc = oap->oap_obj;
+       struct lov_oinfo  *loi = osc->oo_oinfo;
+       __u64 xid = 0;
+
+       ENTRY;
+       if (oap->oap_request != NULL) {
+               xid = ptlrpc_req_xid(oap->oap_request);
+               ptlrpc_req_finished(oap->oap_request);
+               oap->oap_request = NULL;
+       }
+
+       cfs_spin_lock(&oap->oap_lock);
+       oap->oap_async_flags = 0;
+       cfs_spin_unlock(&oap->oap_lock);
+       oap->oap_interrupted = 0;
+
+       if (oap->oap_cmd & OBD_BRW_WRITE) {
+               osc_process_ar(&cli->cl_ar, xid, rc);
+               osc_process_ar(&loi->loi_ar, xid, rc);
+       }
+
+       if (rc == 0 && oa != NULL) {
+               if (oa->o_valid & OBD_MD_FLBLOCKS)
+                       loi->loi_lvb.lvb_blocks = oa->o_blocks;
+               if (oa->o_valid & OBD_MD_FLMTIME)
+                       loi->loi_lvb.lvb_mtime = oa->o_mtime;
+               if (oa->o_valid & OBD_MD_FLATIME)
+                       loi->loi_lvb.lvb_atime = oa->o_atime;
+               if (oa->o_valid & OBD_MD_FLCTIME)
+                       loi->loi_lvb.lvb_ctime = oa->o_ctime;
+       }
+
+       rc = osc_completion(env, oap, oap->oap_cmd, oa, rc);
+
+       /* cl_page_completion() drops PG_locked. so, a new I/O on the page could
+        * start, but OSC calls it under lock and thus we can add oap back to
+        * pending safely */
+       if (rc)
+               /* upper layer wants to leave the page on pending queue */
+               osc_oap_to_pending(oap);
+       else
+               osc_exit_cache(cli, oap, sent);
+       EXIT;
+}
+
+/**
+ * Prepare pages for async IO and put them on the send queue.
+ *
+ * \param cmd OBD_BRW_* macros
+ * \param lop pending pages
+ *
+ * \return zero if no page added to send queue.
+ * \return 1 if pages successfully added to send queue.
+ * \return negative on errors.
+ */
+static int
+osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
+                struct osc_object *osc, int cmd,
+                struct osc_oap_pages *lop, pdl_policy_t pol)
+{
+       obd_count page_count = 0;
+       struct osc_async_page *oap = NULL, *tmp;
+       CFS_LIST_HEAD(rpc_list);
+       int srvlock = 0, mem_tight = 0;
+       obd_off starting_offset = OBD_OBJECT_EOF;
+       unsigned int ending_offset;
+       int starting_page_off = 0;
+       int rc;
+       ENTRY;
+
+       /* ASYNC_HP pages first. At present, when the lock covering the pages
+        * is to be canceled, the pages it covers will be sent out with
+        * ASYNC_HP. We have to send them out as soon as possible. */
+       cfs_list_for_each_entry_safe(oap, tmp, &lop->oop_urgent, oap_urgent_item) {
+               if (oap->oap_async_flags & ASYNC_HP)
+                       cfs_list_move(&oap->oap_pending_item, &rpc_list);
+               else if (!(oap->oap_brw_flags & OBD_BRW_SYNC))
+                       /* only do this for writeback pages. */
+                       cfs_list_move_tail(&oap->oap_pending_item, &rpc_list);
+               if (++page_count >= cli->cl_max_pages_per_rpc)
+                       break;
+       }
+       cfs_list_splice_init(&rpc_list, &lop->oop_pending);
+       page_count = 0;
+
+       /* first we find the pages we're allowed to work with */
+       cfs_list_for_each_entry_safe(oap, tmp, &lop->oop_pending,
+                                    oap_pending_item) {
+               LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
+                        "magic 0x%x\n", oap, oap->oap_magic);
+
+               if (page_count != 0 &&
+                   srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
+                       CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
+                              " oap %p, page %p, srvlock %u\n",
+                              oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
+                       break;
+               }
+
+               /* If there is a gap at the start of this page, it can't merge
+                * with any previous page, so we'll hand the network a
+                * "fragmented" page array that it can't transfer in 1 RDMA */
+               if (oap->oap_obj_off < starting_offset) {
+                       if (starting_page_off != 0)
+                               break;
+
+                       starting_page_off = oap->oap_page_off;
+                       starting_offset = oap->oap_obj_off + starting_page_off;
+               } else if (oap->oap_page_off != 0)
+                       break;
+
+               /* in llite being 'ready' equates to the page being locked
+                * until completion unlocks it.  commit_write submits a page
+                * as not ready because its unlock will happen unconditionally
+                * as the call returns.  if we race with commit_write giving
+                * us that page we don't want to create a hole in the page
+                * stream, so we stop and leave the rpc to be fired by
+                * another dirtier or kupdated interval (the not ready page
+                * will still be on the dirty list).  we could call in
+                * at the end of ll_file_write to process the queue again. */
+               if (!(oap->oap_async_flags & ASYNC_READY)) {
+                       int rc = osc_make_ready(env, oap, cmd);
+                       if (rc < 0)
+                               CDEBUG(D_INODE, "oap %p page %p returned %d "
+                                               "instead of ready\n", oap,
+                                               oap->oap_page, rc);
+                       switch (rc) {
+                       case -EAGAIN:
+                               /* llite is telling us that the page is still
+                                * in commit_write and that we should try
+                                * and put it in an rpc again later.  we
+                                * break out of the loop so we don't create
+                                * a hole in the sequence of pages in the rpc
+                                * stream.*/
+                               oap = NULL;
+                               break;
+                       case -EINTR:
+                               /* the io isn't needed.. tell the checks
+                                * below to complete the rpc with EINTR */
+                               cfs_spin_lock(&oap->oap_lock);
+                               oap->oap_async_flags |= ASYNC_COUNT_STABLE;
+                               cfs_spin_unlock(&oap->oap_lock);
+                               oap->oap_count = -EINTR;
+                               break;
+                       case 0:
+                               cfs_spin_lock(&oap->oap_lock);
+                               oap->oap_async_flags |= ASYNC_READY;
+                               cfs_spin_unlock(&oap->oap_lock);
+                               break;
+                       default:
+                               LASSERTF(0, "oap %p page %p returned %d "
+                                           "from make_ready\n", oap,
+                                           oap->oap_page, rc);
+                               break;
+                       }
+               }
+               if (oap == NULL)
+                       break;
+
+               /* take the page out of our book-keeping */
+               cfs_list_del_init(&oap->oap_pending_item);
+               lop_update_pending(cli, lop, cmd, -1);
+               cfs_list_del_init(&oap->oap_urgent_item);
+
+               /* ask the caller for the size of the io as the rpc leaves. */
+               if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
+                       oap->oap_count = osc_refresh_count(env, oap, cmd);
+                       LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
+               }
+               if (oap->oap_count <= 0) {
+                       CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
+                              oap->oap_count);
+                       osc_ap_completion(env, cli, NULL,
+                                         oap, 0, oap->oap_count);
+                       continue;
+               }
+
+               /* now put the page back in our accounting */
+               cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
+               if (page_count++ == 0)
+                       srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
+
+               if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
+                       mem_tight = 1;
+
+               /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
+                * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
+                * have the same alignment as the initial writes that allocated
+                * extents on the server. */
+               ending_offset = oap->oap_obj_off + oap->oap_page_off +
+                               oap->oap_count;
+               if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1)))
+                       break;
+
+               if (page_count >= cli->cl_max_pages_per_rpc)
+                       break;
+
+               /* If there is a gap at the end of this page, it can't merge
+                * with any subsequent pages, so we'll hand the network a
+                * "fragmented" page array that it can't transfer in 1 RDMA */
+               if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
+                       break;
+       }
+
+       osc_list_maint(cli, osc);
+
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+       if (page_count == 0) {
+               client_obd_list_lock(&cli->cl_loi_list_lock);
+               RETURN(0);
+       }
+
+       if (mem_tight)
+               cmd |= OBD_BRW_MEMALLOC;
+       rc = osc_build_rpc(env, cli, &rpc_list, page_count, cmd, pol);
+       if (rc != 0) {
+               LASSERT(cfs_list_empty(&rpc_list));
+               osc_list_maint(cli, osc);
+               RETURN(rc);
+       }
+
+       starting_offset &= PTLRPC_MAX_BRW_SIZE - 1;
+       if (cmd == OBD_BRW_READ) {
+               cli->cl_r_in_flight++;
+               lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
+               lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
+               lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
+                                     (starting_offset >> CFS_PAGE_SHIFT) + 1);
+       } else {
+               cli->cl_w_in_flight++;
+               lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
+               lprocfs_oh_tally(&cli->cl_write_rpc_hist,
+                                cli->cl_w_in_flight);
+               lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
+                                     (starting_offset >> CFS_PAGE_SHIFT) + 1);
+       }
+
+       RETURN(1);
+}
+
+#define list_to_obj(list, item) \
+       cfs_list_entry((list)->next, struct osc_object, oo_##item)
+
+/* This is called by osc_check_rpcs() to find which objects have pages that
+ * we could be sending.  These lists are maintained by osc_makes_rpc(). */
+static struct osc_object *osc_next_obj(struct client_obd *cli)
+{
+       ENTRY;
+
+       /* First return objects that have blocked locks so that they
+        * will be flushed quickly and other clients can get the lock,
+        * then objects which have pages ready to be stuffed into RPCs */
+       if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
+               RETURN(list_to_obj(&cli->cl_loi_hp_ready_list, hp_ready_item));
+       if (!cfs_list_empty(&cli->cl_loi_ready_list))
+               RETURN(list_to_obj(&cli->cl_loi_ready_list, ready_item));
+
+       /* then if we have cache waiters, return all objects with queued
+        * writes.  This is especially important when many small files
+        * have filled up the cache and not been fired into rpcs because
+        * they don't pass the nr_pending/object threshold
+       if (!cfs_list_empty(&cli->cl_cache_waiters) &&
+           !cfs_list_empty(&cli->cl_loi_write_list))
+               RETURN(list_to_obj(&cli->cl_loi_write_list, write_item));
+
+       /* then return all queued objects when we have an invalid import
+        * so that they get flushed */
+       if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
+               if (!cfs_list_empty(&cli->cl_loi_write_list))
+                       RETURN(list_to_obj(&cli->cl_loi_write_list,
+                                          write_item));
+               if (!cfs_list_empty(&cli->cl_loi_read_list))
+                       RETURN(list_to_obj(&cli->cl_loi_read_list,
+                                          read_item));
+       }
+       RETURN(NULL);
+}
+
+/* called with the loi list lock held */
+static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli,
+                          pdl_policy_t pol)
+{
+       struct osc_object *osc;
+       int rc = 0, race_counter = 0;
+       ENTRY;
+
+       while ((osc = osc_next_obj(cli)) != NULL) {
+               OSC_IO_DEBUG(osc, "%lu in flight\n", rpcs_in_flight(cli));
+
+               if (osc_max_rpc_in_flight(cli, osc))
+                       break;
+
+               /* attempt some read/write balancing by alternating between
+                * reads and writes in an object.  The makes_rpc checks here
+                * would be redundant if we were getting read/write work items
+                * instead of objects.  we don't want send_oap_rpc to drain a
+                * partial read pending queue when we're given this object to
+                * do io on writes while there are cache waiters */
+               if (osc_makes_rpc(cli, osc, OBD_BRW_WRITE)) {
+                       rc = osc_send_oap_rpc(env, cli, osc, OBD_BRW_WRITE,
+                                             &osc->oo_write_pages, pol);
+                       if (rc < 0) {
+                               CERROR("Write request failed with %d\n", rc);
+
+                               /* osc_send_oap_rpc failed, mostly because of
+                                * memory pressure.
+                                *
+                                * We can't break out here, because if:
+                                *  - a page was submitted by osc_io_submit,
+                                *    so the page is locked;
+                                *  - no request is in flight; and
+                                *  - no subsequent request follows,
+                                * then the system ends up in a live-lock
+                                * state, because there is no further chance
+                                * to call osc_io_unplug() and
+                                * osc_check_rpcs(). pdflush can't help in
+                                * this case, because it might be blocked
+                                * grabbing the page lock as mentioned above.
+                                *
+                                * Anyway, continue to drain pages. */
+                               /* break; */
+                       }
+
+                       if (rc > 0)
+                               race_counter = 0;
+                       else if (rc == 0)
+                               race_counter++;
+               }
+               if (osc_makes_rpc(cli, osc, OBD_BRW_READ)) {
+                       rc = osc_send_oap_rpc(env, cli, osc, OBD_BRW_READ,
+                                             &osc->oo_read_pages, pol);
+                       if (rc < 0)
+                               CERROR("Read request failed with %d\n", rc);
+
+                       if (rc > 0)
+                               race_counter = 0;
+                       else if (rc == 0)
+                               race_counter++;
+               }
+
+               /* attempt some inter-object balancing by issuing rpcs
+                * for each object in turn */
+               if (!cfs_list_empty(&osc->oo_hp_ready_item))
+                       cfs_list_del_init(&osc->oo_hp_ready_item);
+               if (!cfs_list_empty(&osc->oo_ready_item))
+                       cfs_list_del_init(&osc->oo_ready_item);
+               if (!cfs_list_empty(&osc->oo_write_item))
+                       cfs_list_del_init(&osc->oo_write_item);
+               if (!cfs_list_empty(&osc->oo_read_item))
+                       cfs_list_del_init(&osc->oo_read_item);
+
+               osc_list_maint(cli, osc);
+
+               /* send_oap_rpc fails with 0 when make_ready tells it to
+                * back off.  llite's make_ready does this when it tries
+                * to lock a page queued for write that is already locked.
+                * we want to try sending rpcs from many objects, but we
+                * don't want to spin failing with 0.  */
+               if (race_counter == 10)
+                       break;
+       }
+}
+
+void osc_io_unplug(const struct lu_env *env, struct client_obd *cli,
+                  struct osc_object *osc, pdl_policy_t pol)
+{
+       if (osc)
+               osc_list_maint(cli, osc);
+       osc_check_rpcs(env, cli, pol);
+}
+
+int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
+                       cfs_page_t *page, loff_t offset)
+{
+       struct obd_export     *exp = osc_export(osc);
+       struct osc_async_page *oap = &ops->ops_oap;
+       ENTRY;
+
+       if (!page)
+               return cfs_size_round(sizeof(*oap));
+
+       oap->oap_magic = OAP_MAGIC;
+       oap->oap_cli = &exp->exp_obd->u.cli;
+       oap->oap_obj = osc;
+
+       oap->oap_page = page;
+       oap->oap_obj_off = offset;
+       LASSERT(!(offset & ~CFS_PAGE_MASK));
+
+       if (!client_is_remote(exp) && cfs_capable(CFS_CAP_SYS_RESOURCE))
+               oap->oap_brw_flags = OBD_BRW_NOQUOTA;
+
+       CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
+       CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
+       CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
+
+       cfs_spin_lock_init(&oap->oap_lock);
+       CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n",
+              oap, page, oap->oap_obj_off);
+       RETURN(0);
+}
+
+int osc_queue_async_io(const struct lu_env *env, struct osc_page *ops)
+{
+       struct osc_async_page *oap = &ops->ops_oap;
+       struct client_obd     *cli = oap->oap_cli;
+       struct osc_object     *osc = oap->oap_obj;
+       struct obd_export     *exp = osc_export(osc);
+       int brw_flags = OBD_BRW_ASYNC;
+       int cmd = OBD_BRW_WRITE;
+       int rc = 0;
+       ENTRY;
+
+       if (oap->oap_magic != OAP_MAGIC)
+               RETURN(-EINVAL);
+
+       if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
+               RETURN(-EIO);
+
+       if (!cfs_list_empty(&oap->oap_pending_item) ||
+           !cfs_list_empty(&oap->oap_urgent_item) ||
+           !cfs_list_empty(&oap->oap_rpc_item))
+               RETURN(-EBUSY);
+
+       /* Set the OBD_BRW_SRVLOCK before the page is queued. */
+       brw_flags |= ops->ops_srvlock ? OBD_BRW_SRVLOCK : 0;
+       if (!client_is_remote(exp) && cfs_capable(CFS_CAP_SYS_RESOURCE)) {
+               brw_flags |= OBD_BRW_NOQUOTA;
+               cmd |= OBD_BRW_NOQUOTA;
+       }
+
+       /* check if the file's owner/group is over quota */
+       if (!(cmd & OBD_BRW_NOQUOTA)) {
+               struct cl_object *obj;
+               struct cl_attr   *attr;
+               unsigned int qid[MAXQUOTAS];
+
+               obj = cl_object_top(&osc->oo_cl);
+               attr = &osc_env_info(env)->oti_attr;
+
+               cl_object_attr_lock(obj);
+               rc = cl_object_attr_get(env, obj, attr);
+               cl_object_attr_unlock(obj);
+
+               qid[USRQUOTA] = attr->cat_uid;
+               qid[GRPQUOTA] = attr->cat_gid;
+               if (rc == 0 &&
+                   osc_quota_chkdq(cli, qid) == NO_QUOTA)
+                       rc = -EDQUOT;
+               if (rc)
+                       RETURN(rc);
+       }
+
+       client_obd_list_lock(&cli->cl_loi_list_lock);
+
+       oap->oap_cmd = cmd;
+       oap->oap_page_off = ops->ops_from;
+       oap->oap_count = ops->ops_to - ops->ops_from;
+       oap->oap_async_flags = 0;
+       oap->oap_brw_flags = brw_flags;
+       /* Give a hint to OST that requests are coming from kswapd - bug19529 */
+       if (cfs_memory_pressure_get())
+               oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
+
+       rc = osc_enter_cache(env, cli, oap);
+       if (rc) {
+               client_obd_list_unlock(&cli->cl_loi_list_lock);
+               RETURN(rc);
+       }
+
+       OSC_IO_DEBUG(osc, "oap %p page %p added for cmd %d\n",
+                    oap, oap->oap_page, cmd);
+
+       osc_oap_to_pending(oap);
+       osc_list_maint(cli, osc);
+       if (!osc_max_rpc_in_flight(cli, osc) &&
+           osc_makes_rpc(cli, osc, OBD_BRW_WRITE)) {
+               LASSERT(cli->cl_writeback_work != NULL);
+               rc = ptlrpcd_queue_work(cli->cl_writeback_work);
+
+               CDEBUG(D_CACHE, "Queued writeback work for client obd %p/%d.\n",
+                      cli, rc);
+       }
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+       RETURN(0);
+}
+
+int osc_teardown_async_page(struct osc_object *obj, struct osc_page *ops)
+{
+       struct osc_async_page *oap = &ops->ops_oap;
+       struct client_obd     *cli = oap->oap_cli;
+       struct osc_oap_pages  *lop;
+       int rc = 0;
+       ENTRY;
+
+       if (oap->oap_magic != OAP_MAGIC)
+               RETURN(-EINVAL);
+
+       if (oap->oap_cmd & OBD_BRW_WRITE) {
+               lop = &obj->oo_write_pages;
+       } else {
+               lop = &obj->oo_read_pages;
+       }
+
+       client_obd_list_lock(&cli->cl_loi_list_lock);
+
+       if (!cfs_list_empty(&oap->oap_rpc_item))
+               GOTO(out, rc = -EBUSY);
+
+       osc_exit_cache(cli, oap, 0);
+       osc_wake_cache_waiters(cli);
+
+       if (!cfs_list_empty(&oap->oap_urgent_item)) {
+               cfs_list_del_init(&oap->oap_urgent_item);
+               cfs_spin_lock(&oap->oap_lock);
+               oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
+               cfs_spin_unlock(&oap->oap_lock);
+       }
+       if (!cfs_list_empty(&oap->oap_pending_item)) {
+               cfs_list_del_init(&oap->oap_pending_item);
+               lop_update_pending(cli, lop, oap->oap_cmd, -1);
+       }
+       osc_list_maint(cli, obj);
+       OSC_IO_DEBUG(obj, "oap %p page %p torn down\n", oap, oap->oap_page);
+out:
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
+       RETURN(rc);
+}
+
+/* aka (~was & now & flag), but this is more clear :) */
+#define SETTING(was, now, flag) (!(was & flag) && (now & flag))
+
+int osc_set_async_flags(struct osc_object *obj, struct osc_page *opg,
+                       obd_flag async_flags)
+{
+       struct osc_async_page *oap = &opg->ops_oap;
+       struct osc_oap_pages *lop;
+       int flags = 0;
+       ENTRY;
+
+       LASSERT(!cfs_list_empty(&oap->oap_pending_item));
+
+       if (oap->oap_cmd & OBD_BRW_WRITE) {
+               lop = &obj->oo_write_pages;
+       } else {
+               lop = &obj->oo_read_pages;
+       }
+
+       if ((oap->oap_async_flags & async_flags) == async_flags)
+               RETURN(0);
+
+       if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
+               flags |= ASYNC_READY;
+
+       if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
+           cfs_list_empty(&oap->oap_rpc_item)) {
+               if (oap->oap_async_flags & ASYNC_HP)
+                       cfs_list_add(&oap->oap_urgent_item, &lop->oop_urgent);
+               else
+                       cfs_list_add_tail(&oap->oap_urgent_item,
+                                         &lop->oop_urgent);
+               flags |= ASYNC_URGENT;
+               osc_list_maint(oap->oap_cli, obj);
+       }
+       cfs_spin_lock(&oap->oap_lock);
+       oap->oap_async_flags |= flags;
+       cfs_spin_unlock(&oap->oap_lock);
+
+       OSC_IO_DEBUG(obj, "oap %p page %p has flags %x\n", oap,
+                    oap->oap_page, oap->oap_async_flags);
+       RETURN(0);
+}
+
+/**
+ * This is called when a sync waiter receives an interruption.  Its job is to
+ * get the caller woken as soon as possible.  If its page hasn't been put in an
+ * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
+ * desiring interruption which will forcefully complete the rpc once the rpc
+ * has timed out.
+ */
+int osc_cancel_async_page(const struct lu_env *env, struct osc_page *ops)
+{
+       struct osc_async_page *oap = &ops->ops_oap;
+       int rc = -EBUSY;
+       ENTRY;
+
+       LASSERT(!oap->oap_interrupted);
+       oap->oap_interrupted = 1;
+
+       /* ok, it's been put in an rpc. only one oap gets a request reference */
+       if (oap->oap_request != NULL) {
+               ptlrpc_mark_interrupted(oap->oap_request);
+               ptlrpcd_wake(oap->oap_request);
+               ptlrpc_req_finished(oap->oap_request);
+               oap->oap_request = NULL;
+       }
+
+       /*
+        * page completion may be called only if ->cpo_prep() method was
+        * executed by osc_io_submit(), which also adds the page to the pending list
+        */
+       if (!cfs_list_empty(&oap->oap_pending_item)) {
+               struct osc_oap_pages *lop;
+               struct osc_object *osc = oap->oap_obj;
+
+               cfs_list_del_init(&oap->oap_pending_item);
+               cfs_list_del_init(&oap->oap_urgent_item);
+
+               lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
+                       &osc->oo_write_pages : &osc->oo_read_pages;
+               lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
+               osc_list_maint(oap->oap_cli, osc);
+               rc = osc_completion(env, oap, oap->oap_cmd, NULL, -EINTR);
+       }
+
+       RETURN(rc);
+}
+
+int osc_queue_sync_page(const struct lu_env *env, struct osc_page *opg,
+                       int cmd, int brw_flags)
+{
+       struct osc_async_page *oap = &opg->ops_oap;
+       struct client_obd     *cli = oap->oap_cli;
+       int flags = 0;
+       ENTRY;
+
+       oap->oap_cmd       = cmd;
+       oap->oap_page_off  = opg->ops_from;
+       oap->oap_count     = opg->ops_to - opg->ops_from;
+       oap->oap_brw_flags = OBD_BRW_SYNC | brw_flags;
+
+       /* Give a hint to OST that requests are coming from kswapd - bug19529 */
+       if (cfs_memory_pressure_get())
+               oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
+
+       if (!client_is_remote(osc_export(cl2osc(opg->ops_cl.cpl_obj))) &&
+           cfs_capable(CFS_CAP_SYS_RESOURCE)) {
+               oap->oap_brw_flags |= OBD_BRW_NOQUOTA;
+               oap->oap_cmd |= OBD_BRW_NOQUOTA;
+       }
+
+       if (oap->oap_cmd & OBD_BRW_READ)
+               flags = ASYNC_COUNT_STABLE;
+       else if (!(oap->oap_brw_page.flag & OBD_BRW_FROM_GRANT))
+               osc_enter_cache_try(env, cli, oap, 1);
+
+       cfs_spin_lock(&oap->oap_lock);
+       oap->oap_async_flags |= OSC_FLAGS | flags;
+       cfs_spin_unlock(&oap->oap_lock);
+
+       osc_oap_to_pending(oap);
+       RETURN(0);
+}
+
+/** @} osc */
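
Taken together, osc_cache.c gives osc_page.c and osc_io.c a small
osc_object-based API (the prototypes are exported from osc_cl_internal.h
below). A minimal caller sketch, using a hypothetical helper name, of how one
page travels through it:

    /* Hypothetical usage sketch (not part of the patch): queue one page for
     * asynchronous write-back through the new osc_object-based entry points. */
    static int example_queue_page(const struct lu_env *env,
                                  struct osc_object *osc, struct osc_page *ops,
                                  cfs_page_t *page, loff_t offset)
    {
            int rc;

            /* binds the osc_async_page embedded in ops to osc/page/offset */
            rc = osc_prep_async_page(osc, ops, page, offset);
            if (rc != 0)
                    return rc;
            /* takes grant via osc_enter_cache() and may queue the ptlrpcd
             * writeback work if an RPC is warranted */
            return osc_queue_async_io(env, ops);
    }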
diff --git a/lustre/osc/osc_cl_internal.h b/lustre/osc/osc_cl_internal.h
index 39f866c..57350ea 100644
@@ -100,6 +100,22 @@ struct osc_thread_info {
         struct cl_page_list     oti_plist;
 };
 
+/**
+ * Manage osc_async_page
+ */
+struct osc_oap_pages {
+       cfs_list_t      oop_pending;
+       cfs_list_t      oop_urgent;
+       int             oop_num_pending;
+};
+
+static inline void osc_oap_pages_init(struct osc_oap_pages *list)
+{
+       CFS_INIT_LIST_HEAD(&list->oop_pending);
+       CFS_INIT_LIST_HEAD(&list->oop_urgent);
+       list->oop_num_pending = 0;
+}
+
 struct osc_object {
         struct cl_object   oo_cl;
         struct lov_oinfo  *oo_oinfo;
@@ -125,6 +141,16 @@ struct osc_object {
          * locked during take-off and landing.
          */
         cfs_spinlock_t     oo_seatbelt;
+
+       /**
+        * used by the osc to keep track of what objects to build into rpcs
+        */
+       struct osc_oap_pages oo_read_pages;
+       struct osc_oap_pages oo_write_pages;
+       cfs_list_t           oo_ready_item;
+       cfs_list_t           oo_hp_ready_item;
+       cfs_list_t           oo_write_item;
+       cfs_list_t           oo_read_item;
 };
 
 /*
@@ -361,9 +387,25 @@ void osc_index2policy  (ldlm_policy_data_t *policy, const struct cl_object *obj,
                         pgoff_t start, pgoff_t end);
 int  osc_lvb_print     (const struct lu_env *env, void *cookie,
                         lu_printer_t p, const struct ost_lvb *lvb);
+
 void osc_io_submit_page(const struct lu_env *env,
                         struct osc_io *oio, struct osc_page *opg,
                         enum cl_req_type crt);
+void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
+                      struct obdo *oa, struct osc_async_page *oap,
+                      int sent, int rc);
+int osc_cancel_async_page(const struct lu_env *env, struct osc_page *ops);
+int osc_set_async_flags(struct osc_object *obj, struct osc_page *opg,
+                       obd_flag async_flags);
+int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
+                       cfs_page_t *page, loff_t offset);
+int osc_queue_async_io(const struct lu_env *env, struct osc_page *ops);
+int osc_teardown_async_page(struct osc_object *obj,
+                           struct osc_page *ops);
+int osc_queue_sync_page(const struct lu_env *env, struct osc_page *ops,
+                       int cmd, int brw_flags);
+void osc_io_unplug(const struct lu_env *env, struct client_obd *cli,
+                  struct osc_object *osc, pdl_policy_t pol);
 
 void osc_object_set_contended  (struct osc_object *obj);
 void osc_object_clear_contended(struct osc_object *obj);
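Taken together, these prototypes show the shape of the new API: the
async-page entry points are keyed by osc_object/osc_page rather than by an
(export, lsm, lov_oinfo, oap) tuple. Compare the queueing call before and
after, both taken verbatim from this diff:

    /* before (removed from osc_internal.h below): */
    int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp,
                           struct lov_stripe_md *lsm, struct lov_oinfo *loi,
                           struct osc_async_page *oap, int cmd, int off,
                           int count, obd_flag brw_flags,
                           enum async_flags async_flags);

    /* after: the page carries everything the cache layer needs */
    int osc_queue_async_io(const struct lu_env *env, struct osc_page *ops);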
index 6890547..7293627 100644 (file)
@@ -51,13 +51,6 @@ enum async_flags {
         ASYNC_HP = 0x10,
 };
 
-struct obd_async_page_ops {
-        int  (*ap_make_ready)(const struct lu_env *env, void *data, int cmd);
-        int  (*ap_refresh_count)(const struct lu_env *env, void *data, int cmd);
-        int  (*ap_completion)(const struct lu_env *env,
-                              void *data, int cmd, struct obdo *oa, int rc);
-};
-
 struct osc_async_page {
         int                     oap_magic;
         unsigned short          oap_cmd;
@@ -75,11 +68,8 @@ struct osc_async_page {
 
         struct ptlrpc_request   *oap_request;
         struct client_obd       *oap_cli;
-        struct lov_oinfo        *oap_loi;
+       struct osc_object       *oap_obj;
 
-        const struct obd_async_page_ops *oap_caller_ops;
-        void                    *oap_caller_data;
-        cfs_list_t               oap_page_list;
         struct ldlm_lock        *oap_ldlm_lock;
         cfs_spinlock_t           oap_lock;
 };
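With oap_loi, the caller-ops table and the opaque caller data gone, an
oap is reached only through embedding: it lives inside an osc_page, whose
enclosing structures are recovered with container_of() (exactly what
osc_oap2cl_page() does below). A runnable illustration of that pattern,
with simplified stand-in types:

    #include <stddef.h>
    #include <stdio.h>

    /* illustrative stand-ins for the real structures */
    struct osc_async_page { int oap_cmd; };
    struct osc_page {
            int                   ops_from;
            struct osc_async_page ops_oap;   /* embedded, as in Lustre */
    };

    #define container_of(ptr, type, member) \
            ((type *)((char *)(ptr) - offsetof(type, member)))

    int main(void)
    {
            struct osc_page opg = { .ops_from = 0 };
            struct osc_async_page *oap = &opg.ops_oap;

            /* recover the enclosing osc_page from the oap pointer */
            struct osc_page *back = container_of(oap, struct osc_page,
                                                 ops_oap);
            printf("%d\n", back == &opg);   /* prints 1 */
            return 0;
    }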
@@ -116,6 +106,7 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa,
 void oscc_init(struct obd_device *obd);
 void osc_wake_cache_waiters(struct client_obd *cli);
 int osc_shrink_grant_to_target(struct client_obd *cli, long target);
+void osc_update_next_shrink(struct client_obd *cli);
 
 /*
  * cl integration.
@@ -146,28 +137,10 @@ int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                    obd_enqueue_update_f upcall, void *cookie,
                    struct ptlrpc_request_set *rqset);
 
-int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
-                        struct lov_oinfo *loi, cfs_page_t *page,
-                        obd_off offset, const struct obd_async_page_ops *ops,
-                        void *data, void **res, int nocache,
-                        struct lustre_handle *lockh);
-void osc_oap_to_pending(struct osc_async_page *oap);
-int  osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap);
-void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi);
-void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli);
-int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp,
-                       struct lov_stripe_md *lsm, struct lov_oinfo *loi,
-                       struct osc_async_page *oap, int cmd, int off,
-                       int count,  obd_flag brw_flags, enum async_flags async_flags);
-int osc_teardown_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
-                            struct lov_oinfo *loi, struct osc_async_page *oap);
 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *cfg);
-int osc_set_async_flags_base(struct client_obd *cli,
-                             struct lov_oinfo *loi, struct osc_async_page *oap,
-                             obd_flag async_flags);
-int osc_enter_cache_try(const struct lu_env *env,
-                        struct client_obd *cli, struct lov_oinfo *loi,
-                        struct osc_async_page *oap, int transient);
+int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
+                 cfs_list_t *rpc_list, int page_count, int cmd,
+                 pdl_policy_t p);
 
 struct cl_page *osc_oap2cl_page(struct osc_async_page *oap);
 extern cfs_spinlock_t osc_ast_guard;
@@ -194,6 +167,11 @@ static inline int osc_recoverable_error(int rc)
                 rc == -EAGAIN || rc == -EINPROGRESS);
 }
 
+static inline unsigned long rpcs_in_flight(struct client_obd *cli)
+{
+       return cli->cl_r_in_flight + cli->cl_w_in_flight;
+}
+
 #ifndef min_t
 #define min_t(type,x,y) \
         ({ type __x = (x); type __y = (y); __x < __y ? __x: __y; })
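rpcs_in_flight() simply sums the read and write RPCs in flight; hoisting
it into the header lets the new osc_cache.c share it with osc_request.c.
A hypothetical throttling check, assuming the usual cl_max_rpcs_in_flight
field on client_obd:

    /* hypothetical caller sketch, not code from this change */
    static int can_send_more(struct client_obd *cli)
    {
            return rpcs_in_flight(cli) < cli->cl_max_rpcs_in_flight;
    }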
index e7aeed8..32044fa 100644 (file)
@@ -36,6 +36,7 @@
  * Implementation of cl_io for OSC layer.
  *
  *   Author: Nikita Danilov <nikita.danilov@sun.com>
+ *   Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
  */
 
 #define DEBUG_SUBSYSTEM S_OSC
@@ -92,19 +93,12 @@ struct cl_page *osc_oap2cl_page(struct osc_async_page *oap)
         return container_of(oap, struct osc_page, ops_oap)->ops_cl.cpl_page;
 }
 
-static void osc_io_unplug(const struct lu_env *env, struct osc_object *osc,
-                          struct client_obd *cli)
-{
-        loi_list_maint(cli, osc->oo_oinfo);
-        osc_check_rpcs(env, cli);
-}
-
 /**
  * An implementation of cl_io_operations::cio_io_submit() method for osc
  * layer. Iterates over pages in the in-queue, prepares each for io by calling
  * cl_page_prep() and then either submits them through osc_io_submit_page()
  * or, if page is already submitted, changes osc flags through
- * osc_set_async_flags_base().
+ * osc_set_async_flags().
  */
 static int osc_io_submit(const struct lu_env *env,
                          const struct cl_io_slice *ios,
@@ -172,19 +166,17 @@ static int osc_io_submit(const struct lu_env *env,
                                 osc_io_submit_page(env, cl2osc_io(env, ios),
                                                    opg, crt);
                         } else {
-                                result = osc_set_async_flags_base(cli,
-                                                                  osc->oo_oinfo,
-                                                                  oap,
-                                                                  OSC_FLAGS);
-                                /*
-                                 * bug 18881: we can't just break out here when
-                                 * error occurs after cl_page_prep has been
-                                 * called against the page. The correct
-                                 * way is to call page's completion routine,
-                                 * as in osc_oap_interrupted.  For simplicity,
-                                 * we just force osc_set_async_flags_base() to
-                                 * not return error.
-                                 */
+                               result = osc_set_async_flags(osc, opg,
+                                                            OSC_FLAGS);
+                               /*
+                                * bug 18881: we can't just break out here when
+                                * error occurs after cl_page_prep has been
+                                * called against the page. The correct
+                                * way is to call page's completion routine,
+                                * as in osc_oap_interrupted.  For simplicity,
+                                * we just force osc_set_async_flags() to
+                                * not return an error.
+                                */
                                 LASSERT(result == 0);
                         }
                         opg->ops_submit_time = cfs_time_current();
@@ -217,7 +209,7 @@ static int osc_io_submit(const struct lu_env *env,
         LASSERT(ergo(result == 0, osc == osc0));
 
         if (queued > 0)
-                osc_io_unplug(env, osc, cli);
+               osc_io_unplug(env, cli, osc, PDL_POLICY_ROUND);
         if (osc0)
                 client_obd_list_unlock(&cli->cl_loi_list_lock);
         CDEBUG(D_INFO, "%d/%d %d\n", qin->pl_nr, qout->pl_nr, result);
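The static osc_io_unplug() removed above was just loi_list_maint()
followed by osc_check_rpcs(); the exported replacement additionally takes
a ptlrpcd policy so the caller chooses how the resulting RPCs are spread
over ptlrpcd threads. A sketch of what the new entry point presumably
does; osc_list_maint() and this osc_check_rpcs() signature are
hypothetical, not the real osc_cache.c implementation:

    /* a sketch under assumptions, for orientation only */
    void osc_io_unplug(const struct lu_env *env, struct client_obd *cli,
                       struct osc_object *osc, pdl_policy_t pol)
    {
            if (osc != NULL)
                    osc_list_maint(cli, osc);  /* per-object list upkeep */
            osc_check_rpcs(env, cli, pol);     /* fire ready RPCs */
    }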
index c4f78aa..47ebe5f 100644 (file)
@@ -83,7 +83,15 @@ static int osc_object_init(const struct lu_env *env, struct lu_object *obj,
         cfs_spin_lock_init(&osc->oo_seatbelt);
         for (i = 0; i < CRT_NR; ++i)
                 CFS_INIT_LIST_HEAD(&osc->oo_inflight[i]);
-        return 0;
+
+       osc_oap_pages_init(&osc->oo_read_pages);
+       osc_oap_pages_init(&osc->oo_write_pages);
+       CFS_INIT_LIST_HEAD(&osc->oo_ready_item);
+       CFS_INIT_LIST_HEAD(&osc->oo_hp_ready_item);
+       CFS_INIT_LIST_HEAD(&osc->oo_write_item);
+       CFS_INIT_LIST_HEAD(&osc->oo_read_item);
+
+       return 0;
 }
 
 static void osc_object_free(const struct lu_env *env, struct lu_object *obj)
index da58fc4..32fb5f4 100644 (file)
@@ -209,34 +209,19 @@ static int osc_page_cache_add(const struct lu_env *env,
                               const struct cl_page_slice *slice,
                               struct cl_io *unused)
 {
-        struct osc_page   *opg = cl2osc_page(slice);
-        struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj);
-        int result;
-        /* All cacheable IO is async-capable */
-        int brw_flags = OBD_BRW_ASYNC;
-        int noquota = 0;
-
-        LINVRNT(osc_page_protected(env, opg, CLM_WRITE, 0));
-        ENTRY;
-
-        /* Set the OBD_BRW_SRVLOCK before the page is queued. */
-        brw_flags |= opg->ops_srvlock ? OBD_BRW_SRVLOCK : 0;
-        if (!client_is_remote(osc_export(obj)) &&
-            cfs_capable(CFS_CAP_SYS_RESOURCE)) {
-                brw_flags |= OBD_BRW_NOQUOTA;
-                noquota = OBD_BRW_NOQUOTA;
-        }
-
-        osc_page_transfer_get(opg, "transfer\0cache");
-        result = osc_queue_async_io(env, osc_export(obj), NULL, obj->oo_oinfo,
-                                    &opg->ops_oap, OBD_BRW_WRITE | noquota,
-                                    opg->ops_from, opg->ops_to - opg->ops_from,
-                                    brw_flags, 0);
-        if (result != 0)
-                osc_page_transfer_put(env, opg);
-        else
-                osc_page_transfer_add(env, opg, CRT_WRITE);
-        RETURN(result);
+       struct osc_page *opg = cl2osc_page(slice);
+       int result;
+       ENTRY;
+
+       LINVRNT(osc_page_protected(env, opg, CLM_WRITE, 0));
+
+       osc_page_transfer_get(opg, "transfer\0cache");
+       result = osc_queue_async_io(env, opg);
+       if (result != 0)
+               osc_page_transfer_put(env, opg);
+       else
+               osc_page_transfer_add(env, opg, CRT_WRITE);
+       RETURN(result);
 }
 
 void osc_index2policy(ldlm_policy_data_t *policy, const struct cl_object *obj,
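What survives in osc_page_cache_add() is the reference discipline: take a
transfer reference, try to queue, drop the reference on failure or account
a CRT_WRITE transfer on success. The same shape in a runnable toy, with
hypothetical helpers standing in for the osc_page_transfer_* calls:

    #include <stdio.h>

    /* toy stand-ins for the transfer-reference helpers */
    struct page_ref { int refs; };

    static void ref_get(struct page_ref *r) { r->refs++; }
    static void ref_put(struct page_ref *r) { r->refs--; }
    static int  try_queue(int fail) { return fail ? -12 : 0; }

    static int queue_with_ref(struct page_ref *r, int fail)
    {
            int rc;

            ref_get(r);             /* osc_page_transfer_get()  */
            rc = try_queue(fail);   /* osc_queue_async_io()     */
            if (rc != 0)
                    ref_put(r);     /* roll back on failure     */
            /* on success the reference is handed to the transfer,
             * as osc_page_transfer_add() does for CRT_WRITE */
            return rc;
    }

    int main(void)
    {
            struct page_ref r = { 0 };

            queue_with_ref(&r, 1);
            printf("refs after failed queue: %d\n", r.refs); /* 0 */
            return 0;
    }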
@@ -354,11 +339,10 @@ static int osc_page_print(const struct lu_env *env,
         struct osc_async_page *oap = &opg->ops_oap;
         struct osc_object     *obj = cl2osc(slice->cpl_obj);
         struct client_obd     *cli = &osc_export(obj)->exp_obd->u.cli;
-        struct lov_oinfo      *loi = obj->oo_oinfo;
 
         return (*printer)(env, cookie, LUSTRE_OSC_NAME"-page@%p: "
                           "1< %#x %d %u %s %s %s > "
-                          "2< "LPU64" %u %u %#x %#x | %p %p %p %p %p > "
+                         "2< "LPU64" %u %u %#x %#x | %p %p %p > "
                           "3< %s %p %d %lu %d > "
                           "4< %d %d %d %lu %s | %s %s %s %s > "
                           "5< %s %s %s %s | %d %s %s | %d %s %s>\n",
@@ -372,9 +356,7 @@ static int osc_page_print(const struct lu_env *env,
                           /* 2 */
                           oap->oap_obj_off, oap->oap_page_off, oap->oap_count,
                           oap->oap_async_flags, oap->oap_brw_flags,
-                          oap->oap_request,
-                          oap->oap_cli, oap->oap_loi, oap->oap_caller_ops,
-                          oap->oap_caller_data,
+                         oap->oap_request, oap->oap_cli, obj,
                           /* 3 */
                           osc_list(&opg->ops_inflight),
                           opg->ops_submitter, opg->ops_transfer_pinned,
@@ -389,24 +371,23 @@ static int osc_page_print(const struct lu_env *env,
                           osc_list(&cli->cl_loi_write_list),
                           osc_list(&cli->cl_loi_read_list),
                           /* 5 */
-                          osc_list(&loi->loi_ready_item),
-                          osc_list(&loi->loi_hp_ready_item),
-                          osc_list(&loi->loi_write_item),
-                          osc_list(&loi->loi_read_item),
-                          loi->loi_read_lop.lop_num_pending,
-                          osc_list(&loi->loi_read_lop.lop_pending),
-                          osc_list(&loi->loi_read_lop.lop_urgent),
-                          loi->loi_write_lop.lop_num_pending,
-                          osc_list(&loi->loi_write_lop.lop_pending),
-                          osc_list(&loi->loi_write_lop.lop_urgent));
+                         osc_list(&obj->oo_ready_item),
+                         osc_list(&obj->oo_hp_ready_item),
+                         osc_list(&obj->oo_write_item),
+                         osc_list(&obj->oo_read_item),
+                         obj->oo_read_pages.oop_num_pending,
+                         osc_list(&obj->oo_read_pages.oop_pending),
+                         osc_list(&obj->oo_read_pages.oop_urgent),
+                         obj->oo_write_pages.oop_num_pending,
+                         osc_list(&obj->oo_write_pages.oop_pending),
+                         osc_list(&obj->oo_write_pages.oop_urgent));
 }
 
 static void osc_page_delete(const struct lu_env *env,
                             const struct cl_page_slice *slice)
 {
-        struct osc_page       *opg = cl2osc_page(slice);
-        struct osc_object     *obj = cl2osc(opg->ops_cl.cpl_obj);
-        struct osc_async_page *oap = &opg->ops_oap;
+       struct osc_page   *opg = cl2osc_page(slice);
+       struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj);
         int rc;
 
         LINVRNT(opg->ops_temp || osc_page_protected(env, opg, CLM_READ, 1));
@@ -414,7 +395,7 @@ static void osc_page_delete(const struct lu_env *env,
         ENTRY;
         CDEBUG(D_TRACE, "%p\n", opg);
         osc_page_transfer_put(env, opg);
-        rc = osc_teardown_async_page(osc_export(obj), NULL, obj->oo_oinfo, oap);
+       rc = osc_teardown_async_page(obj, opg);
         if (rc) {
                 CL_PAGE_DEBUG(D_ERROR, env, cl_page_top(slice->cpl_page),
                               "Trying to teardown failed: %d\n", rc);
@@ -455,7 +436,7 @@ static int osc_page_cancel(const struct lu_env *env,
          * is completed, or not even queued. */
         if (opg->ops_transfer_pinned)
                 /* FIXME: may not be interrupted.. */
-                rc = osc_oap_interrupted(env, oap);
+               rc = osc_cancel_async_page(env, opg);
         LASSERT(ergo(rc == 0, opg->ops_transfer_pinned == 0));
         client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
         return rc;
@@ -480,136 +461,6 @@ static const struct cl_page_operations osc_page_ops = {
         .cpo_cancel         = osc_page_cancel
 };
 
-static int osc_make_ready(const struct lu_env *env, void *data, int cmd)
-{
-        struct osc_page *opg  = data;
-        struct cl_page  *page = cl_page_top(opg->ops_cl.cpl_page);
-        int result;
-
-        LASSERT(cmd == OBD_BRW_WRITE); /* no cached reads */
-        LINVRNT(osc_page_protected(env, opg, CLM_WRITE, 1));
-
-        ENTRY;
-        result = cl_page_make_ready(env, page, CRT_WRITE);
-        if (result == 0)
-                opg->ops_submit_time = cfs_time_current();
-        RETURN(result);
-}
-
-static int osc_refresh_count(const struct lu_env *env, void *data, int cmd)
-{
-        struct cl_page   *page;
-        struct osc_page  *osc = data;
-        struct cl_object *obj;
-        struct cl_attr   *attr = &osc_env_info(env)->oti_attr;
-
-        int result;
-        loff_t kms;
-
-        LINVRNT(osc_page_protected(env, osc, CLM_READ, 1));
-
-        /* readpage queues with _COUNT_STABLE, shouldn't get here. */
-        LASSERT(!(cmd & OBD_BRW_READ));
-        LASSERT(osc != NULL);
-        page = osc->ops_cl.cpl_page;
-        obj = osc->ops_cl.cpl_obj;
-
-        cl_object_attr_lock(obj);
-        result = cl_object_attr_get(env, obj, attr);
-        cl_object_attr_unlock(obj);
-        if (result < 0)
-                return result;
-        kms = attr->cat_kms;
-        if (cl_offset(obj, page->cp_index) >= kms)
-                /* catch race with truncate */
-                return 0;
-        else if (cl_offset(obj, page->cp_index + 1) > kms)
-                /* catch sub-page write at end of file */
-                return kms % CFS_PAGE_SIZE;
-        else
-                return CFS_PAGE_SIZE;
-}
-
-static int osc_completion(const struct lu_env *env,
-                          void *data, int cmd, struct obdo *oa, int rc)
-{
-        struct osc_page       *opg  = data;
-        struct osc_async_page *oap  = &opg->ops_oap;
-        struct cl_page        *page = cl_page_top(opg->ops_cl.cpl_page);
-        struct osc_object     *obj  = cl2osc(opg->ops_cl.cpl_obj);
-        enum cl_req_type crt;
-        int srvlock;
-
-        LINVRNT(osc_page_protected(env, opg, CLM_READ, 1));
-
-        ENTRY;
-
-        cmd &= ~OBD_BRW_NOQUOTA;
-        LASSERT(equi(page->cp_state == CPS_PAGEIN,  cmd == OBD_BRW_READ));
-        LASSERT(equi(page->cp_state == CPS_PAGEOUT, cmd == OBD_BRW_WRITE));
-        LASSERT(opg->ops_transfer_pinned);
-
-        /*
-         * page->cp_req can be NULL if io submission failed before
-         * cl_req was allocated.
-         */
-        if (page->cp_req != NULL)
-                cl_req_page_done(env, page);
-        LASSERT(page->cp_req == NULL);
-
-        /* As the transfer for this page is being done, clear the flags */
-        cfs_spin_lock(&oap->oap_lock);
-        oap->oap_async_flags = 0;
-        cfs_spin_unlock(&oap->oap_lock);
-
-        crt = cmd == OBD_BRW_READ ? CRT_READ : CRT_WRITE;
-        /* Clear opg->ops_transfer_pinned before VM lock is released. */
-        opg->ops_transfer_pinned = 0;
-
-        cfs_spin_lock(&obj->oo_seatbelt);
-        LASSERT(opg->ops_submitter != NULL);
-        LASSERT(!cfs_list_empty(&opg->ops_inflight));
-        cfs_list_del_init(&opg->ops_inflight);
-        cfs_spin_unlock(&obj->oo_seatbelt);
-
-        opg->ops_submit_time = 0;
-        srvlock = oap->oap_brw_flags & OBD_BRW_SRVLOCK;
-
-        cl_page_completion(env, page, crt, rc);
-
-        /* statistic */
-        if (rc == 0 && srvlock) {
-                struct lu_device *ld    = opg->ops_cl.cpl_obj->co_lu.lo_dev;
-                struct osc_stats *stats = &lu2osc_dev(ld)->od_stats;
-                int bytes = oap->oap_count;
-
-                if (crt == CRT_READ)
-                        stats->os_lockless_reads += bytes;
-                else
-                        stats->os_lockless_writes += bytes;
-        }
-
-        /*
-         * This has to be the last operation with the page, as locks are
-         * released in cl_page_completion() and nothing except for the
-         * reference counter protects page from concurrent reclaim.
-         */
-        lu_ref_del(&page->cp_reference, "transfer", page);
-        /*
-         * As page->cp_obj is pinned by a reference from page->cp_req, it is
-         * safe to call cl_page_put() without risking object destruction in a
-         * non-blocking context.
-         */
-        cl_page_put(env, page);
-        RETURN(0);
-}
-
-const static struct obd_async_page_ops osc_async_page_ops = {
-        .ap_make_ready    = osc_make_ready,
-        .ap_refresh_count = osc_refresh_count,
-        .ap_completion    = osc_completion
-};
-
 struct cl_page *osc_page_init(const struct lu_env *env,
                               struct cl_object *obj,
                               struct cl_page *page, cfs_page_t *vmpage)
@@ -620,16 +471,11 @@ struct cl_page *osc_page_init(const struct lu_env *env,
 
         OBD_SLAB_ALLOC_PTR_GFP(opg, osc_page_kmem, CFS_ALLOC_IO);
         if (opg != NULL) {
-                void *oap = &opg->ops_oap;
-
                 opg->ops_from = 0;
                 opg->ops_to   = CFS_PAGE_SIZE;
 
-                result = osc_prep_async_page(osc_export(osc),
-                                             NULL, osc->oo_oinfo, vmpage,
-                                             cl_offset(obj, page->cp_index),
-                                             &osc_async_page_ops,
-                                             opg, (void **)&oap, 1, NULL);
+               result = osc_prep_async_page(osc, opg, vmpage,
+                                            cl_offset(obj, page->cp_index));
                 if (result == 0) {
                         struct osc_io *oio = osc_env_io(env);
                         opg->ops_srvlock = osc_io_srvlock(oio);
@@ -657,39 +503,13 @@ void osc_io_submit_page(const struct lu_env *env,
                         struct osc_io *oio, struct osc_page *opg,
                         enum cl_req_type crt)
 {
-        struct osc_async_page *oap = &opg->ops_oap;
-        struct client_obd     *cli = oap->oap_cli;
-        int flags = 0;
-
         LINVRNT(osc_page_protected(env, opg,
                                    crt == CRT_WRITE ? CLM_WRITE : CLM_READ, 1));
 
-        oap->oap_page_off   = opg->ops_from;
-        oap->oap_count      = opg->ops_to - opg->ops_from;
-        /* Give a hint to OST that requests are coming from kswapd - bug19529 */
-        if (cfs_memory_pressure_get())
-                oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
-        oap->oap_brw_flags |= OBD_BRW_SYNC;
-        if (osc_io_srvlock(oio))
-                oap->oap_brw_flags |= OBD_BRW_SRVLOCK;
-
-        oap->oap_cmd = crt == CRT_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ;
-        if (!client_is_remote(osc_export(cl2osc(opg->ops_cl.cpl_obj))) &&
-            cfs_capable(CFS_CAP_SYS_RESOURCE)) {
-                oap->oap_brw_flags |= OBD_BRW_NOQUOTA;
-                oap->oap_cmd |= OBD_BRW_NOQUOTA;
-        }
-
-        if (oap->oap_cmd & OBD_BRW_READ)
-                flags = ASYNC_COUNT_STABLE;
-        else if (!(oap->oap_brw_page.flag & OBD_BRW_FROM_GRANT))
-                osc_enter_cache_try(env, cli, oap->oap_loi, oap, 1);
-
-        cfs_spin_lock(&oap->oap_lock);
-        oap->oap_async_flags |= OSC_FLAGS | flags;
-        cfs_spin_unlock(&oap->oap_lock);
+       osc_queue_sync_page(env, opg,
+                           crt == CRT_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
+                           osc_io_srvlock(oio) ? OBD_BRW_SRVLOCK : 0);
 
-        osc_oap_to_pending(oap);
         osc_page_transfer_get(opg, "transfer\0imm");
         osc_page_transfer_add(env, opg, crt);
 }
index ba5946b..0de44b7 100644 (file)
 #include <lustre_debug.h>
 #include <lustre_param.h>
 #include "osc_internal.h"
+#include "osc_cl_internal.h"
 
 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
 static int brw_interpret(const struct lu_env *env,
                          struct ptlrpc_request *req, void *data, int rc);
-static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli,
-                            int ptlrpc);
 int osc_cleanup(struct obd_device *obd);
 
 /* Pack OSC object metadata for disk storage (LE byte order). */
@@ -809,7 +808,7 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
 
 }
 
-static void osc_update_next_shrink(struct client_obd *cli)
+void osc_update_next_shrink(struct client_obd *cli)
 {
         cli->cl_next_shrink_grant =
                 cfs_time_shift(cli->cl_grant_shrink_interval);
@@ -817,127 +816,6 @@ static void osc_update_next_shrink(struct client_obd *cli)
                cli->cl_next_shrink_grant);
 }
 
-/* caller must hold loi_list_lock */
-static void osc_consume_write_grant(struct client_obd *cli,
-                                    struct brw_page *pga)
-{
-        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
-        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
-        cfs_atomic_inc(&obd_dirty_pages);
-        cli->cl_dirty += CFS_PAGE_SIZE;
-        cli->cl_avail_grant -= CFS_PAGE_SIZE;
-        pga->flag |= OBD_BRW_FROM_GRANT;
-        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
-               CFS_PAGE_SIZE, pga, pga->pg);
-        LASSERT(cli->cl_avail_grant >= 0);
-        osc_update_next_shrink(cli);
-}
-
-/* the companion to osc_consume_write_grant, called when a brw has completed.
- * must be called with the loi lock held. */
-static void osc_release_write_grant(struct client_obd *cli,
-                                    struct brw_page *pga, int sent)
-{
-        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
-        ENTRY;
-
-        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
-        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
-                EXIT;
-                return;
-        }
-
-        pga->flag &= ~OBD_BRW_FROM_GRANT;
-        cfs_atomic_dec(&obd_dirty_pages);
-        cli->cl_dirty -= CFS_PAGE_SIZE;
-        if (pga->flag & OBD_BRW_NOCACHE) {
-                pga->flag &= ~OBD_BRW_NOCACHE;
-                cfs_atomic_dec(&obd_dirty_transit_pages);
-                cli->cl_dirty_transit -= CFS_PAGE_SIZE;
-        }
-        if (!sent) {
-                /* Reclaim grant from truncated pages. This is used to solve
-                 * write-truncate and grant all gone(to lost_grant) problem.
-                 * For a vfs write this problem can be easily solved by a sync
-                 * write, however, this is not an option for page_mkwrite()
-                 * because grant has to be allocated before a page becomes
-                 * dirty. */
-                if (cli->cl_avail_grant < PTLRPC_MAX_BRW_SIZE)
-                        cli->cl_avail_grant += CFS_PAGE_SIZE;
-                else
-                        cli->cl_lost_grant += CFS_PAGE_SIZE;
-                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
-                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
-        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
-                /* For short writes we shouldn't count parts of pages that
-                 * span a whole block on the OST side, or our accounting goes
-                 * wrong.  Should match the code in filter_grant_check. */
-                int offset = pga->off & ~CFS_PAGE_MASK;
-                int count = pga->count + (offset & (blocksize - 1));
-                int end = (offset + pga->count) & (blocksize - 1);
-                if (end)
-                        count += blocksize - end;
-
-                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
-                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
-                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
-                       cli->cl_avail_grant, cli->cl_dirty);
-        }
-
-        EXIT;
-}
-
-static unsigned long rpcs_in_flight(struct client_obd *cli)
-{
-        return cli->cl_r_in_flight + cli->cl_w_in_flight;
-}
-
-/* caller must hold loi_list_lock */
-void osc_wake_cache_waiters(struct client_obd *cli)
-{
-        cfs_list_t *l, *tmp;
-        struct osc_cache_waiter *ocw;
-
-        ENTRY;
-        cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
-                /* if we can't dirty more, we must wait until some is written */
-                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
-                   (cfs_atomic_read(&obd_dirty_pages) + 1 >
-                    obd_max_dirty_pages)) {
-                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
-                               "osc max %ld, sys max %d\n", cli->cl_dirty,
-                               cli->cl_dirty_max, obd_max_dirty_pages);
-                        return;
-                }
-
-                /* if the cache is still dirty but we have no grant, wait for
-                 * pending RPCs that may yet return grant before doing sync writes */
-                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
-                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
-                               cli->cl_w_in_flight);
-                        return;
-                }
-
-                ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
-                cfs_list_del_init(&ocw->ocw_entry);
-                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
-                        /* no more RPCs in flight to return grant, do sync IO */
-                        ocw->ocw_rc = -EDQUOT;
-                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
-                } else {
-                        osc_consume_write_grant(cli,
-                                                &ocw->ocw_oap->oap_brw_page);
-                }
-
-                CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld\n",
-                       ocw, ocw->ocw_oap, cli->cl_avail_grant);
-
-                cfs_waitq_signal(&ocw->ocw_waitq);
-        }
-
-        EXIT;
-}
-
 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
 {
         client_obd_list_lock(&cli->cl_loi_list_lock);
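All of the write-grant bookkeeping removed above (consume on dirtying,
release on completion or truncate, waking cache waiters) moves into
osc_cache.c. Its core invariant: every cached dirty page pins one page
worth of grant, which is either spent by the eventual write or reclaimed
locally when the page is dropped unsent. A simplified standalone model;
the PTLRPC_MAX_BRW_SIZE cap and the partial-page block accounting are
deliberately omitted:

    #include <assert.h>

    #define GRANT_PAGE 4096L

    /* simplified model of the client grant counters touched above */
    struct grant_model {
            long dirty;        /* cl_dirty       */
            long avail_grant;  /* cl_avail_grant */
    };

    static void consume_write_grant(struct grant_model *g)
    {
            g->dirty       += GRANT_PAGE;
            g->avail_grant -= GRANT_PAGE;
            assert(g->avail_grant >= 0);
    }

    static void release_write_grant(struct grant_model *g, int sent)
    {
            g->dirty -= GRANT_PAGE;
            /* unsent (truncated) pages give their grant back locally */
            if (!sent)
                    g->avail_grant += GRANT_PAGE;
    }

    int main(void)
    {
            struct grant_model g = { 0, 8 * GRANT_PAGE };

            consume_write_grant(&g);
            release_write_grant(&g, 0);
            assert(g.avail_grant == 8 * GRANT_PAGE && g.dirty == 0);
            return 0;
    }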
@@ -1977,272 +1855,12 @@ out:
         RETURN(rc);
 }
 
-/* The companion to osc_enter_cache(), called when @oap is no longer part of
- * the dirty accounting.  Writeback completes or truncate happens before
- * writing starts.  Must be called with the loi lock held. */
-static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
-                           int sent)
-{
-        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
-}
-
-
-/* This maintains the lists of pending pages to read/write for a given object
- * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
- * to quickly find objects that are ready to send an RPC. */
-static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
-                         int cmd)
-{
-        ENTRY;
-
-        if (lop->lop_num_pending == 0)
-                RETURN(0);
-
-        /* if we have an invalid import we want to drain the queued pages
-         * by forcing them through rpcs that immediately fail and complete
-         * the pages.  recovery relies on this to empty the queued pages
-         * before canceling the locks and evicting the llite pages */
-        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
-                RETURN(1);
-
-        /* stream rpcs in queue order as long as there is an urgent page
-         * queued.  this is our cheap solution for good batching in the case
-         * where writepage marks some random page in the middle of the file
-         * as urgent because of, say, memory pressure */
-        if (!cfs_list_empty(&lop->lop_urgent)) {
-                CDEBUG(D_CACHE, "urgent request forcing RPC\n");
-                RETURN(1);
-        }
-
-        if (cmd & OBD_BRW_WRITE) {
-                /* trigger a write rpc stream as long as there are dirtiers
-                 * waiting for space.  as they're waiting, they're not going to
-                 * create more pages to coalesce with what's waiting.. */
-                if (!cfs_list_empty(&cli->cl_cache_waiters)) {
-                        CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
-                        RETURN(1);
-                }
-        }
-        if (lop->lop_num_pending >= cli->cl_max_pages_per_rpc)
-                RETURN(1);
-
-        RETURN(0);
-}
-
-static int lop_makes_hprpc(struct loi_oap_pages *lop)
-{
-        struct osc_async_page *oap;
-        ENTRY;
-
-        if (cfs_list_empty(&lop->lop_urgent))
-                RETURN(0);
-
-        oap = cfs_list_entry(lop->lop_urgent.next,
-                         struct osc_async_page, oap_urgent_item);
-
-        if (oap->oap_async_flags & ASYNC_HP) {
-                CDEBUG(D_CACHE, "hp request forcing RPC\n");
-                RETURN(1);
-        }
-
-        RETURN(0);
-}
-
-static void on_list(cfs_list_t *item, cfs_list_t *list,
-                    int should_be_on)
-{
-        if (cfs_list_empty(item) && should_be_on)
-                cfs_list_add_tail(item, list);
-        else if (!cfs_list_empty(item) && !should_be_on)
-                cfs_list_del_init(item);
-}
-
-/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
- * can find pages to build into rpcs quickly */
-void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
-{
-        if (lop_makes_hprpc(&loi->loi_write_lop) ||
-            lop_makes_hprpc(&loi->loi_read_lop)) {
-                /* HP rpc */
-                on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
-                on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
-        } else {
-                on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
-                on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
-                        lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
-                        lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
-        }
-
-        on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
-                loi->loi_write_lop.lop_num_pending);
-
-        on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
-                loi->loi_read_lop.lop_num_pending);
-}
-
-static void lop_update_pending(struct client_obd *cli,
-                               struct loi_oap_pages *lop, int cmd, int delta)
-{
-        lop->lop_num_pending += delta;
-        if (cmd & OBD_BRW_WRITE)
-                cli->cl_pending_w_pages += delta;
-        else
-                cli->cl_pending_r_pages += delta;
-}
-
-/**
- * This is called when a sync waiter receives an interruption.  Its job is to
- * get the caller woken as soon as possible.  If its page hasn't been put in an
- * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
- * desiring interruption, which will forcefully complete the rpc once it
- * has timed out.
- */
-int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
-{
-        struct loi_oap_pages *lop;
-        struct lov_oinfo *loi;
-        int rc = -EBUSY;
-        ENTRY;
-
-        LASSERT(!oap->oap_interrupted);
-        oap->oap_interrupted = 1;
-
-        /* ok, it's been put in an rpc. only one oap gets a request reference */
-        if (oap->oap_request != NULL) {
-                ptlrpc_mark_interrupted(oap->oap_request);
-                ptlrpcd_wake(oap->oap_request);
-                ptlrpc_req_finished(oap->oap_request);
-                oap->oap_request = NULL;
-        }
-
-        /*
-         * page completion may be called only if ->cpo_prep() method was
-         * executed by osc_io_submit(), which also adds the page to the pending list
-         */
-        if (!cfs_list_empty(&oap->oap_pending_item)) {
-                cfs_list_del_init(&oap->oap_pending_item);
-                cfs_list_del_init(&oap->oap_urgent_item);
-
-                loi = oap->oap_loi;
-                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
-                        &loi->loi_write_lop : &loi->loi_read_lop;
-                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
-                loi_list_maint(oap->oap_cli, oap->oap_loi);
-                rc = oap->oap_caller_ops->ap_completion(env,
-                                          oap->oap_caller_data,
-                                          oap->oap_cmd, NULL, -EINTR);
-        }
-
-        RETURN(rc);
-}
-
-/* This is trying to propagate async writeback errors back up to the
- * application.  As an async write fails we record the error code for later if
- * the app does an fsync.  As long as errors persist we force future rpcs to be
- * sync so that the app can get a sync error and break the cycle of queueing
- * pages for which writeback will fail. */
-static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
-                           int rc)
-{
-        if (rc) {
-                if (!ar->ar_rc)
-                        ar->ar_rc = rc;
-
-                ar->ar_force_sync = 1;
-                ar->ar_min_xid = ptlrpc_sample_next_xid();
-                return;
-
-        }
-
-        if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
-                ar->ar_force_sync = 0;
-}
-
-void osc_oap_to_pending(struct osc_async_page *oap)
-{
-        struct loi_oap_pages *lop;
-
-        if (oap->oap_cmd & OBD_BRW_WRITE)
-                lop = &oap->oap_loi->loi_write_lop;
-        else
-                lop = &oap->oap_loi->loi_read_lop;
-
-        if (oap->oap_async_flags & ASYNC_HP)
-                cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
-        else if (oap->oap_async_flags & ASYNC_URGENT)
-                cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
-        cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
-        lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
-}
-
-/* this must be called holding the loi list lock to give coverage to exit_cache,
- * async_flag maintenance, and oap_request */
-static void osc_ap_completion(const struct lu_env *env,
-                              struct client_obd *cli, struct obdo *oa,
-                              struct osc_async_page *oap, int sent, int rc)
-{
-        __u64 xid = 0;
-
-        ENTRY;
-        if (oap->oap_request != NULL) {
-                xid = ptlrpc_req_xid(oap->oap_request);
-                ptlrpc_req_finished(oap->oap_request);
-                oap->oap_request = NULL;
-        }
-
-        cfs_spin_lock(&oap->oap_lock);
-        oap->oap_async_flags = 0;
-        cfs_spin_unlock(&oap->oap_lock);
-        oap->oap_interrupted = 0;
-
-        if (oap->oap_cmd & OBD_BRW_WRITE) {
-                osc_process_ar(&cli->cl_ar, xid, rc);
-                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
-        }
-
-        if (rc == 0 && oa != NULL) {
-                if (oa->o_valid & OBD_MD_FLBLOCKS)
-                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
-                if (oa->o_valid & OBD_MD_FLMTIME)
-                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
-                if (oa->o_valid & OBD_MD_FLATIME)
-                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
-                if (oa->o_valid & OBD_MD_FLCTIME)
-                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
-        }
-
-        rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
-                                                oap->oap_cmd, oa, rc);
-
-        /* cl_page_completion() drops PG_locked, so a new I/O on the page could
-         * start, but OSC calls it under lock and thus we can add oap back to
-         * pending safely */
-        if (rc)
-                /* upper layer wants to leave the page on pending queue */
-                osc_oap_to_pending(oap);
-        else
-                osc_exit_cache(cli, oap, sent);
-        EXIT;
-}
-
-static int brw_queue_work(const struct lu_env *env, void *data)
-{
-        struct client_obd *cli = data;
-
-        CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
-
-        client_obd_list_lock(&cli->cl_loi_list_lock);
-        osc_check_rpcs0(env, cli, 1);
-        client_obd_list_unlock(&cli->cl_loi_list_lock);
-        RETURN(0);
-}
-
 static int brw_interpret(const struct lu_env *env,
                          struct ptlrpc_request *req, void *data, int rc)
 {
         struct osc_brw_async_args *aa = data;
+       struct osc_async_page *oap, *tmp;
         struct client_obd *cli;
-        int async;
         ENTRY;
 
         rc = osc_brw_fini_request(req, rc);
@@ -2288,45 +1906,38 @@ static int brw_interpret(const struct lu_env *env,
         else
                 cli->cl_r_in_flight--;
 
-        async = cfs_list_empty(&aa->aa_oaps);
-        if (!async) { /* from osc_send_oap_rpc() */
-                struct osc_async_page *oap, *tmp;
-                /* the caller may re-use the oap after the completion call so
-                 * we need to clean it up a little */
-                cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
-                                             oap_rpc_item) {
-                        cfs_list_del_init(&oap->oap_rpc_item);
-                        osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
-                }
-                OBDO_FREE(aa->aa_oa);
-        } else { /* from async_internal() */
-                obd_count i;
-                for (i = 0; i < aa->aa_page_count; i++)
-                        osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
-        }
-        osc_wake_cache_waiters(cli);
-        osc_check_rpcs0(env, cli, 1);
-        client_obd_list_unlock(&cli->cl_loi_list_lock);
+       /* the caller may re-use the oap after the completion call so
+        * we need to clean it up a little */
+       cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
+                       oap_rpc_item) {
+               cfs_list_del_init(&oap->oap_rpc_item);
+               osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
+       }
+       OBDO_FREE(aa->aa_oa);
 
-        if (!async)
-                cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
-                                  req->rq_bulk->bd_nob_transferred);
-        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
-        ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
+       osc_wake_cache_waiters(cli);
+       osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
 
-        RETURN(rc);
+       cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
+                         req->rq_bulk->bd_nob_transferred);
+       osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
+       ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
+
+       RETURN(rc);
 }
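With the async_internal() path gone, brw_interpret() always walks
aa_oaps, and it must use the _safe iterator because the loop detaches
each oap from the rpc list before completing it. The pattern, reduced to
runnable C:

    #include <stddef.h>
    #include <stdio.h>

    struct node { struct node *next, *prev; };

    #define container_of(ptr, type, member) \
            ((type *)((char *)(ptr) - offsetof(type, member)))

    struct oap { int id; struct node rpc_item; };

    static void list_del_init(struct node *n)
    {
            n->prev->next = n->next; n->next->prev = n->prev;
            n->next = n->prev = n;
    }

    /* mirrors cfs_list_for_each_entry_safe(): 'tmp' caches the successor
     * so deleting 'it' from the list is safe mid-walk */
    int main(void)
    {
            struct node head = { &head, &head };
            struct oap a = { 1 }, b = { 2 };
            struct node *it, *tmp;

            /* link a, b onto head: head -> a -> b -> head */
            a.rpc_item = (struct node){ &b.rpc_item, &head };
            b.rpc_item = (struct node){ &head, &a.rpc_item };
            head.next = &a.rpc_item; head.prev = &b.rpc_item;

            for (it = head.next, tmp = it->next; it != &head;
                 it = tmp, tmp = it->next) {
                    struct oap *o = container_of(it, struct oap, rpc_item);

                    list_del_init(it);          /* detach, then complete */
                    printf("completing oap %d\n", o->id);
            }
            return 0;
    }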
 
-static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
-                                            struct client_obd *cli,
-                                            cfs_list_t *rpc_list,
-                                            int page_count, int cmd)
+/* The trickiest part of this function is that it returns with
+ * cli->cl_loi_list_lock held.
+ */
+int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
+                 cfs_list_t *rpc_list, int page_count, int cmd,
+                 pdl_policy_t pol)
 {
-        struct ptlrpc_request *req;
-        struct brw_page **pga = NULL;
-        struct osc_brw_async_args *aa;
+       struct ptlrpc_request *req = NULL;
+       struct brw_page **pga = NULL;
+       struct osc_brw_async_args *aa = NULL;
         struct obdo *oa = NULL;
-        const struct obd_async_page_ops *ops = NULL;
         struct osc_async_page *oap;
         struct osc_async_page *tmp;
         struct cl_req *clerq = NULL;
@@ -2344,23 +1955,21 @@ static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
         memset(&crattr, 0, sizeof crattr);
         OBD_ALLOC(pga, sizeof(*pga) * page_count);
         if (pga == NULL)
-                GOTO(out, req = ERR_PTR(-ENOMEM));
-
-        OBDO_ALLOC(oa);
-        if (oa == NULL)
-                GOTO(out, req = ERR_PTR(-ENOMEM));
-
-        i = 0;
-        cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
-                struct cl_page *page = osc_oap2cl_page(oap);
-                if (ops == NULL) {
-                        ops = oap->oap_caller_ops;
-
-                        clerq = cl_req_alloc(env, page, crt,
-                                             1 /* only 1-object rpcs for
-                                                * now */);
-                        if (IS_ERR(clerq))
-                                GOTO(out, req = (void *)clerq);
+               GOTO(out, rc = -ENOMEM);
+
+       OBDO_ALLOC(oa);
+       if (oa == NULL)
+               GOTO(out, rc = -ENOMEM);
+
+       i = 0;
+       cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
+               struct cl_page *page = osc_oap2cl_page(oap);
+               if (clerq == NULL) {
+                       clerq = cl_req_alloc(env, page, crt,
+                                            1 /* only 1-object rpcs for
+                                               * now */);
+                       if (IS_ERR(clerq))
+                               GOTO(out, rc = PTR_ERR(clerq));
                         lock = oap->oap_ldlm_lock;
                 }
                 pga[i] = &oap->oap_brw_page;
@@ -2372,7 +1981,7 @@ static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
         }
 
         /* always get the data for the obdo for the rpc */
-        LASSERT(ops != NULL);
+       LASSERT(clerq != NULL);
         crattr.cra_oa = oa;
         crattr.cra_capa = NULL;
        memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE);
@@ -2385,7 +1994,7 @@ static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
         rc = cl_req_prep(env, clerq);
         if (rc != 0) {
                 CERROR("cl_req_prep failed: %d\n", rc);
-                GOTO(out, req = ERR_PTR(rc));
+               GOTO(out, rc);
         }
 
         sort_brw_pages(pga, page_count);
@@ -2393,9 +2002,10 @@ static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
                                   pga, &req, crattr.cra_capa, 1, 0);
         if (rc != 0) {
                 CERROR("prep_req failed: %d\n", rc);
-                GOTO(out, req = ERR_PTR(rc));
-        }
+               GOTO(out, rc);
+       }
 
+       req->rq_interpret_reply = brw_interpret;
         if (cmd & OBD_BRW_MEMALLOC)
                 req->rq_memalloc = 1;
 
@@ -2420,7 +2030,9 @@ out:
                 cfs_memory_pressure_restore(mpflag);
 
         capa_put(crattr.cra_capa);
-        if (IS_ERR(req)) {
+       if (rc != 0) {
+               LASSERT(req == NULL);
+
                 if (oa)
                         OBDO_FREE(oa);
                 if (pga)
@@ -2439,724 +2051,49 @@ out:
                                                   oap->oap_count);
                                 continue;
                         }
-                        osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
-                }
-                if (clerq && !IS_ERR(clerq))
-                        cl_req_completion(env, clerq, PTR_ERR(req));
-        }
-        RETURN(req);
-}
-
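osc_build_rpc() folds the RPC-construction half of the code removed below
into a single exported call that returns an errno instead of a request
pointer and, per its comment, comes back with cl_loi_list_lock held. The
caller-side locking discipline this implies, inferred from the removed
osc_send_oap_rpc() (a sketch, not actual code from this change):

    client_obd_list_lock(&cli->cl_loi_list_lock);
    /* ... move ready oaps from the object's queues onto rpc_list ... */
    client_obd_list_unlock(&cli->cl_loi_list_lock);

    rc = osc_build_rpc(env, cli, &rpc_list, page_count, cmd, pol);
    /* returns with cl_loi_list_lock held again */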
-/**
- * prepare pages for ASYNC io and put pages in send queue.
- *
- * \param cmd OBD_BRW_* macroses
- * \param lop pending pages
- *
- * \return zero if no page added to send queue.
- * \return 1 if pages successfully added to send queue.
- * \return negative on errors.
- */
-static int
-osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
-                 struct lov_oinfo *loi, int cmd,
-                 struct loi_oap_pages *lop, pdl_policy_t pol)
-{
-        struct ptlrpc_request *req;
-        obd_count page_count = 0;
-        struct osc_async_page *oap = NULL, *tmp;
-        struct osc_brw_async_args *aa;
-        const struct obd_async_page_ops *ops;
-        CFS_LIST_HEAD(rpc_list);
-        int srvlock = 0, mem_tight = 0;
-        struct cl_object *clob = NULL;
-        obd_off starting_offset = OBD_OBJECT_EOF;
-        unsigned int ending_offset;
-        int starting_page_off = 0;
-        ENTRY;
-
-        /* ASYNC_HP pages first. At present, when the lock covering the pages
-         * is to be canceled, the pages under that lock will be sent out
-         * with ASYNC_HP. We have to send them out as soon as possible. */
-        cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
-                if (oap->oap_async_flags & ASYNC_HP)
-                        cfs_list_move(&oap->oap_pending_item, &rpc_list);
-                else if (!(oap->oap_brw_flags & OBD_BRW_SYNC))
-                        /* only do this for writeback pages. */
-                        cfs_list_move_tail(&oap->oap_pending_item, &rpc_list);
-                if (++page_count >= cli->cl_max_pages_per_rpc)
-                        break;
-        }
-        cfs_list_splice_init(&rpc_list, &lop->lop_pending);
-        page_count = 0;
-
-        /* first we find the pages we're allowed to work with */
-        cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
-                                     oap_pending_item) {
-                ops = oap->oap_caller_ops;
-
-                LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
-                         "magic 0x%x\n", oap, oap->oap_magic);
-
-                if (clob == NULL) {
-                        /* pin object in memory, so that completion call-backs
-                         * can be safely called under client_obd_list lock. */
-                        clob = osc_oap2cl_page(oap)->cp_obj;
-                        cl_object_get(clob);
-                }
-
-                if (page_count != 0 &&
-                    srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
-                        CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
-                               " oap %p, page %p, srvlock %u\n",
-                               oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
-                        break;
-                }
-
-                /* If there is a gap at the start of this page, it can't merge
-                 * with any previous page, so we'll hand the network a
-                 * "fragmented" page array that it can't transfer in 1 RDMA */
-                if (oap->oap_obj_off < starting_offset) {
-                        if (starting_page_off != 0)
-                                break;
-
-                        starting_page_off = oap->oap_page_off;
-                        starting_offset = oap->oap_obj_off + starting_page_off;
-                } else if (oap->oap_page_off != 0)
-                        break;
-
-                /* in llite being 'ready' equates to the page being locked
-                 * until completion unlocks it.  commit_write submits a page
-                 * as not ready because its unlock will happen unconditionally
-                 * as the call returns.  if we race with commit_write giving
-                 * us that page we don't want to create a hole in the page
-                 * stream, so we stop and leave the rpc to be fired by
-                 * another dirtier or kupdated interval (the not ready page
-                 * will still be on the dirty list).  we could call in
-                 * at the end of ll_file_write to process the queue again. */
-                if (!(oap->oap_async_flags & ASYNC_READY)) {
-                        int rc = ops->ap_make_ready(env, oap->oap_caller_data,
-                                                    cmd);
-                        if (rc < 0)
-                                CDEBUG(D_INODE, "oap %p page %p returned %d "
-                                                "instead of ready\n", oap,
-                                                oap->oap_page, rc);
-                        switch (rc) {
-                        case -EAGAIN:
-                                /* llite is telling us that the page is still
-                                 * in commit_write and that we should try
-                                 * and put it in an rpc again later.  we
-                                 * break out of the loop so we don't create
-                                 * a hole in the sequence of pages in the rpc
-                                 * stream.*/
-                                oap = NULL;
-                                break;
-                        case -EINTR:
-                                /* the io isn't needed.. tell the checks
-                                 * below to complete the rpc with EINTR */
-                                cfs_spin_lock(&oap->oap_lock);
-                                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
-                                cfs_spin_unlock(&oap->oap_lock);
-                                oap->oap_count = -EINTR;
-                                break;
-                        case 0:
-                                cfs_spin_lock(&oap->oap_lock);
-                                oap->oap_async_flags |= ASYNC_READY;
-                                cfs_spin_unlock(&oap->oap_lock);
-                                break;
-                        default:
-                                LASSERTF(0, "oap %p page %p returned %d "
-                                            "from make_ready\n", oap,
-                                            oap->oap_page, rc);
-                                break;
-                        }
-                }
-                if (oap == NULL)
-                        break;
-
-                /* take the page out of our book-keeping */
-                cfs_list_del_init(&oap->oap_pending_item);
-                lop_update_pending(cli, lop, cmd, -1);
-                cfs_list_del_init(&oap->oap_urgent_item);
-
-                /* ask the caller for the size of the io as the rpc leaves. */
-                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
-                        oap->oap_count =
-                                ops->ap_refresh_count(env, oap->oap_caller_data,
-                                                      cmd);
-                        LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
-                }
-                if (oap->oap_count <= 0) {
-                        CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
-                               oap->oap_count);
-                        osc_ap_completion(env, cli, NULL,
-                                          oap, 0, oap->oap_count);
-                        continue;
-                }
-
-                /* now put the page back in our accounting */
-                cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
-                if (page_count++ == 0)
-                        srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
-
-                if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
-                        mem_tight = 1;
-
-                /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
-                 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
-                 * have the same alignment as the initial writes that allocated
-                 * extents on the server. */
-                ending_offset = oap->oap_obj_off + oap->oap_page_off +
-                                oap->oap_count;
-                if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1)))
-                        break;
-
-                if (page_count >= cli->cl_max_pages_per_rpc)
-                        break;
-
-                /* If there is a gap at the end of this page, it can't merge
-                 * with any subsequent pages, so we'll hand the network a
-                 * "fragmented" page array that it can't transfer in 1 RDMA */
-                if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
-                        break;
-        }
-
-        loi_list_maint(cli, loi);
-
-        client_obd_list_unlock(&cli->cl_loi_list_lock);
-
-        if (clob != NULL)
-                cl_object_put(env, clob);
-
-        if (page_count == 0) {
-                client_obd_list_lock(&cli->cl_loi_list_lock);
-                RETURN(0);
-        }
-
-        req = osc_build_req(env, cli, &rpc_list, page_count,
-                            mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
-        if (IS_ERR(req)) {
-                LASSERT(cfs_list_empty(&rpc_list));
-                loi_list_maint(cli, loi);
-                RETURN(PTR_ERR(req));
-        }
-
-        aa = ptlrpc_req_async_args(req);
-
-        starting_offset &= PTLRPC_MAX_BRW_SIZE - 1;
-        if (cmd == OBD_BRW_READ) {
-                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
-                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
-                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
-                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
-        } else {
-                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
-                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
-                                 cli->cl_w_in_flight);
-                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
-                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
-        }
-
-        client_obd_list_lock(&cli->cl_loi_list_lock);
-
-        if (cmd == OBD_BRW_READ)
-                cli->cl_r_in_flight++;
-        else
-                cli->cl_w_in_flight++;
-
-        /* queued sync pages can be torn down while the pages
-         * were between the pending list and the rpc */
-        tmp = NULL;
-        cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
-                /* only one oap gets a request reference */
-                if (tmp == NULL)
-                        tmp = oap;
-                if (oap->oap_interrupted && !req->rq_intr) {
-                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
-                               oap, req);
-                        ptlrpc_mark_interrupted(req);
-                }
-        }
-        if (tmp != NULL)
-                tmp->oap_request = ptlrpc_request_addref(req);
-
-        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
-                  page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
-
-        req->rq_interpret_reply = brw_interpret;
-
-        /* XXX: Maybe the caller can check the RPC bulk descriptor to see which
-         *      CPU/NUMA node the majority of pages were allocated on, and try
-         *      to assign the async RPC to the CPU core (PDL_POLICY_PREFERRED)
-         *      to reduce cross-CPU memory traffic.
-         *
-         *      But on the other hand, we expect that multiple ptlrpcd threads
-         *      and the initial write sponsor can run in parallel, especially
-         *      when data checksumming is enabled, which is a CPU-bound
-         *      operation that a single ptlrpcd thread cannot process in time.
-         *      So more ptlrpcd threads sharing the BRW load (with
-         *      PDL_POLICY_ROUND) seems better.
-         */
-        ptlrpcd_add_req(req, pol, -1);
-        RETURN(1);
-}
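
The removed loop above cuts an RPC whenever ending_offset lands exactly on a
PTLRPC_MAX_BRW_SIZE multiple, so later reads line up with the extents the
initial writes allocated on the server. A minimal standalone sketch of that
power-of-two mask test, assuming a 1 MiB PTLRPC_MAX_BRW_SIZE (the real value
comes from the build configuration):

    #include <stdio.h>

    #define SKETCH_BRW_SIZE (1UL << 20)     /* assumed 1 MiB for illustration */

    /* true iff the offset is an exact multiple of the (power-of-two) size */
    static int ends_on_brw_boundary(unsigned long long ending_offset)
    {
            return (ending_offset & (SKETCH_BRW_SIZE - 1)) == 0;
    }

    int main(void)
    {
            printf("%d\n", ends_on_brw_boundary(1UL << 20)); /* 1: cut the RPC */
            printf("%d\n", ends_on_brw_boundary(12345));     /* 0: keep filling */
            return 0;
    }
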
-
-#define LOI_DEBUG(LOI, STR, args...)                                     \
-        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
-               !cfs_list_empty(&(LOI)->loi_ready_item) ||                \
-               !cfs_list_empty(&(LOI)->loi_hp_ready_item),               \
-               (LOI)->loi_write_lop.lop_num_pending,                     \
-               !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent),        \
-               (LOI)->loi_read_lop.lop_num_pending,                      \
-               !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent),         \
-               args)
-
-/* This is called by osc_check_rpcs() to find which objects have pages that
- * we could be sending.  These lists are maintained by loi_list_maint(),
- * based on what lop_makes_rpc() reports. */
-struct lov_oinfo *osc_next_loi(struct client_obd *cli)
-{
-        ENTRY;
-
-        /* First return objects that have blocked locks so that they
-         * will be flushed quickly and other clients can get the lock,
-         * then objects which have pages ready to be stuffed into RPCs */
-        if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
-                RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
-                                      struct lov_oinfo, loi_hp_ready_item));
-        if (!cfs_list_empty(&cli->cl_loi_ready_list))
-                RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
-                                      struct lov_oinfo, loi_ready_item));
-
-        /* then if we have cache waiters, return all objects with queued
-         * writes.  This is especially important when many small files
-         * have filled up the cache and not been fired into rpcs because
-         * they don't pass the nr_pending/object threshold */
-        if (!cfs_list_empty(&cli->cl_cache_waiters) &&
-            !cfs_list_empty(&cli->cl_loi_write_list))
-                RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
-                                      struct lov_oinfo, loi_write_item));
-
-        /* then return all queued objects when we have an invalid import
-         * so that they get flushed */
-        if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
-                if (!cfs_list_empty(&cli->cl_loi_write_list))
-                        RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
-                                              struct lov_oinfo,
-                                              loi_write_item));
-                if (!cfs_list_empty(&cli->cl_loi_read_list))
-                        RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
-                                              struct lov_oinfo, loi_read_item));
-        }
-        RETURN(NULL);
-}
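
osc_next_loi() above encodes a strict priority order. A standalone sketch of
that ordering, with the list heads reduced to booleans (names illustrative):

    enum next_src { SRC_HP_READY, SRC_READY, SRC_WRITE, SRC_READ, SRC_NONE };

    /* mirrors the if-ladder in osc_next_loi(): lock-blocked objects first,
     * then ready objects, then writes for cache waiters, then a full drain
     * when the import is invalid */
    static enum next_src pick_next(int hp_ready, int ready, int cache_waiters,
                                   int have_writes, int have_reads,
                                   int import_invalid)
    {
            if (hp_ready)
                    return SRC_HP_READY;
            if (ready)
                    return SRC_READY;
            if (cache_waiters && have_writes)
                    return SRC_WRITE;
            if (import_invalid) {
                    if (have_writes)
                            return SRC_WRITE;
                    if (have_reads)
                            return SRC_READ;
            }
            return SRC_NONE;
    }

    int main(void)
    {
            /* cache waiters reach queued writes before anything else below */
            return pick_next(0, 0, 1, 1, 0, 0) == SRC_WRITE ? 0 : 1;
    }
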
-
-static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
-{
-        struct osc_async_page *oap;
-        int hprpc = 0;
-
-        if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
-                oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
-                                     struct osc_async_page, oap_urgent_item);
-                hprpc = !!(oap->oap_async_flags & ASYNC_HP);
-        }
-
-        if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
-                oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
-                                     struct osc_async_page, oap_urgent_item);
-                hprpc = !!(oap->oap_async_flags & ASYNC_HP);
-        }
-
-        return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
-}
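
The "+ hprpc" term above is the whole trick: one pending ASYNC_HP page raises
the in-flight cap by exactly one, so a lock-blocking flush is never starved by
a full pipe. A tiny sketch of the limit check, with the types simplified:

    static int over_rpc_limit(unsigned long rpcs_in_flight,
                              unsigned long max_rpcs_in_flight, int has_hp_oap)
    {
            /* an urgent high-priority page buys exactly one extra slot */
            return rpcs_in_flight >= max_rpcs_in_flight + (has_hp_oap ? 1 : 0);
    }

    int main(void)
    {
            /* with the pipe full, only an HP page lets one more RPC through */
            return (over_rpc_limit(8, 8, 0) && !over_rpc_limit(8, 8, 1)) ? 0 : 1;
    }
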
-
-/* called with the loi list lock held */
-static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli, int ptlrpc)
-{
-        struct lov_oinfo *loi;
-        int rc = 0, race_counter = 0;
-        pdl_policy_t pol;
-        ENTRY;
-
-        pol = ptlrpc ? PDL_POLICY_SAME : PDL_POLICY_ROUND;
-
-        while ((loi = osc_next_loi(cli)) != NULL) {
-                LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
-
-                if (osc_max_rpc_in_flight(cli, loi))
-                        break;
-
-                /* Attempt some read/write balancing by alternating between
-                 * reads and writes in an object.  The lop_makes_rpc() checks
-                 * here would be redundant if we were handed read/write work
-                 * items instead of objects.  We don't want osc_send_oap_rpc()
-                 * to drain a partial read pending queue when we're given this
-                 * object to do writes on while there are cache waiters */
-                if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
-                        rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
-                                              &loi->loi_write_lop, pol);
-                        if (rc < 0) {
-                                CERROR("Write request failed with %d\n", rc);
-
-                                /* osc_send_oap_rpc() failed, mostly because
-                                 * of memory pressure.
-                                 *
-                                 * We must not break out here, because if:
-                                 *  - a page was submitted by osc_io_submit()
-                                 *    and is therefore locked;
-                                 *  - no request is in flight; and
-                                 *  - no subsequent request follows;
-                                 * then the system live-locks: there is no
-                                 * further chance to call osc_io_unplug() or
-                                 * osc_check_rpcs(). pdflush can't help
-                                 * either, because it might block on the
-                                 * page lock mentioned above.
-                                 *
-                                 * Anyway, continue to drain pages. */
-                                /* break; */
-                        }
-
-                        if (rc > 0)
-                                race_counter = 0;
-                        else if (rc == 0)
-                                race_counter++;
-                }
-                if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
-                        rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
-                                              &loi->loi_read_lop, pol);
-                        if (rc < 0)
-                                CERROR("Read request failed with %d\n", rc);
-
-                        if (rc > 0)
-                                race_counter = 0;
-                        else if (rc == 0)
-                                race_counter++;
-                }
-
-                /* attempt some inter-object balancing by issuing rpcs
-                 * for each object in turn */
-                if (!cfs_list_empty(&loi->loi_hp_ready_item))
-                        cfs_list_del_init(&loi->loi_hp_ready_item);
-                if (!cfs_list_empty(&loi->loi_ready_item))
-                        cfs_list_del_init(&loi->loi_ready_item);
-                if (!cfs_list_empty(&loi->loi_write_item))
-                        cfs_list_del_init(&loi->loi_write_item);
-                if (!cfs_list_empty(&loi->loi_read_item))
-                        cfs_list_del_init(&loi->loi_read_item);
-
-                loi_list_maint(cli, loi);
-
-                /* osc_send_oap_rpc() fails with 0 when make_ready tells it
-                 * to back off.  llite's make_ready does this when it tries
-                 * to lock a page queued for write that is already locked.
-                 * We want to try sending rpcs from many objects, but we
-                 * don't want to spin failing with 0.  */
-                if (race_counter == 10)
-                        break;
-        }
-}
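
The race_counter plumbing in the loop above reduces to a small back-off rule;
a sketch under the same rc convention (rc > 0: an RPC was sent, rc == 0:
make_ready asked us to back off, rc < 0: error but keep draining):

    /* returns 1 when ten consecutive back-offs say this pass should end */
    static int should_stop_pass(int rc, int *race_counter)
    {
            if (rc > 0)
                    *race_counter = 0;      /* progress: reset */
            else if (rc == 0)
                    (*race_counter)++;      /* backed off: count it */
            return *race_counter == 10;
    }

    int main(void)
    {
            int rc_seq[] = { 0, 0, 1, 0 }, race = 0, i;

            for (i = 0; i < 4; i++)
                    if (should_stop_pass(rc_seq[i], &race))
                            return 1;       /* a send in between resets it */
            return 0;
    }
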
-
-void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
-{
-        osc_check_rpcs0(env, cli, 0);
-}
-
-/**
- * Non-blocking version of osc_enter_cache() that consumes grant only when it
- * is available.
- */
-int osc_enter_cache_try(const struct lu_env *env,
-                        struct client_obd *cli, struct lov_oinfo *loi,
-                        struct osc_async_page *oap, int transient)
-{
-        int has_grant;
-
-        has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
-        if (has_grant) {
-                osc_consume_write_grant(cli, &oap->oap_brw_page);
-                if (transient) {
-                        cli->cl_dirty_transit += CFS_PAGE_SIZE;
-                        cfs_atomic_inc(&obd_dirty_transit_pages);
-                        oap->oap_brw_flags |= OBD_BRW_NOCACHE;
-                }
-        }
-        return has_grant;
-}
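
osc_enter_cache_try() consumes grant only when a full page's worth is
available; the rest is bookkeeping. A standalone sketch with an assumed
4 KiB page size standing in for CFS_PAGE_SIZE:

    #include <stdbool.h>

    #define SKETCH_PAGE_SIZE 4096UL         /* stands in for CFS_PAGE_SIZE */

    struct grant_state {
            unsigned long avail_grant;      /* bytes granted by the OST */
            unsigned long dirty_transit;    /* transient (uncached) dirty bytes */
    };

    /* non-blocking: either take one page of grant now, or report failure so
     * the caller can fall back to the sleeping osc_enter_cache() path */
    static bool enter_cache_try(struct grant_state *gs, bool transient)
    {
            if (gs->avail_grant < SKETCH_PAGE_SIZE)
                    return false;
            gs->avail_grant -= SKETCH_PAGE_SIZE;
            if (transient)
                    gs->dirty_transit += SKETCH_PAGE_SIZE;
            return true;
    }

    int main(void)
    {
            struct grant_state gs = { .avail_grant = 4096, .dirty_transit = 0 };

            return (enter_cache_try(&gs, true) && !enter_cache_try(&gs, false))
                    ? 0 : 1;
    }
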
-
-/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
- * grant or cache space. */
-static int osc_enter_cache(const struct lu_env *env,
-                           struct client_obd *cli, struct lov_oinfo *loi,
-                           struct osc_async_page *oap)
-{
-        struct osc_cache_waiter ocw;
-        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
-        int rc = -EDQUOT;
-        ENTRY;
-
-        CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
-               "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
-               cli->cl_dirty_max, obd_max_dirty_pages,
-               cli->cl_lost_grant, cli->cl_avail_grant);
-
-        /* Force the caller to try sync io.  This can jump the list
-         * of queued writes and create a discontiguous rpc stream */
-        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
-            cli->cl_dirty_max < CFS_PAGE_SIZE     ||
-            cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync)
-                RETURN(-EDQUOT);
-
-        /* Hopefully normal case - cache space and write credits available */
-        if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
-            cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
-            osc_enter_cache_try(env, cli, loi, oap, 0))
-                RETURN(0);
-
-        /* We can get here for two reasons: too many dirty pages in cache, or
-         * we have run out of grant. In both cases we should write dirty pages
-         * out. Adding a cache waiter will trigger urgent write-out no matter
-         * what the RPC size will be.
-         * The exit condition is no available grant and no dirty pages left to
-         * cache, which really means there is no space on the OST. */
-        cfs_waitq_init(&ocw.ocw_waitq);
-        ocw.ocw_oap = oap;
-        while (cli->cl_dirty > 0) {
-                cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
-                ocw.ocw_rc = 0;
-
-                loi_list_maint(cli, loi);
-                osc_check_rpcs(env, cli);
-                client_obd_list_unlock(&cli->cl_loi_list_lock);
-
-                CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n",
-                       cli->cl_import->imp_obd->obd_name, &ocw, oap);
-
-                rc = l_wait_event(ocw.ocw_waitq, cfs_list_empty(&ocw.ocw_entry), &lwi);
-
-                client_obd_list_lock(&cli->cl_loi_list_lock);
-                cfs_list_del_init(&ocw.ocw_entry);
-                if (rc < 0)
-                        break;
-
-                rc = ocw.ocw_rc;
-                if (rc != -EDQUOT)
-                        break;
-        }
-
-        RETURN(rc);
-}
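
The wait loop above keeps sleeping while anything dirty remains, and only
gives up with -EDQUOT once there is neither grant nor anything left to write
back, i.e. the OST is genuinely full. A simulation of those exit conditions,
with the sleep replaced by one completed write-out per iteration (all numbers
assumed):

    #include <stdio.h>

    #define SKETCH_EDQUOT 122

    int main(void)
    {
            unsigned long dirty = 4;        /* pages awaiting write-out */
            int grant_returned = 0;         /* OST never refreshes grant here */
            int rc = -SKETCH_EDQUOT;

            while (dirty > 0) {
                    /* real code: queue an osc_cache_waiter, kick
                     * osc_check_rpcs() and sleep on ocw_waitq */
                    dirty--;                /* a write-out completed */
                    if (grant_returned) {   /* woken with fresh grant */
                            rc = 0;
                            break;
                    }
            }
            printf("rc = %d\n", rc);        /* -122: no grant, nothing dirty */
            return 0;
    }
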
-
-
-int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
-                        struct lov_oinfo *loi, cfs_page_t *page,
-                        obd_off offset, const struct obd_async_page_ops *ops,
-                        void *data, void **res, int nocache,
-                        struct lustre_handle *lockh)
-{
-        struct osc_async_page *oap;
-
-        ENTRY;
-
-        if (!page)
-                return cfs_size_round(sizeof(*oap));
-
-        oap = *res;
-        oap->oap_magic = OAP_MAGIC;
-        oap->oap_cli = &exp->exp_obd->u.cli;
-        oap->oap_loi = loi;
-
-        oap->oap_caller_ops = ops;
-        oap->oap_caller_data = data;
-
-        oap->oap_page = page;
-        oap->oap_obj_off = offset;
-        if (!client_is_remote(exp) &&
-            cfs_capable(CFS_CAP_SYS_RESOURCE))
-                oap->oap_brw_flags = OBD_BRW_NOQUOTA;
-
-        LASSERT(!(offset & ~CFS_PAGE_MASK));
-
-        CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
-        CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
-        CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
-        CFS_INIT_LIST_HEAD(&oap->oap_page_list);
-
-        cfs_spin_lock_init(&oap->oap_lock);
-        CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
-        RETURN(0);
-}
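
Note the dual calling convention of osc_prep_async_page(): with page == NULL
it only reports the rounded number of bytes the caller must reserve for the
oap. A hedged sketch of that convention, assuming the rounding is to 8 bytes:

    #include <stddef.h>
    #include <stdio.h>

    struct sketch_oap { int magic; void *page; };

    static size_t size_round8(size_t v)     /* assumed 8-byte rounding */
    {
            return (v + 7) & ~(size_t)7;
    }

    static int prep_async_page(void *page, void **res)
    {
            struct sketch_oap *oap;

            if (page == NULL)               /* size query, nothing touched */
                    return (int)size_round8(sizeof(*oap));

            oap = *res;                     /* memory the caller reserved */
            oap->page = page;
            return 0;
    }

    int main(void)
    {
            printf("reserve %d bytes per page\n", prep_async_page(NULL, NULL));
            return 0;
    }
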
-
-int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp,
-                       struct lov_stripe_md *lsm, struct lov_oinfo *loi,
-                       struct osc_async_page *oap, int cmd, int off,
-                       int count, obd_flag brw_flags, enum async_flags async_flags)
-{
-        struct client_obd *cli = &exp->exp_obd->u.cli;
-        int rc = 0;
-        ENTRY;
-
-        if (oap->oap_magic != OAP_MAGIC)
-                RETURN(-EINVAL);
-
-        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
-                RETURN(-EIO);
-
-        if (!cfs_list_empty(&oap->oap_pending_item) ||
-            !cfs_list_empty(&oap->oap_urgent_item) ||
-            !cfs_list_empty(&oap->oap_rpc_item))
-                RETURN(-EBUSY);
-
-        /* check if the file's owner/group is over quota */
-        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
-                struct cl_object *obj;
-                struct cl_attr    attr; /* XXX put attr into thread info */
-                unsigned int qid[MAXQUOTAS];
-
-                obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
-
-                cl_object_attr_lock(obj);
-                rc = cl_object_attr_get(env, obj, &attr);
-                cl_object_attr_unlock(obj);
-
-                qid[USRQUOTA] = attr.cat_uid;
-                qid[GRPQUOTA] = attr.cat_gid;
-                if (rc == 0 &&
-                    osc_quota_chkdq(cli, qid) == NO_QUOTA)
-                        rc = -EDQUOT;
-                if (rc)
-                        RETURN(rc);
-        }
-
-        if (loi == NULL)
-                loi = lsm->lsm_oinfo[0];
-
-        client_obd_list_lock(&cli->cl_loi_list_lock);
-
-        LASSERT(off + count <= CFS_PAGE_SIZE);
-        oap->oap_cmd = cmd;
-        oap->oap_page_off = off;
-        oap->oap_count = count;
-        oap->oap_brw_flags = brw_flags;
-        /* Give a hint to OST that requests are coming from kswapd - bug19529 */
-        if (cfs_memory_pressure_get())
-                oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
-        cfs_spin_lock(&oap->oap_lock);
-        oap->oap_async_flags = async_flags;
-        cfs_spin_unlock(&oap->oap_lock);
-
-        if (cmd & OBD_BRW_WRITE) {
-                rc = osc_enter_cache(env, cli, loi, oap);
-                if (rc) {
-                        client_obd_list_unlock(&cli->cl_loi_list_lock);
-                        RETURN(rc);
-                }
-        }
-
-        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
-                  cmd);
-
-        osc_oap_to_pending(oap);
-        loi_list_maint(cli, loi);
-        if (!osc_max_rpc_in_flight(cli, loi) &&
-            lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
-                LASSERT(cli->cl_writeback_work != NULL);
-                rc = ptlrpcd_queue_work(cli->cl_writeback_work);
-
-                CDEBUG(D_CACHE, "Queued writeback work for client obd %p/%d.\n",
-                       cli, rc);
-        }
-        client_obd_list_unlock(&cli->cl_loi_list_lock);
-
-        RETURN(0);
-}
-
-/* aka (~was & now & flag), but this is more clear :) */
-#define SETTING(was, now, flag) (!(was & flag) && (now & flag))
-
-int osc_set_async_flags_base(struct client_obd *cli,
-                             struct lov_oinfo *loi, struct osc_async_page *oap,
-                             obd_flag async_flags)
-{
-        struct loi_oap_pages *lop;
-        int flags = 0;
-        ENTRY;
-
-        LASSERT(!cfs_list_empty(&oap->oap_pending_item));
-
-        if (oap->oap_cmd & OBD_BRW_WRITE) {
-                lop = &loi->loi_write_lop;
-        } else {
-                lop = &loi->loi_read_lop;
-        }
-
-        if ((oap->oap_async_flags & async_flags) == async_flags)
-                RETURN(0);
-
-        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
-                flags |= ASYNC_READY;
-
-        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
-            cfs_list_empty(&oap->oap_rpc_item)) {
-                if (oap->oap_async_flags & ASYNC_HP)
-                        cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
-                else
-                        cfs_list_add_tail(&oap->oap_urgent_item,
-                                          &lop->lop_urgent);
-                flags |= ASYNC_URGENT;
-                loi_list_maint(cli, loi);
-        }
-        cfs_spin_lock(&oap->oap_lock);
-        oap->oap_async_flags |= flags;
-        cfs_spin_unlock(&oap->oap_lock);
-
-        LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
-                        oap->oap_async_flags);
-        RETURN(0);
-}
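
The SETTING() macro above really is (~was & now & flag) for a single-bit
flag, as its own comment says; a quick exhaustive check of that identity:

    #include <assert.h>

    #define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))

    int main(void)
    {
            const unsigned flag = 0x2;      /* e.g. ASYNC_URGENT */
            unsigned was, now;

            for (was = 0; was < 4; was++)
                    for (now = 0; now < 4; now++)
                            assert(!!(~was & now & flag) ==
                                   SETTING(was, now, flag));
            return 0;
    }
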
-
-int osc_teardown_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
-                            struct lov_oinfo *loi, struct osc_async_page *oap)
-{
-        struct client_obd *cli = &exp->exp_obd->u.cli;
-        struct loi_oap_pages *lop;
-        int rc = 0;
-        ENTRY;
-
-        if (oap->oap_magic != OAP_MAGIC)
-                RETURN(-EINVAL);
-
-        if (loi == NULL)
-                loi = lsm->lsm_oinfo[0];
-
-        if (oap->oap_cmd & OBD_BRW_WRITE) {
-                lop = &loi->loi_write_lop;
-        } else {
-                lop = &loi->loi_read_lop;
-        }
-
-        client_obd_list_lock(&cli->cl_loi_list_lock);
-
-        if (!cfs_list_empty(&oap->oap_rpc_item))
-                GOTO(out, rc = -EBUSY);
-
-        osc_exit_cache(cli, oap, 0);
-        osc_wake_cache_waiters(cli);
-
-        if (!cfs_list_empty(&oap->oap_urgent_item)) {
-                cfs_list_del_init(&oap->oap_urgent_item);
-                cfs_spin_lock(&oap->oap_lock);
-                oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
-                cfs_spin_unlock(&oap->oap_lock);
-        }
-        if (!cfs_list_empty(&oap->oap_pending_item)) {
-                cfs_list_del_init(&oap->oap_pending_item);
-                lop_update_pending(cli, lop, oap->oap_cmd, -1);
-        }
-        loi_list_maint(cli, loi);
-        LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
-out:
-        client_obd_list_unlock(&cli->cl_loi_list_lock);
-        RETURN(rc);
+                       osc_ap_completion(env, cli, NULL, oap, 0, rc);
+               }
+               if (clerq && !IS_ERR(clerq))
+                       cl_req_completion(env, clerq, rc);
+       } else {
+               struct osc_async_page *tmp = NULL;
+
+               /* queued sync pages can be torn down while the pages
+                * are in transit between the pending list and the rpc */
+               LASSERT(aa != NULL);
+               client_obd_list_lock(&cli->cl_loi_list_lock);
+               cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
+                       /* only one oap gets a request reference */
+                       if (tmp == NULL)
+                               tmp = oap;
+                       if (oap->oap_interrupted && !req->rq_intr) {
+                               CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
+                                               oap, req);
+                               ptlrpc_mark_interrupted(req);
+                       }
+               }
+               if (tmp != NULL)
+                       tmp->oap_request = ptlrpc_request_addref(req);
+
+               DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
+                         page_count, aa, cli->cl_r_in_flight,
+                         cli->cl_w_in_flight);
+
+               /* XXX: Maybe the caller can check the RPC bulk descriptor to
+                * see which CPU/NUMA node the majority of pages were allocated
+                * on, and try to assign the async RPC to the CPU core
+                * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
+                *
+                * But on the other hand, we expect that multiple ptlrpcd
+                * threads and the initial write sponsor can run in parallel,
+                * especially when data checksumming is enabled, which is a
+                * CPU-bound operation that a single ptlrpcd thread cannot
+                * process in time.  So more ptlrpcd threads sharing the BRW
+                * load (with PDL_POLICY_ROUND) seems better.
+                */
+               ptlrpcd_add_req(req, pol, -1);
+       }
+       RETURN(rc);
 }
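
The policy trade-off in the comment above is decided up front by the caller:
stay on the current CPU when already inside a ptlrpcd thread, otherwise
round-robin so checksum-heavy BRW work spreads across threads. A sketch of
that choice (the enum values here are illustrative, not the kernel's):

    typedef enum {
            SKETCH_PDL_SAME,                /* stay on the caller's CPU */
            SKETCH_PDL_ROUND                /* round-robin across ptlrpcds */
    } sketch_pdl_policy_t;

    static sketch_pdl_policy_t pick_brw_policy(int called_from_ptlrpcd)
    {
            return called_from_ptlrpcd ? SKETCH_PDL_SAME : SKETCH_PDL_ROUND;
    }

    int main(void)
    {
            return pick_brw_policy(0) == SKETCH_PDL_ROUND ? 0 : 1;
    }
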
 
 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
@@ -4424,7 +3361,7 @@ static int osc_import_event(struct obd_device *obd,
                         client_obd_list_lock(&cli->cl_loi_list_lock);
                         /* all pages go to failing rpcs due to the invalid
                          * import */
-                        osc_check_rpcs(env, cli);
+                       osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
                         client_obd_list_unlock(&cli->cl_loi_list_lock);
 
                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
@@ -4500,6 +3437,18 @@ static int osc_cancel_for_recovery(struct ldlm_lock *lock)
         RETURN(0);
 }
 
+static int brw_queue_work(const struct lu_env *env, void *data)
+{
+       struct client_obd *cli = data;
+
+       CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
+
+       client_obd_list_lock(&cli->cl_loi_list_lock);
+       osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
+       RETURN(0);
+}
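
brw_queue_work() runs as a ptlrpcd work item, so arbitrarily many producers
can kick writeback while at most one instance runs at a time. A userspace
sketch of that coalescing idea (an illustration of the pattern, not the
ptlrpcd implementation):

    #include <stdatomic.h>
    #include <stdio.h>

    static atomic_flag work_queued = ATOMIC_FLAG_INIT;

    static void queue_writeback_work(void)
    {
            if (atomic_flag_test_and_set(&work_queued))
                    return;                 /* already queued: coalesce */
            printf("writeback work queued\n");
    }

    static void run_writeback_work(void)
    {
            /* real code: take cl_loi_list_lock, osc_io_unplug(), unlock */
            printf("writeback work ran\n");
            atomic_flag_clear(&work_queued);
    }

    int main(void)
    {
            queue_writeback_work();
            queue_writeback_work();         /* second kick is absorbed */
            run_writeback_work();
            return 0;
    }
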
+
 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
 {
         struct client_obd *cli = &obd->u.cli;