LU-1201 checksum: add libcfs crypto hash
diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c
index 6d87a0a..b8353c5 100644
@@ -1,6 +1,4 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
  * GPL HEADER START
  *
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
@@ -36,9 +34,6 @@
  * Lustre is a trademark of Sun Microsystems, Inc.
  */
 
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
 #define DEBUG_SUBSYSTEM S_OSC
 
 #include <libcfs/libcfs.h>
 #include <lustre_debug.h>
 #include <lustre_param.h>
 #include "osc_internal.h"
+#include "osc_cl_internal.h"
 
 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
 static int brw_interpret(const struct lu_env *env,
                          struct ptlrpc_request *req, void *data, int rc);
-static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli,
-                            int ptlrpc);
 int osc_cleanup(struct obd_device *obd);
 
 /* Pack OSC object metadata for disk storage (LE byte order). */
@@ -266,7 +260,8 @@ static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
         RETURN(0);
 }
 
-static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
+static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
+                       struct obd_info *oinfo)
 {
         struct ptlrpc_request *req;
         struct ost_body       *body;
@@ -309,8 +304,8 @@ static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
         return rc;
 }
 
-static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
-                       struct obd_trans_info *oti)
+static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
+                       struct obd_info *oinfo, struct obd_trans_info *oti)
 {
         struct ptlrpc_request *req;
         struct ost_body       *body;
@@ -544,7 +539,6 @@ int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
 
         ptlrpc_request_set_replen(req);
 
-
         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
         sa = ptlrpc_req_async_args(req);
@@ -559,8 +553,8 @@ int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
         RETURN(0);
 }
 
-static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
-                     struct obd_trans_info *oti,
+static int osc_punch(const struct lu_env *env, struct obd_export *exp,
+                     struct obd_info *oinfo, struct obd_trans_info *oti,
                      struct ptlrpc_request_set *rqset)
 {
         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
@@ -574,7 +568,7 @@ static int osc_sync_interpret(const struct lu_env *env,
                               struct ptlrpc_request *req,
                               void *arg, int rc)
 {
-        struct osc_async_args *aa = arg;
+       struct osc_fsync_args *fa = arg;
         struct ost_body *body;
         ENTRY;
 
@@ -587,27 +581,22 @@ static int osc_sync_interpret(const struct lu_env *env,
                 GOTO(out, rc = -EPROTO);
         }
 
-        *aa->aa_oi->oi_oa = body->oa;
+       *fa->fa_oi->oi_oa = body->oa;
 out:
-        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
-        RETURN(rc);
+       rc = fa->fa_upcall(fa->fa_cookie, rc);
+       RETURN(rc);
 }
 
-static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
-                    obd_size start, obd_size end,
-                    struct ptlrpc_request_set *set)
+int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
+                 obd_enqueue_update_f upcall, void *cookie,
+                  struct ptlrpc_request_set *rqset)
 {
-        struct ptlrpc_request *req;
-        struct ost_body       *body;
-        struct osc_async_args *aa;
+       struct ptlrpc_request *req;
+       struct ost_body       *body;
+       struct osc_fsync_args *fa;
         int                    rc;
         ENTRY;
 
-        if (!oinfo->oi_oa) {
-                CDEBUG(D_INFO, "oa NULL\n");
-                RETURN(-EINVAL);
-        }
-
         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
         if (req == NULL)
                 RETURN(-ENOMEM);
@@ -623,20 +612,41 @@ static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
         LASSERT(body);
         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
-        body->oa.o_size = start;
-        body->oa.o_blocks = end;
-        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
         osc_pack_capa(req, body, oinfo->oi_capa);
 
         ptlrpc_request_set_replen(req);
         req->rq_interpret_reply = osc_sync_interpret;
 
-        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
-        aa = ptlrpc_req_async_args(req);
-        aa->aa_oi = oinfo;
+       CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
+       fa = ptlrpc_req_async_args(req);
+       fa->fa_oi = oinfo;
+       fa->fa_upcall = upcall;
+       fa->fa_cookie = cookie;
 
-        ptlrpc_set_add_req(set, req);
-        RETURN (0);
+       if (rqset == PTLRPCD_SET)
+               ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+       else
+               ptlrpc_set_add_req(rqset, req);
+
+       RETURN (0);
+}
+
+static int osc_sync(const struct lu_env *env, struct obd_export *exp,
+                   struct obd_info *oinfo, obd_size start, obd_size end,
+                   struct ptlrpc_request_set *set)
+{
+       ENTRY;
+
+       if (!oinfo->oi_oa) {
+               CDEBUG(D_INFO, "oa NULL\n");
+               RETURN(-EINVAL);
+       }
+
+       oinfo->oi_oa->o_size = start;
+       oinfo->oi_oa->o_blocks = end;
+       oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
+
+       RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
 }
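
The reworked sync path splits into a thin osc_sync, which loads the extent
bounds into the obdo, and osc_sync_base, which stashes an (upcall, cookie)
pair in the request's async args so osc_sync_interpret can notify whichever
caller issued the fsync. A minimal userspace sketch of that completion
pattern, with illustrative names (update_cb_t, fsync_args) standing in for
obd_enqueue_update_f and osc_fsync_args:

    #include <stdio.h>

    /* illustrative stand-in for obd_enqueue_update_f */
    typedef int (*update_cb_t)(void *cookie, int rc);

    struct fsync_args {
            update_cb_t      fa_upcall;     /* called when the reply arrives */
            void            *fa_cookie;     /* caller's private state */
    };

    /* interpret step: push the server's rc up through the saved callback */
    static int sync_interpret(struct fsync_args *fa, int rc)
    {
            return fa->fa_upcall(fa->fa_cookie, rc);
    }

    static int my_upcall(void *cookie, int rc)
    {
            printf("sync of \"%s\" done: rc = %d\n", (const char *)cookie, rc);
            return rc;
    }

    int main(void)
    {
            struct fsync_args fa = { .fa_upcall = my_upcall,
                                     .fa_cookie = "obj-1" };

            /* pretend the OST replied with success */
            return sync_interpret(&fa, 0);
    }
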
 
 /* Find and cancel locally locks matched by @mode in the resource found by
@@ -704,9 +714,10 @@ static int osc_can_send_destroy(struct client_obd *cli)
  * the records are not cancelled, and when the OST reconnects to the MDS next,
  * it will retrieve the llog unlink logs and then send the log cancellation
  * cookies to the MDS after committing destroy transactions. */
-static int osc_destroy(struct obd_export *exp, struct obdo *oa,
-                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
-                       struct obd_export *md_export, void *capa)
+static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
+                       struct obdo *oa, struct lov_stripe_md *ea,
+                       struct obd_trans_info *oti, struct obd_export *md_export,
+                       void *capa)
 {
         struct client_obd     *cli = &exp->exp_obd->u.cli;
         struct ptlrpc_request *req;
@@ -813,7 +824,7 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
 
 }
 
-static void osc_update_next_shrink(struct client_obd *cli)
+void osc_update_next_shrink(struct client_obd *cli)
 {
         cli->cl_next_shrink_grant =
                 cfs_time_shift(cli->cl_grant_shrink_interval);
@@ -821,127 +832,6 @@ static void osc_update_next_shrink(struct client_obd *cli)
                cli->cl_next_shrink_grant);
 }
 
-/* caller must hold loi_list_lock */
-static void osc_consume_write_grant(struct client_obd *cli,
-                                    struct brw_page *pga)
-{
-        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
-        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
-        cfs_atomic_inc(&obd_dirty_pages);
-        cli->cl_dirty += CFS_PAGE_SIZE;
-        cli->cl_avail_grant -= CFS_PAGE_SIZE;
-        pga->flag |= OBD_BRW_FROM_GRANT;
-        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
-               CFS_PAGE_SIZE, pga, pga->pg);
-        LASSERT(cli->cl_avail_grant >= 0);
-        osc_update_next_shrink(cli);
-}
-
-/* the companion to osc_consume_write_grant, called when a brw has completed.
- * must be called with the loi lock held. */
-static void osc_release_write_grant(struct client_obd *cli,
-                                    struct brw_page *pga, int sent)
-{
-        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
-        ENTRY;
-
-        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
-        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
-                EXIT;
-                return;
-        }
-
-        pga->flag &= ~OBD_BRW_FROM_GRANT;
-        cfs_atomic_dec(&obd_dirty_pages);
-        cli->cl_dirty -= CFS_PAGE_SIZE;
-        if (pga->flag & OBD_BRW_NOCACHE) {
-                pga->flag &= ~OBD_BRW_NOCACHE;
-                cfs_atomic_dec(&obd_dirty_transit_pages);
-                cli->cl_dirty_transit -= CFS_PAGE_SIZE;
-        }
-        if (!sent) {
-                /* Reclaim grant from truncated pages. This is used to solve
-                 * the write-truncate race where all grant is gone (moved to
-                 * lost_grant). For a vfs write this problem can easily be
-                 * solved by a sync write; however, that is not an option for
-                 * page_mkwrite() because grant has to be allocated before a
-                 * page becomes dirty. */
-                if (cli->cl_avail_grant < PTLRPC_MAX_BRW_SIZE)
-                        cli->cl_avail_grant += CFS_PAGE_SIZE;
-                else
-                        cli->cl_lost_grant += CFS_PAGE_SIZE;
-                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
-                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
-        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
-                /* For short writes we shouldn't count parts of pages that
-                 * span a whole block on the OST side, or our accounting goes
-                 * wrong.  Should match the code in filter_grant_check. */
-                int offset = pga->off & ~CFS_PAGE_MASK;
-                int count = pga->count + (offset & (blocksize - 1));
-                int end = (offset + pga->count) & (blocksize - 1);
-                if (end)
-                        count += blocksize - end;
-
-                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
-                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
-                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
-                       cli->cl_avail_grant, cli->cl_dirty);
-        }
-
-        EXIT;
-}
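
The short-write branch above rounds the transferred byte count out to whole
OST blocks before crediting cl_lost_grant, so client accounting matches
filter_grant_check on the server. The arithmetic, as a standalone sketch
(lost_grant is a hypothetical helper; 4096-byte pages and a 1024-byte OST
block size are assumed):

    #include <stdio.h>

    #define PAGE_BYTES 4096

    /* round a partial-page write [offset, offset + count) out to whole
     * OST blocks and report how much of the page's grant is "lost" */
    static int lost_grant(int offset, int count, int blocksize)
    {
            int rounded = count + (offset & (blocksize - 1));
            int end = (offset + count) & (blocksize - 1);

            if (end)
                    rounded += blocksize - end;     /* pad to the block end */
            return PAGE_BYTES - rounded;
    }

    int main(void)
    {
            /* 100 bytes at offset 1000 touch two 1024-byte blocks, so the
             * OST consumes 2048 bytes and 4096 - 2048 = 2048 bytes of the
             * page's grant are lost */
            printf("lost = %d\n", lost_grant(1000, 100, 1024));
            return 0;
    }
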
-
-static unsigned long rpcs_in_flight(struct client_obd *cli)
-{
-        return cli->cl_r_in_flight + cli->cl_w_in_flight;
-}
-
-/* caller must hold loi_list_lock */
-void osc_wake_cache_waiters(struct client_obd *cli)
-{
-        cfs_list_t *l, *tmp;
-        struct osc_cache_waiter *ocw;
-
-        ENTRY;
-        cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
-                /* if we can't dirty more, we must wait until some is written */
-                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
-                   (cfs_atomic_read(&obd_dirty_pages) + 1 >
-                    obd_max_dirty_pages)) {
-                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
-                               "osc max %ld, sys max %d\n", cli->cl_dirty,
-                               cli->cl_dirty_max, obd_max_dirty_pages);
-                        return;
-                }
-
-                /* if still dirty cache but no grant wait for pending RPCs that
-                 * may yet return us some grant before doing sync writes */
-                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
-                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
-                               cli->cl_w_in_flight);
-                        return;
-                }
-
-                ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
-                cfs_list_del_init(&ocw->ocw_entry);
-                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
-                        /* no more RPCs in flight to return grant, do sync IO */
-                        ocw->ocw_rc = -EDQUOT;
-                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
-                } else {
-                        osc_consume_write_grant(cli,
-                                                &ocw->ocw_oap->oap_brw_page);
-                }
-
-                CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld\n",
-                       ocw, ocw->ocw_oap, cli->cl_avail_grant);
-
-                cfs_waitq_signal(&ocw->ocw_waitq);
-        }
-
-        EXIT;
-}
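
osc_wake_cache_waiters admits a waiter only when one more dirty page fits
under both the per-OSC limit (cl_dirty_max) and the global limit
(obd_max_dirty_pages), and when grant is available or no write RPCs remain
in flight to replenish it. The two-cap admission test, as a standalone
predicate (struct and field names here are illustrative):

    #include <stdbool.h>
    #include <stdio.h>

    #define PAGE_BYTES 4096UL

    struct cache_limits {
            unsigned long dirty;            /* bytes dirty on this OSC */
            unsigned long dirty_max;        /* per-OSC cap, bytes */
            long global_dirty_pages;        /* pages dirty across all OSCs */
            long global_max_pages;          /* system-wide cap, pages */
    };

    /* true if one more dirty page fits under both caps */
    static bool can_dirty_one_more(const struct cache_limits *cl)
    {
            return cl->dirty + PAGE_BYTES <= cl->dirty_max &&
                   cl->global_dirty_pages + 1 <= cl->global_max_pages;
    }

    int main(void)
    {
            struct cache_limits cl = {
                    .dirty = 31 * PAGE_BYTES, .dirty_max = 32 * PAGE_BYTES,
                    .global_dirty_pages = 1000, .global_max_pages = 1000,
            };

            /* the per-OSC cap leaves room, but the global cap is full */
            printf("admit waiter: %s\n",
                   can_dirty_one_more(&cl) ? "yes" : "no");
            return 0;
    }
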
-
 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
 {
         client_obd_list_lock(&cli->cl_loi_list_lock);
@@ -957,9 +847,9 @@ static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
         }
 }
 
-static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
-                              void *key, obd_count vallen, void *val,
-                              struct ptlrpc_request_set *set);
+static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
+                              obd_count keylen, void *key, obd_count vallen,
+                              void *val, struct ptlrpc_request_set *set);
 
 static int osc_shrink_grant_interpret(const struct lu_env *env,
                                       struct ptlrpc_request *req,
@@ -1049,7 +939,7 @@ int osc_shrink_grant_to_target(struct client_obd *cli, long target)
         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
         osc_update_next_shrink(cli);
 
-        rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
+        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                 sizeof(*body), body, NULL);
         if (rc != 0)
@@ -1246,39 +1136,60 @@ static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
 }
 
 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
-                                   struct brw_page **pga, int opc,
-                                   cksum_type_t cksum_type)
-{
-        __u32 cksum;
-        int i = 0;
-
-        LASSERT (pg_count > 0);
-        cksum = init_checksum(cksum_type);
-        while (nob > 0 && pg_count > 0) {
-                unsigned char *ptr = cfs_kmap(pga[i]->pg);
-                int off = pga[i]->off & ~CFS_PAGE_MASK;
-                int count = pga[i]->count > nob ? nob : pga[i]->count;
-
-                /* corrupt the data before we compute the checksum, to
-                 * simulate an OST->client data error */
-                if (i == 0 && opc == OST_READ &&
-                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
-                        memcpy(ptr + off, "bad1", min(4, nob));
-                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
-                cfs_kunmap(pga[i]->pg);
-                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
-                               off, cksum);
-
-                nob -= pga[i]->count;
-                pg_count--;
-                i++;
-        }
-        /* For sending we only compute the wrong checksum instead
-         * of corrupting the data so it is still correct on a redo */
-        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
-                cksum++;
-
-        return fini_checksum(cksum, cksum_type);
+                                  struct brw_page **pga, int opc,
+                                  cksum_type_t cksum_type)
+{
+       __u32                           cksum;
+       int                             i = 0;
+       struct cfs_crypto_hash_desc     *hdesc;
+       unsigned int                    bufsize;
+       int                             err;
+       unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
+
+       LASSERT(pg_count > 0);
+
+       hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
+       if (IS_ERR(hdesc)) {
+               CERROR("Unable to initialize checksum hash %s\n",
+                      cfs_crypto_hash_name(cfs_alg));
+               return PTR_ERR(hdesc);
+       }
+
+       while (nob > 0 && pg_count > 0) {
+               int count = pga[i]->count > nob ? nob : pga[i]->count;
+
+               /* corrupt the data before we compute the checksum, to
+                * simulate an OST->client data error */
+               if (i == 0 && opc == OST_READ &&
+                   OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
+                       unsigned char *ptr = cfs_kmap(pga[i]->pg);
+                       int off = pga[i]->off & ~CFS_PAGE_MASK;
+                       memcpy(ptr + off, "bad1", min(4, nob));
+                       cfs_kunmap(pga[i]->pg);
+               }
+               cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
+                                 pga[i]->off & ~CFS_PAGE_MASK,
+                                 count);
+               LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
+                              (int)(pga[i]->off & ~CFS_PAGE_MASK));
+
+               nob -= pga[i]->count;
+               pg_count--;
+               i++;
+       }
+
+       bufsize = 4;
+       err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
+
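+       /* a _final call with a NULL output buffer only releases hdesc */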
+       if (err)
+               cfs_crypto_hash_final(hdesc, NULL, NULL);
+
+       /* For sending we only compute the wrong checksum instead
+        * of corrupting the data so it is still correct on a redo */
+       if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
+               cksum++;
+
+       return cksum;
 }
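
The new osc_checksum_bulk drives the libcfs crypto layer as an
init/update/final sequence, feeding each page fragment (offset, count) to
cfs_crypto_hash_update_page instead of folding bytes in by hand. A
compilable userspace sketch of the same driving pattern, with a toy
additive checksum standing in for the cfs_crypto_hash_* calls:

    #include <stdint.h>
    #include <stdio.h>

    /* stand-in state; the real code holds a cfs_crypto_hash_desc */
    struct hash_desc { uint32_t state; };

    static void hash_init(struct hash_desc *h) { h->state = 0; }

    static void hash_update(struct hash_desc *h, const unsigned char *p,
                            int n)
    {
            while (n-- > 0)
                    h->state += *p++;       /* toy mixing step */
    }

    static uint32_t hash_final(struct hash_desc *h) { return h->state; }

    struct page_frag {
            const unsigned char *page;      /* mapped page */
            int off;                        /* offset within the page */
            int count;                      /* bytes used in this page */
    };

    /* checksum 'nob' bytes spread across an array of page fragments */
    static uint32_t checksum_bulk(int nob, int pg_count,
                                  struct page_frag *pga)
    {
            struct hash_desc hdesc;
            int i = 0;

            hash_init(&hdesc);
            while (nob > 0 && pg_count > 0) {
                    int count = pga[i].count > nob ? nob : pga[i].count;

                    hash_update(&hdesc, pga[i].page + pga[i].off, count);
                    nob -= pga[i].count;
                    pg_count--;
                    i++;
            }
            return hash_final(&hdesc);
    }

    int main(void)
    {
            static const unsigned char a[] = "hello", b[] = "world";
            struct page_frag pga[] = { { a, 0, 5 }, { b, 0, 5 } };

            printf("checksum = %u\n", checksum_bulk(10, 2, pga));
            return 0;
    }
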
 
 static int osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
@@ -1696,12 +1607,13 @@ static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
         struct ptlrpc_request *req;
         int                    rc;
         cfs_waitq_t            waitq;
-        int                    resends = 0;
+        int                    generation, resends = 0;
         struct l_wait_info     lwi;
 
         ENTRY;
 
         cfs_waitq_init(&waitq);
+        generation = exp->exp_obd->u.cli.cl_import->imp_generation;
 
 restart_bulk:
         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
@@ -1709,6 +1621,12 @@ restart_bulk:
         if (rc != 0)
                 return (rc);
 
+        if (resends) {
+                req->rq_generation_set = 1;
+                req->rq_import_generation = generation;
+                req->rq_sent = cfs_time_current_sec() + resends;
+        }
+
         rc = ptlrpc_queue_wait(req);
 
         if (rc == -ETIMEDOUT && req->rq_resend) {
@@ -1720,19 +1638,34 @@ restart_bulk:
         rc = osc_brw_fini_request(req, rc);
 
         ptlrpc_req_finished(req);
+        /* When the server returns -EINPROGRESS, the client should always
+         * retry regardless of how many times the bulk was resent already. */
         if (osc_recoverable_error(rc)) {
                 resends++;
-                if (!client_should_resend(resends, &exp->exp_obd->u.cli)) {
-                        CERROR("too many resend retries, returning error\n");
-                        RETURN(-EIO);
+                if (rc != -EINPROGRESS &&
+                    !client_should_resend(resends, &exp->exp_obd->u.cli)) {
+                        CERROR("%s: too many resend retries for object: "
+                               ""LPU64":"LPU64", rc = %d.\n",
+                               exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
+                        goto out;
+                }
+                if (generation !=
+                    exp->exp_obd->u.cli.cl_import->imp_generation) {
+                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
+                               ""LPU64":"LPU64", rc = %d.\n",
+                               exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
+                        goto out;
                 }
 
-                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
+                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
+                                       NULL);
                 l_wait_event(waitq, 0, &lwi);
 
                 goto restart_bulk;
         }
-
+out:
+        if (rc == -EAGAIN || rc == -EINPROGRESS)
+                rc = -EIO;
         RETURN (rc);
 }
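
The synchronous BRW path now snapshots the import generation before the
first send; on a recoverable failure it retries without limit for
-EINPROGRESS, retries up to client_should_resend() times otherwise, and
gives up if an eviction bumped the generation in between. The retry policy
as a standalone sketch (MAX_RESENDS and send_once are illustrative):

    #include <errno.h>
    #include <stdio.h>

    #define MAX_RESENDS 5   /* stands in for client_should_resend() */

    /* one attempt against the "server": fails twice, then succeeds */
    static int send_once(int attempt)
    {
            return attempt < 2 ? -EINPROGRESS : 0;
    }

    static int brw_with_resend(const int *import_generation)
    {
            int generation = *import_generation;    /* snapshot */
            int resends = 0, rc;

            for (;;) {
                    rc = send_once(resends);
                    if (rc == 0 || rc == -EIO)      /* done, or fatal */
                            break;

                    resends++;
                    /* -EINPROGRESS always retries; others are capped */
                    if (rc != -EINPROGRESS && resends > MAX_RESENDS)
                            break;
                    /* an eviction invalidates everything sent before it */
                    if (generation != *import_generation)
                            break;
            }
            /* recoverable codes must not leak to the caller */
            if (rc == -EAGAIN || rc == -EINPROGRESS)
                    rc = -EIO;
            return rc;
    }

    int main(void)
    {
            int gen = 1;

            printf("rc = %d\n", brw_with_resend(&gen));
            return 0;
    }
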
 
@@ -1746,11 +1679,6 @@ int osc_brw_redo_request(struct ptlrpc_request *request,
         int rc = 0;
         ENTRY;
 
-        if (!client_should_resend(aa->aa_resends, aa->aa_cli)) {
-                CERROR("too many resent retries, returning error\n");
-                RETURN(-EIO);
-        }
-
         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
 
         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
@@ -1782,6 +1710,8 @@ int osc_brw_redo_request(struct ptlrpc_request *request,
         new_req->rq_interpret_reply = request->rq_interpret_reply;
         new_req->rq_async_args = request->rq_async_args;
         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
+        new_req->rq_generation_set = 1;
+        new_req->rq_import_generation = request->rq_import_generation;
 
         new_aa = ptlrpc_req_async_args(new_req);
 
@@ -1962,280 +1892,39 @@ out:
         RETURN(rc);
 }
 
-/* The companion to osc_enter_cache(), called when @oap is no longer part of
- * the dirty accounting.  Writeback completes or truncate happens before
- * writing starts.  Must be called with the loi lock held. */
-static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
-                           int sent)
-{
-        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
-}
-
-
-/* This maintains the lists of pending pages to read/write for a given object
- * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
- * to quickly find objects that are ready to send an RPC. */
-static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
-                         int cmd)
-{
-        ENTRY;
-
-        if (lop->lop_num_pending == 0)
-                RETURN(0);
-
-        /* if we have an invalid import we want to drain the queued pages
-         * by forcing them through rpcs that immediately fail and complete
-         * the pages.  recovery relies on this to empty the queued pages
-         * before canceling the locks and evicting down the llite pages */
-        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
-                RETURN(1);
-
-        /* stream rpcs in queue order as long as there is an urgent page
-         * queued.  this is our cheap solution for good batching in the case
-         * where writepage marks some random page in the middle of the file
-         * as urgent because of, say, memory pressure */
-        if (!cfs_list_empty(&lop->lop_urgent)) {
-                CDEBUG(D_CACHE, "urgent request forcing RPC\n");
-                RETURN(1);
-        }
-
-        if (cmd & OBD_BRW_WRITE) {
-                /* trigger a write rpc stream as long as there are dirtiers
-                 * waiting for space.  as they're waiting, they're not going to
-                 * create more pages to coalesce with what's waiting.. */
-                if (!cfs_list_empty(&cli->cl_cache_waiters)) {
-                        CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
-                        RETURN(1);
-                }
-        }
-        if (lop->lop_num_pending >= cli->cl_max_pages_per_rpc)
-                RETURN(1);
-
-        RETURN(0);
-}
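
lop_makes_rpc reduces the "fire an RPC now?" decision to a few ordered
checks: always drain when the import is invalid, fire for urgent pages or
blocked dirtiers, otherwise wait for a full RPC's worth of pages. The same
decision as a standalone predicate (struct queue_state and its fields are
illustrative):

    #include <stdbool.h>
    #include <stdio.h>

    struct queue_state {
            int num_pending;          /* pages queued for this object */
            bool import_invalid;      /* connection being torn down */
            bool has_urgent;          /* writepage marked a page urgent */
            bool has_cache_waiters;   /* dirtiers blocked on grant */
            int max_pages_per_rpc;
    };

    static bool makes_rpc(const struct queue_state *q, bool is_write)
    {
            if (q->num_pending == 0)
                    return false;
            if (q->import_invalid)  /* drain pages via failing RPCs */
                    return true;
            if (q->has_urgent)      /* cheap batching under pressure */
                    return true;
            if (is_write && q->has_cache_waiters)
                    return true;    /* free grant for blocked dirtiers */
            return q->num_pending >= q->max_pages_per_rpc;
    }

    int main(void)
    {
            struct queue_state q = { .num_pending = 3,
                                     .has_cache_waiters = true,
                                     .max_pages_per_rpc = 256 };

            printf("write rpc now: %s\n",
                   makes_rpc(&q, true) ? "yes" : "no");
            return 0;
    }
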
-
-static int lop_makes_hprpc(struct loi_oap_pages *lop)
-{
-        struct osc_async_page *oap;
-        ENTRY;
-
-        if (cfs_list_empty(&lop->lop_urgent))
-                RETURN(0);
-
-        oap = cfs_list_entry(lop->lop_urgent.next,
-                         struct osc_async_page, oap_urgent_item);
-
-        if (oap->oap_async_flags & ASYNC_HP) {
-                CDEBUG(D_CACHE, "hp request forcing RPC\n");
-                RETURN(1);
-        }
-
-        RETURN(0);
-}
-
-static void on_list(cfs_list_t *item, cfs_list_t *list,
-                    int should_be_on)
-{
-        if (cfs_list_empty(item) && should_be_on)
-                cfs_list_add_tail(item, list);
-        else if (!cfs_list_empty(item) && !should_be_on)
-                cfs_list_del_init(item);
-}
-
-/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
- * can find pages to build into rpcs quickly */
-void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
-{
-        if (lop_makes_hprpc(&loi->loi_write_lop) ||
-            lop_makes_hprpc(&loi->loi_read_lop)) {
-                /* HP rpc */
-                on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
-                on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
-        } else {
-                on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
-                on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
-                        lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
-                        lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
-        }
-
-        on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
-                loi->loi_write_lop.lop_num_pending);
-
-        on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
-                loi->loi_read_lop.lop_num_pending);
-}
-
-static void lop_update_pending(struct client_obd *cli,
-                               struct loi_oap_pages *lop, int cmd, int delta)
-{
-        lop->lop_num_pending += delta;
-        if (cmd & OBD_BRW_WRITE)
-                cli->cl_pending_w_pages += delta;
-        else
-                cli->cl_pending_r_pages += delta;
-}
-
-/**
- * this is called when a sync waiter receives an interruption.  Its job is to
- * get the caller woken as soon as possible.  If its page hasn't been put in an
- * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
- * desiring interruption which will forcefully complete the rpc once the rpc
- * has timed out.
- */
-int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
-{
-        struct loi_oap_pages *lop;
-        struct lov_oinfo *loi;
-        int rc = -EBUSY;
-        ENTRY;
-
-        LASSERT(!oap->oap_interrupted);
-        oap->oap_interrupted = 1;
-
-        /* ok, it's been put in an rpc. only one oap gets a request reference */
-        if (oap->oap_request != NULL) {
-                ptlrpc_mark_interrupted(oap->oap_request);
-                ptlrpcd_wake(oap->oap_request);
-                ptlrpc_req_finished(oap->oap_request);
-                oap->oap_request = NULL;
-        }
-
-        /*
-         * page completion may be called only if ->cpo_prep() method was
-         * executed by osc_io_submit(), that also adds page the to pending list
-         */
-        if (!cfs_list_empty(&oap->oap_pending_item)) {
-                cfs_list_del_init(&oap->oap_pending_item);
-                cfs_list_del_init(&oap->oap_urgent_item);
-
-                loi = oap->oap_loi;
-                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
-                        &loi->loi_write_lop : &loi->loi_read_lop;
-                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
-                loi_list_maint(oap->oap_cli, oap->oap_loi);
-                rc = oap->oap_caller_ops->ap_completion(env,
-                                          oap->oap_caller_data,
-                                          oap->oap_cmd, NULL, -EINTR);
-        }
-
-        RETURN(rc);
-}
-
-/* this is trying to propagate async writeback errors back up to the
- * application.  As an async write fails we record the error code for later if
- * the app does an fsync.  As long as errors persist we force future rpcs to be
- * sync so that the app can get a sync error and break the cycle of queueing
- * pages for which writeback will fail. */
-static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
-                           int rc)
-{
-        if (rc) {
-                if (!ar->ar_rc)
-                        ar->ar_rc = rc;
-
-                ar->ar_force_sync = 1;
-                ar->ar_min_xid = ptlrpc_sample_next_xid();
-                return;
-
-        }
-
-        if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
-                ar->ar_force_sync = 0;
-}
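
osc_process_ar records the first asynchronous write error and forces later
RPCs synchronous until a write whose xid reaches the watermark sampled at
failure time succeeds, so the application is guaranteed to see the error on
fsync. The state machine, modelled standalone (struct async_rc mirrors the
fields used above; process_ar's next_xid argument stands in for
ptlrpc_sample_next_xid()):

    #include <stdint.h>
    #include <stdio.h>

    struct async_rc {
            int ar_rc;              /* first recorded failure */
            int ar_force_sync;      /* force sync writes while set */
            uint64_t ar_min_xid;    /* watermark: next xid at failure */
    };

    static void process_ar(struct async_rc *ar, uint64_t xid, int rc,
                           uint64_t next_xid)
    {
            if (rc) {
                    if (!ar->ar_rc)
                            ar->ar_rc = rc; /* remember the first error */
                    ar->ar_force_sync = 1;
                    ar->ar_min_xid = next_xid;
                    return;
            }
            /* only a write issued after the failure clears sync mode */
            if (ar->ar_force_sync && xid >= ar->ar_min_xid)
                    ar->ar_force_sync = 0;
    }

    int main(void)
    {
            struct async_rc ar = { 0 };

            process_ar(&ar, 10, -5, 11); /* xid 10 fails, watermark 11 */
            process_ar(&ar, 9, 0, 12);   /* old success: still forced */
            printf("force_sync = %d\n", ar.ar_force_sync);
            process_ar(&ar, 11, 0, 13);  /* new success clears it */
            printf("force_sync = %d, rc = %d\n", ar.ar_force_sync,
                   ar.ar_rc);
            return 0;
    }
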
-
-void osc_oap_to_pending(struct osc_async_page *oap)
-{
-        struct loi_oap_pages *lop;
-
-        if (oap->oap_cmd & OBD_BRW_WRITE)
-                lop = &oap->oap_loi->loi_write_lop;
-        else
-                lop = &oap->oap_loi->loi_read_lop;
-
-        if (oap->oap_async_flags & ASYNC_HP)
-                cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
-        else if (oap->oap_async_flags & ASYNC_URGENT)
-                cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
-        cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
-        lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
-}
-
-/* this must be called holding the loi list lock to give coverage to exit_cache,
- * async_flag maintenance, and oap_request */
-static void osc_ap_completion(const struct lu_env *env,
-                              struct client_obd *cli, struct obdo *oa,
-                              struct osc_async_page *oap, int sent, int rc)
-{
-        __u64 xid = 0;
-
-        ENTRY;
-        if (oap->oap_request != NULL) {
-                xid = ptlrpc_req_xid(oap->oap_request);
-                ptlrpc_req_finished(oap->oap_request);
-                oap->oap_request = NULL;
-        }
-
-        cfs_spin_lock(&oap->oap_lock);
-        oap->oap_async_flags = 0;
-        cfs_spin_unlock(&oap->oap_lock);
-        oap->oap_interrupted = 0;
-
-        if (oap->oap_cmd & OBD_BRW_WRITE) {
-                osc_process_ar(&cli->cl_ar, xid, rc);
-                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
-        }
-
-        if (rc == 0 && oa != NULL) {
-                if (oa->o_valid & OBD_MD_FLBLOCKS)
-                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
-                if (oa->o_valid & OBD_MD_FLMTIME)
-                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
-                if (oa->o_valid & OBD_MD_FLATIME)
-                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
-                if (oa->o_valid & OBD_MD_FLCTIME)
-                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
-        }
-
-        rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
-                                                oap->oap_cmd, oa, rc);
-
-        /* cl_page_completion() drops PG_locked. so, a new I/O on the page could
-         * start, but OSC calls it under lock and thus we can add oap back to
-         * pending safely */
-        if (rc)
-                /* upper layer wants to leave the page on pending queue */
-                osc_oap_to_pending(oap);
-        else
-                osc_exit_cache(cli, oap, sent);
-        EXIT;
-}
-
-static int brw_queue_work(const struct lu_env *env, void *data)
-{
-        struct client_obd *cli = data;
-
-        CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
-
-        client_obd_list_lock(&cli->cl_loi_list_lock);
-        osc_check_rpcs0(env, cli, 1);
-        client_obd_list_unlock(&cli->cl_loi_list_lock);
-        RETURN(0);
-}
-
 static int brw_interpret(const struct lu_env *env,
                          struct ptlrpc_request *req, void *data, int rc)
 {
         struct osc_brw_async_args *aa = data;
+       struct osc_async_page *oap, *tmp;
         struct client_obd *cli;
-        int async;
         ENTRY;
 
         rc = osc_brw_fini_request(req, rc);
         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
+        /* When the server returns -EINPROGRESS, the client should always
+         * retry regardless of how many times the bulk was resent already. */
         if (osc_recoverable_error(rc)) {
-                rc = osc_brw_redo_request(req, aa);
+                if (req->rq_import_generation !=
+                    req->rq_import->imp_generation) {
+                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
+                               ""LPU64":"LPU64", rc = %d.\n",
+                               req->rq_import->imp_obd->obd_name,
+                               aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
+                } else if (rc == -EINPROGRESS ||
+                    client_should_resend(aa->aa_resends, aa->aa_cli)) {
+                        rc = osc_brw_redo_request(req, aa);
+                } else {
+                        CERROR("%s: too many resend retries for object: "
+                               ""LPU64":"LPU64", rc = %d.\n",
+                               req->rq_import->imp_obd->obd_name,
+                               aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
+                }
+
                 if (rc == 0)
                         RETURN(0);
+                else if (rc == -EAGAIN || rc == -EINPROGRESS)
+                        rc = -EIO;
         }
 
         if (aa->aa_ocapa) {
@@ -2254,45 +1943,38 @@ static int brw_interpret(const struct lu_env *env,
         else
                 cli->cl_r_in_flight--;
 
-        async = cfs_list_empty(&aa->aa_oaps);
-        if (!async) { /* from osc_send_oap_rpc() */
-                struct osc_async_page *oap, *tmp;
-                /* the caller may re-use the oap after the completion call so
-                 * we need to clean it up a little */
-                cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
-                                             oap_rpc_item) {
-                        cfs_list_del_init(&oap->oap_rpc_item);
-                        osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
-                }
-                OBDO_FREE(aa->aa_oa);
-        } else { /* from async_internal() */
-                obd_count i;
-                for (i = 0; i < aa->aa_page_count; i++)
-                        osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
-        }
-        osc_wake_cache_waiters(cli);
-        osc_check_rpcs0(env, cli, 1);
-        client_obd_list_unlock(&cli->cl_loi_list_lock);
+       /* the caller may re-use the oap after the completion call so
+        * we need to clean it up a little */
+       cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
+                       oap_rpc_item) {
+               cfs_list_del_init(&oap->oap_rpc_item);
+               osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
+       }
+       OBDO_FREE(aa->aa_oa);
 
-        if (!async)
-                cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
-                                  req->rq_bulk->bd_nob_transferred);
-        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
-        ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
+       osc_wake_cache_waiters(cli);
+       osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
 
-        RETURN(rc);
+       cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
+                         req->rq_bulk->bd_nob_transferred);
+       osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
+       ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
+
+       RETURN(rc);
 }
 
-static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
-                                            struct client_obd *cli,
-                                            cfs_list_t *rpc_list,
-                                            int page_count, int cmd)
+/* The trickiest part of this function is that it will return with
+ * cli->cl_loi_list_lock held.
+ */
+int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
+                 cfs_list_t *rpc_list, int page_count, int cmd,
+                 pdl_policy_t pol)
 {
-        struct ptlrpc_request *req;
-        struct brw_page **pga = NULL;
-        struct osc_brw_async_args *aa;
+       struct ptlrpc_request *req = NULL;
+       struct brw_page **pga = NULL;
+       struct osc_brw_async_args *aa = NULL;
         struct obdo *oa = NULL;
-        const struct obd_async_page_ops *ops = NULL;
         struct osc_async_page *oap;
         struct osc_async_page *tmp;
         struct cl_req *clerq = NULL;
@@ -2310,23 +1992,21 @@ static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
         memset(&crattr, 0, sizeof crattr);
         OBD_ALLOC(pga, sizeof(*pga) * page_count);
         if (pga == NULL)
-                GOTO(out, req = ERR_PTR(-ENOMEM));
-
-        OBDO_ALLOC(oa);
-        if (oa == NULL)
-                GOTO(out, req = ERR_PTR(-ENOMEM));
-
-        i = 0;
-        cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
-                struct cl_page *page = osc_oap2cl_page(oap);
-                if (ops == NULL) {
-                        ops = oap->oap_caller_ops;
-
-                        clerq = cl_req_alloc(env, page, crt,
-                                             1 /* only 1-object rpcs for
-                                                * now */);
-                        if (IS_ERR(clerq))
-                                GOTO(out, req = (void *)clerq);
+               GOTO(out, rc = -ENOMEM);
+
+       OBDO_ALLOC(oa);
+       if (oa == NULL)
+               GOTO(out, rc = -ENOMEM);
+
+       i = 0;
+       cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
+               struct cl_page *page = osc_oap2cl_page(oap);
+               if (clerq == NULL) {
+                       clerq = cl_req_alloc(env, page, crt,
+                                            1 /* only 1-object rpcs for
+                                               * now */);
+                       if (IS_ERR(clerq))
+                               GOTO(out, rc = PTR_ERR(clerq));
                         lock = oap->oap_ldlm_lock;
                 }
                 pga[i] = &oap->oap_brw_page;
@@ -2338,9 +2018,10 @@ static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
         }
 
         /* always get the data for the obdo for the rpc */
-        LASSERT(ops != NULL);
+       LASSERT(clerq != NULL);
         crattr.cra_oa = oa;
         crattr.cra_capa = NULL;
+       memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE);
         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
         if (lock) {
                 oa->o_handle = lock->l_remote_handle;
@@ -2350,7 +2031,7 @@ static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
         rc = cl_req_prep(env, clerq);
         if (rc != 0) {
                 CERROR("cl_req_prep failed: %d\n", rc);
-                GOTO(out, req = ERR_PTR(rc));
+               GOTO(out, rc);
         }
 
         sort_brw_pages(pga, page_count);
@@ -2358,9 +2039,10 @@ static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
                                   pga, &req, crattr.cra_capa, 1, 0);
         if (rc != 0) {
                 CERROR("prep_req failed: %d\n", rc);
-                GOTO(out, req = ERR_PTR(rc));
-        }
+               GOTO(out, rc);
+       }
 
+       req->rq_interpret_reply = brw_interpret;
         if (cmd & OBD_BRW_MEMALLOC)
                 req->rq_memalloc = 1;
 
@@ -2372,6 +2054,8 @@ static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
         cl_req_attr_set(env, clerq, &crattr,
                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
 
+       lustre_msg_set_jobid(req->rq_reqmsg, crattr.cra_jobid);
+
         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
         aa = ptlrpc_req_async_args(req);
         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
@@ -2383,7 +2067,9 @@ out:
                 cfs_memory_pressure_restore(mpflag);
 
         capa_put(crattr.cra_capa);
-        if (IS_ERR(req)) {
+       if (rc != 0) {
+               LASSERT(req == NULL);
+
                 if (oa)
                         OBDO_FREE(oa);
                 if (pga)
@@ -2402,724 +2088,49 @@ out:
                                                   oap->oap_count);
                                 continue;
                         }
-                        osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
-                }
-                if (clerq && !IS_ERR(clerq))
-                        cl_req_completion(env, clerq, PTR_ERR(req));
-        }
-        RETURN(req);
-}
-
-/**
- * prepare pages for ASYNC io and put pages in send queue.
- *
- * \param cmd OBD_BRW_* macros
- * \param lop pending pages
- *
- * \return zero if no page added to send queue.
- * \return 1 if pages successfully added to send queue.
- * \return negative on errors.
- */
-static int
-osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
-                 struct lov_oinfo *loi, int cmd,
-                 struct loi_oap_pages *lop, pdl_policy_t pol)
-{
-        struct ptlrpc_request *req;
-        obd_count page_count = 0;
-        struct osc_async_page *oap = NULL, *tmp;
-        struct osc_brw_async_args *aa;
-        const struct obd_async_page_ops *ops;
-        CFS_LIST_HEAD(rpc_list);
-        int srvlock = 0, mem_tight = 0;
-        struct cl_object *clob = NULL;
-        obd_off starting_offset = OBD_OBJECT_EOF;
-        unsigned int ending_offset;
-        int starting_page_off = 0;
-        ENTRY;
-
-        /* ASYNC_HP pages first. At present, when the lock covering the pages
-         * is to be canceled, the pages under it will be sent out with
-         * ASYNC_HP. We have to send them out as soon as possible. */
-        cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
-                if (oap->oap_async_flags & ASYNC_HP)
-                        cfs_list_move(&oap->oap_pending_item, &rpc_list);
-                else if (!(oap->oap_brw_flags & OBD_BRW_SYNC))
-                        /* only do this for writeback pages. */
-                        cfs_list_move_tail(&oap->oap_pending_item, &rpc_list);
-                if (++page_count >= cli->cl_max_pages_per_rpc)
-                        break;
-        }
-        cfs_list_splice_init(&rpc_list, &lop->lop_pending);
-        page_count = 0;
-
-        /* first we find the pages we're allowed to work with */
-        cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
-                                     oap_pending_item) {
-                ops = oap->oap_caller_ops;
-
-                LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
-                         "magic 0x%x\n", oap, oap->oap_magic);
-
-                if (clob == NULL) {
-                        /* pin object in memory, so that completion call-backs
-                         * can be safely called under client_obd_list lock. */
-                        clob = osc_oap2cl_page(oap)->cp_obj;
-                        cl_object_get(clob);
-                }
-
-                if (page_count != 0 &&
-                    srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
-                        CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
-                               " oap %p, page %p, srvlock %u\n",
-                               oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
-                        break;
-                }
-
-                /* If there is a gap at the start of this page, it can't merge
-                 * with any previous page, so we'll hand the network a
-                 * "fragmented" page array that it can't transfer in 1 RDMA */
-                if (oap->oap_obj_off < starting_offset) {
-                        if (starting_page_off != 0)
-                                break;
-
-                        starting_page_off = oap->oap_page_off;
-                        starting_offset = oap->oap_obj_off + starting_page_off;
-                } else if (oap->oap_page_off != 0)
-                        break;
-
-                /* in llite being 'ready' equates to the page being locked
-                 * until completion unlocks it.  commit_write submits a page
-                 * as not ready because its unlock will happen unconditionally
-                 * as the call returns.  if we race with commit_write giving
-                 * us that page we don't want to create a hole in the page
-                 * stream, so we stop and leave the rpc to be fired by
-                 * another dirtier or kupdated interval (the not ready page
-                 * will still be on the dirty list).  we could call in
-                 * at the end of ll_file_write to process the queue again. */
-                if (!(oap->oap_async_flags & ASYNC_READY)) {
-                        int rc = ops->ap_make_ready(env, oap->oap_caller_data,
-                                                    cmd);
-                        if (rc < 0)
-                                CDEBUG(D_INODE, "oap %p page %p returned %d "
-                                                "instead of ready\n", oap,
-                                                oap->oap_page, rc);
-                        switch (rc) {
-                        case -EAGAIN:
-                                /* llite is telling us that the page is still
-                                 * in commit_write and that we should try
-                                 * and put it in an rpc again later.  we
-                                 * break out of the loop so we don't create
-                                 * a hole in the sequence of pages in the rpc
-                                 * stream.*/
-                                oap = NULL;
-                                break;
-                        case -EINTR:
-                                /* the io isn't needed.. tell the checks
-                                 * below to complete the rpc with EINTR */
-                                cfs_spin_lock(&oap->oap_lock);
-                                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
-                                cfs_spin_unlock(&oap->oap_lock);
-                                oap->oap_count = -EINTR;
-                                break;
-                        case 0:
-                                cfs_spin_lock(&oap->oap_lock);
-                                oap->oap_async_flags |= ASYNC_READY;
-                                cfs_spin_unlock(&oap->oap_lock);
-                                break;
-                        default:
-                                LASSERTF(0, "oap %p page %p returned %d "
-                                            "from make_ready\n", oap,
-                                            oap->oap_page, rc);
-                                break;
-                        }
-                }
-                if (oap == NULL)
-                        break;
-
-                /* take the page out of our book-keeping */
-                cfs_list_del_init(&oap->oap_pending_item);
-                lop_update_pending(cli, lop, cmd, -1);
-                cfs_list_del_init(&oap->oap_urgent_item);
-
-                /* ask the caller for the size of the io as the rpc leaves. */
-                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
-                        oap->oap_count =
-                                ops->ap_refresh_count(env, oap->oap_caller_data,
-                                                      cmd);
-                        LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
-                }
-                if (oap->oap_count <= 0) {
-                        CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
-                               oap->oap_count);
-                        osc_ap_completion(env, cli, NULL,
-                                          oap, 0, oap->oap_count);
-                        continue;
-                }
-
-                /* now put the page back in our accounting */
-                cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
-                if (page_count++ == 0)
-                        srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
-
-                if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
-                        mem_tight = 1;
-
-                /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
-                 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
-                 * have the same alignment as the initial writes that allocated
-                 * extents on the server. */
-                ending_offset = oap->oap_obj_off + oap->oap_page_off +
-                                oap->oap_count;
-                if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1)))
-                        break;
-
-                if (page_count >= cli->cl_max_pages_per_rpc)
-                        break;
-
-                /* If there is a gap at the end of this page, it can't merge
-                 * with any subsequent pages, so we'll hand the network a
-                 * "fragmented" page array that it can't transfer in 1 RDMA */
-                if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
-                        break;
-        }
-
-        loi_list_maint(cli, loi);
-
-        client_obd_list_unlock(&cli->cl_loi_list_lock);
-
-        if (clob != NULL)
-                cl_object_put(env, clob);
-
-        if (page_count == 0) {
-                client_obd_list_lock(&cli->cl_loi_list_lock);
-                RETURN(0);
-        }
-
-        req = osc_build_req(env, cli, &rpc_list, page_count,
-                            mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
-        if (IS_ERR(req)) {
-                LASSERT(cfs_list_empty(&rpc_list));
-                loi_list_maint(cli, loi);
-                RETURN(PTR_ERR(req));
-        }
-
-        aa = ptlrpc_req_async_args(req);
-
-        starting_offset &= PTLRPC_MAX_BRW_SIZE - 1;
-        if (cmd == OBD_BRW_READ) {
-                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
-                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
-                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
-                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
-        } else {
-                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
-                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
-                                 cli->cl_w_in_flight);
-                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
-                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
-        }
-
-        client_obd_list_lock(&cli->cl_loi_list_lock);
-
-        if (cmd == OBD_BRW_READ)
-                cli->cl_r_in_flight++;
-        else
-                cli->cl_w_in_flight++;
-
-        /* queued sync pages can be torn down while the pages
-         * were between the pending list and the rpc */
-        tmp = NULL;
-        cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
-                /* only one oap gets a request reference */
-                if (tmp == NULL)
-                        tmp = oap;
-                if (oap->oap_interrupted && !req->rq_intr) {
-                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
-                               oap, req);
-                        ptlrpc_mark_interrupted(req);
-                }
-        }
-        if (tmp != NULL)
-                tmp->oap_request = ptlrpc_request_addref(req);
-
-        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
-                  page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
-
-        req->rq_interpret_reply = brw_interpret;
-
-        /* XXX: Maybe the caller can check the RPC bulk descriptor to see which
-         *      CPU/NUMA node the majority of pages were allocated on, and try
-         *      to assign the async RPC to the CPU core (PDL_POLICY_PREFERRED)
-         *      to reduce cross-CPU memory traffic.
-         *
-         *      But on the other hand, we expect that multiple ptlrpcd threads
-         *      and the initial write sponsor can run in parallel, especially
-         *      when data checksum is enabled, which is CPU-bound operation and
-         *      single ptlrpcd thread cannot process in time. So more ptlrpcd
-         *      threads sharing BRW load (with PDL_POLICY_ROUND) seems better.
-         */
-        ptlrpcd_add_req(req, pol, -1);
-        RETURN(1);
-}
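
While assembling an RPC, osc_send_oap_rpc stops as soon as a page ends
exactly on a PTLRPC_MAX_BRW_SIZE boundary, so later reads line up with the
server-side extents that the initial writes allocated. The boundary test is
a mask on the ending object offset; a tiny sketch (the 1 MiB MAX_BRW_SIZE
is assumed for illustration):

    #include <stdio.h>

    #define MAX_BRW_SIZE (1UL << 20)        /* 1 MiB, illustrative */

    /* true if this page ends the RPC exactly on a max-BRW boundary */
    static int ends_on_brw_boundary(unsigned long obj_off,
                                    unsigned long page_off,
                                    unsigned long count)
    {
            unsigned long end = obj_off + page_off + count;

            return (end & (MAX_BRW_SIZE - 1)) == 0;
    }

    int main(void)
    {
            /* a 4 KiB page ending exactly at 1 MiB closes the RPC */
            printf("%d\n", ends_on_brw_boundary(1044480, 0, 4096));
            /* one ending mid-chunk does not */
            printf("%d\n", ends_on_brw_boundary(8192, 0, 4096));
            return 0;
    }
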
-
-#define LOI_DEBUG(LOI, STR, args...)                                     \
-        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
-               !cfs_list_empty(&(LOI)->loi_ready_item) ||                \
-               !cfs_list_empty(&(LOI)->loi_hp_ready_item),               \
-               (LOI)->loi_write_lop.lop_num_pending,                     \
-               !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent),        \
-               (LOI)->loi_read_lop.lop_num_pending,                      \
-               !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent),         \
-               args)                                                     \
-
-/* This is called by osc_check_rpcs() to find which objects have pages that
- * we could be sending.  These lists are maintained by lop_makes_rpc(). */
-struct lov_oinfo *osc_next_loi(struct client_obd *cli)
-{
-        ENTRY;
-
-        /* First return objects that have blocked locks so that they
-         * will be flushed quickly and other clients can get the lock,
-         * then objects which have pages ready to be stuffed into RPCs */
-        if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
-                RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
-                                      struct lov_oinfo, loi_hp_ready_item));
-        if (!cfs_list_empty(&cli->cl_loi_ready_list))
-                RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
-                                      struct lov_oinfo, loi_ready_item));
-
-        /* then if we have cache waiters, return all objects with queued
-         * writes.  This is especially important when many small files
-         * have filled up the cache and not been fired into rpcs because
-         * they don't pass the nr_pending/object threshold */
-        if (!cfs_list_empty(&cli->cl_cache_waiters) &&
-            !cfs_list_empty(&cli->cl_loi_write_list))
-                RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
-                                      struct lov_oinfo, loi_write_item));
-
-        /* then return all queued objects when we have an invalid import
-         * so that they get flushed */
-        if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
-                if (!cfs_list_empty(&cli->cl_loi_write_list))
-                        RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
-                                              struct lov_oinfo,
-                                              loi_write_item));
-                if (!cfs_list_empty(&cli->cl_loi_read_list))
-                        RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
-                                              struct lov_oinfo, loi_read_item));
-        }
-        RETURN(NULL);
-}
-
-static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
-{
-        struct osc_async_page *oap;
-        int hprpc = 0;
-
-        if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
-                oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
-                                     struct osc_async_page, oap_urgent_item);
-                hprpc = !!(oap->oap_async_flags & ASYNC_HP);
-        }
-
-        if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
-                oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
-                                     struct osc_async_page, oap_urgent_item);
-                hprpc = !!(oap->oap_async_flags & ASYNC_HP);
-        }
-
-        return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
-}
-
-/* called with the loi list lock held */
-static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli, int ptlrpc)
-{
-        struct lov_oinfo *loi;
-        int rc = 0, race_counter = 0;
-        pdl_policy_t pol;
-        ENTRY;
-
-        pol = ptlrpc ? PDL_POLICY_SAME : PDL_POLICY_ROUND;
-
-        while ((loi = osc_next_loi(cli)) != NULL) {
-                LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
-
-                if (osc_max_rpc_in_flight(cli, loi))
-                        break;
-
-                /* attempt some read/write balancing by alternating between
-                 * reads and writes in an object.  The makes_rpc checks here
-                 * would be redundant if we were getting read/write work items
-                 * instead of objects.  We don't want send_oap_rpc to drain a
-                 * partial read pending queue when we're given this object to
-                 * do I/O on writes while there are cache waiters. */
-                if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
-                        rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
-                                              &loi->loi_write_lop, pol);
-                        if (rc < 0) {
-                                CERROR("Write request failed with %d\n", rc);
-
-                                /* osc_send_oap_rpc failed, mostly because of
-                                 * memory pressure.
-                                 *
-                                 * We can't break out here, because if:
-                                 *  - a page was submitted by osc_io_submit,
-                                 *    so that page is locked;
-                                 *  - no request is in flight; and
-                                 *  - no subsequent request is queued,
-                                 * then the system would live-lock: there is
-                                 * no further chance to call osc_io_unplug()
-                                 * or osc_check_rpcs().  pdflush can't help
-                                 * either, because it might be blocked on the
-                                 * page lock mentioned above.
-                                 *
-                                 * Anyway, continue to drain pages. */
-                                /* break; */
-                        }
-
-                        if (rc > 0)
-                                race_counter = 0;
-                        else if (rc == 0)
-                                race_counter++;
-                }
-                if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
-                        rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
-                                              &loi->loi_read_lop, pol);
-                        if (rc < 0)
-                                CERROR("Read request failed with %d\n", rc);
-
-                        if (rc > 0)
-                                race_counter = 0;
-                        else if (rc == 0)
-                                race_counter++;
-                }
-
-                /* attempt some inter-object balancing by issuing rpcs
-                 * for each object in turn */
-                if (!cfs_list_empty(&loi->loi_hp_ready_item))
-                        cfs_list_del_init(&loi->loi_hp_ready_item);
-                if (!cfs_list_empty(&loi->loi_ready_item))
-                        cfs_list_del_init(&loi->loi_ready_item);
-                if (!cfs_list_empty(&loi->loi_write_item))
-                        cfs_list_del_init(&loi->loi_write_item);
-                if (!cfs_list_empty(&loi->loi_read_item))
-                        cfs_list_del_init(&loi->loi_read_item);
-
-                loi_list_maint(cli, loi);
-
-                /* send_oap_rpc fails with 0 when make_ready tells it to
-                 * back off.  llite's make_ready does this when it tries
-                 * to lock a page queued for write that is already locked.
-                 * We want to try sending RPCs from many objects, but we
-                 * don't want to spin failing with 0. */
-                if (race_counter == 10)
-                        break;
-        }
-}
-
-void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
-{
-        osc_check_rpcs0(env, cli, 0);
-}
-
-/**
- * Non-blocking version of osc_enter_cache() that consumes grant only when it
- * is available.
- */
-int osc_enter_cache_try(const struct lu_env *env,
-                        struct client_obd *cli, struct lov_oinfo *loi,
-                        struct osc_async_page *oap, int transient)
-{
-        int has_grant;
-
-        has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
-        if (has_grant) {
-                osc_consume_write_grant(cli, &oap->oap_brw_page);
-                if (transient) {
-                        cli->cl_dirty_transit += CFS_PAGE_SIZE;
-                        cfs_atomic_inc(&obd_dirty_transit_pages);
-                        oap->oap_brw_flags |= OBD_BRW_NOCACHE;
-                }
-        }
-        return has_grant;
-}
-
-/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
- * grant or cache space. */
-static int osc_enter_cache(const struct lu_env *env,
-                           struct client_obd *cli, struct lov_oinfo *loi,
-                           struct osc_async_page *oap)
-{
-        struct osc_cache_waiter ocw;
-        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
-        int rc = -EDQUOT;
-        ENTRY;
-
-        CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
-               "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
-               cli->cl_dirty_max, obd_max_dirty_pages,
-               cli->cl_lost_grant, cli->cl_avail_grant);
-
-        /* force the caller to try sync I/O.  This can jump the list
-         * of queued writes and create a discontiguous RPC stream */
-        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
-            cli->cl_dirty_max < CFS_PAGE_SIZE     ||
-            cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync)
-                RETURN(-EDQUOT);
-
-        /* Hopefully normal case - cache space and write credits available */
-        if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
-            cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
-            osc_enter_cache_try(env, cli, loi, oap, 0))
-                RETURN(0);
-
-        /* We can get here for two reasons: too many dirty pages in cache, or
-         * we have run out of grant.  In both cases we should write dirty
-         * pages out.  Adding a cache waiter will trigger urgent write-out no
-         * matter what the RPC size will be.
-         * The exit condition is no available grant and no dirty pages cached;
-         * that really means there is no space left on the OST. */
-        cfs_waitq_init(&ocw.ocw_waitq);
-        ocw.ocw_oap = oap;
-        while (cli->cl_dirty > 0) {
-                cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
-                ocw.ocw_rc = 0;
-
-                loi_list_maint(cli, loi);
-                osc_check_rpcs(env, cli);
-                client_obd_list_unlock(&cli->cl_loi_list_lock);
-
-                CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n",
-                       cli->cl_import->imp_obd->obd_name, &ocw, oap);
-
-                rc = l_wait_event(ocw.ocw_waitq,
-                                  cfs_list_empty(&ocw.ocw_entry), &lwi);
-
-                client_obd_list_lock(&cli->cl_loi_list_lock);
-                cfs_list_del_init(&ocw.ocw_entry);
-                if (rc < 0)
-                        break;
-
-                rc = ocw.ocw_rc;
-                if (rc != -EDQUOT)
-                        break;
-        }
-
-        RETURN(rc);
-}
-
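-/* Initialize the osc_async_page embedded at *res for the given page and
- * object offset.  When called with page == NULL, just return the
- * (rounded) size the caller must reserve for the structure. */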
-int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
-                        struct lov_oinfo *loi, cfs_page_t *page,
-                        obd_off offset, const struct obd_async_page_ops *ops,
-                        void *data, void **res, int nocache,
-                        struct lustre_handle *lockh)
-{
-        struct osc_async_page *oap;
-
-        ENTRY;
-
-        if (!page)
-                return cfs_size_round(sizeof(*oap));
-
-        oap = *res;
-        oap->oap_magic = OAP_MAGIC;
-        oap->oap_cli = &exp->exp_obd->u.cli;
-        oap->oap_loi = loi;
-
-        oap->oap_caller_ops = ops;
-        oap->oap_caller_data = data;
-
-        oap->oap_page = page;
-        oap->oap_obj_off = offset;
-        if (!client_is_remote(exp) &&
-            cfs_capable(CFS_CAP_SYS_RESOURCE))
-                oap->oap_brw_flags = OBD_BRW_NOQUOTA;
-
-        LASSERT(!(offset & ~CFS_PAGE_MASK));
-
-        CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
-        CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
-        CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
-        CFS_INIT_LIST_HEAD(&oap->oap_page_list);
-
-        cfs_spin_lock_init(&oap->oap_lock);
-        CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
-        RETURN(0);
-}
-
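-/* Queue a prepared async page for I/O.  For writes this checks the
- * owner/group quota and accounts the page against the dirty/grant
- * limits via osc_enter_cache(), which may block waiting for cache
- * space; the writeback work item is kicked once enough pages are
- * queued to make an RPC worthwhile. */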
-int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp,
-                       struct lov_stripe_md *lsm, struct lov_oinfo *loi,
-                       struct osc_async_page *oap, int cmd, int off,
-                       int count, obd_flag brw_flags, enum async_flags async_flags)
-{
-        struct client_obd *cli = &exp->exp_obd->u.cli;
-        int rc = 0;
-        ENTRY;
-
-        if (oap->oap_magic != OAP_MAGIC)
-                RETURN(-EINVAL);
-
-        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
-                RETURN(-EIO);
-
-        if (!cfs_list_empty(&oap->oap_pending_item) ||
-            !cfs_list_empty(&oap->oap_urgent_item) ||
-            !cfs_list_empty(&oap->oap_rpc_item))
-                RETURN(-EBUSY);
-
-        /* check if the file's owner/group is over quota */
-        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
-                struct cl_object *obj;
-                struct cl_attr    attr; /* XXX put attr into thread info */
-                unsigned int qid[MAXQUOTAS];
-
-                obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
-
-                cl_object_attr_lock(obj);
-                rc = cl_object_attr_get(env, obj, &attr);
-                cl_object_attr_unlock(obj);
-
-                qid[USRQUOTA] = attr.cat_uid;
-                qid[GRPQUOTA] = attr.cat_gid;
-                if (rc == 0 &&
-                    osc_quota_chkdq(cli, qid) == NO_QUOTA)
-                        rc = -EDQUOT;
-                if (rc)
-                        RETURN(rc);
-        }
-
-        if (loi == NULL)
-                loi = lsm->lsm_oinfo[0];
-
-        client_obd_list_lock(&cli->cl_loi_list_lock);
-
-        LASSERT(off + count <= CFS_PAGE_SIZE);
-        oap->oap_cmd = cmd;
-        oap->oap_page_off = off;
-        oap->oap_count = count;
-        oap->oap_brw_flags = brw_flags;
-        /* Give a hint to OST that requests are coming from kswapd - bug19529 */
-        if (cfs_memory_pressure_get())
-                oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
-        cfs_spin_lock(&oap->oap_lock);
-        oap->oap_async_flags = async_flags;
-        cfs_spin_unlock(&oap->oap_lock);
-
-        if (cmd & OBD_BRW_WRITE) {
-                rc = osc_enter_cache(env, cli, loi, oap);
-                if (rc) {
-                        client_obd_list_unlock(&cli->cl_loi_list_lock);
-                        RETURN(rc);
-                }
-        }
-
-        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
-                  cmd);
-
-        osc_oap_to_pending(oap);
-        loi_list_maint(cli, loi);
-        if (!osc_max_rpc_in_flight(cli, loi) &&
-            lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
-                LASSERT(cli->cl_writeback_work != NULL);
-                rc = ptlrpcd_queue_work(cli->cl_writeback_work);
-
-                CDEBUG(D_CACHE, "Queued writeback work for client obd %p/%d.\n",
-                       cli, rc);
-        }
-        client_obd_list_unlock(&cli->cl_loi_list_lock);
-
-        RETURN(0);
-}
-
-/* aka (~was & now & flag), but this is clearer :) */
-#define SETTING(was, now, flag) (!(was & flag) && (now & flag))
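-/* e.g. SETTING(oap->oap_async_flags, async_flags, ASYNC_READY) is
- * non-zero only when ASYNC_READY is in async_flags but not yet set in
- * oap_async_flags */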
-
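-/* Merge new async flags into an oap that is already on a pending list.
- * When ASYNC_URGENT is newly set, the page is moved onto the urgent
- * list: high-priority (ASYNC_HP) pages at the head, others at the
- * tail. */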
-int osc_set_async_flags_base(struct client_obd *cli,
-                             struct lov_oinfo *loi, struct osc_async_page *oap,
-                             obd_flag async_flags)
-{
-        struct loi_oap_pages *lop;
-        int flags = 0;
-        ENTRY;
-
-        LASSERT(!cfs_list_empty(&oap->oap_pending_item));
-
-        if (oap->oap_cmd & OBD_BRW_WRITE) {
-                lop = &loi->loi_write_lop;
-        } else {
-                lop = &loi->loi_read_lop;
-        }
-
-        if ((oap->oap_async_flags & async_flags) == async_flags)
-                RETURN(0);
-
-        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
-                flags |= ASYNC_READY;
-
-        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
-            cfs_list_empty(&oap->oap_rpc_item)) {
-                if (oap->oap_async_flags & ASYNC_HP)
-                        cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
-                else
-                        cfs_list_add_tail(&oap->oap_urgent_item,
-                                          &lop->lop_urgent);
-                flags |= ASYNC_URGENT;
-                loi_list_maint(cli, loi);
-        }
-        cfs_spin_lock(&oap->oap_lock);
-        oap->oap_async_flags |= flags;
-        cfs_spin_unlock(&oap->oap_lock);
-
-        LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
-                        oap->oap_async_flags);
-        RETURN(0);
-}
-
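-/* Undo osc_queue_async_io(): release the page's cache grant and unlink
- * it from the urgent and pending lists.  Fails with -EBUSY if the page
- * has already been put into an RPC. */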
-int osc_teardown_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
-                            struct lov_oinfo *loi, struct osc_async_page *oap)
-{
-        struct client_obd *cli = &exp->exp_obd->u.cli;
-        struct loi_oap_pages *lop;
-        int rc = 0;
-        ENTRY;
-
-        if (oap->oap_magic != OAP_MAGIC)
-                RETURN(-EINVAL);
-
-        if (loi == NULL)
-                loi = lsm->lsm_oinfo[0];
-
-        if (oap->oap_cmd & OBD_BRW_WRITE) {
-                lop = &loi->loi_write_lop;
-        } else {
-                lop = &loi->loi_read_lop;
-        }
-
-        client_obd_list_lock(&cli->cl_loi_list_lock);
-
-        if (!cfs_list_empty(&oap->oap_rpc_item))
-                GOTO(out, rc = -EBUSY);
-
-        osc_exit_cache(cli, oap, 0);
-        osc_wake_cache_waiters(cli);
-
-        if (!cfs_list_empty(&oap->oap_urgent_item)) {
-                cfs_list_del_init(&oap->oap_urgent_item);
-                cfs_spin_lock(&oap->oap_lock);
-                oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
-                cfs_spin_unlock(&oap->oap_lock);
-        }
-        if (!cfs_list_empty(&oap->oap_pending_item)) {
-                cfs_list_del_init(&oap->oap_pending_item);
-                lop_update_pending(cli, lop, oap->oap_cmd, -1);
-        }
-        loi_list_maint(cli, loi);
-        LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
-out:
-        client_obd_list_unlock(&cli->cl_loi_list_lock);
-        RETURN(rc);
+                       osc_ap_completion(env, cli, NULL, oap, 0, rc);
+               }
+               if (clerq && !IS_ERR(clerq))
+                       cl_req_completion(env, clerq, rc);
+       } else {
+               struct osc_async_page *tmp = NULL;
+
+               /* queued sync pages can be torn down while the pages
+                * are between the pending list and the RPC */
+               LASSERT(aa != NULL);
+               client_obd_list_lock(&cli->cl_loi_list_lock);
+               cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
+                       /* only one oap gets a request reference */
+                       if (tmp == NULL)
+                               tmp = oap;
+                       if (oap->oap_interrupted && !req->rq_intr) {
+                               CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
+                                               oap, req);
+                               ptlrpc_mark_interrupted(req);
+                       }
+               }
+               if (tmp != NULL)
+                       tmp->oap_request = ptlrpc_request_addref(req);
+
+               DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
+                         page_count, aa, cli->cl_r_in_flight,
+                         cli->cl_w_in_flight);
+
+               /* XXX: Maybe the caller can check the RPC bulk descriptor to
+                * see which CPU/NUMA node the majority of pages were allocated
+                * on, and try to assign the async RPC to that CPU core
+                * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
+                *
+                * On the other hand, we expect multiple ptlrpcd threads and
+                * the initial write sponsor to run in parallel, especially
+                * when data checksumming is enabled, which is a CPU-bound
+                * operation that a single ptlrpcd thread cannot keep up with.
+                * So having more ptlrpcd threads share the BRW load (with
+                * PDL_POLICY_ROUND) seems better.
+                */
+               ptlrpcd_add_req(req, pol, -1);
+       }
+       RETURN(rc);
 }
 
 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
@@ -3662,9 +2673,11 @@ out:
         RETURN(rc);
 }
 
-static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
-                            __u64 max_age, struct ptlrpc_request_set *rqset)
+static int osc_statfs_async(struct obd_export *exp,
+                            struct obd_info *oinfo, __u64 max_age,
+                            struct ptlrpc_request_set *rqset)
 {
+        struct obd_device     *obd = class_exp2obd(exp);
         struct ptlrpc_request *req;
         struct osc_async_args *aa;
         int                    rc;
@@ -3704,9 +2717,10 @@ static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
         RETURN(0);
 }
 
-static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
-                      __u64 max_age, __u32 flags)
+static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
+                      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
 {
+        struct obd_device     *obd = class_exp2obd(exp);
         struct obd_statfs     *msfs;
         struct ptlrpc_request *req;
         struct obd_import     *imp = NULL;
@@ -3917,8 +2931,8 @@ out:
         return err;
 }
 
-static int osc_get_info(struct obd_export *exp, obd_count keylen,
-                        void *key, __u32 *vallen, void *val,
+static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
+                        obd_count keylen, void *key, __u32 *vallen, void *val,
                         struct lov_stripe_md *lsm)
 {
         ENTRY;
@@ -4048,9 +3062,9 @@ static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
         RETURN(osc_setinfo_mds_connect_import(req->rq_import));
 }
 
-static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
-                              void *key, obd_count vallen, void *val,
-                              struct ptlrpc_request_set *set)
+static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
+                              obd_count keylen, void *key, obd_count vallen,
+                              void *val, struct ptlrpc_request_set *set)
 {
         struct ptlrpc_request *req;
         struct obd_device     *obd = exp->exp_obd;
@@ -4384,7 +3398,7 @@ static int osc_import_event(struct obd_device *obd,
                         client_obd_list_lock(&cli->cl_loi_list_lock);
                         /* all pages go to failing rpcs due to the invalid
                          * import */
-                        osc_check_rpcs(env, cli);
+                       osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
                         client_obd_list_unlock(&cli->cl_loi_list_lock);
 
                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
@@ -4460,6 +3474,18 @@ static int osc_cancel_for_recovery(struct ldlm_lock *lock)
         RETURN(0);
 }
 
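+/* ptlrpcd work-item handler for client write-back: flush whatever is
+ * ready to be sent, under the loi list lock. */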
+static int brw_queue_work(const struct lu_env *env, void *data)
+{
+       struct client_obd *cli = data;
+
+       CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
+
+       client_obd_list_lock(&cli->cl_loi_list_lock);
+       osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
+       RETURN(0);
+}
+
 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
 {
         struct client_obd *cli = &obd->u.cli;
@@ -4682,11 +3708,9 @@ int __init osc_init(void)
 #ifdef __KERNEL__
 static void /*__exit*/ osc_exit(void)
 {
-        lu_device_type_fini(&osc_device_type);
-
-        osc_quota_exit();
-        class_unregister_type(LUSTRE_OSC_NAME);
-        lu_kmem_fini(osc_caches);
+       osc_quota_exit();
+       class_unregister_type(LUSTRE_OSC_NAME);
+       lu_kmem_fini(osc_caches);
 }
 
 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");