lustre/obdclass/cl_lock.c (fs/lustre-release.git)
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Client Extent Lock.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_CLASS
42 #ifndef EXPORT_SYMTAB
43 # define EXPORT_SYMTAB
44 #endif
45
46 #include <obd_class.h>
47 #include <obd_support.h>
48 #include <lustre_fid.h>
49 #include <libcfs/list.h>
50 /* lu_time_global_{init,fini}() */
51 #include <lu_time.h>
52
53 #include <cl_object.h>
54 #include "cl_internal.h"
55
56 /** Lock class of cl_lock::cll_guard */
57 static struct lock_class_key cl_lock_guard_class;
58 static cfs_mem_cache_t *cl_lock_kmem;
59
60 static struct lu_kmem_descr cl_lock_caches[] = {
61         {
62                 .ckd_cache = &cl_lock_kmem,
63                 .ckd_name  = "cl_lock_kmem",
64                 .ckd_size  = sizeof (struct cl_lock)
65         },
66         {
67                 .ckd_cache = NULL
68         }
69 };
70
71 /**
72  * Basic lock invariant that is maintained at all times. Caller either has a
73  * reference to \a lock, or somehow assures that \a lock cannot be freed.
74  *
75  * \see cl_lock_invariant()
76  */
77 static int cl_lock_invariant_trusted(const struct lu_env *env,
78                                      const struct cl_lock *lock)
79 {
80         return
81                 cl_is_lock(lock) &&
82                 ergo(lock->cll_state == CLS_FREEING, lock->cll_holds == 0) &&
83                 atomic_read(&lock->cll_ref) >= lock->cll_holds &&
84                 lock->cll_holds >= lock->cll_users &&
85                 lock->cll_holds >= 0 &&
86                 lock->cll_users >= 0 &&
87                 lock->cll_depth >= 0;
88 }
89
90 /**
91  * Stronger lock invariant, checking that caller has a reference on a lock.
92  *
93  * \see cl_lock_invariant_trusted()
94  */
95 static int cl_lock_invariant(const struct lu_env *env,
96                              const struct cl_lock *lock)
97 {
98         int result;
99
100         result = atomic_read(&lock->cll_ref) > 0 &&
101                 cl_lock_invariant_trusted(env, lock);
102         if (!result && env != NULL)
103                 CL_LOCK_DEBUG(D_ERROR, env, lock, "invariant broken");
104         return result;
105 }
106
107 /**
108  * Returns lock "nesting": 0 for a top-lock and 1 for a sub-lock.
109  */
110 static enum clt_nesting_level cl_lock_nesting(const struct cl_lock *lock)
111 {
112         return cl_object_header(lock->cll_descr.cld_obj)->coh_nesting;
113 }
114
115 /**
116  * Returns a set of counters for this lock, depending on a lock nesting.
117  */
118 static struct cl_thread_counters *cl_lock_counters(const struct lu_env *env,
119                                                    const struct cl_lock *lock)
120 {
121         struct cl_thread_info *info;
122         enum clt_nesting_level nesting;
123
124         info = cl_env_info(env);
125         nesting = cl_lock_nesting(lock);
126         LASSERT(nesting < ARRAY_SIZE(info->clt_counters));
127         return &info->clt_counters[nesting];
128 }
129
130 static void cl_lock_trace0(int level, const struct lu_env *env,
131                            const char *prefix, const struct cl_lock *lock,
132                            const char *func, const int line)
133 {
134         struct cl_object_header *h = cl_object_header(lock->cll_descr.cld_obj);
135         CDEBUG(level, "%s: %p@(%i %p %i %d %d %d %d %lx)"
136                       "(%p/%d/%i) at %s():%d\n",
137                prefix, lock,
138                atomic_read(&lock->cll_ref), lock->cll_guarder, lock->cll_depth,
139                lock->cll_state, lock->cll_error, lock->cll_holds,
140                lock->cll_users, lock->cll_flags,
141                env, h->coh_nesting, cl_lock_nr_mutexed(env),
142                func, line);
143 }
144 #define cl_lock_trace(level, env, prefix, lock)                         \
145         cl_lock_trace0(level, env, prefix, lock, __FUNCTION__, __LINE__)
146
147 #define RETIP ((unsigned long)__builtin_return_address(0))
148
149 #ifdef CONFIG_LOCKDEP
150 static struct lock_class_key cl_lock_key;
151
152 static void cl_lock_lockdep_init(struct cl_lock *lock)
153 {
154         lockdep_set_class_and_name(lock, &cl_lock_key, "EXT");
155 }
156
157 static void cl_lock_lockdep_acquire(const struct lu_env *env,
158                                     struct cl_lock *lock, __u32 enqflags)
159 {
160         cl_lock_counters(env, lock)->ctc_nr_locks_acquired++;
161         lock_acquire(&lock->dep_map, !!(enqflags & CEF_ASYNC),
162                      /* try: */ 0, lock->cll_descr.cld_mode <= CLM_READ,
163                      /* check: */ 2, RETIP);
164 }
165
166 static void cl_lock_lockdep_release(const struct lu_env *env,
167                                     struct cl_lock *lock)
168 {
169         cl_lock_counters(env, lock)->ctc_nr_locks_acquired--;
170         lock_release(&lock->dep_map, 0, RETIP);
171 }
172
173 #else /* !CONFIG_LOCKDEP */
174
175 static void cl_lock_lockdep_init(struct cl_lock *lock)
176 {}
177 static void cl_lock_lockdep_acquire(const struct lu_env *env,
178                                     struct cl_lock *lock, __u32 enqflags)
179 {}
180 static void cl_lock_lockdep_release(const struct lu_env *env,
181                                     struct cl_lock *lock)
182 {}
183
184 #endif /* !CONFIG_LOCKDEP */
185
186 /**
187  * Adds lock slice to the compound lock.
188  *
189  * This is called by cl_object_operations::coo_lock_init() methods to add a
190  * per-layer state to the lock. New state is added at the end of
191  * cl_lock::cll_layers list, that is, it is at the bottom of the stack.
192  *
193  * \see cl_req_slice_add(), cl_page_slice_add(), cl_io_slice_add()
194  */
195 void cl_lock_slice_add(struct cl_lock *lock, struct cl_lock_slice *slice,
196                        struct cl_object *obj,
197                        const struct cl_lock_operations *ops)
198 {
199         ENTRY;
200         slice->cls_lock = lock;
201         list_add_tail(&slice->cls_linkage, &lock->cll_layers);
202         slice->cls_obj = obj;
203         slice->cls_ops = ops;
204         EXIT;
205 }
206 EXPORT_SYMBOL(cl_lock_slice_add);
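
/*
 * A minimal sketch of how a layer's cl_object_operations::coo_lock_init()
 * method typically attaches its slice with cl_lock_slice_add(). The names
 * struct my_lock, my_lock_kmem and my_lock_ops are hypothetical stand-ins
 * for a real layer (e.g., lovsub or osc); error handling is trimmed.
 *
 * \code
 * struct my_lock {
 *         struct cl_lock_slice mls_cl; // generic slice must be embedded
 *         // ... layer-private state ...
 * };
 *
 * static int my_lock_init(const struct lu_env *env, struct cl_object *obj,
 *                         struct cl_lock *lock, const struct cl_io *io)
 * {
 *         struct my_lock *mls;
 *
 *         OBD_SLAB_ALLOC_PTR_GFP(mls, my_lock_kmem, CFS_ALLOC_IO);
 *         if (mls == NULL)
 *                 return -ENOMEM;
 *         cl_lock_slice_add(lock, &mls->mls_cl, obj, &my_lock_ops);
 *         return 0;
 * }
 * \endcode
 */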
207
208 /**
209  * Returns true iff a lock with the mode \a has provides at least the same
210  * guarantees as a lock with the mode \a need.
211  */
212 int cl_lock_mode_match(enum cl_lock_mode has, enum cl_lock_mode need)
213 {
214         LINVRNT(need == CLM_READ || need == CLM_WRITE ||
215                 need == CLM_PHANTOM || need == CLM_GROUP);
216         LINVRNT(has == CLM_READ || has == CLM_WRITE ||
217                 has == CLM_PHANTOM || has == CLM_GROUP);
218         CLASSERT(CLM_PHANTOM < CLM_READ);
219         CLASSERT(CLM_READ < CLM_WRITE);
220         CLASSERT(CLM_WRITE < CLM_GROUP);
221
222         if (has != CLM_GROUP)
223                 return need <= has;
224         else
225                 return need == has;
226 }
227 EXPORT_SYMBOL(cl_lock_mode_match);
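
/*
 * Illustrative evaluations of the rule above (hypothetical call sites):
 * with the mode ordering asserted by the CLASSERT()s, a stronger mode
 * satisfies a weaker request, while a group lock only matches a group
 * request.
 *
 * \code
 * cl_lock_mode_match(CLM_WRITE, CLM_READ);  // 1: CLM_READ <= CLM_WRITE
 * cl_lock_mode_match(CLM_READ, CLM_WRITE);  // 0: write needs more
 * cl_lock_mode_match(CLM_GROUP, CLM_READ);  // 0: group matches group only
 * cl_lock_mode_match(CLM_GROUP, CLM_GROUP); // 1
 * \endcode
 */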
228
229 /**
230  * Returns true iff extent portions of lock descriptions match.
231  */
232 int cl_lock_ext_match(const struct cl_lock_descr *has,
233                       const struct cl_lock_descr *need)
234 {
235         return
236                 has->cld_start <= need->cld_start &&
237                 has->cld_end >= need->cld_end &&
238                 cl_lock_mode_match(has->cld_mode, need->cld_mode) &&
239                 (has->cld_mode != CLM_GROUP || has->cld_gid == need->cld_gid);
240 }
241 EXPORT_SYMBOL(cl_lock_ext_match);
242
243 /**
244  * Returns true iff a lock with the description \a has provides at least the
245  * same guarantees as a lock with the description \a need.
246  */
247 int cl_lock_descr_match(const struct cl_lock_descr *has,
248                         const struct cl_lock_descr *need)
249 {
250         return
251                 cl_object_same(has->cld_obj, need->cld_obj) &&
252                 cl_lock_ext_match(has, need);
253 }
254 EXPORT_SYMBOL(cl_lock_descr_match);
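
/*
 * For example (hypothetical descriptors on the same cl_object \a obj): a
 * CLM_WRITE lock covering pages [0, 1023] provides everything a CLM_READ
 * request for pages [16, 31] needs, so cl_lock_descr_match() is true and
 * the cached lock can be reused for that request.
 *
 * \code
 * struct cl_lock_descr has = {
 *         .cld_obj = obj, .cld_mode = CLM_WRITE,
 *         .cld_start = 0, .cld_end = 1023
 * };
 * struct cl_lock_descr need = {
 *         .cld_obj = obj, .cld_mode = CLM_READ,
 *         .cld_start = 16, .cld_end = 31
 * };
 * LASSERT(cl_lock_descr_match(&has, &need));
 * \endcode
 */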
255
256 static void cl_lock_free(const struct lu_env *env, struct cl_lock *lock)
257 {
258         struct cl_object *obj = lock->cll_descr.cld_obj;
259
260         LASSERT(cl_is_lock(lock));
261         LINVRNT(!cl_lock_is_mutexed(lock));
262
263         ENTRY;
264         cl_lock_trace(D_DLMTRACE, env, "free lock", lock);
265         might_sleep();
266         while (!list_empty(&lock->cll_layers)) {
267                 struct cl_lock_slice *slice;
268
269                 slice = list_entry(lock->cll_layers.next, struct cl_lock_slice,
270                                    cls_linkage);
271                 list_del_init(lock->cll_layers.next);
272                 slice->cls_ops->clo_fini(env, slice);
273         }
274         atomic_dec(&cl_object_site(obj)->cs_locks.cs_total);
275         atomic_dec(&cl_object_site(obj)->cs_locks_state[lock->cll_state]);
276         lu_object_ref_del_at(&obj->co_lu, lock->cll_obj_ref, "cl_lock", lock);
277         cl_object_put(env, obj);
278         lu_ref_fini(&lock->cll_reference);
279         lu_ref_fini(&lock->cll_holders);
280         mutex_destroy(&lock->cll_guard);
281         OBD_SLAB_FREE_PTR(lock, cl_lock_kmem);
282         EXIT;
283 }
284
285 /**
286  * Releases a reference on a lock.
287  *
288  * When last reference is released, lock is returned to the cache, unless it
289  * is in cl_lock_state::CLS_FREEING state, in which case it is destroyed
290  * immediately.
291  *
292  * \see cl_object_put(), cl_page_put()
293  */
294 void cl_lock_put(const struct lu_env *env, struct cl_lock *lock)
295 {
296         struct cl_object        *obj;
297         struct cl_object_header *head;
298         struct cl_site          *site;
299
300         LINVRNT(cl_lock_invariant(env, lock));
301         ENTRY;
302         obj = lock->cll_descr.cld_obj;
303         LINVRNT(obj != NULL);
304         head = cl_object_header(obj);
305         site = cl_object_site(obj);
306
307         CDEBUG(D_TRACE, "releasing reference: %d %p %lu\n",
308                atomic_read(&lock->cll_ref), lock, RETIP);
309
310         if (atomic_dec_and_test(&lock->cll_ref)) {
311                 if (lock->cll_state == CLS_FREEING) {
312                         LASSERT(list_empty(&lock->cll_linkage));
313                         cl_lock_free(env, lock);
314                 }
315                 atomic_dec(&site->cs_locks.cs_busy);
316         }
317         EXIT;
318 }
319 EXPORT_SYMBOL(cl_lock_put);
320
321 /**
322  * Acquires an additional reference to a lock.
323  *
324  * This can be called only by caller already possessing a reference to \a
325  * lock.
326  *
327  * \see cl_object_get(), cl_page_get()
328  */
329 void cl_lock_get(struct cl_lock *lock)
330 {
331         LINVRNT(cl_lock_invariant(NULL, lock));
332         CDEBUG(D_TRACE, "acquiring reference: %d %p %lu\n",
333                atomic_read(&lock->cll_ref), lock, RETIP);
334         atomic_inc(&lock->cll_ref);
335 }
336 EXPORT_SYMBOL(cl_lock_get);
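
/*
 * A short sketch of the reference discipline (hypothetical caller and
 * helper): every cl_lock_get() must be balanced by a cl_lock_put(); when
 * the last reference to a CLS_FREEING lock is dropped, cl_lock_put()
 * frees it as described above.
 *
 * \code
 * cl_lock_get(lock);                  // pin the lock across a blocking call
 * do_something_blocking(env, lock);   // hypothetical helper
 * cl_lock_put(env, lock);             // drop the extra reference
 * \endcode
 */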
337
338 /**
339  * Acquires a reference to a lock.
340  *
341  * This is much like cl_lock_get(), except that this function can be used to
342  * acquire initial reference to the cached lock. Caller has to deal with all
343  * possible races. Use with care!
344  *
345  * \see cl_page_get_trust()
346  */
347 void cl_lock_get_trust(struct cl_lock *lock)
348 {
349         struct cl_site *site = cl_object_site(lock->cll_descr.cld_obj);
350
351         LASSERT(cl_is_lock(lock));
352         CDEBUG(D_TRACE, "acquiring trusted reference: %d %p %lu\n",
353                atomic_read(&lock->cll_ref), lock, RETIP);
354         if (atomic_inc_return(&lock->cll_ref) == 1)
355                 atomic_inc(&site->cs_locks.cs_busy);
356 }
357 EXPORT_SYMBOL(cl_lock_get_trust);
358
359 /**
360  * Helper function destroying the lock that wasn't completely initialized.
361  *
362  * Other threads can acquire references to the top-lock through its
363  * sub-locks. Hence, it cannot be cl_lock_free()-ed immediately.
364  */
365 static void cl_lock_finish(const struct lu_env *env, struct cl_lock *lock)
366 {
367         cl_lock_mutex_get(env, lock);
368         cl_lock_delete(env, lock);
369         cl_lock_mutex_put(env, lock);
370         cl_lock_put(env, lock);
371 }
372
373 static struct cl_lock *cl_lock_alloc(const struct lu_env *env,
374                                      struct cl_object *obj,
375                                      const struct cl_io *io,
376                                      const struct cl_lock_descr *descr)
377 {
378         struct cl_lock          *lock;
379         struct lu_object_header *head;
380         struct cl_site          *site = cl_object_site(obj);
381
382         ENTRY;
383         OBD_SLAB_ALLOC_PTR_GFP(lock, cl_lock_kmem, CFS_ALLOC_IO);
384         if (lock != NULL) {
385                 atomic_set(&lock->cll_ref, 1);
386                 lock->cll_descr = *descr;
387                 lock->cll_state = CLS_NEW;
388                 cl_object_get(obj);
389                 lock->cll_obj_ref = lu_object_ref_add(&obj->co_lu,
390                                                       "cl_lock", lock);
391                 CFS_INIT_LIST_HEAD(&lock->cll_layers);
392                 CFS_INIT_LIST_HEAD(&lock->cll_linkage);
393                 CFS_INIT_LIST_HEAD(&lock->cll_inclosure);
394                 lu_ref_init(&lock->cll_reference);
395                 lu_ref_init(&lock->cll_holders);
396                 mutex_init(&lock->cll_guard);
397                 lockdep_set_class(&lock->cll_guard, &cl_lock_guard_class);
398                 cfs_waitq_init(&lock->cll_wq);
399                 head = obj->co_lu.lo_header;
400                 atomic_inc(&site->cs_locks_state[CLS_NEW]);
401                 atomic_inc(&site->cs_locks.cs_total);
402                 atomic_inc(&site->cs_locks.cs_created);
403                 cl_lock_lockdep_init(lock);
404                 list_for_each_entry(obj, &head->loh_layers, co_lu.lo_linkage) {
405                         int err;
406
407                         err = obj->co_ops->coo_lock_init(env, obj, lock, io);
408                         if (err != 0) {
409                                 cl_lock_finish(env, lock);
410                                 lock = ERR_PTR(err);
411                                 break;
412                         }
413                 }
414         } else
415                 lock = ERR_PTR(-ENOMEM);
416         RETURN(lock);
417 }
418
419 /**
420  * Transfer the lock into INTRANSIT state and return the original state.
421  *
422  * \pre  state: CLS_CACHED, CLS_HELD or CLS_ENQUEUED
423  * \post state: CLS_INTRANSIT
424  * \see CLS_INTRANSIT
425  */
426 enum cl_lock_state cl_lock_intransit(const struct lu_env *env,
427                                      struct cl_lock *lock)
428 {
429         enum cl_lock_state state = lock->cll_state;
430
431         LASSERT(cl_lock_is_mutexed(lock));
432         LASSERT(state != CLS_INTRANSIT);
433         LASSERTF(state >= CLS_ENQUEUED && state <= CLS_CACHED,
434                  "Malformed lock state %d.\n", state);
435
436         cl_lock_state_set(env, lock, CLS_INTRANSIT);
437         lock->cll_intransit_owner = cfs_current();
438         cl_lock_hold_add(env, lock, "intransit", cfs_current());
439         return state;
440 }
441 EXPORT_SYMBOL(cl_lock_intransit);
442
443 /**
444  * Exits the intransit state and restores the lock to its original state.
445  */
446 void cl_lock_extransit(const struct lu_env *env, struct cl_lock *lock,
447                        enum cl_lock_state state)
448 {
449         LASSERT(cl_lock_is_mutexed(lock));
450         LASSERT(lock->cll_state == CLS_INTRANSIT);
451         LASSERT(state != CLS_INTRANSIT);
452         LASSERT(lock->cll_intransit_owner == cfs_current());
453
454         lock->cll_intransit_owner = NULL;
455         cl_lock_state_set(env, lock, state);
456         cl_lock_unhold(env, lock, "intransit", cfs_current());
457 }
458 EXPORT_SYMBOL(cl_lock_extransit);
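
/*
 * A sketch of the intransit protocol (error handling trimmed): a thread
 * holding the lock mutex takes a CLS_ENQUEUED, CLS_HELD or CLS_CACHED
 * lock out of its state with cl_lock_intransit(), performs potentially
 * blocking work, and then restores or advances the state with
 * cl_lock_extransit(). cl_use_try() below follows exactly this pattern.
 *
 * \code
 * enum cl_lock_state saved;
 *
 * cl_lock_mutex_get(env, lock);
 * saved = cl_lock_intransit(env, lock);
 * // ... potentially blocking work on the lock ...
 * cl_lock_extransit(env, lock, saved);
 * cl_lock_mutex_put(env, lock);
 * \endcode
 */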
459
460 /**
461  * Returns true iff the lock is in transit and owned by another thread.
462  */
463 int cl_lock_is_intransit(struct cl_lock *lock)
464 {
465         LASSERT(cl_lock_is_mutexed(lock));
466         return lock->cll_state == CLS_INTRANSIT &&
467                lock->cll_intransit_owner != cfs_current();
468 }
469 EXPORT_SYMBOL(cl_lock_is_intransit);
470 /**
471  * Returns true iff lock is "suitable" for given io. E.g., locks acquired by
472  * truncate and O_APPEND cannot be reused for read/non-append-write, as they
473  * cover multiple stripes and can trigger cascading timeouts.
474  */
475 static int cl_lock_fits_into(const struct lu_env *env,
476                              const struct cl_lock *lock,
477                              const struct cl_lock_descr *need,
478                              const struct cl_io *io)
479 {
480         const struct cl_lock_slice *slice;
481
482         LINVRNT(cl_lock_invariant_trusted(env, lock));
483         ENTRY;
484         list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
485                 if (slice->cls_ops->clo_fits_into != NULL &&
486                     !slice->cls_ops->clo_fits_into(env, slice, need, io))
487                         RETURN(0);
488         }
489         RETURN(1);
490 }
491
492 static struct cl_lock *cl_lock_lookup(const struct lu_env *env,
493                                       struct cl_object *obj,
494                                       const struct cl_io *io,
495                                       const struct cl_lock_descr *need)
496 {
497         struct cl_lock          *lock;
498         struct cl_object_header *head;
499         struct cl_site          *site;
500
501         ENTRY;
502
503         head = cl_object_header(obj);
504         site = cl_object_site(obj);
505         LINVRNT_SPIN_LOCKED(&head->coh_lock_guard);
506         atomic_inc(&site->cs_locks.cs_lookup);
507         list_for_each_entry(lock, &head->coh_locks, cll_linkage) {
508                 int matched;
509
510                 LASSERT(cl_is_lock(lock));
511                 matched = cl_lock_ext_match(&lock->cll_descr, need) &&
512                         lock->cll_state < CLS_FREEING &&
513                         !(lock->cll_flags & CLF_CANCELLED) &&
514                         cl_lock_fits_into(env, lock, need, io);
515                 CDEBUG(D_DLMTRACE, "has: "DDESCR"(%i) need: "DDESCR": %d\n",
516                        PDESCR(&lock->cll_descr), lock->cll_state, PDESCR(need),
517                        matched);
518                 if (matched) {
519                         cl_lock_get_trust(lock);
520                         /* move the lock to the LRU head */
521                         list_move(&lock->cll_linkage, &head->coh_locks);
522                         atomic_inc(&cl_object_site(obj)->cs_locks.cs_hit);
523                         RETURN(lock);
524                 }
525         }
526         RETURN(NULL);
527 }
528
529 /**
530  * Returns a lock matching description \a need.
531  *
532  * This is the main entry point into the cl_lock caching interface. First, a
533  * cache (implemented as a per-object linked list) is consulted. If lock is
534  * found there, it is returned immediately. Otherwise new lock is allocated
535  * and returned. In any case, additional reference to lock is acquired.
536  *
537  * \see cl_object_find(), cl_page_find()
538  */
539 static struct cl_lock *cl_lock_find(const struct lu_env *env,
540                                     const struct cl_io *io,
541                                     const struct cl_lock_descr *need)
542 {
543         struct cl_object_header *head;
544         struct cl_object        *obj;
545         struct cl_lock          *lock;
546         struct cl_site          *site;
547
548         ENTRY;
549
550         obj  = need->cld_obj;
551         head = cl_object_header(obj);
552         site = cl_object_site(obj);
553
554         spin_lock(&head->coh_lock_guard);
555         lock = cl_lock_lookup(env, obj, io, need);
556         spin_unlock(&head->coh_lock_guard);
557
558         if (lock == NULL) {
559                 lock = cl_lock_alloc(env, obj, io, need);
560                 if (!IS_ERR(lock)) {
561                         struct cl_lock *ghost;
562
563                         spin_lock(&head->coh_lock_guard);
564                         ghost = cl_lock_lookup(env, obj, io, need);
565                         if (ghost == NULL) {
566                                 list_add(&lock->cll_linkage, &head->coh_locks);
567                                 spin_unlock(&head->coh_lock_guard);
568                                 atomic_inc(&site->cs_locks.cs_busy);
569                         } else {
570                                 spin_unlock(&head->coh_lock_guard);
571                                 /*
572                                  * Other threads can acquire references to the
573                                  * top-lock through its sub-locks. Hence, it
574                                  * cannot be cl_lock_free()-ed immediately.
575                                  */
576                                 cl_lock_finish(env, lock);
577                                 lock = ghost;
578                         }
579                 }
580         }
581         RETURN(lock);
582 }
583
584 /**
585  * Returns existing lock matching given description. This is similar to
586  * cl_lock_find() except that no new lock is created, and returned lock is
587  * guaranteed to be in enum cl_lock_state::CLS_HELD state.
588  */
589 struct cl_lock *cl_lock_peek(const struct lu_env *env, const struct cl_io *io,
590                              const struct cl_lock_descr *need,
591                              const char *scope, const void *source)
592 {
593         struct cl_object_header *head;
594         struct cl_object        *obj;
595         struct cl_lock          *lock;
596         int ok;
597
598         obj  = need->cld_obj;
599         head = cl_object_header(obj);
600
601         spin_lock(&head->coh_lock_guard);
602         lock = cl_lock_lookup(env, obj, io, need);
603         spin_unlock(&head->coh_lock_guard);
604
605         if (lock == NULL)
606                 return NULL;
607
608         cl_lock_mutex_get(env, lock);
609         if (lock->cll_state == CLS_INTRANSIT)
610                 cl_lock_state_wait(env, lock); /* Don't care return value. */
611         if (lock->cll_state == CLS_CACHED) {
612                 int result;
613                 result = cl_use_try(env, lock, 1);
614                 if (result < 0)
615                         cl_lock_error(env, lock, result);
616         }
617         ok = lock->cll_state == CLS_HELD;
618         if (ok) {
619                 cl_lock_hold_add(env, lock, scope, source);
620                 cl_lock_user_add(env, lock);
621                 cl_lock_put(env, lock);
622         }
623         cl_lock_mutex_put(env, lock);
624         if (!ok) {
625                 cl_lock_put(env, lock);
626                 lock = NULL;
627         }
628
629         return lock;
630 }
631 EXPORT_SYMBOL(cl_lock_peek);
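
/*
 * A sketch of typical cl_lock_peek() usage (hypothetical caller): a
 * successfully peeked lock comes back with a hold and a user, so it is
 * assumed here that the caller eventually drops the user with cl_unuse()
 * and the hold with cl_lock_unhold() (defined later in this file), using
 * the same scope/source pair; "myscope" is illustrative only.
 *
 * \code
 * lock = cl_lock_peek(env, io, &descr, "myscope", cfs_current());
 * if (lock != NULL) {
 *         // ... operate under the CLS_HELD lock ...
 *         cl_unuse(env, lock);
 *         cl_lock_unhold(env, lock, "myscope", cfs_current());
 * }
 * \endcode
 */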
632
633 /**
634  * Returns a slice within a lock, corresponding to the given layer in the
635  * device stack.
636  *
637  * \see cl_page_at()
638  */
639 const struct cl_lock_slice *cl_lock_at(const struct cl_lock *lock,
640                                        const struct lu_device_type *dtype)
641 {
642         const struct cl_lock_slice *slice;
643
644         LINVRNT(cl_lock_invariant_trusted(NULL, lock));
645         ENTRY;
646
647         list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
648                 if (slice->cls_obj->co_lu.lo_dev->ld_type == dtype)
649                         RETURN(slice);
650         }
651         RETURN(NULL);
652 }
653 EXPORT_SYMBOL(cl_lock_at);
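
/*
 * For example (hypothetical device type and lock structure, matching the
 * struct my_lock sketch near cl_lock_slice_add() above): a layer can
 * recover its own slice from a compound lock and convert it back to its
 * private type.
 *
 * \code
 * const struct cl_lock_slice *slice;
 * struct my_lock *mls = NULL;
 *
 * slice = cl_lock_at(lock, &my_device_type);
 * if (slice != NULL)
 *         mls = container_of(slice, struct my_lock, mls_cl);
 * \endcode
 */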
654
655 static void cl_lock_mutex_tail(const struct lu_env *env, struct cl_lock *lock)
656 {
657         struct cl_thread_counters *counters;
658
659         counters = cl_lock_counters(env, lock);
660         lock->cll_depth++;
661         counters->ctc_nr_locks_locked++;
662         lu_ref_add(&counters->ctc_locks_locked, "cll_guard", lock);
663         cl_lock_trace(D_TRACE, env, "got mutex", lock);
664 }
665
666 /**
667  * Locks cl_lock object.
668  *
669  * This is used to manipulate cl_lock fields, and to serialize state
670  * transitions in the lock state machine.
671  *
672  * \post cl_lock_is_mutexed(lock)
673  *
674  * \see cl_lock_mutex_put()
675  */
676 void cl_lock_mutex_get(const struct lu_env *env, struct cl_lock *lock)
677 {
678         LINVRNT(cl_lock_invariant(env, lock));
679
680         if (lock->cll_guarder == cfs_current()) {
681                 LINVRNT(cl_lock_is_mutexed(lock));
682                 LINVRNT(lock->cll_depth > 0);
683         } else {
684                 struct cl_object_header *hdr;
685                 struct cl_thread_info   *info;
686                 int i;
687
688                 LINVRNT(lock->cll_guarder != cfs_current());
689                 hdr = cl_object_header(lock->cll_descr.cld_obj);
690                 /*
691                  * Check that mutexes are taken in bottom-to-top order.
692                  */
693                 info = cl_env_info(env);
694                 for (i = 0; i < hdr->coh_nesting; ++i)
695                         LASSERT(info->clt_counters[i].ctc_nr_locks_locked == 0);
696                 mutex_lock_nested(&lock->cll_guard, hdr->coh_nesting);
697                 lock->cll_guarder = cfs_current();
698                 LINVRNT(lock->cll_depth == 0);
699         }
700         cl_lock_mutex_tail(env, lock);
701 }
702 EXPORT_SYMBOL(cl_lock_mutex_get);
703
704 /**
705  * Try-locks cl_lock object.
706  *
707  * \retval 0 \a lock was successfully locked
708  *
709  * \retval -EBUSY \a lock cannot be locked right now
710  *
711  * \post ergo(result == 0, cl_lock_is_mutexed(lock))
712  *
713  * \see cl_lock_mutex_get()
714  */
715 int cl_lock_mutex_try(const struct lu_env *env, struct cl_lock *lock)
716 {
717         int result;
718
719         LINVRNT(cl_lock_invariant_trusted(env, lock));
720         ENTRY;
721
722         result = 0;
723         if (lock->cll_guarder == cfs_current()) {
724                 LINVRNT(lock->cll_depth > 0);
725                 cl_lock_mutex_tail(env, lock);
726         } else if (mutex_trylock(&lock->cll_guard)) {
727                 LINVRNT(lock->cll_depth == 0);
728                 lock->cll_guarder = cfs_current();
729                 cl_lock_mutex_tail(env, lock);
730         } else
731                 result = -EBUSY;
732         RETURN(result);
733 }
734 EXPORT_SYMBOL(cl_lock_mutex_try);
735
736 /**
737  * Unlocks cl_lock object.
738  *
739  * \pre cl_lock_is_mutexed(lock)
740  *
741  * \see cl_lock_mutex_get()
742  */
743 void cl_lock_mutex_put(const struct lu_env *env, struct cl_lock *lock)
744 {
745         struct cl_thread_counters *counters;
746
747         LINVRNT(cl_lock_invariant(env, lock));
748         LINVRNT(cl_lock_is_mutexed(lock));
749         LINVRNT(lock->cll_guarder == cfs_current());
750         LINVRNT(lock->cll_depth > 0);
751
752         counters = cl_lock_counters(env, lock);
753         LINVRNT(counters->ctc_nr_locks_locked > 0);
754
755         cl_lock_trace(D_TRACE, env, "put mutex", lock);
756         lu_ref_del(&counters->ctc_locks_locked, "cll_guard", lock);
757         counters->ctc_nr_locks_locked--;
758         if (--lock->cll_depth == 0) {
759                 lock->cll_guarder = NULL;
760                 mutex_unlock(&lock->cll_guard);
761         }
762 }
763 EXPORT_SYMBOL(cl_lock_mutex_put);
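
/*
 * A short sketch of the recursive locking these helpers allow: a thread
 * that already owns cll_guard may call cl_lock_mutex_get() again (the
 * depth counter is incremented instead of deadlocking), as long as every
 * get is balanced by a cl_lock_mutex_put().
 *
 * \code
 * cl_lock_mutex_get(env, lock);    // takes cll_guard, cll_depth == 1
 * cl_lock_mutex_get(env, lock);    // same owner, cll_depth == 2
 * LASSERT(cl_lock_is_mutexed(lock));
 * cl_lock_mutex_put(env, lock);    // cll_depth back to 1
 * cl_lock_mutex_put(env, lock);    // cll_depth == 0, cll_guard released
 * \endcode
 */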
764
765 /**
766  * Returns true iff lock's mutex is owned by the current thread.
767  */
768 int cl_lock_is_mutexed(struct cl_lock *lock)
769 {
770         return lock->cll_guarder == cfs_current();
771 }
772 EXPORT_SYMBOL(cl_lock_is_mutexed);
773
774 /**
775  * Returns number of cl_lock mutexes held by the current thread (environment).
776  */
777 int cl_lock_nr_mutexed(const struct lu_env *env)
778 {
779         struct cl_thread_info *info;
780         int i;
781         int locked;
782
783         /*
784          * NOTE: if summation across all nesting levels (currently 2) proves
785          *       too expensive, a summary counter can be added to
786          *       struct cl_thread_info.
787          */
788         info = cl_env_info(env);
789         for (i = 0, locked = 0; i < ARRAY_SIZE(info->clt_counters); ++i)
790                 locked += info->clt_counters[i].ctc_nr_locks_locked;
791         return locked;
792 }
793 EXPORT_SYMBOL(cl_lock_nr_mutexed);
794
795 static void cl_lock_cancel0(const struct lu_env *env, struct cl_lock *lock)
796 {
797         LINVRNT(cl_lock_is_mutexed(lock));
798         LINVRNT(cl_lock_invariant(env, lock));
799         ENTRY;
800         if (!(lock->cll_flags & CLF_CANCELLED)) {
801                 const struct cl_lock_slice *slice;
802
803                 lock->cll_flags |= CLF_CANCELLED;
804                 list_for_each_entry_reverse(slice, &lock->cll_layers,
805                                             cls_linkage) {
806                         if (slice->cls_ops->clo_cancel != NULL)
807                                 slice->cls_ops->clo_cancel(env, slice);
808                 }
809         }
810         EXIT;
811 }
812
813 static void cl_lock_delete0(const struct lu_env *env, struct cl_lock *lock)
814 {
815         struct cl_object_header    *head;
816         const struct cl_lock_slice *slice;
817
818         LINVRNT(cl_lock_is_mutexed(lock));
819         LINVRNT(cl_lock_invariant(env, lock));
820
821         ENTRY;
822         if (lock->cll_state < CLS_FREEING) {
823                 cl_lock_state_set(env, lock, CLS_FREEING);
824
825                 head = cl_object_header(lock->cll_descr.cld_obj);
826
827                 spin_lock(&head->coh_lock_guard);
828                 list_del_init(&lock->cll_linkage);
829
830                 spin_unlock(&head->coh_lock_guard);
831                 /*
832                  * From now on, no new references to this lock can be acquired
833                  * by cl_lock_lookup().
834                  */
835                 list_for_each_entry_reverse(slice, &lock->cll_layers,
836                                             cls_linkage) {
837                         if (slice->cls_ops->clo_delete != NULL)
838                                 slice->cls_ops->clo_delete(env, slice);
839                 }
840                 /*
841                  * From now on, no new references to this lock can be acquired
842                  * by layer-specific means (like a pointer from struct
843                  * ldlm_lock in osc, or a pointer from top-lock to sub-lock in
844                  * lov).
845                  *
846                  * Lock will be finally freed in cl_lock_put() when last of
847                  * existing references goes away.
848                  */
849         }
850         EXIT;
851 }
852
853 /**
854  * Modifies cl_lock::cll_holds counter for a given lock. For a top-lock
855  * (nesting == 0) it also accounts for this modification in the per-thread
856  * debugging counters. Sub-lock holds can be released by a thread different
857  * from the one that acquired them.
858  */
859 static void cl_lock_hold_mod(const struct lu_env *env, struct cl_lock *lock,
860                              int delta)
861 {
862         struct cl_thread_counters *counters;
863         enum clt_nesting_level     nesting;
864
865         lock->cll_holds += delta;
866         nesting = cl_lock_nesting(lock);
867         if (nesting == CNL_TOP) {
868                 counters = &cl_env_info(env)->clt_counters[CNL_TOP];
869                 counters->ctc_nr_held += delta;
870                 LASSERT(counters->ctc_nr_held >= 0);
871         }
872 }
873
874 /**
875  * Modifies cl_lock::cll_users counter for a given lock. See
876  * cl_lock_hold_mod() for an explanation of the debugging code.
877  */
878 static void cl_lock_used_mod(const struct lu_env *env, struct cl_lock *lock,
879                              int delta)
880 {
881         struct cl_thread_counters *counters;
882         enum clt_nesting_level     nesting;
883
884         lock->cll_users += delta;
885         nesting = cl_lock_nesting(lock);
886         if (nesting == CNL_TOP) {
887                 counters = &cl_env_info(env)->clt_counters[CNL_TOP];
888                 counters->ctc_nr_used += delta;
889                 LASSERT(counters->ctc_nr_used >= 0);
890         }
891 }
892
893 static void cl_lock_hold_release(const struct lu_env *env, struct cl_lock *lock,
894                                  const char *scope, const void *source)
895 {
896         LINVRNT(cl_lock_is_mutexed(lock));
897         LINVRNT(cl_lock_invariant(env, lock));
898         LASSERT(lock->cll_holds > 0);
899
900         ENTRY;
901         cl_lock_trace(D_DLMTRACE, env, "hold release lock", lock);
902         lu_ref_del(&lock->cll_holders, scope, source);
903         cl_lock_hold_mod(env, lock, -1);
904         if (lock->cll_holds == 0) {
905                 if (lock->cll_descr.cld_mode == CLM_PHANTOM ||
906                     lock->cll_descr.cld_mode == CLM_GROUP)
907                         /*
908                          * If the lock is still a phantom or group lock when
909                          * the user is done with it, destroy the lock.
910                          */
911                         lock->cll_flags |= CLF_CANCELPEND|CLF_DOOMED;
912                 if (lock->cll_flags & CLF_CANCELPEND) {
913                         lock->cll_flags &= ~CLF_CANCELPEND;
914                         cl_lock_cancel0(env, lock);
915                 }
916                 if (lock->cll_flags & CLF_DOOMED) {
917                         /* no longer doomed: it's dead... Jim. */
918                         lock->cll_flags &= ~CLF_DOOMED;
919                         cl_lock_delete0(env, lock);
920                 }
921         }
922         EXIT;
923 }
924
925
926 /**
927  * Waits until lock state is changed.
928  *
929  * This function is called with cl_lock mutex locked, atomically releases
930  * mutex and goes to sleep, waiting for a lock state change (signaled by
931  * cl_lock_signal()), and re-acquires the mutex before return.
932  *
933  * This function is used to wait until lock state machine makes some progress
934  * and to emulate synchronous operations on top of asynchronous lock
935  * interface.
936  *
937  * \retval -EINTR wait was interrupted
938  *
939  * \retval 0 wait wasn't interrupted
940  *
941  * \pre cl_lock_is_mutexed(lock)
942  *
943  * \see cl_lock_signal()
944  */
945 int cl_lock_state_wait(const struct lu_env *env, struct cl_lock *lock)
946 {
947         cfs_waitlink_t waiter;
948         int result;
949
950         ENTRY;
951         LINVRNT(cl_lock_is_mutexed(lock));
952         LINVRNT(cl_lock_invariant(env, lock));
953         LASSERT(lock->cll_depth == 1);
954         LASSERT(lock->cll_state != CLS_FREEING); /* too late to wait */
955
956         cl_lock_trace(D_DLMTRACE, env, "state wait lock", lock);
957         result = lock->cll_error;
958         if (result == 0) {
959                 cfs_waitlink_init(&waiter);
960                 cfs_waitq_add(&lock->cll_wq, &waiter);
961                 set_current_state(CFS_TASK_INTERRUPTIBLE);
962                 cl_lock_mutex_put(env, lock);
963
964                 LASSERT(cl_lock_nr_mutexed(env) == 0);
965                 cfs_waitq_wait(&waiter, CFS_TASK_INTERRUPTIBLE);
966
967                 cl_lock_mutex_get(env, lock);
968                 set_current_state(CFS_TASK_RUNNING);
969                 cfs_waitq_del(&lock->cll_wq, &waiter);
970                 result = cfs_signal_pending() ? -EINTR : 0;
971         }
972         RETURN(result);
973 }
974 EXPORT_SYMBOL(cl_lock_state_wait);
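
/*
 * The canonical retry loop built on top of cl_lock_state_wait(): a *_try()
 * operation returning CLO_WAIT is retried after the lock state changes,
 * and -EINTR aborts the wait. cl_enqueue_locked(), cl_unuse_locked() and
 * cl_wait() below all have this shape; cl_wait_try() is used here only as
 * a representative of the *_try() helpers.
 *
 * \code
 * do {
 *         result = cl_wait_try(env, lock);
 *         if (result == CLO_WAIT) {
 *                 result = cl_lock_state_wait(env, lock);
 *                 if (result == 0)
 *                         continue;
 *         }
 *         break;
 * } while (1);
 * \endcode
 */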
975
976 static void cl_lock_state_signal(const struct lu_env *env, struct cl_lock *lock,
977                                  enum cl_lock_state state)
978 {
979         const struct cl_lock_slice *slice;
980
981         ENTRY;
982         LINVRNT(cl_lock_is_mutexed(lock));
983         LINVRNT(cl_lock_invariant(env, lock));
984
985         list_for_each_entry(slice, &lock->cll_layers, cls_linkage)
986                 if (slice->cls_ops->clo_state != NULL)
987                         slice->cls_ops->clo_state(env, slice, state);
988         cfs_waitq_broadcast(&lock->cll_wq);
989         EXIT;
990 }
991
992 /**
993  * Notifies waiters that lock state changed.
994  *
995  * Wakes up all waiters sleeping in cl_lock_state_wait(), also notifies all
996  * layers about state change by calling cl_lock_operations::clo_state()
997  * top-to-bottom.
998  */
999 void cl_lock_signal(const struct lu_env *env, struct cl_lock *lock)
1000 {
1001         ENTRY;
1002         cl_lock_trace(D_DLMTRACE, env, "state signal lock", lock);
1003         cl_lock_state_signal(env, lock, lock->cll_state);
1004         EXIT;
1005 }
1006 EXPORT_SYMBOL(cl_lock_signal);
1007
1008 /**
1009  * Changes lock state.
1010  *
1011  * This function is invoked to notify layers that lock state changed, possibly
1012  * as a result of an asynchronous event such as call-back reception.
1013  *
1014  * \post lock->cll_state == state
1015  *
1016  * \see cl_lock_operations::clo_state()
1017  */
1018 void cl_lock_state_set(const struct lu_env *env, struct cl_lock *lock,
1019                        enum cl_lock_state state)
1020 {
1021         struct cl_site *site = cl_object_site(lock->cll_descr.cld_obj);
1022
1023         ENTRY;
1024         LASSERT(lock->cll_state <= state ||
1025                 (lock->cll_state == CLS_CACHED &&
1026                  (state == CLS_HELD || /* lock found in cache */
1027                   state == CLS_NEW  ||   /* sub-lock canceled */
1028                   state == CLS_INTRANSIT)) ||
1029                 /* lock is in transit state */
1030                 lock->cll_state == CLS_INTRANSIT);
1031
1032         if (lock->cll_state != state) {
1033                 atomic_dec(&site->cs_locks_state[lock->cll_state]);
1034                 atomic_inc(&site->cs_locks_state[state]);
1035
1036                 cl_lock_state_signal(env, lock, state);
1037                 lock->cll_state = state;
1038         }
1039         EXIT;
1040 }
1041 EXPORT_SYMBOL(cl_lock_state_set);
1042
1043 static int cl_unuse_try_internal(const struct lu_env *env, struct cl_lock *lock)
1044 {
1045         const struct cl_lock_slice *slice;
1046         int result;
1047
1048         do {
1049                 result = 0;
1050
1051                 if (lock->cll_error != 0)
1052                         break;
1053
1054                 LINVRNT(cl_lock_is_mutexed(lock));
1055                 LINVRNT(cl_lock_invariant(env, lock));
1056                 LASSERT(lock->cll_state == CLS_INTRANSIT);
1057                 LASSERT(lock->cll_users > 0);
1058                 LASSERT(lock->cll_holds > 0);
1059
1060                 result = -ENOSYS;
1061                 list_for_each_entry_reverse(slice, &lock->cll_layers,
1062                                             cls_linkage) {
1063                         if (slice->cls_ops->clo_unuse != NULL) {
1064                                 result = slice->cls_ops->clo_unuse(env, slice);
1065                                 if (result != 0)
1066                                         break;
1067                         }
1068                 }
1069                 LASSERT(result != -ENOSYS);
1070         } while (result == CLO_REPEAT);
1071
1072         return result ?: lock->cll_error;
1073 }
1074
1075 /**
1076  * Yanks lock from the cache (cl_lock_state::CLS_CACHED state) by calling
1077  * cl_lock_operations::clo_use() top-to-bottom to notify layers.
1078  * If \a atomic is 1, the lock is unused on failure to restore its previous
1079  * state, keeping the whole use process atomic.
1080  */
1081 int cl_use_try(const struct lu_env *env, struct cl_lock *lock, int atomic)
1082 {
1083         const struct cl_lock_slice *slice;
1084         int result;
1085         enum cl_lock_state state;
1086
1087         ENTRY;
1088         cl_lock_trace(D_DLMTRACE, env, "use lock", lock);
1089
1090         result = -ENOSYS;
1091         state = cl_lock_intransit(env, lock);
1092         list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
1093                 if (slice->cls_ops->clo_use != NULL) {
1094                         result = slice->cls_ops->clo_use(env, slice);
1095                         if (result != 0)
1096                                 break;
1097                 }
1098         }
1099         LASSERT(result != -ENOSYS);
1100
1101         LASSERT(lock->cll_state == CLS_INTRANSIT);
1102
1103         if (result == 0) {
1104                 state = CLS_HELD;
1105         } else {
1106                 if (result == -ESTALE) {
1107                         /*
1108                          * -ESTALE means a sublock is being cancelled
1109                          * at this time. Set the lock state back to
1110                          * CLS_NEW and ask the caller to repeat.
1111                          */
1112                         state = CLS_NEW;
1113                         result = CLO_REPEAT;
1114                 }
1115
1116                 /* @atomic means back-off-on-failure. */
1117                 if (atomic) {
1118                         int rc;
1119
1120                         do {
1121                                 rc = cl_unuse_try_internal(env, lock);
1122                                 if (rc == 0)
1123                                         break;
1124                                 if (rc == CLO_WAIT)
1125                                         rc = cl_lock_state_wait(env, lock);
1126                                 if (rc < 0)
1127                                         break;
1128                         } while(1);
1129
1130                         /* Vet the results. */
1131                         if (rc < 0 && result > 0)
1132                                 result = rc;
1133                 }
1134
1135         }
1136         cl_lock_extransit(env, lock, state);
1137         RETURN(result);
1138 }
1139 EXPORT_SYMBOL(cl_use_try);
1140
1141 /**
1142  * Helper for cl_enqueue_try() that calls ->clo_enqueue() across all layers
1143  * top-to-bottom.
1144  */
1145 static int cl_enqueue_kick(const struct lu_env *env,
1146                            struct cl_lock *lock,
1147                            struct cl_io *io, __u32 flags)
1148 {
1149         int result;
1150         const struct cl_lock_slice *slice;
1151
1152         ENTRY;
1153         result = -ENOSYS;
1154         list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
1155                 if (slice->cls_ops->clo_enqueue != NULL) {
1156                         result = slice->cls_ops->clo_enqueue(env,
1157                                                              slice, io, flags);
1158                         if (result != 0)
1159                                 break;
1160                 }
1161         }
1162         LASSERT(result != -ENOSYS);
1163         RETURN(result);
1164 }
1165
1166 /**
1167  * Tries to enqueue a lock.
1168  *
1169  * This function is called repeatedly by cl_enqueue() until either lock is
1170  * enqueued, or error occurs. This function does not block waiting for
1171  * networking communication to complete.
1172  *
1173  * \post ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
1174  *                         lock->cll_state == CLS_HELD)
1175  *
1176  * \see cl_enqueue() cl_lock_operations::clo_enqueue()
1177  * \see cl_lock_state::CLS_ENQUEUED
1178  */
1179 int cl_enqueue_try(const struct lu_env *env, struct cl_lock *lock,
1180                    struct cl_io *io, __u32 flags)
1181 {
1182         int result;
1183
1184         ENTRY;
1185         cl_lock_trace(D_DLMTRACE, env, "enqueue lock", lock);
1186         do {
1187                 result = 0;
1188
1189                 LINVRNT(cl_lock_is_mutexed(lock));
1190
1191                 if (lock->cll_error != 0)
1192                         break;
1193                 switch (lock->cll_state) {
1194                 case CLS_NEW:
1195                         cl_lock_state_set(env, lock, CLS_QUEUING);
1196                         /* fall-through */
1197                 case CLS_QUEUING:
1198                         /* kick layers. */
1199                         result = cl_enqueue_kick(env, lock, io, flags);
1200                         if (result == 0)
1201                                 cl_lock_state_set(env, lock, CLS_ENQUEUED);
1202                         break;
1203                 case CLS_INTRANSIT:
1204                         LASSERT(cl_lock_is_intransit(lock));
1205                         result = CLO_WAIT;
1206                         break;
1207                 case CLS_CACHED:
1208                         /* yank lock from the cache. */
1209                         result = cl_use_try(env, lock, 0);
1210                         break;
1211                 case CLS_ENQUEUED:
1212                 case CLS_HELD:
1213                         result = 0;
1214                         break;
1215                 default:
1216                 case CLS_FREEING:
1217                         /*
1218                          * impossible, only held locks with increased
1219                          * ->cll_holds can be enqueued, and they cannot be
1220                          * freed.
1221                          */
1222                         LBUG();
1223                 }
1224         } while (result == CLO_REPEAT);
1225         if (result < 0)
1226                 cl_lock_error(env, lock, result);
1227         RETURN(result ?: lock->cll_error);
1228 }
1229 EXPORT_SYMBOL(cl_enqueue_try);
1230
1231 static int cl_enqueue_locked(const struct lu_env *env, struct cl_lock *lock,
1232                              struct cl_io *io, __u32 enqflags)
1233 {
1234         int result;
1235
1236         ENTRY;
1237
1238         LINVRNT(cl_lock_is_mutexed(lock));
1239         LINVRNT(cl_lock_invariant(env, lock));
1240         LASSERT(lock->cll_holds > 0);
1241
1242         cl_lock_user_add(env, lock);
1243         do {
1244                 result = cl_enqueue_try(env, lock, io, enqflags);
1245                 if (result == CLO_WAIT) {
1246                         result = cl_lock_state_wait(env, lock);
1247                         if (result == 0)
1248                                 continue;
1249                 }
1250                 break;
1251         } while (1);
1252         if (result != 0) {
1253                 cl_lock_user_del(env, lock);
1254                 if (result != -EINTR)
1255                         cl_lock_error(env, lock, result);
1256         }
1257         LASSERT(ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
1258                      lock->cll_state == CLS_HELD));
1259         RETURN(result);
1260 }
1261
1262 /**
1263  * Enqueues a lock.
1264  *
1265  * \pre current thread or io owns a hold on lock.
1266  *
1267  * \post ergo(result == 0, lock->users increased)
1268  * \post ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
1269  *                         lock->cll_state == CLS_HELD)
1270  */
1271 int cl_enqueue(const struct lu_env *env, struct cl_lock *lock,
1272                struct cl_io *io, __u32 enqflags)
1273 {
1274         int result;
1275
1276         ENTRY;
1277
1278         cl_lock_lockdep_acquire(env, lock, enqflags);
1279         cl_lock_mutex_get(env, lock);
1280         result = cl_enqueue_locked(env, lock, io, enqflags);
1281         cl_lock_mutex_put(env, lock);
1282         if (result != 0)
1283                 cl_lock_lockdep_release(env, lock);
1284         LASSERT(ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
1285                      lock->cll_state == CLS_HELD));
1286         RETURN(result);
1287 }
1288 EXPORT_SYMBOL(cl_enqueue);
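
/*
 * A sketch of the synchronous lock life-cycle as seen by a caller that
 * already owns a hold on \a lock (obtaining the hold, e.g. through a
 * holder interface such as cl_lock_hold(), is outside this excerpt and
 * assumed here); error handling is trimmed.
 *
 * \code
 * result = cl_enqueue(env, lock, io, enqflags);
 * if (result == 0) {
 *         result = cl_wait(env, lock);     // CLS_HELD on success
 *         if (result == 0) {
 *                 // ... perform I/O covered by the lock ...
 *                 cl_unuse(env, lock);     // usually back to CLS_CACHED
 *         }
 * }
 * \endcode
 */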
1289
1290 /**
1291  * Tries to unlock a lock.
1292  *
1293  * This function is called repeatedly by cl_unuse() until either lock is
1294  * unlocked, or error occurs.
1295  *
1296  * \pre  lock->cll_state <= CLS_HELD || cl_lock_is_intransit(lock)
1297  *
1298  * \post ergo(result == 0, lock->cll_state == CLS_CACHED)
1299  *
1300  * \see cl_unuse() cl_lock_operations::clo_unuse()
1301  * \see cl_lock_state::CLS_CACHED
1302  */
1303 int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock)
1304 {
1305         int                         result;
1306         enum cl_lock_state          state = CLS_NEW;
1307
1308         ENTRY;
1309         cl_lock_trace(D_DLMTRACE, env, "unuse lock", lock);
1310
1311         if (lock->cll_state != CLS_INTRANSIT) {
1312                 if (lock->cll_users > 1) {
1313                         cl_lock_user_del(env, lock);
1314                         RETURN(0);
1315                 }
1316                 /*
1317                  * New lock users (->cll_users) do not prevent unlocking from
1318                  * proceeding. From this point, the lock eventually reaches
1319                  * CLS_CACHED, is reinitialized to CLS_NEW, or falls into
1320                  * CLS_FREEING.
1321                  */
1322                 state = cl_lock_intransit(env, lock);
1323         }
1324
1325         result = cl_unuse_try_internal(env, lock);
1326         LASSERT(lock->cll_state == CLS_INTRANSIT);
1327         if (result != CLO_WAIT)
1328                 /*
1329                  * Once there is no more need to iterate ->clo_unuse() calls,
1330                  * remove lock user. This is done even if unrecoverable error
1331                  * happened during unlocking, because nothing else can be
1332                  * done.
1333                  */
1334                 cl_lock_user_del(env, lock);
1335         if (result == 0 || result == -ESTALE) {
1336                 /*
1337                  * Return lock back to the cache. This is the only
1338                  * place where lock is moved into CLS_CACHED state.
1339                  *
1340                  * If one of ->clo_unuse() methods returned -ESTALE, lock
1341                  * cannot be placed into cache and has to be
1342                  * re-initialized. This happens e.g., when a sub-lock was
1343                  * canceled while unlocking was in progress.
1344                  */
1345                 state = result == 0 ? CLS_CACHED : CLS_NEW;
1346                 cl_lock_extransit(env, lock, state);
1347
1348                 /*
1349                  * Hide the -ESTALE error.
1350                  * Suppose the lock is a glimpse lock with multiple stripes,
1351                  * one of its sublocks returned -ENAVAIL, and the other
1352                  * sublocks matched write locks. In this case we cannot mark
1353                  * this lock with an error, because otherwise some of its
1354                  * sublocks might not be cancelled and their dirty pages
1355                  * would never be written to the OSTs. -jay
1356                  */
1357                 result = 0;
1358         } else {
1359                 CWARN("result = %d, this is unlikely!\n", result);
1360                 cl_lock_extransit(env, lock, state);
1361         }
1362
1363         result = result ?: lock->cll_error;
1364         if (result < 0)
1365                 cl_lock_error(env, lock, result);
1366         RETURN(result);
1367 }
1368 EXPORT_SYMBOL(cl_unuse_try);
1369
1370 static void cl_unuse_locked(const struct lu_env *env, struct cl_lock *lock)
1371 {
1372         ENTRY;
1373         LASSERT(lock->cll_state <= CLS_HELD);
1374         do {
1375                 int result;
1376
1377                 result = cl_unuse_try(env, lock);
1378                 if (result == CLO_WAIT) {
1379                         result = cl_lock_state_wait(env, lock);
1380                         if (result == 0)
1381                                 continue;
1382                 }
1383                 break;
1384         } while (1);
1385         EXIT;
1386 }
1387
1388 /**
1389  * Unlocks a lock.
1390  */
1391 void cl_unuse(const struct lu_env *env, struct cl_lock *lock)
1392 {
1393         ENTRY;
1394         cl_lock_mutex_get(env, lock);
1395         cl_unuse_locked(env, lock);
1396         cl_lock_mutex_put(env, lock);
1397         cl_lock_lockdep_release(env, lock);
1398         EXIT;
1399 }
1400 EXPORT_SYMBOL(cl_unuse);
1401
1402 /**
1403  * Tries to wait for a lock.
1404  *
1405  * This function is called repeatedly by cl_wait() until either lock is
1406  * granted, or error occurs. This function does not block waiting for network
1407  * communication to complete.
1408  *
1409  * \see cl_wait() cl_lock_operations::clo_wait()
1410  * \see cl_lock_state::CLS_HELD
1411  */
1412 int cl_wait_try(const struct lu_env *env, struct cl_lock *lock)
1413 {
1414         const struct cl_lock_slice *slice;
1415         int                         result;
1416
1417         ENTRY;
1418         cl_lock_trace(D_DLMTRACE, env, "wait lock try", lock);
1419         do {
1420                 LINVRNT(cl_lock_is_mutexed(lock));
1421                 LINVRNT(cl_lock_invariant(env, lock));
1422                 LASSERT(lock->cll_state == CLS_ENQUEUED ||
1423                         lock->cll_state == CLS_HELD ||
1424                         lock->cll_state == CLS_INTRANSIT);
1425                 LASSERT(lock->cll_users > 0);
1426                 LASSERT(lock->cll_holds > 0);
1427
1428                 result = 0;
1429                 if (lock->cll_error != 0)
1430                         break;
1431
1432                 if (cl_lock_is_intransit(lock)) {
1433                         result = CLO_WAIT;
1434                         break;
1435                 }
1436
1437                 if (lock->cll_state == CLS_HELD)
1438                         /* nothing to do */
1439                         break;
1440
1441                 result = -ENOSYS;
1442                 list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
1443                         if (slice->cls_ops->clo_wait != NULL) {
1444                                 result = slice->cls_ops->clo_wait(env, slice);
1445                                 if (result != 0)
1446                                         break;
1447                         }
1448                 }
1449                 LASSERT(result != -ENOSYS);
1450                 if (result == 0)
1451                         cl_lock_state_set(env, lock, CLS_HELD);
1452         } while (result == CLO_REPEAT);
1453         RETURN(result ?: lock->cll_error);
1454 }
1455 EXPORT_SYMBOL(cl_wait_try);
1456
1457 /**
1458  * Waits until enqueued lock is granted.
1459  *
1460  * \pre current thread or io owns a hold on the lock
1461  * \pre ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
1462  *                        lock->cll_state == CLS_HELD)
1463  *
1464  * \post ergo(result == 0, lock->cll_state == CLS_HELD)
1465  */
1466 int cl_wait(const struct lu_env *env, struct cl_lock *lock)
1467 {
1468         int result;
1469
1470         ENTRY;
1471         cl_lock_mutex_get(env, lock);
1472
1473         LINVRNT(cl_lock_invariant(env, lock));
1474         LASSERT(lock->cll_state == CLS_ENQUEUED || lock->cll_state == CLS_HELD);
1475         LASSERT(lock->cll_holds > 0);
1476         cl_lock_trace(D_DLMTRACE, env, "wait lock", lock);
1477
1478         do {
1479                 result = cl_wait_try(env, lock);
1480                 if (result == CLO_WAIT) {
1481                         result = cl_lock_state_wait(env, lock);
1482                         if (result == 0)
1483                                 continue;
1484                 }
1485                 break;
1486         } while (1);
1487         if (result < 0) {
1488                 cl_lock_user_del(env, lock);
1489                 if (result != -EINTR)
1490                         cl_lock_error(env, lock, result);
1491                 cl_lock_lockdep_release(env, lock);
1492         }
1493         cl_lock_mutex_put(env, lock);
1494         LASSERT(ergo(result == 0, lock->cll_state == CLS_HELD));
1495         RETURN(result);
1496 }
1497 EXPORT_SYMBOL(cl_wait);
1498
1499 /**
1500  * Executes cl_lock_operations::clo_weigh() on every layer and sums the
1501  * results (saturating on overflow) to estimate the lock's weight.
1502  */
1503 unsigned long cl_lock_weigh(const struct lu_env *env, struct cl_lock *lock)
1504 {
1505         const struct cl_lock_slice *slice;
1506         unsigned long pound;
1507         unsigned long ounce;
1508
1509         ENTRY;
1510         LINVRNT(cl_lock_is_mutexed(lock));
1511         LINVRNT(cl_lock_invariant(env, lock));
1512
1513         pound = 0;
1514         list_for_each_entry_reverse(slice, &lock->cll_layers, cls_linkage) {
1515                 if (slice->cls_ops->clo_weigh != NULL) {
1516                         ounce = slice->cls_ops->clo_weigh(env, slice);
1517                         pound += ounce;
1518                         if (pound < ounce) /* sum overflowed: saturate */
1519                                 pound = ~0UL;
1520                 }
1521         }
1522         RETURN(pound);
1523 }
1524 EXPORT_SYMBOL(cl_lock_weigh);
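
/*
 * The loop above is a saturating sum: on unsigned overflow the total is
 * clamped at ~0UL instead of wrapping around. A minimal sketch of the same
 * overflow handling as a standalone helper (hypothetical, not used in this
 * file):
 *
 * \code
 * static unsigned long weigh_saturating_add(unsigned long sum,
 *                                           unsigned long delta)
 * {
 *         unsigned long result = sum + delta;
 *
 *         // a wrapped result is smaller than the running sum; clamp it
 *         return result < sum ? ~0UL : result;
 * }
 * \endcode
 */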
1525
1526 /**
1527  * Notifies layers that lock description changed.
1528  *
1529  * The server can grant the client a lock different from the one that was
1530  * requested (e.g., larger in extent). This method is called when the actually
1531  * granted lock description becomes known, to let layers accommodate the
1532  * change.
1533  *
1534  * \see cl_lock_operations::clo_modify()
1535  */
1536 int cl_lock_modify(const struct lu_env *env, struct cl_lock *lock,
1537                    const struct cl_lock_descr *desc)
1538 {
1539         const struct cl_lock_slice *slice;
1540         struct cl_object           *obj = lock->cll_descr.cld_obj;
1541         struct cl_object_header    *hdr = cl_object_header(obj);
1542         int result;
1543
1544         ENTRY;
1545         cl_lock_trace(D_DLMTRACE, env, "modify lock", lock);
1546         /* don't allow object to change */
1547         LASSERT(obj == desc->cld_obj);
1548         LINVRNT(cl_lock_is_mutexed(lock));
1549         LINVRNT(cl_lock_invariant(env, lock));
1550
1551         list_for_each_entry_reverse(slice, &lock->cll_layers, cls_linkage) {
1552                 if (slice->cls_ops->clo_modify != NULL) {
1553                         result = slice->cls_ops->clo_modify(env, slice, desc);
1554                         if (result != 0)
1555                                 RETURN(result);
1556                 }
1557         }
1558         CL_LOCK_DEBUG(D_DLMTRACE, env, lock, " -> "DDESCR"@"DFID"\n",
1559                       PDESCR(desc), PFID(lu_object_fid(&desc->cld_obj->co_lu)));
1560         /*
1561          * Just replace description in place. Nothing more is needed for
1562          * now. If locks were indexed according to their extent and/or mode,
1563          * that index would have to be updated here.
1564          */
1565         spin_lock(&hdr->coh_lock_guard);
1566         lock->cll_descr = *desc;
1567         spin_unlock(&hdr->coh_lock_guard);
1568         RETURN(0);
1569 }
1570 EXPORT_SYMBOL(cl_lock_modify);
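
/*
 * A minimal usage sketch for cl_lock_modify() (hypothetical extent values;
 * the caller must hold the lock mutex, as asserted above): if a [100, 200]
 * extent was requested but the server granted [0, 299], the layer that
 * learns the granted extent would propagate it roughly as follows:
 *
 * \code
 *         struct cl_lock_descr granted = lock->cll_descr;
 *         int rc;
 *
 *         granted.cld_start = 0;   // hypothetical granted extent
 *         granted.cld_end   = 299;
 *         rc = cl_lock_modify(env, lock, &granted);
 * \endcode
 */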
1571
1572 /**
1573  * Initializes lock closure with a given origin.
1574  *
1575  * \see cl_lock_closure
1576  */
1577 void cl_lock_closure_init(const struct lu_env *env,
1578                           struct cl_lock_closure *closure,
1579                           struct cl_lock *origin, int wait)
1580 {
1581         LINVRNT(cl_lock_is_mutexed(origin));
1582         LINVRNT(cl_lock_invariant(env, origin));
1583
1584         CFS_INIT_LIST_HEAD(&closure->clc_list);
1585         closure->clc_origin = origin;
1586         closure->clc_wait   = wait;
1587         closure->clc_nr     = 0;
1588 }
1589 EXPORT_SYMBOL(cl_lock_closure_init);
1590
1591 /**
1592  * Builds a closure of \a lock.
1593  *
1594  * Building a closure consists of adding the initial lock (\a lock) to it,
1595  * and calling cl_lock_operations::clo_closure() methods of \a lock. These
1596  * methods might call cl_lock_closure_build() recursively again, adding more
1597  * locks to the closure, etc.
1598  *
1599  * \see cl_lock_closure
1600  */
1601 int cl_lock_closure_build(const struct lu_env *env, struct cl_lock *lock,
1602                           struct cl_lock_closure *closure)
1603 {
1604         const struct cl_lock_slice *slice;
1605         int result;
1606
1607         ENTRY;
1608         LINVRNT(cl_lock_is_mutexed(closure->clc_origin));
1609         LINVRNT(cl_lock_invariant(env, closure->clc_origin));
1610
1611         result = cl_lock_enclosure(env, lock, closure);
1612         if (result == 0) {
1613                 list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
1614                         if (slice->cls_ops->clo_closure != NULL) {
1615                                 result = slice->cls_ops->clo_closure(env, slice,
1616                                                                      closure);
1617                                 if (result != 0)
1618                                         break;
1619                         }
1620                 }
1621         }
1622         if (result != 0)
1623                 cl_lock_disclosure(env, closure);
1624         RETURN(result);
1625 }
1626 EXPORT_SYMBOL(cl_lock_closure_build);
1627
1628 /**
1629  * Adds new lock to a closure.
1630  *
1631  * Try-locks \a lock and, if that succeeds, adds it to the closure (never
1632  * more than once). If the try-lock fails, returns CLO_REPEAT, optionally
1633  * waiting first until the next try-lock is likely to succeed.
1634  */
1635 int cl_lock_enclosure(const struct lu_env *env, struct cl_lock *lock,
1636                       struct cl_lock_closure *closure)
1637 {
1638         int result = 0;
1639         ENTRY;
1640         cl_lock_trace(D_DLMTRACE, env, "enclosure lock", lock);
1641         if (!cl_lock_mutex_try(env, lock)) {
1642                 /*
1643                  * If lock->cll_inclosure is not empty, lock is already in
1644                  * this closure.
1645                  */
1646                 if (list_empty(&lock->cll_inclosure)) {
1647                         cl_lock_get_trust(lock);
1648                         lu_ref_add(&lock->cll_reference, "closure", closure);
1649                         list_add(&lock->cll_inclosure, &closure->clc_list);
1650                         closure->clc_nr++;
1651                 } else
1652                         cl_lock_mutex_put(env, lock);
1653                 result = 0;
1654         } else {
1655                 cl_lock_disclosure(env, closure);
1656                 if (closure->clc_wait) {
1657                         cl_lock_get_trust(lock);
1658                         lu_ref_add(&lock->cll_reference, "closure-w", closure);
1659                         cl_lock_mutex_put(env, closure->clc_origin);
1660
1661                         LASSERT(cl_lock_nr_mutexed(env) == 0);
1662                         cl_lock_mutex_get(env, lock);
1663                         cl_lock_mutex_put(env, lock);
1664
1665                         cl_lock_mutex_get(env, closure->clc_origin);
1666                         lu_ref_del(&lock->cll_reference, "closure-w", closure);
1667                         cl_lock_put(env, lock);
1668                 }
1669                 result = CLO_REPEAT;
1670         }
1671         RETURN(result);
1672 }
1673 EXPORT_SYMBOL(cl_lock_enclosure);
1674
1675 /** Releases the mutexes of all enclosed locks. */
1676 void cl_lock_disclosure(const struct lu_env *env,
1677                         struct cl_lock_closure *closure)
1678 {
1679         struct cl_lock *scan;
1680         struct cl_lock *temp;
1681
1682         cl_lock_trace(D_DLMTRACE, env, "disclosure lock", closure->clc_origin);
1683         list_for_each_entry_safe(scan, temp, &closure->clc_list, cll_inclosure){
1684                 list_del_init(&scan->cll_inclosure);
1685                 cl_lock_mutex_put(env, scan);
1686                 lu_ref_del(&scan->cll_reference, "closure", closure);
1687                 cl_lock_put(env, scan);
1688                 closure->clc_nr--;
1689         }
1690         LASSERT(closure->clc_nr == 0);
1691 }
1692 EXPORT_SYMBOL(cl_lock_disclosure);
1693
1694 /** Finalizes a closure. */
1695 void cl_lock_closure_fini(struct cl_lock_closure *closure)
1696 {
1697         LASSERT(closure->clc_nr == 0);
1698         LASSERT(list_empty(&closure->clc_list));
1699 }
1700 EXPORT_SYMBOL(cl_lock_closure_fini);
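
/*
 * A sketch of how the closure interface above fits together ("origin" and
 * "other" are hypothetical locks; "origin" is already mutexed by the caller,
 * as cl_lock_closure_init() requires):
 *
 * \code
 *         struct cl_lock_closure closure;
 *         int rc;
 *
 *         cl_lock_closure_init(env, &closure, origin, 1);
 *         rc = cl_lock_closure_build(env, other, &closure);
 *         if (rc == 0) {
 *                 // every enclosed lock is now mutexed by this thread
 *                 // ... operate on the enclosed locks ...
 *                 cl_lock_disclosure(env, &closure);
 *         }
 *         // on failure, cl_lock_closure_build() has already disclosed it
 *         cl_lock_closure_fini(&closure);
 * \endcode
 */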
1701
1702 /**
1703  * Destroys this lock. Notifies layers (bottom-to-top) that the lock is being
1704  * destroyed, then destroys the lock. If there are holds on the lock,
1705  * destruction is postponed until all holds are released. This is called when
1706  * a decision is made to destroy the lock in the future, e.g., when a blocking
1707  * AST is received on it, or a fatal communication error happens.
1708  *
1709  * Caller must have a reference on this lock to prevent a situation where the
1710  * deleted lock lingers in memory indefinitely because nobody calls
1711  * cl_lock_put() to finish it.
1712  *
1713  * \pre atomic_read(&lock->cll_ref) > 0
1714  * \pre ergo(cl_lock_nesting(lock) == CNL_TOP,
1715  *           cl_lock_nr_mutexed(env) == 1)
1716  *      [i.e., if a top-lock is deleted, mutexes of no other locks can be
1717  *      held, as deletion of sub-locks might require releasing a top-lock
1718  *      mutex]
1719  *
1720  * \see cl_lock_operations::clo_delete()
1721  * \see cl_lock::cll_holds
1722  */
1723 void cl_lock_delete(const struct lu_env *env, struct cl_lock *lock)
1724 {
1725         LINVRNT(cl_lock_is_mutexed(lock));
1726         LINVRNT(cl_lock_invariant(env, lock));
1727         LASSERT(ergo(cl_lock_nesting(lock) == CNL_TOP,
1728                      cl_lock_nr_mutexed(env) == 1));
1729
1730         ENTRY;
1731         cl_lock_trace(D_DLMTRACE, env, "delete lock", lock);
1732         if (lock->cll_holds == 0)
1733                 cl_lock_delete0(env, lock);
1734         else
1735                 lock->cll_flags |= CLF_DOOMED;
1736         EXIT;
1737 }
1738 EXPORT_SYMBOL(cl_lock_delete);
1739
1740 /**
1741  * Marks the lock as irrecoverably failed and marks it for destruction. This
1742  * happens when, e.g., the server fails to grant a lock to us, or a network
1743  * time-out happens.
1744  *
1745  * \pre atomic_read(&lock->cll_ref) > 0
1746  *
1747  * \see cl_lock_delete()
1748  * \see cl_lock::cll_holds
1749  */
1750 void cl_lock_error(const struct lu_env *env, struct cl_lock *lock, int error)
1751 {
1752         LINVRNT(cl_lock_is_mutexed(lock));
1753         LINVRNT(cl_lock_invariant(env, lock));
1754
1755         ENTRY;
1756         cl_lock_trace(D_DLMTRACE, env, "set lock error", lock);
1757         if (lock->cll_error == 0 && error != 0) {
1758                 lock->cll_error = error;
1759                 cl_lock_signal(env, lock);
1760                 cl_lock_cancel(env, lock);
1761                 cl_lock_delete(env, lock);
1762         }
1763         EXIT;
1764 }
1765 EXPORT_SYMBOL(cl_lock_error);
1766
1767 /**
1768  * Cancels this lock. Notifies layers (bottom-to-top) that the lock is being
1769  * cancelled. If there are holds on the lock, cancellation is postponed until
1770  * all holds are released.
1772  *
1773  * Cancellation notification is delivered to layers at most once.
1774  *
1775  * \see cl_lock_operations::clo_cancel()
1776  * \see cl_lock::cll_holds
1777  */
1778 void cl_lock_cancel(const struct lu_env *env, struct cl_lock *lock)
1779 {
1780         LINVRNT(cl_lock_is_mutexed(lock));
1781         LINVRNT(cl_lock_invariant(env, lock));
1782
1783         ENTRY;
1784         cl_lock_trace(D_DLMTRACE, env, "cancel lock", lock);
1785         if (lock->cll_holds == 0)
1786                 cl_lock_cancel0(env, lock);
1787         else
1788                 lock->cll_flags |= CLF_CANCELPEND;
1789         EXIT;
1790 }
1791 EXPORT_SYMBOL(cl_lock_cancel);
1792
1793 /**
1794  * Finds an existing lock covering the given page, optionally requiring it
1795  * to be different from the given \a except lock.
1796  */
1797 struct cl_lock *cl_lock_at_page(const struct lu_env *env, struct cl_object *obj,
1798                                 struct cl_page *page, struct cl_lock *except,
1799                                 int pending, int canceld)
1800 {
1801         struct cl_object_header *head;
1802         struct cl_lock          *scan;
1803         struct cl_lock          *lock;
1804         struct cl_lock_descr    *need;
1805
1806         ENTRY;
1807
1808         head = cl_object_header(obj);
1809         need = &cl_env_info(env)->clt_descr;
1810         lock = NULL;
1811
1812         need->cld_mode = CLM_READ; /* CLM_READ matches both READ & WRITE, but
1813                                     * not PHANTOM */
1814         need->cld_start = need->cld_end = page->cp_index;
1815
1816         spin_lock(&head->coh_lock_guard);
1817         /* It is fine to match any group lock since there can be only one
1818          * with a unique gid, and it conflicts with all other lock modes too. */
1819         list_for_each_entry(scan, &head->coh_locks, cll_linkage) {
1820                 if (scan != except &&
1821                     (scan->cll_descr.cld_mode == CLM_GROUP ||
1822                     cl_lock_ext_match(&scan->cll_descr, need)) &&
1823                     scan->cll_state >= CLS_HELD &&
1824                     scan->cll_state < CLS_FREEING &&
1825                     /*
1826                      * This check is racy, as the lock can be canceled right
1827                      * after it is done, but this is fine because the page
1828                      * already exists.
1829                      */
1830                     (canceld || !(scan->cll_flags & CLF_CANCELLED)) &&
1831                     (pending || !(scan->cll_flags & CLF_CANCELPEND))) {
1832                         /* Don't increase cs_hit here since this
1833                          * is just a helper function. */
1834                         cl_lock_get_trust(scan);
1835                         lock = scan;
1836                         break;
1837                 }
1838         }
1839         spin_unlock(&head->coh_lock_guard);
1840         RETURN(lock);
1841 }
1842 EXPORT_SYMBOL(cl_lock_at_page);
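
/*
 * A minimal usage sketch, mirroring the call in cl_lock_page_list_fixup()
 * below (all variables are the caller's): check whether \a page is covered
 * by some lock other than "lock" and drop the temporary reference at once:
 *
 * \code
 *         struct cl_lock *covering;
 *
 *         covering = cl_lock_at_page(env, obj, page, lock, 0, 0);
 *         if (covering != NULL) {
 *                 // the page is protected by a lock other than "lock"
 *                 cl_lock_put(env, covering);
 *         }
 * \endcode
 */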
1843
1844 /**
1845  * Leaves in \a queue only pages that are protected (solely) by a given lock.
1846  *
1847  * Scans the pages in \a queue and removes those that are also covered by a
1848  * lock other than \a lock, so that only pages protected by \a lock remain.
1849  */
1850 void cl_lock_page_list_fixup(const struct lu_env *env,
1851                              struct cl_io *io, struct cl_lock *lock,
1852                              struct cl_page_list *queue)
1853 {
1854         struct cl_page        *page;
1855         struct cl_page        *temp;
1856         struct cl_page_list   *plist = &cl_env_info(env)->clt_list;
1857
1858         LINVRNT(cl_lock_invariant(env, lock));
1859         ENTRY;
1860
1861         /* Now that we have a list of cl_pages under \a lock, we need to
1862          * check whether some of the pages are covered by another ldlm lock.
1863          * If so, they do not need to be written out this time.
1864          *
1865          * For example, the client holds PW locks A:[0,200] and B:[100,300],
1866          * and the latter is about to be canceled. This means another client
1867          * is reading/writing [200,300], since A won't be canceled. We only
1868          * need to write the pages covered by [200,300]. This is safe, since
1869          * [100,200] is also protected by lock A.
1870          */
1871
1872         cl_page_list_init(plist);
1873         cl_page_list_for_each_safe(page, temp, queue) {
1874                 pgoff_t                idx = page->cp_index;
1875                 struct cl_lock        *found;
1876                 struct cl_lock_descr  *descr;
1877
1878                 /* The algorithm relies on pages being queued in ascending index order. */
1879                 LASSERT(ergo(&temp->cp_batch != &queue->pl_pages,
1880                         page->cp_index < temp->cp_index));
1881
1882                 found = cl_lock_at_page(env, lock->cll_descr.cld_obj,
1883                                         page, lock, 0, 0);
1884                 if (found == NULL)
1885                         continue;
1886
1887                 descr = &found->cll_descr;
1888                 list_for_each_entry_safe_from(page, temp, &queue->pl_pages,
1889                                               cp_batch) {
1890                         idx = page->cp_index;
1891                         if (descr->cld_start > idx || descr->cld_end < idx)
1892                                 break;
1893                         cl_page_list_move(plist, queue, page);
1894                 }
1895                 cl_lock_put(env, found);
1896         }
1897
1898         /* The pages in plist are covered by other locks; don't handle them
1899          * this time.
1900          */
1901         if (io != NULL)
1902                 cl_page_list_disown(env, io, plist);
1903         cl_page_list_fini(env, plist);
1904         EXIT;
1905 }
1906 EXPORT_SYMBOL(cl_lock_page_list_fixup);
1907
1908 /**
1909  * Invalidate pages protected by the given lock, sending them out to the
1910  * server first, if necessary.
1911  *
1912  * This function does the following:
1913  *
1914  *     - collects a list of pages to be invalidated,
1915  *
1916  *     - unmaps them from the user virtual memory,
1917  *
1918  *     - sends dirty pages to the server,
1919  *
1920  *     - waits for transfer completion,
1921  *
1922  *     - discards pages, and throws them out of memory.
1923  *
1924  * If \a discard is set, pages are discarded without sending them to the
1925  * server.
1926  *
1927  * If an error happens at any step, the process continues anyway (the
1928  * reasoning being that lock cancellation cannot be delayed indefinitely).
1929  */
1930 int cl_lock_page_out(const struct lu_env *env, struct cl_lock *lock,
1931                      int discard)
1932 {
1933         struct cl_thread_info *info  = cl_env_info(env);
1934         struct cl_io          *io    = &info->clt_io;
1935         struct cl_2queue      *queue = &info->clt_queue;
1936         struct cl_lock_descr  *descr = &lock->cll_descr;
1937         long page_count;
1938         int result;
1939
1940         LINVRNT(cl_lock_invariant(env, lock));
1941         ENTRY;
1942
1943         io->ci_obj = cl_object_top(descr->cld_obj);
1944         result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
1945         if (result == 0) {
1946                 int nonblock = 1;
1947
1948 restart:
1949                 cl_2queue_init(queue);
1950                 cl_page_gang_lookup(env, descr->cld_obj, io, descr->cld_start,
1951                                     descr->cld_end, &queue->c2_qin, nonblock);
1952                 page_count = queue->c2_qin.pl_nr;
1953                 if (page_count > 0) {
1954                         result = cl_page_list_unmap(env, io, &queue->c2_qin);
1955                         if (!discard) {
1956                                 long timeout = 600; /* 10 minutes. */
1957                                 /* For debugging purposes: if this request
1958                                  * cannot finish within 10 minutes, the
1959                                  * timeout should let us know about it.
1960                                  */
1961                                 result = cl_io_submit_sync(env, io, CRT_WRITE,
1962                                                            queue, CRP_CANCEL,
1963                                                            timeout);
1964                                 if (result)
1965                                         CWARN("Writing %ld pages failed: %d\n",
1966                                               page_count, result);
1967                         }
1968                         cl_lock_page_list_fixup(env, io, lock, &queue->c2_qout);
1969                         cl_2queue_discard(env, io, queue);
1970                         cl_2queue_disown(env, io, queue);
1971                 }
1972                 cl_2queue_fini(env, queue);
1973
1974                 if (nonblock) {
1975                         nonblock = 0;
1976                         goto restart;
1977                 }
1978         }
1979         cl_io_fini(env, io);
1980         RETURN(result);
1981 }
1982 EXPORT_SYMBOL(cl_lock_page_out);
1983
1984 /**
1985  * Eliminate all locks for a given object.
1986  *
1987  * Caller has to guarantee that no lock is in active use.
1988  *
1989  * \param cancel when this is set, cl_locks_prune() cancels locks before
1990  *               destroying.
1991  */
1992 void cl_locks_prune(const struct lu_env *env, struct cl_object *obj, int cancel)
1993 {
1994         struct cl_object_header *head;
1995         struct cl_lock          *lock;
1996
1997         ENTRY;
1998         head = cl_object_header(obj);
1999         /*
2000          * If locks are destroyed without cancellation, all pages must be
2001          * already destroyed (as otherwise they will be left unprotected).
2002          */
2003         LASSERT(ergo(!cancel,
2004                      head->coh_tree.rnode == NULL && head->coh_pages == 0));
2005
2006         spin_lock(&head->coh_lock_guard);
2007         while (!list_empty(&head->coh_locks)) {
2008                 lock = container_of(head->coh_locks.next,
2009                                     struct cl_lock, cll_linkage);
2010                 cl_lock_get_trust(lock);
2011                 spin_unlock(&head->coh_lock_guard);
2012                 lu_ref_add(&lock->cll_reference, "prune", cfs_current());
2013                 cl_lock_mutex_get(env, lock);
2014                 if (lock->cll_state < CLS_FREEING) {
2015                         LASSERT(lock->cll_holds == 0);
2016                         LASSERT(lock->cll_users == 0);
2017                         if (cancel)
2018                                 cl_lock_cancel(env, lock);
2019                         cl_lock_delete(env, lock);
2020                 }
2021                 cl_lock_mutex_put(env, lock);
2022                 lu_ref_del(&lock->cll_reference, "prune", cfs_current());
2023                 cl_lock_put(env, lock);
2024                 spin_lock(&head->coh_lock_guard);
2025         }
2026         spin_unlock(&head->coh_lock_guard);
2027         EXIT;
2028 }
2029 EXPORT_SYMBOL(cl_locks_prune);
2030
2031 /**
2032  * Returns true if \a addr is an address of an allocated cl_lock. Used in
2033  * assertions. This check is optimistically imprecise, i.e., it occasionally
2034  * returns true for incorrect addresses, but if it returns false, then the
2035  * address is guaranteed to be incorrect. (Should be named cl_lockp().)
2036  *
2037  * \see cl_is_page()
2038  */
2039 int cl_is_lock(const void *addr)
2040 {
2041         return cfs_mem_is_in_cache(addr, cl_lock_kmem);
2042 }
2043 EXPORT_SYMBOL(cl_is_lock);
2044
2045 static struct cl_lock *cl_lock_hold_mutex(const struct lu_env *env,
2046                                           const struct cl_io *io,
2047                                           const struct cl_lock_descr *need,
2048                                           const char *scope, const void *source)
2049 {
2050         struct cl_lock *lock;
2051
2052         ENTRY;
2053
2054         while (1) {
2055                 lock = cl_lock_find(env, io, need);
2056                 if (IS_ERR(lock))
2057                         break;
2058                 cl_lock_mutex_get(env, lock);
2059                 if (lock->cll_state < CLS_FREEING) {
2060                         cl_lock_hold_mod(env, lock, +1);
2061                         lu_ref_add(&lock->cll_holders, scope, source);
2062                         lu_ref_add(&lock->cll_reference, scope, source);
2063                         break;
2064                 }
2065                 cl_lock_mutex_put(env, lock);
2066                 cl_lock_put(env, lock);
2067         }
2068         RETURN(lock);
2069 }
2070
2071 /**
2072  * Returns a lock matching \a need description with a reference and a hold on
2073  * it.
2074  *
2075  * This is much like cl_lock_find(), except that cl_lock_hold() additionally
2076  * guarantees that lock is not in the CLS_FREEING state on return.
2077  */
2078 struct cl_lock *cl_lock_hold(const struct lu_env *env, const struct cl_io *io,
2079                              const struct cl_lock_descr *need,
2080                              const char *scope, const void *source)
2081 {
2082         struct cl_lock *lock;
2083
2084         ENTRY;
2085
2086         lock = cl_lock_hold_mutex(env, io, need, scope, source);
2087         if (!IS_ERR(lock))
2088                 cl_lock_mutex_put(env, lock);
2089         RETURN(lock);
2090 }
2091 EXPORT_SYMBOL(cl_lock_hold);
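
/*
 * A pairing sketch (hypothetical caller and scope string): a lock obtained
 * with cl_lock_hold() is eventually dropped with cl_lock_release(), using
 * the same scope/source pair:
 *
 * \code
 *         struct cl_lock *lock;
 *
 *         lock = cl_lock_hold(env, io, need, "scan", cfs_current());
 *         if (!IS_ERR(lock)) {
 *                 // ... inspect or use the lock ...
 *                 cl_lock_release(env, lock, "scan", cfs_current());
 *         }
 * \endcode
 */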
2092
2093 /**
2094  * Main high-level entry point of the cl_lock interface: finds an existing
2095  * lock or enqueues a new one matching the given description.
2096  */
2097 struct cl_lock *cl_lock_request(const struct lu_env *env, struct cl_io *io,
2098                                 const struct cl_lock_descr *need,
2099                                 __u32 enqflags,
2100                                 const char *scope, const void *source)
2101 {
2102         struct cl_lock       *lock;
2103         const struct lu_fid  *fid;
2104         int                   rc;
2105         int                   iter;
2106         int                   warn;
2107
2108         ENTRY;
2109         fid = lu_object_fid(&io->ci_obj->co_lu);
2110         iter = 0;
2111         do {
2112                 warn = iter >= 16 && IS_PO2(iter);
2113                 CDEBUG(warn ? D_WARNING : D_DLMTRACE,
2114                        DDESCR"@"DFID" %i %08x `%s'\n",
2115                        PDESCR(need), PFID(fid), iter, enqflags, scope);
2116                 lock = cl_lock_hold_mutex(env, io, need, scope, source);
2117                 if (!IS_ERR(lock)) {
2118                         rc = cl_enqueue_locked(env, lock, io, enqflags);
2119                         if (rc == 0) {
2120                                 if (cl_lock_fits_into(env, lock, need, io)) {
2121                                         cl_lock_mutex_put(env, lock);
2122                                         cl_lock_lockdep_acquire(env,
2123                                                                 lock, enqflags);
2124                                         break;
2125                                 } else if (warn)
2126                                         CL_LOCK_DEBUG(D_WARNING, env, lock,
2127                                                       "got (see bug 17665)\n");
2128                                 cl_unuse_locked(env, lock);
2129                         }
2130                         cl_lock_trace(D_DLMTRACE, env, "enqueue failed", lock);
2131                         cl_lock_hold_release(env, lock, scope, source);
2132                         cl_lock_mutex_put(env, lock);
2133                         lu_ref_del(&lock->cll_reference, scope, source);
2134                         cl_lock_put(env, lock);
2135                         lock = ERR_PTR(rc);
2136                 } else
2137                         rc = PTR_ERR(lock);
2138                 iter++;
2139         } while (rc == 0);
2140         RETURN(lock);
2141 }
2142 EXPORT_SYMBOL(cl_lock_request);
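
/*
 * A sketch of a typical top-level caller built on cl_lock_request() (error
 * handling trimmed; "need" and "enqflags" are provided by the caller, and
 * the "example" scope string is hypothetical):
 *
 * \code
 *         struct cl_lock *lock;
 *         int rc;
 *
 *         lock = cl_lock_request(env, io, need, enqflags, "example",
 *                                cfs_current());
 *         if (!IS_ERR(lock)) {
 *                 rc = cl_wait(env, lock);      // wait until granted
 *                 if (rc == 0) {
 *                         // ... do I/O under the lock ...
 *                         cl_unuse(env, lock);  // drop the use
 *                 }
 *                 cl_lock_release(env, lock, "example", cfs_current());
 *         }
 * \endcode
 */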
2143
2144 /**
2145  * Adds a hold to a known lock.
2146  */
2147 void cl_lock_hold_add(const struct lu_env *env, struct cl_lock *lock,
2148                       const char *scope, const void *source)
2149 {
2150         LINVRNT(cl_lock_is_mutexed(lock));
2151         LINVRNT(cl_lock_invariant(env, lock));
2152         LASSERT(lock->cll_state != CLS_FREEING);
2153
2154         ENTRY;
2155         cl_lock_hold_mod(env, lock, +1);
2156         cl_lock_get(lock);
2157         lu_ref_add(&lock->cll_holders, scope, source);
2158         lu_ref_add(&lock->cll_reference, scope, source);
2159         EXIT;
2160 }
2161 EXPORT_SYMBOL(cl_lock_hold_add);
2162
2163 /**
2164  * Releases a hold and a reference on a lock for which the caller has
2165  * already acquired the lock mutex.
2166  */
2167 void cl_lock_unhold(const struct lu_env *env, struct cl_lock *lock,
2168                     const char *scope, const void *source)
2169 {
2170         LINVRNT(cl_lock_invariant(env, lock));
2171         ENTRY;
2172         cl_lock_hold_release(env, lock, scope, source);
2173         lu_ref_del(&lock->cll_reference, scope, source);
2174         cl_lock_put(env, lock);
2175         EXIT;
2176 }
2177 EXPORT_SYMBOL(cl_lock_unhold);
2178
2179 /**
2180  * Releases a hold and a reference on a lock, obtained by cl_lock_hold().
2181  */
2182 void cl_lock_release(const struct lu_env *env, struct cl_lock *lock,
2183                      const char *scope, const void *source)
2184 {
2185         LINVRNT(cl_lock_invariant(env, lock));
2186         ENTRY;
2187         cl_lock_trace(D_DLMTRACE, env, "release lock", lock);
2188         cl_lock_mutex_get(env, lock);
2189         cl_lock_hold_release(env, lock, scope, source);
2190         cl_lock_mutex_put(env, lock);
2191         lu_ref_del(&lock->cll_reference, scope, source);
2192         cl_lock_put(env, lock);
2193         EXIT;
2194 }
2195 EXPORT_SYMBOL(cl_lock_release);
2196
2197 void cl_lock_user_add(const struct lu_env *env, struct cl_lock *lock)
2198 {
2199         LINVRNT(cl_lock_is_mutexed(lock));
2200         LINVRNT(cl_lock_invariant(env, lock));
2201
2202         ENTRY;
2203         cl_lock_used_mod(env, lock, +1);
2204         EXIT;
2205 }
2206 EXPORT_SYMBOL(cl_lock_user_add);
2207
2208 int cl_lock_user_del(const struct lu_env *env, struct cl_lock *lock)
2209 {
2210         LINVRNT(cl_lock_is_mutexed(lock));
2211         LINVRNT(cl_lock_invariant(env, lock));
2212         LASSERT(lock->cll_users > 0);
2213
2214         ENTRY;
2215         cl_lock_used_mod(env, lock, -1);
2216         RETURN(lock->cll_users == 0);
2217 }
2218 EXPORT_SYMBOL(cl_lock_user_del);
2219
2220 /**
2221  * Checks whether the modes of two locks are compatible.
2222  *
2223  * This returns true iff enqueuing \a lock2 won't cause cancellation of \a
2224  * lock1, even when the two locks overlap.
2225  */
2226 int cl_lock_compatible(const struct cl_lock *lock1, const struct cl_lock *lock2)
2227 {
2228         enum cl_lock_mode mode1;
2229         enum cl_lock_mode mode2;
2230
2231         ENTRY;
2232         mode1 = lock1->cll_descr.cld_mode;
2233         mode2 = lock2->cll_descr.cld_mode;
2234         RETURN(mode2 == CLM_PHANTOM ||
2235                (mode1 == CLM_READ && mode2 == CLM_READ));
2236 }
2237 EXPORT_SYMBOL(cl_lock_compatible);
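
/*
 * The resulting compatibility matrix ("+" means that enqueuing lock2 does
 * not cancel lock1), derived directly from the check above:
 *
 *                          lock2
 *             PHANTOM   READ   WRITE   GROUP
 *   PHANTOM      +        -      -       -
 *   READ         +        +      -       -
 *   WRITE        +        -      -       -
 *   GROUP        +        -      -       -
 *   (lock1)
 */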
2238
2239 const char *cl_lock_mode_name(const enum cl_lock_mode mode)
2240 {
2241         static const char *names[] = {
2242                 [CLM_PHANTOM] = "P",
2243                 [CLM_READ]    = "R",
2244                 [CLM_WRITE]   = "W",
2245                 [CLM_GROUP]   = "G"
2246         };
2247         if (0 <= mode && mode < ARRAY_SIZE(names))
2248                 return names[mode];
2249         else
2250                 return "U";
2251 }
2252 EXPORT_SYMBOL(cl_lock_mode_name);
2253
2254 /**
2255  * Prints a human-readable representation of a lock description.
2256  */
2257 void cl_lock_descr_print(const struct lu_env *env, void *cookie,
2258                          lu_printer_t printer,
2259                          const struct cl_lock_descr *descr)
2260 {
2261         const struct lu_fid  *fid;
2262
2263         fid = lu_object_fid(&descr->cld_obj->co_lu);
2264         (*printer)(env, cookie, DDESCR"@"DFID, PDESCR(descr), PFID(fid));
2265 }
2266 EXPORT_SYMBOL(cl_lock_descr_print);
2267
2268 /**
2269  * Prints a human-readable representation of \a lock via \a printer.
2270  */
2271 void cl_lock_print(const struct lu_env *env, void *cookie,
2272                    lu_printer_t printer, const struct cl_lock *lock)
2273 {
2274         const struct cl_lock_slice *slice;
2275         (*printer)(env, cookie, "lock@%p[%d %d %d %d %d %08lx] ",
2276                    lock, atomic_read(&lock->cll_ref),
2277                    lock->cll_state, lock->cll_error, lock->cll_holds,
2278                    lock->cll_users, lock->cll_flags);
2279         cl_lock_descr_print(env, cookie, printer, &lock->cll_descr);
2280         (*printer)(env, cookie, " {\n");
2281
2282         list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
2283                 (*printer)(env, cookie, "    %s@%p: ",
2284                            slice->cls_obj->co_lu.lo_dev->ld_type->ldt_name,
2285                            slice);
2286                 if (slice->cls_ops->clo_print != NULL)
2287                         slice->cls_ops->clo_print(env, cookie, printer, slice);
2288                 (*printer)(env, cookie, "\n");
2289         }
2290         (*printer)(env, cookie, "} lock@%p\n", lock);
2291 }
2292 EXPORT_SYMBOL(cl_lock_print);
2293
2294 int cl_lock_init(void)
2295 {
2296         return lu_kmem_init(cl_lock_caches);
2297 }
2298
2299 void cl_lock_fini(void)
2300 {
2301         lu_kmem_fini(cl_lock_caches);
2302 }