*/
/*
* This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
*
* osc cache management.
 *
 */
#define DEBUG_SUBSYSTEM S_OSC
#include <lustre_osc.h>
+#include <lustre_dlm.h>
#include "osc_internal.h"
static int osc_extent_wait(const struct lu_env *env, struct osc_extent *ext,
enum osc_extent_state state);
static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
+ struct osc_object *osc,
struct osc_async_page *oap, int sent, int rc);
static int osc_make_ready(const struct lu_env *env, struct osc_async_page *oap,
int cmd);
-static int osc_refresh_count(const struct lu_env *env,
+static int osc_refresh_count(const struct lu_env *env, struct osc_object *osc,
struct osc_async_page *oap, int cmd);
static int osc_io_unplug_async(const struct lu_env *env,
struct client_obd *cli, struct osc_object *osc);
#define EXTSTR "[%lu -> %lu/%lu]"
#define EXTPARA(ext) (ext)->oe_start, (ext)->oe_end, (ext)->oe_max_end
-static const char *oes_strings[] = {
+static const char *const oes_strings[] = {
"inv", "active", "cache", "locking", "lockdone", "rpc", "trunc", NULL };
#define OSC_EXTENT_DUMP_WITH_LOC(file, func, line, mask, extent, fmt, ...) do {\
/* ----- extent part 0 ----- */ \
__ext, EXTPARA(__ext), \
/* ----- part 1 ----- */ \
- atomic_read(&__ext->oe_refc), \
+ kref_read(&__ext->oe_refc), \
atomic_read(&__ext->oe_users), \
list_empty_marker(&__ext->oe_link), \
oes_strings[__ext->oe_state], ext_flags(__ext, __buf), \
if (ext->oe_state >= OES_STATE_MAX)
GOTO(out, rc = 10);
- if (atomic_read(&ext->oe_refc) <= 0)
+ if (kref_read(&ext->oe_refc) <= 0)
GOTO(out, rc = 20);
- if (atomic_read(&ext->oe_refc) < atomic_read(&ext->oe_users))
+ if (kref_read(&ext->oe_refc) < atomic_read(&ext->oe_users))
GOTO(out, rc = 30);
switch (ext->oe_state) {
GOTO(out, rc = 60);
if (ext->oe_fsync_wait && !ext->oe_urgent && !ext->oe_hp)
GOTO(out, rc = 65);
- /* fallthrough */
+ fallthrough;
default:
if (atomic_read(&ext->oe_users) > 0)
GOTO(out, rc = 70);
struct ldlm_extent *extent;
extent = &ext->oe_dlmlock->l_policy_data.l_extent;
- if (!(extent->start <= cl_offset(osc2cl(obj), ext->oe_start) &&
- extent->end >= cl_offset(osc2cl(obj), ext->oe_max_end)))
+ if (!(extent->start <= ext->oe_start << PAGE_SHIFT &&
+ extent->end >= ext->oe_max_end << PAGE_SHIFT))
GOTO(out, rc = 100);
if (!(ext->oe_dlmlock->l_granted_mode & (LCK_PW | LCK_GROUP)))
RB_CLEAR_NODE(&ext->oe_node);
ext->oe_obj = obj;
cl_object_get(osc2cl(obj));
- atomic_set(&ext->oe_refc, 1);
+ kref_init(&ext->oe_refc);
atomic_set(&ext->oe_users, 0);
INIT_LIST_HEAD(&ext->oe_link);
ext->oe_state = OES_INV;
return ext;
}
-static void osc_extent_free(struct osc_extent *ext)
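+/* kref release callback: runs when the last reference is dropped. Freeing
+ * the extent itself is finished in osc_extent_put(), which still has the
+ * 'env' that kref_put() cannot pass through to this function. */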
+static void osc_extent_free(struct kref *kref)
{
+ struct osc_extent *ext = container_of(kref, struct osc_extent,
+ oe_refc);
+
+ LASSERT(list_empty(&ext->oe_link));
+ LASSERT(atomic_read(&ext->oe_users) == 0);
+ LASSERT(ext->oe_state == OES_INV);
+ LASSERT(RB_EMPTY_NODE(&ext->oe_node));
+
+ if (ext->oe_dlmlock) {
+ lu_ref_del(&ext->oe_dlmlock->l_reference,
+ "osc_extent", ext);
+ LDLM_LOCK_RELEASE(ext->oe_dlmlock);
+ ext->oe_dlmlock = NULL;
+ }
+#if 0
+ /* If/When cl_object_put drops the need for 'env',
+ * this code can be enabled, and matching code in
+ * osc_extent_put removed.
+ */
+ cl_object_put(osc2cl(ext->oe_obj));
+
OBD_SLAB_FREE_PTR(ext, osc_extent_kmem);
+#endif
}
static struct osc_extent *osc_extent_get(struct osc_extent *ext)
{
- LASSERT(atomic_read(&ext->oe_refc) >= 0);
- atomic_inc(&ext->oe_refc);
+ LASSERT(kref_read(&ext->oe_refc) >= 0);
+ kref_get(&ext->oe_refc);
return ext;
}
static void osc_extent_put(const struct lu_env *env, struct osc_extent *ext)
{
- LASSERT(atomic_read(&ext->oe_refc) > 0);
- if (atomic_dec_and_test(&ext->oe_refc)) {
- LASSERT(list_empty(&ext->oe_link));
- LASSERT(atomic_read(&ext->oe_users) == 0);
- LASSERT(ext->oe_state == OES_INV);
- LASSERT(RB_EMPTY_NODE(&ext->oe_node));
-
- if (ext->oe_dlmlock != NULL) {
- lu_ref_del(&ext->oe_dlmlock->l_reference,
- "osc_extent", ext);
- LDLM_LOCK_RELEASE(ext->oe_dlmlock);
- ext->oe_dlmlock = NULL;
- }
+ LASSERT(kref_read(&ext->oe_refc) > 0);
+ if (kref_put(&ext->oe_refc, osc_extent_free)) {
+		/* This should be in osc_extent_free(), but it cannot
+		 * be as long as 'env' needs to be passed in.
+		 */
cl_object_put(env, osc2cl(ext->oe_obj));
- osc_extent_free(ext);
+
+ OBD_SLAB_FREE_PTR(ext, osc_extent_kmem);
}
}
*/
static void osc_extent_put_trust(struct osc_extent *ext)
{
- LASSERT(atomic_read(&ext->oe_refc) > 1);
+ LASSERT(kref_read(&ext->oe_refc) > 1);
assert_osc_object_is_locked(ext->oe_obj);
- atomic_dec(&ext->oe_refc);
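+	/* oe_refc is asserted > 1 above, so this can never be the final
+	 * put and the NULL env is never used */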
+ osc_extent_put(NULL, ext);
}
/**
if (victim == NULL)
return -EINVAL;
- if (victim->oe_state != OES_CACHE || victim->oe_fsync_wait)
+ if (victim->oe_state != OES_INV &&
+ (victim->oe_state != OES_CACHE || victim->oe_fsync_wait))
return -EBUSY;
if (cur->oe_max_end != victim->oe_max_end)
return -ERANGE;
+ /*
+ * In the rare case max_pages_per_rpc (mppr) is changed, don't
+ * merge extents until after old ones have been sent, or the
+ * "extents are aligned to RPCs" checks are unhappy.
+ */
+ if (cur->oe_mppr != victim->oe_mppr)
+ return -ERANGE;
+
LASSERT(cur->oe_dlmlock == victim->oe_dlmlock);
ppc_bits = osc_cli(obj)->cl_chunkbits - PAGE_SHIFT;
chunk_start = cur->oe_start >> ppc_bits;
cur->oe_urgent |= victim->oe_urgent;
cur->oe_memalloc |= victim->oe_memalloc;
list_splice_init(&victim->oe_pages, &cur->oe_pages);
- list_del_init(&victim->oe_link);
victim->oe_nr_pages = 0;
osc_extent_get(victim);
/**
* Drop user count of osc_extent, and unplug IO asynchronously.
*/
-int osc_extent_release(const struct lu_env *env, struct osc_extent *ext)
+void osc_extent_release(const struct lu_env *env, struct osc_extent *ext)
{
struct osc_object *obj = ext->oe_obj;
struct client_obd *cli = osc_cli(obj);
- int rc = 0;
ENTRY;
LASSERT(atomic_read(&ext->oe_users) > 0);
osc_io_unplug_async(env, cli, obj);
}
osc_extent_put(env, ext);
- RETURN(rc);
+
+ RETURN_EXIT;
}
/**
cur->oe_start = descr->cld_start;
if (cur->oe_end > max_end)
cur->oe_end = max_end;
- cur->oe_grants = 0;
+ cur->oe_grants = chunksize + cli->cl_grant_extent_tax;
cur->oe_mppr = max_pages;
if (olck->ols_dlmlock != NULL) {
LASSERT(olck->ols_hold);
* flushed, try next one. */
continue;
- /* check if they belong to the same rpc slot before trying to
- * merge. the extents are not overlapped and contiguous at
- * chunk level to get here. */
- if (ext->oe_max_end != max_end)
- /* if they don't belong to the same RPC slot or
- * max_pages_per_rpc has ever changed, do not merge. */
- continue;
-
- /* check whether maximum extent size will be hit */
- if ((ext_chk_end - ext_chk_start + 1 + 1) << ppc_bits >
- cli->cl_max_extent_pages)
- continue;
-
- /* it's required that an extent must be contiguous at chunk
- * level so that we know the whole extent is covered by grant
- * (the pages in the extent are NOT required to be contiguous).
- * Otherwise, it will be too much difficult to know which
- * chunks have grants allocated. */
-
- /* try to do front merge - extend ext's start */
- if (chunk + 1 == ext_chk_start) {
- /* ext must be chunk size aligned */
- EASSERT((ext->oe_start & ~chunk_mask) == 0, ext);
-
- /* pull ext's start back to cover cur */
- ext->oe_start = cur->oe_start;
- ext->oe_grants += chunksize;
+ if (osc_extent_merge(env, ext, cur) == 0) {
LASSERT(*grants >= chunksize);
*grants -= chunksize;
- found = osc_extent_hold(ext);
- } else if (chunk == ext_chk_end + 1) {
- /* rear merge */
- ext->oe_end = cur->oe_end;
- ext->oe_grants += chunksize;
- LASSERT(*grants >= chunksize);
- *grants -= chunksize;
-
- /* try to merge with the next one because we just fill
- * in a gap */
+ /*
+ * Try to merge with the next one too because we
+ * might have just filled in a gap.
+ */
if (osc_extent_merge(env, ext, next_extent(ext)) == 0)
/* we can save extent tax from next extent */
*grants += cli->cl_grant_extent_tax;
found = osc_extent_hold(ext);
- }
- if (found != NULL)
break;
+ }
}
osc_extent_tree_dump(D_CACHE, obj);
} else if (conflict == NULL) {
/* create a new extent */
EASSERT(osc_extent_is_overlapped(obj, cur) == 0, cur);
- cur->oe_grants = chunksize + cli->cl_grant_extent_tax;
LASSERT(*grants >= cur->oe_grants);
*grants -= cur->oe_grants;
int sent, int rc)
{
struct client_obd *cli = osc_cli(ext->oe_obj);
+ struct osc_object *osc = ext->oe_obj;
struct osc_async_page *oap;
struct osc_async_page *tmp;
int nr_pages = ext->oe_nr_pages;
ext->oe_rc = rc ?: ext->oe_nr_pages;
EASSERT(ergo(rc == 0, ext->oe_state == OES_RPC), ext);
- osc_lru_add_batch(cli, &ext->oe_pages);
+ /* dio pages do not go in the LRU */
+ if (!ext->oe_dio)
+ osc_lru_add_batch(cli, &ext->oe_pages);
+
list_for_each_entry_safe(oap, tmp, &ext->oe_pages,
oap_pending_item) {
list_del_init(&oap->oap_rpc_item);
}
--ext->oe_nr_pages;
- osc_ap_completion(env, cli, oap, sent, rc);
+ osc_ap_completion(env, cli, osc, oap, sent, rc);
}
EASSERT(ext->oe_nr_pages == 0, ext);
if (!sent) {
lost_grant = ext->oe_grants;
- } else if (blocksize < PAGE_SIZE &&
+ } else if (cli->cl_ocd_grant_param == 0 &&
+ blocksize < PAGE_SIZE &&
last_count != PAGE_SIZE) {
- /* For short writes we shouldn't count parts of pages that
- * span a whole chunk on the OST side, or our accounting goes
- * wrong. Should match the code in filter_grant_check. */
+		/* For short writes without OBD_CONNECT_GRANT_PARAM support,
+		 * we shouldn't count parts of pages that span a whole chunk
+		 * on the OST side, or our accounting goes wrong. Should
+		 * match the code in tgt_grant_check().
+		 */
int offset = last_off & ~PAGE_MASK;
int count = last_count + (offset & (blocksize - 1));
int end = (offset + last_count) & (blocksize - 1);
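+		/* e.g. with a 1024-byte OST blocksize, a 100-byte write at
+		 * page offset 1000 crosses a block boundary, so the OST
+		 * charges grant for two full blocks */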
struct client_obd *cli = osc_cli(obj);
struct osc_async_page *oap;
struct osc_async_page *tmp;
- struct pagevec *pvec;
+ struct folio_batch *fbatch;
int pages_in_chunk = 0;
int ppc_bits = cli->cl_chunkbits -
PAGE_SHIFT;
io = osc_env_thread_io(env);
io->ci_obj = cl_object_top(osc2cl(obj));
io->ci_ignore_layout = 1;
- pvec = &osc_env_info(env)->oti_pagevec;
- ll_pagevec_init(pvec, 0);
+ fbatch = &osc_env_info(env)->oti_fbatch;
+ ll_folio_batch_init(fbatch, 0);
rc = cl_io_init(env, io, CIT_MISC, io->ci_obj);
if (rc < 0)
GOTO(out, rc);
}
lu_ref_del(&page->cp_reference, "truncate", current);
- cl_pagevec_put(env, page, pvec);
+ cl_batch_put(env, page, fbatch);
--ext->oe_nr_pages;
++nr_pages;
}
- pagevec_release(pvec);
+ folio_batch_release(fbatch);
EASSERTF(ergo(ext->oe_start >= trunc_index + !!partial,
ext->oe_nr_pages == 0),
rc = osc_make_ready(env, oap, OBD_BRW_WRITE);
switch (rc) {
case 0:
- spin_lock(&oap->oap_lock);
oap->oap_async_flags |= ASYNC_READY;
- spin_unlock(&oap->oap_lock);
break;
case -EALREADY:
LASSERT((oap->oap_async_flags & ASYNC_READY) != 0);
	/* the last page is the only one whose count needs to be refreshed
	 * from the size of the file. */
if (!(last->oap_async_flags & ASYNC_COUNT_STABLE)) {
- int last_oap_count = osc_refresh_count(env, last, OBD_BRW_WRITE);
- LASSERT(last_oap_count > 0);
+ int last_oap_count = osc_refresh_count(env, obj, last,
+ OBD_BRW_WRITE);
+ LASSERTF(last_oap_count > 0,
+ "last_oap_count %d\n", last_oap_count);
LASSERT(last->oap_page_off + last_oap_count <= PAGE_SIZE);
last->oap_count = last_oap_count;
- spin_lock(&last->oap_lock);
last->oap_async_flags |= ASYNC_COUNT_STABLE;
- spin_unlock(&last->oap_lock);
}
	/* for the remaining pages, we don't need to call osc_refresh_count()
list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
oap->oap_count = PAGE_SIZE - oap->oap_page_off;
- spin_lock(&oap->oap_lock);
oap->oap_async_flags |= ASYNC_COUNT_STABLE;
- spin_unlock(&oap->oap_lock);
}
}
static int osc_make_ready(const struct lu_env *env, struct osc_async_page *oap,
int cmd)
{
- struct osc_page *opg = oap2osc_page(oap);
struct cl_page *page = oap2cl_page(oap);
int result;
LASSERT(cmd == OBD_BRW_WRITE); /* no cached reads */
ENTRY;
+
result = cl_page_make_ready(env, page, CRT_WRITE);
- if (result == 0)
- opg->ops_submit_time = ktime_get();
+
RETURN(result);
}
-static int osc_refresh_count(const struct lu_env *env,
+static int osc_refresh_count(const struct lu_env *env, struct osc_object *osc,
struct osc_async_page *oap, int cmd)
{
struct osc_page *opg = oap2osc_page(oap);
pgoff_t index = osc_index(oap2osc(oap));
- struct cl_object *obj;
+ struct cl_object *obj = osc2cl(osc);
struct cl_attr *attr = &osc_env_info(env)->oti_attr;
int result;
loff_t kms;
/* readpage queues with _COUNT_STABLE, shouldn't get here. */
LASSERT(!(cmd & OBD_BRW_READ));
LASSERT(opg != NULL);
- obj = opg->ops_cl.cpl_obj;
cl_object_attr_lock(obj);
result = cl_object_attr_get(env, obj, attr);
if (result < 0)
return result;
kms = attr->cat_kms;
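+	/* e.g. with kms = 5000 and 4096-byte pages: index 0 returns
+	 * PAGE_SIZE, index 1 holds EOF and returns 5000 & ~PAGE_MASK = 904,
+	 * and index >= 2 is beyond kms and returns 0 */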
- if (cl_offset(obj, index) >= kms)
+ if (index << PAGE_SHIFT >= kms)
/* catch race with truncate */
return 0;
- else if (cl_offset(obj, index + 1) > kms)
+ else if ((index + 1) << PAGE_SHIFT > kms)
/* catch sub-page write at end of file */
return kms & ~PAGE_MASK;
else
return PAGE_SIZE;
}
-static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
- int cmd, int rc)
+static int osc_completion(const struct lu_env *env, struct osc_object *osc,
+ struct osc_async_page *oap, int cmd, int rc)
{
struct osc_page *opg = oap2osc_page(oap);
struct cl_page *page = oap2cl_page(oap);
ENTRY;
cmd &= ~OBD_BRW_NOQUOTA;
- LASSERTF(equi(page->cp_state == CPS_PAGEIN, cmd == OBD_BRW_READ),
- "cp_state:%u, cmd:%d\n", page->cp_state, cmd);
- LASSERTF(equi(page->cp_state == CPS_PAGEOUT, cmd == OBD_BRW_WRITE),
- "cp_state:%u, cmd:%d\n", page->cp_state, cmd);
- LASSERT(opg->ops_transfer_pinned);
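+	/* transient (direct i/o) pages bypass the cl_page state machine,
+	 * so the state and transfer-pinned checks only cover cacheable
+	 * pages */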
+ if (page->cp_type != CPT_TRANSIENT) {
+ LASSERTF(equi(page->cp_state == CPS_PAGEIN, cmd == OBD_BRW_READ),
+ "cp_state:%u, cmd:%d\n", page->cp_state, cmd);
+ LASSERTF(equi(page->cp_state == CPS_PAGEOUT, cmd == OBD_BRW_WRITE),
+ "cp_state:%u, cmd:%d\n", page->cp_state, cmd);
+ LASSERT(opg->ops_transfer_pinned);
+ }
crt = cmd == OBD_BRW_READ ? CRT_READ : CRT_WRITE;
/* Clear opg->ops_transfer_pinned before VM lock is released. */
opg->ops_transfer_pinned = 0;
- opg->ops_submit_time = ktime_set(0, 0);
srvlock = oap->oap_brw_flags & OBD_BRW_SRVLOCK;
/* statistic */
if (rc == 0 && srvlock) {
- struct lu_device *ld = opg->ops_cl.cpl_obj->co_lu.lo_dev;
- struct osc_stats *stats = &lu2osc_dev(ld)->od_stats;
+ struct lu_device *ld = osc->oo_cl.co_lu.lo_dev;
+ struct osc_stats *stats = &lu2osc_dev(ld)->osc_stats;
size_t bytes = oap->oap_count;
if (crt == CRT_READ)
struct brw_page *pga)
{
assert_spin_locked(&cli->cl_loi_list_lock);
- LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
+ LASSERT(!(pga->bp_flag & OBD_BRW_FROM_GRANT));
cli->cl_dirty_pages++;
- pga->flag |= OBD_BRW_FROM_GRANT;
+ pga->bp_flag |= OBD_BRW_FROM_GRANT;
CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
- PAGE_SIZE, pga, pga->pg);
- osc_update_next_shrink(cli);
+ PAGE_SIZE, pga, pga->bp_page);
}
/* the companion to osc_consume_write_grant, called when a brw has completed.
ENTRY;
assert_spin_locked(&cli->cl_loi_list_lock);
- if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
+ if (!(pga->bp_flag & OBD_BRW_FROM_GRANT)) {
EXIT;
return;
}
- pga->flag &= ~OBD_BRW_FROM_GRANT;
+ pga->bp_flag &= ~OBD_BRW_FROM_GRANT;
atomic_long_dec(&obd_dirty_pages);
cli->cl_dirty_pages--;
EXIT;
 * used, we should return these grants to OST. There are two cases where grants
* can be lost:
* 1. truncate;
- * 2. blocksize at OST is less than PAGE_SIZE and a partial page was
- * written. In this case OST may use less chunks to serve this partial
- * write. OSTs don't actually know the page size on the client side. so
- * clients have to calculate lost grant by the blocksize on the OST.
- * See filter_grant_check() for details.
+ * 2. Without OBD_CONNECT_GRANT_PARAM support, if the blocksize at the OST
+ *    is less than PAGE_SIZE and a partial page was written, the OST may
+ *    use fewer chunks to serve the partial write. OSTs don't actually know
+ *    the page size on the client side, so clients have to calculate the
+ *    lost grant from the blocksize on the OST. See tgt_grant_check() for
+ *    details.
*/
static void osc_free_grant(struct client_obd *cli, unsigned int nr_pages,
unsigned int lost_grant, unsigned int dirty_grant)
 * The process will be put to sleep if it has already run out of grant.
*/
static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
- struct osc_async_page *oap, int bytes)
+ struct osc_object *osc, struct osc_async_page *oap,
+ int bytes)
{
- struct osc_object *osc = oap->oap_obj;
- struct lov_oinfo *loi = osc->oo_oinfo;
- int rc = -EDQUOT;
- unsigned long timeout = cfs_time_seconds(AT_OFF ? obd_timeout : at_max);
+ struct lov_oinfo *loi = osc->oo_oinfo;
+ int rc = -EDQUOT;
int remain;
bool entered = false;
+ struct obd_device *obd = cli->cl_import->imp_obd;
+	/* We cannot wait for a long time here since we are holding an ldlm
+	 * lock across the actual IO. If no requests complete quickly (e.g.
+	 * due to an overloaded OST that takes a long time to process
+	 * everything), we'd get evicted if we waited for a normal obd_timeout
+	 * or some such. So we try to wait half the time it would take the
+	 * client to be evicted by the server, which is half of obd_timeout
+	 * when AT is off, or at least half of ldlm_enqueue_min with AT on.
+	 * See LU-13131 */
+ unsigned long timeout =
+ cfs_time_seconds(obd_at_off(obd) ?
+ obd_timeout / 2 :
+ obd_get_ldlm_enqueue_min(obd) / 2);
ENTRY;
/* force the caller to try sync io. this can jump the list
* of queued writes and create a discontiguous rpc stream */
- if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
+ if (CFS_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
cli->cl_dirty_max_pages == 0 ||
cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync) {
OSC_DUMP_GRANT(D_CACHE, cli, "forced sync i/o\n");
return is_ready;
}
-/* this is trying to propogate async writeback errors back up to the
- * application. As an async write fails we record the error code for later if
- * the app does an fsync. As long as errors persist we force future rpcs to be
- * sync so that the app can get a sync error and break the cycle of queueing
- * pages for which writeback will fail. */
-static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
- int rc)
-{
- if (rc) {
- if (!ar->ar_rc)
- ar->ar_rc = rc;
-
- ar->ar_force_sync = 1;
- ar->ar_min_xid = ptlrpc_sample_next_xid();
- return;
-
- }
-
- if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
- ar->ar_force_sync = 0;
-}
-
/* this must be called holding the loi list lock to give coverage to exit_cache,
- * async_flag maintenance, and oap_request */
+ * async_flag maintenance
+ */
static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
+ struct osc_object *osc,
struct osc_async_page *oap, int sent, int rc)
{
- struct osc_object *osc = oap->oap_obj;
- struct lov_oinfo *loi = osc->oo_oinfo;
- __u64 xid = 0;
-
ENTRY;
- if (oap->oap_request != NULL) {
- xid = ptlrpc_req_xid(oap->oap_request);
- ptlrpc_req_finished(oap->oap_request);
- oap->oap_request = NULL;
- }
/* As the transfer for this page is being done, clear the flags */
- spin_lock(&oap->oap_lock);
oap->oap_async_flags = 0;
- spin_unlock(&oap->oap_lock);
- if (oap->oap_cmd & OBD_BRW_WRITE && xid > 0) {
- spin_lock(&cli->cl_loi_list_lock);
- osc_process_ar(&cli->cl_ar, xid, rc);
- osc_process_ar(&loi->loi_ar, xid, rc);
- spin_unlock(&cli->cl_loi_list_lock);
- }
-
- rc = osc_completion(env, oap, oap->oap_cmd, rc);
+ rc = osc_completion(env, osc, oap, oap->oap_cmd, rc);
if (rc)
CERROR("completion on oap %p obj %p returns %d.\n",
oap, osc, rc);
if (in_rpc->oe_dio && overlapped(ext, in_rpc))
return false;
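+	/* don't mix RDMA-only pages with normal pages in a single RPC */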
+ if (ext->oe_is_rdma_only != in_rpc->oe_is_rdma_only)
+ return false;
+
return true;
}
};
assert_osc_object_is_locked(obj);
- while (!list_empty(&obj->oo_hp_exts)) {
- ext = list_entry(obj->oo_hp_exts.next, struct osc_extent,
- oe_link);
- LASSERT(ext->oe_state == OES_CACHE);
+ while ((ext = list_first_entry_or_null(&obj->oo_hp_exts,
+ struct osc_extent,
+ oe_link)) != NULL) {
if (!try_to_add_extent_for_io(cli, ext, &data))
return data.erd_page_count;
EASSERT(ext->oe_nr_pages <= data.erd_max_pages, ext);
if (data.erd_page_count == data.erd_max_pages)
return data.erd_page_count;
- while (!list_empty(&obj->oo_urgent_exts)) {
- ext = list_entry(obj->oo_urgent_exts.next,
- struct osc_extent, oe_link);
+ while ((ext = list_first_entry_or_null(&obj->oo_urgent_exts,
+ struct osc_extent,
+ oe_link)) != NULL) {
if (!try_to_add_extent_for_io(cli, ext, &data))
return data.erd_page_count;
}
* extents can usually only be added if the rpclist was empty, so if we
* can't add one, we continue on to trying to add normal extents. This
* is so we don't miss adding extra extents to an RPC containing high
- * priority or urgent extents. */
- while (!list_empty(&obj->oo_full_exts)) {
- ext = list_entry(obj->oo_full_exts.next,
- struct osc_extent, oe_link);
+ * priority or urgent extents.
+ */
+ while ((ext = list_first_entry_or_null(&obj->oo_full_exts,
+ struct osc_extent,
+ oe_link)) != NULL) {
if (!try_to_add_extent_for_io(cli, ext, &data))
break;
}
spin_lock(&cli->cl_loi_list_lock);
}
+ EXIT;
}
int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli,
EXPORT_SYMBOL(osc_io_unplug0);
int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
- struct page *page, loff_t offset)
+ struct cl_page *page, loff_t offset)
{
- struct obd_export *exp = osc_export(osc);
struct osc_async_page *oap = &ops->ops_oap;
- ENTRY;
+ ENTRY;
if (!page)
- return cfs_size_round(sizeof(*oap));
+ return round_up(sizeof(*oap), 8);
- oap->oap_magic = OAP_MAGIC;
- oap->oap_cli = &exp->exp_obd->u.cli;
oap->oap_obj = osc;
-
- oap->oap_page = page;
+ oap->oap_page = page->cp_vmpage;
oap->oap_obj_off = offset;
LASSERT(!(offset & ~PAGE_MASK));
+ /* Count of transient (direct i/o) pages is always stable by the time
+ * they're submitted. Setting this here lets us avoid calling
+ * cl_page_clip later to set this.
+ */
+ if (page->cp_type == CPT_TRANSIENT)
+ oap->oap_async_flags |= ASYNC_COUNT_STABLE|ASYNC_URGENT|
+ ASYNC_READY;
+
INIT_LIST_HEAD(&oap->oap_pending_item);
INIT_LIST_HEAD(&oap->oap_rpc_item);
- spin_lock_init(&oap->oap_lock);
- CDEBUG(D_INFO, "oap %p page %p obj off %llu\n",
- oap, page, oap->oap_obj_off);
+ CDEBUG(D_INFO, "oap %p vmpage %p obj off %llu\n",
+ oap, oap->oap_page, oap->oap_obj_off);
RETURN(0);
}
EXPORT_SYMBOL(osc_prep_async_page);
int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
- struct osc_page *ops, cl_commit_cbt cb)
+ struct osc_object *osc, struct osc_page *ops,
+ cl_commit_cbt cb)
{
struct osc_io *oio = osc_env_io(env);
struct osc_extent *ext = NULL;
struct osc_async_page *oap = &ops->ops_oap;
- struct client_obd *cli = oap->oap_cli;
- struct osc_object *osc = oap->oap_obj;
- struct pagevec *pvec = &osc_env_info(env)->oti_pagevec;
+ struct client_obd *cli = osc_cli(osc);
+ struct folio_batch *fbatch = &osc_env_info(env)->oti_fbatch;
pgoff_t index;
unsigned int tmp;
unsigned int grants = 0;
int rc = 0;
ENTRY;
- if (oap->oap_magic != OAP_MAGIC)
- RETURN(-EINVAL);
-
if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
RETURN(-EIO);
/* Set the OBD_BRW_SRVLOCK before the page is queued. */
brw_flags |= ops->ops_srvlock ? OBD_BRW_SRVLOCK : 0;
- if (oio->oi_cap_sys_resource || io->ci_noquota) {
+ if (io->ci_noquota) {
brw_flags |= OBD_BRW_NOQUOTA;
cmd |= OBD_BRW_NOQUOTA;
}
+ if (oio->oi_cap_sys_resource) {
+ brw_flags |= OBD_BRW_SYS_RESOURCE;
+ cmd |= OBD_BRW_SYS_RESOURCE;
+ }
+
/* check if the file's owner/group is over quota */
- if (!(cmd & OBD_BRW_NOQUOTA)) {
+	/* do not check quota for root without root squash, because in
+	 * this case quota should be bypassed
+	 */
+ if ((!oio->oi_cap_sys_resource ||
+ cli->cl_root_squash || cli->cl_root_prjquota) &&
+ !io->ci_noquota) {
struct cl_object *obj;
struct cl_attr *attr;
unsigned int qid[LL_MAXQUOTAS];
oap->oap_cmd = cmd;
oap->oap_page_off = ops->ops_from;
- oap->oap_count = ops->ops_to - ops->ops_from;
+ oap->oap_count = ops->ops_to - ops->ops_from + 1;
/* No need to hold a lock here,
* since this page is not in any list yet. */
oap->oap_async_flags = 0;
LASSERT(ergo(grants > 0, grants >= tmp));
rc = 0;
+
+ /* We must not hold a page lock while we do osc_enter_cache()
+ * or osc_extent_find(), so we must mark dirty & unlock
+ * any pages in the write commit folio_batch.
+ */
+ if (folio_batch_count(fbatch)) {
+ cb(env, io, fbatch);
+ folio_batch_reinit(fbatch);
+ }
+
if (grants == 0) {
- /* We haven't allocated grant for this page, and we
- * must not hold a page lock while we do enter_cache,
- * so we must mark dirty & unlock any pages in the
- * write commit pagevec. */
- if (pagevec_count(pvec)) {
- cb(env, io, pvec);
- pagevec_reinit(pvec);
- }
- rc = osc_enter_cache(env, cli, oap, tmp);
+ rc = osc_enter_cache(env, cli, osc, oap, tmp);
if (rc == 0)
grants = tmp;
}
int rc = 0;
ENTRY;
- LASSERT(oap->oap_magic == OAP_MAGIC);
-
CDEBUG(D_INFO, "teardown oap %p page %p at index %lu.\n",
oap, ops, osc_index(oap2osc(oap)));
struct osc_page *ops)
{
struct osc_extent *ext = NULL;
- struct osc_object *obj = cl2osc(ops->ops_cl.cpl_obj);
+ struct osc_object *obj = osc_page_object(ops);
struct cl_page *cp = ops->ops_cl.cpl_page;
pgoff_t index = osc_index(ops);
struct osc_async_page *oap = &ops->ops_oap;
if (rc)
GOTO(out, rc);
- spin_lock(&oap->oap_lock);
oap->oap_async_flags |= ASYNC_READY|ASYNC_URGENT;
- spin_unlock(&oap->oap_lock);
- if (memory_pressure_get())
+ if (current->flags & PF_MEMALLOC)
ext->oe_memalloc = 1;
ext->oe_urgent = 1;
return rc;
}
-int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io,
+int osc_queue_sync_pages(const struct lu_env *env, struct cl_io *io,
struct osc_object *obj, struct list_head *list,
int brw_flags)
{
+ struct osc_io *oio = osc_env_io(env);
struct client_obd *cli = osc_cli(obj);
struct osc_extent *ext;
struct osc_async_page *oap;
bool can_merge = true;
pgoff_t start = CL_PAGE_EOF;
pgoff_t end = 0;
+ struct osc_lock *oscl;
ENTRY;
list_for_each_entry(oap, list, oap_pending_item) {
++page_count;
mppr <<= (page_count > mppr);
- if (unlikely(opg->ops_from > 0 || opg->ops_to < PAGE_SIZE))
+ if (unlikely(opg->ops_from > 0 ||
+ opg->ops_to < PAGE_SIZE - 1))
can_merge = false;
}
list_for_each_entry_safe(oap, tmp, list, oap_pending_item) {
list_del_init(&oap->oap_pending_item);
- osc_ap_completion(env, cli, oap, 0, -ENOMEM);
+ osc_ap_completion(env, cli, obj, oap, 0, -ENOMEM);
}
RETURN(-ENOMEM);
}
ext->oe_srvlock = !!(brw_flags & OBD_BRW_SRVLOCK);
ext->oe_ndelay = !!(brw_flags & OBD_BRW_NDELAY);
ext->oe_dio = !!(brw_flags & OBD_BRW_NOCACHE);
+ oscl = oio->oi_write_osclock ? : oio->oi_read_osclock;
+ if (oscl && oscl->ols_dlmlock != NULL) {
+ ext->oe_dlmlock = LDLM_LOCK_GET(oscl->ols_dlmlock);
+ lu_ref_add(&ext->oe_dlmlock->l_reference, "osc_extent", ext);
+ }
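+	/* pin the DLM lock this IO is running under for the lifetime of
+	 * the extent */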
+ if (ext->oe_dio && !ext->oe_rw) { /* direct io write */
+ int grants;
+ int ppc;
+
+ ppc = 1 << (cli->cl_chunkbits - PAGE_SHIFT);
+ grants = cli->cl_grant_extent_tax;
+ grants += (1 << cli->cl_chunkbits) *
+ ((page_count + ppc - 1) / ppc);
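+		/* i.e. the per-extent tax plus one full chunk of grant for
+		 * every started chunk of pages: e.g. 9 pages at 8 pages per
+		 * chunk reserve two chunks plus the tax */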
+
+ CDEBUG(D_CACHE, "requesting %d bytes grant\n", grants);
+ spin_lock(&cli->cl_loi_list_lock);
+ if (osc_reserve_grant(cli, grants) == 0) {
+ list_for_each_entry(oap, list, oap_pending_item) {
+ osc_consume_write_grant(cli,
+ &oap->oap_brw_page);
+ }
+ atomic_long_add(page_count, &obd_dirty_pages);
+ osc_unreserve_grant_nolock(cli, grants, 0);
+ ext->oe_grants = grants;
+ } else {
+ /* We cannot report ENOSPC correctly if we do parallel
+ * DIO (async RPC submission), so turn off parallel dio
+ * if there is not sufficient grant available. This
+ * makes individual RPCs synchronous.
+ */
+ io->ci_parallel_dio = false;
+ CDEBUG(D_CACHE,
+ "not enough grant available, switching to sync for this i/o\n");
+ }
+ spin_unlock(&cli->cl_loi_list_lock);
+ osc_update_next_shrink(cli);
+ }
+
+ ext->oe_is_rdma_only = !!(brw_flags & OBD_BRW_RDMA_ONLY);
ext->oe_nr_pages = page_count;
ext->oe_mppr = mppr;
list_splice_init(list, &ext->oe_pages);
/* Reuse the initial refcount for RPC, don't drop it */
osc_extent_state_set(ext, OES_LOCK_DONE);
if (!ext->oe_rw) { /* write */
- list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
+		if (!ext->oe_srvlock && !ext->oe_dio) {
+			/* The most likely case here is lack of grants, so we
+			 * are either out of quota or out of space. Since this
+			 * means we are holding locks across potentially
+			 * multi-striped IO, we must send everything out
+			 * instantly to avoid prolonged waits resulting in
+			 * lock eviction (likely since the extended wait in
+			 * osc_enter_cache() did not yield any additional
+			 * grant due to a timeout).
+			 * See LU-13131 */
+ ext->oe_hp = 1;
+ list_add_tail(&ext->oe_link, &obj->oo_hp_exts);
+ } else {
+ list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
+ }
osc_update_pending(obj, OBD_BRW_WRITE, page_count);
} else {
list_add_tail(&ext->oe_link, &obj->oo_reading_exts);
ENTRY;
	/* pages with index greater than or equal to @index will be truncated. */
- index = cl_index(osc2cl(obj), size);
- partial = size > cl_offset(osc2cl(obj), index);
+ index = size >> PAGE_SHIFT;
+ partial = size > (index << PAGE_SHIFT);
again:
osc_object_lock(obj);
osc_list_maint(cli, obj);
- while (!list_empty(&list)) {
+ while ((ext = list_first_entry_or_null(&list,
+ struct osc_extent,
+ oe_link)) != NULL) {
int rc;
- ext = list_entry(list.next, struct osc_extent, oe_link);
list_del_init(&ext->oe_link);
/* extent may be in OES_ACTIVE state because inode mutex
osc_page_gang_cbt cb, void *cbdata)
{
struct osc_page *ops;
- struct pagevec *pagevec;
+ struct folio_batch *fbatch;
void **pvec;
pgoff_t idx;
unsigned int nr;
idx = start;
pvec = osc_env_info(env)->oti_pvec;
- pagevec = &osc_env_info(env)->oti_pagevec;
- ll_pagevec_init(pagevec, 0);
+ fbatch = &osc_env_info(env)->oti_fbatch;
+ ll_folio_batch_init(fbatch, 0);
spin_lock(&osc->oo_tree_lock);
while ((nr = radix_tree_gang_lookup(&osc->oo_tree, pvec,
idx, OTI_PVEC_SIZE)) > 0) {
spin_unlock(&osc->oo_tree_lock);
tree_lock = false;
+ res = (*cb)(env, io, pvec, j, cbdata);
+
for (i = 0; i < j; ++i) {
ops = pvec[i];
- if (res)
- res = (*cb)(env, io, ops, cbdata);
-
page = ops->ops_cl.cpl_page;
lu_ref_del(&page->cp_reference, "gang_lookup", current);
- cl_pagevec_put(env, page, pagevec);
+ cl_batch_put(env, page, fbatch);
}
- pagevec_release(pagevec);
+ folio_batch_release(fbatch);
if (nr < OTI_PVEC_SIZE || end_of_region)
break;
if (!res)
break;
+
+ CFS_FAIL_TIMEOUT(OBD_FAIL_OSC_SLOW_PAGE_EVICT,
+ cfs_fail_val ?: 20);
+
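+		/* discarding a huge range can take a long time; periodically
+		 * send an empty RPC so the server sees the client making
+		 * progress and does not evict it */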
+ if (io->ci_type == CIT_MISC &&
+ io->u.ci_misc.lm_next_rpc_time &&
+ ktime_get_seconds() > io->u.ci_misc.lm_next_rpc_time) {
+ osc_send_empty_rpc(osc, idx << PAGE_SHIFT);
+ io->u.ci_misc.lm_next_rpc_time = ktime_get_seconds() +
+ 5 * obd_timeout / 16;
+ }
+
if (need_resched())
cond_resched();
* Check if page @page is covered by an extra lock or discard it.
*/
static bool check_and_discard_cb(const struct lu_env *env, struct cl_io *io,
- struct osc_page *ops, void *cbdata)
+ void **pvec, int count, void *cbdata)
{
struct osc_thread_info *info = osc_env_info(env);
struct osc_object *osc = cbdata;
- pgoff_t index;
+ int i;
- index = osc_index(ops);
- if (index >= info->oti_fn_index) {
- struct ldlm_lock *tmp;
+ for (i = 0; i < count; i++) {
+ struct osc_page *ops = pvec[i];
struct cl_page *page = ops->ops_cl.cpl_page;
+ pgoff_t index = osc_index(ops);
+ bool discard = false;
+
+ /* negative lock caching */
+ if (index < info->oti_ng_index) {
+ discard = true;
+ } else if (index >= info->oti_fn_index) {
+ struct ldlm_lock *tmp;
+ /* refresh non-overlapped index */
+ tmp = osc_dlmlock_at_pgoff(env, osc, index,
+ OSC_DAP_FL_TEST_LOCK |
+ OSC_DAP_FL_AST |
+ OSC_DAP_FL_RIGHT);
+ if (tmp != NULL) {
+ __u64 end =
+ tmp->l_policy_data.l_extent.end;
+ __u64 start =
+ tmp->l_policy_data.l_extent.start;
+
+ /* no lock covering this page */
+ if (index < start >> PAGE_SHIFT) {
+ /* no lock at @index,
+ * first lock at @start
+ */
+ info->oti_ng_index =
+ start >> PAGE_SHIFT;
+ discard = true;
+ } else {
+ /* Cache the first-non-overlapped
+ * index so as to skip all pages
+ * within [index, oti_fn_index).
+ * This is safe because if tmp lock
+ * is canceled, it will discard these
+ * pages.
+ */
+ info->oti_fn_index =
+ (end + 1) >> PAGE_SHIFT;
+ if (end == OBD_OBJECT_EOF)
+ info->oti_fn_index =
+ CL_PAGE_EOF;
+ }
+ LDLM_LOCK_PUT(tmp);
+ } else {
+ info->oti_ng_index = CL_PAGE_EOF;
+ discard = true;
+ }
+ }
- /* refresh non-overlapped index */
- tmp = osc_dlmlock_at_pgoff(env, osc, index,
- OSC_DAP_FL_TEST_LOCK);
- if (tmp != NULL) {
- __u64 end = tmp->l_policy_data.l_extent.end;
- /* Cache the first-non-overlapped index so as to skip
- * all pages within [index, oti_fn_index). This is safe
- * because if tmp lock is canceled, it will discard
- * these pages. */
- info->oti_fn_index = cl_index(osc2cl(osc), end + 1);
- if (end == OBD_OBJECT_EOF)
- info->oti_fn_index = CL_PAGE_EOF;
- LDLM_LOCK_PUT(tmp);
- } else if (cl_page_own(env, io, page) == 0) {
- /* discard the page */
- cl_page_discard(env, io, page);
- cl_page_disown(env, io, page);
- } else {
- LASSERT(page->cp_state == CPS_FREEING);
+ if (discard) {
+ if (cl_page_own(env, io, page) == 0) {
+ cl_page_discard(env, io, page);
+ cl_page_disown(env, io, page);
+ } else {
+ LASSERT(page->cp_state == CPS_FREEING);
+ }
}
- }
- info->oti_next_index = index + 1;
+ info->oti_next_index = index + 1;
+ }
return true;
}
bool osc_discard_cb(const struct lu_env *env, struct cl_io *io,
- struct osc_page *ops, void *cbdata)
+ void **pvec, int count, void *cbdata)
{
struct osc_thread_info *info = osc_env_info(env);
- struct cl_page *page = ops->ops_cl.cpl_page;
-
- /* page is top page. */
- info->oti_next_index = osc_index(ops) + 1;
- if (cl_page_own(env, io, page) == 0) {
- if (!ergo(page->cp_type == CPT_CACHEABLE,
- !PageDirty(cl_page_vmpage(page))))
- CL_PAGE_DEBUG(D_ERROR, env, page,
- "discard dirty page?\n");
-
- /* discard the page */
- cl_page_discard(env, io, page);
- cl_page_disown(env, io, page);
- } else {
- LASSERT(page->cp_state == CPS_FREEING);
+ int i;
+
+ for (i = 0; i < count; i++) {
+ struct osc_page *ops = pvec[i];
+ struct cl_page *page = ops->ops_cl.cpl_page;
+
+ /* page is top page. */
+ info->oti_next_index = osc_index(ops) + 1;
+ if (cl_page_own(env, io, page) == 0) {
+ if (!ergo(page->cp_type == CPT_CACHEABLE,
+ !PageDirty(cl_page_vmpage(page))))
+ CL_PAGE_DEBUG(D_ERROR, env, page,
+ "discard dirty page?\n");
+
+ /* discard the page */
+ cl_page_discard(env, io, page);
+ cl_page_disown(env, io, page);
+ } else {
+ LASSERT(page->cp_state == CPS_FREEING);
+ }
}
return true;
io->ci_obj = cl_object_top(osc2cl(osc));
io->ci_ignore_layout = 1;
+ io->ci_invalidate_page_cache = 1;
+ io->u.ci_misc.lm_next_rpc_time = ktime_get_seconds() +
+ 5 * obd_timeout / 16;
result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
if (result != 0)
GOTO(out, result);
cb = discard ? osc_discard_cb : check_and_discard_cb;
info->oti_fn_index = info->oti_next_index = start;
+ info->oti_ng_index = 0;
osc_page_gang_lookup(env, io, osc,
info->oti_next_index, end, cb, osc);