X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fobdclass%2Fcl_lock.c;h=2dba495d79de992366990b192eaae17add85af86;hp=21919b37639e3041f1fd46497961221fc3004757;hb=e2af7fb3c91dfb13d34d8e1b2f2df8c09621f768;hpb=cdb698a1a036870b6c9d8e51f69809c558d4823a diff --git a/lustre/obdclass/cl_lock.c b/lustre/obdclass/cl_lock.c index 21919b3..2dba495 100644 --- a/lustre/obdclass/cl_lock.c +++ b/lustre/obdclass/cl_lock.c @@ -1,6 +1,4 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * +/* * GPL HEADER START * * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. @@ -28,6 +26,8 @@ /* * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * + * Copyright (c) 2011, 2012, Whamcloud, Inc. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -39,9 +39,6 @@ */ #define DEBUG_SUBSYSTEM S_CLASS -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif #include #include @@ -295,14 +292,12 @@ static void cl_lock_free(const struct lu_env *env, struct cl_lock *lock) void cl_lock_put(const struct lu_env *env, struct cl_lock *lock) { struct cl_object *obj; - struct cl_object_header *head; struct cl_site *site; LINVRNT(cl_lock_invariant(env, lock)); ENTRY; obj = lock->cll_descr.cld_obj; LINVRNT(obj != NULL); - head = cl_object_header(obj); site = cl_object_site(obj); CDEBUG(D_TRACE, "releasing reference: %d %p %lu\n", @@ -594,7 +589,6 @@ struct cl_lock *cl_lock_peek(const struct lu_env *env, const struct cl_io *io, struct cl_object_header *head; struct cl_object *obj; struct cl_lock *lock; - int ok; obj = need->cld_obj; head = cl_object_header(obj); @@ -615,14 +609,14 @@ struct cl_lock *cl_lock_peek(const struct lu_env *env, const struct cl_io *io, if (result < 0) cl_lock_error(env, lock, result); } - ok = lock->cll_state == CLS_HELD; - if (ok) { + if (lock->cll_state == CLS_HELD) { cl_lock_hold_add(env, lock, scope, source); cl_lock_user_add(env, lock); + cl_lock_mutex_put(env, lock); + cl_lock_lockdep_acquire(env, lock, 0); cl_lock_put(env, lock); - } - cl_lock_mutex_put(env, lock); - if (!ok) { + } else { + cl_lock_mutex_put(env, lock); cl_lock_put(env, lock); lock = NULL; } @@ -924,7 +918,6 @@ static void cl_lock_hold_release(const struct lu_env *env, struct cl_lock *lock, EXIT; } - /** * Waits until lock state is changed. * @@ -1302,7 +1295,8 @@ static int cl_enqueue_locked(const struct lu_env *env, struct cl_lock *lock, cl_lock_user_del(env, lock); cl_lock_error(env, lock, result); } - LASSERT(ergo(result == 0, lock->cll_state == CLS_ENQUEUED || + LASSERT(ergo(result == 0 && !(enqflags & CEF_AGL), + lock->cll_state == CLS_ENQUEUED || lock->cll_state == CLS_HELD)); RETURN(result); } @@ -1886,68 +1880,81 @@ struct cl_lock *cl_lock_at_page(const struct lu_env *env, struct cl_object *obj, EXPORT_SYMBOL(cl_lock_at_page); /** - * Returns a list of pages protected (only) by a given lock. - * - * Scans an extent of page radix tree, corresponding to the \a lock and queues - * all pages that are not protected by locks other than \a lock into \a queue. + * Calculate the page offset at the layer of @lock. + * At the time of this writing, @page is top page and @lock is sub lock. 
*/ -void cl_lock_page_list_fixup(const struct lu_env *env, - struct cl_io *io, struct cl_lock *lock, - struct cl_page_list *queue) +static pgoff_t pgoff_at_lock(struct cl_page *page, struct cl_lock *lock) { - struct cl_page *page; - struct cl_page *temp; - struct cl_page_list *plist = &cl_env_info(env)->clt_list; - - LINVRNT(cl_lock_invariant(env, lock)); - ENTRY; + struct lu_device_type *dtype; + const struct cl_page_slice *slice; - /* Now, we have a list of cl_pages under the \a lock, we need - * to check if some of pages are covered by other ldlm lock. - * If this is the case, they aren't needed to be written out this time. - * - * For example, we have A:[0,200] & B:[100,300] PW locks on client, now - * the latter is to be canceled, this means other client is - * reading/writing [200,300] since A won't canceled. Actually - * we just need to write the pages covered by [200,300]. This is safe, - * since [100,200] is also protected lock A. - */ + dtype = lock->cll_descr.cld_obj->co_lu.lo_dev->ld_type; + slice = cl_page_at(page, dtype); + LASSERT(slice != NULL); + return slice->cpl_page->cp_index; +} - cl_page_list_init(plist); - cl_page_list_for_each_safe(page, temp, queue) { - pgoff_t idx = page->cp_index; - struct cl_lock *found; - struct cl_lock_descr *descr; - - /* The algorithm counts on the index-ascending page index. */ - LASSERT(ergo(&temp->cp_batch != &queue->pl_pages, - page->cp_index < temp->cp_index)); - - found = cl_lock_at_page(env, lock->cll_descr.cld_obj, - page, lock, 1, 0); - if (found == NULL) - continue; - - descr = &found->cll_descr; - cfs_list_for_each_entry_safe_from(page, temp, &queue->pl_pages, - cp_batch) { - idx = page->cp_index; - if (descr->cld_start > idx || descr->cld_end < idx) - break; - cl_page_list_move(plist, queue, page); +/** + * Check if page @page is covered by an extra lock or discard it. + */ +static int check_and_discard_cb(const struct lu_env *env, struct cl_io *io, + struct cl_page *page, void *cbdata) +{ + struct cl_thread_info *info = cl_env_info(env); + struct cl_lock *lock = cbdata; + pgoff_t index = pgoff_at_lock(page, lock); + + if (index >= info->clt_fn_index) { + struct cl_lock *tmp; + + /* refresh non-overlapped index */ + tmp = cl_lock_at_page(env, lock->cll_descr.cld_obj, page, lock, + 1, 0); + if (tmp != NULL) { + /* Cache the first-non-overlapped index so as to skip + * all pages within [index, clt_fn_index). This + * is safe because if tmp lock is canceled, it will + * discard these pages. */ + info->clt_fn_index = tmp->cll_descr.cld_end + 1; + if (tmp->cll_descr.cld_end == CL_PAGE_EOF) + info->clt_fn_index = CL_PAGE_EOF; + cl_lock_put(env, tmp); + } else if (cl_page_own(env, io, page) == 0) { + /* discard the page */ + cl_page_unmap(env, io, page); + cl_page_discard(env, io, page); + cl_page_disown(env, io, page); + } else { + LASSERT(page->cp_state == CPS_FREEING); } - cl_lock_put(env, found); } - /* The pages in plist are covered by other locks, don't handle them - * this time. - */ - if (io != NULL) - cl_page_list_disown(env, io, plist); - cl_page_list_fini(env, plist); - EXIT; + info->clt_next_index = index + 1; + return CLP_GANG_OKAY; +} + +static int pageout_cb(const struct lu_env *env, struct cl_io *io, + struct cl_page *page, void *cbdata) +{ + struct cl_thread_info *info = cl_env_info(env); + struct cl_page_list *queue = &info->clt_queue.c2_qin; + struct cl_lock *lock = cbdata; + typeof(cl_page_own) *page_own; + int rc = CLP_GANG_OKAY; + + page_own = queue->pl_nr ? 
cl_page_own_try : cl_page_own; + if (page_own(env, io, page) == 0) { + cl_page_list_add(queue, page); + info->clt_next_index = pgoff_at_lock(page, lock) + 1; + } else if (page->cp_state != CPS_FREEING) { + /* cl_page_own() won't fail unless + * the page is being freed. */ + LASSERT(queue->pl_nr != 0); + rc = CLP_GANG_AGAIN; + } + + return rc; } -EXPORT_SYMBOL(cl_lock_page_list_fixup); /** * Invalidate pages protected by the given lock, sending them out to the @@ -1978,8 +1985,9 @@ int cl_lock_page_out(const struct lu_env *env, struct cl_lock *lock, struct cl_io *io = &info->clt_io; struct cl_2queue *queue = &info->clt_queue; struct cl_lock_descr *descr = &lock->cll_descr; + cl_page_gang_cb_t cb; long page_count; - int nonblock = 1, resched; + int res; int result; LINVRNT(cl_lock_invariant(env, lock)); @@ -1990,13 +1998,19 @@ int cl_lock_page_out(const struct lu_env *env, struct cl_lock *lock, if (result != 0) GOTO(out, result); + cb = descr->cld_mode == CLM_READ ? check_and_discard_cb : pageout_cb; + info->clt_fn_index = info->clt_next_index = descr->cld_start; do { cl_2queue_init(queue); - cl_page_gang_lookup(env, descr->cld_obj, io, descr->cld_start, - descr->cld_end, &queue->c2_qin, nonblock, - &resched); + res = cl_page_gang_lookup(env, descr->cld_obj, io, + info->clt_next_index, descr->cld_end, + cb, (void *)lock); page_count = queue->c2_qin.pl_nr; if (page_count > 0) { + /* must be writeback case */ + LASSERTF(descr->cld_mode >= CLM_WRITE, "lock mode %s\n", + cl_lock_mode_name(descr->cld_mode)); + result = cl_page_list_unmap(env, io, &queue->c2_qin); if (!discard) { long timeout = 600; /* 10 minutes. */ @@ -2011,15 +2025,17 @@ int cl_lock_page_out(const struct lu_env *env, struct cl_lock *lock, CWARN("Writing %lu pages error: %d\n", page_count, result); } - cl_lock_page_list_fixup(env, io, lock, &queue->c2_qout); cl_2queue_discard(env, io, queue); cl_2queue_disown(env, io, queue); + cl_2queue_fini(env, queue); } - cl_2queue_fini(env, queue); - if (resched) + if (info->clt_next_index > descr->cld_end) + break; + + if (res == CLP_GANG_RESCHED) cfs_cond_resched(); - } while (resched || nonblock--); + } while (res != CLP_GANG_OKAY); out: cl_io_fini(env, io); RETURN(result); @@ -2055,10 +2071,22 @@ void cl_locks_prune(const struct lu_env *env, struct cl_object *obj, int cancel) cl_lock_get_trust(lock); cfs_spin_unlock(&head->coh_lock_guard); lu_ref_add(&lock->cll_reference, "prune", cfs_current()); + +again: cl_lock_mutex_get(env, lock); if (lock->cll_state < CLS_FREEING) { LASSERT(lock->cll_holds == 0); - LASSERT(lock->cll_users == 0); + LASSERT(lock->cll_users <= 1); + if (unlikely(lock->cll_users == 1)) { + struct l_wait_info lwi = { 0 }; + + cl_lock_mutex_put(env, lock); + l_wait_event(lock->cll_wq, + lock->cll_users == 0, + &lwi); + goto again; + } + if (cancel) cl_lock_cancel(env, lock); cl_lock_delete(env, lock); @@ -2137,25 +2165,34 @@ struct cl_lock *cl_lock_request(const struct lu_env *env, struct cl_io *io, ENTRY; do { lock = cl_lock_hold_mutex(env, io, need, scope, source); - if (!IS_ERR(lock)) { - rc = cl_enqueue_locked(env, lock, io, enqflags); - if (rc == 0) { - if (cl_lock_fits_into(env, lock, need, io)) { + if (IS_ERR(lock)) + break; + + rc = cl_enqueue_locked(env, lock, io, enqflags); + if (rc == 0) { + if (cl_lock_fits_into(env, lock, need, io)) { + if (!(enqflags & CEF_AGL)) { cl_lock_mutex_put(env, lock); - cl_lock_lockdep_acquire(env, - lock, enqflags); + cl_lock_lockdep_acquire(env, lock, + enqflags); break; } - cl_unuse_locked(env, lock); + rc = 1; } - 
cl_lock_trace(D_DLMTRACE, env, "enqueue failed", lock); - cl_lock_hold_release(env, lock, scope, source); - cl_lock_mutex_put(env, lock); - lu_ref_del(&lock->cll_reference, scope, source); - cl_lock_put(env, lock); + cl_unuse_locked(env, lock); + } + cl_lock_trace(D_DLMTRACE, env, + rc <= 0 ? "enqueue failed" : "agl succeed", lock); + cl_lock_hold_release(env, lock, scope, source); + cl_lock_mutex_put(env, lock); + lu_ref_del(&lock->cll_reference, scope, source); + cl_lock_put(env, lock); + if (rc > 0) { + LASSERT(enqflags & CEF_AGL); + lock = NULL; + } else if (rc != 0) { lock = ERR_PTR(rc); - } else - rc = PTR_ERR(lock); + } } while (rc == 0); RETURN(lock); } @@ -2225,7 +2262,7 @@ void cl_lock_user_add(const struct lu_env *env, struct cl_lock *lock) } EXPORT_SYMBOL(cl_lock_user_add); -int cl_lock_user_del(const struct lu_env *env, struct cl_lock *lock) +void cl_lock_user_del(const struct lu_env *env, struct cl_lock *lock) { LINVRNT(cl_lock_is_mutexed(lock)); LINVRNT(cl_lock_invariant(env, lock)); @@ -2233,7 +2270,9 @@ int cl_lock_user_del(const struct lu_env *env, struct cl_lock *lock) ENTRY; cl_lock_used_mod(env, lock, -1); - RETURN(lock->cll_users == 0); + if (lock->cll_users == 0) + cfs_waitq_broadcast(&lock->cll_wq); + EXIT; } EXPORT_SYMBOL(cl_lock_user_del);
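
Illustrative note (not part of the patch): the hunks above replace the nonblock/resched flags and the separate cl_lock_page_list_fixup() pass with a callback-driven cl_page_gang_lookup() that resumes from a cached index (clt_next_index) and reports CLP_GANG_OKAY, CLP_GANG_AGAIN or CLP_GANG_RESCHED back to the retry loop in cl_lock_page_out(). The stand-alone sketch below models only that resumable-scan/retry contract; every "demo_" name is hypothetical, and none of the real Lustre APIs, locking, or radix-tree walking are reproduced here.

/*
 * Simplified, user-space model of the callback-driven page scan used by
 * this patch.  All "demo_" identifiers are made up for illustration; the
 * real code uses cl_page_gang_lookup(), pageout_cb()/check_and_discard_cb()
 * and the CLP_GANG_* return codes shown in the hunks above.
 */
#include <stdio.h>

enum demo_gang_rc {
        DEMO_GANG_OKAY,         /* scan ran to completion */
        DEMO_GANG_AGAIN,        /* a page could not be owned; retry the range */
        DEMO_GANG_RESCHED,      /* batch boundary hit; reschedule, then resume */
};

struct demo_scan_state {
        unsigned long next_index;       /* resume point, like clt_next_index */
        unsigned long end_index;
        unsigned long batched;
};

/* Callback applied to each "page"; any non-OKAY value ends the current pass. */
typedef enum demo_gang_rc (*demo_page_cb)(unsigned long index, void *cbdata);

static enum demo_gang_rc
demo_gang_lookup(struct demo_scan_state *st, demo_page_cb cb, void *cbdata)
{
        while (st->next_index <= st->end_index) {
                enum demo_gang_rc rc = cb(st->next_index, cbdata);

                if (rc != DEMO_GANG_OKAY)
                        return rc;
                st->next_index++;
                if (++st->batched % 16 == 0)    /* pretend batch boundary */
                        return DEMO_GANG_RESCHED;
        }
        return DEMO_GANG_OKAY;
}

static enum demo_gang_rc demo_collect_cb(unsigned long index, void *cbdata)
{
        unsigned long *collected = cbdata;

        (void)index;
        (*collected)++;
        /* The real pageout_cb() would return CLP_GANG_AGAIN here when
         * cl_page_own() fails on a page that is not being freed. */
        return DEMO_GANG_OKAY;
}

int main(void)
{
        struct demo_scan_state st = { .next_index = 0, .end_index = 63 };
        unsigned long collected = 0;
        enum demo_gang_rc rc;

        /* Retry loop mirroring the do/while in cl_lock_page_out(): resume
         * from next_index after RESCHED/AGAIN, stop once the range is done.
         * The real loop calls cfs_cond_resched() on CLP_GANG_RESCHED. */
        do {
                rc = demo_gang_lookup(&st, demo_collect_cb, &collected);
                if (st.next_index > st.end_index)
                        break;
        } while (rc != DEMO_GANG_OKAY);

        printf("collected %lu pages\n", collected);
        return 0;
}

Because each pass resumes from the cached index, a scan interrupted by a reschedule or an un-ownable page never revisits pages it has already dispatched, and the per-page overlap check in check_and_discard_cb() takes over the role of the removed cl_lock_page_list_fixup() pass.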