return flags;
}
-static inline char list_empty_marker(struct list_head *list)
-{
- return list_empty(list) ? '-' : '+';
-}
-
#define EXTSTR "[%lu -> %lu/%lu]"
#define EXTPARA(ext) (ext)->oe_start, (ext)->oe_end, (ext)->oe_max_end
static const char *oes_strings[] = {
size_t page_count;
int rc = 0;
- if (!osc_object_is_locked(obj))
- GOTO(out, rc = 9);
+ assert_osc_object_is_locked(obj);
if (ext->oe_state >= OES_STATE_MAX)
GOTO(out, rc = 10);
GOTO(out, rc = 60);
if (ext->oe_fsync_wait && !ext->oe_urgent && !ext->oe_hp)
GOTO(out, rc = 65);
+ /* fallthrough */
default:
if (atomic_read(&ext->oe_users) > 0)
GOTO(out, rc = 70);
__res; \
})
+static inline bool
+overlapped(const struct osc_extent *ex1, const struct osc_extent *ex2)
+{
+ return !(ex1->oe_end < ex2->oe_start || ex2->oe_end < ex1->oe_start);
+}
/**
* sanity check - to make sure there is no overlapped extent in the tree.
{
struct osc_extent *tmp;
- LASSERT(osc_object_is_locked(obj));
+ assert_osc_object_is_locked(obj);
if (!extent_debug)
return 0;
for (tmp = first_extent(obj); tmp != NULL; tmp = next_extent(tmp)) {
if (tmp == ext)
continue;
- if (tmp->oe_end >= ext->oe_start &&
- tmp->oe_start <= ext->oe_end)
+ if (overlapped(tmp, ext))
return 1;
}
return 0;
static void osc_extent_state_set(struct osc_extent *ext, int state)
{
- LASSERT(osc_object_is_locked(ext->oe_obj));
+ assert_osc_object_is_locked(ext->oe_obj);
LASSERT(state >= OES_INV && state < OES_STATE_MAX);
/* Never try to sanity check a state changing extent :-) */
static void osc_extent_put_trust(struct osc_extent *ext)
{
LASSERT(atomic_read(&ext->oe_refc) > 1);
- LASSERT(osc_object_is_locked(ext->oe_obj));
+ assert_osc_object_is_locked(ext->oe_obj);
atomic_dec(&ext->oe_refc);
}
struct rb_node *n = obj->oo_root.rb_node;
struct osc_extent *tmp, *p = NULL;
- LASSERT(osc_object_is_locked(obj));
+ assert_osc_object_is_locked(obj);
while (n != NULL) {
tmp = rb_extent(n);
if (index < tmp->oe_start) {
LASSERT(ext->oe_intree == 0);
LASSERT(ext->oe_obj == obj);
- LASSERT(osc_object_is_locked(obj));
+ assert_osc_object_is_locked(obj);
while (*n != NULL) {
tmp = rb_extent(*n);
parent = *n;
static void osc_extent_erase(struct osc_extent *ext)
{
struct osc_object *obj = ext->oe_obj;
- LASSERT(osc_object_is_locked(obj));
+ assert_osc_object_is_locked(obj);
if (ext->oe_intree) {
rb_erase(&ext->oe_node, &obj->oo_root);
ext->oe_intree = 0;
{
struct osc_object *obj = ext->oe_obj;
- LASSERT(osc_object_is_locked(obj));
+ assert_osc_object_is_locked(obj);
LASSERT(ext->oe_state == OES_ACTIVE || ext->oe_state == OES_CACHE);
if (ext->oe_state == OES_CACHE) {
osc_extent_state_set(ext, OES_ACTIVE);
static void __osc_extent_remove(struct osc_extent *ext)
{
- LASSERT(osc_object_is_locked(ext->oe_obj));
+ assert_osc_object_is_locked(ext->oe_obj);
LASSERT(list_empty(&ext->oe_pages));
osc_extent_erase(ext);
list_del_init(&ext->oe_link);
int ppc_bits;
LASSERT(cur->oe_state == OES_CACHE);
- LASSERT(osc_object_is_locked(obj));
+ assert_osc_object_is_locked(obj);
if (victim == NULL)
return -EINVAL;
RETURN(rc);
}
-static inline int overlapped(struct osc_extent *ex1, struct osc_extent *ex2)
-{
- return !(ex1->oe_end < ex2->oe_start || ex2->oe_end < ex1->oe_start);
-}
-
/**
* Find or create an extent which includes @index, core function to manage
* extent tree.
enum osc_extent_state state)
{
struct osc_object *obj = ext->oe_obj;
- struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(600), NULL,
- LWI_ON_SIGNAL_NOOP, NULL);
int rc = 0;
ENTRY;
osc_extent_release(env, ext);
/* wait for the extent until its state becomes @state */
- rc = l_wait_event(ext->oe_waitq, extent_wait_cb(ext, state), &lwi);
- if (rc == -ETIMEDOUT) {
+ rc = wait_event_idle_timeout(ext->oe_waitq, extent_wait_cb(ext, state),
+ cfs_time_seconds(600));
+ if (rc == 0) {
OSC_EXTENT_DUMP(D_ERROR, ext,
"%s: wait ext to %u timedout, recovery in progress?\n",
cli_name(osc_cli(obj)), state);
- lwi = LWI_INTR(NULL, NULL);
- rc = l_wait_event(ext->oe_waitq, extent_wait_cb(ext, state),
- &lwi);
+ wait_event_idle(ext->oe_waitq, extent_wait_cb(ext, state));
}
- if (rc == 0 && ext->oe_rc < 0)
+ if (ext->oe_rc < 0)
rc = ext->oe_rc;
+ else
+ rc = 0;
RETURN(rc);
}
return 0;
else if (cl_offset(obj, index + 1) > kms)
/* catch sub-page write at end of file */
- return kms % PAGE_SIZE;
+ return kms & ~PAGE_MASK;
else
return PAGE_SIZE;
}
pga->flag &= ~OBD_BRW_FROM_GRANT;
atomic_long_dec(&obd_dirty_pages);
cli->cl_dirty_pages--;
- if (pga->flag & OBD_BRW_NOCACHE) {
- pga->flag &= ~OBD_BRW_NOCACHE;
- atomic_long_dec(&obd_dirty_transit_pages);
- cli->cl_dirty_transit--;
- }
EXIT;
}
*/
static int osc_enter_cache_try(struct client_obd *cli,
struct osc_async_page *oap,
- int bytes, int transient)
+ int bytes)
{
int rc;
if (atomic_long_add_return(1, &obd_dirty_pages) <=
obd_max_dirty_pages) {
osc_consume_write_grant(cli, &oap->oap_brw_page);
- if (transient) {
- cli->cl_dirty_transit++;
- atomic_long_inc(&obd_dirty_transit_pages);
- oap->oap_brw_flags |= OBD_BRW_NOCACHE;
- }
rc = 1;
goto out;
} else
struct osc_object *osc = oap->oap_obj;
struct lov_oinfo *loi = osc->oo_oinfo;
struct osc_cache_waiter ocw;
- struct l_wait_info lwi;
int rc = -EDQUOT;
ENTRY;
- lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(AT_OFF ? obd_timeout : at_max),
- NULL, LWI_ON_SIGNAL_NOOP, NULL);
-
OSC_DUMP_GRANT(D_CACHE, cli, "need:%d\n", bytes);
spin_lock(&cli->cl_loi_list_lock);
/* Hopefully normal case - cache space and write credits available */
if (list_empty(&cli->cl_cache_waiters) &&
- osc_enter_cache_try(cli, oap, bytes, 0)) {
+ osc_enter_cache_try(cli, oap, bytes)) {
OSC_DUMP_GRANT(D_CACHE, cli, "granted from cache\n");
GOTO(out, rc = 0);
}
CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n",
cli_name(cli), &ocw, oap);
- rc = l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
+ rc = wait_event_idle_timeout(ocw.ocw_waitq,
+ ocw_granted(cli, &ocw),
+ cfs_time_seconds(AT_OFF ?
+ obd_timeout :
+ at_max));
spin_lock(&cli->cl_loi_list_lock);
- if (rc < 0) {
+ if (rc <= 0) {
/* l_wait_event is interrupted by signal or timed out */
list_del_init(&ocw.ocw_entry);
+ if (rc == 0)
+ rc = -ETIMEDOUT;
break;
}
LASSERT(list_empty(&ocw.ocw_entry));
if (rc != -EDQUOT)
break;
- if (osc_enter_cache_try(cli, oap, bytes, 0)) {
+ if (osc_enter_cache_try(cli, oap, bytes)) {
rc = 0;
break;
}
ocw->ocw_rc = -EDQUOT;
- if (osc_enter_cache_try(cli, ocw->ocw_oap, ocw->ocw_grant, 0))
+ if (osc_enter_cache_try(cli, ocw->ocw_oap, ocw->ocw_grant))
ocw->ocw_rc = 0;
if (ocw->ocw_rc == 0 ||
spin_lock(&oap->oap_lock);
oap->oap_async_flags = 0;
spin_unlock(&oap->oap_lock);
- oap->oap_interrupted = 0;
if (oap->oap_cmd & OBD_BRW_WRITE && xid > 0) {
spin_lock(&cli->cl_loi_list_lock);
return (ext->oe_end >> ppc_bits) - (ext->oe_start >> ppc_bits) + 1;
}
+static inline bool
+can_merge(const struct osc_extent *ext, const struct osc_extent *in_rpc)
+{
+ if (ext->oe_no_merge || in_rpc->oe_no_merge)
+ return false;
+
+ if (ext->oe_srvlock != in_rpc->oe_srvlock)
+ return false;
+
+ if (ext->oe_ndelay != in_rpc->oe_ndelay)
+ return false;
+
+ if (!ext->oe_grants != !in_rpc->oe_grants)
+ return false;
+
+ if (ext->oe_dio != in_rpc->oe_dio)
+ return false;
+
+ /* It's possible to have overlap on DIO */
+ if (in_rpc->oe_dio && overlapped(ext, in_rpc))
+ return false;
+
+ return true;
+}
+
/**
* Try to add extent to one RPC. We need to think about the following things:
* - # of pages must not be over max_pages_per_rpc
{
struct osc_extent *tmp;
unsigned int chunk_count;
- struct osc_async_page *oap = list_first_entry(&ext->oe_pages,
- struct osc_async_page,
- oap_pending_item);
ENTRY;
EASSERT((ext->oe_state == OES_CACHE || ext->oe_state == OES_LOCK_DONE),
RETURN(0);
list_for_each_entry(tmp, data->erd_rpc_list, oe_link) {
- struct osc_async_page *oap2;
- oap2 = list_first_entry(&tmp->oe_pages, struct osc_async_page,
- oap_pending_item);
EASSERT(tmp->oe_owner == current, tmp);
-#if 0
- if (overlapped(tmp, ext)) {
- OSC_EXTENT_DUMP(D_ERROR, tmp, "overlapped %p.\n", ext);
- EASSERT(0, ext);
- }
-#endif
- if (oap2cl_page(oap)->cp_type != oap2cl_page(oap2)->cp_type) {
- CDEBUG(D_CACHE, "Do not permit different types of IO "
- "in one RPC\n");
- RETURN(0);
- }
- if (tmp->oe_srvlock != ext->oe_srvlock ||
- !tmp->oe_grants != !ext->oe_grants ||
- tmp->oe_ndelay != ext->oe_ndelay ||
- tmp->oe_no_merge || ext->oe_no_merge)
+ if (!can_merge(ext, tmp))
RETURN(0);
-
- /* remove break for strict check */
- break;
}
data->erd_max_extents--;
.erd_max_extents = 256,
};
- LASSERT(osc_object_is_locked(obj));
+ assert_osc_object_is_locked(obj);
while (!list_empty(&obj->oo_hp_exts)) {
ext = list_entry(obj->oo_hp_exts.next, struct osc_extent,
oe_link);
struct osc_object *osc)
__must_hold(osc)
{
- struct list_head rpclist = LIST_HEAD_INIT(rpclist);
+ LIST_HEAD(rpclist);
struct osc_extent *ext;
struct osc_extent *tmp;
struct osc_extent *first = NULL;
int rc = 0;
ENTRY;
- LASSERT(osc_object_is_locked(osc));
+ assert_osc_object_is_locked(osc);
page_count = get_write_extents(osc, &rpclist);
LASSERT(equi(page_count == 0, list_empty(&rpclist)));
{
struct osc_extent *ext;
struct osc_extent *next;
- struct list_head rpclist = LIST_HEAD_INIT(rpclist);
+ LIST_HEAD(rpclist);
struct extent_rpc_data data = {
.erd_rpc_list = &rpclist,
.erd_page_count = 0,
int rc = 0;
ENTRY;
- LASSERT(osc_object_is_locked(osc));
+ assert_osc_object_is_locked(osc);
list_for_each_entry_safe(ext, next, &osc->oo_reading_exts, oe_link) {
EASSERT(ext->oe_state == OES_LOCK_DONE, ext);
if (!try_to_add_extent_for_io(cli, ext, &data))
EXPORT_SYMBOL(osc_prep_async_page);
int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
- struct osc_page *ops)
+ struct osc_page *ops, cl_commit_cbt cb)
{
struct osc_io *oio = osc_env_io(env);
struct osc_extent *ext = NULL;
struct osc_async_page *oap = &ops->ops_oap;
struct client_obd *cli = oap->oap_cli;
struct osc_object *osc = oap->oap_obj;
+ struct pagevec *pvec = &osc_env_info(env)->oti_pagevec;
pgoff_t index;
unsigned int tmp;
unsigned int grants = 0;
/* Set the OBD_BRW_SRVLOCK before the page is queued. */
brw_flags |= ops->ops_srvlock ? OBD_BRW_SRVLOCK : 0;
- if (oio->oi_cap_sys_resource) {
+ if (oio->oi_cap_sys_resource || io->ci_noquota) {
brw_flags |= OBD_BRW_NOQUOTA;
cmd |= OBD_BRW_NOQUOTA;
}
qid[USRQUOTA] = attr->cat_uid;
qid[GRPQUOTA] = attr->cat_gid;
qid[PRJQUOTA] = attr->cat_projid;
- if (rc == 0 && osc_quota_chkdq(cli, qid) == NO_QUOTA)
+ if (rc == 0 && osc_quota_chkdq(cli, qid) == -EDQUOT)
rc = -EDQUOT;
if (rc)
RETURN(rc);
/* it doesn't need any grant to dirty this page */
spin_lock(&cli->cl_loi_list_lock);
- rc = osc_enter_cache_try(cli, oap, grants, 0);
+ rc = osc_enter_cache_try(cli, oap, grants);
if (rc == 0) { /* try failed */
grants = 0;
need_release = 1;
rc = 0;
if (grants == 0) {
- /* we haven't allocated grant for this page. */
+ /* We haven't allocated grant for this page, and we
+ * must not hold a page lock while we do enter_cache,
+ * so we must mark dirty & unlock any pages in the
+ * write commit pagevec. */
+ if (pagevec_count(pvec)) {
+ cb(env, io, pvec);
+ pagevec_reinit(pvec);
+ }
rc = osc_enter_cache(env, cli, oap, tmp);
if (rc == 0)
grants = tmp;
return rc;
}
-/**
- * this is called when a sync waiter receives an interruption. Its job is to
- * get the caller woken as soon as possible. If its page hasn't been put in an
- * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
- * desiring interruption which will forcefully complete the rpc once the rpc
- * has timed out.
- */
-int osc_cancel_async_page(const struct lu_env *env, struct osc_page *ops)
-{
- struct osc_async_page *oap = &ops->ops_oap;
- struct osc_object *obj = oap->oap_obj;
- struct client_obd *cli = osc_cli(obj);
- struct osc_extent *ext;
- struct osc_extent *found = NULL;
- struct list_head *plist;
- pgoff_t index = osc_index(ops);
- int rc = -EBUSY;
- int cmd;
- ENTRY;
-
- LASSERT(!oap->oap_interrupted);
- oap->oap_interrupted = 1;
-
- /* Find out the caching extent */
- osc_object_lock(obj);
- if (oap->oap_cmd & OBD_BRW_WRITE) {
- plist = &obj->oo_urgent_exts;
- cmd = OBD_BRW_WRITE;
- } else {
- plist = &obj->oo_reading_exts;
- cmd = OBD_BRW_READ;
- }
- list_for_each_entry(ext, plist, oe_link) {
- if (ext->oe_start <= index && ext->oe_end >= index) {
- LASSERT(ext->oe_state == OES_LOCK_DONE);
- /* For OES_LOCK_DONE state extent, it has already held
- * a refcount for RPC. */
- found = osc_extent_get(ext);
- break;
- }
- }
- if (found != NULL) {
- list_del_init(&found->oe_link);
- osc_update_pending(obj, cmd, -found->oe_nr_pages);
- osc_object_unlock(obj);
-
- osc_extent_finish(env, found, 0, -EINTR);
- osc_extent_put(env, found);
- rc = 0;
- } else {
- osc_object_unlock(obj);
- /* ok, it's been put in an rpc. only one oap gets a request
- * reference */
- if (oap->oap_request != NULL) {
- ptlrpc_mark_interrupted(oap->oap_request);
- ptlrpcd_wake(oap->oap_request);
- ptlrpc_req_finished(oap->oap_request);
- oap->oap_request = NULL;
- }
- }
-
- osc_list_maint(cli, obj);
- RETURN(rc);
-}
-
int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io,
struct osc_object *obj, struct list_head *list,
int brw_flags)
ext->oe_obj = obj;
ext->oe_srvlock = !!(brw_flags & OBD_BRW_SRVLOCK);
ext->oe_ndelay = !!(brw_flags & OBD_BRW_NDELAY);
+ ext->oe_dio = !!(brw_flags & OBD_BRW_NOCACHE);
ext->oe_nr_pages = page_count;
ext->oe_mppr = mppr;
list_splice_init(list, &ext->oe_pages);
struct osc_extent *ext;
struct osc_extent *waiting = NULL;
pgoff_t index;
- struct list_head list = LIST_HEAD_INIT(list);
+ LIST_HEAD(list);
int result = 0;
bool partial;
ENTRY;
pgoff_t start, pgoff_t end, int hp, int discard)
{
struct osc_extent *ext;
- struct list_head discard_list = LIST_HEAD_INIT(discard_list);
+ LIST_HEAD(discard_list);
bool unplug = false;
int result = 0;
ENTRY;