struct lustre_handle ols_handle;
struct ldlm_enqueue_info ols_einfo;
enum osc_lock_state ols_state;
+
+ /**
+ * Number of pages using this lock for I/O; currently only read-ahead
+ * takes such references. While non-zero, the underlying DLM lock will
+ * not be cancelled during recovery, to avoid deadlock. See bug 16774.
+ *
+ * \see osc_page::ops_lock
+ * \see osc_page_addref_lock(), osc_page_putref_lock()
+ */
+ cfs_atomic_t ols_pageref;
+
/**
* true, if ldlm_lock_addref() was called against
* osc_lock::ols_lock. This is used for sanity checking.
*/
/**
* Submit time - the time when the page is starting RPC. For debugging.
*/
cfs_time_t ops_submit_time;
+
+ /**
+ * A lock covering this page, on which we hold a reference. Only used
+ * by read-ahead: for a read-ahead page we take a reference on its
+ * covering lock to prevent it from being cancelled during recovery.
+ *
+ * \see osc_lock::ols_pageref
+ * \see osc_page_addref_lock(), osc_page_putref_lock().
+ */
+ struct cl_lock *ops_lock;
};
extern cfs_mem_cache_t *osc_page_kmem;
return container_of0(d->obd_lu_dev, struct osc_device, od_cl.cd_lu_dev);
}
+int osc_dlm_lock_pageref(struct ldlm_lock *dlm);
#endif /* OSC_INTERNAL_H */
* @{
*/
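+/*
+ * Sentinel added to osc_lock::ols_pageref by osc_dlm_lock_pageref() when a
+ * lock has no page references. It leaves the counter negative, so any later
+ * osc_page_addref_lock() fails and cannot pin a lock that recovery is about
+ * to cancel.
+ */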
+#define _PAGEREF_MAGIC (-10000000)
+
/*****************************************************************************
*
* Type conversions.
*/
osc_lock_unhold(ols);
LASSERT(ols->ols_lock == NULL);
+ LASSERT(cfs_atomic_read(&ols->ols_pageref) == 0 ||
+ cfs_atomic_read(&ols->ols_pageref) == _PAGEREF_MAGIC);
OBD_SLAB_FREE_PTR(ols, osc_lock_kmem);
}
OBD_SLAB_ALLOC_PTR_GFP(clk, osc_lock_kmem, CFS_ALLOC_IO);
if (clk != NULL) {
osc_lock_build_einfo(env, lock, clk, &clk->ols_einfo);
+ cfs_atomic_set(&clk->ols_pageref, 0);
clk->ols_state = OLS_NEW;
cl_lock_slice_add(lock, &clk->ols_cl, obj, &osc_lock_ops);
result = 0;
return result;
}
+int osc_dlm_lock_pageref(struct ldlm_lock *dlm)
+{
+ struct osc_lock *olock;
+ int rc = 0;
+
+ cfs_spin_lock(&osc_ast_guard);
+ olock = dlm->l_ast_data;
+ /*
+ * There is a very rare race with osc_page_addref_lock(), but it does
+ * not matter: in the worst case we fail to cancel a lock that we
+ * actually could cancel, which is harmless.
+ */
+ if (olock != NULL &&
+ cfs_atomic_add_return(_PAGEREF_MAGIC,
+ &olock->ols_pageref) != _PAGEREF_MAGIC) {
+ cfs_atomic_sub(_PAGEREF_MAGIC, &olock->ols_pageref);
+ rc = 1;
+ }
+ cfs_spin_unlock(&osc_ast_guard);
+ return rc;
+}
/** @} osc */
{
struct osc_page *opg = cl2osc_page(slice);
CDEBUG(D_TRACE, "%p\n", opg);
+ LASSERT(opg->ops_lock == NULL);
OBD_SLAB_FREE_PTR(opg, osc_page_kmem);
}
policy->l_extent.end = cl_offset(obj, end + 1) - 1;
}
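+/**
+ * Record in osc_page::ops_lock a reference on the lock covering a read-ahead
+ * page. On success the caller's cl_lock reference is kept in ops_lock; on
+ * failure (osc_lock::ols_pageref was poisoned with _PAGEREF_MAGIC because the
+ * DLM lock is being cancelled for recovery) the reference is released.
+ *
+ * \retval 0 the reference was recorded
+ * \retval 1 the lock is being cancelled, no reference taken
+ */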
+static int osc_page_addref_lock(const struct lu_env *env,
+ struct osc_page *opg,
+ struct cl_lock *lock)
+{
+ struct osc_lock *olock;
+ int rc;
+
+ LASSERT(opg->ops_lock == NULL);
+
+ olock = osc_lock_at(lock);
+ if (cfs_atomic_inc_return(&olock->ols_pageref) <= 0) {
+ cfs_atomic_dec(&olock->ols_pageref);
+ cl_lock_put(env, lock);
+ rc = 1;
+ } else {
+ opg->ops_lock = lock;
+ rc = 0;
+ }
+ return rc;
+}
+
+static void osc_page_putref_lock(const struct lu_env *env,
+ struct osc_page *opg)
+{
+ struct cl_lock *lock = opg->ops_lock;
+ struct osc_lock *olock;
+
+ LASSERT(lock != NULL);
+ olock = osc_lock_at(lock);
+
+ cfs_atomic_dec(&olock->ols_pageref);
+ opg->ops_lock = NULL;
+
+ /*
+ * Note: this is usually not the last reference on the lock; even if it
+ * is, all cl_lock_put() does then is free some memory, so it is safe
+ * for the caller to be holding spinlocks.
+ */
+ LASSERT(cfs_atomic_read(&lock->cll_ref) > 1 || olock->ols_hold == 0);
+ cl_lock_put(env, lock);
+}
+
static int osc_page_is_under_lock(const struct lu_env *env,
const struct cl_page_slice *slice,
struct cl_io *unused)
ENTRY;
lock = cl_lock_at_page(env, slice->cpl_obj, slice->cpl_page,
NULL, 1, 0);
- if (lock != NULL) {
- cl_lock_put(env, lock);
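+ /*
+ * Pin the covering lock for a read-ahead page so that it is not
+ * cancelled before the read completes; the reference is dropped in
+ * osc_page_disown() or osc_page_completion_read().
+ */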
+ if (lock != NULL &&
+ osc_page_addref_lock(env, cl2osc_page(slice), lock) == 0)
result = -EBUSY;
- } else
+ else
result = -ENODATA;
RETURN(result);
}
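+/*
+ * Release the covering-lock reference taken in osc_page_is_under_lock()
+ * if the page still holds one when it is disowned by the I/O.
+ */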
+static void osc_page_disown(const struct lu_env *env,
+ const struct cl_page_slice *slice,
+ struct cl_io *io)
+{
+ struct osc_page *opg = cl2osc_page(slice);
+
+ if (unlikely(opg->ops_lock))
+ osc_page_putref_lock(env, opg);
+}
+
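+/*
+ * Release the covering-lock reference of a read-ahead page once its read
+ * has completed.
+ */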
+static void osc_page_completion_read(const struct lu_env *env,
+ const struct cl_page_slice *slice,
+ int ioret)
+{
+ struct osc_page *opg = cl2osc_page(slice);
+
+ if (likely(opg->ops_lock))
+ osc_page_putref_lock(env, opg);
+}
+
static int osc_page_fail(const struct lu_env *env,
const struct cl_page_slice *slice,
struct cl_io *unused)
.cpo_print = osc_page_print,
.cpo_delete = osc_page_delete,
.cpo_is_under_lock = osc_page_is_under_lock,
+ .cpo_disown = osc_page_disown,
.io = {
[CRT_READ] = {
- .cpo_cache_add = osc_page_fail
+ .cpo_cache_add = osc_page_fail,
+ .cpo_completion = osc_page_completion_read
},
[CRT_WRITE] = {
- .cpo_cache_add = osc_page_cache_add
+ .cpo_cache_add = osc_page_cache_add
}
},
.cpo_clip = osc_page_clip,
RETURN(rc);
}
+/**
+ * Determine whether the lock can be cancelled before it is replayed during
+ * recovery; see bug 16774 for details.
+ *
+ * \retval zero the lock cannot be cancelled
+ * \retval other it is OK to cancel the lock
+ */
+static int osc_cancel_for_recovery(struct ldlm_lock *lock)
+{
+ check_res_locked(lock->l_resource);
+
+ /*
+ * Cancel all unused extent locks granted in LCK_PR or LCK_CR mode.
+ *
+ * XXX As a future improvement, we could also cancel an unused write
+ * lock if it has no dirty data and no active mmaps.
+ */
+ if (lock->l_resource->lr_type == LDLM_EXTENT &&
+ (lock->l_granted_mode == LCK_PR ||
+ lock->l_granted_mode == LCK_CR) &&
+ (osc_dlm_lock_pageref(lock) == 0))
+ RETURN(1);
+
+ RETURN(0);
+}
+
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
int rc;
CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
cfs_sema_init(&cli->cl_grant_sem, 1);
+
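+ /* register the callback that decides, during recovery, whether an
+ * unused lock can be cancelled before replay */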
+ ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
}
RETURN(rc);
}
run_test 85a "check the cancellation of unused locks during recovery(IBITS)"
+test_85b() { #bug 16774
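+ # enable cancellation of unused locks before replay, create and read
+ # back a set of files so the client caches unused extent locks, then
+ # verify that those locks are cancelled rather than replayed when the
+ # OST fails over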
+ lctl set_param -n ldlm.cancel_unused_locks_before_replay "1"
+
+ lfs setstripe -o 0 -c 1 $DIR
+
+ for i in `seq 100`; do
+ dd if=/dev/urandom of=$DIR/$tfile-$i bs=4096 count=32 >/dev/null 2>&1
+ done
+
+ cancel_lru_locks osc
+
+ for i in `seq 100`; do
+ dd if=$DIR/$tfile-$i of=/dev/null bs=4096 count=32 >/dev/null 2>&1
+ done
+
+ lov_id=`lctl dl | grep "clilov"`
+ addr=`echo $lov_id | awk '{print $4}' | awk -F '-' '{print $3}'`
+ count=`lctl get_param -n ldlm.namespaces.*OST0000*$addr.lock_unused_count`
+ echo "before recovery: unused locks count = $count"
+
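+ # fail the OST so that the client goes through recovery; the unused
+ # locks should be cancelled instead of being replayed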
+ fail ost1
+
+ count2=`lctl get_param -n ldlm.namespaces.*OST0000*$addr.lock_unused_count`
+ echo "after recovery: unused locks count = $count2"
+
+ if [ $count2 -ge $count ]; then
+ error "unused locks are not canceled"
+ fi
+}
+run_test 85b "check the cancellation of unused locks during recovery(EXTENT)"
+
test_86() {
local clients=${CLIENTS:-$HOSTNAME}