static int osc_extent_wait(const struct lu_env *env, struct osc_extent *ext,
enum osc_extent_state state);
static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
+ struct osc_object *osc,
struct osc_async_page *oap, int sent, int rc);
static int osc_make_ready(const struct lu_env *env, struct osc_async_page *oap,
int cmd);
-static int osc_refresh_count(const struct lu_env *env,
+static int osc_refresh_count(const struct lu_env *env, struct osc_object *osc,
struct osc_async_page *oap, int cmd);
static int osc_io_unplug_async(const struct lu_env *env,
struct client_obd *cli, struct osc_object *osc);
GOTO(out, rc = 60);
if (ext->oe_fsync_wait && !ext->oe_urgent && !ext->oe_hp)
GOTO(out, rc = 65);
- /* fallthrough */
+ fallthrough;
default:
if (atomic_read(&ext->oe_users) > 0)
GOTO(out, rc = 70);
struct ldlm_extent *extent;
extent = &ext->oe_dlmlock->l_policy_data.l_extent;
- if (!(extent->start <= cl_offset(osc2cl(obj), ext->oe_start) &&
- extent->end >= cl_offset(osc2cl(obj), ext->oe_max_end)))
+ if (!(extent->start <= ext->oe_start << PAGE_SHIFT &&
+ extent->end >= ext->oe_max_end << PAGE_SHIFT))
GOTO(out, rc = 100);
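
The cl_offset()/cl_index() conversions above (and throughout this patch) open-code what the removed helpers computed. A sketch of the equivalence, assuming the helpers were the trivial PAGE_SHIFT conversions of the pre-patch tree (the cl_object argument was unused):

/* sketch of the removed helpers, shown only to document the open-coded
 * shifts; names here are illustrative */
static inline loff_t sketch_cl_offset(pgoff_t index)	/* was cl_offset() */
{
	return (loff_t)index << PAGE_SHIFT;
}
static inline pgoff_t sketch_cl_index(loff_t offset)	/* was cl_index() */
{
	return offset >> PAGE_SHIFT;
}

Note the old helper widened to loff_t before shifting; the open-coded ext->oe_start << PAGE_SHIFT relies on pgoff_t being wide enough on the affected paths.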
if (!(ext->oe_dlmlock->l_granted_mode & (LCK_PW | LCK_GROUP)))
if (ext->oe_dlmlock) {
lu_ref_del(&ext->oe_dlmlock->l_reference,
"osc_extent", ext);
- LDLM_LOCK_PUT(ext->oe_dlmlock);
+ LDLM_LOCK_RELEASE(ext->oe_dlmlock);
ext->oe_dlmlock = NULL;
}
int sent, int rc)
{
struct client_obd *cli = osc_cli(ext->oe_obj);
+ struct osc_object *osc = ext->oe_obj;
struct osc_async_page *oap;
struct osc_async_page *tmp;
int nr_pages = ext->oe_nr_pages;
ext->oe_rc = rc ?: ext->oe_nr_pages;
EASSERT(ergo(rc == 0, ext->oe_state == OES_RPC), ext);
- osc_lru_add_batch(cli, &ext->oe_pages);
+ /* dio pages do not go in the LRU */
+ if (!ext->oe_dio)
+ osc_lru_add_batch(cli, &ext->oe_pages);
+
list_for_each_entry_safe(oap, tmp, &ext->oe_pages,
oap_pending_item) {
list_del_init(&oap->oap_rpc_item);
}
--ext->oe_nr_pages;
- osc_ap_completion(env, cli, oap, sent, rc);
+ osc_ap_completion(env, cli, osc, oap, sent, rc);
}
EASSERT(ext->oe_nr_pages == 0, ext);
if (!sent) {
lost_grant = ext->oe_grants;
- } else if (blocksize < PAGE_SIZE &&
+ } else if (cli->cl_ocd_grant_param == 0 &&
+ blocksize < PAGE_SIZE &&
last_count != PAGE_SIZE) {
- /* For short writes we shouldn't count parts of pages that
- * span a whole chunk on the OST side, or our accounting goes
- * wrong. Should match the code in filter_grant_check. */
+ /* For short writes without OBD_CONNECT_GRANT_PARAM support, we
+ * shouldn't count parts of pages that span a whole chunk on
+ * the OST side, or our accounting goes wrong. Should match
+ * the code in tgt_grant_check().
+ */
int offset = last_off & ~PAGE_MASK;
int count = last_count + (offset & (blocksize - 1));
int end = (offset + last_count) & (blocksize - 1);
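
A worked example of the bookkeeping above, assuming PAGE_SIZE = 4096 and an OST blocksize of 1024 (the chunk round-up and the final lost_grant assignment happen in lines elided from this hunk):

/*
 * illustration: last_off = 12800, last_count = 700, blocksize = 1024
 *   offset = 12800 & 4095       = 512   (position within the page)
 *   count  = 700 + (512 & 1023) = 1212  (bytes counted from the start of
 *                                        the first chunk touched)
 *   end    = (512 + 700) & 1023 = 188   (position within the last chunk)
 * the 700-byte write therefore spans two whole 1024-byte chunks of server
 * grant, which is what the client has to account for.
 */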
struct client_obd *cli = osc_cli(obj);
struct osc_async_page *oap;
struct osc_async_page *tmp;
- struct pagevec *pvec;
+ struct folio_batch *fbatch;
int pages_in_chunk = 0;
int ppc_bits = cli->cl_chunkbits -
PAGE_SHIFT;
io = osc_env_thread_io(env);
io->ci_obj = cl_object_top(osc2cl(obj));
io->ci_ignore_layout = 1;
- pvec = &osc_env_info(env)->oti_pagevec;
- ll_pagevec_init(pvec, 0);
+ fbatch = &osc_env_info(env)->oti_fbatch;
+ ll_folio_batch_init(fbatch, 0);
rc = cl_io_init(env, io, CIT_MISC, io->ci_obj);
if (rc < 0)
GOTO(out, rc);
}
lu_ref_del(&page->cp_reference, "truncate", current);
- cl_pagevec_put(env, page, pvec);
+ cl_batch_put(env, page, fbatch);
--ext->oe_nr_pages;
++nr_pages;
}
- pagevec_release(pvec);
+ folio_batch_release(fbatch);
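
The pagevec → folio_batch switch in this hunk tracks the upstream kernel's replacement of struct pagevec; ll_folio_batch_init() and cl_batch_put() are the Lustre wrappers. The underlying kernel idiom, for reference (standard API from linux/pagevec.h):

#include <linux/pagevec.h>	/* struct folio_batch */

static void drop_folio_refs_example(struct folio **folios, unsigned int n)
{
	struct folio_batch fbatch;
	unsigned int i;

	folio_batch_init(&fbatch);
	for (i = 0; i < n; i++) {
		/* folio_batch_add() returns the slots left; 0 means full */
		if (!folio_batch_add(&fbatch, folios[i]))
			folio_batch_release(&fbatch);	/* put refs, reset */
	}
	folio_batch_release(&fbatch);			/* put any remainder */
}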
EASSERTF(ergo(ext->oe_start >= trunc_index + !!partial,
ext->oe_nr_pages == 0),
rc = osc_make_ready(env, oap, OBD_BRW_WRITE);
switch (rc) {
case 0:
- spin_lock(&oap->oap_lock);
oap->oap_async_flags |= ASYNC_READY;
- spin_unlock(&oap->oap_lock);
break;
case -EALREADY:
LASSERT((oap->oap_async_flags & ASYNC_READY) != 0);
/* the last page is the only one whose count we need to refresh against
 * the size of the file. */
if (!(last->oap_async_flags & ASYNC_COUNT_STABLE)) {
- int last_oap_count = osc_refresh_count(env, last, OBD_BRW_WRITE);
+ int last_oap_count = osc_refresh_count(env, obj, last,
+ OBD_BRW_WRITE);
LASSERTF(last_oap_count > 0,
"last_oap_count %d\n", last_oap_count);
LASSERT(last->oap_page_off + last_oap_count <= PAGE_SIZE);
last->oap_count = last_oap_count;
- spin_lock(&last->oap_lock);
last->oap_async_flags |= ASYNC_COUNT_STABLE;
- spin_unlock(&last->oap_lock);
}
/* for the rest of the pages, we don't need to call osc_refresh_count()
list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
oap->oap_count = PAGE_SIZE - oap->oap_page_off;
- spin_lock(&oap->oap_lock);
oap->oap_async_flags |= ASYNC_COUNT_STABLE;
- spin_unlock(&oap->oap_lock);
}
}
static int osc_make_ready(const struct lu_env *env, struct osc_async_page *oap,
int cmd)
{
- struct osc_page *opg = oap2osc_page(oap);
struct cl_page *page = oap2cl_page(oap);
int result;
LASSERT(cmd == OBD_BRW_WRITE); /* no cached reads */
ENTRY;
+
result = cl_page_make_ready(env, page, CRT_WRITE);
- if (result == 0)
- opg->ops_submit_time = ktime_get();
+
RETURN(result);
}
-static int osc_refresh_count(const struct lu_env *env,
+static int osc_refresh_count(const struct lu_env *env, struct osc_object *osc,
struct osc_async_page *oap, int cmd)
{
struct osc_page *opg = oap2osc_page(oap);
pgoff_t index = osc_index(oap2osc(oap));
- struct cl_object *obj;
+ struct cl_object *obj = osc2cl(osc);
struct cl_attr *attr = &osc_env_info(env)->oti_attr;
int result;
loff_t kms;
/* readpage queues with _COUNT_STABLE, shouldn't get here. */
LASSERT(!(cmd & OBD_BRW_READ));
LASSERT(opg != NULL);
- obj = opg->ops_cl.cpl_obj;
cl_object_attr_lock(obj);
result = cl_object_attr_get(env, obj, attr);
if (result < 0)
return result;
kms = attr->cat_kms;
- if (cl_offset(obj, index) >= kms)
+ if (index << PAGE_SHIFT >= kms)
/* catch race with truncate */
return 0;
- else if (cl_offset(obj, index + 1) > kms)
+ else if ((index + 1) << PAGE_SHIFT > kms)
/* catch sub-page write at end of file */
return kms & ~PAGE_MASK;
else
return PAGE_SIZE;
}
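
A worked example of the three return paths, with illustrative numbers (PAGE_SIZE = 4096, so page 2 covers bytes 8192..12287):

/*
 * illustration, kms = 10000:
 *   index = 3: 12288 >= 10000        -> 0 (page is past the known size;
 *                                         raced with truncate)
 *   index = 2: 12288 > 10000 > 8192  -> 10000 & 4095 = 1808 (partial
 *                                         page at end of file)
 *   index = 1: 8192 <= 10000         -> PAGE_SIZE (fully covered page)
 */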
-static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
- int cmd, int rc)
+static int osc_completion(const struct lu_env *env, struct osc_object *osc,
+ struct osc_async_page *oap, int cmd, int rc)
{
struct osc_page *opg = oap2osc_page(oap);
struct cl_page *page = oap2cl_page(oap);
/* Clear opg->ops_transfer_pinned before VM lock is released. */
opg->ops_transfer_pinned = 0;
- opg->ops_submit_time = ktime_set(0, 0);
srvlock = oap->oap_brw_flags & OBD_BRW_SRVLOCK;
/* statistics */
if (rc == 0 && srvlock) {
- struct lu_device *ld = opg->ops_cl.cpl_obj->co_lu.lo_dev;
- struct osc_stats *stats = &lu2osc_dev(ld)->od_stats;
+ struct lu_device *ld = osc->oo_cl.co_lu.lo_dev;
+ struct osc_stats *stats = &lu2osc_dev(ld)->osc_stats;
size_t bytes = oap->oap_count;
if (crt == CRT_READ)
struct brw_page *pga)
{
assert_spin_locked(&cli->cl_loi_list_lock);
- LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
+ LASSERT(!(pga->bp_flag & OBD_BRW_FROM_GRANT));
cli->cl_dirty_pages++;
- pga->flag |= OBD_BRW_FROM_GRANT;
+ pga->bp_flag |= OBD_BRW_FROM_GRANT;
CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
- PAGE_SIZE, pga, pga->pg);
+ PAGE_SIZE, pga, pga->bp_page);
}
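
The flag/pg accesses in this and the following hunks move to a bp_ prefix on struct brw_page. Only bp_flag and bp_page are confirmed by the hunks here; a sketch of the renamed members as used above (the rest of the struct is an assumption):

/* sketch only: the two brw_page members these hunks touch */
struct brw_page_sketch {
	struct page	*bp_page;	/* was ->pg */
	__u32		 bp_flag;	/* was ->flag, OBD_BRW_* bits */
};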
/* the companion to osc_consume_write_grant, called when a brw has completed.
ENTRY;
assert_spin_locked(&cli->cl_loi_list_lock);
- if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
+ if (!(pga->bp_flag & OBD_BRW_FROM_GRANT)) {
EXIT;
return;
}
- pga->flag &= ~OBD_BRW_FROM_GRANT;
+ pga->bp_flag &= ~OBD_BRW_FROM_GRANT;
atomic_long_dec(&obd_dirty_pages);
cli->cl_dirty_pages--;
EXIT;
 * used, we should return these grants to the OST. There are two cases where
 * grants can be lost:
* 1. truncate;
- * 2. blocksize at OST is less than PAGE_SIZE and a partial page was
- * written. In this case OST may use less chunks to serve this partial
- * write. OSTs don't actually know the page size on the client side. so
- * clients have to calculate lost grant by the blocksize on the OST.
- * See filter_grant_check() for details.
+ * 2. Without OBD_CONNECT_GRANT_PARAM support, when the blocksize on the
+ *    OST is less than PAGE_SIZE and a partial page was written. In this
+ *    case the OST may use fewer chunks to serve the partial write. OSTs
+ *    don't actually know the page size on the client side, so clients
+ *    have to calculate lost grant by the blocksize on the OST. See
+ *    tgt_grant_check() for details.
*/
static void osc_free_grant(struct client_obd *cli, unsigned int nr_pages,
unsigned int lost_grant, unsigned int dirty_grant)
 * The process will be put to sleep if it has already run out of grant.
*/
static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
- struct osc_async_page *oap, int bytes)
+ struct osc_object *osc, struct osc_async_page *oap,
+ int bytes)
{
- struct osc_object *osc = oap->oap_obj;
struct lov_oinfo *loi = osc->oo_oinfo;
int rc = -EDQUOT;
int remain;
bool entered = false;
+ struct obd_device *obd = cli->cl_import->imp_obd;
/* We cannot wait for a long time here since we are holding ldlm lock
* across the actual IO. If no requests complete fast (e.g. due to
* overloaded OST that takes a long time to process everything, we'd
* evicted by server which is half obd_timeout when AT is off
* or at least ldlm_enqueue_min with AT on.
* See LU-13131 */
- unsigned long timeout = cfs_time_seconds(AT_OFF ? obd_timeout / 2 :
- ldlm_enqueue_min / 2);
+ unsigned long timeout =
+ cfs_time_seconds(obd_at_off(obd) ?
+ obd_timeout / 2 :
+ obd_get_ldlm_enqueue_min(obd) / 2);
ENTRY;
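
For scale, a hedged illustration of the cap computed above, assuming the stock obd_timeout default of 100 seconds:

/* illustration only: with adaptive timeouts off and obd_timeout = 100,
 * timeout = cfs_time_seconds(100 / 2), i.e. 50 seconds in jiffies; with
 * AT on, the cap is half the (now per-device) ldlm_enqueue_min instead */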
/* force the caller to try sync io. this can jump the list
* of queued writes and create a discontiguous rpc stream */
- if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
+ if (CFS_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
cli->cl_dirty_max_pages == 0 ||
cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync) {
OSC_DUMP_GRANT(D_CACHE, cli, "forced sync i/o\n");
return is_ready;
}
-/* this is trying to propogate async writeback errors back up to the
- * application. As an async write fails we record the error code for later if
- * the app does an fsync. As long as errors persist we force future rpcs to be
- * sync so that the app can get a sync error and break the cycle of queueing
- * pages for which writeback will fail. */
-static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
- int rc)
-{
- if (rc) {
- if (!ar->ar_rc)
- ar->ar_rc = rc;
-
- ar->ar_force_sync = 1;
- ar->ar_min_xid = ptlrpc_sample_next_xid();
- return;
-
- }
-
- if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
- ar->ar_force_sync = 0;
-}
-
/* this must be called holding the loi list lock to give coverage to exit_cache,
- * async_flag maintenance, and oap_request */
+ * and async_flag maintenance
+ */
static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
+ struct osc_object *osc,
struct osc_async_page *oap, int sent, int rc)
{
- struct osc_object *osc = oap->oap_obj;
- struct lov_oinfo *loi = osc->oo_oinfo;
- __u64 xid = 0;
-
ENTRY;
- if (oap->oap_request != NULL) {
- xid = ptlrpc_req_xid(oap->oap_request);
- ptlrpc_req_finished(oap->oap_request);
- oap->oap_request = NULL;
- }
/* As the transfer for this page is being done, clear the flags */
- spin_lock(&oap->oap_lock);
oap->oap_async_flags = 0;
- spin_unlock(&oap->oap_lock);
-
- if (oap->oap_cmd & OBD_BRW_WRITE && xid > 0) {
- spin_lock(&cli->cl_loi_list_lock);
- osc_process_ar(&cli->cl_ar, xid, rc);
- osc_process_ar(&loi->loi_ar, xid, rc);
- spin_unlock(&cli->cl_loi_list_lock);
- }
- rc = osc_completion(env, oap, oap->oap_cmd, rc);
+ rc = osc_completion(env, osc, oap, oap->oap_cmd, rc);
if (rc)
CERROR("completion on oap %p obj %p returns %d.\n",
oap, osc, rc);
};
assert_osc_object_is_locked(obj);
- while (!list_empty(&obj->oo_hp_exts)) {
- ext = list_entry(obj->oo_hp_exts.next, struct osc_extent,
- oe_link);
+ while ((ext = list_first_entry_or_null(&obj->oo_hp_exts,
+ struct osc_extent,
+ oe_link)) != NULL) {
if (!try_to_add_extent_for_io(cli, ext, &data))
return data.erd_page_count;
EASSERT(ext->oe_nr_pages <= data.erd_max_pages, ext);
if (data.erd_page_count == data.erd_max_pages)
return data.erd_page_count;
- while (!list_empty(&obj->oo_urgent_exts)) {
- ext = list_entry(obj->oo_urgent_exts.next,
- struct osc_extent, oe_link);
+ while ((ext = list_first_entry_or_null(&obj->oo_urgent_exts,
+ struct osc_extent,
+ oe_link)) != NULL) {
if (!try_to_add_extent_for_io(cli, ext, &data))
return data.erd_page_count;
}
* extents can usually only be added if the rpclist was empty, so if we
* can't add one, we continue on to trying to add normal extents. This
* is so we don't miss adding extra extents to an RPC containing high
- * priority or urgent extents. */
- while (!list_empty(&obj->oo_full_exts)) {
- ext = list_entry(obj->oo_full_exts.next,
- struct osc_extent, oe_link);
+ * priority or urgent extents.
+ */
+ while ((ext = list_first_entry_or_null(&obj->oo_full_exts,
+ struct osc_extent,
+ oe_link)) != NULL) {
if (!try_to_add_extent_for_io(cli, ext, &data))
break;
}
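
All three loops above convert the list_empty()/list_entry() pair to list_first_entry_or_null(), the standard kernel idiom, which yields NULL instead of a bogus pointer when the list is empty. Schematically:

/* old */
while (!list_empty(head)) {
	ext = list_entry(head->next, struct osc_extent, oe_link);
	/* ... ext is taken off the list before the next iteration ... */
}

/* new: equivalent, with the emptiness test and entry fetch in one step */
while ((ext = list_first_entry_or_null(head, struct osc_extent,
					oe_link)) != NULL) {
	/* ... */
}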
spin_lock(&cli->cl_loi_list_lock);
}
+ EXIT;
}
int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli,
int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
struct cl_page *page, loff_t offset)
{
- struct obd_export *exp = osc_export(osc);
struct osc_async_page *oap = &ops->ops_oap;
- struct page *vmpage = page->cp_vmpage;
- ENTRY;
+ ENTRY;
if (!page)
- return cfs_size_round(sizeof(*oap));
+ return round_up(sizeof(*oap), 8);
- oap->oap_magic = OAP_MAGIC;
- oap->oap_cli = &exp->exp_obd->u.cli;
oap->oap_obj = osc;
-
- oap->oap_page = vmpage;
+ oap->oap_page = page->cp_vmpage;
oap->oap_obj_off = offset;
LASSERT(!(offset & ~PAGE_MASK));
INIT_LIST_HEAD(&oap->oap_pending_item);
INIT_LIST_HEAD(&oap->oap_rpc_item);
- spin_lock_init(&oap->oap_lock);
CDEBUG(D_INFO, "oap %p vmpage %p obj off %llu\n",
- oap, vmpage, oap->oap_obj_off);
+ oap, oap->oap_page, oap->oap_obj_off);
RETURN(0);
}
EXPORT_SYMBOL(osc_prep_async_page);
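
cfs_size_round() rounded its argument up to an 8-byte boundary; the replacement calls the kernel's round_up() with an explicit alignment. A quick equivalence check (round_up() lives in linux/math.h on recent kernels and requires a power-of-two alignment):

/* round_up(x, 8) == (x + 7) & ~7, matching the old cfs_size_round(x):
 *   round_up(52, 8) == 56, round_up(56, 8) == 56 */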
int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
- struct osc_page *ops, cl_commit_cbt cb)
+ struct osc_object *osc, struct osc_page *ops,
+ cl_commit_cbt cb)
{
struct osc_io *oio = osc_env_io(env);
struct osc_extent *ext = NULL;
struct osc_async_page *oap = &ops->ops_oap;
- struct client_obd *cli = oap->oap_cli;
- struct osc_object *osc = oap->oap_obj;
- struct pagevec *pvec = &osc_env_info(env)->oti_pagevec;
+ struct client_obd *cli = osc_cli(osc);
+ struct folio_batch *fbatch = &osc_env_info(env)->oti_fbatch;
pgoff_t index;
unsigned int tmp;
unsigned int grants = 0;
int rc = 0;
ENTRY;
- if (oap->oap_magic != OAP_MAGIC)
- RETURN(-EINVAL);
-
if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
RETURN(-EIO);
/* Set the OBD_BRW_SRVLOCK before the page is queued. */
brw_flags |= ops->ops_srvlock ? OBD_BRW_SRVLOCK : 0;
- if (oio->oi_cap_sys_resource || io->ci_noquota) {
+ if (io->ci_noquota) {
brw_flags |= OBD_BRW_NOQUOTA;
cmd |= OBD_BRW_NOQUOTA;
}
+ if (oio->oi_cap_sys_resource) {
+ brw_flags |= OBD_BRW_SYS_RESOURCE;
+ cmd |= OBD_BRW_SYS_RESOURCE;
+ }
+
/* check if the file's owner/group is over quota */
- if (!(cmd & OBD_BRW_NOQUOTA)) {
+ /* do not check for root without root squash, because in this case
+ * we should bypass quota
+ */
+ if ((!oio->oi_cap_sys_resource ||
+ cli->cl_root_squash || cli->cl_root_prjquota) &&
+ !io->ci_noquota) {
struct cl_object *obj;
struct cl_attr *attr;
unsigned int qid[LL_MAXQUOTAS];
LASSERT(ergo(grants > 0, grants >= tmp));
rc = 0;
+
+ /* We must not hold a page lock while we do osc_enter_cache()
+ * or osc_extent_find(), so we must mark dirty & unlock
+ * any pages in the write commit folio_batch.
+ */
+ if (folio_batch_count(fbatch)) {
+ cb(env, io, fbatch);
+ folio_batch_reinit(fbatch);
+ }
+
if (grants == 0) {
- /* We haven't allocated grant for this page, and we
- * must not hold a page lock while we do enter_cache,
- * so we must mark dirty & unlock any pages in the
- * write commit pagevec. */
- if (pagevec_count(pvec)) {
- cb(env, io, pvec);
- pagevec_reinit(pvec);
- }
- rc = osc_enter_cache(env, cli, oap, tmp);
+ rc = osc_enter_cache(env, cli, osc, oap, tmp);
if (rc == 0)
grants = tmp;
}
int rc = 0;
ENTRY;
- LASSERT(oap->oap_magic == OAP_MAGIC);
-
CDEBUG(D_INFO, "teardown oap %p page %p at index %lu.\n",
oap, ops, osc_index(oap2osc(oap)));
struct osc_page *ops)
{
struct osc_extent *ext = NULL;
- struct osc_object *obj = cl2osc(ops->ops_cl.cpl_obj);
+ struct osc_object *obj = osc_page_object(ops);
struct cl_page *cp = ops->ops_cl.cpl_page;
pgoff_t index = osc_index(ops);
struct osc_async_page *oap = &ops->ops_oap;
if (rc)
GOTO(out, rc);
- spin_lock(&oap->oap_lock);
oap->oap_async_flags |= ASYNC_READY|ASYNC_URGENT;
- spin_unlock(&oap->oap_lock);
if (current->flags & PF_MEMALLOC)
ext->oe_memalloc = 1;
struct osc_object *obj, struct list_head *list,
int brw_flags)
{
+ struct osc_io *oio = osc_env_io(env);
struct client_obd *cli = osc_cli(obj);
struct osc_extent *ext;
struct osc_async_page *oap;
bool can_merge = true;
pgoff_t start = CL_PAGE_EOF;
pgoff_t end = 0;
+ struct osc_lock *oscl;
ENTRY;
list_for_each_entry(oap, list, oap_pending_item) {
list_for_each_entry_safe(oap, tmp, list, oap_pending_item) {
list_del_init(&oap->oap_pending_item);
- osc_ap_completion(env, cli, oap, 0, -ENOMEM);
+ osc_ap_completion(env, cli, obj, oap, 0, -ENOMEM);
}
RETURN(-ENOMEM);
}
ext->oe_srvlock = !!(brw_flags & OBD_BRW_SRVLOCK);
ext->oe_ndelay = !!(brw_flags & OBD_BRW_NDELAY);
ext->oe_dio = !!(brw_flags & OBD_BRW_NOCACHE);
+ oscl = oio->oi_write_osclock ? : oio->oi_read_osclock;
+ if (oscl && oscl->ols_dlmlock != NULL) {
+ ext->oe_dlmlock = LDLM_LOCK_GET(oscl->ols_dlmlock);
+ lu_ref_add(&ext->oe_dlmlock->l_reference, "osc_extent", ext);
+ }
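
The extent now pins the DLM lock that covers it for the extent's lifetime. The matching release appeared in an earlier hunk of this patch; the pairing, collected from the two hunks:

/* acquire (osc_queue_sync_pages, above):
 *	ext->oe_dlmlock = LDLM_LOCK_GET(oscl->ols_dlmlock);
 *	lu_ref_add(&ext->oe_dlmlock->l_reference, "osc_extent", ext);
 * release (extent teardown, earlier hunk):
 *	lu_ref_del(&ext->oe_dlmlock->l_reference, "osc_extent", ext);
 *	LDLM_LOCK_RELEASE(ext->oe_dlmlock);
 */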
if (ext->oe_dio && !ext->oe_rw) { /* direct io write */
int grants;
int ppc;
list_for_each_entry(oap, list, oap_pending_item) {
osc_consume_write_grant(cli,
&oap->oap_brw_page);
- atomic_long_inc(&obd_dirty_pages);
}
+ atomic_long_add(page_count, &obd_dirty_pages);
osc_unreserve_grant_nolock(cli, grants, 0);
ext->oe_grants = grants;
} else {
"not enough grant available, switching to sync for this i/o\n");
}
spin_unlock(&cli->cl_loi_list_lock);
+ osc_update_next_shrink(cli);
}
ext->oe_is_rdma_only = !!(brw_flags & OBD_BRW_RDMA_ONLY);
ENTRY;
/* pages with an index greater than or equal to "index" will be truncated. */
- index = cl_index(osc2cl(obj), size);
- partial = size > cl_offset(osc2cl(obj), index);
+ index = size >> PAGE_SHIFT;
+ partial = size > (index << PAGE_SHIFT);
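
A worked example of the open-coded conversion (4096-byte pages):

/* illustration: size = 10000 -> index = 10000 >> 12 = 2, and
 * partial = (10000 > (2 << 12) = 8192) = true: page 2 loses its tail and
 * pages with index >= 3 are discarded entirely */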
again:
osc_object_lock(obj);
osc_list_maint(cli, obj);
- while (!list_empty(&list)) {
+ while ((ext = list_first_entry_or_null(&list,
+ struct osc_extent,
+ oe_link)) != NULL) {
int rc;
- ext = list_entry(list.next, struct osc_extent, oe_link);
list_del_init(&ext->oe_link);
/* extent may be in OES_ACTIVE state because inode mutex
osc_page_gang_cbt cb, void *cbdata)
{
struct osc_page *ops;
- struct pagevec *pagevec;
+ struct folio_batch *fbatch;
void **pvec;
pgoff_t idx;
unsigned int nr;
idx = start;
pvec = osc_env_info(env)->oti_pvec;
- pagevec = &osc_env_info(env)->oti_pagevec;
- ll_pagevec_init(pagevec, 0);
+ fbatch = &osc_env_info(env)->oti_fbatch;
+ ll_folio_batch_init(fbatch, 0);
spin_lock(&osc->oo_tree_lock);
while ((nr = radix_tree_gang_lookup(&osc->oo_tree, pvec,
idx, OTI_PVEC_SIZE)) > 0) {
ops = pvec[i];
page = ops->ops_cl.cpl_page;
lu_ref_del(&page->cp_reference, "gang_lookup", current);
- cl_pagevec_put(env, page, pagevec);
+ cl_batch_put(env, page, fbatch);
}
- pagevec_release(pagevec);
+ folio_batch_release(fbatch);
if (nr < OTI_PVEC_SIZE || end_of_region)
break;
if (!res)
break;
+ CFS_FAIL_TIMEOUT(OBD_FAIL_OSC_SLOW_PAGE_EVICT,
+ cfs_fail_val ?: 20);
+
if (io->ci_type == CIT_MISC &&
io->u.ci_misc.lm_next_rpc_time &&
ktime_get_seconds() > io->u.ci_misc.lm_next_rpc_time) {
tmp->l_policy_data.l_extent.start;
/* no lock covering this page */
- if (index < cl_index(osc2cl(osc), start)) {
+ if (index < start >> PAGE_SHIFT) {
/* no lock at @index,
* first lock at @start
*/
info->oti_ng_index =
- cl_index(osc2cl(osc), start);
+ start >> PAGE_SHIFT;
discard = true;
} else {
/* Cache the first-non-overlapped
* pages.
*/
info->oti_fn_index =
- cl_index(osc2cl(osc), end + 1);
+ (end + 1) >> PAGE_SHIFT;
if (end == OBD_OBJECT_EOF)
info->oti_fn_index =
CL_PAGE_EOF;
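
A worked example of the index caching above (4096-byte pages), for a DLM lock covering bytes [8192, 20479], i.e. pages 2..4:

/* illustration for a lock over bytes [8192, 20479]:
 *   index = 1: 1 < (8192 >> 12) = 2 -> no covering lock; cache
 *              oti_ng_index = 2 and discard up to there
 *   inside the lock: cache oti_fn_index = (20479 + 1) >> 12 = 5, the
 *   first page past the lock, so pages 2..4 skip the dlmlock lookup */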
io->ci_obj = cl_object_top(osc2cl(osc));
io->ci_ignore_layout = 1;
+ io->ci_invalidate_page_cache = 1;
io->u.ci_misc.lm_next_rpc_time = ktime_get_seconds() +
5 * obd_timeout / 16;
result = cl_io_init(env, io, CIT_MISC, io->ci_obj);