lustre/obdclass/cl_lock.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Client Extent Lock.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_CLASS
42 #ifndef EXPORT_SYMTAB
43 # define EXPORT_SYMTAB
44 #endif
45
46 #include <obd_class.h>
47 #include <obd_support.h>
48 #include <lustre_fid.h>
49 #include <libcfs/list.h>
50 /* lu_time_global_{init,fini}() */
51 #include <lu_time.h>
52
53 #include <cl_object.h>
54 #include "cl_internal.h"
55
56 /** Lock class of cl_lock::cll_guard */
57 static struct lock_class_key cl_lock_guard_class;
58 static cfs_mem_cache_t *cl_lock_kmem;
59
60 static struct lu_kmem_descr cl_lock_caches[] = {
61         {
62                 .ckd_cache = &cl_lock_kmem,
63                 .ckd_name  = "cl_lock_kmem",
64                 .ckd_size  = sizeof (struct cl_lock)
65         },
66         {
67                 .ckd_cache = NULL
68         }
69 };
70
71 /**
72  * Basic lock invariant that is maintained at all times. Caller either has a
73  * reference to \a lock, or somehow ensures that \a lock cannot be freed.
74  *
75  * \see cl_lock_invariant()
76  */
77 static int cl_lock_invariant_trusted(const struct lu_env *env,
78                                      const struct cl_lock *lock)
79 {
80         return
81                 cl_is_lock(lock) &&
82                 ergo(lock->cll_state == CLS_FREEING, lock->cll_holds == 0) &&
83                 atomic_read(&lock->cll_ref) >= lock->cll_holds &&
84                 lock->cll_holds >= lock->cll_users &&
85                 lock->cll_holds >= 0 &&
86                 lock->cll_users >= 0 &&
87                 lock->cll_depth >= 0;
88 }
89
90 /**
91  * Stronger lock invariant, checking that caller has a reference on a lock.
92  *
93  * \see cl_lock_invariant_trusted()
94  */
95 static int cl_lock_invariant(const struct lu_env *env,
96                              const struct cl_lock *lock)
97 {
98         int result;
99
100         result = atomic_read(&lock->cll_ref) > 0 &&
101                 cl_lock_invariant_trusted(env, lock);
102         if (!result && env != NULL)
103                 CL_LOCK_DEBUG(D_ERROR, env, lock, "invariant broken");
104         return result;
105 }
106
107 #define RETIP ((unsigned long)__builtin_return_address(0))
108
109 #ifdef CONFIG_LOCKDEP
110 static struct lock_class_key cl_lock_key;
111
112 static void cl_lock_lockdep_init(struct cl_lock *lock)
113 {
114         lockdep_set_class_and_name(lock, &cl_lock_key, "EXT");
115 }
116
117 static void cl_lock_lockdep_acquire(const struct lu_env *env,
118                                     struct cl_lock *lock, __u32 enqflags)
119 {
120         cl_env_info(env)->clt_nr_locks_acquired++;
121         lock_acquire(&lock->dep_map, !!(enqflags & CEF_ASYNC),
122                      /* try: */ 0, lock->cll_descr.cld_mode <= CLM_READ,
123                      /* check: */ 2, RETIP);
124 }
125
126 static void cl_lock_lockdep_release(const struct lu_env *env,
127                                     struct cl_lock *lock)
128 {
129         cl_env_info(env)->clt_nr_locks_acquired--;
130         lock_release(&lock->dep_map, 0, RETIP);
131 }
132
133 #else /* !CONFIG_LOCKDEP */
134
135 static void cl_lock_lockdep_init(struct cl_lock *lock)
136 {}
137 static void cl_lock_lockdep_acquire(const struct lu_env *env,
138                                     struct cl_lock *lock, __u32 enqflags)
139 {}
140 static void cl_lock_lockdep_release(const struct lu_env *env,
141                                     struct cl_lock *lock)
142 {}
143
144 #endif /* !CONFIG_LOCKDEP */
145
146 /**
147  * Adds lock slice to the compound lock.
148  *
149  * This is called by cl_object_operations::coo_lock_init() methods to add a
150  * per-layer state to the lock. New state is added at the end of
151  * cl_lock::cll_layers list, that is, it is at the bottom of the stack.
152  *
153  * \see cl_req_slice_add(), cl_page_slice_add(), cl_io_slice_add()
154  */
155 void cl_lock_slice_add(struct cl_lock *lock, struct cl_lock_slice *slice,
156                        struct cl_object *obj,
157                        const struct cl_lock_operations *ops)
158 {
159         ENTRY;
160         slice->cls_lock = lock;
161         list_add_tail(&slice->cls_linkage, &lock->cll_layers);
162         slice->cls_obj = obj;
163         slice->cls_ops = ops;
164         EXIT;
165 }
166 EXPORT_SYMBOL(cl_lock_slice_add);
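
/*
 * Illustrative sketch (not part of the original file): how a layer's
 * cl_object_operations::coo_lock_init() method typically embeds its own
 * slice into the compound lock.  The "foo_*" names and the embedded slice
 * layout are hypothetical; real layers (lov, osc, ...) follow the same
 * pattern with their own types and allocation helpers.
 */
#if 0
struct foo_lock {
        struct cl_lock_slice fl_cl;     /* generic slice linked into cll_layers */
        /* ... layer-private state would go here ... */
};

/* the layer's clo_fini(), clo_enqueue(), etc. would be supplied here */
static const struct cl_lock_operations foo_lock_ops;

static int foo_lock_init(const struct lu_env *env, struct cl_object *obj,
                         struct cl_lock *lock, const struct cl_io *io)
{
        struct foo_lock *fl;

        OBD_ALLOC_PTR(fl);
        if (fl == NULL)
                return -ENOMEM;
        /* append this layer's state to the bottom of the slice stack */
        cl_lock_slice_add(lock, &fl->fl_cl, obj, &foo_lock_ops);
        return 0;
}
#endif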
167
168 /**
169  * Returns true iff a lock with the mode \a has provides at least the same
170  * guarantees as a lock with the mode \a need.
171  */
172 int cl_lock_mode_match(enum cl_lock_mode has, enum cl_lock_mode need)
173 {
174         LINVRNT(need == CLM_READ || need == CLM_WRITE || need == CLM_PHANTOM);
175         LINVRNT(has == CLM_READ || has == CLM_WRITE || has == CLM_PHANTOM);
176         CLASSERT(CLM_PHANTOM < CLM_READ);
177         CLASSERT(CLM_READ < CLM_WRITE);
178
179         return need <= has;
180 }
181 EXPORT_SYMBOL(cl_lock_mode_match);
182
183 /**
184  * Returns true iff extent portions of lock descriptions match.
185  */
186 int cl_lock_ext_match(const struct cl_lock_descr *has,
187                       const struct cl_lock_descr *need)
188 {
189         return
190                 has->cld_start <= need->cld_start &&
191                 has->cld_end >= need->cld_end &&
192                 cl_lock_mode_match(has->cld_mode, need->cld_mode);
193 }
194 EXPORT_SYMBOL(cl_lock_ext_match);
195
196 /**
197  * Returns true iff a lock with the description \a has provides at least the
198  * same guarantees as a lock with the description \a need.
199  */
200 int cl_lock_descr_match(const struct cl_lock_descr *has,
201                         const struct cl_lock_descr *need)
202 {
203         return
204                 cl_object_same(has->cld_obj, need->cld_obj) &&
205                 cl_lock_ext_match(has, need);
206 }
207 EXPORT_SYMBOL(cl_lock_descr_match);
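
/*
 * Illustrative sketch (not part of the original file): how the matching
 * helpers above compose.  A cached whole-file CLM_WRITE lock satisfies a
 * CLM_READ need on any sub-extent of the same object, because CLM_READ is
 * weaker than CLM_WRITE and the extents nest.  The descriptors and the
 * CL_PAGE_EOF whole-file end marker (assumed from cl_object.h) are used
 * purely for illustration.
 */
#if 0
static int foo_descr_match_example(struct cl_object *obj)
{
        struct cl_lock_descr has = {
                .cld_obj   = obj,
                .cld_start = 0,
                .cld_end   = CL_PAGE_EOF,
                .cld_mode  = CLM_WRITE
        };
        struct cl_lock_descr need = {
                .cld_obj   = obj,
                .cld_start = 10,
                .cld_end   = 20,
                .cld_mode  = CLM_READ
        };

        return cl_lock_descr_match(&has, &need); /* 1: "has" covers "need" */
}
#endif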
208
209 static void cl_lock_free(const struct lu_env *env, struct cl_lock *lock)
210 {
211         struct cl_object *obj = lock->cll_descr.cld_obj;
212
213         LASSERT(cl_is_lock(lock));
214         LINVRNT(!cl_lock_is_mutexed(lock));
215
216         ENTRY;
217         might_sleep();
218         while (!list_empty(&lock->cll_layers)) {
219                 struct cl_lock_slice *slice;
220
221                 slice = list_entry(lock->cll_layers.next, struct cl_lock_slice,
222                                    cls_linkage);
223                 list_del_init(lock->cll_layers.next);
224                 slice->cls_ops->clo_fini(env, slice);
225         }
226         atomic_dec(&cl_object_site(obj)->cs_locks.cs_total);
227         atomic_dec(&cl_object_site(obj)->cs_locks_state[lock->cll_state]);
228         lu_object_ref_del_at(&obj->co_lu, lock->cll_obj_ref, "cl_lock", lock);
229         cl_object_put(env, obj);
230         lu_ref_fini(&lock->cll_reference);
231         lu_ref_fini(&lock->cll_holders);
232         mutex_destroy(&lock->cll_guard);
233         OBD_SLAB_FREE_PTR(lock, cl_lock_kmem);
234         EXIT;
235 }
236
237 /**
238  * Releases a reference on a lock.
239  *
240  * When last reference is released, lock is returned to the cache, unless it
241  * is in cl_lock_state::CLS_FREEING state, in which case it is destroyed
242  * immediately.
243  *
244  * \see cl_object_put(), cl_page_put()
245  */
246 void cl_lock_put(const struct lu_env *env, struct cl_lock *lock)
247 {
248         struct cl_object        *obj;
249         struct cl_object_header *head;
250         struct cl_site          *site;
251
252         LINVRNT(cl_lock_invariant(env, lock));
253         ENTRY;
254         obj = lock->cll_descr.cld_obj;
255         LINVRNT(obj != NULL);
256         head = cl_object_header(obj);
257         site = cl_object_site(obj);
258
259         CDEBUG(D_DLMTRACE, "releasing reference: %d %p %lu\n",
260                atomic_read(&lock->cll_ref), lock, RETIP);
261
262         if (atomic_dec_and_test(&lock->cll_ref)) {
263                 if (lock->cll_state == CLS_FREEING) {
264                         LASSERT(list_empty(&lock->cll_linkage));
265                         cl_lock_free(env, lock);
266                 }
267                 atomic_dec(&site->cs_locks.cs_busy);
268         }
269         EXIT;
270 }
271 EXPORT_SYMBOL(cl_lock_put);
272
273 /**
274  * Acquires an additional reference to a lock.
275  *
276  * This can be called only by caller already possessing a reference to \a
277  * lock.
278  *
279  * \see cl_object_get(), cl_page_get()
280  */
281 void cl_lock_get(struct cl_lock *lock)
282 {
283         LINVRNT(cl_lock_invariant(NULL, lock));
284         CDEBUG(D_DLMTRACE|D_TRACE, "acquiring reference: %d %p %lu\n",
285                atomic_read(&lock->cll_ref), lock, RETIP);
286         atomic_inc(&lock->cll_ref);
287 }
288 EXPORT_SYMBOL(cl_lock_get);
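
/*
 * Illustrative sketch (not part of the original file): cl_lock_get() and
 * cl_lock_put() pair up like any other reference counter.  The caller below
 * is assumed to already own a reference, however it was obtained, as
 * required by cl_lock_get().
 */
#if 0
static void foo_lock_ref_example(const struct lu_env *env,
                                 struct cl_lock *lock)
{
        cl_lock_get(lock);              /* take an extra reference */
        /* ... hand the lock to another context, inspect it, etc. ... */
        cl_lock_put(env, lock);         /* drop the extra reference */
}
#endif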
289
290 /**
291  * Acquires a reference to a lock.
292  *
293  * This is much like cl_lock_get(), except that this function can be used to
294  * acquire initial reference to the cached lock. Caller has to deal with all
295  * possible races. Use with care!
296  *
297  * \see cl_page_get_trust()
298  */
299 void cl_lock_get_trust(struct cl_lock *lock)
300 {
301         struct cl_site *site = cl_object_site(lock->cll_descr.cld_obj);
302
303         LASSERT(cl_is_lock(lock));
304         CDEBUG(D_DLMTRACE|D_TRACE, "acquiring trusted reference: %d %p %lu\n",
305                atomic_read(&lock->cll_ref), lock, RETIP);
306         if (atomic_inc_return(&lock->cll_ref) == 1)
307                 atomic_inc(&site->cs_locks.cs_busy);
308 }
309 EXPORT_SYMBOL(cl_lock_get_trust);
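
/*
 * Illustrative sketch (not part of the original file): cl_lock_get_trust()
 * is meant for call sites that find a lock through a shared data structure
 * and therefore cannot rely on already holding a reference.  The pattern
 * below mirrors cl_lock_lookup(): the per-object coh_lock_guard spin-lock
 * keeps the lock from being freed while the first reference is taken.
 */
#if 0
static struct cl_lock *foo_first_lock(struct cl_object *obj)
{
        struct cl_object_header *head = cl_object_header(obj);
        struct cl_lock          *lock = NULL;

        spin_lock(&head->coh_lock_guard);
        if (!list_empty(&head->coh_locks)) {
                lock = list_entry(head->coh_locks.next,
                                  struct cl_lock, cll_linkage);
                cl_lock_get_trust(lock);        /* safe under coh_lock_guard */
        }
        spin_unlock(&head->coh_lock_guard);
        return lock;                            /* caller must cl_lock_put() */
}
#endif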
310
311 /**
312  * Helper function destroying the lock that wasn't completely initialized.
313  *
314  * Other threads can acquire references to the top-lock through its
315  * sub-locks. Hence, it cannot be cl_lock_free()-ed immediately.
316  */
317 static void cl_lock_finish(const struct lu_env *env, struct cl_lock *lock)
318 {
319         cl_lock_mutex_get(env, lock);
320         cl_lock_delete(env, lock);
321         cl_lock_mutex_put(env, lock);
322         cl_lock_put(env, lock);
323 }
324
325 static struct cl_lock *cl_lock_alloc(const struct lu_env *env,
326                                      struct cl_object *obj,
327                                      const struct cl_io *io,
328                                      const struct cl_lock_descr *descr)
329 {
330         struct cl_lock          *lock;
331         struct lu_object_header *head;
332         struct cl_site          *site = cl_object_site(obj);
333
334         ENTRY;
335         OBD_SLAB_ALLOC_PTR_GFP(lock, cl_lock_kmem, CFS_ALLOC_IO);
336         if (lock != NULL) {
337                 atomic_set(&lock->cll_ref, 1);
338                 lock->cll_descr = *descr;
339                 lock->cll_state = CLS_NEW;
340                 cl_object_get(obj);
341                 lock->cll_obj_ref = lu_object_ref_add(&obj->co_lu,
342                                                       "cl_lock", lock);
343                 CFS_INIT_LIST_HEAD(&lock->cll_layers);
344                 CFS_INIT_LIST_HEAD(&lock->cll_linkage);
345                 CFS_INIT_LIST_HEAD(&lock->cll_inclosure);
346                 lu_ref_init(&lock->cll_reference);
347                 lu_ref_init(&lock->cll_holders);
348                 mutex_init(&lock->cll_guard);
349                 lockdep_set_class(&lock->cll_guard, &cl_lock_guard_class);
350                 cfs_waitq_init(&lock->cll_wq);
351                 head = obj->co_lu.lo_header;
352                 atomic_inc(&site->cs_locks_state[CLS_NEW]);
353                 atomic_inc(&site->cs_locks.cs_total);
354                 atomic_inc(&site->cs_locks.cs_created);
355                 cl_lock_lockdep_init(lock);
356                 list_for_each_entry(obj, &head->loh_layers, co_lu.lo_linkage) {
357                         int err;
358
359                         err = obj->co_ops->coo_lock_init(env, obj, lock, io);
360                         if (err != 0) {
361                                 cl_lock_finish(env, lock);
362                                 lock = ERR_PTR(err);
363                                 break;
364                         }
365                 }
366         } else
367                 lock = ERR_PTR(-ENOMEM);
368         RETURN(lock);
369 }
370
371 /**
372  * Returns true iff lock is "suitable" for given io. E.g., locks acquired by
373  * truncate and O_APPEND cannot be reused for read/non-append-write, as they
374  * cover multiple stripes and can trigger cascading timeouts.
375  */
376 static int cl_lock_fits_into(const struct lu_env *env,
377                              const struct cl_lock *lock,
378                              const struct cl_lock_descr *need,
379                              const struct cl_io *io)
380 {
381         const struct cl_lock_slice *slice;
382
383         LINVRNT(cl_lock_invariant_trusted(env, lock));
384         ENTRY;
385         list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
386                 if (slice->cls_ops->clo_fits_into != NULL &&
387                     !slice->cls_ops->clo_fits_into(env, slice, need, io))
388                         RETURN(0);
389         }
390         RETURN(1);
391 }
392
393 static struct cl_lock *cl_lock_lookup(const struct lu_env *env,
394                                       struct cl_object *obj,
395                                       const struct cl_io *io,
396                                       const struct cl_lock_descr *need)
397 {
398         struct cl_lock          *lock;
399         struct cl_object_header *head;
400         struct cl_site          *site;
401
402         ENTRY;
403
404         head = cl_object_header(obj);
405         site = cl_object_site(obj);
406         LINVRNT_SPIN_LOCKED(&head->coh_lock_guard);
407         atomic_inc(&site->cs_locks.cs_lookup);
408         list_for_each_entry(lock, &head->coh_locks, cll_linkage) {
409                 int matched;
410
411                 LASSERT(cl_is_lock(lock));
412                 matched = cl_lock_ext_match(&lock->cll_descr, need) &&
413                         lock->cll_state < CLS_FREEING &&
414                         !(lock->cll_flags & CLF_CANCELLED) &&
415                         cl_lock_fits_into(env, lock, need, io);
416                 CDEBUG(D_DLMTRACE, "has: "DDESCR"(%i) need: "DDESCR": %d\n",
417                        PDESCR(&lock->cll_descr), lock->cll_state, PDESCR(need),
418                        matched);
419                 if (matched) {
420                         cl_lock_get_trust(lock);
421                         /* move the lock to the LRU head */
422                         list_move(&lock->cll_linkage, &head->coh_locks);
423                         atomic_inc(&cl_object_site(obj)->cs_locks.cs_hit);
424                         RETURN(lock);
425                 }
426         }
427         RETURN(NULL);
428 }
429
430 /**
431  * Returns a lock matching description \a need.
432  *
433  * This is the main entry point into the cl_lock caching interface. First, a
434  * cache (implemented as a per-object linked list) is consulted. If lock is
435  * found there, it is returned immediately. Otherwise new lock is allocated
436  * and returned. In any case, additional reference to lock is acquired.
437  *
438  * \see cl_object_find(), cl_page_find()
439  */
440 static struct cl_lock *cl_lock_find(const struct lu_env *env,
441                                     const struct cl_io *io,
442                                     const struct cl_lock_descr *need)
443 {
444         struct cl_object_header *head;
445         struct cl_object        *obj;
446         struct cl_lock          *lock;
447         struct cl_site          *site;
448
449         ENTRY;
450
451         obj  = need->cld_obj;
452         head = cl_object_header(obj);
453         site = cl_object_site(obj);
454
455         spin_lock(&head->coh_lock_guard);
456         lock = cl_lock_lookup(env, obj, io, need);
457         spin_unlock(&head->coh_lock_guard);
458
459         if (lock == NULL) {
460                 lock = cl_lock_alloc(env, obj, io, need);
461                 if (!IS_ERR(lock)) {
462                         struct cl_lock *ghost;
463
464                         spin_lock(&head->coh_lock_guard);
465                         ghost = cl_lock_lookup(env, obj, io, need);
466                         if (ghost == NULL) {
467                                 list_add(&lock->cll_linkage, &head->coh_locks);
468                                 spin_unlock(&head->coh_lock_guard);
469                                 atomic_inc(&site->cs_locks.cs_busy);
470                         } else {
471                                 spin_unlock(&head->coh_lock_guard);
472                                 /*
473                                  * Other threads can acquire references to the
474                                  * top-lock through its sub-locks. Hence, it
475                                  * cannot be cl_lock_free()-ed immediately.
476                                  */
477                                 cl_lock_finish(env, lock);
478                                 lock = ghost;
479                         }
480                 }
481         }
482         RETURN(lock);
483 }
484
485 /**
486  * Returns existing lock matching given description. This is similar to
487  * cl_lock_find() except that no new lock is created, and returned lock is
488  * guaranteed to be in enum cl_lock_state::CLS_HELD state.
489  */
490 struct cl_lock *cl_lock_peek(const struct lu_env *env, const struct cl_io *io,
491                              const struct cl_lock_descr *need,
492                              const char *scope, const void *source)
493 {
494         struct cl_object_header *head;
495         struct cl_object        *obj;
496         struct cl_lock          *lock;
497
498         obj  = need->cld_obj;
499         head = cl_object_header(obj);
500
501         spin_lock(&head->coh_lock_guard);
502         lock = cl_lock_lookup(env, obj, io, need);
503         spin_unlock(&head->coh_lock_guard);
504
505         if (lock != NULL) {
506                 int ok;
507
508                 cl_lock_mutex_get(env, lock);
509                 if (lock->cll_state == CLS_CACHED)
510                         cl_use_try(env, lock);
511                 ok = lock->cll_state == CLS_HELD;
512                 if (ok) {
513                         cl_lock_hold_add(env, lock, scope, source);
514                         cl_lock_user_add(env, lock);
515                 }
516                 cl_lock_mutex_put(env, lock);
517                 if (!ok) {
518                         cl_lock_put(env, lock);
519                         lock = NULL;
520                 }
521         }
522         return lock;
523 }
524 EXPORT_SYMBOL(cl_lock_peek);
525
526 /**
527  * Returns a slice within a lock, corresponding to the given layer in the
528  * device stack.
529  *
530  * \see cl_page_at()
531  */
532 const struct cl_lock_slice *cl_lock_at(const struct cl_lock *lock,
533                                        const struct lu_device_type *dtype)
534 {
535         const struct cl_lock_slice *slice;
536
537         LINVRNT(cl_lock_invariant_trusted(NULL, lock));
538         ENTRY;
539
540         list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
541                 if (slice->cls_obj->co_lu.lo_dev->ld_type == dtype)
542                         RETURN(slice);
543         }
544         RETURN(NULL);
545 }
546 EXPORT_SYMBOL(cl_lock_at);
547
548 static void cl_lock_trace(struct cl_thread_info *info,
549                           const char *prefix, const struct cl_lock *lock)
550 {
551         CDEBUG(D_DLMTRACE|D_TRACE, "%s: %i@%p %p %i %i\n", prefix,
552                atomic_read(&lock->cll_ref), lock, lock->cll_guarder,
553                lock->cll_depth, info->clt_nr_locks_locked);
554 }
555
556 static void cl_lock_mutex_tail(const struct lu_env *env, struct cl_lock *lock)
557 {
558         struct cl_thread_info *info;
559
560         info = cl_env_info(env);
561         lock->cll_depth++;
562         info->clt_nr_locks_locked++;
563         lu_ref_add(&info->clt_locks_locked, "cll_guard", lock);
564         cl_lock_trace(info, "got mutex", lock);
565 }
566
567 /**
568  * Locks cl_lock object.
569  *
570  * This is used to manipulate cl_lock fields, and to serialize state
571  * transitions in the lock state machine.
572  *
573  * \post cl_lock_is_mutexed(lock)
574  *
575  * \see cl_lock_mutex_put()
576  */
577 void cl_lock_mutex_get(const struct lu_env *env, struct cl_lock *lock)
578 {
579         LINVRNT(cl_lock_invariant(env, lock));
580
581         if (lock->cll_guarder == cfs_current()) {
582                 LINVRNT(cl_lock_is_mutexed(lock));
583                 LINVRNT(lock->cll_depth > 0);
584         } else {
585                 struct cl_object_header *hdr;
586
587                 LINVRNT(lock->cll_guarder != cfs_current());
588                 hdr = cl_object_header(lock->cll_descr.cld_obj);
589                 mutex_lock_nested(&lock->cll_guard, hdr->coh_nesting);
590                 lock->cll_guarder = cfs_current();
591                 LINVRNT(lock->cll_depth == 0);
592         }
593         cl_lock_mutex_tail(env, lock);
594 }
595 EXPORT_SYMBOL(cl_lock_mutex_get);
596
597 /**
598  * Try-locks cl_lock object.
599  *
600  * \retval 0 \a lock was successfully locked
601  *
602  * \retval -EBUSY \a lock cannot be locked right now
603  *
604  * \post ergo(result == 0, cl_lock_is_mutexed(lock))
605  *
606  * \see cl_lock_mutex_get()
607  */
608 int cl_lock_mutex_try(const struct lu_env *env, struct cl_lock *lock)
609 {
610         int result;
611
612         LINVRNT(cl_lock_invariant_trusted(env, lock));
613         ENTRY;
614
615         result = 0;
616         if (lock->cll_guarder == cfs_current()) {
617                 LINVRNT(lock->cll_depth > 0);
618                 cl_lock_mutex_tail(env, lock);
619         } else if (mutex_trylock(&lock->cll_guard)) {
620                 LINVRNT(lock->cll_depth == 0);
621                 lock->cll_guarder = cfs_current();
622                 cl_lock_mutex_tail(env, lock);
623         } else
624                 result = -EBUSY;
625         RETURN(result);
626 }
627 EXPORT_SYMBOL(cl_lock_mutex_try);
628
629 /**
630  * Unlocks cl_lock object.
631  *
632  * \pre cl_lock_is_mutexed(lock)
633  *
634  * \see cl_lock_mutex_get()
635  */
636 void cl_lock_mutex_put(const struct lu_env *env, struct cl_lock *lock)
637 {
638         struct cl_thread_info *info;
639
640         LINVRNT(cl_lock_invariant(env, lock));
641         LINVRNT(cl_lock_is_mutexed(lock));
642         LINVRNT(lock->cll_guarder == cfs_current());
643         LINVRNT(lock->cll_depth > 0);
644
645         info = cl_env_info(env);
646         LINVRNT(info->clt_nr_locks_locked > 0);
647
648         cl_lock_trace(info, "put mutex", lock);
649         lu_ref_del(&info->clt_locks_locked, "cll_guard", lock);
650         info->clt_nr_locks_locked--;
651         if (--lock->cll_depth == 0) {
652                 lock->cll_guarder = NULL;
653                 mutex_unlock(&lock->cll_guard);
654         }
655 }
656 EXPORT_SYMBOL(cl_lock_mutex_put);
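
/*
 * Illustrative sketch (not part of the original file): the lock mutex is
 * recursive for its owner, so a helper can take it again while its caller
 * already holds it; every cl_lock_mutex_get() must still be balanced by a
 * cl_lock_mutex_put().
 */
#if 0
static void foo_locked_update(const struct lu_env *env, struct cl_lock *lock)
{
        cl_lock_mutex_get(env, lock);   /* nests if already owned */
        /* ... manipulate lock fields, e.g. via cl_lock_state_set() ... */
        cl_lock_mutex_put(env, lock);
}

static void foo_update(const struct lu_env *env, struct cl_lock *lock)
{
        cl_lock_mutex_get(env, lock);
        foo_locked_update(env, lock);   /* re-enters the same mutex safely */
        cl_lock_mutex_put(env, lock);
}
#endif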
657
658 /**
659  * Returns true iff lock's mutex is owned by the current thread.
660  */
661 int cl_lock_is_mutexed(struct cl_lock *lock)
662 {
663         return lock->cll_guarder == cfs_current();
664 }
665 EXPORT_SYMBOL(cl_lock_is_mutexed);
666
667 /**
668  * Returns the number of cl_lock mutexes held by the current thread (environment).
669  */
670 int cl_lock_nr_mutexed(const struct lu_env *env)
671 {
672         return cl_env_info(env)->clt_nr_locks_locked;
673 }
674 EXPORT_SYMBOL(cl_lock_nr_mutexed);
675
676 static void cl_lock_cancel0(const struct lu_env *env, struct cl_lock *lock)
677 {
678         LINVRNT(cl_lock_is_mutexed(lock));
679         LINVRNT(cl_lock_invariant(env, lock));
680         ENTRY;
681         if (!(lock->cll_flags & CLF_CANCELLED)) {
682                 const struct cl_lock_slice *slice;
683
684                 lock->cll_flags |= CLF_CANCELLED;
685                 list_for_each_entry_reverse(slice, &lock->cll_layers,
686                                             cls_linkage) {
687                         if (slice->cls_ops->clo_cancel != NULL)
688                                 slice->cls_ops->clo_cancel(env, slice);
689                 }
690         }
691         EXIT;
692 }
693
694 static void cl_lock_delete0(const struct lu_env *env, struct cl_lock *lock)
695 {
696         struct cl_object_header    *head;
697         const struct cl_lock_slice *slice;
698
699         LINVRNT(cl_lock_is_mutexed(lock));
700         LINVRNT(cl_lock_invariant(env, lock));
701
702         ENTRY;
703         if (lock->cll_state < CLS_FREEING) {
704                 cl_lock_state_set(env, lock, CLS_FREEING);
705
706                 head = cl_object_header(lock->cll_descr.cld_obj);
707
708                 spin_lock(&head->coh_lock_guard);
709                 list_del_init(&lock->cll_linkage);
710                 /*
711                  * No locks, no pages. This is valid only for bottom sub-locks;
712                  * the head->coh_nesting == 1 check assumes a two-level
713                  * top-lock/sub-lock hierarchy.
714                  */
715                 LASSERT(ergo(head->coh_nesting == 1 &&
716                              list_empty(&head->coh_locks), !head->coh_pages));
717                 spin_unlock(&head->coh_lock_guard);
718                 /*
719                  * From now on, no new references to this lock can be acquired
720                  * by cl_lock_lookup().
721                  */
722                 list_for_each_entry_reverse(slice, &lock->cll_layers,
723                                             cls_linkage) {
724                         if (slice->cls_ops->clo_delete != NULL)
725                                 slice->cls_ops->clo_delete(env, slice);
726                 }
727                 /*
728                  * From now on, no new references to this lock can be acquired
729                  * by layer-specific means (like a pointer from struct
730                  * ldlm_lock in osc, or a pointer from top-lock to sub-lock in
731                  * lov).
732                  *
733                  * Lock will be finally freed in cl_lock_put() when last of
734                  * existing references goes away.
735                  */
736         }
737         EXIT;
738 }
739
740 static void cl_lock_hold_mod(const struct lu_env *env, struct cl_lock *lock,
741                              int delta)
742 {
743         struct cl_thread_info   *cti;
744         struct cl_object_header *hdr;
745
746         cti = cl_env_info(env);
747         hdr = cl_object_header(lock->cll_descr.cld_obj);
748         lock->cll_holds += delta;
749         if (hdr->coh_nesting == 0) {
750                 cti->clt_nr_held += delta;
751                 LASSERT(cti->clt_nr_held >= 0);
752         }
753 }
754
755 static void cl_lock_used_mod(const struct lu_env *env, struct cl_lock *lock,
756                              int delta)
757 {
758         struct cl_thread_info   *cti;
759         struct cl_object_header *hdr;
760
761         cti = cl_env_info(env);
762         hdr = cl_object_header(lock->cll_descr.cld_obj);
763         lock->cll_users += delta;
764         if (hdr->coh_nesting == 0) {
765                 cti->clt_nr_used += delta;
766                 LASSERT(cti->clt_nr_used >= 0);
767         }
768 }
769
770 static void cl_lock_hold_release(const struct lu_env *env, struct cl_lock *lock,
771                                  const char *scope, const void *source)
772 {
773         LINVRNT(cl_lock_is_mutexed(lock));
774         LINVRNT(cl_lock_invariant(env, lock));
775         LASSERT(lock->cll_holds > 0);
776
777         ENTRY;
778         lu_ref_del(&lock->cll_holders, scope, source);
779         cl_lock_hold_mod(env, lock, -1);
780         if (lock->cll_holds == 0) {
781                 if (lock->cll_descr.cld_mode == CLM_PHANTOM)
782                         /*
783                          * If the lock is still a phantom when the user is
784                          * done with it, destroy the lock.
785                          */
786                         lock->cll_flags |= CLF_CANCELPEND|CLF_DOOMED;
787                 if (lock->cll_flags & CLF_CANCELPEND) {
788                         lock->cll_flags &= ~CLF_CANCELPEND;
789                         cl_lock_cancel0(env, lock);
790                 }
791                 if (lock->cll_flags & CLF_DOOMED) {
792                         /* no longer doomed: it's dead... Jim. */
793                         lock->cll_flags &= ~CLF_DOOMED;
794                         cl_lock_delete0(env, lock);
795                 }
796         }
797         EXIT;
798 }
799
800
801 /**
802  * Waits until lock state is changed.
803  *
804  * This function is called with cl_lock mutex locked, atomically releases
805  * mutex and goes to sleep, waiting for a lock state change (signaled by
806  * cl_lock_signal()), and re-acquires the mutex before return.
807  *
808  * This function is used to wait until lock state machine makes some progress
809  * and to emulate synchronous operations on top of asynchronous lock
810  * interface.
811  *
812  * \retval -EINTR wait was interrupted
813  *
814  * \retval 0 wait wasn't interrupted
815  *
816  * \pre cl_lock_is_mutexed(lock)
817  *
818  * \see cl_lock_signal()
819  */
820 int cl_lock_state_wait(const struct lu_env *env, struct cl_lock *lock)
821 {
822         cfs_waitlink_t waiter;
823         int result;
824
825         ENTRY;
826         LINVRNT(cl_lock_is_mutexed(lock));
827         LINVRNT(cl_lock_invariant(env, lock));
828         LASSERT(lock->cll_depth == 1);
829         LASSERT(lock->cll_state != CLS_FREEING); /* too late to wait */
830
831         result = lock->cll_error;
832         if (result == 0 && !(lock->cll_flags & CLF_STATE)) {
833                 cfs_waitlink_init(&waiter);
834                 cfs_waitq_add(&lock->cll_wq, &waiter);
835                 set_current_state(CFS_TASK_INTERRUPTIBLE);
836                 cl_lock_mutex_put(env, lock);
837
838                 LASSERT(cl_lock_nr_mutexed(env) == 0);
839                 cfs_waitq_wait(&waiter, CFS_TASK_INTERRUPTIBLE);
840
841                 cl_lock_mutex_get(env, lock);
842                 set_current_state(CFS_TASK_RUNNING);
843                 cfs_waitq_del(&lock->cll_wq, &waiter);
844                 result = cfs_signal_pending() ? -EINTR : 0;
845         }
846         lock->cll_flags &= ~CLF_STATE;
847         RETURN(result);
848 }
849 EXPORT_SYMBOL(cl_lock_state_wait);
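
/*
 * Illustrative sketch (not part of the original file): the canonical way a
 * synchronous caller drives the asynchronous lock state machine.  A
 * "*_try()" style operation with this signature (cl_use_try(),
 * cl_unuse_try(), cl_wait_try()) is retried until it stops returning
 * CLO_WAIT, sleeping in cl_lock_state_wait() between attempts; this is the
 * same loop used by cl_unuse_locked() and cl_wait() below.
 */
#if 0
static int foo_drive(const struct lu_env *env, struct cl_lock *lock,
                     int (*try)(const struct lu_env *, struct cl_lock *))
{
        int result;

        /* pre-condition: caller holds the lock mutex, see cl_lock_mutex_get() */
        do {
                result = try(env, lock);
                if (result == CLO_WAIT) {
                        result = cl_lock_state_wait(env, lock);
                        if (result == 0)
                                continue;       /* state changed, retry */
                }
                break;
        } while (1);
        return result;
}
#endif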
850
851 static void cl_lock_state_signal(const struct lu_env *env, struct cl_lock *lock,
852                                  enum cl_lock_state state)
853 {
854         const struct cl_lock_slice *slice;
855
856         ENTRY;
857         LINVRNT(cl_lock_is_mutexed(lock));
858         LINVRNT(cl_lock_invariant(env, lock));
859
860         list_for_each_entry(slice, &lock->cll_layers, cls_linkage)
861                 if (slice->cls_ops->clo_state != NULL)
862                         slice->cls_ops->clo_state(env, slice, state);
863         lock->cll_flags |= CLF_STATE;
864         cfs_waitq_broadcast(&lock->cll_wq);
865         EXIT;
866 }
867
868 /**
869  * Notifies waiters that lock state changed.
870  *
871  * Wakes up all waiters sleeping in cl_lock_state_wait(), also notifies all
872  * layers about state change by calling cl_lock_operations::clo_state()
873  * top-to-bottom.
874  */
875 void cl_lock_signal(const struct lu_env *env, struct cl_lock *lock)
876 {
877         ENTRY;
878         cl_lock_state_signal(env, lock, lock->cll_state);
879         EXIT;
880 }
881 EXPORT_SYMBOL(cl_lock_signal);
882
883 /**
884  * Changes lock state.
885  *
886  * This function is invoked to notify layers that the lock state changed,
887  * possibly as a result of an asynchronous event such as call-back reception.
888  *
889  * \post lock->cll_state == state
890  *
891  * \see cl_lock_operations::clo_state()
892  */
893 void cl_lock_state_set(const struct lu_env *env, struct cl_lock *lock,
894                        enum cl_lock_state state)
895 {
896         struct cl_site *site = cl_object_site(lock->cll_descr.cld_obj);
897
898         ENTRY;
899         LASSERT(lock->cll_state <= state ||
900                 (lock->cll_state == CLS_CACHED &&
901                  (state == CLS_HELD || /* lock found in cache */
902                   state == CLS_NEW     /* sub-lock canceled */)) ||
903                 /* sub-lock canceled during unlocking */
904                 (lock->cll_state == CLS_UNLOCKING && state == CLS_NEW));
905
906         if (lock->cll_state != state) {
907                 atomic_dec(&site->cs_locks_state[lock->cll_state]);
908                 atomic_inc(&site->cs_locks_state[state]);
909
910                 cl_lock_state_signal(env, lock, state);
911                 lock->cll_state = state;
912         }
913         EXIT;
914 }
915 EXPORT_SYMBOL(cl_lock_state_set);
916
917 /**
918  * Yanks lock from the cache (cl_lock_state::CLS_CACHED state) by calling
919  * cl_lock_operations::clo_use() top-to-bottom to notify layers.
920  */
921 int cl_use_try(const struct lu_env *env, struct cl_lock *lock)
922 {
923         int result;
924         const struct cl_lock_slice *slice;
925
926         ENTRY;
927         result = -ENOSYS;
928         list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
929                 if (slice->cls_ops->clo_use != NULL) {
930                         result = slice->cls_ops->clo_use(env, slice);
931                         if (result != 0)
932                                 break;
933                 }
934         }
935         LASSERT(result != -ENOSYS);
936         if (result == 0)
937                 cl_lock_state_set(env, lock, CLS_HELD);
938         RETURN(result);
939 }
940 EXPORT_SYMBOL(cl_use_try);
941
942 /**
943  * Helper for cl_enqueue_try() that calls ->clo_enqueue() across all layers
944  * top-to-bottom.
945  */
946 static int cl_enqueue_kick(const struct lu_env *env,
947                            struct cl_lock *lock,
948                            struct cl_io *io, __u32 flags)
949 {
950         int result;
951         const struct cl_lock_slice *slice;
952
953         ENTRY;
954         result = -ENOSYS;
955         list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
956                 if (slice->cls_ops->clo_enqueue != NULL) {
957                         result = slice->cls_ops->clo_enqueue(env,
958                                                              slice, io, flags);
959                         if (result != 0)
960                                 break;
961                 }
962         }
963         LASSERT(result != -ENOSYS);
964         RETURN(result);
965 }
966
967 /**
968  * Tries to enqueue a lock.
969  *
970  * This function is called repeatedly by cl_enqueue() until the lock is either
971  * enqueued or an error occurs. This function does not block waiting for
972  * network communication to complete.
973  *
974  * \post ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
975  *                         lock->cll_state == CLS_HELD)
976  *
977  * \see cl_enqueue() cl_lock_operations::clo_enqueue()
978  * \see cl_lock_state::CLS_ENQUEUED
979  */
980 int cl_enqueue_try(const struct lu_env *env, struct cl_lock *lock,
981                    struct cl_io *io, __u32 flags)
982 {
983         int result;
984
985         ENTRY;
986         do {
987                 result = 0;
988
989                 LINVRNT(cl_lock_is_mutexed(lock));
990
991                 if (lock->cll_error != 0)
992                         break;
993                 switch (lock->cll_state) {
994                 case CLS_NEW:
995                         cl_lock_state_set(env, lock, CLS_QUEUING);
996                         /* fall-through */
997                 case CLS_QUEUING:
998                         /* kick layers. */
999                         result = cl_enqueue_kick(env, lock, io, flags);
1000                         if (result == 0)
1001                                 cl_lock_state_set(env, lock, CLS_ENQUEUED);
1002                         break;
1003                 case CLS_UNLOCKING:
1004                         /* wait until unlocking finishes, and enqueue lock
1005                          * afresh. */
1006                         result = CLO_WAIT;
1007                         break;
1008                 case CLS_CACHED:
1009                         /* yank lock from the cache. */
1010                         result = cl_use_try(env, lock);
1011                         break;
1012                 case CLS_ENQUEUED:
1013                 case CLS_HELD:
1014                         result = 0;
1015                         break;
1016                 default:
1017                 case CLS_FREEING:
1018                         /*
1019                          * impossible, only held locks with increased
1020                          * ->cll_holds can be enqueued, and they cannot be
1021                          * freed.
1022                          */
1023                         LBUG();
1024                 }
1025         } while (result == CLO_REPEAT);
1026         if (result < 0)
1027                 cl_lock_error(env, lock, result);
1028         RETURN(result ?: lock->cll_error);
1029 }
1030 EXPORT_SYMBOL(cl_enqueue_try);
1031
1032 static int cl_enqueue_locked(const struct lu_env *env, struct cl_lock *lock,
1033                              struct cl_io *io, __u32 enqflags)
1034 {
1035         int result;
1036
1037         ENTRY;
1038
1039         LINVRNT(cl_lock_is_mutexed(lock));
1040         LINVRNT(cl_lock_invariant(env, lock));
1041         LASSERT(lock->cll_holds > 0);
1042
1043         cl_lock_user_add(env, lock);
1044         do {
1045                 result = cl_enqueue_try(env, lock, io, enqflags);
1046                 if (result == CLO_WAIT) {
1047                         result = cl_lock_state_wait(env, lock);
1048                         if (result == 0)
1049                                 continue;
1050                 }
1051                 break;
1052         } while (1);
1053         if (result != 0) {
1054                 cl_lock_user_del(env, lock);
1055                 if (result != -EINTR)
1056                         cl_lock_error(env, lock, result);
1057         }
1058         LASSERT(ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
1059                      lock->cll_state == CLS_HELD));
1060         RETURN(result);
1061 }
1062
1063 /**
1064  * Enqueues a lock.
1065  *
1066  * \pre current thread or io owns a hold on lock.
1067  *
1068  * \post ergo(result == 0, lock->users increased)
1069  * \post ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
1070  *                         lock->cll_state == CLS_HELD)
1071  */
1072 int cl_enqueue(const struct lu_env *env, struct cl_lock *lock,
1073                struct cl_io *io, __u32 enqflags)
1074 {
1075         int result;
1076
1077         ENTRY;
1078
1079         cl_lock_lockdep_acquire(env, lock, enqflags);
1080         cl_lock_mutex_get(env, lock);
1081         result = cl_enqueue_locked(env, lock, io, enqflags);
1082         cl_lock_mutex_put(env, lock);
1083         if (result != 0)
1084                 cl_lock_lockdep_release(env, lock);
1085         LASSERT(ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
1086                      lock->cll_state == CLS_HELD));
1087         RETURN(result);
1088 }
1089 EXPORT_SYMBOL(cl_enqueue);
1090
1091 /**
1092  * Tries to unlock a lock.
1093  *
1094  * This function is called repeatedly by cl_unuse() until the lock is either
1095  * unlocked or an error occurs.
1096  *
1097  * \pre lock->cll_state <= CLS_HELD || lock->cll_state == CLS_UNLOCKING
1098  *
1099  * \post ergo(result == 0, lock->cll_state == CLS_CACHED)
1100  *
1101  * \see cl_unuse() cl_lock_operations::clo_unuse()
1102  * \see cl_lock_state::CLS_CACHED
1103  */
1104 int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock)
1105 {
1106         const struct cl_lock_slice *slice;
1107         int                         result;
1108
1109         ENTRY;
1110         if (lock->cll_state != CLS_UNLOCKING) {
1111                 if (lock->cll_users > 1) {
1112                         cl_lock_user_del(env, lock);
1113                         RETURN(0);
1114                 }
1115                 /*
1116                  * New lock users (->cll_users) do not prevent unlocking
1117                  * from proceeding. From this point, the lock eventually
1118                  * reaches CLS_CACHED, is reinitialized to CLS_NEW, or, on
1119                  * failure, ends up in CLS_FREEING.
1120                  */
1121                 cl_lock_state_set(env, lock, CLS_UNLOCKING);
1122         }
1123         do {
1124                 result = 0;
1125
1126                 if (lock->cll_error != 0)
1127                         break;
1128
1129                 LINVRNT(cl_lock_is_mutexed(lock));
1130                 LINVRNT(cl_lock_invariant(env, lock));
1131                 LASSERT(lock->cll_state == CLS_UNLOCKING);
1132                 LASSERT(lock->cll_users > 0);
1133                 LASSERT(lock->cll_holds > 0);
1134
1135                 result = -ENOSYS;
1136                 list_for_each_entry_reverse(slice, &lock->cll_layers,
1137                                             cls_linkage) {
1138                         if (slice->cls_ops->clo_unuse != NULL) {
1139                                 result = slice->cls_ops->clo_unuse(env, slice);
1140                                 if (result != 0)
1141                                         break;
1142                         }
1143                 }
1144                 LASSERT(result != -ENOSYS);
1145         } while (result == CLO_REPEAT);
1146         if (result != CLO_WAIT)
1147                 /*
1148                  * Once there is no more need to iterate ->clo_unuse() calls,
1149                  * remove lock user. This is done even if unrecoverable error
1150                  * happened during unlocking, because nothing else can be
1151                  * done.
1152                  */
1153                 cl_lock_user_del(env, lock);
1154         if (result == 0 || result == -ESTALE) {
1155                 enum cl_lock_state state;
1156
1157                 /*
1158                  * Return lock back to the cache. This is the only
1159                  * place where lock is moved into CLS_CACHED state.
1160                  *
1161                  * If one of ->clo_unuse() methods returned -ESTALE, lock
1162                  * cannot be placed into cache and has to be
1163                  * re-initialized. This happens e.g., when a sub-lock was
1164                  * canceled while unlocking was in progress.
1165                  */
1166                 state = result == 0 ? CLS_CACHED : CLS_NEW;
1167                 cl_lock_state_set(env, lock, state);
1168
1169                 /*
1170                  * Hide the -ESTALE error. Consider a glimpse lock that
1171                  * covers multiple stripes, where one sub-lock returned
1172                  * -ESTALE while the other sub-locks matched existing write
1173                  * locks. In this case we cannot mark the top-lock as
1174                  * failed, because otherwise some of its sub-locks might
1175                  * never be cancelled, which would leave some dirty pages
1176                  * never written back to the OSTs. -jay
1177                  */
1178                 result = 0;
1179         }
1180         result = result ?: lock->cll_error;
1181         if (result < 0)
1182                 cl_lock_error(env, lock, result);
1183         RETURN(result);
1184 }
1185 EXPORT_SYMBOL(cl_unuse_try);
1186
1187 static void cl_unuse_locked(const struct lu_env *env, struct cl_lock *lock)
1188 {
1189         ENTRY;
1190         LASSERT(lock->cll_state <= CLS_HELD);
1191         do {
1192                 int result;
1193
1194                 result = cl_unuse_try(env, lock);
1195                 if (result == CLO_WAIT) {
1196                         result = cl_lock_state_wait(env, lock);
1197                         if (result == 0)
1198                                 continue;
1199                 }
1200                 break;
1201         } while (1);
1202         EXIT;
1203 }
1204
1205 /**
1206  * Unlocks a lock.
1207  */
1208 void cl_unuse(const struct lu_env *env, struct cl_lock *lock)
1209 {
1210         ENTRY;
1211         cl_lock_mutex_get(env, lock);
1212         cl_unuse_locked(env, lock);
1213         cl_lock_mutex_put(env, lock);
1214         cl_lock_lockdep_release(env, lock);
1215         EXIT;
1216 }
1217 EXPORT_SYMBOL(cl_unuse);
1218
1219 /**
1220  * Tries to wait for a lock.
1221  *
1222  * This function is called repeatedly by cl_wait() until the lock is either
1223  * granted or an error occurs. This function does not block waiting for network
1224  * communication to complete.
1225  *
1226  * \see cl_wait() cl_lock_operations::clo_wait()
1227  * \see cl_lock_state::CLS_HELD
1228  */
1229 int cl_wait_try(const struct lu_env *env, struct cl_lock *lock)
1230 {
1231         const struct cl_lock_slice *slice;
1232         int                         result;
1233
1234         ENTRY;
1235         do {
1236                 LINVRNT(cl_lock_is_mutexed(lock));
1237                 LINVRNT(cl_lock_invariant(env, lock));
1238                 LASSERT(lock->cll_state == CLS_ENQUEUED ||
1239                         lock->cll_state == CLS_HELD);
1240                 LASSERT(lock->cll_users > 0);
1241                 LASSERT(lock->cll_holds > 0);
1242
1243                 result = 0;
1244                 if (lock->cll_error != 0)
1245                         break;
1246                 if (lock->cll_state == CLS_HELD)
1247                         /* nothing to do */
1248                         break;
1249
1250                 result = -ENOSYS;
1251                 list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
1252                         if (slice->cls_ops->clo_wait != NULL) {
1253                                 result = slice->cls_ops->clo_wait(env, slice);
1254                                 if (result != 0)
1255                                         break;
1256                         }
1257                 }
1258                 LASSERT(result != -ENOSYS);
1259                 if (result == 0)
1260                         cl_lock_state_set(env, lock, CLS_HELD);
1261         } while (result == CLO_REPEAT);
1262         RETURN(result ?: lock->cll_error);
1263 }
1264 EXPORT_SYMBOL(cl_wait_try);
1265
1266 /**
1267  * Waits until enqueued lock is granted.
1268  *
1269  * \pre current thread or io owns a hold on the lock
1270  * \pre ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
1271  *                        lock->cll_state == CLS_HELD)
1272  *
1273  * \post ergo(result == 0, lock->cll_state == CLS_HELD)
1274  */
1275 int cl_wait(const struct lu_env *env, struct cl_lock *lock)
1276 {
1277         int result;
1278
1279         ENTRY;
1280         cl_lock_mutex_get(env, lock);
1281
1282         LINVRNT(cl_lock_invariant(env, lock));
1283         LASSERT(lock->cll_state == CLS_ENQUEUED || lock->cll_state == CLS_HELD);
1284         LASSERT(lock->cll_holds > 0);
1285
1286         do {
1287                 result = cl_wait_try(env, lock);
1288                 if (result == CLO_WAIT) {
1289                         result = cl_lock_state_wait(env, lock);
1290                         if (result == 0)
1291                                 continue;
1292                 }
1293                 break;
1294         } while (1);
1295         if (result < 0) {
1296                 cl_lock_user_del(env, lock);
1297                 if (result != -EINTR)
1298                         cl_lock_error(env, lock, result);
1299                 cl_lock_lockdep_release(env, lock);
1300         }
1301         cl_lock_mutex_put(env, lock);
1302         LASSERT(ergo(result == 0, lock->cll_state == CLS_HELD));
1303         RETURN(result);
1304 }
1305 EXPORT_SYMBOL(cl_wait);
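
/*
 * Illustrative sketch (not part of the original file): the synchronous
 * enqueue/wait/unuse life cycle built from the primitives above.  The lock
 * is assumed to have been set up elsewhere so that the caller already owns
 * a hold on it (see the \pre of cl_enqueue()); "io" and "enqflags" are
 * whatever the caller's context provides.
 */
#if 0
static int foo_lock_cycle(const struct lu_env *env, struct cl_lock *lock,
                          struct cl_io *io, __u32 enqflags)
{
        int result;

        result = cl_enqueue(env, lock, io, enqflags);
        if (result == 0) {
                result = cl_wait(env, lock);    /* block until CLS_HELD */
                if (result == 0) {
                        /* ... do I/O under the granted extent lock ... */
                        cl_unuse(env, lock);    /* back to CLS_CACHED */
                }
        }
        return result;
}
#endif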
1306
1307 /**
1308  * Executes cl_lock_operations::clo_weigh() across all layers and sums the
1309  * results to estimate the lock's weight.
1310  */
1311 unsigned long cl_lock_weigh(const struct lu_env *env, struct cl_lock *lock)
1312 {
1313         const struct cl_lock_slice *slice;
1314         unsigned long pound;
1315         unsigned long ounce;
1316
1317         ENTRY;
1318         LINVRNT(cl_lock_is_mutexed(lock));
1319         LINVRNT(cl_lock_invariant(env, lock));
1320
1321         pound = 0;
1322         list_for_each_entry_reverse(slice, &lock->cll_layers, cls_linkage) {
1323                 if (slice->cls_ops->clo_weigh != NULL) {
1324                         ounce = slice->cls_ops->clo_weigh(env, slice);
1325                         pound += ounce;
1326                         if (pound < ounce) /* over-weight^Wflow */
1327                                 pound = ~0UL;
1328                 }
1329         }
1330         RETURN(pound);
1331 }
1332 EXPORT_SYMBOL(cl_lock_weigh);
1333
1334 /**
1335  * Notifies layers that lock description changed.
1336  *
1337  * The server can grant the client a lock different from the one that was
1338  * requested (e.g., larger in extent). This method is called once the actually
1339  * granted lock description becomes known, so that layers can adjust to the
1340  * changed lock description.
1341  *
1342  * \see cl_lock_operations::clo_modify()
1343  */
1344 int cl_lock_modify(const struct lu_env *env, struct cl_lock *lock,
1345                    const struct cl_lock_descr *desc)
1346 {
1347         const struct cl_lock_slice *slice;
1348         struct cl_object           *obj = lock->cll_descr.cld_obj;
1349         struct cl_object_header    *hdr = cl_object_header(obj);
1350         int result;
1351
1352         ENTRY;
1353         /* don't allow object to change */
1354         LASSERT(obj == desc->cld_obj);
1355         LINVRNT(cl_lock_is_mutexed(lock));
1356         LINVRNT(cl_lock_invariant(env, lock));
1357
1358         list_for_each_entry_reverse(slice, &lock->cll_layers, cls_linkage) {
1359                 if (slice->cls_ops->clo_modify != NULL) {
1360                         result = slice->cls_ops->clo_modify(env, slice, desc);
1361                         if (result != 0)
1362                                 RETURN(result);
1363                 }
1364         }
1365         CL_LOCK_DEBUG(D_DLMTRACE, env, lock, " -> "DDESCR"@"DFID"\n",
1366                       PDESCR(desc), PFID(lu_object_fid(&desc->cld_obj->co_lu)));
1367         /*
1368          * Just replace description in place. Nothing more is needed for
1369          * now. If locks were indexed according to their extent and/or mode,
1370          * that index would have to be updated here.
1371          */
1372         spin_lock(&hdr->coh_lock_guard);
1373         lock->cll_descr = *desc;
1374         spin_unlock(&hdr->coh_lock_guard);
1375         RETURN(0);
1376 }
1377 EXPORT_SYMBOL(cl_lock_modify);
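
/*
 * Illustrative sketch (not part of the original file): a layer that learns
 * the server actually granted a wider extent than requested would build a
 * new description and push it through cl_lock_modify() under the lock
 * mutex.  The "granted_start"/"granted_end" values are hypothetical inputs.
 */
#if 0
static int foo_lock_granted(const struct lu_env *env, struct cl_lock *lock,
                            pgoff_t granted_start, pgoff_t granted_end)
{
        struct cl_lock_descr descr = lock->cll_descr;  /* keeps cld_obj */

        descr.cld_start = granted_start;
        descr.cld_end   = granted_end;
        /* pre-condition: cl_lock_is_mutexed(lock) */
        return cl_lock_modify(env, lock, &descr);
}
#endif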
1378
1379 /**
1380  * Initializes lock closure with a given origin.
1381  *
1382  * \see cl_lock_closure
1383  */
1384 void cl_lock_closure_init(const struct lu_env *env,
1385                           struct cl_lock_closure *closure,
1386                           struct cl_lock *origin, int wait)
1387 {
1388         LINVRNT(cl_lock_is_mutexed(origin));
1389         LINVRNT(cl_lock_invariant(env, origin));
1390
1391         CFS_INIT_LIST_HEAD(&closure->clc_list);
1392         closure->clc_origin = origin;
1393         closure->clc_wait   = wait;
1394         closure->clc_nr     = 0;
1395 }
1396 EXPORT_SYMBOL(cl_lock_closure_init);
1397
1398 /**
1399  * Builds a closure of \a lock.
1400  *
1401  * Building of a closure consists of adding initial lock (\a lock) into it,
1402  * and calling cl_lock_operations::clo_closure() methods of \a lock. These
1403  * methods might call cl_lock_closure_build() recursively again, adding more
1404  * locks to the closure, etc.
1405  *
1406  * \see cl_lock_closure
1407  */
1408 int cl_lock_closure_build(const struct lu_env *env, struct cl_lock *lock,
1409                           struct cl_lock_closure *closure)
1410 {
1411         const struct cl_lock_slice *slice;
1412         int result;
1413
1414         ENTRY;
1415         LINVRNT(cl_lock_is_mutexed(closure->clc_origin));
1416         LINVRNT(cl_lock_invariant(env, closure->clc_origin));
1417
1418         result = cl_lock_enclosure(env, lock, closure);
1419         if (result == 0) {
1420                 list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
1421                         if (slice->cls_ops->clo_closure != NULL) {
1422                                 result = slice->cls_ops->clo_closure(env, slice,
1423                                                                      closure);
1424                                 if (result != 0)
1425                                         break;
1426                         }
1427                 }
1428         }
1429         if (result != 0)
1430                 cl_lock_disclosure(env, closure);
1431         RETURN(result);
1432 }
1433 EXPORT_SYMBOL(cl_lock_closure_build);
1434
1435 /**
1436  * Adds new lock to a closure.
1437  *
1438  * Try-locks \a lock and, if this succeeds, adds it to the closure (never more
1439  * than once). If the try-lock fails, returns CLO_REPEAT, after optionally
1440  * waiting until the next try-lock is likely to succeed.
1441  */
1442 int cl_lock_enclosure(const struct lu_env *env, struct cl_lock *lock,
1443                       struct cl_lock_closure *closure)
1444 {
1445         int result;
1446         ENTRY;
1447         if (!cl_lock_mutex_try(env, lock)) {
1448                 /*
1449                  * If lock->cll_inclosure is not empty, lock is already in
1450                  * this closure.
1451                  */
1452                 if (list_empty(&lock->cll_inclosure)) {
1453                         cl_lock_get_trust(lock);
1454                         lu_ref_add(&lock->cll_reference, "closure", closure);
1455                         list_add(&lock->cll_inclosure, &closure->clc_list);
1456                         closure->clc_nr++;
1457                 } else
1458                         cl_lock_mutex_put(env, lock);
1459                 result = 0;
1460         } else {
1461                 cl_lock_disclosure(env, closure);
1462                 if (closure->clc_wait) {
1463                         cl_lock_get_trust(lock);
1464                         lu_ref_add(&lock->cll_reference, "closure-w", closure);
1465                         cl_lock_mutex_put(env, closure->clc_origin);
1466
1467                         LASSERT(cl_lock_nr_mutexed(env) == 0);
1468                         cl_lock_mutex_get(env, lock);
1469                         cl_lock_mutex_put(env, lock);
1470
1471                         cl_lock_mutex_get(env, closure->clc_origin);
1472                         lu_ref_del(&lock->cll_reference, "closure-w", closure);
1473                         cl_lock_put(env, lock);
1474                 }
1475                 result = CLO_REPEAT;
1476         }
1477         RETURN(result);
1478 }
1479 EXPORT_SYMBOL(cl_lock_enclosure);
1480
1481 /** Releases the mutexes of enclosed locks. */
1482 void cl_lock_disclosure(const struct lu_env *env,
1483                         struct cl_lock_closure *closure)
1484 {
1485         struct cl_lock *scan;
1486         struct cl_lock *temp;
1487
1488         list_for_each_entry_safe(scan, temp, &closure->clc_list, cll_inclosure){
1489                 list_del_init(&scan->cll_inclosure);
1490                 cl_lock_mutex_put(env, scan);
1491                 lu_ref_del(&scan->cll_reference, "closure", closure);
1492                 cl_lock_put(env, scan);
1493                 closure->clc_nr--;
1494         }
1495         LASSERT(closure->clc_nr == 0);
1496 }
1497 EXPORT_SYMBOL(cl_lock_disclosure);
1498
1499 /** Finalizes a closure. */
1500 void cl_lock_closure_fini(struct cl_lock_closure *closure)
1501 {
1502         LASSERT(closure->clc_nr == 0);
1503         LASSERT(list_empty(&closure->clc_list));
1504 }
1505 EXPORT_SYMBOL(cl_lock_closure_fini);
1506
1507 /**
1508  * Destroys this lock. Notifies layers (bottom-to-top) that the lock is being
1509  * destroyed, then destroys the lock. If there are holds on the lock,
1510  * destruction is postponed until all holds are released. This is called when
1511  * a decision is made to destroy the lock in the future, e.g., when a blocking
1512  * AST is received for it, or a fatal communication error happens (a usage
1513  * sketch follows cl_lock_cancel() below).
1514  *
1515  * The caller must have a reference on this lock, to prevent a deleted lock
1516  * from lingering in memory indefinitely because nobody calls cl_lock_put().
1517  *
1518  * \pre atomic_read(&lock->cll_ref) > 0
1519  *
1520  * \see cl_lock_operations::clo_delete()
1521  * \see cl_lock::cll_holds
1522  */
1523 void cl_lock_delete(const struct lu_env *env, struct cl_lock *lock)
1524 {
1525         LINVRNT(cl_lock_is_mutexed(lock));
1526         LINVRNT(cl_lock_invariant(env, lock));
1527
1528         ENTRY;
1529         if (lock->cll_holds == 0)
1530                 cl_lock_delete0(env, lock);
1531         else
1532                 lock->cll_flags |= CLF_DOOMED;
1533         EXIT;
1534 }
1535 EXPORT_SYMBOL(cl_lock_delete);
1536
1537 /**
1538  * Marks the lock as irrecoverably failed, and marks it for destruction. This
1539  * happens when, e.g., the server fails to grant us a lock, or a network
1540  * timeout occurs.
1541  *
1542  * \pre atomic_read(&lock->cll_ref) > 0
1543  *
1544  * \see cl_lock_delete()
1545  * \see cl_lock::cll_holds
1546  */
1547 void cl_lock_error(const struct lu_env *env, struct cl_lock *lock, int error)
1548 {
1549         LINVRNT(cl_lock_is_mutexed(lock));
1550         LINVRNT(cl_lock_invariant(env, lock));
1551
1552         ENTRY;
1553         if (lock->cll_error == 0 && error != 0) {
1554                 lock->cll_error = error;
1555                 cl_lock_signal(env, lock);
1556                 cl_lock_cancel(env, lock);
1557                 cl_lock_delete(env, lock);
1558         }
1559         EXIT;
1560 }
1561 EXPORT_SYMBOL(cl_lock_error);
1562
1563 /**
1564  * Cancels this lock. Notifies layers (bottom-to-top) that the lock is being
1565  * cancelled, then cancels the lock. If there are holds on the lock,
1566  * cancellation is postponed until all holds are released (see the sketch
1567  * after this function).
1568  *
1569  * Cancellation notification is delivered to layers at most once.
1570  *
1571  * \see cl_lock_operations::clo_cancel()
1572  * \see cl_lock::cll_holds
1573  */
1574 void cl_lock_cancel(const struct lu_env *env, struct cl_lock *lock)
1575 {
1576         LINVRNT(cl_lock_is_mutexed(lock));
1577         LINVRNT(cl_lock_invariant(env, lock));
1578         ENTRY;
1579         if (lock->cll_holds == 0)
1580                 cl_lock_cancel0(env, lock);
1581         else
1582                 lock->cll_flags |= CLF_CANCELPEND;
1583         EXIT;
1584 }
1585 EXPORT_SYMBOL(cl_lock_cancel);
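
/*
 * Illustrative sketch, not part of the original file: a typical reaction to a
 * server-initiated cancellation (e.g. a blocking AST).  The helper name is
 * hypothetical; the caller is assumed to hold a reference on the lock, as
 * required by cl_lock_delete().  Outstanding holds merely postpone the work
 * via CLF_CANCELPEND/CLF_DOOMED.
 */
#if 0 /* example only */
static void example_blocking_ast(const struct lu_env *env,
                                 struct cl_lock *lock)
{
        cl_lock_mutex_get(env, lock);
        cl_lock_cancel(env, lock);   /* notify layers, at most once */
        cl_lock_delete(env, lock);   /* destroy now, or mark for destruction */
        cl_lock_mutex_put(env, lock);
}
#endif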
1586
1587 /**
1588  * Finds an existing lock covering the given page, optionally skipping a given
1589  * lock (\a except); a usage sketch follows the function below.
1590  */
1591 struct cl_lock *cl_lock_at_page(const struct lu_env *env, struct cl_object *obj,
1592                                 struct cl_page *page, struct cl_lock *except,
1593                                 int pending, int canceld)
1594 {
1595         struct cl_object_header *head;
1596         struct cl_lock          *scan;
1597         struct cl_lock          *lock;
1598         struct cl_lock_descr    *need;
1599
1600         ENTRY;
1601
1602         head = cl_object_header(obj);
1603         need = &cl_env_info(env)->clt_descr;
1604         lock = NULL;
1605
1606         need->cld_mode = CLM_READ; /* CLM_READ matches both READ & WRITE, but
1607                                     * not PHANTOM */
1608         need->cld_start = need->cld_end = page->cp_index;
1609
1610         spin_lock(&head->coh_lock_guard);
1611         list_for_each_entry(scan, &head->coh_locks, cll_linkage) {
1612                 if (scan != except &&
1613                     cl_lock_ext_match(&scan->cll_descr, need) &&
1614                     scan->cll_state < CLS_FREEING &&
1615                     /*
1616                      * This check is racy as the lock can be canceled right
1617                      * after it is done, but this is fine, because page exists
1618                      * already.
1619                      */
1620                     (canceld || !(scan->cll_flags & CLF_CANCELLED)) &&
1621                     (pending || !(scan->cll_flags & CLF_CANCELPEND))) {
1622                         /* Don't increase cs_hit here since this
1623                          * is just a helper function. */
1624                         cl_lock_get_trust(scan);
1625                         lock = scan;
1626                         break;
1627                 }
1628         }
1629         spin_unlock(&head->coh_lock_guard);
1630         RETURN(lock);
1631 }
1632 EXPORT_SYMBOL(cl_lock_at_page);
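
/*
 * Illustrative sketch, not part of the original file: checking whether a page
 * is still covered by some lock that is not being cancelled.  The helper name
 * is hypothetical.  cl_lock_at_page() returns the lock with a trusted
 * reference that the caller must drop with cl_lock_put().
 */
#if 0 /* example only */
static int example_page_is_covered(const struct lu_env *env,
                                   struct cl_object *obj, struct cl_page *page)
{
        struct cl_lock *lock;

        /* except == NULL: consider every lock; pending == canceld == 0:
         * skip locks that are being or have been cancelled */
        lock = cl_lock_at_page(env, obj, page, NULL, 0, 0);
        if (lock == NULL)
                return 0;
        cl_lock_put(env, lock);
        return 1;
}
#endif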
1633
1634 /**
1635  * Returns a list of pages protected (only) by a given lock.
1636  *
1637  * Scans an extent of page radix tree, corresponding to the \a lock and queues
1638  * all pages that are not protected by locks other than \a lock into \a queue.
1639  */
1640 void cl_lock_page_list_fixup(const struct lu_env *env,
1641                              struct cl_io *io, struct cl_lock *lock,
1642                              struct cl_page_list *queue)
1643 {
1644         struct cl_page        *page;
1645         struct cl_page        *temp;
1646         struct cl_page_list   *plist = &cl_env_info(env)->clt_list;
1647
1648         LINVRNT(cl_lock_invariant(env, lock));
1649         ENTRY;
1650
1651         /* Now we have a list of cl_pages under \a lock; we need to check
1652          * whether some of the pages are covered by another ldlm lock.
1653          * If so, they do not need to be written out this time.
1654          *
1655          * For example, the client holds PW locks A:[0,200] and B:[100,300],
1656          * and the latter is about to be canceled. This means another client
1657          * is reading/writing [200,300], since A is not being canceled, so
1658          * we only need to write the pages covered by [200,300]. This is
1659          * safe, since [100,200] is still protected by lock A.
1660          */
1661
1662         cl_page_list_init(plist);
1663         cl_page_list_for_each_safe(page, temp, queue) {
1664                 pgoff_t                idx = page->cp_index;
1665                 struct cl_lock        *found;
1666                 struct cl_lock_descr  *descr;
1667
1668                 /* The algorithm relies on pages being in ascending index order. */
1669                 LASSERT(ergo(&temp->cp_batch != &queue->pl_pages,
1670                         page->cp_index < temp->cp_index));
1671
1672                 found = cl_lock_at_page(env, lock->cll_descr.cld_obj,
1673                                         page, lock, 0, 0);
1674                 if (found == NULL)
1675                         continue;
1676
1677                 descr = &found->cll_descr;
1678                 list_for_each_entry_safe_from(page, temp, &queue->pl_pages,
1679                                               cp_batch) {
1680                         idx = page->cp_index;
1681                         if (descr->cld_start > idx || descr->cld_end < idx)
1682                                 break;
1683                         cl_page_list_move(plist, queue, page);
1684                 }
1685                 cl_lock_put(env, found);
1686         }
1687
1688         /* The pages in plist are covered by other locks, don't handle them
1689          * this time.
1690          */
1691         if (io != NULL)
1692                 cl_page_list_disown(env, io, plist);
1693         cl_page_list_fini(env, plist);
1694         EXIT;
1695 }
1696 EXPORT_SYMBOL(cl_lock_page_list_fixup);
1697
1698 /**
1699  * Invalidate pages protected by the given lock, sending them out to the
1700  * server first, if necessary.
1701  *
1702  * This function does the following:
1703  *
1704  *     - collects a list of pages to be invalidated,
1705  *
1706  *     - unmaps them from the user virtual memory,
1707  *
1708  *     - sends dirty pages to the server,
1709  *
1710  *     - waits for transfer completion,
1711  *
1712  *     - discards pages, and throws them out of memory.
1713  *
1714  * If \a discard is set, pages are discarded without sending them to the
1715  * server.
1716  *
1717  * If an error happens at any step, the process continues anyway, the reasoning
1718  * being that lock cancellation cannot be delayed indefinitely (sketch below).
1719  */
1720 int cl_lock_page_out(const struct lu_env *env, struct cl_lock *lock,
1721                      int discard)
1722 {
1723         struct cl_thread_info *info  = cl_env_info(env);
1724         struct cl_io          *io    = &info->clt_io;
1725         struct cl_2queue      *queue = &info->clt_queue;
1726         struct cl_lock_descr  *descr = &lock->cll_descr;
1727         int                      result;
1728         int                      rc0;
1729         int                      rc1;
1730
1731         LINVRNT(cl_lock_invariant(env, lock));
1732         ENTRY;
1733
1734         io->ci_obj = cl_object_top(descr->cld_obj);
1735         result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
1736         if (result == 0) {
1737
1738                 cl_2queue_init(queue);
1739                 cl_page_gang_lookup(env, descr->cld_obj, io, descr->cld_start,
1740                                     descr->cld_end, &queue->c2_qin);
1741                 if (queue->c2_qin.pl_nr > 0) {
1742                         result = cl_page_list_unmap(env, io, &queue->c2_qin);
1743                         if (!discard) {
1744                                 rc0 = cl_io_submit_rw(env, io, CRT_WRITE,
1745                                                       queue, CRP_CANCEL);
1746                                 rc1 = cl_page_list_own(env, io,
1747                                                        &queue->c2_qout);
1748                                 result = result ?: rc0 ?: rc1;
1749                         }
1750                         cl_lock_page_list_fixup(env, io, lock, &queue->c2_qout);
1751                         cl_2queue_discard(env, io, queue);
1752                         cl_2queue_disown(env, io, queue);
1753                 }
1754                 cl_2queue_fini(env, queue);
1755         }
1756         cl_io_fini(env, io);
1757         RETURN(result);
1758 }
1759 EXPORT_SYMBOL(cl_lock_page_out);
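
/*
 * Illustrative sketch, not part of the original file: flushing the pages
 * under a lock as part of its cancellation, as a layer's clo_cancel() method
 * might do.  The helper name and the "can_discard" parameter are
 * hypothetical: when data need not be preserved, pages are discarded without
 * being sent to the server.
 */
#if 0 /* example only */
static int example_flush_on_cancel(const struct lu_env *env,
                                   struct cl_lock *lock, int can_discard)
{
        int result;

        /* write out (or discard) and invalidate everything the lock covers */
        result = cl_lock_page_out(env, lock, can_discard);
        if (result != 0)
                /* per the contract above, cancellation proceeds even if the
                 * flush failed */
                CDEBUG(D_DLMTRACE, "flush failed: %d\n", result);
        return result;
}
#endif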
1760
1761 /**
1762  * Eliminate all locks for a given object.
1763  *
1764  * Caller has to guarantee that no lock is in active use.
1765  *
1766  * \param cancel when this is set, cl_locks_prune() cancels locks before
1767  *               destroying them.
1768  */
1769 void cl_locks_prune(const struct lu_env *env, struct cl_object *obj, int cancel)
1770 {
1771         struct cl_object_header *head;
1772         struct cl_lock          *lock;
1773
1774         ENTRY;
1775         head = cl_object_header(obj);
1776         /*
1777          * If locks are destroyed without cancellation, all pages must be
1778          * already destroyed (as otherwise they will be left unprotected).
1779          */
1780         LASSERT(ergo(!cancel,
1781                      head->coh_tree.rnode == NULL && head->coh_pages == 0));
1782
1783         spin_lock(&head->coh_lock_guard);
1784         while (!list_empty(&head->coh_locks)) {
1785                 lock = container_of(head->coh_locks.next,
1786                                     struct cl_lock, cll_linkage);
1787                 cl_lock_get_trust(lock);
1788                 spin_unlock(&head->coh_lock_guard);
1789                 lu_ref_add(&lock->cll_reference, "prune", cfs_current());
1790                 cl_lock_mutex_get(env, lock);
1791                 if (lock->cll_state < CLS_FREEING) {
1792                         LASSERT(lock->cll_holds == 0);
1793                         LASSERT(lock->cll_users == 0);
1794                         if (cancel)
1795                                 cl_lock_cancel(env, lock);
1796                         cl_lock_delete(env, lock);
1797                 }
1798                 cl_lock_mutex_put(env, lock);
1799                 lu_ref_del(&lock->cll_reference, "prune", cfs_current());
1800                 cl_lock_put(env, lock);
1801                 spin_lock(&head->coh_lock_guard);
1802         }
1803         spin_unlock(&head->coh_lock_guard);
1804         EXIT;
1805 }
1806 EXPORT_SYMBOL(cl_locks_prune);
1807
1808 /**
1809  * Returns true if \a addr is an address of an allocated cl_lock. Used in
1810  * assertions. This check is optimistically imprecise, i.e., it occasionally
1811  * returns true for incorrect addresses, but if it returns false, then the
1812  * address is guaranteed to be incorrect. (Should be named cl_lockp().)
1813  *
1814  * \see cl_is_page()
1815  */
1816 int cl_is_lock(const void *addr)
1817 {
1818         return cfs_mem_is_in_cache(addr, cl_lock_kmem);
1819 }
1820 EXPORT_SYMBOL(cl_is_lock);
1821
1822 static struct cl_lock *cl_lock_hold_mutex(const struct lu_env *env,
1823                                           const struct cl_io *io,
1824                                           const struct cl_lock_descr *need,
1825                                           const char *scope, const void *source)
1826 {
1827         struct cl_lock *lock;
1828
1829         ENTRY;
1830
1831         while (1) {
1832                 lock = cl_lock_find(env, io, need);
1833                 if (IS_ERR(lock))
1834                         break;
1835                 cl_lock_mutex_get(env, lock);
1836                 if (lock->cll_state < CLS_FREEING) {
1837                         cl_lock_hold_mod(env, lock, +1);
1838                         lu_ref_add(&lock->cll_holders, scope, source);
1839                         lu_ref_add(&lock->cll_reference, scope, source);
1840                         break;
1841                 }
1842                 cl_lock_mutex_put(env, lock);
1843                 cl_lock_put(env, lock);
1844         }
1845         RETURN(lock);
1846 }
1847
1848 /**
1849  * Returns a lock matching \a need description with a reference and a hold on
1850  * it.
1851  *
1852  * This is much like cl_lock_find(), except that cl_lock_hold() additionally
1853  * guarantees that the lock is not in CLS_FREEING on return (sketch below).
1854  */
1855 struct cl_lock *cl_lock_hold(const struct lu_env *env, const struct cl_io *io,
1856                              const struct cl_lock_descr *need,
1857                              const char *scope, const void *source)
1858 {
1859         struct cl_lock *lock;
1860
1861         ENTRY;
1862
1863         lock = cl_lock_hold_mutex(env, io, need, scope, source);
1864         if (!IS_ERR(lock))
1865                 cl_lock_mutex_put(env, lock);
1866         RETURN(lock);
1867 }
1868 EXPORT_SYMBOL(cl_lock_hold);
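
/*
 * Illustrative sketch, not part of the original file: pairing cl_lock_hold()
 * with cl_lock_release().  The helper name and descriptor values are made up;
 * the scope string and source pointer only tag the lu_ref references for
 * debugging, and CL_PAGE_EOF is assumed to be the usual whole-file end marker.
 */
#if 0 /* example only */
static int example_hold_whole_file(const struct lu_env *env, struct cl_io *io,
                                   struct cl_object *obj)
{
        struct cl_lock_descr need = {
                .cld_obj   = obj,
                .cld_mode  = CLM_READ,
                .cld_start = 0,
                .cld_end   = CL_PAGE_EOF  /* assumed end-of-file marker */
        };
        struct cl_lock *lock;

        lock = cl_lock_hold(env, io, &need, "example", io);
        if (IS_ERR(lock))
                return PTR_ERR(lock);
        /* the lock is referenced and held, and is not in CLS_FREEING here */
        cl_lock_release(env, lock, "example", io);
        return 0;
}
#endif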
1869
1870 /**
1871  * Main high-level entry point of the cl_lock interface: finds an existing
1872  * lock, or enqueues a new one, matching the given description (sketch below).
1873  */
1874 struct cl_lock *cl_lock_request(const struct lu_env *env, struct cl_io *io,
1875                                 const struct cl_lock_descr *need,
1876                                 __u32 enqflags,
1877                                 const char *scope, const void *source)
1878 {
1879         struct cl_lock       *lock;
1880         const struct lu_fid  *fid;
1881         int                   rc;
1882         int                   iter;
1883         int warn;
1884
1885         ENTRY;
1886         fid = lu_object_fid(&io->ci_obj->co_lu);
1887         iter = 0;
1888         do {
1889                 warn = iter >= 16 && IS_PO2(iter);
1890                 CDEBUG(warn ? D_WARNING : D_DLMTRACE,
1891                        DDESCR"@"DFID" %i %08x `%s'\n",
1892                        PDESCR(need), PFID(fid), iter, enqflags, scope);
1893                 lock = cl_lock_hold_mutex(env, io, need, scope, source);
1894                 if (!IS_ERR(lock)) {
1895                         rc = cl_enqueue_locked(env, lock, io, enqflags);
1896                         if (rc == 0) {
1897                                 if (cl_lock_fits_into(env, lock, need, io)) {
1898                                         cl_lock_mutex_put(env, lock);
1899                                         cl_lock_lockdep_acquire(env,
1900                                                                 lock, enqflags);
1901                                         break;
1902                                 } else if (warn)
1903                                         CL_LOCK_DEBUG(D_WARNING, env, lock,
1904                                                       "got (see bug 17665)\n");
1905                                 cl_unuse_locked(env, lock);
1906                         }
1907                         cl_lock_hold_release(env, lock, scope, source);
1908                         cl_lock_mutex_put(env, lock);
1909                         lu_ref_del(&lock->cll_reference, scope, source);
1910                         cl_lock_put(env, lock);
1911                         lock = ERR_PTR(rc);
1912                 } else
1913                         rc = PTR_ERR(lock);
1914                 iter++;
1915         } while (rc == 0);
1916         RETURN(lock);
1917 }
1918 EXPORT_SYMBOL(cl_lock_request);
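
/*
 * Illustrative sketch, not part of the original file: the basic life cycle of
 * a lock obtained through cl_lock_request().  Real callers normally go
 * through the cl_io locking machinery and pass layer-specific enqueue flags;
 * the helper name is hypothetical, and cl_unuse() is assumed to be the
 * counterpart that drops the use acquired by enqueuing.
 */
#if 0 /* example only */
static int example_request(const struct lu_env *env, struct cl_io *io,
                           const struct cl_lock_descr *need)
{
        struct cl_lock *lock;

        lock = cl_lock_request(env, io, need, 0, "example", io);
        if (IS_ERR(lock))
                return PTR_ERR(lock);
        /* ... I/O under the lock ... */
        cl_unuse(env, lock);                       /* assumed use-drop step */
        cl_lock_release(env, lock, "example", io); /* drop hold + reference */
        return 0;
}
#endif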
1919
1920 /**
1921  * Adds a hold to a known lock.
1922  */
1923 void cl_lock_hold_add(const struct lu_env *env, struct cl_lock *lock,
1924                       const char *scope, const void *source)
1925 {
1926         LINVRNT(cl_lock_is_mutexed(lock));
1927         LINVRNT(cl_lock_invariant(env, lock));
1928         LASSERT(lock->cll_state != CLS_FREEING);
1929
1930         ENTRY;
1931         cl_lock_hold_mod(env, lock, +1);
1932         cl_lock_get(lock);
1933         lu_ref_add(&lock->cll_holders, scope, source);
1934         lu_ref_add(&lock->cll_reference, scope, source);
1935         EXIT;
1936 }
1937 EXPORT_SYMBOL(cl_lock_hold_add);
1938
1939 /**
1940  * Releases a hold and a reference on a lock on which the caller has acquired
1941  * the mutex.
1942  */
1943 void cl_lock_unhold(const struct lu_env *env, struct cl_lock *lock,
1944                     const char *scope, const void *source)
1945 {
1946         LINVRNT(cl_lock_invariant(env, lock));
1947         ENTRY;
1948         cl_lock_hold_release(env, lock, scope, source);
1949         lu_ref_del(&lock->cll_reference, scope, source);
1950         cl_lock_put(env, lock);
1951         EXIT;
1952 }
1953 EXPORT_SYMBOL(cl_lock_unhold);
1954
1955 /**
1956  * Releases a hold and a reference on a lock, obtained by cl_lock_hold().
1957  */
1958 void cl_lock_release(const struct lu_env *env, struct cl_lock *lock,
1959                      const char *scope, const void *source)
1960 {
1961         LINVRNT(cl_lock_invariant(env, lock));
1962         ENTRY;
1963         cl_lock_mutex_get(env, lock);
1964         cl_lock_hold_release(env, lock, scope, source);
1965         cl_lock_mutex_put(env, lock);
1966         lu_ref_del(&lock->cll_reference, scope, source);
1967         cl_lock_put(env, lock);
1968         EXIT;
1969 }
1970 EXPORT_SYMBOL(cl_lock_release);
1971
1972 void cl_lock_user_add(const struct lu_env *env, struct cl_lock *lock)
1973 {
1974         LINVRNT(cl_lock_is_mutexed(lock));
1975         LINVRNT(cl_lock_invariant(env, lock));
1976
1977         ENTRY;
1978         cl_lock_used_mod(env, lock, +1);
1979         EXIT;
1980 }
1981 EXPORT_SYMBOL(cl_lock_user_add);
1982
1983 int cl_lock_user_del(const struct lu_env *env, struct cl_lock *lock)
1984 {
1985         LINVRNT(cl_lock_is_mutexed(lock));
1986         LINVRNT(cl_lock_invariant(env, lock));
1987         LASSERT(lock->cll_users > 0);
1988
1989         ENTRY;
1990         cl_lock_used_mod(env, lock, -1);
1991         RETURN(lock->cll_users == 0);
1992 }
1993 EXPORT_SYMBOL(cl_lock_user_del);
1994
1995 /**
1996  * Checks whether the modes of two locks are compatible.
1997  *
1998  * This returns true iff enqueuing \a lock2 won't cause cancellation of \a
1999  * lock1 even when these locks overlap (see the table after this function).
2000  */
2001 int cl_lock_compatible(const struct cl_lock *lock1, const struct cl_lock *lock2)
2002 {
2003         enum cl_lock_mode mode1;
2004         enum cl_lock_mode mode2;
2005
2006         ENTRY;
2007         mode1 = lock1->cll_descr.cld_mode;
2008         mode2 = lock2->cll_descr.cld_mode;
2009         RETURN(mode2 == CLM_PHANTOM ||
2010                (mode1 == CLM_READ && mode2 == CLM_READ));
2011 }
2012 EXPORT_SYMBOL(cl_lock_compatible);
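
/*
 * The check above yields the following compatibility table, where lock1 is
 * the already-held lock and lock2 the one being enqueued:
 *
 *   lock2 \ lock1 | PHANTOM | READ | WRITE
 *   --------------+---------+------+------
 *   PHANTOM       |   yes   | yes  |  yes
 *   READ          |   no    | yes  |  no
 *   WRITE         |   no    | no   |  no
 */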
2013
2014 const char *cl_lock_mode_name(const enum cl_lock_mode mode)
2015 {
2016         static const char *names[] = {
2017                 [CLM_PHANTOM] = "PHANTOM",
2018                 [CLM_READ]    = "READ",
2019                 [CLM_WRITE]   = "WRITE"
2020         };
2021         if (0 <= mode && mode < ARRAY_SIZE(names))
2022                 return names[mode];
2023         else
2024                 return "UNKNW";
2025 }
2026 EXPORT_SYMBOL(cl_lock_mode_name);
2027
2028 /**
2029  * Prints a human-readable representation of a lock description.
2030  */
2031 void cl_lock_descr_print(const struct lu_env *env, void *cookie,
2032                        lu_printer_t printer,
2033                        const struct cl_lock_descr *descr)
2034 {
2035         const struct lu_fid  *fid;
2036
2037         fid = lu_object_fid(&descr->cld_obj->co_lu);
2038         (*printer)(env, cookie, DDESCR"@"DFID, PDESCR(descr), PFID(fid));
2039 }
2040 EXPORT_SYMBOL(cl_lock_descr_print);
2041
2042 /**
2043  * Prints a human-readable representation of \a lock using \a printer.
2044  */
2045 void cl_lock_print(const struct lu_env *env, void *cookie,
2046                    lu_printer_t printer, const struct cl_lock *lock)
2047 {
2048         const struct cl_lock_slice *slice;
2049         (*printer)(env, cookie, "lock@%p[%d %d %d %d %d %08lx] ",
2050                    lock, atomic_read(&lock->cll_ref),
2051                    lock->cll_state, lock->cll_error, lock->cll_holds,
2052                    lock->cll_users, lock->cll_flags);
2053         cl_lock_descr_print(env, cookie, printer, &lock->cll_descr);
2054         (*printer)(env, cookie, " {\n");
2055
2056         list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
2057                 (*printer)(env, cookie, "    %s@%p: ",
2058                            slice->cls_obj->co_lu.lo_dev->ld_type->ldt_name,
2059                            slice);
2060                 if (slice->cls_ops->clo_print != NULL)
2061                         slice->cls_ops->clo_print(env, cookie, printer, slice);
2062                 (*printer)(env, cookie, "\n");
2063         }
2064         (*printer)(env, cookie, "} lock@%p\n", lock);
2065 }
2066 EXPORT_SYMBOL(cl_lock_print);
2067
2068 int cl_lock_init(void)
2069 {
2070         return lu_kmem_init(cl_lock_caches);
2071 }
2072
2073 void cl_lock_fini(void)
2074 {
2075         lu_kmem_fini(cl_lock_caches);
2076 }