/**
 * Invariant that must hold at all times.
 */
-static int osc_lock_invariant(struct osc_lock *ols)
+static inline int osc_lock_invariant(struct osc_lock *ols)
{
struct ldlm_lock *lock = osc_handle_ptr(&ols->ols_handle);
struct ldlm_lock *olock = ols->ols_dlmlock;
/* extend the lock extent, otherwise it will cause problems when
 * we decide whether to grant a lockless lock. */
descr->cld_mode = osc_ldlm2cl_lock(dlmlock->l_granted_mode);
- descr->cld_start = cl_index(descr->cld_obj, ext->start);
- descr->cld_end = cl_index(descr->cld_obj, ext->end);
+ descr->cld_start = ext->start >> PAGE_SHIFT;
+ descr->cld_end = ext->end >> PAGE_SHIFT;
descr->cld_gid = ext->gid;
/* no lvb update for matched lock */
osc_lock_granted(env, oscl, lockh);
/* Error handling, some errors are tolerable. */
- if (oscl->ols_locklessable && rc == -EUSERS) {
- /* This is a tolerable error, turn this lock into
- * lockless lock.
- */
- osc_object_set_contended(cl2osc(slice->cls_obj));
- LASSERT(slice->cls_ops != oscl->ols_lockless_ops);
-
- /* Change this lock to ldlmlock-less lock. */
- osc_lock_to_lockless(env, oscl, 1);
- oscl->ols_state = OLS_GRANTED;
- rc = 0;
- } else if (oscl->ols_glimpse && rc == -ENAVAIL) {
+ if (oscl->ols_glimpse && rc == -ENAVAIL) {
LASSERT(oscl->ols_flags & LDLM_FL_LVB_READY);
osc_lock_lvb_update(env, cl2osc(slice->cls_obj),
NULL, &oscl->ols_lvb);
unlock_res_and_lock(dlmlock);
- OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_CANCEL, 5);
+ CFS_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_CANCEL, 5);
/* if l_ast_data is NULL, the dlmlock was enqueued by AGL or
* the object has been destroyed. */
/* Destroy pages covered by the extent of the DLM lock */
result = osc_lock_flush(cl2osc(obj),
- cl_index(obj, extent->start),
- cl_index(obj, extent->end),
+ extent->start >> PAGE_SHIFT,
+ extent->end >> PAGE_SHIFT,
mode, discard);
/* losing a lock, update kms */
dlmlock = NULL;
if (obj == NULL && res->lr_type == LDLM_EXTENT) {
- if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_SIZE_DATA))
+ if (CFS_FAIL_CHECK(OBD_FAIL_OSC_NO_SIZE_DATA))
break;
lock_res(res);
EXPORT_SYMBOL(osc_ldlm_glimpse_ast);
static bool weigh_cb(const struct lu_env *env, struct cl_io *io,
- struct osc_page *ops, void *cbdata)
+ void **pvec, int count, void *cbdata)
{
- struct cl_page *page = ops->ops_cl.cpl_page;
+ int i;
+
+ for (i = 0; i < count; i++) {
+ struct osc_page *ops = pvec[i];
+ struct cl_page *page = ops->ops_cl.cpl_page;
- if (cl_page_is_vmlocked(env, page) || PageDirty(page->cp_vmpage) ||
- PageWriteback(page->cp_vmpage))
- return false;
+ if (PageLocked(page->cp_vmpage) ||
+ PageDirty(page->cp_vmpage) ||
+ PageWriteback(page->cp_vmpage))
+ return false;
- *(pgoff_t *)cbdata = osc_index(ops) + 1;
+ *(pgoff_t *)cbdata = osc_index(ops) + 1;
+ }
return true;
}
if (result != 0)
RETURN(1);
- page_index = cl_index(obj, start);
+ page_index = start >> PAGE_SHIFT;
if (!osc_page_gang_lookup(env, io, oscobj,
- page_index, cl_index(obj, end),
+ page_index, end >> PAGE_SHIFT,
weigh_cb, (void *)&page_index))
result = 1;
cl_io_fini(env, io);
struct cl_io *io = oio->oi_cl.cis_io;
struct cl_object *obj = slice->cls_obj;
struct osc_object *oob = cl2osc(obj);
- const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
struct obd_connect_data *ocd;
LASSERT(ols->ols_state == OLS_NEW ||
(io->ci_lockreq == CILR_MAYBE) &&
(ocd->ocd_connect_flags &
OBD_CONNECT_SRVLOCK);
- if (io->ci_lockreq == CILR_NEVER ||
- /* lockless IO */
- (ols->ols_locklessable && osc_object_is_contended(oob)) ||
- /* lockless truncate */
- (cl_io_is_trunc(io) && osd->od_lockless_truncate &&
- (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK))) {
+ if (io->ci_lockreq == CILR_NEVER) {
ols->ols_locklessable = 1;
slice->cls_ops = ols->ols_lockless_ops;
}
void osc_lock_wake_waiters(const struct lu_env *env, struct osc_object *osc,
struct osc_lock *oscl)
{
+ struct osc_lock *scan;
+
spin_lock(&osc->oo_ol_spin);
list_del_init(&oscl->ols_nextlock_oscobj);
spin_unlock(&osc->oo_ol_spin);
spin_lock(&oscl->ols_lock);
- while (!list_empty(&oscl->ols_waiting_list)) {
- struct osc_lock *scan;
-
- scan = list_entry(oscl->ols_waiting_list.next, struct osc_lock,
- ols_wait_entry);
+ while ((scan = list_first_entry_or_null(&oscl->ols_waiting_list,
+ struct osc_lock,
+ ols_wait_entry)) != NULL) {
list_del_init(&scan->ols_wait_entry);
cl_sync_io_note(env, scan->ols_owner, 0);
ENTRY;
LASSERTF(ergo(oscl->ols_glimpse, lock->cll_descr.cld_mode <= CLM_READ),
- "lock = %p, ols = %p\n", lock, oscl);
+ "lock = %px, ols = %px\n", lock, oscl);
if (oscl->ols_state == OLS_GRANTED)
RETURN(0);
if (osc_lock_is_lockless(oscl)) {
oio->oi_lockless = 1;
} else if (!async) {
- if (OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_IDLE_RACE)) {
- OBD_RACE(OBD_FAIL_PTLRPC_IDLE_RACE);
+ if (CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_IDLE_RACE)) {
+ CFS_RACE(OBD_FAIL_PTLRPC_IDLE_RACE);
set_current_state(TASK_UNINTERRUPTIBLE);
schedule_timeout(cfs_time_seconds(1) / 2);
}
{
struct osc_lock *ols = cl2osc_lock(slice);
struct osc_object *osc = cl2osc(slice->cls_obj);
- struct cl_lock_descr *descr = &slice->cls_lock->cll_descr;
- int result;
LASSERT(ols->ols_dlmlock == NULL);
- result = osc_lock_flush(osc, descr->cld_start, descr->cld_end,
- descr->cld_mode, false);
- if (result)
- CERROR("Pages for lockless lock %p were not purged(%d)\n",
- ols, result);
-
osc_lock_wake_waiters(env, osc, ols);
}
return;
if (likely(io->ci_type == CIT_WRITE)) {
- io_start = cl_index(obj, io->u.ci_rw.crw_pos);
- io_end = cl_index(obj, io->u.ci_rw.crw_pos +
- io->u.ci_rw.crw_count - 1);
+ io_start = io->u.ci_rw.crw_pos >> PAGE_SHIFT;
+ io_end = (io->u.ci_rw.crw_pos +
+ io->u.ci_rw.crw_bytes - 1) >> PAGE_SHIFT;
} else {
LASSERT(cl_io_is_mkwrite(io));
io_start = io_end = io->u.ci_fault.ft_index;
}
EXPORT_SYMBOL(osc_lock_set_writer);
+void osc_lock_set_reader(const struct lu_env *env, const struct cl_io *io,
+ struct cl_object *obj, struct osc_lock *oscl)
+{
+ struct osc_io *oio = osc_env_io(env);
+
+ if (!cl_object_same(io->ci_obj, obj))
+ return;
+
+ if (oscl->ols_glimpse || osc_lock_is_lockless(oscl))
+ return;
+
+ if (oio->oi_read_osclock == NULL)
+ oio->oi_read_osclock = oscl;
+}
+EXPORT_SYMBOL(osc_lock_set_reader);
+
int osc_lock_init(const struct lu_env *env,
struct cl_object *obj, struct cl_lock *lock,
const struct cl_io *io)
oscl->ols_flags = osc_enq2ldlm_flags(enqflags);
oscl->ols_speculative = !!(enqflags & CEF_SPECULATIVE);
- if (lock->cll_descr.cld_mode == CLM_GROUP)
- oscl->ols_flags |= LDLM_FL_ATOMIC_CB;
if (oscl->ols_flags & LDLM_FL_HAS_INTENT) {
oscl->ols_flags |= LDLM_FL_BLOCK_GRANTED;
if (!(enqflags & CEF_MUST))
/* try to convert this lock to a lockless lock */
osc_lock_to_lockless(env, oscl, (enqflags & CEF_NEVER));
- if (oscl->ols_locklessable && !(enqflags & CEF_DISCARD_DATA))
- oscl->ols_flags |= LDLM_FL_DENY_ON_CONTENTION;
if (io->ci_type == CIT_WRITE || cl_io_is_mkwrite(io))
osc_lock_set_writer(env, io, obj, oscl);
+ else if (io->ci_type == CIT_READ ||
+ (io->ci_type == CIT_FAULT && !io->u.ci_fault.ft_mkwrite))
+ osc_lock_set_reader(env, io, obj, oscl);
LDLM_DEBUG_NOLOCK("lock %p, osc lock %p, flags %#llx",
lock, oscl, oscl->ols_flags);