LU-17705 ptlrpc: replace synchronize_rcu() with rcu_barrier()
diff --git a/lustre/osc/osc_cache.c b/lustre/osc/osc_cache.c
index 364a71f..ec71de5 100644
@@ -28,7 +28,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * osc cache management.
  *
@@ -48,10 +47,11 @@ static void osc_update_pending(struct osc_object *obj, int cmd, int delta);
 static int osc_extent_wait(const struct lu_env *env, struct osc_extent *ext,
                           enum osc_extent_state state);
 static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
+                             struct osc_object *osc,
                              struct osc_async_page *oap, int sent, int rc);
 static int osc_make_ready(const struct lu_env *env, struct osc_async_page *oap,
                          int cmd);
-static int osc_refresh_count(const struct lu_env *env,
+static int osc_refresh_count(const struct lu_env *env, struct osc_object *osc,
                             struct osc_async_page *oap, int cmd);
 static int osc_io_unplug_async(const struct lu_env *env,
                               struct client_obd *cli, struct osc_object *osc);
@@ -97,7 +97,7 @@ static inline char *ext_flags(struct osc_extent *ext, char *flags)
 
 #define EXTSTR       "[%lu -> %lu/%lu]"
 #define EXTPARA(ext) (ext)->oe_start, (ext)->oe_end, (ext)->oe_max_end
-static const char *oes_strings[] = {
+static const char *const oes_strings[] = {
        "inv", "active", "cache", "locking", "lockdone", "rpc", "trunc", NULL };
 
 #define OSC_EXTENT_DUMP_WITH_LOC(file, func, line, mask, extent, fmt, ...) do {\
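The added qualifier above is the classic two-level const: "const char *" only protects the strings themselves, while "const char *const" also makes the pointer slots unwritable, so the whole table can be placed in read-only data. A standalone illustration, not Lustre code:

static const char *mut[]      = { "inv", "active" }; /* slots writable */
static const char *const ro[] = { "inv", "active" }; /* fully read-only */

void demo(void)
{
	mut[0] = "other";      /* compiles: only the strings are const */
	/* ro[0] = "other"; */ /* rejected: the slot itself is const */
}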
@@ -212,7 +212,7 @@ static int osc_extent_sanity_check0(struct osc_extent *ext,
                        GOTO(out, rc = 60);
                if (ext->oe_fsync_wait && !ext->oe_urgent && !ext->oe_hp)
                        GOTO(out, rc = 65);
-               /* fallthrough */
+               fallthrough;
        default:
                if (atomic_read(&ext->oe_users) > 0)
                        GOTO(out, rc = 70);
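Replacing the "/* fallthrough */" comment with the fallthrough pseudo-keyword lets -Wimplicit-fallthrough verify the intent at the compiler level instead of by comment matching. The kernel defines it roughly as follows (see include/linux/compiler_attributes.h):

#if __has_attribute(__fallthrough__)
# define fallthrough	__attribute__((__fallthrough__))
#else
# define fallthrough	do {} while (0)	/* fallthrough */
#endif

so the OES_TRUNC checks above now fall into the default: label with an annotation the compiler can verify.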
@@ -230,8 +230,8 @@ static int osc_extent_sanity_check0(struct osc_extent *ext,
                struct ldlm_extent *extent;
 
                extent = &ext->oe_dlmlock->l_policy_data.l_extent;
-               if (!(extent->start <= cl_offset(osc2cl(obj), ext->oe_start) &&
-                     extent->end   >= cl_offset(osc2cl(obj), ext->oe_max_end)))
+               if (!(extent->start <= ext->oe_start << PAGE_SHIFT &&
+                     extent->end >= ext->oe_max_end << PAGE_SHIFT))
                        GOTO(out, rc = 100);
 
                if (!(ext->oe_dlmlock->l_granted_mode & (LCK_PW | LCK_GROUP)))
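This hunk is one of several in this patch that open-code the old cl_offset()/cl_index() helpers, which on the client reduce to PAGE_SHIFT shifts. A sketch of the assumed equivalences; note that the helper used to supply the loff_t cast, which matters wherever pgoff_t is narrower than the byte offset:

/* what cl_offset(obj, idx) computed: first byte of page idx */
static inline loff_t page_to_bytes(pgoff_t idx)
{
	return (loff_t)idx << PAGE_SHIFT;
}

/* what cl_index(obj, off) computed: page containing byte off */
static inline pgoff_t bytes_to_page(loff_t off)
{
	return off >> PAGE_SHIFT;
}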
@@ -354,7 +354,7 @@ static void osc_extent_free(struct kref *kref)
        if (ext->oe_dlmlock) {
                lu_ref_del(&ext->oe_dlmlock->l_reference,
                           "osc_extent", ext);
-               LDLM_LOCK_PUT(ext->oe_dlmlock);
+               LDLM_LOCK_RELEASE(ext->oe_dlmlock);
                ext->oe_dlmlock = NULL;
        }
 #if 0
@@ -531,7 +531,8 @@ static int osc_extent_merge(const struct lu_env *env, struct osc_extent *cur,
        if (victim == NULL)
                return -EINVAL;
 
-       if (victim->oe_state != OES_CACHE || victim->oe_fsync_wait)
+       if (victim->oe_state != OES_INV &&
+           (victim->oe_state != OES_CACHE || victim->oe_fsync_wait))
                return -EBUSY;
 
        if (cur->oe_max_end != victim->oe_max_end)
@@ -583,11 +584,10 @@ static int osc_extent_merge(const struct lu_env *env, struct osc_extent *cur,
 /**
  * Drop user count of osc_extent, and unplug IO asynchronously.
  */
-int osc_extent_release(const struct lu_env *env, struct osc_extent *ext)
+void osc_extent_release(const struct lu_env *env, struct osc_extent *ext)
 {
        struct osc_object *obj = ext->oe_obj;
        struct client_obd *cli = osc_cli(obj);
-       int rc = 0;
        ENTRY;
 
        LASSERT(atomic_read(&ext->oe_users) > 0);
@@ -634,7 +634,8 @@ int osc_extent_release(const struct lu_env *env, struct osc_extent *ext)
                osc_io_unplug_async(env, cli, obj);
        }
        osc_extent_put(env, ext);
-       RETURN(rc);
+
+       RETURN_EXIT;
 }
 
 /**
@@ -768,7 +769,6 @@ restart:
                if (osc_extent_merge(env, ext, cur) == 0) {
                        LASSERT(*grants >= chunksize);
                        *grants -= chunksize;
-                       found = osc_extent_hold(ext);
 
                        /*
                         * Try to merge with the next one too because we
@@ -778,6 +778,7 @@ restart:
                                /* we can save extent tax from next extent */
                                *grants += cli->cl_grant_extent_tax;
 
+                       found = osc_extent_hold(ext);
                        break;
                }
        }
@@ -831,6 +832,7 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
                      int sent, int rc)
 {
        struct client_obd *cli = osc_cli(ext->oe_obj);
+       struct osc_object *osc = ext->oe_obj;
        struct osc_async_page *oap;
        struct osc_async_page *tmp;
        int nr_pages = ext->oe_nr_pages;
@@ -845,7 +847,10 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
        ext->oe_rc = rc ?: ext->oe_nr_pages;
        EASSERT(ergo(rc == 0, ext->oe_state == OES_RPC), ext);
 
-       osc_lru_add_batch(cli, &ext->oe_pages);
+       /* dio pages do not go in the LRU */
+       if (!ext->oe_dio)
+               osc_lru_add_batch(cli, &ext->oe_pages);
+
        list_for_each_entry_safe(oap, tmp, &ext->oe_pages,
                                     oap_pending_item) {
                list_del_init(&oap->oap_rpc_item);
@@ -856,17 +861,20 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
                }
 
                --ext->oe_nr_pages;
-               osc_ap_completion(env, cli, oap, sent, rc);
+               osc_ap_completion(env, cli, osc, oap, sent, rc);
        }
        EASSERT(ext->oe_nr_pages == 0, ext);
 
        if (!sent) {
                lost_grant = ext->oe_grants;
-       } else if (blocksize < PAGE_SIZE &&
+       } else if (cli->cl_ocd_grant_param == 0 &&
+                  blocksize < PAGE_SIZE &&
                   last_count != PAGE_SIZE) {
-               /* For short writes we shouldn't count parts of pages that
-                * span a whole chunk on the OST side, or our accounting goes
-                * wrong.  Should match the code in filter_grant_check. */
+               /* For short writes without OBD_CONNECT_GRANT support, we
+                * shouldn't count parts of pages that span a whole chunk on
+                * the OST side, or our accounting goes wrong. Should match
+                * the code in tgt_grant_check.
+                */
                int offset = last_off & ~PAGE_MASK;
                int count = last_count + (offset & (blocksize - 1));
                int end = (offset + last_count) & (blocksize - 1);
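To make the rounding concrete: with a 4096-byte page, a 512-byte OST block, and a 100-byte write at a page-aligned offset, the block-aligned count becomes 512, so the client reports PAGE_SIZE - count = 3584 bytes of grant as lost (the lost_grant assignment follows a few lines below this hunk). A standalone check of the arithmetic, with illustrative values:

#include <stdio.h>

int main(void)
{
	int page_size = 4096, blocksize = 512;
	unsigned long last_off = 8192;	/* short write starts page-aligned */
	int last_count = 100;		/* bytes actually written */
	int offset = last_off & (page_size - 1);
	int count = last_count + (offset & (blocksize - 1));
	int end = (offset + last_count) & (blocksize - 1);

	if (end)
		count += blocksize - end;	/* round up to an OST block */
	printf("count=%d lost_grant=%d\n", count, page_size - count);
	/* prints: count=512 lost_grant=3584 */
	return 0;
}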
@@ -943,7 +951,7 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
        struct client_obd     *cli = osc_cli(obj);
        struct osc_async_page *oap;
        struct osc_async_page *tmp;
-       struct pagevec        *pvec;
+       struct folio_batch    *fbatch;
        int                    pages_in_chunk = 0;
        int                    ppc_bits    = cli->cl_chunkbits -
                                             PAGE_SHIFT;
@@ -968,8 +976,8 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
        io  = osc_env_thread_io(env);
        io->ci_obj = cl_object_top(osc2cl(obj));
        io->ci_ignore_layout = 1;
-       pvec = &osc_env_info(env)->oti_pagevec;
-       ll_pagevec_init(pvec, 0);
+       fbatch = &osc_env_info(env)->oti_fbatch;
+       ll_folio_batch_init(fbatch, 0);
        rc = cl_io_init(env, io, CIT_MISC, io->ci_obj);
        if (rc < 0)
                GOTO(out, rc);
@@ -1007,12 +1015,12 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
                }
 
                lu_ref_del(&page->cp_reference, "truncate", current);
-               cl_pagevec_put(env, page, pvec);
+               cl_batch_put(env, page, fbatch);
 
                --ext->oe_nr_pages;
                ++nr_pages;
        }
-       pagevec_release(pvec);
+       folio_batch_release(fbatch);
 
        EASSERTF(ergo(ext->oe_start >= trunc_index + !!partial,
                      ext->oe_nr_pages == 0),
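The pagevec-to-folio_batch conversion in this and the following hunks tracks the kernel's replacement of struct pagevec with struct folio_batch; the usage pattern is unchanged: initialize once, let cl_batch_put() accumulate pages, release in bulk. ll_folio_batch_init() is presumably a compatibility wrapper around folio_batch_init() for older kernels. The generic kernel pattern, for reference:

#include <linux/pagevec.h>

/* Put references on many folios with one batched release whenever
 * the batch fills, instead of one put at a time. */
static void put_folios(struct folio **folios, int n)
{
	struct folio_batch fbatch;
	int i;

	folio_batch_init(&fbatch);
	for (i = 0; i < n; i++)
		if (!folio_batch_add(&fbatch, folios[i]))	/* 0 = full */
			folio_batch_release(&fbatch);
	folio_batch_release(&fbatch);	/* flush the remainder */
}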
@@ -1092,9 +1100,7 @@ static int osc_extent_make_ready(const struct lu_env *env,
                rc = osc_make_ready(env, oap, OBD_BRW_WRITE);
                switch (rc) {
                case 0:
-                       spin_lock(&oap->oap_lock);
                        oap->oap_async_flags |= ASYNC_READY;
-                       spin_unlock(&oap->oap_lock);
                        break;
                case -EALREADY:
                        LASSERT((oap->oap_async_flags & ASYNC_READY) != 0);
@@ -1109,13 +1115,13 @@ static int osc_extent_make_ready(const struct lu_env *env,
        /* the last page is the only one we need to refresh its count by
         * the size of file. */
        if (!(last->oap_async_flags & ASYNC_COUNT_STABLE)) {
-               int last_oap_count = osc_refresh_count(env, last, OBD_BRW_WRITE);
-               LASSERT(last_oap_count > 0);
+               int last_oap_count = osc_refresh_count(env, obj, last,
+                                                      OBD_BRW_WRITE);
+               LASSERTF(last_oap_count > 0,
+                        "last_oap_count %d\n", last_oap_count);
                LASSERT(last->oap_page_off + last_oap_count <= PAGE_SIZE);
                last->oap_count = last_oap_count;
-               spin_lock(&last->oap_lock);
                last->oap_async_flags |= ASYNC_COUNT_STABLE;
-               spin_unlock(&last->oap_lock);
        }
 
 	/* for the rest of pages, we don't need to call osc_refresh_count()
@@ -1123,9 +1129,7 @@ static int osc_extent_make_ready(const struct lu_env *env,
        list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
                        oap->oap_count = PAGE_SIZE - oap->oap_page_off;
-                       spin_lock(&oap->oap_lock);
                        oap->oap_async_flags |= ASYNC_COUNT_STABLE;
-                       spin_unlock(&oap->oap_lock);
                }
        }
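This hunk and the two above it drop the per-page oap_lock around oap_async_flags updates. That appears to rely on all writers of the flags already being serialized by the extent state machine (the extent is held by the flushing thread while it is made ready), so the plain read-modify-write cannot race. In outline:

/* before: per-update spinlock */
spin_lock(&oap->oap_lock);
oap->oap_async_flags |= ASYNC_READY;
spin_unlock(&oap->oap_lock);

/* after: plain RMW, valid only because this thread has exclusive
 * ownership of the pages while the extent is being made ready */
oap->oap_async_flags |= ASYNC_READY;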
 
@@ -1249,25 +1253,24 @@ static inline int osc_is_ready(struct osc_object *osc)
 static int osc_make_ready(const struct lu_env *env, struct osc_async_page *oap,
                          int cmd)
 {
-       struct osc_page *opg  = oap2osc_page(oap);
        struct cl_page  *page = oap2cl_page(oap);
        int result;
 
        LASSERT(cmd == OBD_BRW_WRITE); /* no cached reads */
 
        ENTRY;
+
        result = cl_page_make_ready(env, page, CRT_WRITE);
-       if (result == 0)
-               opg->ops_submit_time = ktime_get();
+
        RETURN(result);
 }
 
-static int osc_refresh_count(const struct lu_env *env,
+static int osc_refresh_count(const struct lu_env *env, struct osc_object *osc,
                             struct osc_async_page *oap, int cmd)
 {
        struct osc_page  *opg = oap2osc_page(oap);
        pgoff_t index = osc_index(oap2osc(oap));
-       struct cl_object *obj;
+       struct cl_object *obj = osc2cl(osc);
        struct cl_attr   *attr = &osc_env_info(env)->oti_attr;
        int result;
        loff_t kms;
@@ -1275,7 +1278,6 @@ static int osc_refresh_count(const struct lu_env *env,
        /* readpage queues with _COUNT_STABLE, shouldn't get here. */
        LASSERT(!(cmd & OBD_BRW_READ));
        LASSERT(opg != NULL);
-       obj = opg->ops_cl.cpl_obj;
 
        cl_object_attr_lock(obj);
        result = cl_object_attr_get(env, obj, attr);
@@ -1283,18 +1285,18 @@ static int osc_refresh_count(const struct lu_env *env,
        if (result < 0)
                return result;
        kms = attr->cat_kms;
-       if (cl_offset(obj, index) >= kms)
+       if (index << PAGE_SHIFT >= kms)
                /* catch race with truncate */
                return 0;
-       else if (cl_offset(obj, index + 1) > kms)
+       else if ((index + 1) << PAGE_SHIFT > kms)
                /* catch sub-page write at end of file */
                return kms & ~PAGE_MASK;
        else
                return PAGE_SIZE;
 }
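The three returns clip the last page of a write against the known minimum size (kms). With 4 KiB pages and kms = 10000: a page fully past kms returns 0 (the write raced with truncate), the page straddling kms returns the sub-page remainder 10000 & ~PAGE_MASK = 1808, and earlier pages return a full PAGE_SIZE. A standalone check with illustrative values:

#include <stdio.h>

#define SHIFT 12			/* 4 KiB pages */
#define SIZE  (1 << SHIFT)

static int refresh_count(unsigned long index, long long kms)
{
	if ((long long)index << SHIFT >= kms)
		return 0;			/* race with truncate */
	else if ((long long)(index + 1) << SHIFT > kms)
		return kms & (SIZE - 1);	/* sub-page write at EOF */
	else
		return SIZE;			/* whole page below kms */
}

int main(void)
{
	printf("%d %d %d\n", refresh_count(3, 10000),
	       refresh_count(2, 10000), refresh_count(1, 10000));
	/* prints: 0 1808 4096 */
	return 0;
}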
 
-static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
-                         int cmd, int rc)
+static int osc_completion(const struct lu_env *env, struct osc_object *osc,
+                         struct osc_async_page *oap, int cmd, int rc)
 {
        struct osc_page   *opg  = oap2osc_page(oap);
        struct cl_page    *page = oap2cl_page(oap);
@@ -1304,23 +1306,24 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
        ENTRY;
 
        cmd &= ~OBD_BRW_NOQUOTA;
-       LASSERTF(equi(page->cp_state == CPS_PAGEIN,  cmd == OBD_BRW_READ),
-                "cp_state:%u, cmd:%d\n", page->cp_state, cmd);
-       LASSERTF(equi(page->cp_state == CPS_PAGEOUT, cmd == OBD_BRW_WRITE),
-               "cp_state:%u, cmd:%d\n", page->cp_state, cmd);
-       LASSERT(opg->ops_transfer_pinned);
+       if (page->cp_type != CPT_TRANSIENT) {
+               LASSERTF(equi(page->cp_state == CPS_PAGEIN,  cmd == OBD_BRW_READ),
+                        "cp_state:%u, cmd:%d\n", page->cp_state, cmd);
+               LASSERTF(equi(page->cp_state == CPS_PAGEOUT, cmd == OBD_BRW_WRITE),
+                       "cp_state:%u, cmd:%d\n", page->cp_state, cmd);
+               LASSERT(opg->ops_transfer_pinned);
+       }
 
        crt = cmd == OBD_BRW_READ ? CRT_READ : CRT_WRITE;
        /* Clear opg->ops_transfer_pinned before VM lock is released. */
        opg->ops_transfer_pinned = 0;
 
-       opg->ops_submit_time = ktime_set(0, 0);
        srvlock = oap->oap_brw_flags & OBD_BRW_SRVLOCK;
 
        /* statistic */
        if (rc == 0 && srvlock) {
-               struct lu_device *ld    = opg->ops_cl.cpl_obj->co_lu.lo_dev;
-               struct osc_stats *stats = &lu2osc_dev(ld)->od_stats;
+               struct lu_device *ld = osc->oo_cl.co_lu.lo_dev;
+               struct osc_stats *stats = &lu2osc_dev(ld)->osc_stats;
                size_t bytes = oap->oap_count;
 
                if (crt == CRT_READ)
@@ -1364,12 +1367,11 @@ static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
 {
        assert_spin_locked(&cli->cl_loi_list_lock);
-       LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
+       LASSERT(!(pga->bp_flag & OBD_BRW_FROM_GRANT));
        cli->cl_dirty_pages++;
-       pga->flag |= OBD_BRW_FROM_GRANT;
+       pga->bp_flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
-              PAGE_SIZE, pga, pga->pg);
-       osc_update_next_shrink(cli);
+              PAGE_SIZE, pga, pga->bp_page);
 }
 
 /* the companion to osc_consume_write_grant, called when a brw has completed.
@@ -1380,12 +1382,12 @@ static void osc_release_write_grant(struct client_obd *cli,
        ENTRY;
 
        assert_spin_locked(&cli->cl_loi_list_lock);
-       if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
+       if (!(pga->bp_flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }
 
-       pga->flag &= ~OBD_BRW_FROM_GRANT;
+       pga->bp_flag &= ~OBD_BRW_FROM_GRANT;
        atomic_long_dec(&obd_dirty_pages);
        cli->cl_dirty_pages--;
        EXIT;
@@ -1451,11 +1453,11 @@ static void osc_unreserve_grant(struct client_obd *cli,
  * used, we should return these grants to OST. There're two cases where grants
  * can be lost:
  * 1. truncate;
- * 2. blocksize at OST is less than PAGE_SIZE and a partial page was
- *    written. In this case OST may use less chunks to serve this partial
- *    write. OSTs don't actually know the page size on the client side. so
- *    clients have to calculate lost grant by the blocksize on the OST.
- *    See filter_grant_check() for details.
+ * 2. Without OBD_CONNECT_GRANT support, when the blocksize on the OST is
+ *    less than PAGE_SIZE and a partial page was written. In this case the
+ *    OST may use fewer chunks to serve the partial write. OSTs don't
+ *    actually know the page size on the client side, so clients have to
+ *    calculate the lost grant by the blocksize on the OST. See
+ *    tgt_grant_check() for details.
  */
 static void osc_free_grant(struct client_obd *cli, unsigned int nr_pages,
                           unsigned int lost_grant, unsigned int dirty_grant)
@@ -1553,13 +1555,14 @@ static inline void cli_lock_after_unplug(struct client_obd *cli)
  * The process will be put into sleep if it's already run out of grant.
  */
 static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
-                          struct osc_async_page *oap, int bytes)
+                          struct osc_object *osc, struct osc_async_page *oap,
+                          int bytes)
 {
-       struct osc_object *osc = oap->oap_obj;
        struct lov_oinfo *loi = osc->oo_oinfo;
        int rc = -EDQUOT;
        int remain;
        bool entered = false;
+       struct obd_device *obd = cli->cl_import->imp_obd;
        /* We cannot wait for a long time here since we are holding ldlm lock
         * across the actual IO. If no requests complete fast (e.g. due to
         * overloaded OST that takes a long time to process everything, we'd
@@ -1568,8 +1571,10 @@ static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
         * evicted by server which is half obd_timeout when AT is off
         * or at least ldlm_enqueue_min with AT on.
         * See LU-13131 */
-       unsigned long timeout = cfs_time_seconds(AT_OFF ? obd_timeout / 2 :
-                                                         ldlm_enqueue_min / 2);
+       unsigned long timeout =
+               cfs_time_seconds(obd_at_off(obd) ?
+                                obd_timeout / 2 :
+                                obd_get_ldlm_enqueue_min(obd) / 2);
 
        ENTRY;
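The comment above motivates the cap; numerically, both branches bound the wait to half the relevant interval. A trivial sketch with illustrative defaults (obd_at_off() and obd_get_ldlm_enqueue_min() fetch the per-device values that used to be the AT_OFF and ldlm_enqueue_min globals):

/* illustrative defaults: either branch bounds the wait to 50 s */
unsigned int obd_timeout = 100, ldlm_enqueue_min = 100;
bool at_off = false;
unsigned long timeout_s = at_off ? obd_timeout / 2
				 : ldlm_enqueue_min / 2;	/* 50 */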
 
@@ -1579,7 +1584,7 @@ static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
 
        /* force the caller to try sync io.  this can jump the list
         * of queued writes and create a discontiguous rpc stream */
-       if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
+       if (CFS_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
            cli->cl_dirty_max_pages == 0 ||
            cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync) {
                OSC_DUMP_GRANT(D_CACHE, cli, "forced sync i/o\n");
@@ -1755,57 +1760,19 @@ static int osc_list_maint(struct client_obd *cli, struct osc_object *osc)
        return is_ready;
 }
 
-/* this is trying to propogate async writeback errors back up to the
- * application.  As an async write fails we record the error code for later if
- * the app does an fsync.  As long as errors persist we force future rpcs to be
- * sync so that the app can get a sync error and break the cycle of queueing
- * pages for which writeback will fail. */
-static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
-                          int rc)
-{
-       if (rc) {
-               if (!ar->ar_rc)
-                       ar->ar_rc = rc;
-
-               ar->ar_force_sync = 1;
-               ar->ar_min_xid = ptlrpc_sample_next_xid();
-               return;
-
-       }
-
-       if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
-               ar->ar_force_sync = 0;
-}
-
 /* this must be called holding the loi list lock to give coverage to exit_cache,
- * async_flag maintenance, and oap_request */
+ * async_flag maintenance
+ */
 static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
+                             struct osc_object *osc,
                              struct osc_async_page *oap, int sent, int rc)
 {
-       struct osc_object *osc = oap->oap_obj;
-       struct lov_oinfo  *loi = osc->oo_oinfo;
-       __u64 xid = 0;
-
        ENTRY;
-       if (oap->oap_request != NULL) {
-               xid = ptlrpc_req_xid(oap->oap_request);
-               ptlrpc_req_finished(oap->oap_request);
-               oap->oap_request = NULL;
-       }
 
        /* As the transfer for this page is being done, clear the flags */
-       spin_lock(&oap->oap_lock);
        oap->oap_async_flags = 0;
-       spin_unlock(&oap->oap_lock);
 
-       if (oap->oap_cmd & OBD_BRW_WRITE && xid > 0) {
-               spin_lock(&cli->cl_loi_list_lock);
-               osc_process_ar(&cli->cl_ar, xid, rc);
-               osc_process_ar(&loi->loi_ar, xid, rc);
-               spin_unlock(&cli->cl_loi_list_lock);
-       }
-
-       rc = osc_completion(env, oap, oap->oap_cmd, rc);
+       rc = osc_completion(env, osc, oap, oap->oap_cmd, rc);
        if (rc)
                CERROR("completion on oap %p obj %p returns %d.\n",
                       oap, osc, rc);
@@ -1937,9 +1904,9 @@ static unsigned int get_write_extents(struct osc_object *obj,
        };
 
        assert_osc_object_is_locked(obj);
-       while (!list_empty(&obj->oo_hp_exts)) {
-               ext = list_entry(obj->oo_hp_exts.next, struct osc_extent,
-                                oe_link);
+       while ((ext = list_first_entry_or_null(&obj->oo_hp_exts,
+                                              struct osc_extent,
+                                              oe_link)) != NULL) {
                if (!try_to_add_extent_for_io(cli, ext, &data))
                        return data.erd_page_count;
                EASSERT(ext->oe_nr_pages <= data.erd_max_pages, ext);
@@ -1947,9 +1914,9 @@ static unsigned int get_write_extents(struct osc_object *obj,
        if (data.erd_page_count == data.erd_max_pages)
                return data.erd_page_count;
 
-       while (!list_empty(&obj->oo_urgent_exts)) {
-               ext = list_entry(obj->oo_urgent_exts.next,
-                                struct osc_extent, oe_link);
+       while ((ext = list_first_entry_or_null(&obj->oo_urgent_exts,
+                                              struct osc_extent,
+                                              oe_link)) != NULL) {
                if (!try_to_add_extent_for_io(cli, ext, &data))
                        return data.erd_page_count;
        }
@@ -1960,10 +1927,11 @@ static unsigned int get_write_extents(struct osc_object *obj,
         * extents can usually only be added if the rpclist was empty, so if we
         * can't add one, we continue on to trying to add normal extents.  This
         * is so we don't miss adding extra extents to an RPC containing high
-        * priority or urgent extents. */
-       while (!list_empty(&obj->oo_full_exts)) {
-               ext = list_entry(obj->oo_full_exts.next,
-                                struct osc_extent, oe_link);
+        * priority or urgent extents.
+        */
+       while ((ext = list_first_entry_or_null(&obj->oo_full_exts,
+                                              struct osc_extent,
+                                              oe_link)) != NULL) {
                if (!try_to_add_extent_for_io(cli, ext, &data))
                        break;
        }
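This and the two preceding loops switch from the list_empty()/list_entry() pair to list_first_entry_or_null(), folding the emptiness test and the head dereference into one expression. The idiom in generic terms, using the standard <linux/list.h> macro (process() is a hypothetical stand-in for try_to_add_extent_for_io()):

#include <linux/list.h>

struct item { struct list_head link; };
static bool process(struct item *it);	/* stand-in, not in the patch */

static void drain(struct list_head *head)
{
	struct item *it;

	while ((it = list_first_entry_or_null(head, struct item,
					      link)) != NULL) {
		if (!process(it))
			break;		/* entry stays queued */
		list_del_init(&it->link);
	}
}

Note the loop body must either dequeue the entry or break, otherwise the loop spins forever on the same head element.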
@@ -2213,6 +2181,7 @@ __must_hold(&cli->cl_loi_list_lock)
 
                spin_lock(&cli->cl_loi_list_lock);
        }
+       EXIT;
 }
 
 int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli,
@@ -2237,42 +2206,45 @@ int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli,
 EXPORT_SYMBOL(osc_io_unplug0);
 
 int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
-                       struct page *page, loff_t offset)
+                       struct cl_page *page, loff_t offset)
 {
-       struct obd_export     *exp = osc_export(osc);
        struct osc_async_page *oap = &ops->ops_oap;
-       ENTRY;
 
+       ENTRY;
        if (!page)
-               return cfs_size_round(sizeof(*oap));
+               return round_up(sizeof(*oap), 8);
 
-       oap->oap_magic = OAP_MAGIC;
-       oap->oap_cli = &exp->exp_obd->u.cli;
        oap->oap_obj = osc;
-
-       oap->oap_page = page;
+       oap->oap_page = page->cp_vmpage;
        oap->oap_obj_off = offset;
        LASSERT(!(offset & ~PAGE_MASK));
 
+       /* Count of transient (direct i/o) pages is always stable by the time
+        * they're submitted.  Setting this here lets us avoid calling
+        * cl_page_clip later to set this.
+        */
+       if (page->cp_type == CPT_TRANSIENT)
+               oap->oap_async_flags |= ASYNC_COUNT_STABLE|ASYNC_URGENT|
+                                       ASYNC_READY;
+
        INIT_LIST_HEAD(&oap->oap_pending_item);
        INIT_LIST_HEAD(&oap->oap_rpc_item);
 
-       spin_lock_init(&oap->oap_lock);
-       CDEBUG(D_INFO, "oap %p page %p obj off %llu\n",
-              oap, page, oap->oap_obj_off);
+       CDEBUG(D_INFO, "oap %p vmpage %p obj off %llu\n",
+              oap, oap->oap_page, oap->oap_obj_off);
        RETURN(0);
 }
 EXPORT_SYMBOL(osc_prep_async_page);
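In osc_prep_async_page() above, cfs_size_round() was the libcfs helper for 8-byte rounding; round_up(x, y) is the kernel equivalent for any power-of-two y, currently defined in <linux/math.h> via the ((x - 1) | (y - 1)) + 1 trick. The !page probe therefore still reports the per-page state size rounded up to 8 bytes, e.g.:

	round_up(41, 8) == 48
	round_up(48, 8) == 48	/* already aligned: unchanged */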
 
 int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
-                      struct osc_page *ops, cl_commit_cbt cb)
+                      struct osc_object *osc, struct osc_page *ops,
+                      cl_commit_cbt cb)
 {
        struct osc_io *oio = osc_env_io(env);
        struct osc_extent     *ext = NULL;
        struct osc_async_page *oap = &ops->ops_oap;
-       struct client_obd     *cli = oap->oap_cli;
-       struct osc_object     *osc = oap->oap_obj;
-       struct pagevec        *pvec = &osc_env_info(env)->oti_pagevec;
+       struct client_obd     *cli = osc_cli(osc);
+       struct folio_batch    *fbatch = &osc_env_info(env)->oti_fbatch;
        pgoff_t index;
        unsigned int tmp;
        unsigned int grants = 0;
@@ -2282,9 +2254,6 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
        int    rc = 0;
        ENTRY;
 
-       if (oap->oap_magic != OAP_MAGIC)
-               RETURN(-EINVAL);
-
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);
 
@@ -2294,13 +2263,23 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
 
        /* Set the OBD_BRW_SRVLOCK before the page is queued. */
        brw_flags |= ops->ops_srvlock ? OBD_BRW_SRVLOCK : 0;
-       if (oio->oi_cap_sys_resource || io->ci_noquota) {
+       if (io->ci_noquota) {
                brw_flags |= OBD_BRW_NOQUOTA;
                cmd |= OBD_BRW_NOQUOTA;
        }
 
+       if (oio->oi_cap_sys_resource) {
+               brw_flags |= OBD_BRW_SYS_RESOURCE;
+               cmd |= OBD_BRW_SYS_RESOURCE;
+       }
+
        /* check if the file's owner/group is over quota */
-       if (!(cmd & OBD_BRW_NOQUOTA)) {
+       /* do not check for root without root squash, because in this case
+        * we should bypass quota
+        */
+       if ((!oio->oi_cap_sys_resource ||
+            cli->cl_root_squash || cli->cl_root_prjquota) &&
+           !io->ci_noquota) {
                struct cl_object *obj;
                struct cl_attr   *attr;
                unsigned int qid[LL_MAXQUOTAS];
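The rewritten guard makes the root-squash interaction explicit: quota is skipped for NOQUOTA I/O, always checked for unprivileged writers, and checked for a CAP_SYS_RESOURCE writer only when root squash or root project quota would bind root anyway. A hypothetical predicate with the same truth table (the helper name is illustrative, not in the patch):

static bool quota_check_needed(bool cap_sys_resource, bool root_squash,
			       bool root_prjquota, bool noquota)
{
	if (noquota)		/* OBD_BRW_NOQUOTA i/o */
		return false;
	if (!cap_sys_resource)	/* ordinary user: always check */
		return true;
	/* root: check only when squash/project quota binds root */
	return root_squash || root_prjquota;
}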
@@ -2390,16 +2369,18 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
                LASSERT(ergo(grants > 0, grants >= tmp));
 
                rc = 0;
+
+               /* We must not hold a page lock while we do osc_enter_cache()
+                * or osc_extent_find(), so we must mark dirty & unlock
+                * any pages in the write commit folio_batch.
+                */
+               if (folio_batch_count(fbatch)) {
+                       cb(env, io, fbatch);
+                       folio_batch_reinit(fbatch);
+               }
+
                if (grants == 0) {
-                       /* We haven't allocated grant for this page, and we
-                        * must not hold a page lock while we do enter_cache,
-                        * so we must mark dirty & unlock any pages in the
-                        * write commit pagevec. */
-                       if (pagevec_count(pvec)) {
-                               cb(env, io, pvec);
-                               pagevec_reinit(pvec);
-                       }
-                       rc = osc_enter_cache(env, cli, oap, tmp);
+                       rc = osc_enter_cache(env, cli, osc, oap, tmp);
                        if (rc == 0)
                                grants = tmp;
                }
@@ -2449,8 +2430,6 @@ int osc_teardown_async_page(const struct lu_env *env,
        int rc = 0;
        ENTRY;
 
-       LASSERT(oap->oap_magic == OAP_MAGIC);
-
        CDEBUG(D_INFO, "teardown oap %p page %p at index %lu.\n",
               oap, ops, osc_index(oap2osc(oap)));
 
@@ -2488,7 +2467,7 @@ int osc_flush_async_page(const struct lu_env *env, struct cl_io *io,
                         struct osc_page *ops)
 {
        struct osc_extent *ext   = NULL;
-       struct osc_object *obj   = cl2osc(ops->ops_cl.cpl_obj);
+       struct osc_object *obj   = osc_page_object(ops);
        struct cl_page    *cp    = ops->ops_cl.cpl_page;
        pgoff_t            index = osc_index(ops);
        struct osc_async_page *oap = &ops->ops_oap;
@@ -2533,9 +2512,7 @@ int osc_flush_async_page(const struct lu_env *env, struct cl_io *io,
        if (rc)
                GOTO(out, rc);
 
-       spin_lock(&oap->oap_lock);
        oap->oap_async_flags |= ASYNC_READY|ASYNC_URGENT;
-       spin_unlock(&oap->oap_lock);
 
        if (current->flags & PF_MEMALLOC)
                ext->oe_memalloc = 1;
@@ -2559,10 +2536,11 @@ out:
        return rc;
 }
 
-int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io,
+int osc_queue_sync_pages(const struct lu_env *env, struct cl_io *io,
                         struct osc_object *obj, struct list_head *list,
                         int brw_flags)
 {
+       struct osc_io *oio = osc_env_io(env);
        struct client_obd     *cli = osc_cli(obj);
        struct osc_extent     *ext;
        struct osc_async_page *oap;
@@ -2571,6 +2549,7 @@ int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io,
        bool    can_merge   = true;
        pgoff_t start      = CL_PAGE_EOF;
        pgoff_t end        = 0;
+       struct osc_lock *oscl;
        ENTRY;
 
        list_for_each_entry(oap, list, oap_pending_item) {
@@ -2595,7 +2574,7 @@ int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io,
 
                list_for_each_entry_safe(oap, tmp, list, oap_pending_item) {
                        list_del_init(&oap->oap_pending_item);
-                       osc_ap_completion(env, cli, oap, 0, -ENOMEM);
+                       osc_ap_completion(env, cli, obj, oap, 0, -ENOMEM);
                }
                RETURN(-ENOMEM);
        }
@@ -2610,6 +2589,11 @@ int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io,
        ext->oe_srvlock = !!(brw_flags & OBD_BRW_SRVLOCK);
        ext->oe_ndelay = !!(brw_flags & OBD_BRW_NDELAY);
        ext->oe_dio = !!(brw_flags & OBD_BRW_NOCACHE);
+       oscl = oio->oi_write_osclock ? : oio->oi_read_osclock;
+       if (oscl && oscl->ols_dlmlock != NULL) {
+               ext->oe_dlmlock = LDLM_LOCK_GET(oscl->ols_dlmlock);
+               lu_ref_add(&ext->oe_dlmlock->l_reference, "osc_extent", ext);
+       }
        if (ext->oe_dio && !ext->oe_rw) { /* direct io write */
                int grants;
                int ppc;
@@ -2619,17 +2603,28 @@ int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io,
                grants += (1 << cli->cl_chunkbits) *
                        ((page_count + ppc - 1) / ppc);
 
+               CDEBUG(D_CACHE, "requesting %d bytes grant\n", grants);
                spin_lock(&cli->cl_loi_list_lock);
                if (osc_reserve_grant(cli, grants) == 0) {
                        list_for_each_entry(oap, list, oap_pending_item) {
                                osc_consume_write_grant(cli,
                                                        &oap->oap_brw_page);
-                               atomic_long_inc(&obd_dirty_pages);
                        }
+                       atomic_long_add(page_count, &obd_dirty_pages);
                        osc_unreserve_grant_nolock(cli, grants, 0);
                        ext->oe_grants = grants;
+               } else {
+                       /* We cannot report ENOSPC correctly if we do parallel
+                        * DIO (async RPC submission), so turn off parallel DIO
+                        * if there is insufficient grant available.  This
+                        * makes individual RPCs synchronous.
+                        */
+                       io->ci_parallel_dio = false;
+                       CDEBUG(D_CACHE,
+                       "not enough grant available, switching to sync for this i/o\n");
                }
                spin_unlock(&cli->cl_loi_list_lock);
+               osc_update_next_shrink(cli);
        }
 
        ext->oe_is_rdma_only = !!(brw_flags & OBD_BRW_RDMA_ONLY);
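The reservation above asks for one grant extent tax plus a chunk of grant per chunk spanned (the lines elided by the hunk presumably initialize grants to cli->cl_grant_extent_tax and ppc to pages per chunk). With illustrative numbers, 4 KiB pages, 64 KiB chunks (cl_chunkbits = 16, so ppc = 16) and a 1 MiB write (page_count = 256), the request comes to tax + 1 MiB:

int chunkbits = 16, page_shift = 12;	/* 64 KiB chunks, 4 KiB pages */
int page_count = 256;			/* a 1 MiB direct write */
int tax = 24 * 4096;			/* assumed cl_grant_extent_tax */
int ppc = 1 << (chunkbits - page_shift);	/* 16 pages per chunk */
int grants = tax;

grants += (1 << chunkbits) * ((page_count + ppc - 1) / ppc);
/* grants == tax + 16 * 65536 == tax + 1048576 */

If that reservation fails, the new else branch trades parallel DIO for synchronous RPCs so an out-of-space condition can still be reported to the writer.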
@@ -2684,8 +2679,8 @@ int osc_cache_truncate_start(const struct lu_env *env, struct osc_object *obj,
        ENTRY;
 
        /* pages with index greater or equal to index will be truncated. */
-       index = cl_index(osc2cl(obj), size);
-       partial = size > cl_offset(osc2cl(obj), index);
+       index = size >> PAGE_SHIFT;
+       partial = size > (index << PAGE_SHIFT);
 
 again:
        osc_object_lock(obj);
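The open-coded conversion keeps the helper semantics: index is the first affected page, and partial flags a size landing mid-page. With 4 KiB pages, size = 10000 gives index = 2 and partial = true (10000 > 8192), so page 2 is partially truncated; size = 8192 gives index = 2 and partial = false, so page 2 is discarded whole:

unsigned long long size = 10000;
unsigned long index = size >> 12;			/* 2 */
bool partial = size > ((unsigned long long)index << 12);	/* true */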
@@ -2737,10 +2732,11 @@ again:
 
        osc_list_maint(cli, obj);
 
-       while (!list_empty(&list)) {
+       while ((ext = list_first_entry_or_null(&list,
+                                              struct osc_extent,
+                                              oe_link)) != NULL) {
                int rc;
 
-               ext = list_entry(list.next, struct osc_extent, oe_link);
                list_del_init(&ext->oe_link);
 
                /* extent may be in OES_ACTIVE state because inode mutex
@@ -3028,7 +3024,7 @@ bool osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
                          osc_page_gang_cbt cb, void *cbdata)
 {
        struct osc_page *ops;
-       struct pagevec  *pagevec;
+       struct folio_batch *fbatch;
        void            **pvec;
        pgoff_t         idx;
        unsigned int    nr;
@@ -3040,8 +3036,8 @@ bool osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
 
        idx = start;
        pvec = osc_env_info(env)->oti_pvec;
-       pagevec = &osc_env_info(env)->oti_pagevec;
-       ll_pagevec_init(pagevec, 0);
+       fbatch = &osc_env_info(env)->oti_fbatch;
+       ll_folio_batch_init(fbatch, 0);
        spin_lock(&osc->oo_tree_lock);
        while ((nr = radix_tree_gang_lookup(&osc->oo_tree, pvec,
                                            idx, OTI_PVEC_SIZE)) > 0) {
@@ -3081,22 +3077,33 @@ bool osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
                spin_unlock(&osc->oo_tree_lock);
                tree_lock = false;
 
+               res = (*cb)(env, io, pvec, j, cbdata);
+
                for (i = 0; i < j; ++i) {
                        ops = pvec[i];
-                       if (res)
-                               res = (*cb)(env, io, ops, cbdata);
-
                        page = ops->ops_cl.cpl_page;
                        lu_ref_del(&page->cp_reference, "gang_lookup", current);
-                       cl_pagevec_put(env, page, pagevec);
+                       cl_batch_put(env, page, fbatch);
                }
-               pagevec_release(pagevec);
+               folio_batch_release(fbatch);
 
                if (nr < OTI_PVEC_SIZE || end_of_region)
                        break;
 
                if (!res)
                        break;
+
+               CFS_FAIL_TIMEOUT(OBD_FAIL_OSC_SLOW_PAGE_EVICT,
+                                cfs_fail_val ?: 20);
+
+               if (io->ci_type == CIT_MISC &&
+                   io->u.ci_misc.lm_next_rpc_time &&
+                   ktime_get_seconds() > io->u.ci_misc.lm_next_rpc_time) {
+                       osc_send_empty_rpc(osc, idx << PAGE_SHIFT);
+                       io->u.ci_misc.lm_next_rpc_time = ktime_get_seconds() +
+                                                        5 * obd_timeout / 16;
+               }
+
                if (need_resched())
                        cond_resched();
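Two behavioural changes land here: the gang-lookup callback now runs once per gathered batch instead of once per page (see the reworked callbacks below), and a long discard scan emits an empty RPC every 5 * obd_timeout / 16 seconds (31 s at the default obd_timeout of 100) so the server sees activity and does not evict the client mid-scan. A sketch of the new callback contract, with hypothetical names:

typedef bool (*gang_cbt)(void **pvec, int count, void *cbdata);

/* drive a batched callback over pages gathered OTI_PVEC_SIZE at a
 * time; a false return aborts the whole scan */
static bool scan_batches(void **pvec, int total, int batch,
			 gang_cbt cb, void *cbdata)
{
	int i, n;

	for (i = 0; i < total; i += n) {
		n = total - i < batch ? total - i : batch;
		if (!cb(pvec + i, n, cbdata))
			return false;
	}
	return true;
}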
 
@@ -3113,85 +3120,101 @@ EXPORT_SYMBOL(osc_page_gang_lookup);
  * Check if page @page is covered by an extra lock or discard it.
  */
 static bool check_and_discard_cb(const struct lu_env *env, struct cl_io *io,
-                               struct osc_page *ops, void *cbdata)
+                                void **pvec, int count, void *cbdata)
 {
        struct osc_thread_info *info = osc_env_info(env);
        struct osc_object *osc = cbdata;
-       struct cl_page *page = ops->ops_cl.cpl_page;
-       pgoff_t index;
-       bool discard = false;
-
-       index = osc_index(ops);
-
-       /* negative lock caching */
-       if (index < info->oti_ng_index) {
-               discard = true;
-       } else if (index >= info->oti_fn_index) {
-               struct ldlm_lock *tmp;
-               /* refresh non-overlapped index */
-               tmp = osc_dlmlock_at_pgoff(env, osc, index,
-                                          OSC_DAP_FL_TEST_LOCK |
-                                          OSC_DAP_FL_AST | OSC_DAP_FL_RIGHT);
-               if (tmp != NULL) {
-                       __u64 end = tmp->l_policy_data.l_extent.end;
-                       __u64 start = tmp->l_policy_data.l_extent.start;
-
-                       /* no lock covering this page */
-                       if (index < cl_index(osc2cl(osc), start)) {
-                               /* no lock at @index, first lock at @start */
-                               info->oti_ng_index = cl_index(osc2cl(osc),
-                                                    start);
-                               discard = true;
+       int i;
+
+       for (i = 0; i < count; i++) {
+               struct osc_page *ops = pvec[i];
+               struct cl_page *page = ops->ops_cl.cpl_page;
+               pgoff_t index = osc_index(ops);
+               bool discard = false;
+
+               /* negative lock caching */
+               if (index < info->oti_ng_index) {
+                       discard = true;
+               } else if (index >= info->oti_fn_index) {
+                       struct ldlm_lock *tmp;
+                       /* refresh non-overlapped index */
+                       tmp = osc_dlmlock_at_pgoff(env, osc, index,
+                                       OSC_DAP_FL_TEST_LOCK |
+                                       OSC_DAP_FL_AST |
+                                       OSC_DAP_FL_RIGHT);
+                       if (tmp != NULL) {
+                               __u64 end =
+                                       tmp->l_policy_data.l_extent.end;
+                               __u64 start =
+                                       tmp->l_policy_data.l_extent.start;
+
+                               /* no lock covering this page */
+                               if (index < start >> PAGE_SHIFT) {
+                                       /* no lock at @index,
+                                        * first lock at @start
+                                        */
+                                       info->oti_ng_index =
+                                               start >> PAGE_SHIFT;
+                                       discard = true;
+                               } else {
+                                       /* Cache the first-non-overlapped
+                                        * index so as to skip all pages
+                                        * within [index, oti_fn_index).
+                                        * This is safe because if tmp lock
+                                        * is canceled, it will discard these
+                                        * pages.
+                                        */
+                                       info->oti_fn_index =
+                                               (end + 1) >> PAGE_SHIFT;
+                                       if (end == OBD_OBJECT_EOF)
+                                               info->oti_fn_index =
+                                                       CL_PAGE_EOF;
+                               }
+                               LDLM_LOCK_PUT(tmp);
                        } else {
-                               /* Cache the first-non-overlapped index so as to
-                                * skip all pages within [index, oti_fn_index).
-                                * This is safe because if tmp lock is canceled,
-                                * it will discard these pages.
-                                */
-                               info->oti_fn_index = cl_index(osc2cl(osc),
-                                                    end + 1);
-                               if (end == OBD_OBJECT_EOF)
-                                       info->oti_fn_index = CL_PAGE_EOF;
+                               info->oti_ng_index = CL_PAGE_EOF;
+                               discard = true;
                        }
-                       LDLM_LOCK_PUT(tmp);
-               } else {
-                       info->oti_ng_index = CL_PAGE_EOF;
-                       discard = true;
                }
-       }
 
-       if (discard) {
-               if (cl_page_own(env, io, page) == 0) {
-                       cl_page_discard(env, io, page);
-                       cl_page_disown(env, io, page);
-               } else {
-                       LASSERT(page->cp_state == CPS_FREEING);
+               if (discard) {
+                       if (cl_page_own(env, io, page) == 0) {
+                               cl_page_discard(env, io, page);
+                               cl_page_disown(env, io, page);
+                       } else {
+                               LASSERT(page->cp_state == CPS_FREEING);
+                       }
                }
-       }
 
-       info->oti_next_index = index + 1;
+               info->oti_next_index = index + 1;
+       }
        return true;
 }
 
 bool osc_discard_cb(const struct lu_env *env, struct cl_io *io,
-                  struct osc_page *ops, void *cbdata)
+                   void **pvec, int count, void *cbdata)
 {
        struct osc_thread_info *info = osc_env_info(env);
-       struct cl_page *page = ops->ops_cl.cpl_page;
-
-       /* page is top page. */
-       info->oti_next_index = osc_index(ops) + 1;
-       if (cl_page_own(env, io, page) == 0) {
-               if (!ergo(page->cp_type == CPT_CACHEABLE,
-                         !PageDirty(cl_page_vmpage(page))))
-                       CL_PAGE_DEBUG(D_ERROR, env, page,
-                                       "discard dirty page?\n");
-
-               /* discard the page */
-               cl_page_discard(env, io, page);
-               cl_page_disown(env, io, page);
-       } else {
-               LASSERT(page->cp_state == CPS_FREEING);
+       int i;
+
+       for (i = 0; i < count; i++) {
+               struct osc_page *ops = pvec[i];
+               struct cl_page *page = ops->ops_cl.cpl_page;
+
+               /* page is top page. */
+               info->oti_next_index = osc_index(ops) + 1;
+               if (cl_page_own(env, io, page) == 0) {
+                       if (!ergo(page->cp_type == CPT_CACHEABLE,
+                                 !PageDirty(cl_page_vmpage(page))))
+                               CL_PAGE_DEBUG(D_ERROR, env, page,
+                                             "discard dirty page?\n");
+
+                       /* discard the page */
+                       cl_page_discard(env, io, page);
+                       cl_page_disown(env, io, page);
+               } else {
+                       LASSERT(page->cp_state == CPS_FREEING);
+               }
        }
 
        return true;
@@ -3218,6 +3241,9 @@ int osc_lock_discard_pages(const struct lu_env *env, struct osc_object *osc,
 
        io->ci_obj = cl_object_top(osc2cl(osc));
        io->ci_ignore_layout = 1;
+       io->ci_invalidate_page_cache = 1;
+       io->u.ci_misc.lm_next_rpc_time = ktime_get_seconds() +
+                                        5 * obd_timeout / 16;
        result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
        if (result != 0)
                GOTO(out, result);