*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
*
* GPL HEADER END
*/
* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2012, 2015, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
*
*/
/*
#define DEBUG_SUBSYSTEM S_OSC
-#include "osc_cl_internal.h"
+#include <lustre_osc.h>
+
#include "osc_internal.h"
static int extent_debug; /* set to non-zero for extra debugging */
if (ext->oe_sync && ext->oe_grants > 0)
GOTO(out, rc = 90);
- if (ext->oe_dlmlock != NULL && !ldlm_is_failed(ext->oe_dlmlock)) {
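+ /* oe_dlmlock may be a non-extent (e.g. IBITS) lock, which has no
+ * l_extent policy data; only sanity-check LDLM_EXTENT locks */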
+ if (ext->oe_dlmlock != NULL &&
+ ext->oe_dlmlock->l_resource->lr_type == LDLM_EXTENT &&
+ !ldlm_is_failed(ext->oe_dlmlock)) {
struct ldlm_extent *extent;
extent = &ext->oe_dlmlock->l_policy_data.l_extent;
if (ext->oe_urgent)
list_move_tail(&ext->oe_link,
&obj->oo_urgent_exts);
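+ /* an extent holding oe_mppr pages is "full": move it to the
+ * full-extents list so it can be flushed as a whole RPC */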
+ else if (ext->oe_nr_pages == ext->oe_mppr) {
+ list_move_tail(&ext->oe_link,
+ &obj->oo_full_exts);
+ }
}
osc_object_unlock(obj);
pgoff_t ext_chk_end = ext->oe_end >> ppc_bits;
LASSERT(sanity_check_nolock(ext) == 0);
- if (chunk > ext_chk_end + 1)
+ if (chunk > ext_chk_end + 1 || chunk < ext_chk_start)
break;
/* if covered by different locks, there is no chance to match */
/* pull ext's start back to cover cur */
ext->oe_start = cur->oe_start;
ext->oe_grants += chunksize;
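+ /* *grants is unsigned, so check for underflow before subtracting;
+ * an LASSERT(*grants >= 0) afterwards would be vacuous */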
+ LASSERT(*grants >= chunksize);
*grants -= chunksize;
found = osc_extent_hold(ext);
/* rear merge */
ext->oe_end = cur->oe_end;
ext->oe_grants += chunksize;
+ LASSERT(*grants >= chunksize);
*grants -= chunksize;
/* try to merge with the next one because we just fill
/* create a new extent */
EASSERT(osc_extent_is_overlapped(obj, cur) == 0, cur);
cur->oe_grants = chunksize + cli->cl_grant_extent_tax;
+ LASSERT(*grants >= cur->oe_grants);
*grants -= cur->oe_grants;
- LASSERT(*grants >= 0);
cur->oe_state = OES_CACHE;
found = osc_extent_hold(cur);
out:
osc_extent_put(env, cur);
- LASSERT(*grants >= 0);
return found;
}
* We can't use that env from osc_cache_truncate_start() because
* it's from lov_io_sub and not fully initialized. */
env = cl_env_get(&refcheck);
- io = &osc_env_info(env)->oti_io;
+ if (IS_ERR(env))
+ RETURN(PTR_ERR(env));
+
+ io = osc_env_thread_io(env);
io->ci_obj = cl_object_top(osc2cl(obj));
io->ci_ignore_layout = 1;
rc = cl_io_init(env, io, CIT_MISC, io->ci_obj);
ext->oe_end = end_index;
ext->oe_grants += chunksize;
+ LASSERT(*grants >= chunksize);
*grants -= chunksize;
- LASSERT(*grants >= 0);
EASSERTF(osc_extent_is_overlapped(obj, ext) == 0, ext,
"overlapped after expanding for %lu.\n", index);
EXIT;
struct osc_extent *ext;
int cnt;
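+
+ /* avoid walking and printing every extent when this debug level
+ * is masked out */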
+ if (!cfs_cdebug_show(level, DEBUG_SUBSYSTEM))
+ return;
+
CDEBUG(level, "Dump object %p extents at %s:%d, mppr: %u.\n",
obj, func, line, osc_cli(obj)->cl_max_pages_per_rpc);
ENTRY;
result = cl_page_make_ready(env, page, CRT_WRITE);
if (result == 0)
- opg->ops_submit_time = cfs_time_current();
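+ /* submit time is now a monotonic ktime_t rather than jiffies */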
+ opg->ops_submit_time = ktime_get();
RETURN(result);
}
pgoff_t index = osc_index(oap2osc(oap));
struct cl_object *obj;
struct cl_attr *attr = &osc_env_info(env)->oti_attr;
-
int result;
loff_t kms;
/* Clear opg->ops_transfer_pinned before VM lock is released. */
opg->ops_transfer_pinned = 0;
- opg->ops_submit_time = 0;
+ opg->ops_submit_time = ktime_set(0, 0);
srvlock = oap->oap_brw_flags & OBD_BRW_SRVLOCK;
/* statistic */
EXIT;
}
+EXPORT_SYMBOL(osc_wake_cache_waiters);
static int osc_max_rpc_in_flight(struct client_obd *cli, struct osc_object *osc)
{
CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
RETURN(1);
}
- if (atomic_read(&osc->oo_nr_writes) >=
- cli->cl_max_pages_per_rpc)
+ if (!list_empty(&osc->oo_full_exts)) {
+ CDEBUG(D_CACHE, "full extent ready, make an RPC\n");
RETURN(1);
+ }
} else {
if (atomic_read(&osc->oo_nr_reads) == 0)
RETURN(0);
unsigned int erd_page_count;
unsigned int erd_max_pages;
unsigned int erd_max_chunks;
+ unsigned int erd_max_extents; /* max number of extents per RPC */
};
static inline unsigned osc_extent_chunks(const struct osc_extent *ext)
EASSERT((ext->oe_state == OES_CACHE || ext->oe_state == OES_LOCK_DONE),
ext);
+ OSC_EXTENT_DUMP(D_CACHE, ext, "trying to add this extent\n");
+
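+ /* the extent budget for this RPC is exhausted */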
+ if (data->erd_max_extents == 0)
+ RETURN(0);
chunk_count = osc_extent_chunks(ext);
+ EASSERTF(data->erd_page_count != 0 ||
+ chunk_count <= data->erd_max_chunks, ext,
+ "The first extent to fit in an RPC contains %u chunks, "
+ "which exceeds the limit %u.\n", chunk_count,
+ data->erd_max_chunks);
if (chunk_count > data->erd_max_chunks)
RETURN(0);
data->erd_max_pages = max(ext->oe_mppr, data->erd_max_pages);
+ EASSERTF(data->erd_page_count != 0 ||
+ ext->oe_nr_pages <= data->erd_max_pages, ext,
+ "The first extent to fit in an RPC contains %u pages, "
+ "which exceeds the limit %u.\n", ext->oe_nr_pages,
+ data->erd_max_pages);
if (data->erd_page_count + ext->oe_nr_pages > data->erd_max_pages)
RETURN(0);
if (tmp->oe_srvlock != ext->oe_srvlock ||
!tmp->oe_grants != !ext->oe_grants ||
+ tmp->oe_ndelay != ext->oe_ndelay ||
tmp->oe_no_merge || ext->oe_no_merge)
RETURN(0);
break;
}
+ data->erd_max_extents--;
data->erd_max_chunks -= chunk_count;
data->erd_page_count += ext->oe_nr_pages;
list_move_tail(&ext->oe_link, data->erd_rpc_list);
*
* This limitation doesn't apply to ldiskfs, which allows as many
- * chunks in one RPC as we want. However, it won't have any benefits
- * to have too many discontiguous pages in one RPC. Therefore, it
- * can only have 256 chunks at most in one RPC.
+ * chunks in one RPC as we want. However, there is no benefit to
+ * having too many discontiguous pages in one RPC.
+ *
+ * An osc_extent won't cover more than an RPC-sized region, so the
+ * number of chunks in an osc_extent won't exceed
+ * PTLRPC_MAX_BRW_SIZE >> chunkbits.
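+ *
+ * For example, with a 16 MiB PTLRPC_MAX_BRW_SIZE and 64 KiB chunks
+ * (chunkbits = 16), that works out to 256 chunks per RPC.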
*/
- return min(PTLRPC_MAX_BRW_SIZE >> cli->cl_chunkbits, 256);
+ return PTLRPC_MAX_BRW_SIZE >> cli->cl_chunkbits;
}
/**
.erd_page_count = 0,
.erd_max_pages = cli->cl_max_pages_per_rpc,
.erd_max_chunks = osc_max_write_chunks(cli),
+ .erd_max_extents = 256, /* arbitrary cap, mirrors the old 256-chunk limit */
};
LASSERT(osc_object_is_locked(obj));
struct osc_extent, oe_link);
if (!try_to_add_extent_for_io(cli, ext, &data))
return data.erd_page_count;
+ }
+ if (data.erd_page_count == data.erd_max_pages)
+ return data.erd_page_count;
- if (!ext->oe_intree)
- continue;
-
- while ((ext = next_extent(ext)) != NULL) {
- if ((ext->oe_state != OES_CACHE) ||
- (!list_empty(&ext->oe_link) &&
- ext->oe_owner != NULL))
- continue;
-
- if (!try_to_add_extent_for_io(cli, ext, &data))
- return data.erd_page_count;
- }
+ /* One key difference between full extents and other extents: full
+ * extents can usually only be added if the rpclist was empty, so if we
+ * can't add one, we continue on and try to add normal extents. This
+ * way we don't miss adding extra extents to an RPC containing high
+ * priority or urgent extents. */
+ while (!list_empty(&obj->oo_full_exts)) {
+ ext = list_entry(obj->oo_full_exts.next,
+ struct osc_extent, oe_link);
+ if (!try_to_add_extent_for_io(cli, ext, &data))
+ break;
}
if (data.erd_page_count == data.erd_max_pages)
return data.erd_page_count;
.erd_page_count = 0,
.erd_max_pages = cli->cl_max_pages_per_rpc,
.erd_max_chunks = UINT_MAX,
+ .erd_max_extents = UINT_MAX, /* effectively no extent limit here */
};
int rc = 0;
ENTRY;
}
}
-static int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli,
- struct osc_object *osc, int async)
+int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli,
+ struct osc_object *osc, int async)
{
int rc = 0;
}
return rc;
}
-
-static int osc_io_unplug_async(const struct lu_env *env,
- struct client_obd *cli, struct osc_object *osc)
-{
- return osc_io_unplug0(env, cli, osc, 1);
-}
-
-void osc_io_unplug(const struct lu_env *env, struct client_obd *cli,
- struct osc_object *osc)
-{
- (void)osc_io_unplug0(env, cli, osc, 0);
-}
+EXPORT_SYMBOL(osc_io_unplug0);
int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
struct page *page, loff_t offset)
oap, page, oap->oap_obj_off);
RETURN(0);
}
+EXPORT_SYMBOL(osc_prep_async_page);
int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
struct osc_page *ops)
qid[USRQUOTA] = attr->cat_uid;
qid[GRPQUOTA] = attr->cat_gid;
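+ /* project quota is checked alongside user and group quota */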
+ qid[PRJQUOTA] = attr->cat_projid;
if (rc == 0 && osc_quota_chkdq(cli, qid) == NO_QUOTA)
rc = -EDQUOT;
if (rc)
++ext->oe_nr_pages;
list_add_tail(&oap->oap_pending_item, &ext->oe_pages);
osc_object_unlock(osc);
+
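+ /* record the layout version of the IO that first dirtied this
+ * extent; it is sent with the write so that layout changes can be
+ * detected at write-out time */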
+ if (!ext->oe_layout_version)
+ ext->oe_layout_version = io->ci_layout_version;
}
+
RETURN(rc);
}
RETURN(rc);
}
-int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
- struct list_head *list, int cmd, int brw_flags)
+int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io,
+ struct osc_object *obj, struct list_head *list,
+ int brw_flags)
{
struct client_obd *cli = osc_cli(obj);
struct osc_extent *ext;
RETURN(-ENOMEM);
}
- ext->oe_rw = !!(cmd & OBD_BRW_READ);
+ ext->oe_rw = !!(brw_flags & OBD_BRW_READ);
ext->oe_sync = 1;
ext->oe_no_merge = !can_merge;
ext->oe_urgent = 1;
ext->oe_end = ext->oe_max_end = end;
ext->oe_obj = obj;
ext->oe_srvlock = !!(brw_flags & OBD_BRW_SRVLOCK);
+ ext->oe_ndelay = !!(brw_flags & OBD_BRW_NDELAY);
ext->oe_nr_pages = page_count;
ext->oe_mppr = mppr;
list_splice_init(list, &ext->oe_pages);
+ ext->oe_layout_version = io->ci_layout_version;
osc_object_lock(obj);
/* Reuse the initial refcount for RPC, don't drop it */
osc_extent_state_set(ext, OES_LOCK_DONE);
- if (cmd & OBD_BRW_WRITE) {
+ if (!ext->oe_rw) { /* write */
list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
osc_update_pending(obj, OBD_BRW_WRITE, page_count);
} else {
osc_update_pending(obj, OBD_BRW_WRITE,
-ext->oe_nr_pages);
}
- EASSERT(list_empty(&ext->oe_link), ext);
- list_add_tail(&ext->oe_link, &list);
+ /* This extent may already be on the full extents list; that's OK */
+ EASSERT(!ext->oe_hp && !ext->oe_urgent, ext);
+ if (!list_empty(&ext->oe_link))
+ list_move_tail(&ext->oe_link, &list);
+ else
+ list_add_tail(&ext->oe_link, &list);
ext = next_extent(ext);
}
}
RETURN(result);
}
+EXPORT_SYMBOL(osc_cache_truncate_start);
/**
* Called after osc_io_setattr_end to add oio->oi_trunc back to cache.
OSC_IO_DEBUG(obj, "sync file range.\n");
RETURN(result);
}
+EXPORT_SYMBOL(osc_cache_wait_range);
/**
* Called to write out a range of osc object.
OSC_IO_DEBUG(obj, "pageout [%lu, %lu], %d.\n", start, end, result);
RETURN(result);
}
+EXPORT_SYMBOL(osc_cache_writeback_range);
/**
* Returns a list of pages by a given [start, end] of \a obj.
spin_unlock(&osc->oo_tree_lock);
RETURN(res);
}
+EXPORT_SYMBOL(osc_page_gang_lookup);
/**
* Check if page @page is covered by an extra lock or discard it.
return CLP_GANG_OKAY;
}
-static int discard_cb(const struct lu_env *env, struct cl_io *io,
- struct osc_page *ops, void *cbdata)
+int osc_discard_cb(const struct lu_env *env, struct cl_io *io,
+ struct osc_page *ops, void *cbdata)
{
struct osc_thread_info *info = osc_env_info(env);
struct cl_page *page = ops->ops_cl.cpl_page;
return CLP_GANG_OKAY;
}
+EXPORT_SYMBOL(osc_discard_cb);
/**
* Discard pages protected by the given lock. This function traverses radix
* behind this being that lock cancellation cannot be delayed indefinitely).
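+ *
+ * If \a discard is true, every page in the range is discarded; otherwise
+ * only pages not protected by another lock are discarded.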
*/
int osc_lock_discard_pages(const struct lu_env *env, struct osc_object *osc,
- pgoff_t start, pgoff_t end, enum cl_lock_mode mode)
+ pgoff_t start, pgoff_t end, bool discard)
{
struct osc_thread_info *info = osc_env_info(env);
- struct cl_io *io = &info->oti_io;
+ struct cl_io *io = osc_env_thread_io(env);
osc_page_gang_cbt cb;
int res;
int result;
if (result != 0)
GOTO(out, result);
- cb = mode == CLM_READ ? check_and_discard_cb : discard_cb;
+ cb = discard ? osc_discard_cb : check_and_discard_cb;
info->oti_fn_index = info->oti_next_index = start;
do {
res = osc_page_gang_lookup(env, io, osc,