From e8b421531c166b91ab5c1f417570c544bcdd050c Mon Sep 17 00:00:00 2001
From: Jinshan Xiong
Date: Wed, 16 Sep 2015 11:47:20 -0700
Subject: [PATCH] LU-6271 osc: further OSC cleanup after eviction

A few problems are fixed in this patch:
1. an ldlm lock could be canceled simultaneously by the ldlm bl thread
   and by cleanup_resource(). Only one side can win the race; the other
   side now waits for the cancellation work to complete;
2. in lov_io_iter_init(), if cl_io_iter_init() against a sub io fails,
   call cl_io_iter_fini() to clean up the leftover state;
3. add osc_lru_reserve() and osc_lru_unreserve() to reserve LRU slots
   in osc_io_write_iter_init() and unreserve the unused ones in
   osc_io_write_iter_fini();
4. eviction of a client that holds a group lock is now handled
   correctly;
5. miscellaneous cleanups.

Signed-off-by: Jinshan Xiong
Change-Id: I293770b62e177a9ecefe0b4e05f3a8f44b1c831d
Reviewed-on: http://review.whamcloud.com/16456
Tested-by: Jenkins
Reviewed-by: Bobi Jam
Tested-by: Maloo
Reviewed-by: John L. Hammond
Reviewed-by: James Simmons
Reviewed-by: Oleg Drokin
---
 lustre/include/cl_object.h        |  7 +++-
 lustre/include/lustre_dlm_flags.h |  3 ++
 lustre/ldlm/ldlm_lock.c           | 51 +++++++++++++++++++++--------
 lustre/ldlm/ldlm_request.c        | 14 ++++++--
 lustre/ldlm/ldlm_resource.c       |  2 +-
 lustre/llite/vvp_io.c             |  1 +
 lustre/lov/lov_io.c               | 39 ++++++++++++----------
 lustre/osc/osc_cache.c            |  1 -
 lustre/osc/osc_cl_internal.h      | 10 +++---
 lustre/osc/osc_internal.h         |  3 +-
 lustre/osc/osc_io.c               | 46 ++++++++------------------
 lustre/osc/osc_lock.c             | 50 ++++++++++++++++------------
 lustre/osc/osc_object.c           |  8 ++++-
 lustre/osc/osc_page.c             | 69 ++++++++++++++++++++++++++++++++++-----
 lustre/osc/osc_request.c          |  8 +++--
 15 files changed, 207 insertions(+), 105 deletions(-)

diff --git a/lustre/include/cl_object.h b/lustre/include/cl_object.h
index a9543a1..6c4db88 100644
--- a/lustre/include/cl_object.h
+++ b/lustre/include/cl_object.h
@@ -1642,9 +1642,14 @@ enum cl_enq_flags {
 	 */
 	CEF_PEEK	= 0x00000040,
 	/**
+	 * Lock match only. Used by group lock in I/O as group lock
+	 * is known to exist.
+	 */
+	CEF_LOCK_MATCH	= 0x00000080,
+	/**
 	 * mask of enq_flags.
 	 */
-	CEF_MASK	= 0x0000007f,
+	CEF_MASK	= 0x000000ff,
 };
 
 /**
diff --git a/lustre/include/lustre_dlm_flags.h b/lustre/include/lustre_dlm_flags.h
index 5357746..5f7206d 100644
--- a/lustre/include/lustre_dlm_flags.h
+++ b/lustre/include/lustre_dlm_flags.h
@@ -116,6 +116,9 @@
 #define ldlm_set_test_lock(_l)      LDLM_SET_FLAG((  _l), 1ULL << 19)
 #define ldlm_clear_test_lock(_l)    LDLM_CLEAR_FLAG((_l), 1ULL << 19)
 
+/** match lock only */
+#define LDLM_FL_MATCH_LOCK          0x0000000000100000ULL // bit 20
+
 /**
  * Immediatelly cancel such locks when they block some other locks. Send
  * cancel notification to original lock holder, but expect no reply. This
diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c
index 39abaef..13741a0 100644
--- a/lustre/ldlm/ldlm_lock.c
+++ b/lustre/ldlm/ldlm_lock.c
@@ -832,24 +832,23 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, enum ldlm_mode mode)
 
 	ldlm_lock_decref_internal_nolock(lock, mode);
 
-	if (ldlm_is_local(lock) &&
-	    !lock->l_readers && !lock->l_writers) {
-		/* If this is a local lock on a server namespace and this was
-		 * the last reference, cancel the lock. */
-		CDEBUG(D_INFO, "forcing cancel of local lock\n");
-		ldlm_set_cbpending(lock);
-	}
-
-	if (!lock->l_readers && !lock->l_writers &&
-	    (ldlm_is_cbpending(lock) || lock->l_req_mode == LCK_GROUP)) {
-		/* If we received a blocked AST and this was the last reference,
-		 * run the callback.
+	if ((ldlm_is_local(lock) || lock->l_req_mode == LCK_GROUP) &&
+	    !lock->l_readers && !lock->l_writers) {
+		/* If this is a local lock on a server namespace and this was
+		 * the last reference, cancel the lock.
+		 *
 		 * Group locks are special:
 		 * They must not go in LRU, but they are not called back
 		 * like non-group locks, instead they are manually released.
 		 * They have an l_writers reference which they keep until
 		 * they are manually released, so we remove them when they have
 		 * no more reader or writer references. - LU-6368 */
+		ldlm_set_cbpending(lock);
+	}
+
+	if (!lock->l_readers && !lock->l_writers && ldlm_is_cbpending(lock)) {
+		/* If we received a blocked AST and this was the last reference,
+		 * run the callback. */
 		if (ldlm_is_ns_srv(lock) && lock->l_export)
 			CERROR("FL_CBPENDING set on non-local lock--just a "
 			       "warning\n");
@@ -2169,6 +2168,19 @@ restart:
 }
 EXPORT_SYMBOL(ldlm_reprocess_all);
 
+static bool is_bl_done(struct ldlm_lock *lock)
+{
+	bool bl_done = true;
+
+	if (!ldlm_is_bl_done(lock)) {
+		lock_res_and_lock(lock);
+		bl_done = ldlm_is_bl_done(lock);
+		unlock_res_and_lock(lock);
+	}
+
+	return bl_done;
+}
+
 /**
  * Helper function to call blocking AST for LDLM lock \a lock in a
  * "cancelling" mode.
@@ -2186,8 +2198,19 @@ void ldlm_cancel_callback(struct ldlm_lock *lock)
 		} else {
 			LDLM_DEBUG(lock, "no blocking ast");
 		}
-	}
-	ldlm_set_bl_done(lock);
+
+		/* only canceller can set bl_done bit */
+		ldlm_set_bl_done(lock);
+		wake_up_all(&lock->l_waitq);
+	} else if (!ldlm_is_bl_done(lock)) {
+		struct l_wait_info lwi = { 0 };
+
+		/* The lock is guaranteed to have been canceled once
+		 * returning from this function. */
+		unlock_res_and_lock(lock);
+		l_wait_event(lock->l_waitq, is_bl_done(lock), &lwi);
+		lock_res_and_lock(lock);
+	}
 }
 
 /**
diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c
index ef2fb06..53d2f43 100644
--- a/lustre/ldlm/ldlm_request.c
+++ b/lustre/ldlm/ldlm_request.c
@@ -1342,13 +1342,23 @@ int ldlm_cli_cancel(struct lustre_handle *lockh,
 	struct list_head cancels = LIST_HEAD_INIT(cancels);
 	ENTRY;
 
-	/* concurrent cancels on the same handle can happen */
-	lock = ldlm_handle2lock_long(lockh, LDLM_FL_CANCELING);
+	lock = ldlm_handle2lock_long(lockh, 0);
 	if (lock == NULL) {
 		LDLM_DEBUG_NOLOCK("lock is already being destroyed");
 		RETURN(0);
 	}
 
+	lock_res_and_lock(lock);
+	/* Lock is being canceled and the caller doesn't want to wait */
+	if (ldlm_is_canceling(lock) && (cancel_flags & LCF_ASYNC)) {
+		unlock_res_and_lock(lock);
+		LDLM_LOCK_RELEASE(lock);
+		RETURN(0);
+	}
+
+	ldlm_set_canceling(lock);
+	unlock_res_and_lock(lock);
+
 	rc = ldlm_cli_cancel_local(lock);
 	if (rc == LDLM_FL_LOCAL_ONLY || cancel_flags & LCF_LOCAL) {
 		LDLM_LOCK_RELEASE(lock);
diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c
index 0a81a87..1ec08e7 100644
--- a/lustre/ldlm/ldlm_resource.c
+++ b/lustre/ldlm/ldlm_resource.c
@@ -849,7 +849,7 @@ static void cleanup_resource(struct ldlm_resource *res, struct list_head *q,
 
 			unlock_res(res);
 			ldlm_lock2handle(lock, &lockh);
-			rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
+			rc = ldlm_cli_cancel(&lockh, LCF_LOCAL);
 			if (rc)
 				CERROR("ldlm_cli_cancel: %d\n", rc);
 		} else {
diff --git a/lustre/llite/vvp_io.c b/lustre/llite/vvp_io.c
index 6b063fe..c2a088a 100644
--- a/lustre/llite/vvp_io.c
+++ b/lustre/llite/vvp_io.c
@@ -238,6 +238,7 @@ static int vvp_io_one_lock_index(const struct lu_env *env, struct cl_io *io,
 	if (vio->vui_fd && (vio->vui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
 		descr->cld_mode = CLM_GROUP;
 		descr->cld_gid = vio->vui_fd->fd_grouplock.lg_gid;
+		enqflags |= CEF_LOCK_MATCH;
 	} else {
 		descr->cld_mode = mode;
 	}
diff --git a/lustre/lov/lov_io.c b/lustre/lov/lov_io.c
index cf1ccfe..00644f6 100644
--- a/lustre/lov/lov_io.c
+++ b/lustre/lov/lov_io.c
@@ -439,24 +439,27 @@ static int lov_io_iter_init(const struct lu_env *env,
 			continue;
 		}
 
-		end = lov_offset_mod(end, +1);
-		sub = lov_sub_get(env, lio, stripe);
-		if (!IS_ERR(sub)) {
-			lov_io_sub_inherit(sub->sub_io, lio, stripe,
-					   start, end);
-			rc = cl_io_iter_init(sub->sub_env, sub->sub_io);
-			lov_sub_put(sub);
-			CDEBUG(D_VFSTRACE, "shrink: %d ["LPU64", "LPU64")\n",
-			       stripe, start, end);
-		} else
-			rc = PTR_ERR(sub);
-
-		if (!rc)
-			list_add_tail(&sub->sub_linkage, &lio->lis_active);
-		else
-			break;
-	}
-	RETURN(rc);
+		end = lov_offset_mod(end, +1);
+		sub = lov_sub_get(env, lio, stripe);
+		if (IS_ERR(sub)) {
+			rc = PTR_ERR(sub);
+			break;
+		}
+
+		lov_io_sub_inherit(sub->sub_io, lio, stripe, start, end);
+		rc = cl_io_iter_init(sub->sub_env, sub->sub_io);
+		if (rc != 0)
+			cl_io_iter_fini(sub->sub_env, sub->sub_io);
+		lov_sub_put(sub);
+		if (rc != 0)
+			break;
+
+		CDEBUG(D_VFSTRACE, "shrink: %d ["LPU64", "LPU64")\n",
+		       stripe, start, end);
+
+		list_add_tail(&sub->sub_linkage, &lio->lis_active);
+	}
+	RETURN(rc);
 }
 
 static int lov_io_rw_iter_init(const struct lu_env *env,
diff --git a/lustre/osc/osc_cache.c b/lustre/osc/osc_cache.c
index 0b01b36..e70d61f 100644
--- a/lustre/osc/osc_cache.c
+++ b/lustre/osc/osc_cache.c
@@ -2694,7 +2694,6 @@ again:
 			 * a page already having been flushed by write_page().
 			 * We have to wait for this extent because we can't
 			 * truncate that page. */
-			LASSERT(!ext->oe_hp);
 			OSC_EXTENT_DUMP(D_CACHE, ext,
 					"waiting for busy extent\n");
 			waiting = osc_extent_get(ext);
diff --git a/lustre/osc/osc_cl_internal.h b/lustre/osc/osc_cl_internal.h
index e5caf37..7ca58c5 100644
--- a/lustre/osc/osc_cl_internal.h
+++ b/lustre/osc/osc_cl_internal.h
@@ -62,10 +62,12 @@ struct osc_extent;
  * State maintained by osc layer for each IO context.
  */
 struct osc_io {
-	/** super class */
-	struct cl_io_slice oi_cl;
-	/** true if this io is lockless. */
-	int                oi_lockless;
+	/** super class */
+	struct cl_io_slice oi_cl;
+	/** true if this io is lockless. */
+	int                oi_lockless:1,
+	/** true if this io is counted as active IO */
+			   oi_is_active:1;
 	/** how many LRU pages are reserved for this IO */
 	unsigned long      oi_lru_reserved;
 
diff --git a/lustre/osc/osc_internal.h b/lustre/osc/osc_internal.h
index 9a03ba3..636a315 100644
--- a/lustre/osc/osc_internal.h
+++ b/lustre/osc/osc_internal.h
@@ -135,7 +135,8 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
 		  struct list_head *ext_list, int cmd);
 long osc_lru_shrink(const struct lu_env *env, struct client_obd *cli,
 		    long target, bool force);
-long osc_lru_reclaim(struct client_obd *cli, unsigned long npages);
+unsigned long osc_lru_reserve(struct client_obd *cli, unsigned long npages);
+void osc_lru_unreserve(struct client_obd *cli, unsigned long npages);
 
 extern spinlock_t osc_ast_guard;
 extern struct lu_kmem_descr osc_caches[];
diff --git a/lustre/osc/osc_io.c b/lustre/osc/osc_io.c
index 2fb42ba..8df200b 100644
--- a/lustre/osc/osc_io.c
+++ b/lustre/osc/osc_io.c
@@ -340,7 +340,10 @@ static int osc_io_iter_init(const struct lu_env *env,
 
 	spin_lock(&imp->imp_lock);
 	if (likely(!imp->imp_invalid)) {
+		struct osc_io *oio = osc_env_io(env);
+
 		atomic_inc(&osc->oo_nr_ios);
+		oio->oi_is_active = 1;
 		rc = 0;
 	}
 	spin_unlock(&imp->imp_lock);
@@ -354,9 +357,6 @@ static int osc_io_write_iter_init(const struct lu_env *env,
 	struct cl_io *io = ios->cis_io;
 	struct osc_io *oio = osc_env_io(env);
 	struct osc_object *osc = cl2osc(ios->cis_obj);
-	struct client_obd *cli = osc_cli(osc);
-	unsigned long c;
-	unsigned long max_pages;
 	unsigned long npages;
 	ENTRY;
 
@@ -367,29 +367,7 @@ static int osc_io_write_iter_init(const struct lu_env *env,
 
 	if (io->u.ci_rw.crw_pos & ~PAGE_MASK)
 		++npages;
 
-	max_pages = cli->cl_max_pages_per_rpc * cli->cl_max_rpcs_in_flight;
-	if (npages > max_pages)
-		npages = max_pages;
-
-	c = atomic_long_read(cli->cl_lru_left);
-	if (c < npages && osc_lru_reclaim(cli, npages) > 0)
-		c = atomic_long_read(cli->cl_lru_left);
-	while (c >= npages) {
-		if (c == atomic_long_cmpxchg(cli->cl_lru_left, c, c - npages)) {
-			oio->oi_lru_reserved = npages;
-			break;
-		}
-		c = atomic_long_read(cli->cl_lru_left);
-	}
-	if (atomic_long_read(cli->cl_lru_left) < max_pages) {
-		/* If there aren't enough pages in the per-OSC LRU then
-		 * wake up the LRU thread to try and clear out space, so
-		 * we don't block if pages are being dirtied quickly. */
-		CDEBUG(D_CACHE, "%s: queue LRU, left: %lu/%ld.\n",
-		       cli_name(cli), atomic_long_read(cli->cl_lru_left),
-		       max_pages);
-		(void)ptlrpcd_queue_work(cli->cl_lru_work);
-	}
+	oio->oi_lru_reserved = osc_lru_reserve(osc_cli(osc), npages);
 
 	RETURN(osc_io_iter_init(env, ios));
 }
@@ -397,11 +375,16 @@ static int osc_io_write_iter_init(const struct lu_env *env,
 static void osc_io_iter_fini(const struct lu_env *env,
 			     const struct cl_io_slice *ios)
 {
-	struct osc_object *osc = cl2osc(ios->cis_obj);
+	struct osc_io *oio = osc_env_io(env);
 
-	LASSERT(atomic_read(&osc->oo_nr_ios) > 0);
-	if (atomic_dec_and_test(&osc->oo_nr_ios))
-		wake_up_all(&osc->oo_io_waitq);
+	if (oio->oi_is_active) {
+		struct osc_object *osc = cl2osc(ios->cis_obj);
+
+		oio->oi_is_active = 0;
+		LASSERT(atomic_read(&osc->oo_nr_ios) > 0);
+		if (atomic_dec_and_test(&osc->oo_nr_ios))
+			wake_up_all(&osc->oo_io_waitq);
+	}
 }
 
 static void osc_io_write_iter_fini(const struct lu_env *env,
@@ -409,10 +392,9 @@ static void osc_io_write_iter_fini(const struct lu_env *env,
 {
 	struct osc_io *oio = osc_env_io(env);
 	struct osc_object *osc = cl2osc(ios->cis_obj);
-	struct client_obd *cli = osc_cli(osc);
 
 	if (oio->oi_lru_reserved > 0) {
-		atomic_long_add(oio->oi_lru_reserved, cli->cl_lru_left);
+		osc_lru_unreserve(osc_cli(osc), oio->oi_lru_reserved);
 		oio->oi_lru_reserved = 0;
 	}
 	oio->oi_write_osclock = NULL;
diff --git a/lustre/osc/osc_lock.c b/lustre/osc/osc_lock.c
index b645379..eebc0d6 100644
--- a/lustre/osc/osc_lock.c
+++ b/lustre/osc/osc_lock.c
@@ -173,6 +173,8 @@ static __u64 osc_enq2ldlm_flags(__u32 enqflags)
 		result |= LDLM_FL_AST_DISCARD_DATA;
 	if (enqflags & CEF_PEEK)
 		result |= LDLM_FL_TEST_LOCK;
+	if (enqflags & CEF_LOCK_MATCH)
+		result |= LDLM_FL_MATCH_LOCK;
 
 	return result;
 }
@@ -848,13 +850,14 @@ static void osc_lock_wake_waiters(const struct lu_env *env,
 	spin_unlock(&oscl->ols_lock);
 }
 
-static void osc_lock_enqueue_wait(const struct lu_env *env,
-				  struct osc_object *obj,
-				  struct osc_lock *oscl)
+static int osc_lock_enqueue_wait(const struct lu_env *env,
+				 struct osc_object *obj, struct osc_lock *oscl)
 {
 	struct osc_lock *tmp_oscl;
 	struct cl_lock_descr *need = &oscl->ols_cl.cls_lock->cll_descr;
 	struct cl_sync_io *waiter = &osc_env_info(env)->oti_anchor;
+	int rc = 0;
+	ENTRY;
 
 	spin_lock(&obj->oo_ol_spin);
 	list_add_tail(&oscl->ols_nextlock_oscobj, &obj->oo_ol_list);
@@ -891,13 +894,18 @@ restart:
 		spin_unlock(&tmp_oscl->ols_lock);
 		spin_unlock(&obj->oo_ol_spin);
 
-		(void)cl_sync_io_wait(env, waiter, 0);
-
+		rc = cl_sync_io_wait(env, waiter, 0);
 		spin_lock(&obj->oo_ol_spin);
+
+		if (rc < 0)
+			break;
+
 		oscl->ols_owner = NULL;
 		goto restart;
 	}
 	spin_unlock(&obj->oo_ol_spin);
+
+	RETURN(rc);
 }
 
 /**
@@ -947,7 +955,9 @@ static int osc_lock_enqueue(const struct lu_env *env,
 		GOTO(enqueue_base, 0);
 	}
 
-	osc_lock_enqueue_wait(env, osc, oscl);
+	result = osc_lock_enqueue_wait(env, osc, oscl);
+	if (result < 0)
+		GOTO(out, result);
 
 	/* we can grant lockless lock right after all conflicting locks
 	 * are canceled. */
@@ -971,7 +981,6 @@ enqueue_base:
 	 * osc_lock.
 	 */
 	ostid_build_res_name(&osc->oo_oinfo->loi_oi, resname);
-	osc_lock_build_einfo(env, lock, osc, &oscl->ols_einfo);
 	osc_lock_build_policy(env, lock, policy);
 	if (oscl->ols_agl) {
 		oscl->ols_einfo.ei_cbdata = NULL;
@@ -986,19 +995,7 @@ enqueue_base:
 				  upcall, cookie,
 				  &oscl->ols_einfo, PTLRPCD_SET, async,
 				  oscl->ols_agl);
-	if (result != 0) {
-		oscl->ols_state = OLS_CANCELLED;
-		osc_lock_wake_waiters(env, osc, oscl);
-
-		/* hide error for AGL lock. */
-		if (oscl->ols_agl) {
-			cl_object_put(env, osc2cl(osc));
-			result = 0;
-		}
-
-		if (anchor != NULL)
-			cl_sync_io_note(env, anchor, result);
-	} else {
+	if (result == 0) {
 		if (osc_lock_is_lockless(oscl)) {
 			oio->oi_lockless = 1;
 		} else if (!async) {
@@ -1006,6 +1003,18 @@ enqueue_base:
 			LASSERT(oscl->ols_hold);
 			LASSERT(oscl->ols_dlmlock != NULL);
 		}
+	} else if (oscl->ols_agl) {
+		cl_object_put(env, osc2cl(osc));
+		result = 0;
+	}
+
+out:
+	if (result < 0) {
+		oscl->ols_state = OLS_CANCELLED;
+		osc_lock_wake_waiters(env, osc, oscl);
+
+		if (anchor != NULL)
+			cl_sync_io_note(env, anchor, result);
 	}
 	RETURN(result);
 }
@@ -1175,6 +1184,7 @@ int osc_lock_init(const struct lu_env *env,
 		oscl->ols_flags |= LDLM_FL_BLOCK_GRANTED;
 		oscl->ols_glimpse = 1;
 	}
+	osc_lock_build_einfo(env, lock, cl2osc(obj), &oscl->ols_einfo);
 
 	cl_lock_slice_add(lock, &oscl->ols_cl, obj, &osc_lock_ops);
 
diff --git a/lustre/osc/osc_object.c b/lustre/osc/osc_object.c
index 7337489..94b3cf5 100644
--- a/lustre/osc/osc_object.c
+++ b/lustre/osc/osc_object.c
@@ -494,9 +494,15 @@ int osc_object_invalidate(const struct lu_env *env, struct osc_object *osc)
 
 	l_wait_event(osc->oo_io_waitq, atomic_read(&osc->oo_nr_ios) == 0, &lwi);
 
-	/* Discard all pages of this object. */
+	/* Discard all dirty pages of this object. */
 	osc_cache_truncate_start(env, osc, 0, NULL);
 
+	/* Discard all caching pages */
+	osc_lock_discard_pages(env, osc, 0, CL_PAGE_EOF, CLM_WRITE);
+
+	/* Clear ast data of dlm lock. Do this after discarding all pages */
+	osc_object_prune(env, osc2cl(osc));
+
 	RETURN(0);
 }
 
diff --git a/lustre/osc/osc_page.c b/lustre/osc/osc_page.c
index b861a9d..12380c6 100644
--- a/lustre/osc/osc_page.c
+++ b/lustre/osc/osc_page.c
@@ -45,8 +45,8 @@
 
 static void osc_lru_del(struct client_obd *cli, struct osc_page *opg);
 static void osc_lru_use(struct client_obd *cli, struct osc_page *opg);
-static int osc_lru_reserve(const struct lu_env *env, struct osc_object *obj,
-			   struct osc_page *opg);
+static int osc_lru_alloc(const struct lu_env *env, struct client_obd *cli,
+			 struct osc_page *opg);
 
 /** \addtogroup osc
  *  @{
@@ -406,7 +406,7 @@ int osc_page_init(const struct lu_env *env, struct cl_object *obj,
 
 	/* reserve an LRU space for this page */
 	if (page->cp_type == CPT_CACHEABLE && result == 0) {
-		result = osc_lru_reserve(env, osc, opg);
+		result = osc_lru_alloc(env, osc_cli(osc), opg);
 		if (result == 0) {
 			spin_lock(&osc->oo_tree_lock);
 			result = radix_tree_insert(&osc->oo_tree, index, opg);
@@ -805,7 +805,7 @@ long osc_lru_shrink(const struct lu_env *env, struct client_obd *cli,
  * LRU pages in batch. Therefore, the actual number is adjusted at least
  * max_pages_per_rpc.
  */
-long osc_lru_reclaim(struct client_obd *cli, unsigned long npages)
+static long osc_lru_reclaim(struct client_obd *cli, unsigned long npages)
 {
 	struct cl_env_nest nest;
 	struct lu_env *env;
@@ -878,18 +878,17 @@ out:
 }
 
 /**
- * osc_lru_reserve() is called to reserve an LRU slot for a cl_page.
+ * osc_lru_alloc() is called to allocate an LRU slot for a cl_page.
  *
  * Usually the LRU slots are reserved in osc_io_iter_rw_init().
  * Only in the case that the LRU slots are in extreme shortage, it should
  * have reserved enough slots for an IO.
  */
-static int osc_lru_reserve(const struct lu_env *env, struct osc_object *obj,
-			   struct osc_page *opg)
+static int osc_lru_alloc(const struct lu_env *env, struct client_obd *cli,
+			 struct osc_page *opg)
 {
 	struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
 	struct osc_io *oio = osc_env_io(env);
-	struct client_obd *cli = osc_cli(obj);
 	int rc = 0;
 	ENTRY;
 
@@ -929,6 +928,60 @@ out:
 }
 
 /**
+ * osc_lru_reserve() is called to reserve enough LRU slots for I/O.
+ *
+ * The benefit of doing this is to reduce contention against atomic counter
+ * cl_lru_left by changing it from per-page access to per-IO access.
+ */
+unsigned long osc_lru_reserve(struct client_obd *cli, unsigned long npages)
+{
+	unsigned long reserved = 0;
+	unsigned long max_pages;
+	unsigned long c;
+
+	/* reserve a full RPC window at most to avoid that a thread accidentally
+	 * consumes too many LRU slots */
+	max_pages = cli->cl_max_pages_per_rpc * cli->cl_max_rpcs_in_flight;
+	if (npages > max_pages)
+		npages = max_pages;
+
+	c = atomic_long_read(cli->cl_lru_left);
+	if (c < npages && osc_lru_reclaim(cli, npages) > 0)
+		c = atomic_long_read(cli->cl_lru_left);
+	while (c >= npages) {
+		if (c == atomic_long_cmpxchg(cli->cl_lru_left, c, c - npages)) {
+			reserved = npages;
+			break;
+		}
+		c = atomic_long_read(cli->cl_lru_left);
+	}
+	if (atomic_long_read(cli->cl_lru_left) < max_pages) {
+		/* If there aren't enough pages in the per-OSC LRU then
+		 * wake up the LRU thread to try and clear out space, so
+		 * we don't block if pages are being dirtied quickly. */
+		CDEBUG(D_CACHE, "%s: queue LRU, left: %lu/%ld.\n",
+		       cli_name(cli), atomic_long_read(cli->cl_lru_left),
+		       max_pages);
+		(void)ptlrpcd_queue_work(cli->cl_lru_work);
+	}
+
+	return reserved;
+}
+
+/**
+ * osc_lru_unreserve() is called to unreserve LRU slots.
+ *
+ * LRU slots reserved by osc_lru_reserve() may have entries left due to several
+ * reasons such as page already existing or I/O error. Those reserved slots
+ * should be freed by calling this function.
+ */
+void osc_lru_unreserve(struct client_obd *cli, unsigned long npages)
+{
+	atomic_long_add(npages, cli->cl_lru_left);
+	wake_up_all(&osc_lru_waitq);
+}
+
+/**
  * Atomic operations are expensive. We accumulate the accounting for the
 * same page zone to get better performance.
 * In practice this can work pretty good because the pages in the same RPC
diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c
index 28b23da..060ef0d 100644
--- a/lustre/osc/osc_request.c
+++ b/lustre/osc/osc_request.c
@@ -2011,7 +2011,7 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
 	}
 
 no_match:
-	if (*flags & LDLM_FL_TEST_LOCK)
+	if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
 		RETURN(-ENOLCK);
 
 	if (intent) {
@@ -2491,7 +2491,11 @@ static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
 			osc = lock->l_ast_data;
 			cl_object_get(osc2cl(osc));
 		}
-		lock->l_ast_data = NULL;
+
+		/* clear LDLM_FL_CLEANED flag to make sure it will be canceled
+		 * by the 2nd round of ldlm_namespace_clean() call in
+		 * osc_import_event(). */
+		ldlm_clear_cleaned(lock);
 	}
 	unlock_res(res);
 
-- 
1.8.3.1
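
Illustration (not part of the applied patch): fix 1 above relies on a "single canceller" handshake. The first caller to mark the lock as canceling performs the blocking AST work and then sets the bl_done bit and wakes l_waitq; any concurrent synchronous canceller that loses the race waits for that bit instead of racing on the same lock. The sketch below shows the same pattern in plain C with pthreads; struct fake_lock and cancel_lock() are hypothetical stand-ins, not Lustre primitives, and the real code uses lock_res_and_lock()/l_wait_event() rather than a mutex and condition variable.

/* Minimal sketch of "first canceller does the work, later callers wait";
 * hypothetical names, not Lustre code. Build with: cc -pthread sketch.c */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

struct fake_lock {
	pthread_mutex_t mtx;	/* stands in for lock_res_and_lock() */
	pthread_cond_t  waitq;	/* stands in for lock->l_waitq */
	bool canceling;		/* LDLM_FL_CANCELING analogue */
	bool bl_done;		/* LDLM_FL_BL_DONE analogue */
};

static struct fake_lock lk = {
	.mtx   = PTHREAD_MUTEX_INITIALIZER,
	.waitq = PTHREAD_COND_INITIALIZER,
};

static void cancel_lock(struct fake_lock *l, bool async)
{
	pthread_mutex_lock(&l->mtx);
	if (l->canceling) {
		/* Lost the race: async callers just return, synchronous
		 * callers wait until the winner marks the work done. */
		if (!async)
			while (!l->bl_done)
				pthread_cond_wait(&l->waitq, &l->mtx);
		pthread_mutex_unlock(&l->mtx);
		return;
	}
	l->canceling = true;
	pthread_mutex_unlock(&l->mtx);

	/* ... the blocking AST / actual cancel work would run here ... */

	pthread_mutex_lock(&l->mtx);
	l->bl_done = true;		/* only the canceller sets this */
	pthread_cond_broadcast(&l->waitq);
	pthread_mutex_unlock(&l->mtx);
}

int main(void)
{
	cancel_lock(&lk, false);	/* winner: performs the cancel */
	cancel_lock(&lk, true);		/* later "async" caller returns at once */
	printf("canceling=%d bl_done=%d\n", lk.canceling, lk.bl_done);
	return 0;
}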
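
Similarly, the new osc_lru_reserve()/osc_lru_unreserve() pair (fix 3) claims a batch of LRU slots from a shared counter with a compare-and-swap loop once per IO, instead of touching the counter once per page, and returns whatever the IO did not use. A rough standalone sketch with C11 atomics follows; lru_left, lru_reserve() and lru_unreserve() are hypothetical names, and the real code additionally caps the reservation at one RPC window, tries osc_lru_reclaim() when short, and may kick the LRU worker thread.

/* Hedged sketch of batched slot reservation against a shared counter.
 * "lru_left" stands in for cli->cl_lru_left; this is not Lustre code. */
#include <stdatomic.h>
#include <stdio.h>

static atomic_long lru_left = 256;

/* Try to grab npages slots in one shot; return how many were reserved. */
static long lru_reserve(long npages)
{
	long c = atomic_load(&lru_left);

	while (c >= npages) {
		/* CAS succeeds only if nobody raced with us; on failure
		 * c is reloaded with the current value and we retry. */
		if (atomic_compare_exchange_weak(&lru_left, &c, c - npages))
			return npages;
	}
	return 0;	/* not enough slots, caller falls back to per-page allocation */
}

/* Give back slots the IO did not use (error, page already cached, etc.). */
static void lru_unreserve(long npages)
{
	atomic_fetch_add(&lru_left, npages);
}

int main(void)
{
	long got = lru_reserve(64);	/* reserve once per IO, not per page */

	printf("reserved %ld, left %ld\n", got, atomic_load(&lru_left));
	lru_unreserve(got - 10);	/* pretend 10 pages were actually used */
	printf("after unreserve: %ld\n", atomic_load(&lru_left));
	return 0;
}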