/*
* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
+ *
+ * Copyright (c) 2011 Whamcloud, Inc.
+ *
*/
/*
* This file is part of Lustre, http://www.lustre.org/
{
struct osc_page *opg = cl2osc_page(slice);
CDEBUG(D_TRACE, "%p\n", opg);
+ LASSERT(opg->ops_lock == NULL);
OBD_SLAB_FREE_PTR(opg, osc_page_kmem);
}
struct osc_page *opg = cl2osc_page(slice);
struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj);
int result;
- int brw_flags;
+ /* All cacheable IO is async-capable */
+ int brw_flags = OBD_BRW_ASYNC;
int noquota = 0;
LINVRNT(osc_page_protected(env, opg, CLM_WRITE, 0));
ENTRY;
/* Set the OBD_BRW_SRVLOCK before the page is queued. */
- brw_flags = opg->ops_srvlock ? OBD_BRW_SRVLOCK : 0;
+ brw_flags |= opg->ops_srvlock ? OBD_BRW_SRVLOCK : 0;
if (!client_is_remote(osc_export(obj)) &&
cfs_capable(CFS_CAP_SYS_RESOURCE)) {
brw_flags |= OBD_BRW_NOQUOTA;
osc_page_transfer_get(opg, "transfer\0cache");
result = osc_queue_async_io(env, osc_export(obj), NULL, obj->oo_oinfo,
&opg->ops_oap, OBD_BRW_WRITE | noquota,
- 0, 0, brw_flags, 0);
+ opg->ops_from, opg->ops_to - opg->ops_from,
+ brw_flags, 0);
if (result != 0)
osc_page_transfer_put(env, opg);
else
policy->l_extent.end = cl_offset(obj, end + 1) - 1;
}
+/*
+ * Attach @lock to @opg by taking an extra pageref on the matching osc_lock.
+ *
+ * On success (return 0) the caller's reference on @lock is kept and stored
+ * in opg->ops_lock; it is released later via osc_page_putref_lock().
+ * If the post-increment value of ols_pageref is still <= 0, the lock is
+ * presumably already being torn down (pageref poisoned with a negative
+ * magic — TODO confirm against osc_lock lifecycle), so the increment is
+ * rolled back, the caller's cl_lock reference is dropped here, and 1 is
+ * returned to signal that the lock could not be pinned.
+ */
+static int osc_page_addref_lock(const struct lu_env *env,
+                                struct osc_page *opg,
+                                struct cl_lock *lock)
+{
+        struct osc_lock *olock;
+        int rc;
+
+        /* A page may reference at most one lock at a time. */
+        LASSERT(opg->ops_lock == NULL);
+
+        olock = osc_lock_at(lock);
+        if (cfs_atomic_inc_return(&olock->ols_pageref) <= 0) {
+                /* Lock unusable: undo the increment and consume the
+                 * caller's lock reference. */
+                cfs_atomic_dec(&olock->ols_pageref);
+                cl_lock_put(env, lock);
+                rc = 1;
+        } else {
+                /* Keep the caller's reference; released in
+                 * osc_page_putref_lock(). */
+                opg->ops_lock = lock;
+                rc = 0;
+        }
+        return rc;
+}
+
+/*
+ * Detach the lock previously pinned on @opg by osc_page_addref_lock():
+ * drop the osc_lock pageref and release the stored cl_lock reference.
+ * Must only be called when opg->ops_lock is set (asserted below).
+ */
+static void osc_page_putref_lock(const struct lu_env *env,
+                                 struct osc_page *opg)
+{
+        struct cl_lock *lock = opg->ops_lock;
+        struct osc_lock *olock;
+
+        LASSERT(lock != NULL);
+        olock = osc_lock_at(lock);
+
+        /* Clear ops_lock before the final put so the page no longer
+         * points at a lock it holds no reference on. */
+        cfs_atomic_dec(&olock->ols_pageref);
+        opg->ops_lock = NULL;
+
+        /*
+         * Note: usually this won't be the last reference of the lock, but if
+         * it is, then all the lock_put do is at most just freeing some memory,
+         * so it would be OK that caller is holding spinlocks.
+         */
+        LASSERT(cfs_atomic_read(&lock->cll_ref) > 1 || olock->ols_hold == 0);
+        cl_lock_put(env, lock);
+}
+
static int osc_page_is_under_lock(const struct lu_env *env,
const struct cl_page_slice *slice,
struct cl_io *unused)
ENTRY;
lock = cl_lock_at_page(env, slice->cpl_obj, slice->cpl_page,
NULL, 1, 0);
- if (lock != NULL) {
- cl_lock_put(env, lock);
+ if (lock != NULL &&
+ osc_page_addref_lock(env, cl2osc_page(slice), lock) == 0)
result = -EBUSY;
- } else
+ else
result = -ENODATA;
RETURN(result);
}
+/*
+ * cl_page_operations::cpo_disown() for osc: if the page still pins a lock
+ * (taken in osc_page_is_under_lock() via osc_page_addref_lock()), release
+ * it now.  The `unlikely` hint mirrors the expectation that the pin is
+ * normally dropped earlier, in osc_page_completion_read().
+ */
+static void osc_page_disown(const struct lu_env *env,
+                            const struct cl_page_slice *slice,
+                            struct cl_io *io)
+{
+        struct osc_page *opg = cl2osc_page(slice);
+
+        if (unlikely(opg->ops_lock))
+                osc_page_putref_lock(env, opg);
+}
+
+/*
+ * cl_page_operations::cpo_completion() for CRT_READ on osc: on read
+ * completion, release the lock pin taken when the page was found under a
+ * lock.  @ioret (the I/O result) is intentionally ignored — the pin is
+ * dropped whether the read succeeded or failed.
+ */
+static void osc_page_completion_read(const struct lu_env *env,
+                                     const struct cl_page_slice *slice,
+                                     int ioret)
+{
+        struct osc_page *opg = cl2osc_page(slice);
+
+        if (likely(opg->ops_lock))
+                osc_page_putref_lock(env, opg);
+}
+
static int osc_page_fail(const struct lu_env *env,
const struct cl_page_slice *slice,
struct cl_io *unused)
return (*printer)(env, cookie, LUSTRE_OSC_NAME"-page@%p: "
"1< %#x %d %u %s %s %s > "
- "2< "LPU64" %u %#x %#x | %p %p %p %p %p > "
+ "2< "LPU64" %u %u %#x %#x | %p %p %p %p %p > "
"3< %s %p %d %lu %d > "
"4< %d %d %d %lu %s | %s %s %s %s > "
"5< %s %s %s %s | %d %s %s | %d %s %s>\n",
osc_list(&oap->oap_urgent_item),
osc_list(&oap->oap_rpc_item),
/* 2 */
- oap->oap_obj_off, oap->oap_page_off,
+ oap->oap_obj_off, oap->oap_page_off, oap->oap_count,
oap->oap_async_flags, oap->oap_brw_flags,
oap->oap_request,
oap->oap_cli, oap->oap_loi, oap->oap_caller_ops,
.cpo_print = osc_page_print,
.cpo_delete = osc_page_delete,
.cpo_is_under_lock = osc_page_is_under_lock,
+ .cpo_disown = osc_page_disown,
.io = {
[CRT_READ] = {
- .cpo_cache_add = osc_page_fail
+ .cpo_cache_add = osc_page_fail,
+ .cpo_completion = osc_page_completion_read
},
[CRT_WRITE] = {
- .cpo_cache_add = osc_page_cache_add
+ .cpo_cache_add = osc_page_cache_add
}
},
.cpo_clip = osc_page_clip,
if (rc == 0 && oap->oap_brw_flags & OBD_BRW_SRVLOCK) {
struct lu_device *ld = opg->ops_cl.cpl_obj->co_lu.lo_dev;
struct osc_stats *stats = &lu2osc_dev(ld)->od_stats;
- int bytes = opg->ops_to - opg->ops_from;
+ int bytes = oap->oap_count;
if (crt == CRT_READ)
stats->os_lockless_reads += bytes;