1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 * Author: Nikita Danilov <nikita.danilov@sun.com>
41 #define DEBUG_SUBSYSTEM S_CLASS
43 # define EXPORT_SYMTAB
46 #include <obd_class.h>
47 #include <obd_support.h>
48 #include <lustre_fid.h>
49 #include <libcfs/list.h>
50 /* lu_time_global_{init,fini}() */
53 #include <cl_object.h>
54 #include "cl_internal.h"
56 /** Lock class of cl_lock::cll_guard */
57 static struct lock_class_key cl_lock_guard_class;
58 static cfs_mem_cache_t *cl_lock_kmem;
60 static struct lu_kmem_descr cl_lock_caches[] = {
62 .ckd_cache = &cl_lock_kmem,
63 .ckd_name = "cl_lock_kmem",
64 .ckd_size = sizeof (struct cl_lock)
72 * Basic lock invariant that is maintained at all times. Caller either has a
73 * reference to \a lock, or somehow assures that \a lock cannot be freed.
75 * \see cl_lock_invariant()
77 static int cl_lock_invariant_trusted(const struct lu_env *env,
78 const struct cl_lock *lock)
82 ergo(lock->cll_state == CLS_FREEING, lock->cll_holds == 0) &&
83 atomic_read(&lock->cll_ref) >= lock->cll_holds &&
84 lock->cll_holds >= lock->cll_users &&
85 lock->cll_holds >= 0 &&
86 lock->cll_users >= 0 &&
91 * Stronger lock invariant, checking that caller has a reference on a lock.
93 * \see cl_lock_invariant_trusted()
95 static int cl_lock_invariant(const struct lu_env *env,
96 const struct cl_lock *lock)
100 result = atomic_read(&lock->cll_ref) > 0 &&
101 cl_lock_invariant_trusted(env, lock);
102 if (!result && env != NULL)
103 CL_LOCK_DEBUG(D_ERROR, env, lock, "invariant broken");
107 #define RETIP ((unsigned long)__builtin_return_address(0))
109 #ifdef CONFIG_LOCKDEP
110 static struct lock_class_key cl_lock_key;
112 static void cl_lock_lockdep_init(struct cl_lock *lock)
114 lockdep_set_class_and_name(lock, &cl_lock_key, "EXT");
117 static void cl_lock_lockdep_acquire(const struct lu_env *env,
118 struct cl_lock *lock, __u32 enqflags)
120 cl_env_info(env)->clt_nr_locks_acquired++;
121 lock_acquire(&lock->dep_map, !!(enqflags & CEF_ASYNC),
122 /* try: */ 0, lock->cll_descr.cld_mode <= CLM_READ,
123 /* check: */ 2, RETIP);
126 static void cl_lock_lockdep_release(const struct lu_env *env,
127 struct cl_lock *lock)
129 cl_env_info(env)->clt_nr_locks_acquired--;
130 lock_release(&lock->dep_map, 0, RETIP);
133 #else /* !CONFIG_LOCKDEP */
135 static void cl_lock_lockdep_init(struct cl_lock *lock)
137 static void cl_lock_lockdep_acquire(const struct lu_env *env,
138 struct cl_lock *lock, __u32 enqflags)
140 static void cl_lock_lockdep_release(const struct lu_env *env,
141 struct cl_lock *lock)
144 #endif /* !CONFIG_LOCKDEP */
147 * Adds lock slice to the compound lock.
149 * This is called by cl_object_operations::coo_lock_init() methods to add a
150 * per-layer state to the lock. New state is added at the end of
151 * cl_lock::cll_layers list, that is, it is at the bottom of the stack.
153 * \see cl_req_slice_add(), cl_page_slice_add(), cl_io_slice_add()
155 void cl_lock_slice_add(struct cl_lock *lock, struct cl_lock_slice *slice,
156 struct cl_object *obj,
157 const struct cl_lock_operations *ops)
160 slice->cls_lock = lock;
161 list_add_tail(&slice->cls_linkage, &lock->cll_layers);
162 slice->cls_obj = obj;
163 slice->cls_ops = ops;
166 EXPORT_SYMBOL(cl_lock_slice_add);
169 * Returns true iff a lock with the mode \a has provides at least the same
170 * guarantees as a lock with the mode \a need.
172 int cl_lock_mode_match(enum cl_lock_mode has, enum cl_lock_mode need)
174 LINVRNT(need == CLM_READ || need == CLM_WRITE || need == CLM_PHANTOM);
175 LINVRNT(has == CLM_READ || has == CLM_WRITE || has == CLM_PHANTOM);
176 CLASSERT(CLM_PHANTOM < CLM_READ);
177 CLASSERT(CLM_READ < CLM_WRITE);
181 EXPORT_SYMBOL(cl_lock_mode_match);
184 * Returns true iff extent portions of lock descriptions match.
186 int cl_lock_ext_match(const struct cl_lock_descr *has,
187 const struct cl_lock_descr *need)
190 has->cld_start <= need->cld_start &&
191 has->cld_end >= need->cld_end &&
192 cl_lock_mode_match(has->cld_mode, need->cld_mode);
194 EXPORT_SYMBOL(cl_lock_ext_match);
197 * Returns true iff a lock with the description \a has provides at least the
198 * same guarantees as a lock with the description \a need.
200 int cl_lock_descr_match(const struct cl_lock_descr *has,
201 const struct cl_lock_descr *need)
204 cl_object_same(has->cld_obj, need->cld_obj) &&
205 cl_lock_ext_match(has, need);
207 EXPORT_SYMBOL(cl_lock_descr_match);
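/*
 * Illustrative sketch (editor's addition, not part of the original source):
 * how the matching helpers above compose. Assuming "obj" is a hypothetical
 * cl_object that both descriptions refer to, a WRITE lock on [0, 1023]
 * satisfies a READ request for [128, 255], because the extent is contained
 * and CLM_WRITE gives at least the guarantees of CLM_READ:
 *
 *      struct cl_lock_descr has  = {
 *              .cld_obj   = obj,
 *              .cld_mode  = CLM_WRITE,
 *              .cld_start = 0,
 *              .cld_end   = 1023
 *      };
 *      struct cl_lock_descr need = {
 *              .cld_obj   = obj,
 *              .cld_mode  = CLM_READ,
 *              .cld_start = 128,
 *              .cld_end   = 255
 *      };
 *
 *      LASSERT(cl_lock_ext_match(&has, &need));
 *      LASSERT(cl_lock_descr_match(&has, &need));
 */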
209 static void cl_lock_free(const struct lu_env *env, struct cl_lock *lock)
211 struct cl_object *obj = lock->cll_descr.cld_obj;
213 LASSERT(cl_is_lock(lock));
214 LINVRNT(!cl_lock_is_mutexed(lock));
215 LINVRNT(!mutex_is_locked(&lock->cll_guard));
219 while (!list_empty(&lock->cll_layers)) {
220 struct cl_lock_slice *slice;
222 slice = list_entry(lock->cll_layers.next, struct cl_lock_slice,
224 list_del_init(lock->cll_layers.next);
225 slice->cls_ops->clo_fini(env, slice);
227 atomic_dec(&cl_object_site(obj)->cs_locks.cs_total);
228 atomic_dec(&cl_object_site(obj)->cs_locks_state[lock->cll_state]);
229 lu_object_ref_del_at(&obj->co_lu, lock->cll_obj_ref, "cl_lock", lock);
230 cl_object_put(env, obj);
231 lu_ref_fini(&lock->cll_reference);
232 lu_ref_fini(&lock->cll_holders);
233 mutex_destroy(&lock->cll_guard);
234 OBD_SLAB_FREE_PTR(lock, cl_lock_kmem);
239 * Releases a reference on a lock.
241 * When last reference is released, lock is returned to the cache, unless it
242 * is in cl_lock_state::CLS_FREEING state, in which case it is destroyed
245 * \see cl_object_put(), cl_page_put()
247 void cl_lock_put(const struct lu_env *env, struct cl_lock *lock)
249 struct cl_object *obj;
250 struct cl_object_header *head;
251 struct cl_site *site;
253 LINVRNT(cl_lock_invariant(env, lock));
255 obj = lock->cll_descr.cld_obj;
256 LINVRNT(obj != NULL);
257 head = cl_object_header(obj);
258 site = cl_object_site(obj);
260 CDEBUG(D_DLMTRACE, "releasing reference: %d %p %lu\n",
261 atomic_read(&lock->cll_ref), lock, RETIP);
263 if (atomic_dec_and_test(&lock->cll_ref)) {
264 if (lock->cll_state == CLS_FREEING) {
265 LASSERT(list_empty(&lock->cll_linkage));
266 cl_lock_free(env, lock);
268 atomic_dec(&site->cs_locks.cs_busy);
272 EXPORT_SYMBOL(cl_lock_put);
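/*
 * Illustrative sketch (editor's addition, not part of the original source):
 * the reference counting discipline implied by cl_lock_put() above and
 * cl_lock_get() below. "env" and "lock" are hypothetical values obtained by
 * the caller; every get must be balanced by exactly one put, and the final
 * put either leaves the lock cached or, if it is in CLS_FREEING, frees it:
 *
 *      cl_lock_get(lock);
 *      ... use the lock ...
 *      cl_lock_put(env, lock);
 */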
275 * Acquires an additional reference to a lock.
277 * This can be called only by a caller already possessing a reference to \a lock.
280 * \see cl_object_get(), cl_page_get()
282 void cl_lock_get(struct cl_lock *lock)
284 LINVRNT(cl_lock_invariant(NULL, lock));
285 CDEBUG(D_DLMTRACE|D_TRACE, "acquiring reference: %d %p %lu\n",
286 atomic_read(&lock->cll_ref), lock, RETIP);
287 atomic_inc(&lock->cll_ref);
289 EXPORT_SYMBOL(cl_lock_get);
292 * Acquires a reference to a lock.
294 * This is much like cl_lock_get(), except that this function can be used to
295 * acquire initial reference to the cached lock. Caller has to deal with all
296 * possible races. Use with care!
298 * \see cl_page_get_trust()
300 void cl_lock_get_trust(struct cl_lock *lock)
302 struct cl_site *site = cl_object_site(lock->cll_descr.cld_obj);
304 LASSERT(cl_is_lock(lock));
305 CDEBUG(D_DLMTRACE|D_TRACE, "acquiring trusted reference: %d %p %lu\n",
306 atomic_read(&lock->cll_ref), lock, RETIP);
307 if (atomic_inc_return(&lock->cll_ref) == 1)
308 atomic_inc(&site->cs_locks.cs_busy);
310 EXPORT_SYMBOL(cl_lock_get_trust);
313 * Helper function destroying the lock that wasn't completely initialized.
315 * Other threads can acquire references to the top-lock through its
316 * sub-locks. Hence, it cannot be cl_lock_free()-ed immediately.
318 static void cl_lock_finish(const struct lu_env *env, struct cl_lock *lock)
320 cl_lock_mutex_get(env, lock);
321 cl_lock_delete(env, lock);
322 cl_lock_mutex_put(env, lock);
323 cl_lock_put(env, lock);
326 static struct cl_lock *cl_lock_alloc(const struct lu_env *env,
327 struct cl_object *obj,
328 const struct cl_io *io,
329 const struct cl_lock_descr *descr)
331 struct cl_lock *lock;
332 struct lu_object_header *head;
333 struct cl_site *site = cl_object_site(obj);
336 OBD_SLAB_ALLOC_PTR(lock, cl_lock_kmem);
338 atomic_set(&lock->cll_ref, 1);
339 lock->cll_descr = *descr;
340 lock->cll_state = CLS_NEW;
342 lock->cll_obj_ref = lu_object_ref_add(&obj->co_lu,
344 CFS_INIT_LIST_HEAD(&lock->cll_layers);
345 CFS_INIT_LIST_HEAD(&lock->cll_linkage);
346 CFS_INIT_LIST_HEAD(&lock->cll_inclosure);
347 lu_ref_init(&lock->cll_reference);
348 lu_ref_init(&lock->cll_holders);
349 mutex_init(&lock->cll_guard);
350 lockdep_set_class(&lock->cll_guard, &cl_lock_guard_class);
351 cfs_waitq_init(&lock->cll_wq);
352 head = obj->co_lu.lo_header;
353 atomic_inc(&site->cs_locks_state[CLS_NEW]);
354 atomic_inc(&site->cs_locks.cs_total);
355 atomic_inc(&site->cs_locks.cs_created);
356 cl_lock_lockdep_init(lock);
357 list_for_each_entry(obj, &head->loh_layers, co_lu.lo_linkage) {
360 err = obj->co_ops->coo_lock_init(env, obj, lock, io);
362 cl_lock_finish(env, lock);
368 lock = ERR_PTR(-ENOMEM);
373 * Returns true iff lock is "suitable" for given io. E.g., locks acquired by
374 * truncate and O_APPEND cannot be reused for read/non-append-write, as they
375 * cover multiple stripes and can trigger cascading timeouts.
377 static int cl_lock_fits_into(const struct lu_env *env,
378 const struct cl_lock *lock,
379 const struct cl_lock_descr *need,
380 const struct cl_io *io)
382 const struct cl_lock_slice *slice;
384 LINVRNT(cl_lock_invariant_trusted(env, lock));
386 list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
387 if (slice->cls_ops->clo_fits_into != NULL &&
388 !slice->cls_ops->clo_fits_into(env, slice, need, io))
394 static struct cl_lock *cl_lock_lookup(const struct lu_env *env,
395 struct cl_object *obj,
396 const struct cl_io *io,
397 const struct cl_lock_descr *need)
399 struct cl_lock *lock;
400 struct cl_object_header *head;
401 struct cl_site *site;
405 head = cl_object_header(obj);
406 site = cl_object_site(obj);
407 LINVRNT(spin_is_locked(&head->coh_lock_guard));
408 atomic_inc(&site->cs_locks.cs_lookup);
409 list_for_each_entry(lock, &head->coh_locks, cll_linkage) {
412 LASSERT(cl_is_lock(lock));
413 matched = cl_lock_ext_match(&lock->cll_descr, need) &&
414 lock->cll_state < CLS_FREEING &&
415 !(lock->cll_flags & CLF_CANCELLED) &&
416 cl_lock_fits_into(env, lock, need, io);
417 CDEBUG(D_DLMTRACE, "has: "DDESCR"(%i) need: "DDESCR": %d\n",
418 PDESCR(&lock->cll_descr), lock->cll_state, PDESCR(need),
421 cl_lock_get_trust(lock);
422 /* move the lock to the LRU head */
423 list_move(&lock->cll_linkage, &head->coh_locks);
424 atomic_inc(&cl_object_site(obj)->cs_locks.cs_hit);
432 * Returns a lock matching description \a need.
434 * This is the main entry point into the cl_lock caching interface. First, a
435 * cache (implemented as a per-object linked list) is consulted. If lock is
436 * found there, it is returned immediately. Otherwise new lock is allocated
437 * and returned. In any case, additional reference to lock is acquired.
439 * \see cl_object_find(), cl_page_find()
441 static struct cl_lock *cl_lock_find(const struct lu_env *env,
442 const struct cl_io *io,
443 const struct cl_lock_descr *need)
445 struct cl_object_header *head;
446 struct cl_object *obj;
447 struct cl_lock *lock;
448 struct cl_site *site;
453 head = cl_object_header(obj);
454 site = cl_object_site(obj);
456 spin_lock(&head->coh_lock_guard);
457 lock = cl_lock_lookup(env, obj, io, need);
458 spin_unlock(&head->coh_lock_guard);
461 lock = cl_lock_alloc(env, obj, io, need);
463 struct cl_lock *ghost;
465 spin_lock(&head->coh_lock_guard);
466 ghost = cl_lock_lookup(env, obj, io, need);
468 list_add(&lock->cll_linkage, &head->coh_locks);
469 spin_unlock(&head->coh_lock_guard);
470 atomic_inc(&site->cs_locks.cs_busy);
472 spin_unlock(&head->coh_lock_guard);
474 * Other threads can acquire references to the
475 * top-lock through its sub-locks. Hence, it
476 * cannot be cl_lock_free()-ed immediately.
478 cl_lock_finish(env, lock);
487 * Returns existing lock matching given description. This is similar to
488 * cl_lock_find() except that no new lock is created, and returned lock is
489 * guaranteed to be in enum cl_lock_state::CLS_HELD state.
491 struct cl_lock *cl_lock_peek(const struct lu_env *env, const struct cl_io *io,
492 const struct cl_lock_descr *need,
493 const char *scope, const void *source)
495 struct cl_object_header *head;
496 struct cl_object *obj;
497 struct cl_lock *lock;
500 head = cl_object_header(obj);
502 spin_lock(&head->coh_lock_guard);
503 lock = cl_lock_lookup(env, obj, io, need);
504 spin_unlock(&head->coh_lock_guard);
509 cl_lock_mutex_get(env, lock);
510 if (lock->cll_state == CLS_CACHED)
511 cl_use_try(env, lock);
512 ok = lock->cll_state == CLS_HELD;
514 cl_lock_hold_add(env, lock, scope, source);
515 cl_lock_user_add(env, lock);
517 cl_lock_mutex_put(env, lock);
519 cl_lock_put(env, lock);
525 EXPORT_SYMBOL(cl_lock_peek);
528 * Returns a slice within a lock, corresponding to the given layer in the
533 const struct cl_lock_slice *cl_lock_at(const struct cl_lock *lock,
534 const struct lu_device_type *dtype)
536 const struct cl_lock_slice *slice;
538 LINVRNT(cl_lock_invariant_trusted(NULL, lock));
541 list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
542 if (slice->cls_obj->co_lu.lo_dev->ld_type == dtype)
547 EXPORT_SYMBOL(cl_lock_at);
549 static void cl_lock_trace(struct cl_thread_info *info,
550 const char *prefix, const struct cl_lock *lock)
552 CDEBUG(D_DLMTRACE|D_TRACE, "%s: %i@%p %p %i %i\n", prefix,
553 atomic_read(&lock->cll_ref), lock, lock->cll_guarder,
554 lock->cll_depth, info->clt_nr_locks_locked);
557 static void cl_lock_mutex_tail(const struct lu_env *env, struct cl_lock *lock)
559 struct cl_thread_info *info;
561 info = cl_env_info(env);
563 info->clt_nr_locks_locked++;
564 lu_ref_add(&info->clt_locks_locked, "cll_guard", lock);
565 cl_lock_trace(info, "got mutex", lock);
569 * Locks cl_lock object.
571 * This is used to manipulate cl_lock fields, and to serialize state
572 * transitions in the lock state machine.
574 * \post cl_lock_is_mutexed(lock)
576 * \see cl_lock_mutex_put()
578 void cl_lock_mutex_get(const struct lu_env *env, struct cl_lock *lock)
580 LINVRNT(cl_lock_invariant(env, lock));
582 if (lock->cll_guarder == cfs_current()) {
583 LINVRNT(cl_lock_is_mutexed(lock));
584 LINVRNT(lock->cll_depth > 0);
586 struct cl_object_header *hdr;
588 LINVRNT(lock->cll_guarder != cfs_current());
589 hdr = cl_object_header(lock->cll_descr.cld_obj);
590 mutex_lock_nested(&lock->cll_guard, hdr->coh_nesting);
591 lock->cll_guarder = cfs_current();
592 LINVRNT(lock->cll_depth == 0);
594 cl_lock_mutex_tail(env, lock);
596 EXPORT_SYMBOL(cl_lock_mutex_get);
599 * Try-locks cl_lock object.
601 * \retval 0 \a lock was successfully locked
603 * \retval -EBUSY \a lock cannot be locked right now
605 * \post ergo(result == 0, cl_lock_is_mutexed(lock))
607 * \see cl_lock_mutex_get()
609 int cl_lock_mutex_try(const struct lu_env *env, struct cl_lock *lock)
613 LINVRNT(cl_lock_invariant_trusted(env, lock));
617 if (lock->cll_guarder == cfs_current()) {
618 LINVRNT(lock->cll_depth > 0);
619 cl_lock_mutex_tail(env, lock);
620 } else if (mutex_trylock(&lock->cll_guard)) {
621 LINVRNT(lock->cll_depth == 0);
622 lock->cll_guarder = cfs_current();
623 cl_lock_mutex_tail(env, lock);
628 EXPORT_SYMBOL(cl_lock_mutex_try);
631 * Unlocks cl_lock object.
633 * \pre cl_lock_is_mutexed(lock)
635 * \see cl_lock_mutex_get()
637 void cl_lock_mutex_put(const struct lu_env *env, struct cl_lock *lock)
639 struct cl_thread_info *info;
641 LINVRNT(cl_lock_invariant(env, lock));
642 LINVRNT(cl_lock_is_mutexed(lock));
643 LINVRNT(lock->cll_guarder == cfs_current());
644 LINVRNT(lock->cll_depth > 0);
646 info = cl_env_info(env);
647 LINVRNT(info->clt_nr_locks_locked > 0);
649 cl_lock_trace(info, "put mutex", lock);
650 lu_ref_del(&info->clt_locks_locked, "cll_guard", lock);
651 info->clt_nr_locks_locked--;
652 if (--lock->cll_depth == 0) {
653 lock->cll_guarder = NULL;
654 mutex_unlock(&lock->cll_guard);
657 EXPORT_SYMBOL(cl_lock_mutex_put);
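/*
 * Illustrative sketch (editor's addition, not part of the original source):
 * the intended pairing of the mutex helpers above. The mutex is recursive
 * for the owning thread, so nested get/put pairs are allowed. "env" and
 * "lock" are hypothetical values supplied by the caller:
 *
 *      cl_lock_mutex_get(env, lock);
 *      LASSERT(cl_lock_is_mutexed(lock));
 *      ... manipulate lock fields, drive the state machine ...
 *      cl_lock_mutex_put(env, lock);
 */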
660 * Returns true iff lock's mutex is owned by the current thread.
662 int cl_lock_is_mutexed(struct cl_lock *lock)
664 return lock->cll_guarder == cfs_current();
666 EXPORT_SYMBOL(cl_lock_is_mutexed);
669 * Returns the number of cl_lock mutexes held by the current thread (environment).
671 int cl_lock_nr_mutexed(const struct lu_env *env)
673 return cl_env_info(env)->clt_nr_locks_locked;
675 EXPORT_SYMBOL(cl_lock_nr_mutexed);
677 static void cl_lock_cancel0(const struct lu_env *env, struct cl_lock *lock)
679 LINVRNT(cl_lock_is_mutexed(lock));
680 LINVRNT(cl_lock_invariant(env, lock));
682 if (!(lock->cll_flags & CLF_CANCELLED)) {
683 const struct cl_lock_slice *slice;
685 lock->cll_flags |= CLF_CANCELLED;
686 list_for_each_entry_reverse(slice, &lock->cll_layers,
688 if (slice->cls_ops->clo_cancel != NULL)
689 slice->cls_ops->clo_cancel(env, slice);
695 static void cl_lock_delete0(const struct lu_env *env, struct cl_lock *lock)
697 struct cl_object_header *head;
698 const struct cl_lock_slice *slice;
700 LINVRNT(cl_lock_is_mutexed(lock));
701 LINVRNT(cl_lock_invariant(env, lock));
704 if (lock->cll_state < CLS_FREEING) {
705 cl_lock_state_set(env, lock, CLS_FREEING);
707 head = cl_object_header(lock->cll_descr.cld_obj);
709 spin_lock(&head->coh_lock_guard);
710 list_del_init(&lock->cll_linkage);
712 * No locks, no pages. This is only valid for bottom sub-locks,
713 * and the head->coh_nesting == 1 check assumes a two-level top-sub hierarchy.
716 LASSERT(ergo(head->coh_nesting == 1 &&
717 list_empty(&head->coh_locks), !head->coh_pages));
718 spin_unlock(&head->coh_lock_guard);
720 * From now on, no new references to this lock can be acquired
721 * by cl_lock_lookup().
723 list_for_each_entry_reverse(slice, &lock->cll_layers,
725 if (slice->cls_ops->clo_delete != NULL)
726 slice->cls_ops->clo_delete(env, slice);
729 * From now on, no new references to this lock can be acquired
730 * by layer-specific means (like a pointer from struct
731 * ldlm_lock in osc, or a pointer from top-lock to sub-lock in
734 * Lock will be finally freed in cl_lock_put() when last of
735 * existing references goes away.
741 static void cl_lock_hold_mod(const struct lu_env *env, struct cl_lock *lock,
744 struct cl_thread_info *cti;
745 struct cl_object_header *hdr;
747 cti = cl_env_info(env);
748 hdr = cl_object_header(lock->cll_descr.cld_obj);
749 lock->cll_holds += delta;
750 if (hdr->coh_nesting == 0) {
751 cti->clt_nr_held += delta;
752 LASSERT(cti->clt_nr_held >= 0);
756 static void cl_lock_used_mod(const struct lu_env *env, struct cl_lock *lock,
759 struct cl_thread_info *cti;
760 struct cl_object_header *hdr;
762 cti = cl_env_info(env);
763 hdr = cl_object_header(lock->cll_descr.cld_obj);
764 lock->cll_users += delta;
765 if (hdr->coh_nesting == 0) {
766 cti->clt_nr_used += delta;
767 LASSERT(cti->clt_nr_used >= 0);
771 static void cl_lock_hold_release(const struct lu_env *env, struct cl_lock *lock,
772 const char *scope, const void *source)
774 LINVRNT(cl_lock_is_mutexed(lock));
775 LINVRNT(cl_lock_invariant(env, lock));
776 LASSERT(lock->cll_holds > 0);
779 lu_ref_del(&lock->cll_holders, scope, source);
780 cl_lock_hold_mod(env, lock, -1);
781 if (lock->cll_holds == 0) {
782 if (lock->cll_descr.cld_mode == CLM_PHANTOM)
784 * If lock is still phantom when user is done with
785 * it---destroy the lock.
787 lock->cll_flags |= CLF_CANCELPEND|CLF_DOOMED;
788 if (lock->cll_flags & CLF_CANCELPEND) {
789 lock->cll_flags &= ~CLF_CANCELPEND;
790 cl_lock_cancel0(env, lock);
792 if (lock->cll_flags & CLF_DOOMED) {
793 /* no longer doomed: it's dead... Jim. */
794 lock->cll_flags &= ~CLF_DOOMED;
795 cl_lock_delete0(env, lock);
803 * Waits until lock state is changed.
805 * This function is called with cl_lock mutex locked, atomically releases
806 * mutex and goes to sleep, waiting for a lock state change (signaled by
807 * cl_lock_signal()), and re-acquires the mutex before return.
809 * This function is used to wait until lock state machine makes some progress
810 * and to emulate synchronous operations on top of asynchronous lock
813 * \retval -EINTR wait was interrupted
815 * \retval 0 wait wasn't interrupted
817 * \pre cl_lock_is_mutexed(lock)
819 * \see cl_lock_signal()
821 int cl_lock_state_wait(const struct lu_env *env, struct cl_lock *lock)
823 cfs_waitlink_t waiter;
827 LINVRNT(cl_lock_is_mutexed(lock));
828 LINVRNT(cl_lock_invariant(env, lock));
829 LASSERT(lock->cll_depth == 1);
830 LASSERT(lock->cll_state != CLS_FREEING); /* too late to wait */
832 result = lock->cll_error;
833 if (result == 0 && !(lock->cll_flags & CLF_STATE)) {
834 cfs_waitlink_init(&waiter);
835 cfs_waitq_add(&lock->cll_wq, &waiter);
836 set_current_state(CFS_TASK_INTERRUPTIBLE);
837 cl_lock_mutex_put(env, lock);
839 LASSERT(cl_lock_nr_mutexed(env) == 0);
840 cfs_waitq_wait(&waiter, CFS_TASK_INTERRUPTIBLE);
842 cl_lock_mutex_get(env, lock);
843 set_current_state(CFS_TASK_RUNNING);
844 cfs_waitq_del(&lock->cll_wq, &waiter);
845 result = cfs_signal_pending() ? -EINTR : 0;
847 lock->cll_flags &= ~CLF_STATE;
850 EXPORT_SYMBOL(cl_lock_state_wait);
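/*
 * Illustrative sketch (editor's addition, not part of the original source):
 * how cl_lock_state_wait() is combined with the non-blocking *_try() helpers
 * defined below to emulate a synchronous operation. "env" and "lock" are
 * hypothetical and the lock mutex is held on entry; cl_wait_try() stands for
 * any helper that returns CLO_WAIT while the state machine still has to make
 * progress:
 *
 *      do {
 *              result = cl_wait_try(env, lock);
 *              if (result != CLO_WAIT)
 *                      break;
 *              result = cl_lock_state_wait(env, lock);
 *      } while (result == 0);
 */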
852 static void cl_lock_state_signal(const struct lu_env *env, struct cl_lock *lock,
853 enum cl_lock_state state)
855 const struct cl_lock_slice *slice;
858 LINVRNT(cl_lock_is_mutexed(lock));
859 LINVRNT(cl_lock_invariant(env, lock));
861 list_for_each_entry(slice, &lock->cll_layers, cls_linkage)
862 if (slice->cls_ops->clo_state != NULL)
863 slice->cls_ops->clo_state(env, slice, state);
864 lock->cll_flags |= CLF_STATE;
865 cfs_waitq_broadcast(&lock->cll_wq);
870 * Notifies waiters that lock state changed.
872 * Wakes up all waiters sleeping in cl_lock_state_wait(), also notifies all
873 * layers about state change by calling cl_lock_operations::clo_state()
876 void cl_lock_signal(const struct lu_env *env, struct cl_lock *lock)
879 cl_lock_state_signal(env, lock, lock->cll_state);
882 EXPORT_SYMBOL(cl_lock_signal);
885 * Changes lock state.
887 * This function is invoked to notify layers that lock state changed, possibly
888 * as a result of an asynchronous event such as call-back reception.
890 * \post lock->cll_state == state
892 * \see cl_lock_operations::clo_state()
894 void cl_lock_state_set(const struct lu_env *env, struct cl_lock *lock,
895 enum cl_lock_state state)
897 struct cl_site *site = cl_object_site(lock->cll_descr.cld_obj);
900 LASSERT(lock->cll_state <= state ||
901 (lock->cll_state == CLS_CACHED &&
902 (state == CLS_HELD || /* lock found in cache */
903 state == CLS_NEW /* sub-lock canceled */)) ||
904 /* sub-lock canceled during unlocking */
905 (lock->cll_state == CLS_UNLOCKING && state == CLS_NEW));
907 if (lock->cll_state != state) {
908 atomic_dec(&site->cs_locks_state[lock->cll_state]);
909 atomic_inc(&site->cs_locks_state[state]);
911 cl_lock_state_signal(env, lock, state);
912 lock->cll_state = state;
916 EXPORT_SYMBOL(cl_lock_state_set);
919 * Yanks lock from the cache (cl_lock_state::CLS_CACHED state) by calling
920 * cl_lock_operations::clo_use() top-to-bottom to notify layers.
922 int cl_use_try(const struct lu_env *env, struct cl_lock *lock)
925 const struct cl_lock_slice *slice;
929 list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
930 if (slice->cls_ops->clo_use != NULL) {
931 result = slice->cls_ops->clo_use(env, slice);
936 LASSERT(result != -ENOSYS);
938 cl_lock_state_set(env, lock, CLS_HELD);
941 EXPORT_SYMBOL(cl_use_try);
944 * Helper for cl_enqueue_try() that calls ->clo_enqueue() across all layers
947 static int cl_enqueue_kick(const struct lu_env *env,
948 struct cl_lock *lock,
949 struct cl_io *io, __u32 flags)
952 const struct cl_lock_slice *slice;
956 list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
957 if (slice->cls_ops->clo_enqueue != NULL) {
958 result = slice->cls_ops->clo_enqueue(env,
964 LASSERT(result != -ENOSYS);
969 * Tries to enqueue a lock.
971 * This function is called repeatedly by cl_enqueue() until either lock is
972 * enqueued, or error occurs. This function does not block waiting for
973 * networking communication to complete.
975 * \post ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
976 * lock->cll_state == CLS_HELD)
978 * \see cl_enqueue() cl_lock_operations::clo_enqueue()
979 * \see cl_lock_state::CLS_ENQUEUED
981 int cl_enqueue_try(const struct lu_env *env, struct cl_lock *lock,
982 struct cl_io *io, __u32 flags)
990 LINVRNT(cl_lock_is_mutexed(lock));
992 if (lock->cll_error != 0)
994 switch (lock->cll_state) {
996 cl_lock_state_set(env, lock, CLS_QUEUING);
1000 result = cl_enqueue_kick(env, lock, io, flags);
1002 cl_lock_state_set(env, lock, CLS_ENQUEUED);
1005 /* wait until unlocking finishes, and enqueue lock afresh */
1010 /* yank lock from the cache. */
1011 result = cl_use_try(env, lock);
1020 * impossible, only held locks with increased
1021 * ->cll_holds can be enqueued, and they cannot be
1026 } while (result == CLO_REPEAT);
1028 cl_lock_error(env, lock, result);
1029 RETURN(result ?: lock->cll_error);
1031 EXPORT_SYMBOL(cl_enqueue_try);
1033 static int cl_enqueue_locked(const struct lu_env *env, struct cl_lock *lock,
1034 struct cl_io *io, __u32 enqflags)
1040 LINVRNT(cl_lock_is_mutexed(lock));
1041 LINVRNT(cl_lock_invariant(env, lock));
1042 LASSERT(lock->cll_holds > 0);
1044 cl_lock_user_add(env, lock);
1046 result = cl_enqueue_try(env, lock, io, enqflags);
1047 if (result == CLO_WAIT) {
1048 result = cl_lock_state_wait(env, lock);
1055 cl_lock_user_del(env, lock);
1056 if (result != -EINTR)
1057 cl_lock_error(env, lock, result);
1059 LASSERT(ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
1060 lock->cll_state == CLS_HELD));
1067 * \pre current thread or io owns a hold on lock.
1069 * \post ergo(result == 0, lock->users increased)
1070 * \post ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
1071 * lock->cll_state == CLS_HELD)
1073 int cl_enqueue(const struct lu_env *env, struct cl_lock *lock,
1074 struct cl_io *io, __u32 enqflags)
1080 cl_lock_lockdep_acquire(env, lock, enqflags);
1081 cl_lock_mutex_get(env, lock);
1082 result = cl_enqueue_locked(env, lock, io, enqflags);
1083 cl_lock_mutex_put(env, lock);
1085 cl_lock_lockdep_release(env, lock);
1086 LASSERT(ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
1087 lock->cll_state == CLS_HELD));
1090 EXPORT_SYMBOL(cl_enqueue);
1093 * Tries to unlock a lock.
1095 * This function is called repeatedly by cl_unuse() until either lock is
1096 * unlocked, or error occurs.
1098 * \pre lock->cll_state <= CLS_HELD || lock->cll_state == CLS_UNLOCKING
1100 * \post ergo(result == 0, lock->cll_state == CLS_CACHED)
1102 * \see cl_unuse() cl_lock_operations::clo_unuse()
1103 * \see cl_lock_state::CLS_CACHED
1105 int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock)
1107 const struct cl_lock_slice *slice;
1111 if (lock->cll_state != CLS_UNLOCKING) {
1112 if (lock->cll_users > 1) {
1113 cl_lock_user_del(env, lock);
1117 * New lock users (->cll_users) do not prevent unlocking
1118 * from proceeding. From this point, lock eventually reaches
1119 * CLS_CACHED, is reinitialized to CLS_NEW, or fails into CLS_FREEING.
1122 cl_lock_state_set(env, lock, CLS_UNLOCKING);
1127 if (lock->cll_error != 0)
1130 LINVRNT(cl_lock_is_mutexed(lock));
1131 LINVRNT(cl_lock_invariant(env, lock));
1132 LASSERT(lock->cll_state == CLS_UNLOCKING);
1133 LASSERT(lock->cll_users > 0);
1134 LASSERT(lock->cll_holds > 0);
1137 list_for_each_entry_reverse(slice, &lock->cll_layers,
1139 if (slice->cls_ops->clo_unuse != NULL) {
1140 result = slice->cls_ops->clo_unuse(env, slice);
1145 LASSERT(result != -ENOSYS);
1146 } while (result == CLO_REPEAT);
1147 if (result != CLO_WAIT)
1149 * Once there is no more need to iterate ->clo_unuse() calls,
1150 * remove lock user. This is done even if unrecoverable error
1151 * happened during unlocking, because nothing else can be done.
1154 cl_lock_user_del(env, lock);
1155 if (result == 0 || result == -ESTALE) {
1156 enum cl_lock_state state;
1159 * Return lock back to the cache. This is the only
1160 * place where lock is moved into CLS_CACHED state.
1162 * If one of ->clo_unuse() methods returned -ESTALE, lock
1163 * cannot be placed into cache and has to be
1164 * re-initialized. This happens e.g., when a sub-lock was
1165 * canceled while unlocking was in progress.
1167 state = result == 0 ? CLS_CACHED : CLS_NEW;
1168 cl_lock_state_set(env, lock, state);
1171 * Hide -ESTALE error.
1172 * Consider a glimpse lock with multiple stripes, where one of
1173 * its sub-locks returned -ENAVAIL while the other sub-locks matched
1174 * existing write locks. In this case we can't set this lock to
1175 * error, because otherwise some of its sub-locks may not be
1176 * canceled, and some dirty pages would never be written to the
1177 * OSTs. -jay
1181 result = result ?: lock->cll_error;
1183 cl_lock_error(env, lock, result);
1186 EXPORT_SYMBOL(cl_unuse_try);
1188 static void cl_unuse_locked(const struct lu_env *env, struct cl_lock *lock)
1191 LASSERT(lock->cll_state <= CLS_HELD);
1195 result = cl_unuse_try(env, lock);
1196 if (result == CLO_WAIT) {
1197 result = cl_lock_state_wait(env, lock);
1209 void cl_unuse(const struct lu_env *env, struct cl_lock *lock)
1212 cl_lock_mutex_get(env, lock);
1213 cl_unuse_locked(env, lock);
1214 cl_lock_mutex_put(env, lock);
1215 cl_lock_lockdep_release(env, lock);
1218 EXPORT_SYMBOL(cl_unuse);
1221 * Tries to wait for a lock.
1223 * This function is called repeatedly by cl_wait() until either lock is
1224 * granted, or error occurs. This function does not block waiting for network
1225 * communication to complete.
1227 * \see cl_wait() cl_lock_operations::clo_wait()
1228 * \see cl_lock_state::CLS_HELD
1230 int cl_wait_try(const struct lu_env *env, struct cl_lock *lock)
1232 const struct cl_lock_slice *slice;
1237 LINVRNT(cl_lock_is_mutexed(lock));
1238 LINVRNT(cl_lock_invariant(env, lock));
1239 LASSERT(lock->cll_state == CLS_ENQUEUED ||
1240 lock->cll_state == CLS_HELD);
1241 LASSERT(lock->cll_users > 0);
1242 LASSERT(lock->cll_holds > 0);
1245 if (lock->cll_error != 0)
1247 if (lock->cll_state == CLS_HELD)
1252 list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
1253 if (slice->cls_ops->clo_wait != NULL) {
1254 result = slice->cls_ops->clo_wait(env, slice);
1259 LASSERT(result != -ENOSYS);
1261 cl_lock_state_set(env, lock, CLS_HELD);
1262 } while (result == CLO_REPEAT);
1263 RETURN(result ?: lock->cll_error);
1265 EXPORT_SYMBOL(cl_wait_try);
1268 * Waits until enqueued lock is granted.
1270 * \pre current thread or io owns a hold on the lock
1271 * \pre ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
1272 * lock->cll_state == CLS_HELD)
1274 * \post ergo(result == 0, lock->cll_state == CLS_HELD)
1276 int cl_wait(const struct lu_env *env, struct cl_lock *lock)
1281 cl_lock_mutex_get(env, lock);
1283 LINVRNT(cl_lock_invariant(env, lock));
1284 LASSERT(lock->cll_state == CLS_ENQUEUED || lock->cll_state == CLS_HELD);
1285 LASSERT(lock->cll_holds > 0);
1288 result = cl_wait_try(env, lock);
1289 if (result == CLO_WAIT) {
1290 result = cl_lock_state_wait(env, lock);
1297 cl_lock_user_del(env, lock);
1298 if (result != -EINTR)
1299 cl_lock_error(env, lock, result);
1300 cl_lock_lockdep_release(env, lock);
1302 cl_lock_mutex_put(env, lock);
1303 LASSERT(ergo(result == 0, lock->cll_state == CLS_HELD));
1306 EXPORT_SYMBOL(cl_wait);
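/*
 * Illustrative sketch (editor's addition, not part of the original source):
 * a typical synchronous lock life-cycle built from cl_enqueue(), cl_wait()
 * and cl_unuse() above, together with the hold/release helpers defined later
 * in this file. "env", "io", "need", "scope" and "source" are hypothetical
 * values prepared by the caller:
 *
 *      struct cl_lock *lock;
 *
 *      lock = cl_lock_hold(env, io, need, scope, source);
 *      if (!IS_ERR(lock)) {
 *              if (cl_enqueue(env, lock, io, 0) == 0 &&
 *                  cl_wait(env, lock) == 0) {
 *                      ... lock is in CLS_HELD, do the work ...
 *                      cl_unuse(env, lock);
 *              }
 *              cl_lock_release(env, lock, scope, source);
 *      }
 */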
1309 * Executes cl_lock_operations::clo_weigh(), and sums results to estimate lock weight.
1312 unsigned long cl_lock_weigh(const struct lu_env *env, struct cl_lock *lock)
1314 const struct cl_lock_slice *slice;
1315 unsigned long pound;
1316 unsigned long ounce;
1319 LINVRNT(cl_lock_is_mutexed(lock));
1320 LINVRNT(cl_lock_invariant(env, lock));
1323 list_for_each_entry_reverse(slice, &lock->cll_layers, cls_linkage) {
1324 if (slice->cls_ops->clo_weigh != NULL) {
1325 ounce = slice->cls_ops->clo_weigh(env, slice);
1327 if (pound < ounce) /* over-weight^Wflow */
1333 EXPORT_SYMBOL(cl_lock_weigh);
1336 * Notifies layers that lock description changed.
1338 * The server can grant client a lock different from one that was requested
1339 * (e.g., larger in extent). This method is called when actually granted lock
1340 * description becomes known, to let layers accommodate the changed lock description.
1343 * \see cl_lock_operations::clo_modify()
1345 int cl_lock_modify(const struct lu_env *env, struct cl_lock *lock,
1346 const struct cl_lock_descr *desc)
1348 const struct cl_lock_slice *slice;
1349 struct cl_object *obj = lock->cll_descr.cld_obj;
1350 struct cl_object_header *hdr = cl_object_header(obj);
1354 /* don't allow object to change */
1355 LASSERT(obj == desc->cld_obj);
1356 LINVRNT(cl_lock_is_mutexed(lock));
1357 LINVRNT(cl_lock_invariant(env, lock));
1359 list_for_each_entry_reverse(slice, &lock->cll_layers, cls_linkage) {
1360 if (slice->cls_ops->clo_modify != NULL) {
1361 result = slice->cls_ops->clo_modify(env, slice, desc);
1366 CL_LOCK_DEBUG(D_DLMTRACE, env, lock, " -> "DDESCR"@"DFID"\n",
1367 PDESCR(desc), PFID(lu_object_fid(&desc->cld_obj->co_lu)));
1369 * Just replace description in place. Nothing more is needed for
1370 * now. If locks were indexed according to their extent and/or mode,
1371 * that index would have to be updated here.
1373 spin_lock(&hdr->coh_lock_guard);
1374 lock->cll_descr = *desc;
1375 spin_unlock(&hdr->coh_lock_guard);
1378 EXPORT_SYMBOL(cl_lock_modify);
1381 * Initializes lock closure with a given origin.
1383 * \see cl_lock_closure
1385 void cl_lock_closure_init(const struct lu_env *env,
1386 struct cl_lock_closure *closure,
1387 struct cl_lock *origin, int wait)
1389 LINVRNT(cl_lock_is_mutexed(origin));
1390 LINVRNT(cl_lock_invariant(env, origin));
1392 CFS_INIT_LIST_HEAD(&closure->clc_list);
1393 closure->clc_origin = origin;
1394 closure->clc_wait = wait;
1395 closure->clc_nr = 0;
1397 EXPORT_SYMBOL(cl_lock_closure_init);
1400 * Builds a closure of \a lock.
1402 * Building of a closure consists of adding initial lock (\a lock) into it,
1403 * and calling cl_lock_operations::clo_closure() methods of \a lock. These
1404 * methods might call cl_lock_closure_build() recursively again, adding more
1405 * locks to the closure, etc.
1407 * \see cl_lock_closure
1409 int cl_lock_closure_build(const struct lu_env *env, struct cl_lock *lock,
1410 struct cl_lock_closure *closure)
1412 const struct cl_lock_slice *slice;
1416 LINVRNT(cl_lock_is_mutexed(closure->clc_origin));
1417 LINVRNT(cl_lock_invariant(env, closure->clc_origin));
1419 result = cl_lock_enclosure(env, lock, closure);
1421 list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
1422 if (slice->cls_ops->clo_closure != NULL) {
1423 result = slice->cls_ops->clo_closure(env, slice,
1431 cl_lock_disclosure(env, closure);
1434 EXPORT_SYMBOL(cl_lock_closure_build);
1437 * Adds new lock to a closure.
1439 * Try-locks \a lock and, if successful, adds it to the closure (never more
1440 * than once). If the try-lock fails, returns CLO_REPEAT, after optionally
1441 * waiting until the next try-lock is likely to succeed.
1443 int cl_lock_enclosure(const struct lu_env *env, struct cl_lock *lock,
1444 struct cl_lock_closure *closure)
1448 if (!cl_lock_mutex_try(env, lock)) {
1450 * If lock->cll_inclosure is not empty, lock is already in this closure.
1453 if (list_empty(&lock->cll_inclosure)) {
1454 cl_lock_get_trust(lock);
1455 lu_ref_add(&lock->cll_reference, "closure", closure);
1456 list_add(&lock->cll_inclosure, &closure->clc_list);
1459 cl_lock_mutex_put(env, lock);
1462 cl_lock_disclosure(env, closure);
1463 if (closure->clc_wait) {
1464 cl_lock_get_trust(lock);
1465 lu_ref_add(&lock->cll_reference, "closure-w", closure);
1466 cl_lock_mutex_put(env, closure->clc_origin);
1468 LASSERT(cl_lock_nr_mutexed(env) == 0);
1469 cl_lock_mutex_get(env, lock);
1470 cl_lock_mutex_put(env, lock);
1472 cl_lock_mutex_get(env, closure->clc_origin);
1473 lu_ref_del(&lock->cll_reference, "closure-w", closure);
1474 cl_lock_put(env, lock);
1476 result = CLO_REPEAT;
1480 EXPORT_SYMBOL(cl_lock_enclosure);
1482 /** Releases mutexes of enclosed locks. */
1483 void cl_lock_disclosure(const struct lu_env *env,
1484 struct cl_lock_closure *closure)
1486 struct cl_lock *scan;
1487 struct cl_lock *temp;
1489 list_for_each_entry_safe(scan, temp, &closure->clc_list, cll_inclosure){
1490 list_del_init(&scan->cll_inclosure);
1491 cl_lock_mutex_put(env, scan);
1492 lu_ref_del(&scan->cll_reference, "closure", closure);
1493 cl_lock_put(env, scan);
1496 LASSERT(closure->clc_nr == 0);
1498 EXPORT_SYMBOL(cl_lock_disclosure);
1500 /** Finalizes a closure. */
1501 void cl_lock_closure_fini(struct cl_lock_closure *closure)
1503 LASSERT(closure->clc_nr == 0);
1504 LASSERT(list_empty(&closure->clc_list));
1506 EXPORT_SYMBOL(cl_lock_closure_fini);
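/*
 * Illustrative sketch (editor's addition, not part of the original source):
 * the intended life-cycle of a lock closure, using the helpers above. "env",
 * "origin" and "lock" are hypothetical; "origin" is mutexed by the caller,
 * and a CLO_REPEAT result means a try-lock failed and the whole operation
 * should be restarted:
 *
 *      struct cl_lock_closure closure;
 *      int result;
 *
 *      cl_lock_closure_init(env, &closure, origin, 1);
 *      result = cl_lock_closure_build(env, lock, &closure);
 *      if (result == 0) {
 *              ... all locks in the closure are mutexed ...
 *      }
 *      cl_lock_disclosure(env, &closure);
 *      cl_lock_closure_fini(&closure);
 */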
1509 * Destroys this lock. Notifies layers (bottom-to-top) that lock is being
1510 * destroyed, then destroys the lock. If there are holds on the lock, its
1511 * destruction is postponed until all holds are released. This is called when
1512 * a decision is made to destroy the lock in the future, e.g., when a blocking
1513 * AST is received on it, or a fatal communication error happens.
1515 * Caller must have a reference on this lock to prevent a situation where the
1516 * deleted lock lingers in memory indefinitely because nobody calls
1517 * cl_lock_put() to finish it.
1519 * \pre atomic_read(&lock->cll_ref) > 0
1521 * \see cl_lock_operations::clo_delete()
1522 * \see cl_lock::cll_holds
1524 void cl_lock_delete(const struct lu_env *env, struct cl_lock *lock)
1526 LINVRNT(cl_lock_is_mutexed(lock));
1527 LINVRNT(cl_lock_invariant(env, lock));
1530 if (lock->cll_holds == 0)
1531 cl_lock_delete0(env, lock);
1533 lock->cll_flags |= CLF_DOOMED;
1536 EXPORT_SYMBOL(cl_lock_delete);
1539 * Marks lock as irrecoverably failed and marks it for destruction. This
1540 * happens when, e.g., the server fails to grant a lock to us, or networking fails.
1543 * \pre atomic_read(&lock->cll_ref) > 0
1545 * \see clo_lock_delete()
1546 * \see cl_lock::cll_holds
1548 void cl_lock_error(const struct lu_env *env, struct cl_lock *lock, int error)
1550 LINVRNT(cl_lock_is_mutexed(lock));
1551 LINVRNT(cl_lock_invariant(env, lock));
1554 if (lock->cll_error == 0 && error != 0) {
1555 lock->cll_error = error;
1556 cl_lock_signal(env, lock);
1557 cl_lock_cancel(env, lock);
1558 cl_lock_delete(env, lock);
1562 EXPORT_SYMBOL(cl_lock_error);
1565 * Cancels this lock. Notifies layers
1566 * (bottom-to-top) that lock is being cancelled, then destroys the lock. If
1567 * there are holds on the lock, cancellation is postponed until
1568 * all holds are released.
1570 * Cancellation notification is delivered to layers at most once.
1572 * \see cl_lock_operations::clo_cancel()
1573 * \see cl_lock::cll_holds
1575 void cl_lock_cancel(const struct lu_env *env, struct cl_lock *lock)
1577 LINVRNT(cl_lock_is_mutexed(lock));
1578 LINVRNT(cl_lock_invariant(env, lock));
1580 if (lock->cll_holds == 0)
1581 cl_lock_cancel0(env, lock);
1583 lock->cll_flags |= CLF_CANCELPEND;
1586 EXPORT_SYMBOL(cl_lock_cancel);
1589 * Finds an existing lock covering given page and optionally different from a
1590 * given \a except lock.
1592 struct cl_lock *cl_lock_at_page(const struct lu_env *env, struct cl_object *obj,
1593 struct cl_page *page, struct cl_lock *except,
1594 int pending, int canceld)
1596 struct cl_object_header *head;
1597 struct cl_lock *scan;
1598 struct cl_lock *lock;
1599 struct cl_lock_descr *need;
1603 head = cl_object_header(obj);
1604 need = &cl_env_info(env)->clt_descr;
1607 need->cld_mode = CLM_READ; /* CLM_READ matches both READ & WRITE, but not PHANTOM */
1609 need->cld_start = need->cld_end = page->cp_index;
1611 spin_lock(&head->coh_lock_guard);
1612 list_for_each_entry(scan, &head->coh_locks, cll_linkage) {
1613 if (scan != except &&
1614 cl_lock_ext_match(&scan->cll_descr, need) &&
1615 scan->cll_state < CLS_FREEING &&
1617 * This check is racy as the lock can be canceled right
1618 * after it is done, but this is fine, because page exists anyway.
1621 (canceld || !(scan->cll_flags & CLF_CANCELLED)) &&
1622 (pending || !(scan->cll_flags & CLF_CANCELPEND))) {
1623 /* Don't increase cs_hit here since this
1624 * is just a helper function. */
1625 cl_lock_get_trust(scan);
1630 spin_unlock(&head->coh_lock_guard);
1633 EXPORT_SYMBOL(cl_lock_at_page);
1636 * Returns a list of pages protected (only) by a given lock.
1638 * Scans an extent of page radix tree, corresponding to the \a lock and queues
1639 * all pages that are not protected by locks other than \a lock into \a queue.
1641 void cl_lock_page_list_fixup(const struct lu_env *env,
1642 struct cl_io *io, struct cl_lock *lock,
1643 struct cl_page_list *queue)
1645 struct cl_page *page;
1646 struct cl_page *temp;
1647 struct cl_page_list *plist = &cl_env_info(env)->clt_list;
1649 LINVRNT(cl_lock_invariant(env, lock));
1652 /* Now that we have a list of the cl_pages under \a lock, we need
1653 * to check whether some of the pages are covered by another ldlm lock.
1654 * If so, they don't need to be written out this time.
1656 * For example, the client holds PW locks A:[0,200] and B:[100,300], and
1657 * the latter is about to be canceled; this means another client is
1658 * reading/writing [200,300], since A won't be canceled. So we only
1659 * need to write the pages covered by [200,300]. This is safe,
1660 * since [100,200] is still protected by lock A.
1663 cl_page_list_init(plist);
1664 cl_page_list_for_each_safe(page, temp, queue) {
1665 pgoff_t idx = page->cp_index;
1666 struct cl_lock *found;
1667 struct cl_lock_descr *descr;
1669 /* The algorithm relies on pages being sorted in ascending index order. */
1670 LASSERT(ergo(&temp->cp_batch != &queue->pl_pages,
1671 page->cp_index < temp->cp_index));
1673 found = cl_lock_at_page(env, lock->cll_descr.cld_obj,
1678 descr = &found->cll_descr;
1679 list_for_each_entry_safe_from(page, temp, &queue->pl_pages,
1681 idx = page->cp_index;
1682 if (descr->cld_start > idx || descr->cld_end < idx)
1684 cl_page_list_move(plist, queue, page);
1686 cl_lock_put(env, found);
1689 /* The pages in plist are covered by other locks, don't handle them this time. */
1693 cl_page_list_disown(env, io, plist);
1694 cl_page_list_fini(env, plist);
1697 EXPORT_SYMBOL(cl_lock_page_list_fixup);
1700 * Invalidate pages protected by the given lock, sending them out to the
1701 * server first, if necessary.
1703 * This function does the following:
1705 * - collects a list of pages to be invalidated,
1707 * - unmaps them from the user virtual memory,
1709 * - sends dirty pages to the server,
1711 * - waits for transfer completion,
1713 * - discards pages, and throws them out of memory.
1715 * If \a discard is set, pages are discarded without sending them to the server.
1718 * If error happens on any step, the process continues anyway (the reasoning
1719 * behind this being that lock cancellation cannot be delayed indefinitely).
1721 int cl_lock_page_out(const struct lu_env *env, struct cl_lock *lock,
1724 struct cl_thread_info *info = cl_env_info(env);
1725 struct cl_io *io = &info->clt_io;
1726 struct cl_2queue *queue = &info->clt_queue;
1727 struct cl_lock_descr *descr = &lock->cll_descr;
1732 LINVRNT(cl_lock_invariant(env, lock));
1735 io->ci_obj = cl_object_top(descr->cld_obj);
1736 result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
1739 cl_2queue_init(queue);
1740 cl_page_gang_lookup(env, descr->cld_obj, io, descr->cld_start,
1741 descr->cld_end, &queue->c2_qin);
1742 if (queue->c2_qin.pl_nr > 0) {
1743 result = cl_page_list_unmap(env, io, &queue->c2_qin);
1745 rc0 = cl_io_submit_rw(env, io,
1747 rc1 = cl_page_list_own(env, io,
1749 result = result ?: rc0 ?: rc1;
1751 cl_lock_page_list_fixup(env, io, lock, &queue->c2_qout);
1752 cl_2queue_discard(env, io, queue);
1753 cl_2queue_disown(env, io, queue);
1755 cl_2queue_fini(env, queue);
1757 cl_io_fini(env, io);
1760 EXPORT_SYMBOL(cl_lock_page_out);
1763 * Eliminates all locks for a given object.
1765 * Caller has to guarantee that no lock is in active use.
1767 * \param cancel when this is set, cl_locks_prune() cancels locks before destroying them.
1770 void cl_locks_prune(const struct lu_env *env, struct cl_object *obj, int cancel)
1772 struct cl_object_header *head;
1773 struct cl_lock *lock;
1776 head = cl_object_header(obj);
1778 * If locks are destroyed without cancellation, all pages must be
1779 * already destroyed (as otherwise they will be left unprotected).
1781 LASSERT(ergo(!cancel,
1782 head->coh_tree.rnode == NULL && head->coh_pages == 0));
1784 spin_lock(&head->coh_lock_guard);
1785 while (!list_empty(&head->coh_locks)) {
1786 lock = container_of(head->coh_locks.next,
1787 struct cl_lock, cll_linkage);
1788 cl_lock_get_trust(lock);
1789 spin_unlock(&head->coh_lock_guard);
1790 lu_ref_add(&lock->cll_reference, "prune", cfs_current());
1791 cl_lock_mutex_get(env, lock);
1792 if (lock->cll_state < CLS_FREEING) {
1793 LASSERT(lock->cll_holds == 0);
1794 LASSERT(lock->cll_users == 0);
1796 cl_lock_cancel(env, lock);
1797 cl_lock_delete(env, lock);
1799 cl_lock_mutex_put(env, lock);
1800 lu_ref_del(&lock->cll_reference, "prune", cfs_current());
1801 cl_lock_put(env, lock);
1802 spin_lock(&head->coh_lock_guard);
1804 spin_unlock(&head->coh_lock_guard);
1807 EXPORT_SYMBOL(cl_locks_prune);
1810 * Returns true if \a addr is an address of an allocated cl_lock. Used in
1811 * assertions. This check is optimistically imprecise, i.e., it occasionally
1812 * returns true for the incorrect addresses, but if it returns false, then the
1813 * address is guaranteed to be incorrect. (Should be named cl_lockp().)
1817 int cl_is_lock(const void *addr)
1819 return cfs_mem_is_in_cache(addr, cl_lock_kmem);
1821 EXPORT_SYMBOL(cl_is_lock);
1823 static struct cl_lock *cl_lock_hold_mutex(const struct lu_env *env,
1824 const struct cl_io *io,
1825 const struct cl_lock_descr *need,
1826 const char *scope, const void *source)
1828 struct cl_lock *lock;
1833 lock = cl_lock_find(env, io, need);
1836 cl_lock_mutex_get(env, lock);
1837 if (lock->cll_state < CLS_FREEING) {
1838 cl_lock_hold_mod(env, lock, +1);
1839 lu_ref_add(&lock->cll_holders, scope, source);
1840 lu_ref_add(&lock->cll_reference, scope, source);
1843 cl_lock_mutex_put(env, lock);
1844 cl_lock_put(env, lock);
1850 * Returns a lock matching \a need description with a reference and a hold on it.
1853 * This is much like cl_lock_find(), except that cl_lock_hold() additionally
1854 * guarantees that lock is not in the CLS_FREEING state on return.
1856 struct cl_lock *cl_lock_hold(const struct lu_env *env, const struct cl_io *io,
1857 const struct cl_lock_descr *need,
1858 const char *scope, const void *source)
1860 struct cl_lock *lock;
1864 lock = cl_lock_hold_mutex(env, io, need, scope, source);
1866 cl_lock_mutex_put(env, lock);
1869 EXPORT_SYMBOL(cl_lock_hold);
1872 * Main high-level entry point of the cl_lock interface that finds an existing
1873 * lock or enqueues a new lock matching the given description.
1875 struct cl_lock *cl_lock_request(const struct lu_env *env, struct cl_io *io,
1876 const struct cl_lock_descr *need,
1878 const char *scope, const void *source)
1880 struct cl_lock *lock;
1881 const struct lu_fid *fid;
1887 fid = lu_object_fid(&io->ci_obj->co_lu);
1890 warn = iter >= 16 && IS_PO2(iter);
1891 CDEBUG(warn ? D_WARNING : D_DLMTRACE,
1892 DDESCR"@"DFID" %i %08x `%s'\n",
1893 PDESCR(need), PFID(fid), iter, enqflags, scope);
1894 lock = cl_lock_hold_mutex(env, io, need, scope, source);
1895 if (!IS_ERR(lock)) {
1896 rc = cl_enqueue_locked(env, lock, io, enqflags);
1898 if (cl_lock_fits_into(env, lock, need, io)) {
1899 cl_lock_mutex_put(env, lock);
1900 cl_lock_lockdep_acquire(env,
1904 CL_LOCK_DEBUG(D_WARNING, env, lock,
1906 cl_unuse_locked(env, lock);
1908 cl_lock_hold_release(env, lock, scope, source);
1909 cl_lock_mutex_put(env, lock);
1910 lu_ref_del(&lock->cll_reference, scope, source);
1911 cl_lock_put(env, lock);
1919 EXPORT_SYMBOL(cl_lock_request);
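/*
 * Illustrative sketch (editor's addition, not part of the original source):
 * the usual pairing for the high-level entry point above. "env", "io",
 * "need", "scope" and "source" are hypothetical values prepared by the
 * caller, and 0 stands for "no special enqueue flags":
 *
 *      struct cl_lock *lock;
 *
 *      lock = cl_lock_request(env, io, need, 0, scope, source);
 *      if (!IS_ERR(lock)) {
 *              if (cl_wait(env, lock) == 0) {
 *                      ... I/O under the granted lock ...
 *                      cl_unuse(env, lock);
 *              }
 *              cl_lock_release(env, lock, scope, source);
 *      }
 */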
1922 * Adds a hold to a known lock.
1924 void cl_lock_hold_add(const struct lu_env *env, struct cl_lock *lock,
1925 const char *scope, const void *source)
1927 LINVRNT(cl_lock_is_mutexed(lock));
1928 LINVRNT(cl_lock_invariant(env, lock));
1929 LASSERT(lock->cll_state != CLS_FREEING);
1932 cl_lock_hold_mod(env, lock, +1);
1934 lu_ref_add(&lock->cll_holders, scope, source);
1935 lu_ref_add(&lock->cll_reference, scope, source);
1938 EXPORT_SYMBOL(cl_lock_hold_add);
1941 * Releases a hold and a reference on a lock, on which the caller earlier acquired a hold.
1944 void cl_lock_unhold(const struct lu_env *env, struct cl_lock *lock,
1945 const char *scope, const void *source)
1947 LINVRNT(cl_lock_invariant(env, lock));
1949 cl_lock_hold_release(env, lock, scope, source);
1950 lu_ref_del(&lock->cll_reference, scope, source);
1951 cl_lock_put(env, lock);
1954 EXPORT_SYMBOL(cl_lock_unhold);
1957 * Releases a hold and a reference on a lock, obtained by cl_lock_hold().
1959 void cl_lock_release(const struct lu_env *env, struct cl_lock *lock,
1960 const char *scope, const void *source)
1962 LINVRNT(cl_lock_invariant(env, lock));
1964 cl_lock_mutex_get(env, lock);
1965 cl_lock_hold_release(env, lock, scope, source);
1966 cl_lock_mutex_put(env, lock);
1967 lu_ref_del(&lock->cll_reference, scope, source);
1968 cl_lock_put(env, lock);
1971 EXPORT_SYMBOL(cl_lock_release);
1973 void cl_lock_user_add(const struct lu_env *env, struct cl_lock *lock)
1975 LINVRNT(cl_lock_is_mutexed(lock));
1976 LINVRNT(cl_lock_invariant(env, lock));
1979 cl_lock_used_mod(env, lock, +1);
1982 EXPORT_SYMBOL(cl_lock_user_add);
1984 int cl_lock_user_del(const struct lu_env *env, struct cl_lock *lock)
1986 LINVRNT(cl_lock_is_mutexed(lock));
1987 LINVRNT(cl_lock_invariant(env, lock));
1988 LASSERT(lock->cll_users > 0);
1991 cl_lock_used_mod(env, lock, -1);
1992 RETURN(lock->cll_users == 0);
1994 EXPORT_SYMBOL(cl_lock_user_del);
1997 * Checks whether the modes of two locks are compatible.
1999 * This returns true iff en-queuing \a lock2 won't cause cancellation of \a
2000 * lock1 even when these locks overlap.
2002 int cl_lock_compatible(const struct cl_lock *lock1, const struct cl_lock *lock2)
2004 enum cl_lock_mode mode1;
2005 enum cl_lock_mode mode2;
2008 mode1 = lock1->cll_descr.cld_mode;
2009 mode2 = lock2->cll_descr.cld_mode;
2010 RETURN(mode2 == CLM_PHANTOM ||
2011 (mode1 == CLM_READ && mode2 == CLM_READ));
2013 EXPORT_SYMBOL(cl_lock_compatible);
2015 const char *cl_lock_mode_name(const enum cl_lock_mode mode)
2017 static const char *names[] = {
2018 [CLM_PHANTOM] = "PHANTOM",
2019 [CLM_READ] = "READ",
2020 [CLM_WRITE] = "WRITE"
2022 if (0 <= mode && mode < ARRAY_SIZE(names))
2027 EXPORT_SYMBOL(cl_lock_mode_name);
2030 * Prints human readable representation of a lock description.
2032 void cl_lock_descr_print(const struct lu_env *env, void *cookie,
2033 lu_printer_t printer,
2034 const struct cl_lock_descr *descr)
2036 const struct lu_fid *fid;
2038 fid = lu_object_fid(&descr->cld_obj->co_lu);
2039 (*printer)(env, cookie, DDESCR"@"DFID, PDESCR(descr), PFID(fid));
2041 EXPORT_SYMBOL(cl_lock_descr_print);
2044 * Prints human readable representation of \a lock to the \a f.
2046 void cl_lock_print(const struct lu_env *env, void *cookie,
2047 lu_printer_t printer, const struct cl_lock *lock)
2049 const struct cl_lock_slice *slice;
2050 (*printer)(env, cookie, "lock@%p[%d %d %d %d %d %08lx] ",
2051 lock, atomic_read(&lock->cll_ref),
2052 lock->cll_state, lock->cll_error, lock->cll_holds,
2053 lock->cll_users, lock->cll_flags);
2054 cl_lock_descr_print(env, cookie, printer, &lock->cll_descr);
2055 (*printer)(env, cookie, " {\n");
2057 list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
2058 (*printer)(env, cookie, " %s@%p: ",
2059 slice->cls_obj->co_lu.lo_dev->ld_type->ldt_name,
2061 if (slice->cls_ops->clo_print != NULL)
2062 slice->cls_ops->clo_print(env, cookie, printer, slice);
2063 (*printer)(env, cookie, "\n");
2065 (*printer)(env, cookie, "} lock@%p\n", lock);
2067 EXPORT_SYMBOL(cl_lock_print);
2069 int cl_lock_init(void)
2071 return lu_kmem_init(cl_lock_caches);
2074 void cl_lock_fini(void)
2076 lu_kmem_fini(cl_lock_caches);