1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
24 * have any questions.
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 * Author: Nikita Danilov <nikita.danilov@sun.com>
41 #define DEBUG_SUBSYSTEM S_CLASS
42 #ifndef EXPORT_SYMTAB
43 # define EXPORT_SYMTAB
44 #endif
46 #include <obd_class.h>
47 #include <obd_support.h>
48 #include <lustre_fid.h>
49 #include <libcfs/list.h>
50 /* lu_time_global_{init,fini}() */
53 #include <cl_object.h>
54 #include "cl_internal.h"
56 /** Lock class of cl_lock::cll_guard */
57 static struct lock_class_key cl_lock_guard_class;
58 static cfs_mem_cache_t *cl_lock_kmem;
60 static struct lu_kmem_descr cl_lock_caches[] = {
61 {
62 .ckd_cache = &cl_lock_kmem,
63 .ckd_name = "cl_lock_kmem",
64 .ckd_size = sizeof (struct cl_lock)
65 },
66 {
67 .ckd_cache = NULL
68 }
69 };
72 * Basic lock invariant that is maintained at all times. Caller either has a
73 * reference to \a lock, or somehow assures that \a lock cannot be freed.
75 * \see cl_lock_invariant()
77 static int cl_lock_invariant_trusted(const struct lu_env *env,
78 const struct cl_lock *lock)
82 ergo(lock->cll_state == CLS_FREEING, lock->cll_holds == 0) &&
83 atomic_read(&lock->cll_ref) >= lock->cll_holds &&
84 lock->cll_holds >= lock->cll_users &&
85 lock->cll_holds >= 0 &&
86 lock->cll_users >= 0 &&
91 * Stronger lock invariant, checking that caller has a reference on a lock.
93 * \see cl_lock_invariant_trusted()
95 static int cl_lock_invariant(const struct lu_env *env,
96 const struct cl_lock *lock)
100 result = atomic_read(&lock->cll_ref) > 0 &&
101 cl_lock_invariant_trusted(env, lock);
102 if (!result && env != NULL)
103 CL_LOCK_DEBUG(D_ERROR, env, lock, "invariant broken");
107 #define RETIP ((unsigned long)__builtin_return_address(0))
109 #ifdef CONFIG_LOCKDEP
110 static struct lock_class_key cl_lock_key;
112 static void cl_lock_lockdep_init(struct cl_lock *lock)
114 lockdep_set_class_and_name(lock, &cl_lock_key, "EXT");
117 static void cl_lock_lockdep_acquire(const struct lu_env *env,
118 struct cl_lock *lock, __u32 enqflags)
120 cl_env_info(env)->clt_nr_locks_acquired++;
121 lock_acquire(&lock->dep_map, !!(enqflags & CEF_ASYNC),
122 /* try: */ 0, lock->cll_descr.cld_mode <= CLM_READ,
123 /* check: */ 2, RETIP);
126 static void cl_lock_lockdep_release(const struct lu_env *env,
127 struct cl_lock *lock)
129 cl_env_info(env)->clt_nr_locks_acquired--;
130 lock_release(&lock->dep_map, 0, RETIP);
133 #else /* !CONFIG_LOCKDEP */
135 static void cl_lock_lockdep_init(struct cl_lock *lock)
137 static void cl_lock_lockdep_acquire(const struct lu_env *env,
138 struct cl_lock *lock, __u32 enqflags)
140 static void cl_lock_lockdep_release(const struct lu_env *env,
141 struct cl_lock *lock)
144 #endif /* !CONFIG_LOCKDEP */
147 * Adds lock slice to the compound lock.
149 * This is called by cl_object_operations::coo_lock_init() methods to add a
150 * per-layer state to the lock. New state is added at the end of
151 * cl_lock::cll_layers list, that is, it is at the bottom of the stack.
153 * \see cl_req_slice_add(), cl_page_slice_add(), cl_io_slice_add()
155 void cl_lock_slice_add(struct cl_lock *lock, struct cl_lock_slice *slice,
156 struct cl_object *obj,
157 const struct cl_lock_operations *ops)
160 slice->cls_lock = lock;
161 list_add_tail(&slice->cls_linkage, &lock->cll_layers);
162 slice->cls_obj = obj;
163 slice->cls_ops = ops;
166 EXPORT_SYMBOL(cl_lock_slice_add);
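/*
 * Illustrative sketch of how a layer would use cl_lock_slice_add() from its
 * cl_object_operations::coo_lock_init() method. The layer name "foo" and the
 * identifiers foo_lock, fol_cl, foo_lock_kmem and foo_lock_ops are
 * hypothetical and only show the typical shape of such an implementation:
 *
 *        static int foo_lock_init(const struct lu_env *env,
 *                                 struct cl_object *obj, struct cl_lock *lock,
 *                                 const struct cl_io *io)
 *        {
 *                struct foo_lock *fol;
 *
 *                OBD_SLAB_ALLOC_PTR(fol, foo_lock_kmem);
 *                if (fol == NULL)
 *                        return -ENOMEM;
 *                cl_lock_slice_add(lock, &fol->fol_cl, obj, &foo_lock_ops);
 *                return 0;
 *        }
 */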
169 * Returns true iff a lock with the mode \a has provides at least the same
170 * guarantees as a lock with the mode \a need.
172 int cl_lock_mode_match(enum cl_lock_mode has, enum cl_lock_mode need)
174 LINVRNT(need == CLM_READ || need == CLM_WRITE || need == CLM_PHANTOM);
175 LINVRNT(has == CLM_READ || has == CLM_WRITE || has == CLM_PHANTOM);
176 CLASSERT(CLM_PHANTOM < CLM_READ);
177 CLASSERT(CLM_READ < CLM_WRITE);
181 EXPORT_SYMBOL(cl_lock_mode_match);
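/*
 * For example, a CLM_WRITE lock provides at least the guarantees of a
 * CLM_READ lock, while a CLM_PHANTOM lock provides the weakest guarantees.
 * The CLASSERT()s above pin the ordering CLM_PHANTOM < CLM_READ < CLM_WRITE
 * that this comparison relies on.
 */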
184 * Returns true iff extent portions of lock descriptions match.
186 int cl_lock_ext_match(const struct cl_lock_descr *has,
187 const struct cl_lock_descr *need)
190 has->cld_start <= need->cld_start &&
191 has->cld_end >= need->cld_end &&
192 cl_lock_mode_match(has->cld_mode, need->cld_mode);
194 EXPORT_SYMBOL(cl_lock_ext_match);
197 * Returns true iff a lock with the description \a has provides at least the
198 * same guarantees as a lock with the description \a need.
200 int cl_lock_descr_match(const struct cl_lock_descr *has,
201 const struct cl_lock_descr *need)
204 cl_object_same(has->cld_obj, need->cld_obj) &&
205 cl_lock_ext_match(has, need);
207 EXPORT_SYMBOL(cl_lock_descr_match);
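/*
 * Worked example (illustrative): a cached lock covering extent [0, 99] on an
 * object matches a request for [10, 20] on the same object with an equal or
 * weaker mode, because the extent is fully contained and the modes match.
 * A request for [50, 150] is not matched, since cl_lock_ext_match() requires
 * has->cld_start <= need->cld_start and has->cld_end >= need->cld_end.
 */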
209 static void cl_lock_free(const struct lu_env *env, struct cl_lock *lock)
211 struct cl_object *obj = lock->cll_descr.cld_obj;
213 LASSERT(cl_is_lock(lock));
214 LINVRNT(!cl_lock_is_mutexed(lock));
218 while (!list_empty(&lock->cll_layers)) {
219 struct cl_lock_slice *slice;
221 slice = list_entry(lock->cll_layers.next, struct cl_lock_slice,
223 list_del_init(lock->cll_layers.next);
224 slice->cls_ops->clo_fini(env, slice);
226 atomic_dec(&cl_object_site(obj)->cs_locks.cs_total);
227 atomic_dec(&cl_object_site(obj)->cs_locks_state[lock->cll_state]);
228 lu_object_ref_del_at(&obj->co_lu, lock->cll_obj_ref, "cl_lock", lock);
229 cl_object_put(env, obj);
230 lu_ref_fini(&lock->cll_reference);
231 lu_ref_fini(&lock->cll_holders);
232 mutex_destroy(&lock->cll_guard);
233 OBD_SLAB_FREE_PTR(lock, cl_lock_kmem);
238 * Releases a reference on a lock.
240 * When the last reference is released, the lock is returned to the cache,
241 * unless it is in the cl_lock_state::CLS_FREEING state, in which case it is
242 * destroyed immediately.
244 * \see cl_object_put(), cl_page_put()
246 void cl_lock_put(const struct lu_env *env, struct cl_lock *lock)
248 struct cl_object *obj;
249 struct cl_object_header *head;
250 struct cl_site *site;
252 LINVRNT(cl_lock_invariant(env, lock));
254 obj = lock->cll_descr.cld_obj;
255 LINVRNT(obj != NULL);
256 head = cl_object_header(obj);
257 site = cl_object_site(obj);
259 CDEBUG(D_DLMTRACE, "releasing reference: %d %p %lu\n",
260 atomic_read(&lock->cll_ref), lock, RETIP);
262 if (atomic_dec_and_test(&lock->cll_ref)) {
263 if (lock->cll_state == CLS_FREEING) {
264 LASSERT(list_empty(&lock->cll_linkage));
265 cl_lock_free(env, lock);
267 atomic_dec(&site->cs_locks.cs_busy);
271 EXPORT_SYMBOL(cl_lock_put);
274 * Acquires an additional reference to a lock.
276 * This can only be called by a caller that already possesses a reference to
277 * \a lock.
279 * \see cl_object_get(), cl_page_get()
281 void cl_lock_get(struct cl_lock *lock)
283 LINVRNT(cl_lock_invariant(NULL, lock));
284 CDEBUG(D_DLMTRACE|D_TRACE, "acquiring reference: %d %p %lu\n",
285 atomic_read(&lock->cll_ref), lock, RETIP);
286 atomic_inc(&lock->cll_ref);
288 EXPORT_SYMBOL(cl_lock_get);
291 * Acquires a reference to a lock.
293 * This is much like cl_lock_get(), except that this function can be used to
294 * acquire the initial reference to a cached lock. The caller has to deal with all
295 * possible races. Use with care!
297 * \see cl_page_get_trust()
299 void cl_lock_get_trust(struct cl_lock *lock)
301 struct cl_site *site = cl_object_site(lock->cll_descr.cld_obj);
303 LASSERT(cl_is_lock(lock));
304 CDEBUG(D_DLMTRACE|D_TRACE, "acquiring trusted reference: %d %p %lu\n",
305 atomic_read(&lock->cll_ref), lock, RETIP);
306 if (atomic_inc_return(&lock->cll_ref) == 1)
307 atomic_inc(&site->cs_locks.cs_busy);
309 EXPORT_SYMBOL(cl_lock_get_trust);
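/*
 * Note on usage: cl_lock_get() is safe only when the caller already owns a
 * reference, while cl_lock_get_trust() is used in places such as
 * cl_lock_lookup() below, where the lock is reached through the per-object
 * list under cl_object_header::coh_lock_guard and may, at that moment, have
 * no other references.
 */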
312 * Helper function destroying the lock that wasn't completely initialized.
314 * Other threads can acquire references to the top-lock through its
315 * sub-locks. Hence, it cannot be cl_lock_free()-ed immediately.
317 static void cl_lock_finish(const struct lu_env *env, struct cl_lock *lock)
319 cl_lock_mutex_get(env, lock);
320 cl_lock_delete(env, lock);
321 cl_lock_mutex_put(env, lock);
322 cl_lock_put(env, lock);
325 static struct cl_lock *cl_lock_alloc(const struct lu_env *env,
326 struct cl_object *obj,
327 const struct cl_io *io,
328 const struct cl_lock_descr *descr)
330 struct cl_lock *lock;
331 struct lu_object_header *head;
332 struct cl_site *site = cl_object_site(obj);
335 OBD_SLAB_ALLOC_PTR(lock, cl_lock_kmem);
337 atomic_set(&lock->cll_ref, 1);
338 lock->cll_descr = *descr;
339 lock->cll_state = CLS_NEW;
341 lock->cll_obj_ref = lu_object_ref_add(&obj->co_lu,
343 CFS_INIT_LIST_HEAD(&lock->cll_layers);
344 CFS_INIT_LIST_HEAD(&lock->cll_linkage);
345 CFS_INIT_LIST_HEAD(&lock->cll_inclosure);
346 lu_ref_init(&lock->cll_reference);
347 lu_ref_init(&lock->cll_holders);
348 mutex_init(&lock->cll_guard);
349 lockdep_set_class(&lock->cll_guard, &cl_lock_guard_class);
350 cfs_waitq_init(&lock->cll_wq);
351 head = obj->co_lu.lo_header;
352 atomic_inc(&site->cs_locks_state[CLS_NEW]);
353 atomic_inc(&site->cs_locks.cs_total);
354 atomic_inc(&site->cs_locks.cs_created);
355 cl_lock_lockdep_init(lock);
356 list_for_each_entry(obj, &head->loh_layers, co_lu.lo_linkage) {
359 err = obj->co_ops->coo_lock_init(env, obj, lock, io);
361 cl_lock_finish(env, lock);
367 lock = ERR_PTR(-ENOMEM);
372 * Returns true iff the lock is "suitable" for a given io. E.g., locks acquired by
373 * truncate and O_APPEND cannot be reused for read/non-append-write, as they
374 * cover multiple stripes and can trigger cascading timeouts.
376 static int cl_lock_fits_into(const struct lu_env *env,
377 const struct cl_lock *lock,
378 const struct cl_lock_descr *need,
379 const struct cl_io *io)
381 const struct cl_lock_slice *slice;
383 LINVRNT(cl_lock_invariant_trusted(env, lock));
385 list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
386 if (slice->cls_ops->clo_fits_into != NULL &&
387 !slice->cls_ops->clo_fits_into(env, slice, need, io))
393 static struct cl_lock *cl_lock_lookup(const struct lu_env *env,
394 struct cl_object *obj,
395 const struct cl_io *io,
396 const struct cl_lock_descr *need)
398 struct cl_lock *lock;
399 struct cl_object_header *head;
400 struct cl_site *site;
404 head = cl_object_header(obj);
405 site = cl_object_site(obj);
406 LINVRNT_SPIN_LOCKED(&head->coh_lock_guard);
407 atomic_inc(&site->cs_locks.cs_lookup);
408 list_for_each_entry(lock, &head->coh_locks, cll_linkage) {
411 LASSERT(cl_is_lock(lock));
412 matched = cl_lock_ext_match(&lock->cll_descr, need) &&
413 lock->cll_state < CLS_FREEING &&
414 !(lock->cll_flags & CLF_CANCELLED) &&
415 cl_lock_fits_into(env, lock, need, io);
416 CDEBUG(D_DLMTRACE, "has: "DDESCR"(%i) need: "DDESCR": %d\n",
417 PDESCR(&lock->cll_descr), lock->cll_state, PDESCR(need),
420 cl_lock_get_trust(lock);
421 /* move the lock to the LRU head */
422 list_move(&lock->cll_linkage, &head->coh_locks);
423 atomic_inc(&cl_object_site(obj)->cs_locks.cs_hit);
431 * Returns a lock matching description \a need.
433 * This is the main entry point into the cl_lock caching interface. First, a
434 * cache (implemented as a per-object linked list) is consulted. If a lock is
435 * found there, it is returned immediately. Otherwise a new lock is allocated
436 * and returned. In any case, an additional reference to the lock is acquired.
438 * \see cl_object_find(), cl_page_find()
440 static struct cl_lock *cl_lock_find(const struct lu_env *env,
441 const struct cl_io *io,
442 const struct cl_lock_descr *need)
444 struct cl_object_header *head;
445 struct cl_object *obj;
446 struct cl_lock *lock;
447 struct cl_site *site;
452 head = cl_object_header(obj);
453 site = cl_object_site(obj);
455 spin_lock(&head->coh_lock_guard);
456 lock = cl_lock_lookup(env, obj, io, need);
457 spin_unlock(&head->coh_lock_guard);
460 lock = cl_lock_alloc(env, obj, io, need);
462 struct cl_lock *ghost;
464 spin_lock(&head->coh_lock_guard);
465 ghost = cl_lock_lookup(env, obj, io, need);
467 list_add(&lock->cll_linkage, &head->coh_locks);
468 spin_unlock(&head->coh_lock_guard);
469 atomic_inc(&site->cs_locks.cs_busy);
471 spin_unlock(&head->coh_lock_guard);
473 * Other threads can acquire references to the
474 * top-lock through its sub-locks. Hence, it
475 * cannot be cl_lock_free()-ed immediately.
477 cl_lock_finish(env, lock);
486 * Returns an existing lock matching the given description. This is similar to
487 * cl_lock_find() except that no new lock is created, and the returned lock is
488 * guaranteed to be in enum cl_lock_state::CLS_HELD state.
490 struct cl_lock *cl_lock_peek(const struct lu_env *env, const struct cl_io *io,
491 const struct cl_lock_descr *need,
492 const char *scope, const void *source)
494 struct cl_object_header *head;
495 struct cl_object *obj;
496 struct cl_lock *lock;
499 head = cl_object_header(obj);
501 spin_lock(&head->coh_lock_guard);
502 lock = cl_lock_lookup(env, obj, io, need);
503 spin_unlock(&head->coh_lock_guard);
508 cl_lock_mutex_get(env, lock);
509 if (lock->cll_state == CLS_CACHED)
510 cl_use_try(env, lock);
511 ok = lock->cll_state == CLS_HELD;
513 cl_lock_hold_add(env, lock, scope, source);
514 cl_lock_user_add(env, lock);
516 cl_lock_mutex_put(env, lock);
518 cl_lock_put(env, lock);
524 EXPORT_SYMBOL(cl_lock_peek);
527 * Returns a slice within a lock, corresponding to the given layer in the
528 * device stack.
532 const struct cl_lock_slice *cl_lock_at(const struct cl_lock *lock,
533 const struct lu_device_type *dtype)
535 const struct cl_lock_slice *slice;
537 LINVRNT(cl_lock_invariant_trusted(NULL, lock));
540 list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
541 if (slice->cls_obj->co_lu.lo_dev->ld_type == dtype)
546 EXPORT_SYMBOL(cl_lock_at);
548 static void cl_lock_trace(struct cl_thread_info *info,
549 const char *prefix, const struct cl_lock *lock)
551 CDEBUG(D_DLMTRACE|D_TRACE, "%s: %i@%p %p %i %i\n", prefix,
552 atomic_read(&lock->cll_ref), lock, lock->cll_guarder,
553 lock->cll_depth, info->clt_nr_locks_locked);
556 static void cl_lock_mutex_tail(const struct lu_env *env, struct cl_lock *lock)
558 struct cl_thread_info *info;
560 info = cl_env_info(env);
562 info->clt_nr_locks_locked++;
563 lu_ref_add(&info->clt_locks_locked, "cll_guard", lock);
564 cl_lock_trace(info, "got mutex", lock);
568 * Locks cl_lock object.
570 * This is used to manipulate cl_lock fields, and to serialize state
571 * transitions in the lock state machine.
573 * \post cl_lock_is_mutexed(lock)
575 * \see cl_lock_mutex_put()
577 void cl_lock_mutex_get(const struct lu_env *env, struct cl_lock *lock)
579 LINVRNT(cl_lock_invariant(env, lock));
581 if (lock->cll_guarder == cfs_current()) {
582 LINVRNT(cl_lock_is_mutexed(lock));
583 LINVRNT(lock->cll_depth > 0);
585 struct cl_object_header *hdr;
587 LINVRNT(lock->cll_guarder != cfs_current());
588 hdr = cl_object_header(lock->cll_descr.cld_obj);
589 mutex_lock_nested(&lock->cll_guard, hdr->coh_nesting);
590 lock->cll_guarder = cfs_current();
591 LINVRNT(lock->cll_depth == 0);
593 cl_lock_mutex_tail(env, lock);
595 EXPORT_SYMBOL(cl_lock_mutex_get);
598 * Try-locks cl_lock object.
600 * \retval 0 \a lock was successfully locked
602 * \retval -EBUSY \a lock cannot be locked right now
604 * \post ergo(result == 0, cl_lock_is_mutexed(lock))
606 * \see cl_lock_mutex_get()
608 int cl_lock_mutex_try(const struct lu_env *env, struct cl_lock *lock)
612 LINVRNT(cl_lock_invariant_trusted(env, lock));
616 if (lock->cll_guarder == cfs_current()) {
617 LINVRNT(lock->cll_depth > 0);
618 cl_lock_mutex_tail(env, lock);
619 } else if (mutex_trylock(&lock->cll_guard)) {
620 LINVRNT(lock->cll_depth == 0);
621 lock->cll_guarder = cfs_current();
622 cl_lock_mutex_tail(env, lock);
627 EXPORT_SYMBOL(cl_lock_mutex_try);
630 * Unlocks cl_lock object.
632 * \pre cl_lock_is_mutexed(lock)
634 * \see cl_lock_mutex_get()
636 void cl_lock_mutex_put(const struct lu_env *env, struct cl_lock *lock)
638 struct cl_thread_info *info;
640 LINVRNT(cl_lock_invariant(env, lock));
641 LINVRNT(cl_lock_is_mutexed(lock));
642 LINVRNT(lock->cll_guarder == cfs_current());
643 LINVRNT(lock->cll_depth > 0);
645 info = cl_env_info(env);
646 LINVRNT(info->clt_nr_locks_locked > 0);
648 cl_lock_trace(info, "put mutex", lock);
649 lu_ref_del(&info->clt_locks_locked, "cll_guard", lock);
650 info->clt_nr_locks_locked--;
651 if (--lock->cll_depth == 0) {
652 lock->cll_guarder = NULL;
653 mutex_unlock(&lock->cll_guard);
656 EXPORT_SYMBOL(cl_lock_mutex_put);
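/*
 * Typical usage, as seen throughout this file: state manipulation is
 * bracketed by the lock mutex, e.g.
 *
 *        cl_lock_mutex_get(env, lock);
 *        ... inspect or change lock->cll_state, flags, holds ...
 *        cl_lock_mutex_put(env, lock);
 *
 * The mutex is recursive with respect to the owning thread (see cll_guarder
 * and cll_depth above), so a layer called back under the mutex may safely
 * take it again.
 */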
659 * Returns true iff lock's mutex is owned by the current thread.
661 int cl_lock_is_mutexed(struct cl_lock *lock)
663 return lock->cll_guarder == cfs_current();
665 EXPORT_SYMBOL(cl_lock_is_mutexed);
668 * Returns the number of cl_lock mutexes held by the current thread (environment).
670 int cl_lock_nr_mutexed(const struct lu_env *env)
672 return cl_env_info(env)->clt_nr_locks_locked;
674 EXPORT_SYMBOL(cl_lock_nr_mutexed);
676 static void cl_lock_cancel0(const struct lu_env *env, struct cl_lock *lock)
678 LINVRNT(cl_lock_is_mutexed(lock));
679 LINVRNT(cl_lock_invariant(env, lock));
681 if (!(lock->cll_flags & CLF_CANCELLED)) {
682 const struct cl_lock_slice *slice;
684 lock->cll_flags |= CLF_CANCELLED;
685 list_for_each_entry_reverse(slice, &lock->cll_layers,
687 if (slice->cls_ops->clo_cancel != NULL)
688 slice->cls_ops->clo_cancel(env, slice);
694 static void cl_lock_delete0(const struct lu_env *env, struct cl_lock *lock)
696 struct cl_object_header *head;
697 const struct cl_lock_slice *slice;
699 LINVRNT(cl_lock_is_mutexed(lock));
700 LINVRNT(cl_lock_invariant(env, lock));
703 if (lock->cll_state < CLS_FREEING) {
704 cl_lock_state_set(env, lock, CLS_FREEING);
706 head = cl_object_header(lock->cll_descr.cld_obj);
708 spin_lock(&head->coh_lock_guard);
709 list_del_init(&lock->cll_linkage);
711 * No locks, no pages. This is only valid for bottom sub-locks,
712 * and the head->coh_nesting == 1 check assumes a two-level
713 * top-sub lock hierarchy.
715 LASSERT(ergo(head->coh_nesting == 1 &&
716 list_empty(&head->coh_locks), !head->coh_pages));
717 spin_unlock(&head->coh_lock_guard);
719 * From now on, no new references to this lock can be acquired
720 * by cl_lock_lookup().
722 list_for_each_entry_reverse(slice, &lock->cll_layers,
724 if (slice->cls_ops->clo_delete != NULL)
725 slice->cls_ops->clo_delete(env, slice);
728 * From now on, no new references to this lock can be acquired
729 * by layer-specific means (like a pointer from struct
730 * ldlm_lock in osc, or a pointer from top-lock to sub-lock in
733 * Lock will be finally freed in cl_lock_put() when last of
734 * existing references goes away.
740 static void cl_lock_hold_mod(const struct lu_env *env, struct cl_lock *lock,
743 struct cl_thread_info *cti;
744 struct cl_object_header *hdr;
746 cti = cl_env_info(env);
747 hdr = cl_object_header(lock->cll_descr.cld_obj);
748 lock->cll_holds += delta;
749 if (hdr->coh_nesting == 0) {
750 cti->clt_nr_held += delta;
751 LASSERT(cti->clt_nr_held >= 0);
755 static void cl_lock_used_mod(const struct lu_env *env, struct cl_lock *lock,
758 struct cl_thread_info *cti;
759 struct cl_object_header *hdr;
761 cti = cl_env_info(env);
762 hdr = cl_object_header(lock->cll_descr.cld_obj);
763 lock->cll_users += delta;
764 if (hdr->coh_nesting == 0) {
765 cti->clt_nr_used += delta;
766 LASSERT(cti->clt_nr_used >= 0);
770 static void cl_lock_hold_release(const struct lu_env *env, struct cl_lock *lock,
771 const char *scope, const void *source)
773 LINVRNT(cl_lock_is_mutexed(lock));
774 LINVRNT(cl_lock_invariant(env, lock));
775 LASSERT(lock->cll_holds > 0);
778 lu_ref_del(&lock->cll_holders, scope, source);
779 cl_lock_hold_mod(env, lock, -1);
780 if (lock->cll_holds == 0) {
781 if (lock->cll_descr.cld_mode == CLM_PHANTOM)
783 * If the lock is still a phantom when the user is
784 * done with it, destroy the lock.
786 lock->cll_flags |= CLF_CANCELPEND|CLF_DOOMED;
787 if (lock->cll_flags & CLF_CANCELPEND) {
788 lock->cll_flags &= ~CLF_CANCELPEND;
789 cl_lock_cancel0(env, lock);
791 if (lock->cll_flags & CLF_DOOMED) {
792 /* no longer doomed: it's dead... Jim. */
793 lock->cll_flags &= ~CLF_DOOMED;
794 cl_lock_delete0(env, lock);
802 * Waits until lock state is changed.
804 * This function is called with the cl_lock mutex held; it atomically releases
805 * the mutex, goes to sleep waiting for a lock state change (signaled by
806 * cl_lock_signal()), and re-acquires the mutex before returning.
808 * This function is used to wait until lock state machine makes some progress
809 * and to emulate synchronous operations on top of asynchronous lock
812 * \retval -EINTR wait was interrupted
814 * \retval 0 wait wasn't interrupted
816 * \pre cl_lock_is_mutexed(lock)
818 * \see cl_lock_signal()
820 int cl_lock_state_wait(const struct lu_env *env, struct cl_lock *lock)
822 cfs_waitlink_t waiter;
826 LINVRNT(cl_lock_is_mutexed(lock));
827 LINVRNT(cl_lock_invariant(env, lock));
828 LASSERT(lock->cll_depth == 1);
829 LASSERT(lock->cll_state != CLS_FREEING); /* too late to wait */
831 result = lock->cll_error;
832 if (result == 0 && !(lock->cll_flags & CLF_STATE)) {
833 cfs_waitlink_init(&waiter);
834 cfs_waitq_add(&lock->cll_wq, &waiter);
835 set_current_state(CFS_TASK_INTERRUPTIBLE);
836 cl_lock_mutex_put(env, lock);
838 LASSERT(cl_lock_nr_mutexed(env) == 0);
839 cfs_waitq_wait(&waiter, CFS_TASK_INTERRUPTIBLE);
841 cl_lock_mutex_get(env, lock);
842 set_current_state(CFS_TASK_RUNNING);
843 cfs_waitq_del(&lock->cll_wq, &waiter);
844 result = cfs_signal_pending() ? -EINTR : 0;
846 lock->cll_flags &= ~CLF_STATE;
849 EXPORT_SYMBOL(cl_lock_state_wait);
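/*
 * The *_try() functions in this file return CLO_WAIT when they cannot make
 * progress without blocking; callers then use cl_lock_state_wait() in a loop
 * of the following shape (cf. cl_enqueue_locked(), cl_unuse_locked() and
 * cl_wait() below):
 *
 *        do {
 *                result = cl_foo_try(env, lock);
 *                if (result == CLO_WAIT) {
 *                        result = cl_lock_state_wait(env, lock);
 *                        if (result == 0)
 *                                continue;
 *                }
 *                break;
 *        } while (1);
 *
 * where cl_foo_try stands for one of the try-functions; the exact loop in
 * each caller differs slightly.
 */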
851 static void cl_lock_state_signal(const struct lu_env *env, struct cl_lock *lock,
852 enum cl_lock_state state)
854 const struct cl_lock_slice *slice;
857 LINVRNT(cl_lock_is_mutexed(lock));
858 LINVRNT(cl_lock_invariant(env, lock));
860 list_for_each_entry(slice, &lock->cll_layers, cls_linkage)
861 if (slice->cls_ops->clo_state != NULL)
862 slice->cls_ops->clo_state(env, slice, state);
863 lock->cll_flags |= CLF_STATE;
864 cfs_waitq_broadcast(&lock->cll_wq);
869 * Notifies waiters that the lock state changed.
871 * Wakes up all waiters sleeping in cl_lock_state_wait(), and also notifies all
872 * layers about the state change by calling cl_lock_operations::clo_state()
875 void cl_lock_signal(const struct lu_env *env, struct cl_lock *lock)
878 cl_lock_state_signal(env, lock, lock->cll_state);
881 EXPORT_SYMBOL(cl_lock_signal);
884 * Changes lock state.
886 * This function is invoked to notify layers that the lock state changed, possibly
887 * as a result of an asynchronous event such as call-back reception.
889 * \post lock->cll_state == state
891 * \see cl_lock_operations::clo_state()
893 void cl_lock_state_set(const struct lu_env *env, struct cl_lock *lock,
894 enum cl_lock_state state)
896 struct cl_site *site = cl_object_site(lock->cll_descr.cld_obj);
899 LASSERT(lock->cll_state <= state ||
900 (lock->cll_state == CLS_CACHED &&
901 (state == CLS_HELD || /* lock found in cache */
902 state == CLS_NEW /* sub-lock canceled */)) ||
903 /* sub-lock canceled during unlocking */
904 (lock->cll_state == CLS_UNLOCKING && state == CLS_NEW));
906 if (lock->cll_state != state) {
907 atomic_dec(&site->cs_locks_state[lock->cll_state]);
908 atomic_inc(&site->cs_locks_state[state]);
910 cl_lock_state_signal(env, lock, state);
911 lock->cll_state = state;
915 EXPORT_SYMBOL(cl_lock_state_set);
918 * Yanks lock from the cache (cl_lock_state::CLS_CACHED state) by calling
919 * cl_lock_operations::clo_use() top-to-bottom to notify layers.
921 int cl_use_try(const struct lu_env *env, struct cl_lock *lock)
924 const struct cl_lock_slice *slice;
928 list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
929 if (slice->cls_ops->clo_use != NULL) {
930 result = slice->cls_ops->clo_use(env, slice);
935 LASSERT(result != -ENOSYS);
937 cl_lock_state_set(env, lock, CLS_HELD);
940 EXPORT_SYMBOL(cl_use_try);
943 * Helper for cl_enqueue_try() that calls ->clo_enqueue() across all layers
946 static int cl_enqueue_kick(const struct lu_env *env,
947 struct cl_lock *lock,
948 struct cl_io *io, __u32 flags)
951 const struct cl_lock_slice *slice;
955 list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
956 if (slice->cls_ops->clo_enqueue != NULL) {
957 result = slice->cls_ops->clo_enqueue(env,
963 LASSERT(result != -ENOSYS);
968 * Tries to enqueue a lock.
970 * This function is called repeatedly by cl_enqueue() until either the lock
971 * is enqueued or an error occurs. It does not block waiting for network
972 * communication to complete.
974 * \post ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
975 * lock->cll_state == CLS_HELD)
977 * \see cl_enqueue() cl_lock_operations::clo_enqueue()
978 * \see cl_lock_state::CLS_ENQUEUED
980 int cl_enqueue_try(const struct lu_env *env, struct cl_lock *lock,
981 struct cl_io *io, __u32 flags)
989 LINVRNT(cl_lock_is_mutexed(lock));
991 if (lock->cll_error != 0)
993 switch (lock->cll_state) {
995 cl_lock_state_set(env, lock, CLS_QUEUING);
999 result = cl_enqueue_kick(env, lock, io, flags);
1001 cl_lock_state_set(env, lock, CLS_ENQUEUED);
1004 /* wait until unlocking finishes, and enqueue lock
1009 /* yank lock from the cache. */
1010 result = cl_use_try(env, lock);
1019 * impossible, only held locks with increased
1020 * ->cll_holds can be enqueued, and they cannot be
1025 } while (result == CLO_REPEAT);
1027 cl_lock_error(env, lock, result);
1028 RETURN(result ?: lock->cll_error);
1030 EXPORT_SYMBOL(cl_enqueue_try);
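/*
 * Summary of the state progression driven by cl_enqueue_try(): a CLS_NEW
 * lock is moved to CLS_QUEUING, then to CLS_ENQUEUED once the layers'
 * ->clo_enqueue() methods succeed; a CLS_CACHED lock is instead re-used via
 * cl_use_try(), which moves it directly to CLS_HELD.
 */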
1032 static int cl_enqueue_locked(const struct lu_env *env, struct cl_lock *lock,
1033 struct cl_io *io, __u32 enqflags)
1039 LINVRNT(cl_lock_is_mutexed(lock));
1040 LINVRNT(cl_lock_invariant(env, lock));
1041 LASSERT(lock->cll_holds > 0);
1043 cl_lock_user_add(env, lock);
1045 result = cl_enqueue_try(env, lock, io, enqflags);
1046 if (result == CLO_WAIT) {
1047 result = cl_lock_state_wait(env, lock);
1054 cl_lock_user_del(env, lock);
1055 if (result != -EINTR)
1056 cl_lock_error(env, lock, result);
1058 LASSERT(ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
1059 lock->cll_state == CLS_HELD));
1066 * \pre current thread or io owns a hold on lock.
1068 * \post ergo(result == 0, lock->users increased)
1069 * \post ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
1070 * lock->cll_state == CLS_HELD)
1072 int cl_enqueue(const struct lu_env *env, struct cl_lock *lock,
1073 struct cl_io *io, __u32 enqflags)
1079 cl_lock_lockdep_acquire(env, lock, enqflags);
1080 cl_lock_mutex_get(env, lock);
1081 result = cl_enqueue_locked(env, lock, io, enqflags);
1082 cl_lock_mutex_put(env, lock);
1084 cl_lock_lockdep_release(env, lock);
1085 LASSERT(ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
1086 lock->cll_state == CLS_HELD));
1089 EXPORT_SYMBOL(cl_enqueue);
1092 * Tries to unlock a lock.
1094 * This function is called repeatedly by cl_unuse() until either the lock
1095 * is unlocked or an error occurs.
1097 * \pre lock->cll_state <= CLS_HELD || lock->cll_state == CLS_UNLOCKING
1099 * \post ergo(result == 0, lock->cll_state == CLS_CACHED)
1101 * \see cl_unuse() cl_lock_operations::clo_unuse()
1102 * \see cl_lock_state::CLS_CACHED
1104 int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock)
1106 const struct cl_lock_slice *slice;
1110 if (lock->cll_state != CLS_UNLOCKING) {
1111 if (lock->cll_users > 1) {
1112 cl_lock_user_del(env, lock);
1116 * New lock users (->cll_users) do not prevent unlocking
1117 * from proceeding. From this point, the lock eventually reaches
1118 * CLS_CACHED, is reinitialized to CLS_NEW, or fails into
1119 * CLS_FREEING.
1121 cl_lock_state_set(env, lock, CLS_UNLOCKING);
1126 if (lock->cll_error != 0)
1129 LINVRNT(cl_lock_is_mutexed(lock));
1130 LINVRNT(cl_lock_invariant(env, lock));
1131 LASSERT(lock->cll_state == CLS_UNLOCKING);
1132 LASSERT(lock->cll_users > 0);
1133 LASSERT(lock->cll_holds > 0);
1136 list_for_each_entry_reverse(slice, &lock->cll_layers,
1138 if (slice->cls_ops->clo_unuse != NULL) {
1139 result = slice->cls_ops->clo_unuse(env, slice);
1144 LASSERT(result != -ENOSYS);
1145 } while (result == CLO_REPEAT);
1146 if (result != CLO_WAIT)
1148 * Once there is no more need to iterate ->clo_unuse() calls,
1149 * remove the lock user. This is done even if an unrecoverable error
1150 * happened during unlocking, because nothing else can be done.
1153 cl_lock_user_del(env, lock);
1154 if (result == 0 || result == -ESTALE) {
1155 enum cl_lock_state state;
1158 * Return lock back to the cache. This is the only
1159 * place where lock is moved into CLS_CACHED state.
1161 * If one of ->clo_unuse() methods returned -ESTALE, lock
1162 * cannot be placed into cache and has to be
1163 * re-initialized. This happens e.g., when a sub-lock was
1164 * canceled while unlocking was in progress.
1166 state = result == 0 ? CLS_CACHED : CLS_NEW;
1167 cl_lock_state_set(env, lock, state);
1170 * Hide the -ESTALE error.
1171 * Consider a glimpse lock with multiple stripes, where one of its
1172 * sub-locks returned -ENAVAIL while the other sub-locks matched
1173 * existing write locks. In this case we cannot put this lock into
1174 * the error state, because otherwise some of its sub-locks might
1175 * not be canceled, and some dirty pages would never be written
1176 * to the OSTs. -jay
1180 result = result ?: lock->cll_error;
1182 cl_lock_error(env, lock, result);
1185 EXPORT_SYMBOL(cl_unuse_try);
1187 static void cl_unuse_locked(const struct lu_env *env, struct cl_lock *lock)
1190 LASSERT(lock->cll_state <= CLS_HELD);
1194 result = cl_unuse_try(env, lock);
1195 if (result == CLO_WAIT) {
1196 result = cl_lock_state_wait(env, lock);
1208 void cl_unuse(const struct lu_env *env, struct cl_lock *lock)
1211 cl_lock_mutex_get(env, lock);
1212 cl_unuse_locked(env, lock);
1213 cl_lock_mutex_put(env, lock);
1214 cl_lock_lockdep_release(env, lock);
1217 EXPORT_SYMBOL(cl_unuse);
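/*
 * cl_unuse() is the blocking counterpart of cl_unuse_try(): it takes the
 * lock mutex, repeats cl_unuse_try() until the lock leaves the user's hands
 * (normally ending up in CLS_CACHED), and drops the lockdep annotation taken
 * at enqueue time.
 */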
1220 * Tries to wait for a lock.
1222 * This function is called repeatedly by cl_wait() until either the lock is
1223 * granted or an error occurs. This function does not block waiting for network
1224 * communication to complete.
1226 * \see cl_wait() cl_lock_operations::clo_wait()
1227 * \see cl_lock_state::CLS_HELD
1229 int cl_wait_try(const struct lu_env *env, struct cl_lock *lock)
1231 const struct cl_lock_slice *slice;
1236 LINVRNT(cl_lock_is_mutexed(lock));
1237 LINVRNT(cl_lock_invariant(env, lock));
1238 LASSERT(lock->cll_state == CLS_ENQUEUED ||
1239 lock->cll_state == CLS_HELD);
1240 LASSERT(lock->cll_users > 0);
1241 LASSERT(lock->cll_holds > 0);
1244 if (lock->cll_error != 0)
1246 if (lock->cll_state == CLS_HELD)
1251 list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
1252 if (slice->cls_ops->clo_wait != NULL) {
1253 result = slice->cls_ops->clo_wait(env, slice);
1258 LASSERT(result != -ENOSYS);
1260 cl_lock_state_set(env, lock, CLS_HELD);
1261 } while (result == CLO_REPEAT);
1262 RETURN(result ?: lock->cll_error);
1264 EXPORT_SYMBOL(cl_wait_try);
1267 * Waits until enqueued lock is granted.
1269 * \pre current thread or io owns a hold on the lock
1270 * \pre ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
1271 * lock->cll_state == CLS_HELD)
1273 * \post ergo(result == 0, lock->cll_state == CLS_HELD)
1275 int cl_wait(const struct lu_env *env, struct cl_lock *lock)
1280 cl_lock_mutex_get(env, lock);
1282 LINVRNT(cl_lock_invariant(env, lock));
1283 LASSERT(lock->cll_state == CLS_ENQUEUED || lock->cll_state == CLS_HELD);
1284 LASSERT(lock->cll_holds > 0);
1287 result = cl_wait_try(env, lock);
1288 if (result == CLO_WAIT) {
1289 result = cl_lock_state_wait(env, lock);
1296 cl_lock_user_del(env, lock);
1297 if (result != -EINTR)
1298 cl_lock_error(env, lock, result);
1299 cl_lock_lockdep_release(env, lock);
1301 cl_lock_mutex_put(env, lock);
1302 LASSERT(ergo(result == 0, lock->cll_state == CLS_HELD));
1305 EXPORT_SYMBOL(cl_wait);
1308 * Executes cl_lock_operations::clo_weigh() across all layers and sums the
1309 * results to estimate the lock's weight.
1311 unsigned long cl_lock_weigh(const struct lu_env *env, struct cl_lock *lock)
1313 const struct cl_lock_slice *slice;
1314 unsigned long pound;
1315 unsigned long ounce;
1318 LINVRNT(cl_lock_is_mutexed(lock));
1319 LINVRNT(cl_lock_invariant(env, lock));
1322 list_for_each_entry_reverse(slice, &lock->cll_layers, cls_linkage) {
1323 if (slice->cls_ops->clo_weigh != NULL) {
1324 ounce = slice->cls_ops->clo_weigh(env, slice);
1326 if (pound < ounce) /* over-weight^Wflow */
1332 EXPORT_SYMBOL(cl_lock_weigh);
1335 * Notifies layers that the lock description changed.
1337 * The server can grant the client a lock different from the one that was
1338 * requested (e.g., larger in extent). This method is called when the actually
1339 * granted lock description becomes known, to let layers accommodate the
1340 * changed lock description.
1342 * \see cl_lock_operations::clo_modify()
1344 int cl_lock_modify(const struct lu_env *env, struct cl_lock *lock,
1345 const struct cl_lock_descr *desc)
1347 const struct cl_lock_slice *slice;
1348 struct cl_object *obj = lock->cll_descr.cld_obj;
1349 struct cl_object_header *hdr = cl_object_header(obj);
1353 /* don't allow object to change */
1354 LASSERT(obj == desc->cld_obj);
1355 LINVRNT(cl_lock_is_mutexed(lock));
1356 LINVRNT(cl_lock_invariant(env, lock));
1358 list_for_each_entry_reverse(slice, &lock->cll_layers, cls_linkage) {
1359 if (slice->cls_ops->clo_modify != NULL) {
1360 result = slice->cls_ops->clo_modify(env, slice, desc);
1365 CL_LOCK_DEBUG(D_DLMTRACE, env, lock, " -> "DDESCR"@"DFID"\n",
1366 PDESCR(desc), PFID(lu_object_fid(&desc->cld_obj->co_lu)));
1368 * Just replace description in place. Nothing more is needed for
1369 * now. If locks were indexed according to their extent and/or mode,
1370 * that index would have to be updated here.
1372 spin_lock(&hdr->coh_lock_guard);
1373 lock->cll_descr = *desc;
1374 spin_unlock(&hdr->coh_lock_guard);
1377 EXPORT_SYMBOL(cl_lock_modify);
1380 * Initializes lock closure with a given origin.
1382 * \see cl_lock_closure
1384 void cl_lock_closure_init(const struct lu_env *env,
1385 struct cl_lock_closure *closure,
1386 struct cl_lock *origin, int wait)
1388 LINVRNT(cl_lock_is_mutexed(origin));
1389 LINVRNT(cl_lock_invariant(env, origin));
1391 CFS_INIT_LIST_HEAD(&closure->clc_list);
1392 closure->clc_origin = origin;
1393 closure->clc_wait = wait;
1394 closure->clc_nr = 0;
1396 EXPORT_SYMBOL(cl_lock_closure_init);
1399 * Builds a closure of \a lock.
1401 * Building of a closure consists of adding initial lock (\a lock) into it,
1402 * and calling cl_lock_operations::clo_closure() methods of \a lock. These
1403 * methods might call cl_lock_closure_build() recursively again, adding more
1404 * locks to the closure, etc.
1406 * \see cl_lock_closure
1408 int cl_lock_closure_build(const struct lu_env *env, struct cl_lock *lock,
1409 struct cl_lock_closure *closure)
1411 const struct cl_lock_slice *slice;
1415 LINVRNT(cl_lock_is_mutexed(closure->clc_origin));
1416 LINVRNT(cl_lock_invariant(env, closure->clc_origin));
1418 result = cl_lock_enclosure(env, lock, closure);
1420 list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
1421 if (slice->cls_ops->clo_closure != NULL) {
1422 result = slice->cls_ops->clo_closure(env, slice,
1430 cl_lock_disclosure(env, closure);
1433 EXPORT_SYMBOL(cl_lock_closure_build);
1436 * Adds new lock to a closure.
1438 * Try-locks \a lock and, if this succeeds, adds it to the closure (never more
1439 * than once). If the try-lock fails, returns CLO_REPEAT, after optionally
1440 * waiting until the next try-lock is likely to succeed.
1442 int cl_lock_enclosure(const struct lu_env *env, struct cl_lock *lock,
1443 struct cl_lock_closure *closure)
1447 if (!cl_lock_mutex_try(env, lock)) {
1449 * If lock->cll_inclosure is not empty, the lock is already in
1450 * this closure.
1452 if (list_empty(&lock->cll_inclosure)) {
1453 cl_lock_get_trust(lock);
1454 lu_ref_add(&lock->cll_reference, "closure", closure);
1455 list_add(&lock->cll_inclosure, &closure->clc_list);
1458 cl_lock_mutex_put(env, lock);
1461 cl_lock_disclosure(env, closure);
1462 if (closure->clc_wait) {
1463 cl_lock_get_trust(lock);
1464 lu_ref_add(&lock->cll_reference, "closure-w", closure);
1465 cl_lock_mutex_put(env, closure->clc_origin);
1467 LASSERT(cl_lock_nr_mutexed(env) == 0);
1468 cl_lock_mutex_get(env, lock);
1469 cl_lock_mutex_put(env, lock);
1471 cl_lock_mutex_get(env, closure->clc_origin);
1472 lu_ref_del(&lock->cll_reference, "closure-w", closure);
1473 cl_lock_put(env, lock);
1475 result = CLO_REPEAT;
1479 EXPORT_SYMBOL(cl_lock_enclosure);
1481 /** Releases the mutexes of the enclosed locks. */
1482 void cl_lock_disclosure(const struct lu_env *env,
1483 struct cl_lock_closure *closure)
1485 struct cl_lock *scan;
1486 struct cl_lock *temp;
1488 list_for_each_entry_safe(scan, temp, &closure->clc_list, cll_inclosure){
1489 list_del_init(&scan->cll_inclosure);
1490 cl_lock_mutex_put(env, scan);
1491 lu_ref_del(&scan->cll_reference, "closure", closure);
1492 cl_lock_put(env, scan);
1495 LASSERT(closure->clc_nr == 0);
1497 EXPORT_SYMBOL(cl_lock_disclosure);
1499 /** Finalizes a closure. */
1500 void cl_lock_closure_fini(struct cl_lock_closure *closure)
1502 LASSERT(closure->clc_nr == 0);
1503 LASSERT(list_empty(&closure->clc_list));
1505 EXPORT_SYMBOL(cl_lock_closure_fini);
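/*
 * Typical closure life cycle, using only the functions defined above
 * (a sketch: the concrete work done on the enclosed locks depends on the
 * caller, and the origin lock's mutex must be held for the build step):
 *
 *        struct cl_lock_closure closure;
 *
 *        cl_lock_closure_init(env, &closure, origin, wait);
 *        result = cl_lock_closure_build(env, lock, &closure);
 *        if (result == 0) {
 *                ... operate on the mutexed locks on closure.clc_list ...
 *                cl_lock_disclosure(env, &closure);
 *        }
 *        cl_lock_closure_fini(&closure);
 */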
1508 * Destroys this lock. Notifies layers (bottom-to-top) that the lock is being
1509 * destroyed, then destroys the lock. If there are holds on the lock,
1510 * destruction is postponed until all holds are released. This is called when
1511 * a decision is made to destroy the lock in the future, e.g., when a blocking
1512 * AST is received on it, or a fatal communication error happens.
1514 * The caller must have a reference on this lock to prevent a situation where
1515 * the deleted lock lingers in memory indefinitely because nobody calls
1516 * cl_lock_put() to finish it.
1518 * \pre atomic_read(&lock->cll_ref) > 0
1520 * \see cl_lock_operations::clo_delete()
1521 * \see cl_lock::cll_holds
1523 void cl_lock_delete(const struct lu_env *env, struct cl_lock *lock)
1525 LINVRNT(cl_lock_is_mutexed(lock));
1526 LINVRNT(cl_lock_invariant(env, lock));
1529 if (lock->cll_holds == 0)
1530 cl_lock_delete0(env, lock);
1532 lock->cll_flags |= CLF_DOOMED;
1535 EXPORT_SYMBOL(cl_lock_delete);
1538 * Marks the lock as irrecoverably failed, and marks it for destruction. This
1539 * happens when, e.g., the server fails to grant a lock to us, or networking
1542 * \pre atomic_read(&lock->cll_ref) > 0
1544 * \see clo_lock_delete()
1545 * \see cl_lock::cll_holds
1547 void cl_lock_error(const struct lu_env *env, struct cl_lock *lock, int error)
1549 LINVRNT(cl_lock_is_mutexed(lock));
1550 LINVRNT(cl_lock_invariant(env, lock));
1553 if (lock->cll_error == 0 && error != 0) {
1554 lock->cll_error = error;
1555 cl_lock_signal(env, lock);
1556 cl_lock_cancel(env, lock);
1557 cl_lock_delete(env, lock);
1561 EXPORT_SYMBOL(cl_lock_error);
1564 * Cancels this lock. Notifies layers
1565 * (bottom-to-top) that the lock is being cancelled, then destroys the lock.
1566 * If there are holds on the lock, cancellation is postponed until
1567 * all holds are released.
1569 * Cancellation notification is delivered to layers at most once.
1571 * \see cl_lock_operations::clo_cancel()
1572 * \see cl_lock::cll_holds
1574 void cl_lock_cancel(const struct lu_env *env, struct cl_lock *lock)
1576 LINVRNT(cl_lock_is_mutexed(lock));
1577 LINVRNT(cl_lock_invariant(env, lock));
1579 if (lock->cll_holds == 0)
1580 cl_lock_cancel0(env, lock);
1582 lock->cll_flags |= CLF_CANCELPEND;
1585 EXPORT_SYMBOL(cl_lock_cancel);
1588 * Finds an existing lock covering a given page, optionally different from a
1589 * given \a except lock.
1591 struct cl_lock *cl_lock_at_page(const struct lu_env *env, struct cl_object *obj,
1592 struct cl_page *page, struct cl_lock *except,
1593 int pending, int canceld)
1595 struct cl_object_header *head;
1596 struct cl_lock *scan;
1597 struct cl_lock *lock;
1598 struct cl_lock_descr *need;
1602 head = cl_object_header(obj);
1603 need = &cl_env_info(env)->clt_descr;
1606 need->cld_mode = CLM_READ; /* CLM_READ matches both READ & WRITE, but
1607 * not CLM_PHANTOM. */
1608 need->cld_start = need->cld_end = page->cp_index;
1610 spin_lock(&head->coh_lock_guard);
1611 list_for_each_entry(scan, &head->coh_locks, cll_linkage) {
1612 if (scan != except &&
1613 cl_lock_ext_match(&scan->cll_descr, need) &&
1614 scan->cll_state < CLS_FREEING &&
1616 * This check is racy as the lock can be canceled right
1617 * after it is done, but this is fine, because page exists
1620 (canceld || !(scan->cll_flags & CLF_CANCELLED)) &&
1621 (pending || !(scan->cll_flags & CLF_CANCELPEND))) {
1622 /* Don't increase cs_hit here since this
1623 * is just a helper function. */
1624 cl_lock_get_trust(scan);
1629 spin_unlock(&head->coh_lock_guard);
1632 EXPORT_SYMBOL(cl_lock_at_page);
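/*
 * cl_lock_at_page() is used, for example, by cl_lock_page_list_fixup() below
 * to check whether a page queued under a lock being canceled is still covered
 * by some other lock, in which case it does not have to be written out now.
 */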
1635 * Returns a list of pages protected (only) by a given lock.
1637 * Scans an extent of the page radix tree corresponding to \a lock and queues
1638 * all pages that are not protected by locks other than \a lock into \a queue.
1640 void cl_lock_page_list_fixup(const struct lu_env *env,
1641 struct cl_io *io, struct cl_lock *lock,
1642 struct cl_page_list *queue)
1644 struct cl_page *page;
1645 struct cl_page *temp;
1646 struct cl_page_list *plist = &cl_env_info(env)->clt_list;
1648 LINVRNT(cl_lock_invariant(env, lock));
1651 /* Now we have a list of cl_pages under \a lock, and we need
1652 * to check whether some of the pages are covered by another ldlm lock.
1653 * If that is the case, they do not need to be written out this time.
1655 * For example, the client holds PW locks A:[0,200] and B:[100,300], and
1656 * the latter is about to be canceled; this means another client is
1657 * reading/writing [200,300], since A won't be canceled. We actually
1658 * only need to write the pages covered by [200,300]. This is safe,
1659 * because [100,200] is still protected by lock A.
1662 cl_page_list_init(plist);
1663 cl_page_list_for_each_safe(page, temp, queue) {
1664 pgoff_t idx = page->cp_index;
1665 struct cl_lock *found;
1666 struct cl_lock_descr *descr;
1668 /* The algorithm relies on the pages being in ascending index order. */
1669 LASSERT(ergo(&temp->cp_batch != &queue->pl_pages,
1670 page->cp_index < temp->cp_index));
1672 found = cl_lock_at_page(env, lock->cll_descr.cld_obj,
1677 descr = &found->cll_descr;
1678 list_for_each_entry_safe_from(page, temp, &queue->pl_pages,
1680 idx = page->cp_index;
1681 if (descr->cld_start > idx || descr->cld_end < idx)
1683 cl_page_list_move(plist, queue, page);
1685 cl_lock_put(env, found);
1688 /* The pages in plist are covered by other locks, don't handle them
1692 cl_page_list_disown(env, io, plist);
1693 cl_page_list_fini(env, plist);
1696 EXPORT_SYMBOL(cl_lock_page_list_fixup);
1699 * Invalidate pages protected by the given lock, sending them out to the
1700 * server first, if necessary.
1702 * This function does the following:
1704 * - collects a list of pages to be invalidated,
1706 * - unmaps them from the user virtual memory,
1708 * - sends dirty pages to the server,
1710 * - waits for transfer completion,
1712 * - discards pages, and throws them out of memory.
1714 * If \a discard is set, pages are discarded without sending them to the
1715 * server.
1717 * If an error happens at any step, the process continues anyway (the
1718 * reasoning being that lock cancellation cannot be delayed indefinitely).
1720 int cl_lock_page_out(const struct lu_env *env, struct cl_lock *lock,
1723 struct cl_thread_info *info = cl_env_info(env);
1724 struct cl_io *io = &info->clt_io;
1725 struct cl_2queue *queue = &info->clt_queue;
1726 struct cl_lock_descr *descr = &lock->cll_descr;
1731 LINVRNT(cl_lock_invariant(env, lock));
1734 io->ci_obj = cl_object_top(descr->cld_obj);
1735 result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
1738 cl_2queue_init(queue);
1739 cl_page_gang_lookup(env, descr->cld_obj, io, descr->cld_start,
1740 descr->cld_end, &queue->c2_qin);
1741 if (queue->c2_qin.pl_nr > 0) {
1742 result = cl_page_list_unmap(env, io, &queue->c2_qin);
1744 rc0 = cl_io_submit_rw(env, io,
1746 rc1 = cl_page_list_own(env, io,
1748 result = result ?: rc0 ?: rc1;
1750 cl_lock_page_list_fixup(env, io, lock, &queue->c2_qout);
1751 cl_2queue_discard(env, io, queue);
1752 cl_2queue_disown(env, io, queue);
1754 cl_2queue_fini(env, queue);
1756 cl_io_fini(env, io);
1759 EXPORT_SYMBOL(cl_lock_page_out);
1762 * Eliminate all locks for a given object.
1764 * Caller has to guarantee that no lock is in active use.
1766 * \param cancel when this is set, cl_locks_prune() cancels locks before
1767 * destroying them.
1769 void cl_locks_prune(const struct lu_env *env, struct cl_object *obj, int cancel)
1771 struct cl_object_header *head;
1772 struct cl_lock *lock;
1775 head = cl_object_header(obj);
1777 * If locks are destroyed without cancellation, all pages must be
1778 * already destroyed (as otherwise they will be left unprotected).
1780 LASSERT(ergo(!cancel,
1781 head->coh_tree.rnode == NULL && head->coh_pages == 0));
1783 spin_lock(&head->coh_lock_guard);
1784 while (!list_empty(&head->coh_locks)) {
1785 lock = container_of(head->coh_locks.next,
1786 struct cl_lock, cll_linkage);
1787 cl_lock_get_trust(lock);
1788 spin_unlock(&head->coh_lock_guard);
1789 lu_ref_add(&lock->cll_reference, "prune", cfs_current());
1790 cl_lock_mutex_get(env, lock);
1791 if (lock->cll_state < CLS_FREEING) {
1792 LASSERT(lock->cll_holds == 0);
1793 LASSERT(lock->cll_users == 0);
1795 cl_lock_cancel(env, lock);
1796 cl_lock_delete(env, lock);
1798 cl_lock_mutex_put(env, lock);
1799 lu_ref_del(&lock->cll_reference, "prune", cfs_current());
1800 cl_lock_put(env, lock);
1801 spin_lock(&head->coh_lock_guard);
1803 spin_unlock(&head->coh_lock_guard);
1806 EXPORT_SYMBOL(cl_locks_prune);
1809 * Returns true if \a addr is an address of an allocated cl_lock. Used in
1810 * assertions. This check is optimistically imprecise, i.e., it occasionally
1811 * returns true for incorrect addresses, but if it returns false, then the
1812 * address is guaranteed to be incorrect. (Should be named cl_lockp().)
1816 int cl_is_lock(const void *addr)
1818 return cfs_mem_is_in_cache(addr, cl_lock_kmem);
1820 EXPORT_SYMBOL(cl_is_lock);
1822 static struct cl_lock *cl_lock_hold_mutex(const struct lu_env *env,
1823 const struct cl_io *io,
1824 const struct cl_lock_descr *need,
1825 const char *scope, const void *source)
1827 struct cl_lock *lock;
1832 lock = cl_lock_find(env, io, need);
1835 cl_lock_mutex_get(env, lock);
1836 if (lock->cll_state < CLS_FREEING) {
1837 cl_lock_hold_mod(env, lock, +1);
1838 lu_ref_add(&lock->cll_holders, scope, source);
1839 lu_ref_add(&lock->cll_reference, scope, source);
1842 cl_lock_mutex_put(env, lock);
1843 cl_lock_put(env, lock);
1849 * Returns a lock matching the \a need description, with a reference and a
1850 * hold on it.
1852 * This is much like cl_lock_find(), except that cl_lock_hold() additionally
1853 * guarantees that the lock is not in the CLS_FREEING state on return.
1855 struct cl_lock *cl_lock_hold(const struct lu_env *env, const struct cl_io *io,
1856 const struct cl_lock_descr *need,
1857 const char *scope, const void *source)
1859 struct cl_lock *lock;
1863 lock = cl_lock_hold_mutex(env, io, need, scope, source);
1865 cl_lock_mutex_put(env, lock);
1868 EXPORT_SYMBOL(cl_lock_hold);
1871 * Main high-level entry point of the cl_lock interface that finds an existing
1872 * lock, or enqueues a new lock, matching the given description.
1874 struct cl_lock *cl_lock_request(const struct lu_env *env, struct cl_io *io,
1875 const struct cl_lock_descr *need,
1877 const char *scope, const void *source)
1879 struct cl_lock *lock;
1880 const struct lu_fid *fid;
1886 fid = lu_object_fid(&io->ci_obj->co_lu);
1889 warn = iter >= 16 && IS_PO2(iter);
1890 CDEBUG(warn ? D_WARNING : D_DLMTRACE,
1891 DDESCR"@"DFID" %i %08x `%s'\n",
1892 PDESCR(need), PFID(fid), iter, enqflags, scope);
1893 lock = cl_lock_hold_mutex(env, io, need, scope, source);
1894 if (!IS_ERR(lock)) {
1895 rc = cl_enqueue_locked(env, lock, io, enqflags);
1897 if (cl_lock_fits_into(env, lock, need, io)) {
1898 cl_lock_mutex_put(env, lock);
1899 cl_lock_lockdep_acquire(env,
1903 CL_LOCK_DEBUG(D_WARNING, env, lock,
1904 "got (see bug 17665)\n");
1905 cl_unuse_locked(env, lock);
1907 cl_lock_hold_release(env, lock, scope, source);
1908 cl_lock_mutex_put(env, lock);
1909 lu_ref_del(&lock->cll_reference, scope, source);
1910 cl_lock_put(env, lock);
1918 EXPORT_SYMBOL(cl_lock_request);
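/*
 * Illustrative top-level usage of the locking interface, following the
 * pattern of existing callers (a sketch: error handling and the enqueue
 * flags depend on the caller, and "scope"/caller are placeholders):
 *
 *        lock = cl_lock_request(env, io, need, enqflags, "scope", caller);
 *        if (!IS_ERR(lock)) {
 *                rc = cl_wait(env, lock);
 *                if (rc == 0) {
 *                        ... do IO under the now CLS_HELD lock ...
 *                        cl_unuse(env, lock);
 *                }
 *                cl_lock_release(env, lock, "scope", caller);
 *        }
 */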
1921 * Adds a hold to a known lock.
1923 void cl_lock_hold_add(const struct lu_env *env, struct cl_lock *lock,
1924 const char *scope, const void *source)
1926 LINVRNT(cl_lock_is_mutexed(lock));
1927 LINVRNT(cl_lock_invariant(env, lock));
1928 LASSERT(lock->cll_state != CLS_FREEING);
1931 cl_lock_hold_mod(env, lock, +1);
1933 lu_ref_add(&lock->cll_holders, scope, source);
1934 lu_ref_add(&lock->cll_reference, scope, source);
1937 EXPORT_SYMBOL(cl_lock_hold_add);
1940 * Releases a hold and a reference on a lock, on which caller acquired a
1943 void cl_lock_unhold(const struct lu_env *env, struct cl_lock *lock,
1944 const char *scope, const void *source)
1946 LINVRNT(cl_lock_invariant(env, lock));
1948 cl_lock_hold_release(env, lock, scope, source);
1949 lu_ref_del(&lock->cll_reference, scope, source);
1950 cl_lock_put(env, lock);
1953 EXPORT_SYMBOL(cl_lock_unhold);
1956 * Releases a hold and a reference on a lock, obtained by cl_lock_hold().
1958 void cl_lock_release(const struct lu_env *env, struct cl_lock *lock,
1959 const char *scope, const void *source)
1961 LINVRNT(cl_lock_invariant(env, lock));
1963 cl_lock_mutex_get(env, lock);
1964 cl_lock_hold_release(env, lock, scope, source);
1965 cl_lock_mutex_put(env, lock);
1966 lu_ref_del(&lock->cll_reference, scope, source);
1967 cl_lock_put(env, lock);
1970 EXPORT_SYMBOL(cl_lock_release);
1972 void cl_lock_user_add(const struct lu_env *env, struct cl_lock *lock)
1974 LINVRNT(cl_lock_is_mutexed(lock));
1975 LINVRNT(cl_lock_invariant(env, lock));
1978 cl_lock_used_mod(env, lock, +1);
1981 EXPORT_SYMBOL(cl_lock_user_add);
1983 int cl_lock_user_del(const struct lu_env *env, struct cl_lock *lock)
1985 LINVRNT(cl_lock_is_mutexed(lock));
1986 LINVRNT(cl_lock_invariant(env, lock));
1987 LASSERT(lock->cll_users > 0);
1990 cl_lock_used_mod(env, lock, -1);
1991 RETURN(lock->cll_users == 0);
1993 EXPORT_SYMBOL(cl_lock_user_del);
1996 * Checks whether the modes of two locks are compatible.
1998 * This returns true iff enqueuing \a lock2 won't cause cancellation of \a
1999 * lock1 even when these locks overlap.
2001 int cl_lock_compatible(const struct cl_lock *lock1, const struct cl_lock *lock2)
2003 enum cl_lock_mode mode1;
2004 enum cl_lock_mode mode2;
2007 mode1 = lock1->cll_descr.cld_mode;
2008 mode2 = lock2->cll_descr.cld_mode;
2009 RETURN(mode2 == CLM_PHANTOM ||
2010 (mode1 == CLM_READ && mode2 == CLM_READ));
2012 EXPORT_SYMBOL(cl_lock_compatible);
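/*
 * For example, two CLM_READ locks are compatible, and a CLM_PHANTOM \a lock2
 * is compatible with a lock of any mode; any combination involving a
 * CLM_WRITE \a lock2 is not.
 */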
2014 const char *cl_lock_mode_name(const enum cl_lock_mode mode)
2016 static const char *names[] = {
2017 [CLM_PHANTOM] = "PHANTOM",
2018 [CLM_READ] = "READ",
2019 [CLM_WRITE] = "WRITE"
2021 if (0 <= mode && mode < ARRAY_SIZE(names))
2026 EXPORT_SYMBOL(cl_lock_mode_name);
2029 * Prints a human-readable representation of a lock description.
2031 void cl_lock_descr_print(const struct lu_env *env, void *cookie,
2032 lu_printer_t printer,
2033 const struct cl_lock_descr *descr)
2035 const struct lu_fid *fid;
2037 fid = lu_object_fid(&descr->cld_obj->co_lu);
2038 (*printer)(env, cookie, DDESCR"@"DFID, PDESCR(descr), PFID(fid));
2040 EXPORT_SYMBOL(cl_lock_descr_print);
2043 * Prints a human-readable representation of \a lock using \a printer.
2045 void cl_lock_print(const struct lu_env *env, void *cookie,
2046 lu_printer_t printer, const struct cl_lock *lock)
2048 const struct cl_lock_slice *slice;
2049 (*printer)(env, cookie, "lock@%p[%d %d %d %d %d %08lx] ",
2050 lock, atomic_read(&lock->cll_ref),
2051 lock->cll_state, lock->cll_error, lock->cll_holds,
2052 lock->cll_users, lock->cll_flags);
2053 cl_lock_descr_print(env, cookie, printer, &lock->cll_descr);
2054 (*printer)(env, cookie, " {\n");
2056 list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
2057 (*printer)(env, cookie, " %s@%p: ",
2058 slice->cls_obj->co_lu.lo_dev->ld_type->ldt_name,
2060 if (slice->cls_ops->clo_print != NULL)
2061 slice->cls_ops->clo_print(env, cookie, printer, slice);
2062 (*printer)(env, cookie, "\n");
2064 (*printer)(env, cookie, "} lock@%p\n", lock);
2066 EXPORT_SYMBOL(cl_lock_print);
2068 int cl_lock_init(void)
2070 return lu_kmem_init(cl_lock_caches);
2073 void cl_lock_fini(void)
2075 lu_kmem_fini(cl_lock_caches);