* GPL HEADER END
*/
/*
- * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
+ * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*/
/*
* Author: Nikita Danilov <nikita.danilov@sun.com>
*/
-/** \addtogroup osc osc @{ */
-
#define DEBUG_SUBSYSTEM S_OSC
#include "osc_cl_internal.h"
+/** \addtogroup osc
+ * @{
+ */
+
+/*
+ * Comment out osc_page_protected because it may sleep inside the
+ * the client_obd_list_lock.
+ * client_obd_list_lock -> osc_ap_completion -> osc_completion ->
+ * -> osc_page_protected -> osc_page_is_dlocked -> osc_match_base
+ * -> ldlm_lock_match -> sptlrpc_import_check_ctx -> sleep.
+ */
+#if 0
static int osc_page_is_dlocked(const struct lu_env *env,
const struct osc_page *opg,
enum cl_lock_mode mode, int pending, int unref)
ldlm_mode_t dlmmode;
int flags;
+ cfs_might_sleep();
+
info = osc_env_info(env);
resname = &info->oti_resname;
policy = &info->oti_policy;
descr->cld_mode = mode;
descr->cld_start = page->cp_index;
descr->cld_end = page->cp_index;
- spin_lock(&hdr->coh_lock_guard);
- list_for_each_entry(scan, &hdr->coh_locks, cll_linkage) {
+ cfs_spin_lock(&hdr->coh_lock_guard);
+ cfs_list_for_each_entry(scan, &hdr->coh_locks, cll_linkage) {
/*
* Lock-less sub-lock has to be either in HELD state
* (when io is actively going on), or in CACHED state,
break;
}
}
- spin_unlock(&hdr->coh_lock_guard);
+ cfs_spin_unlock(&hdr->coh_lock_guard);
}
return result;
}
+#else
+static int osc_page_protected(const struct lu_env *env,
+ const struct osc_page *opg,
+ enum cl_lock_mode mode, int unref)
+{
+ return 1;
+}
+#endif
/*****************************************************************************
*
{
struct osc_page *opg = cl2osc_page(slice);
CDEBUG(D_TRACE, "%p\n", opg);
+ LASSERT(opg->ops_lock == NULL);
OBD_SLAB_FREE_PTR(opg, osc_page_kmem);
}
LINVRNT(cl_page_is_vmlocked(env, opg->ops_cl.cpl_page));
obj = cl2osc(opg->ops_cl.cpl_obj);
- spin_lock(&obj->oo_seatbelt);
- list_add(&opg->ops_inflight, &obj->oo_inflight[crt]);
+ cfs_spin_lock(&obj->oo_seatbelt);
+ cfs_list_add(&opg->ops_inflight, &obj->oo_inflight[crt]);
opg->ops_submitter = cfs_current();
- spin_unlock(&obj->oo_seatbelt);
+ cfs_spin_unlock(&obj->oo_seatbelt);
}
static int osc_page_cache_add(const struct lu_env *env,
const struct cl_page_slice *slice,
- struct cl_io *_)
+ struct cl_io *unused)
{
struct osc_page *opg = cl2osc_page(slice);
struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj);
- struct osc_io *oio = osc_env_io(env);
int result;
- int brw_flags;
+ /* All cacheable IO is async-capable */
+ int brw_flags = OBD_BRW_ASYNC;
int noquota = 0;
LINVRNT(osc_page_protected(env, opg, CLM_WRITE, 0));
ENTRY;
/* Set the OBD_BRW_SRVLOCK before the page is queued. */
- brw_flags = osc_io_srvlock(oio) ? OBD_BRW_SRVLOCK : 0;
+ brw_flags |= opg->ops_srvlock ? OBD_BRW_SRVLOCK : 0;
if (!client_is_remote(osc_export(obj)) &&
cfs_capable(CFS_CAP_SYS_RESOURCE)) {
brw_flags |= OBD_BRW_NOQUOTA;
policy->l_extent.end = cl_offset(obj, end + 1) - 1;
}
+static int osc_page_addref_lock(const struct lu_env *env,
+ struct osc_page *opg,
+ struct cl_lock *lock)
+{
+ struct osc_lock *olock;
+ int rc;
+
+ LASSERT(opg->ops_lock == NULL);
+
+ olock = osc_lock_at(lock);
+ if (cfs_atomic_inc_return(&olock->ols_pageref) <= 0) {
+ cfs_atomic_dec(&olock->ols_pageref);
+ cl_lock_put(env, lock);
+ rc = 1;
+ } else {
+ opg->ops_lock = lock;
+ rc = 0;
+ }
+ return rc;
+}
+
+static void osc_page_putref_lock(const struct lu_env *env,
+ struct osc_page *opg)
+{
+ struct cl_lock *lock = opg->ops_lock;
+ struct osc_lock *olock;
+
+ LASSERT(lock != NULL);
+ olock = osc_lock_at(lock);
+
+ cfs_atomic_dec(&olock->ols_pageref);
+ opg->ops_lock = NULL;
+
+ /*
+ * Note: usually this won't be the last reference of the lock, but if
+ * it is, then all the lock_put do is at most just freeing some memory,
+ * so it would be OK that caller is holding spinlocks.
+ */
+ LASSERT(cfs_atomic_read(&lock->cll_ref) > 1 || olock->ols_hold == 0);
+ cl_lock_put(env, lock);
+}
+
static int osc_page_is_under_lock(const struct lu_env *env,
const struct cl_page_slice *slice,
- struct cl_io *_)
+ struct cl_io *unused)
{
struct cl_lock *lock;
int result;
ENTRY;
lock = cl_lock_at_page(env, slice->cpl_obj, slice->cpl_page,
NULL, 1, 0);
- if (lock != NULL) {
- cl_lock_put(env, lock);
+ if (lock != NULL &&
+ osc_page_addref_lock(env, cl2osc_page(slice), lock) == 0)
result = -EBUSY;
- } else
+ else
result = -ENODATA;
RETURN(result);
}
+static void osc_page_disown(const struct lu_env *env,
+ const struct cl_page_slice *slice,
+ struct cl_io *io)
+{
+ struct osc_page *opg = cl2osc_page(slice);
+
+ if (unlikely(opg->ops_lock))
+ osc_page_putref_lock(env, opg);
+}
+
+static void osc_page_completion_read(const struct lu_env *env,
+ const struct cl_page_slice *slice,
+ int ioret)
+{
+ struct osc_page *opg = cl2osc_page(slice);
+
+ if (likely(opg->ops_lock))
+ osc_page_putref_lock(env, opg);
+}
+
static int osc_page_fail(const struct lu_env *env,
- const struct cl_page_slice *slice, struct cl_io *_)
+ const struct cl_page_slice *slice,
+ struct cl_io *unused)
{
/*
* Cached read?
}
-static const char *osc_list(struct list_head *head)
+static const char *osc_list(cfs_list_t *head)
+{
+ return cfs_list_empty(head) ? "-" : "+";
+}
+
+static inline cfs_time_t osc_submit_duration(struct osc_page *opg)
{
- return list_empty(head) ? "-" : "+";
+ if (opg->ops_submit_time == 0)
+ return 0;
+
+ return (cfs_time_current() - opg->ops_submit_time);
}
static int osc_page_print(const struct lu_env *env,
{
struct osc_page *opg = cl2osc_page(slice);
struct osc_async_page *oap = &opg->ops_oap;
+ struct osc_object *obj = cl2osc(slice->cpl_obj);
+ struct client_obd *cli = &osc_export(obj)->exp_obd->u.cli;
+ struct lov_oinfo *loi = obj->oo_oinfo;
return (*printer)(env, cookie, LUSTRE_OSC_NAME"-page@%p: "
- "< %#x %d %u %s %s %s >"
- "< %llu %u %#x %#x %p %p %p %p %p >"
- "< %s %p %d >\n",
+ "1< %#x %d %u %s %s %s > "
+ "2< "LPU64" %u %#x %#x | %p %p %p %p %p > "
+ "3< %s %p %d %lu %d > "
+ "4< %d %d %d %lu %s | %s %s %s %s > "
+ "5< %s %s %s %s | %d %s %s | %d %s %s>\n",
opg,
/* 1 */
oap->oap_magic, oap->oap_cmd,
oap->oap_caller_data,
/* 3 */
osc_list(&opg->ops_inflight),
- opg->ops_submitter, opg->ops_transfer_pinned);
+ opg->ops_submitter, opg->ops_transfer_pinned,
+ osc_submit_duration(opg), opg->ops_srvlock,
+ /* 4 */
+ cli->cl_r_in_flight, cli->cl_w_in_flight,
+ cli->cl_max_rpcs_in_flight,
+ cli->cl_avail_grant,
+ osc_list(&cli->cl_cache_waiters),
+ osc_list(&cli->cl_loi_ready_list),
+ osc_list(&cli->cl_loi_hp_ready_list),
+ osc_list(&cli->cl_loi_write_list),
+ osc_list(&cli->cl_loi_read_list),
+ /* 5 */
+ osc_list(&loi->loi_ready_item),
+ osc_list(&loi->loi_hp_ready_item),
+ osc_list(&loi->loi_write_item),
+ osc_list(&loi->loi_read_item),
+ loi->loi_read_lop.lop_num_pending,
+ osc_list(&loi->loi_read_lop.lop_pending),
+ osc_list(&loi->loi_read_lop.lop_urgent),
+ loi->loi_write_lop.lop_num_pending,
+ osc_list(&loi->loi_write_lop.lop_pending),
+ osc_list(&loi->loi_write_lop.lop_urgent));
}
static void osc_page_delete(const struct lu_env *env,
"Trying to teardown failed: %d\n", rc);
LASSERT(0);
}
- spin_lock(&obj->oo_seatbelt);
- list_del_init(&opg->ops_inflight);
- spin_unlock(&obj->oo_seatbelt);
+ cfs_spin_lock(&obj->oo_seatbelt);
+ cfs_list_del_init(&opg->ops_inflight);
+ cfs_spin_unlock(&obj->oo_seatbelt);
EXIT;
}
opg->ops_from = from;
opg->ops_to = to;
+ cfs_spin_lock(&oap->oap_lock);
oap->oap_async_flags |= ASYNC_COUNT_STABLE;
+ cfs_spin_unlock(&oap->oap_lock);
}
static int osc_page_cancel(const struct lu_env *env,
.cpo_print = osc_page_print,
.cpo_delete = osc_page_delete,
.cpo_is_under_lock = osc_page_is_under_lock,
+ .cpo_disown = osc_page_disown,
.io = {
[CRT_READ] = {
- .cpo_cache_add = osc_page_fail
+ .cpo_cache_add = osc_page_fail,
+ .cpo_completion = osc_page_completion_read
},
[CRT_WRITE] = {
- .cpo_cache_add = osc_page_cache_add
+ .cpo_cache_add = osc_page_cache_add
}
},
.cpo_clip = osc_page_clip,
ENTRY;
result = cl_page_make_ready(env, page, CRT_WRITE);
+ if (result == 0)
+ opg->ops_submit_time = cfs_time_current();
RETURN(result);
}
LASSERT(page->cp_req == NULL);
/* As the transfer for this page is being done, clear the flags */
+ cfs_spin_lock(&oap->oap_lock);
oap->oap_async_flags = 0;
+ cfs_spin_unlock(&oap->oap_lock);
crt = cmd == OBD_BRW_READ ? CRT_READ : CRT_WRITE;
/* Clear opg->ops_transfer_pinned before VM lock is released. */
opg->ops_transfer_pinned = 0;
- spin_lock(&obj->oo_seatbelt);
+ cfs_spin_lock(&obj->oo_seatbelt);
LASSERT(opg->ops_submitter != NULL);
- LASSERT(!list_empty(&opg->ops_inflight));
- list_del_init(&opg->ops_inflight);
- spin_unlock(&obj->oo_seatbelt);
+ LASSERT(!cfs_list_empty(&opg->ops_inflight));
+ cfs_list_del_init(&opg->ops_inflight);
+ cfs_spin_unlock(&obj->oo_seatbelt);
+
+ opg->ops_submit_time = 0;
cl_page_completion(env, page, crt, rc);
cl_offset(obj, page->cp_index),
&osc_async_page_ops,
opg, (void **)&oap, 1, NULL);
- if (result == 0)
+ if (result == 0) {
+ struct osc_io *oio = osc_env_io(env);
+ opg->ops_srvlock = osc_io_srvlock(oio);
cl_page_slice_add(page, &opg->ops_cl, obj,
&osc_page_ops);
+ }
/*
* Cannot assert osc_page_protected() here as read-ahead
* creates temporary pages outside of a lock.
{
struct osc_async_page *oap = &opg->ops_oap;
struct client_obd *cli = oap->oap_cli;
+ int flags = 0;
LINVRNT(osc_page_protected(env, opg,
crt == CRT_WRITE ? CLM_WRITE : CLM_READ, 1));
oap->oap_page_off = opg->ops_from;
oap->oap_count = opg->ops_to - opg->ops_from;
+ /* Give a hint to OST that requests are coming from kswapd - bug19529 */
+ if (cfs_memory_pressure_get())
+ oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
oap->oap_brw_flags |= OBD_BRW_SYNC;
if (osc_io_srvlock(oio))
oap->oap_brw_flags |= OBD_BRW_SRVLOCK;
oap->oap_cmd |= OBD_BRW_NOQUOTA;
}
- oap->oap_async_flags |= OSC_FLAGS;
if (oap->oap_cmd & OBD_BRW_READ)
- oap->oap_async_flags |= ASYNC_COUNT_STABLE;
+ flags = ASYNC_COUNT_STABLE;
else if (!(oap->oap_brw_page.flag & OBD_BRW_FROM_GRANT))
osc_enter_cache_try(env, cli, oap->oap_loi, oap, 1);
+ cfs_spin_lock(&oap->oap_lock);
+ oap->oap_async_flags |= OSC_FLAGS | flags;
+ cfs_spin_unlock(&oap->oap_lock);
+
osc_oap_to_pending(oap);
osc_page_transfer_get(opg, "transfer\0imm");
osc_page_transfer_add(env, opg, crt);