*buf++ = ext->oe_rw ? 'r' : 'w';
if (ext->oe_intree)
*buf++ = 'i';
+ if (ext->oe_sync)
+ *buf++ = 'S';
if (ext->oe_srvlock)
*buf++ = 's';
if (ext->oe_hp)
static const char *oes_strings[] = {
"inv", "active", "cache", "locking", "lockdone", "rpc", "trunc", NULL };
-#define OSC_EXTENT_DUMP(lvl, extent, fmt, ...) do { \
- struct osc_extent *__ext = (extent); \
- char __buf[16]; \
- \
- CDEBUG(lvl, \
- "extent %p@{" EXTSTR ", " \
- "[%d|%d|%c|%s|%s|%p], [%d|%d|%c|%c|%p|%u|%p]} " fmt, \
- /* ----- extent part 0 ----- */ \
- __ext, EXTPARA(__ext), \
- /* ----- part 1 ----- */ \
- atomic_read(&__ext->oe_refc), \
- atomic_read(&__ext->oe_users), \
- list_empty_marker(&__ext->oe_link), \
- oes_strings[__ext->oe_state], ext_flags(__ext, __buf), \
- __ext->oe_obj, \
- /* ----- part 2 ----- */ \
- __ext->oe_grants, __ext->oe_nr_pages, \
- list_empty_marker(&__ext->oe_pages), \
- waitqueue_active(&__ext->oe_waitq) ? '+' : '-', \
- __ext->oe_osclock, __ext->oe_mppr, __ext->oe_owner, \
- /* ----- part 4 ----- */ \
- ## __VA_ARGS__); \
+#define OSC_EXTENT_DUMP(lvl, extent, fmt, ...) do { \
+ struct osc_extent *__ext = (extent); \
+ char __buf[16]; \
+ \
+ CDEBUG(lvl, \
+ "extent %p@{" EXTSTR ", " \
+ "[%d|%d|%c|%s|%s|%p], [%d|%d|%c|%c|%p|%u|%p]} " fmt, \
+ /* ----- extent part 0 ----- */ \
+ __ext, EXTPARA(__ext), \
+ /* ----- part 1 ----- */ \
+ atomic_read(&__ext->oe_refc), \
+ atomic_read(&__ext->oe_users), \
+ list_empty_marker(&__ext->oe_link), \
+ oes_strings[__ext->oe_state], ext_flags(__ext, __buf), \
+ __ext->oe_obj, \
+ /* ----- part 2 ----- */ \
+ __ext->oe_grants, __ext->oe_nr_pages, \
+ list_empty_marker(&__ext->oe_pages), \
+ waitqueue_active(&__ext->oe_waitq) ? '+' : '-', \
+ __ext->oe_dlmlock, __ext->oe_mppr, __ext->oe_owner, \
+ /* ----- part 4 ----- */ \
+ ## __VA_ARGS__); \
+ if (lvl == D_ERROR && __ext->oe_dlmlock != NULL) \
+ LDLM_ERROR(__ext->oe_dlmlock, "extent: %p\n", __ext); \
+ else \
+ LDLM_DEBUG(__ext->oe_dlmlock, "extent: %p\n", __ext); \
} while (0)
#undef EASSERTF
if (ext->oe_max_end < ext->oe_end || ext->oe_end < ext->oe_start)
GOTO(out, rc = 80);
- if (ext->oe_osclock == NULL && ext->oe_grants > 0)
+ if (ext->oe_sync && ext->oe_grants > 0)
GOTO(out, rc = 90);
- if (ext->oe_osclock) {
- struct cl_lock_descr *descr;
- descr = &ext->oe_osclock->cll_descr;
- if (!(descr->cld_start <= ext->oe_start &&
- descr->cld_end >= ext->oe_max_end))
+ if (ext->oe_dlmlock != NULL) {
+ struct ldlm_extent *extent;
+
+ extent = &ext->oe_dlmlock->l_policy_data.l_extent;
+ if (!(extent->start <= cl_offset(osc2cl(obj), ext->oe_start) &&
+ extent->end >= cl_offset(osc2cl(obj), ext->oe_max_end)))
GOTO(out, rc = 100);
+
+ if (!(ext->oe_dlmlock->l_granted_mode & (LCK_PW | LCK_GROUP)))
+ GOTO(out, rc = 102);
}
if (ext->oe_nr_pages > ext->oe_mppr)
ext->oe_state = OES_INV;
INIT_LIST_HEAD(&ext->oe_pages);
init_waitqueue_head(&ext->oe_waitq);
- ext->oe_osclock = NULL;
+ ext->oe_dlmlock = NULL;
return ext;
}
LASSERT(ext->oe_state == OES_INV);
LASSERT(!ext->oe_intree);
- if (ext->oe_osclock) {
- cl_lock_put(env, ext->oe_osclock);
- ext->oe_osclock = NULL;
+ if (ext->oe_dlmlock != NULL) {
+ lu_ref_add(&ext->oe_dlmlock->l_reference,
+ "osc_extent", ext);
+ LDLM_LOCK_PUT(ext->oe_dlmlock);
+ ext->oe_dlmlock = NULL;
}
osc_extent_free(ext);
}
if (cur->oe_max_end != victim->oe_max_end)
return -ERANGE;
- LASSERT(cur->oe_osclock == victim->oe_osclock);
+ LASSERT(cur->oe_dlmlock == victim->oe_dlmlock);
ppc_bits = osc_cli(obj)->cl_chunkbits - PAGE_CACHE_SHIFT;
chunk_start = cur->oe_start >> ppc_bits;
chunk_end = cur->oe_end >> ppc_bits;
struct osc_extent *osc_extent_find(const struct lu_env *env,
struct osc_object *obj, pgoff_t index,
int *grants)
-
{
struct client_obd *cli = osc_cli(obj);
- struct cl_lock *lock;
+ struct osc_lock *olck;
+ struct cl_lock_descr *descr;
struct osc_extent *cur;
struct osc_extent *ext;
struct osc_extent *conflict = NULL;
if (cur == NULL)
RETURN(ERR_PTR(-ENOMEM));
- lock = cl_lock_at_pgoff(env, osc2cl(obj), index, NULL, 1, 0);
- LASSERT(lock != NULL);
- LASSERT(lock->cll_descr.cld_mode >= CLM_WRITE);
+ olck = osc_env_io(env)->oi_write_osclock;
+ LASSERTF(olck != NULL, "page %lu is not covered by lock\n", index);
+ LASSERT(olck->ols_state == OLS_GRANTED);
+
+ descr = &olck->ols_cl.cls_lock->cll_descr;
+ LASSERT(descr->cld_mode >= CLM_WRITE);
LASSERT(cli->cl_chunkbits >= PAGE_CACHE_SHIFT);
ppc_bits = cli->cl_chunkbits - PAGE_CACHE_SHIFT;
max_pages = cli->cl_max_pages_per_rpc;
LASSERT((max_pages & ~chunk_mask) == 0);
max_end = index - (index % max_pages) + max_pages - 1;
- max_end = min_t(pgoff_t, max_end, lock->cll_descr.cld_end);
+ max_end = min_t(pgoff_t, max_end, descr->cld_end);
/* initialize new extent by parameters so far */
cur->oe_max_end = max_end;
cur->oe_start = index & chunk_mask;
cur->oe_end = ((index + ~chunk_mask + 1) & chunk_mask) - 1;
- if (cur->oe_start < lock->cll_descr.cld_start)
- cur->oe_start = lock->cll_descr.cld_start;
+ if (cur->oe_start < descr->cld_start)
+ cur->oe_start = descr->cld_start;
if (cur->oe_end > max_end)
cur->oe_end = max_end;
- cur->oe_osclock = lock;
cur->oe_grants = 0;
cur->oe_mppr = max_pages;
+ if (olck->ols_dlmlock != NULL) {
+ LASSERT(olck->ols_hold);
+ cur->oe_dlmlock = LDLM_LOCK_GET(olck->ols_dlmlock);
+ lu_ref_add(&olck->ols_dlmlock->l_reference, "osc_extent", cur);
+ }
/* grants has been allocated by caller */
LASSERTF(*grants >= chunksize + cli->cl_extent_tax,
break;
/* if covering by different locks, no chance to match */
- if (lock != ext->oe_osclock) {
+ if (olck->ols_dlmlock != ext->oe_dlmlock) {
EASSERTF(!overlapped(ext, cur), ext,
EXTSTR"\n", EXTPARA(cur));
if (found != NULL) {
LASSERT(conflict == NULL);
if (!IS_ERR(found)) {
- LASSERT(found->oe_osclock == cur->oe_osclock);
+ LASSERT(found->oe_dlmlock == cur->oe_dlmlock);
OSC_EXTENT_DUMP(D_CACHE, found,
"found caching ext for %lu.\n", index);
}
found = osc_extent_hold(cur);
osc_extent_insert(obj, cur);
OSC_EXTENT_DUMP(D_CACHE, cur, "add into tree %lu/%lu.\n",
- index, lock->cll_descr.cld_end);
+ index, descr->cld_end);
}
osc_object_unlock(obj);
}
ext->oe_rw = !!(cmd & OBD_BRW_READ);
+ ext->oe_sync = 1;
ext->oe_urgent = 1;
ext->oe_start = start;
ext->oe_end = ext->oe_max_end = end;
struct osc_page *ops, void *cbdata)
{
struct osc_thread_info *info = osc_env_info(env);
- struct cl_lock *lock = cbdata;
+ struct osc_object *osc = cbdata;
pgoff_t index;
index = osc_index(ops);
if (index >= info->oti_fn_index) {
- struct cl_lock *tmp;
+ struct ldlm_lock *tmp;
struct cl_page *page = ops->ops_cl.cpl_page;
/* refresh non-overlapped index */
- tmp = cl_lock_at_pgoff(env, lock->cll_descr.cld_obj, index,
- lock, 1, 0);
+ tmp = osc_dlmlock_at_pgoff(env, osc, index, 0, 0);
if (tmp != NULL) {
+ __u64 end = tmp->l_policy_data.l_extent.end;
/* Cache the first-non-overlapped index so as to skip
- * all pages within [index, oti_fn_index). This
- * is safe because if tmp lock is canceled, it will
- * discard these pages. */
- info->oti_fn_index = tmp->cll_descr.cld_end + 1;
- if (tmp->cll_descr.cld_end == CL_PAGE_EOF)
+ * all pages within [index, oti_fn_index). This is safe
+ * because if tmp lock is canceled, it will discard
+ * these pages. */
+ info->oti_fn_index = cl_index(osc2cl(osc), end + 1);
+ if (end == OBD_OBJECT_EOF)
info->oti_fn_index = CL_PAGE_EOF;
- cl_lock_put(env, tmp);
+ LDLM_LOCK_PUT(tmp);
} else if (cl_page_own(env, io, page) == 0) {
/* discard the page */
cl_page_discard(env, io, page);
struct osc_page *ops, void *cbdata)
{
struct osc_thread_info *info = osc_env_info(env);
- struct cl_lock *lock = cbdata;
struct cl_page *page = ops->ops_cl.cpl_page;
- LASSERT(lock->cll_descr.cld_mode >= CLM_WRITE);
-
/* page is top page. */
info->oti_next_index = osc_index(ops) + 1;
if (cl_page_own(env, io, page) == 0) {
* If error happens on any step, the process continues anyway (the reasoning
* behind this being that lock cancellation cannot be delayed indefinitely).
*/
-int osc_lock_discard_pages(const struct lu_env *env, struct osc_lock *ols)
+int osc_lock_discard_pages(const struct lu_env *env, struct osc_object *osc,
+ pgoff_t start, pgoff_t end, enum cl_lock_mode mode)
{
struct osc_thread_info *info = osc_env_info(env);
struct cl_io *io = &info->oti_io;
- struct cl_object *osc = ols->ols_cl.cls_obj;
- struct cl_lock *lock = ols->ols_cl.cls_lock;
- struct cl_lock_descr *descr = &lock->cll_descr;
osc_page_gang_cbt cb;
int res;
int result;
ENTRY;
- io->ci_obj = cl_object_top(osc);
+ io->ci_obj = cl_object_top(osc2cl(osc));
io->ci_ignore_layout = 1;
result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
if (result != 0)
GOTO(out, result);
- cb = descr->cld_mode == CLM_READ ? check_and_discard_cb : discard_cb;
- info->oti_fn_index = info->oti_next_index = descr->cld_start;
+ cb = mode == CLM_READ ? check_and_discard_cb : discard_cb;
+ info->oti_fn_index = info->oti_next_index = start;
do {
- res = osc_page_gang_lookup(env, io, cl2osc(osc),
- info->oti_next_index, descr->cld_end,
- cb, (void *)lock);
- if (info->oti_next_index > descr->cld_end)
+ res = osc_page_gang_lookup(env, io, osc,
+ info->oti_next_index, end, cb, osc);
+ if (info->oti_next_index > end)
break;
if (res == CLP_GANG_RESCHED)