*/
CEF_PEEK = 0x00000040,
/**
+ * Lock match only: match an existing lock, do not create a new one.
+ * Used by group lock in I/O, since the group lock is known to exist.
+ */
+ CEF_LOCK_MATCH = 0x00000080,
+ /**
* mask of enq_flags.
*/
- CEF_MASK = 0x0000007f,
+ CEF_MASK = 0x000000ff,
};
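+
+/*
+ * Illustrative sketch, not part of this change: with CEF_MASK widened to
+ * 0xff, a hypothetical compile-time check (placed inside any function that
+ * uses these flags) could verify every CEF_* bit stays within the mask:
+ *
+ *	BUILD_BUG_ON((CEF_PEEK | CEF_LOCK_MATCH) & ~CEF_MASK);
+ */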
/**
#define ldlm_set_test_lock(_l) LDLM_SET_FLAG(( _l), 1ULL << 19)
#define ldlm_clear_test_lock(_l) LDLM_CLEAR_FLAG((_l), 1ULL << 19)
+/** match lock only */
+#define LDLM_FL_MATCH_LOCK 0x0000000000100000ULL // bit 20
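+
+/* Sketch, not in the original change: companion accessors for bit 20,
+ * following the bit-19 pattern above; assumes LDLM_TEST_FLAG() is defined
+ * in this header alongside LDLM_SET_FLAG()/LDLM_CLEAR_FLAG(). */
+#define ldlm_is_match_lock(_l)    LDLM_TEST_FLAG(( _l), 1ULL << 20)
+#define ldlm_set_match_lock(_l)   LDLM_SET_FLAG(( _l), 1ULL << 20)
+#define ldlm_clear_match_lock(_l) LDLM_CLEAR_FLAG((_l), 1ULL << 20)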
+
/**
 * Immediately cancel such locks when they block some other locks. Send
* cancel notification to original lock holder, but expect no reply. This
ldlm_lock_decref_internal_nolock(lock, mode);
- if (ldlm_is_local(lock) &&
- !lock->l_readers && !lock->l_writers) {
- /* If this is a local lock on a server namespace and this was
- * the last reference, cancel the lock. */
- CDEBUG(D_INFO, "forcing cancel of local lock\n");
- ldlm_set_cbpending(lock);
- }
-
- if (!lock->l_readers && !lock->l_writers &&
- (ldlm_is_cbpending(lock) || lock->l_req_mode == LCK_GROUP)) {
- /* If we received a blocked AST and this was the last reference,
- * run the callback.
+ if ((ldlm_is_local(lock) || lock->l_req_mode == LCK_GROUP) &&
+ !lock->l_readers && !lock->l_writers) {
+		/* If this is a local lock on a server namespace, or a group
+		 * lock, and this was the last reference, cancel the lock.
+ *
* Group locks are special:
* They must not go in LRU, but they are not called back
* like non-group locks, instead they are manually released.
* They have an l_writers reference which they keep until
* they are manually released, so we remove them when they have
* no more reader or writer references. - LU-6368 */
+ ldlm_set_cbpending(lock);
+ }
+
+ if (!lock->l_readers && !lock->l_writers && ldlm_is_cbpending(lock)) {
+ /* If we received a blocked AST and this was the last reference,
+ * run the callback. */
if (ldlm_is_ns_srv(lock) && lock->l_export)
CERROR("FL_CBPENDING set on non-local lock--just a "
"warning\n");
}
EXPORT_SYMBOL(ldlm_reprocess_all);
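+/**
+ * Check whether the blocking AST has completed for \a lock.
+ *
+ * The flag is tested locklessly first; only on a miss is it re-checked
+ * under the resource lock, so the common "already done" case avoids the
+ * lock/unlock round trip.
+ */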
+static bool is_bl_done(struct ldlm_lock *lock)
+{
+ bool bl_done = true;
+
+ if (!ldlm_is_bl_done(lock)) {
+ lock_res_and_lock(lock);
+ bl_done = ldlm_is_bl_done(lock);
+ unlock_res_and_lock(lock);
+ }
+
+ return bl_done;
+}
+
/**
* Helper function to call blocking AST for LDLM lock \a lock in a
* "cancelling" mode.
} else {
LDLM_DEBUG(lock, "no blocking ast");
}
- }
- ldlm_set_bl_done(lock);
+
+		/* only the canceller can set the bl_done bit */
+ ldlm_set_bl_done(lock);
+ wake_up_all(&lock->l_waitq);
+ } else if (!ldlm_is_bl_done(lock)) {
+ struct l_wait_info lwi = { 0 };
+
+		/* The lock is guaranteed to have been canceled by the time
+		 * this function returns. */
+ unlock_res_and_lock(lock);
+ l_wait_event(lock->l_waitq, is_bl_done(lock), &lwi);
+ lock_res_and_lock(lock);
+ }
}
/**
struct list_head cancels = LIST_HEAD_INIT(cancels);
ENTRY;
- /* concurrent cancels on the same handle can happen */
- lock = ldlm_handle2lock_long(lockh, LDLM_FL_CANCELING);
+ lock = ldlm_handle2lock_long(lockh, 0);
if (lock == NULL) {
LDLM_DEBUG_NOLOCK("lock is already being destroyed");
RETURN(0);
}
+ lock_res_and_lock(lock);
+ /* Lock is being canceled and the caller doesn't want to wait */
+ if (ldlm_is_canceling(lock) && (cancel_flags & LCF_ASYNC)) {
+ unlock_res_and_lock(lock);
+ LDLM_LOCK_RELEASE(lock);
+ RETURN(0);
+ }
+
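+	/* Mark this thread as the canceller; a concurrent cancel will either
+	 * bail out above (LCF_ASYNC) or wait for the canceller to set
+	 * bl_done in the cancel callback. */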
+ ldlm_set_canceling(lock);
+ unlock_res_and_lock(lock);
+
rc = ldlm_cli_cancel_local(lock);
if (rc == LDLM_FL_LOCAL_ONLY || cancel_flags & LCF_LOCAL) {
LDLM_LOCK_RELEASE(lock);
unlock_res(res);
ldlm_lock2handle(lock, &lockh);
- rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
+ rc = ldlm_cli_cancel(&lockh, LCF_LOCAL);
if (rc)
CERROR("ldlm_cli_cancel: %d\n", rc);
} else {
if (vio->vui_fd && (vio->vui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
descr->cld_mode = CLM_GROUP;
descr->cld_gid = vio->vui_fd->fd_grouplock.lg_gid;
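+		/* the group lock is known to exist, so only match it */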
+ enqflags |= CEF_LOCK_MATCH;
} else {
descr->cld_mode = mode;
}
continue;
}
- end = lov_offset_mod(end, +1);
- sub = lov_sub_get(env, lio, stripe);
- if (!IS_ERR(sub)) {
- lov_io_sub_inherit(sub->sub_io, lio, stripe,
- start, end);
- rc = cl_io_iter_init(sub->sub_env, sub->sub_io);
- lov_sub_put(sub);
- CDEBUG(D_VFSTRACE, "shrink: %d ["LPU64", "LPU64")\n",
- stripe, start, end);
- } else
- rc = PTR_ERR(sub);
-
- if (!rc)
- list_add_tail(&sub->sub_linkage, &lio->lis_active);
- else
- break;
- }
- RETURN(rc);
+ end = lov_offset_mod(end, +1);
+ sub = lov_sub_get(env, lio, stripe);
+ if (IS_ERR(sub)) {
+ rc = PTR_ERR(sub);
+ break;
+ }
+
+ lov_io_sub_inherit(sub->sub_io, lio, stripe, start, end);
+ rc = cl_io_iter_init(sub->sub_env, sub->sub_io);
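+		/* undo the partially initialized iterator on failure */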
+ if (rc != 0)
+ cl_io_iter_fini(sub->sub_env, sub->sub_io);
+ lov_sub_put(sub);
+ if (rc != 0)
+ break;
+
+ CDEBUG(D_VFSTRACE, "shrink: %d ["LPU64", "LPU64")\n",
+ stripe, start, end);
+
+ list_add_tail(&sub->sub_linkage, &lio->lis_active);
+ }
+ RETURN(rc);
}
static int lov_io_rw_iter_init(const struct lu_env *env,
* a page already having been flushed by write_page().
* We have to wait for this extent because we can't
* truncate that page. */
- LASSERT(!ext->oe_hp);
OSC_EXTENT_DUMP(D_CACHE, ext,
"waiting for busy extent\n");
waiting = osc_extent_get(ext);
* State maintained by osc layer for each IO context.
*/
struct osc_io {
- /** super class */
- struct cl_io_slice oi_cl;
- /** true if this io is lockless. */
- int oi_lockless;
+ /** super class */
+ struct cl_io_slice oi_cl;
+ /** true if this io is lockless. */
+	/* use an unsigned bitfield: a signed 1-bit field holds only 0/-1 */
+	unsigned int oi_lockless:1,
+	/** true if this io is counted as active IO */
+		     oi_is_active:1;
/** how many LRU pages are reserved for this IO */
unsigned long oi_lru_reserved;
struct list_head *ext_list, int cmd);
long osc_lru_shrink(const struct lu_env *env, struct client_obd *cli,
long target, bool force);
-long osc_lru_reclaim(struct client_obd *cli, unsigned long npages);
+unsigned long osc_lru_reserve(struct client_obd *cli, unsigned long npages);
+void osc_lru_unreserve(struct client_obd *cli, unsigned long npages);
extern spinlock_t osc_ast_guard;
extern struct lu_kmem_descr osc_caches[];
spin_lock(&imp->imp_lock);
if (likely(!imp->imp_invalid)) {
+ struct osc_io *oio = osc_env_io(env);
+
atomic_inc(&osc->oo_nr_ios);
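+		/* pairs with osc_io_iter_fini(): record that this IO holds
+		 * an oo_nr_ios reference */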
+ oio->oi_is_active = 1;
rc = 0;
}
spin_unlock(&imp->imp_lock);
struct cl_io *io = ios->cis_io;
struct osc_io *oio = osc_env_io(env);
struct osc_object *osc = cl2osc(ios->cis_obj);
- struct client_obd *cli = osc_cli(osc);
- unsigned long c;
- unsigned long max_pages;
unsigned long npages;
ENTRY;
if (io->u.ci_rw.crw_pos & ~PAGE_MASK)
++npages;
- max_pages = cli->cl_max_pages_per_rpc * cli->cl_max_rpcs_in_flight;
- if (npages > max_pages)
- npages = max_pages;
-
- c = atomic_long_read(cli->cl_lru_left);
- if (c < npages && osc_lru_reclaim(cli, npages) > 0)
- c = atomic_long_read(cli->cl_lru_left);
- while (c >= npages) {
- if (c == atomic_long_cmpxchg(cli->cl_lru_left, c, c - npages)) {
- oio->oi_lru_reserved = npages;
- break;
- }
- c = atomic_long_read(cli->cl_lru_left);
- }
- if (atomic_long_read(cli->cl_lru_left) < max_pages) {
- /* If there aren't enough pages in the per-OSC LRU then
- * wake up the LRU thread to try and clear out space, so
- * we don't block if pages are being dirtied quickly. */
- CDEBUG(D_CACHE, "%s: queue LRU, left: %lu/%ld.\n",
- cli_name(cli), atomic_long_read(cli->cl_lru_left),
- max_pages);
- (void)ptlrpcd_queue_work(cli->cl_lru_work);
- }
+ oio->oi_lru_reserved = osc_lru_reserve(osc_cli(osc), npages);
RETURN(osc_io_iter_init(env, ios));
}
static void osc_io_iter_fini(const struct lu_env *env,
const struct cl_io_slice *ios)
{
- struct osc_object *osc = cl2osc(ios->cis_obj);
+ struct osc_io *oio = osc_env_io(env);
- LASSERT(atomic_read(&osc->oo_nr_ios) > 0);
- if (atomic_dec_and_test(&osc->oo_nr_ios))
- wake_up_all(&osc->oo_io_waitq);
+ if (oio->oi_is_active) {
+ struct osc_object *osc = cl2osc(ios->cis_obj);
+
+ oio->oi_is_active = 0;
+ LASSERT(atomic_read(&osc->oo_nr_ios) > 0);
+ if (atomic_dec_and_test(&osc->oo_nr_ios))
+ wake_up_all(&osc->oo_io_waitq);
+ }
}
static void osc_io_write_iter_fini(const struct lu_env *env,
{
struct osc_io *oio = osc_env_io(env);
struct osc_object *osc = cl2osc(ios->cis_obj);
- struct client_obd *cli = osc_cli(osc);
if (oio->oi_lru_reserved > 0) {
- atomic_long_add(oio->oi_lru_reserved, cli->cl_lru_left);
+ osc_lru_unreserve(osc_cli(osc), oio->oi_lru_reserved);
oio->oi_lru_reserved = 0;
}
oio->oi_write_osclock = NULL;
result |= LDLM_FL_AST_DISCARD_DATA;
if (enqflags & CEF_PEEK)
result |= LDLM_FL_TEST_LOCK;
+ if (enqflags & CEF_LOCK_MATCH)
+ result |= LDLM_FL_MATCH_LOCK;
return result;
}
spin_unlock(&oscl->ols_lock);
}
-static void osc_lock_enqueue_wait(const struct lu_env *env,
- struct osc_object *obj,
- struct osc_lock *oscl)
+static int osc_lock_enqueue_wait(const struct lu_env *env,
+ struct osc_object *obj, struct osc_lock *oscl)
{
struct osc_lock *tmp_oscl;
struct cl_lock_descr *need = &oscl->ols_cl.cls_lock->cll_descr;
struct cl_sync_io *waiter = &osc_env_info(env)->oti_anchor;
+ int rc = 0;
+ ENTRY;
spin_lock(&obj->oo_ol_spin);
list_add_tail(&oscl->ols_nextlock_oscobj, &obj->oo_ol_list);
spin_unlock(&tmp_oscl->ols_lock);
spin_unlock(&obj->oo_ol_spin);
- (void)cl_sync_io_wait(env, waiter, 0);
-
+ rc = cl_sync_io_wait(env, waiter, 0);
spin_lock(&obj->oo_ol_spin);
+
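+		/* the wait failed, e.g. it was interrupted; stop waiting and
+		 * return the error so the caller can cancel this lock */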
+ if (rc < 0)
+ break;
+
oscl->ols_owner = NULL;
goto restart;
}
spin_unlock(&obj->oo_ol_spin);
+
+ RETURN(rc);
}
/**
GOTO(enqueue_base, 0);
}
- osc_lock_enqueue_wait(env, osc, oscl);
+ result = osc_lock_enqueue_wait(env, osc, oscl);
+ if (result < 0)
+ GOTO(out, result);
	/* we can grant a lockless lock right after all conflicting locks
	 * are canceled. */
* osc_lock.
*/
ostid_build_res_name(&osc->oo_oinfo->loi_oi, resname);
- osc_lock_build_einfo(env, lock, osc, &oscl->ols_einfo);
osc_lock_build_policy(env, lock, policy);
if (oscl->ols_agl) {
oscl->ols_einfo.ei_cbdata = NULL;
upcall, cookie,
&oscl->ols_einfo, PTLRPCD_SET, async,
oscl->ols_agl);
- if (result != 0) {
- oscl->ols_state = OLS_CANCELLED;
- osc_lock_wake_waiters(env, osc, oscl);
-
- /* hide error for AGL lock. */
- if (oscl->ols_agl) {
- cl_object_put(env, osc2cl(osc));
- result = 0;
- }
-
- if (anchor != NULL)
- cl_sync_io_note(env, anchor, result);
- } else {
+ if (result == 0) {
if (osc_lock_is_lockless(oscl)) {
oio->oi_lockless = 1;
} else if (!async) {
LASSERT(oscl->ols_hold);
LASSERT(oscl->ols_dlmlock != NULL);
}
+ } else if (oscl->ols_agl) {
+ cl_object_put(env, osc2cl(osc));
+ result = 0;
+ }
+
+out:
+ if (result < 0) {
+ oscl->ols_state = OLS_CANCELLED;
+ osc_lock_wake_waiters(env, osc, oscl);
+
+ if (anchor != NULL)
+ cl_sync_io_note(env, anchor, result);
}
RETURN(result);
}
oscl->ols_flags |= LDLM_FL_BLOCK_GRANTED;
oscl->ols_glimpse = 1;
}
+ osc_lock_build_einfo(env, lock, cl2osc(obj), &oscl->ols_einfo);
cl_lock_slice_add(lock, &oscl->ols_cl, obj, &osc_lock_ops);
l_wait_event(osc->oo_io_waitq, atomic_read(&osc->oo_nr_ios) == 0, &lwi);
- /* Discard all pages of this object. */
+ /* Discard all dirty pages of this object. */
osc_cache_truncate_start(env, osc, 0, NULL);
+	/* Discard all cached pages */
+ osc_lock_discard_pages(env, osc, 0, CL_PAGE_EOF, CLM_WRITE);
+
+	/* Clear the ast data of dlm locks; do this after discarding all pages */
+ osc_object_prune(env, osc2cl(osc));
+
RETURN(0);
}
static void osc_lru_del(struct client_obd *cli, struct osc_page *opg);
static void osc_lru_use(struct client_obd *cli, struct osc_page *opg);
-static int osc_lru_reserve(const struct lu_env *env, struct osc_object *obj,
- struct osc_page *opg);
+static int osc_lru_alloc(const struct lu_env *env, struct client_obd *cli,
+ struct osc_page *opg);
/** \addtogroup osc
* @{
/* reserve an LRU space for this page */
if (page->cp_type == CPT_CACHEABLE && result == 0) {
- result = osc_lru_reserve(env, osc, opg);
+ result = osc_lru_alloc(env, osc_cli(osc), opg);
if (result == 0) {
spin_lock(&osc->oo_tree_lock);
result = radix_tree_insert(&osc->oo_tree, index, opg);
 * LRU pages in batch. Therefore, the actual number is adjusted to at least
 * max_pages_per_rpc.
*/
-long osc_lru_reclaim(struct client_obd *cli, unsigned long npages)
+static long osc_lru_reclaim(struct client_obd *cli, unsigned long npages)
{
struct cl_env_nest nest;
struct lu_env *env;
}
/**
- * osc_lru_reserve() is called to reserve an LRU slot for a cl_page.
+ * osc_lru_alloc() is called to allocate an LRU slot for a cl_page.
*
 * Usually the LRU slots are reserved in osc_io_iter_rw_init(); this function
 * only needs to allocate a slot itself when that reservation has run out,
 * which happens only when LRU slots are in extreme shortage.
*/
-static int osc_lru_reserve(const struct lu_env *env, struct osc_object *obj,
- struct osc_page *opg)
+static int osc_lru_alloc(const struct lu_env *env, struct client_obd *cli,
+ struct osc_page *opg)
{
struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
struct osc_io *oio = osc_env_io(env);
- struct client_obd *cli = osc_cli(obj);
int rc = 0;
ENTRY;
}
/**
+ * osc_lru_reserve() is called to reserve enough LRU slots for I/O.
+ *
+ * The benefit of doing this is to reduce contention on the atomic counter
+ * cl_lru_left, by replacing per-page accesses with a single per-IO access.
+ */
+unsigned long osc_lru_reserve(struct client_obd *cli, unsigned long npages)
+{
+ unsigned long reserved = 0;
+ unsigned long max_pages;
+ unsigned long c;
+
+	/* reserve at most a full RPC window so that a single thread cannot
+	 * accidentally consume too many LRU slots */
+ max_pages = cli->cl_max_pages_per_rpc * cli->cl_max_rpcs_in_flight;
+ if (npages > max_pages)
+ npages = max_pages;
+
+ c = atomic_long_read(cli->cl_lru_left);
+ if (c < npages && osc_lru_reclaim(cli, npages) > 0)
+ c = atomic_long_read(cli->cl_lru_left);
+ while (c >= npages) {
+ if (c == atomic_long_cmpxchg(cli->cl_lru_left, c, c - npages)) {
+ reserved = npages;
+ break;
+ }
+ c = atomic_long_read(cli->cl_lru_left);
+ }
+ if (atomic_long_read(cli->cl_lru_left) < max_pages) {
+ /* If there aren't enough pages in the per-OSC LRU then
+ * wake up the LRU thread to try and clear out space, so
+ * we don't block if pages are being dirtied quickly. */
+		CDEBUG(D_CACHE, "%s: queue LRU, left: %ld/%lu.\n",
+ cli_name(cli), atomic_long_read(cli->cl_lru_left),
+ max_pages);
+ (void)ptlrpcd_queue_work(cli->cl_lru_work);
+ }
+
+ return reserved;
+}
+
+/**
+ * osc_lru_unreserve() is called to unreserve LRU slots.
+ *
+ * LRU slots reserved by osc_lru_reserve() may be left unused for several
+ * reasons, such as the page already being cached or an I/O error. Such
+ * leftover slots must be released by calling this function.
+ */
+void osc_lru_unreserve(struct client_obd *cli, unsigned long npages)
+{
+ atomic_long_add(npages, cli->cl_lru_left);
+ wake_up_all(&osc_lru_waitq);
+}
+
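+/*
+ * Usage sketch (illustrative only), mirroring the osc_io_iter_rw_init()
+ * and osc_io_write_iter_fini() hunks in this patch: reserve slots up
+ * front, then release whatever was left unused:
+ *
+ *	oio->oi_lru_reserved = osc_lru_reserve(cli, npages);
+ *	... per-page allocation consumes the reservation ...
+ *	if (oio->oi_lru_reserved > 0)
+ *		osc_lru_unreserve(cli, oio->oi_lru_reserved);
+ */
+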
+/**
* Atomic operations are expensive. We accumulate the accounting for the
* same page zone to get better performance.
* In practice this can work pretty good because the pages in the same RPC
}
no_match:
- if (*flags & LDLM_FL_TEST_LOCK)
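+	/* TEST/MATCH callers only want an existing lock; never enqueue a
+	 * new one */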
+ if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
RETURN(-ENOLCK);
if (intent) {
osc = lock->l_ast_data;
cl_object_get(osc2cl(osc));
}
- lock->l_ast_data = NULL;
+
+	/* Clear the LDLM_FL_CLEANED flag so that the lock will be canceled
+	 * by the second round of ldlm_namespace_clean() calls in
+	 * osc_import_event(). */
+ ldlm_clear_cleaned(lock);
}
unlock_res(res);