lustre/obdclass/cl_lock.c (fs/lustre-release.git)
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Client Extent Lock.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_CLASS
42 #ifndef EXPORT_SYMTAB
43 # define EXPORT_SYMTAB
44 #endif
45
46 #include <obd_class.h>
47 #include <obd_support.h>
48 #include <lustre_fid.h>
49 #include <libcfs/list.h>
50 /* lu_time_global_{init,fini}() */
51 #include <lu_time.h>
52
53 #include <cl_object.h>
54 #include "cl_internal.h"
55
56 /** Lock class of cl_lock::cll_guard */
57 static cfs_lock_class_key_t cl_lock_guard_class;
58 static cfs_mem_cache_t *cl_lock_kmem;
59
60 static struct lu_kmem_descr cl_lock_caches[] = {
61         {
62                 .ckd_cache = &cl_lock_kmem,
63                 .ckd_name  = "cl_lock_kmem",
64                 .ckd_size  = sizeof (struct cl_lock)
65         },
66         {
67                 .ckd_cache = NULL
68         }
69 };
70
71 /**
72  * Basic lock invariant that is maintained at all times. Caller either has a
73  * reference to \a lock, or somehow assures that \a lock cannot be freed.
74  *
75  * \see cl_lock_invariant()
76  */
77 static int cl_lock_invariant_trusted(const struct lu_env *env,
78                                      const struct cl_lock *lock)
79 {
80         return  ergo(lock->cll_state == CLS_FREEING, lock->cll_holds == 0) &&
81                 cfs_atomic_read(&lock->cll_ref) >= lock->cll_holds &&
82                 lock->cll_holds >= lock->cll_users &&
83                 lock->cll_holds >= 0 &&
84                 lock->cll_users >= 0 &&
85                 lock->cll_depth >= 0;
86 }
87
88 /**
89  * Stronger lock invariant, checking that caller has a reference on a lock.
90  *
91  * \see cl_lock_invariant_trusted()
92  */
93 static int cl_lock_invariant(const struct lu_env *env,
94                              const struct cl_lock *lock)
95 {
96         int result;
97
98         result = cfs_atomic_read(&lock->cll_ref) > 0 &&
99                 cl_lock_invariant_trusted(env, lock);
100         if (!result && env != NULL)
101                 CL_LOCK_DEBUG(D_ERROR, env, lock, "invariant broken");
102         return result;
103 }
104
105 /**
106  * Returns lock "nesting": 0 for a top-lock and 1 for a sub-lock.
107  */
108 static enum clt_nesting_level cl_lock_nesting(const struct cl_lock *lock)
109 {
110         return cl_object_header(lock->cll_descr.cld_obj)->coh_nesting;
111 }
112
113 /**
114  * Returns a set of counters for this lock, depending on a lock nesting.
115  */
116 static struct cl_thread_counters *cl_lock_counters(const struct lu_env *env,
117                                                    const struct cl_lock *lock)
118 {
119         struct cl_thread_info *info;
120         enum clt_nesting_level nesting;
121
122         info = cl_env_info(env);
123         nesting = cl_lock_nesting(lock);
124         LASSERT(nesting < ARRAY_SIZE(info->clt_counters));
125         return &info->clt_counters[nesting];
126 }
127
128 static void cl_lock_trace0(int level, const struct lu_env *env,
129                            const char *prefix, const struct cl_lock *lock,
130                            const char *func, const int line)
131 {
132         struct cl_object_header *h = cl_object_header(lock->cll_descr.cld_obj);
133         CDEBUG(level, "%s: %p@(%d %p %d %d %d %d %d %lx)"
134                       "(%p/%d/%d) at %s():%d\n",
135                prefix, lock, cfs_atomic_read(&lock->cll_ref),
136                lock->cll_guarder, lock->cll_depth,
137                lock->cll_state, lock->cll_error, lock->cll_holds,
138                lock->cll_users, lock->cll_flags,
139                env, h->coh_nesting, cl_lock_nr_mutexed(env),
140                func, line);
141 }
142 #define cl_lock_trace(level, env, prefix, lock)                         \
143         cl_lock_trace0(level, env, prefix, lock, __FUNCTION__, __LINE__)
144
145 #define RETIP ((unsigned long)__builtin_return_address(0))
146
147 #ifdef CONFIG_LOCKDEP
148 static cfs_lock_class_key_t cl_lock_key;
149
150 static void cl_lock_lockdep_init(struct cl_lock *lock)
151 {
152         lockdep_set_class_and_name(lock, &cl_lock_key, "EXT");
153 }
154
155 static void cl_lock_lockdep_acquire(const struct lu_env *env,
156                                     struct cl_lock *lock, __u32 enqflags)
157 {
158         cl_lock_counters(env, lock)->ctc_nr_locks_acquired++;
159 #ifdef HAVE_LOCK_MAP_ACQUIRE
160         lock_map_acquire(&lock->dep_map);
161 #else  /* HAVE_LOCK_MAP_ACQUIRE */
162         lock_acquire(&lock->dep_map, !!(enqflags & CEF_ASYNC),
163                      /* try: */ 0, lock->cll_descr.cld_mode <= CLM_READ,
164                      /* check: */ 2, RETIP);
165 #endif /* HAVE_LOCK_MAP_ACQUIRE */
166 }
167
168 static void cl_lock_lockdep_release(const struct lu_env *env,
169                                     struct cl_lock *lock)
170 {
171         cl_lock_counters(env, lock)->ctc_nr_locks_acquired--;
172         lock_release(&lock->dep_map, 0, RETIP);
173 }
174
175 #else /* !CONFIG_LOCKDEP */
176
177 static void cl_lock_lockdep_init(struct cl_lock *lock)
178 {}
179 static void cl_lock_lockdep_acquire(const struct lu_env *env,
180                                     struct cl_lock *lock, __u32 enqflags)
181 {}
182 static void cl_lock_lockdep_release(const struct lu_env *env,
183                                     struct cl_lock *lock)
184 {}
185
186 #endif /* !CONFIG_LOCKDEP */
187
188 /**
189  * Adds lock slice to the compound lock.
190  *
191  * This is called by cl_object_operations::coo_lock_init() methods to add a
192  * per-layer state to the lock. New state is added at the end of
193  * cl_lock::cll_layers list, that is, it is at the bottom of the stack.
194  *
195  * \see cl_req_slice_add(), cl_page_slice_add(), cl_io_slice_add()
196  */
197 void cl_lock_slice_add(struct cl_lock *lock, struct cl_lock_slice *slice,
198                        struct cl_object *obj,
199                        const struct cl_lock_operations *ops)
200 {
201         ENTRY;
202         slice->cls_lock = lock;
203         cfs_list_add_tail(&slice->cls_linkage, &lock->cll_layers);
204         slice->cls_obj = obj;
205         slice->cls_ops = ops;
206         EXIT;
207 }
208 EXPORT_SYMBOL(cl_lock_slice_add);
209
210 /**
211  * Returns true iff a lock with the mode \a has provides at least the same
212  * guarantees as a lock with the mode \a need.
213  */
214 int cl_lock_mode_match(enum cl_lock_mode has, enum cl_lock_mode need)
215 {
216         LINVRNT(need == CLM_READ || need == CLM_WRITE ||
217                 need == CLM_PHANTOM || need == CLM_GROUP);
218         LINVRNT(has == CLM_READ || has == CLM_WRITE ||
219                 has == CLM_PHANTOM || has == CLM_GROUP);
220         CLASSERT(CLM_PHANTOM < CLM_READ);
221         CLASSERT(CLM_READ < CLM_WRITE);
222         CLASSERT(CLM_WRITE < CLM_GROUP);
223
224         if (has != CLM_GROUP)
225                 return need <= has;
226         else
227                 return need == has;
228 }
229 EXPORT_SYMBOL(cl_lock_mode_match);
230
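/*
 * Editorial illustration (not part of the original source): given the mode
 * ordering asserted above, cl_lock_mode_match() behaves as follows:
 *
 * \code
 *      cl_lock_mode_match(CLM_WRITE, CLM_READ);  // 1: write covers read
 *      cl_lock_mode_match(CLM_READ,  CLM_WRITE); // 0: read is weaker
 *      cl_lock_mode_match(CLM_GROUP, CLM_READ);  // 0: group matches group only
 * \endcode
 *
 * Group locks additionally require equal group ids, which is checked by
 * cl_lock_ext_match() below.
 */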
231 /**
232  * Returns true iff extent portions of lock descriptions match.
233  */
234 int cl_lock_ext_match(const struct cl_lock_descr *has,
235                       const struct cl_lock_descr *need)
236 {
237         return
238                 has->cld_start <= need->cld_start &&
239                 has->cld_end >= need->cld_end &&
240                 cl_lock_mode_match(has->cld_mode, need->cld_mode) &&
241                 (has->cld_mode != CLM_GROUP || has->cld_gid == need->cld_gid);
242 }
243 EXPORT_SYMBOL(cl_lock_ext_match);
244
245 /**
246  * Returns true iff a lock with the description \a has provides at least the
247  * same guarantees as a lock with the description \a need.
248  */
249 int cl_lock_descr_match(const struct cl_lock_descr *has,
250                         const struct cl_lock_descr *need)
251 {
252         return
253                 cl_object_same(has->cld_obj, need->cld_obj) &&
254                 cl_lock_ext_match(has, need);
255 }
256 EXPORT_SYMBOL(cl_lock_descr_match);
257
258 static void cl_lock_free(const struct lu_env *env, struct cl_lock *lock)
259 {
260         struct cl_object *obj = lock->cll_descr.cld_obj;
261
262         LINVRNT(!cl_lock_is_mutexed(lock));
263
264         ENTRY;
265         cl_lock_trace(D_DLMTRACE, env, "free lock", lock);
266         cfs_might_sleep();
267         while (!cfs_list_empty(&lock->cll_layers)) {
268                 struct cl_lock_slice *slice;
269
270                 slice = cfs_list_entry(lock->cll_layers.next,
271                                        struct cl_lock_slice, cls_linkage);
272                 cfs_list_del_init(lock->cll_layers.next);
273                 slice->cls_ops->clo_fini(env, slice);
274         }
275         cfs_atomic_dec(&cl_object_site(obj)->cs_locks.cs_total);
276         cfs_atomic_dec(&cl_object_site(obj)->cs_locks_state[lock->cll_state]);
277         lu_object_ref_del_at(&obj->co_lu, lock->cll_obj_ref, "cl_lock", lock);
278         cl_object_put(env, obj);
279         lu_ref_fini(&lock->cll_reference);
280         lu_ref_fini(&lock->cll_holders);
281         cfs_mutex_destroy(&lock->cll_guard);
282         OBD_SLAB_FREE_PTR(lock, cl_lock_kmem);
283         EXIT;
284 }
285
286 /**
287  * Releases a reference on a lock.
288  *
289  * When last reference is released, lock is returned to the cache, unless it
290  * is in cl_lock_state::CLS_FREEING state, in which case it is destroyed
291  * immediately.
292  *
293  * \see cl_object_put(), cl_page_put()
294  */
295 void cl_lock_put(const struct lu_env *env, struct cl_lock *lock)
296 {
297         struct cl_object        *obj;
298         struct cl_object_header *head;
299         struct cl_site          *site;
300
301         LINVRNT(cl_lock_invariant(env, lock));
302         ENTRY;
303         obj = lock->cll_descr.cld_obj;
304         LINVRNT(obj != NULL);
305         head = cl_object_header(obj);
306         site = cl_object_site(obj);
307
308         CDEBUG(D_TRACE, "releasing reference: %d %p %lu\n",
309                cfs_atomic_read(&lock->cll_ref), lock, RETIP);
310
311         if (cfs_atomic_dec_and_test(&lock->cll_ref)) {
312                 if (lock->cll_state == CLS_FREEING) {
313                         LASSERT(cfs_list_empty(&lock->cll_linkage));
314                         cl_lock_free(env, lock);
315                 }
316                 cfs_atomic_dec(&site->cs_locks.cs_busy);
317         }
318         EXIT;
319 }
320 EXPORT_SYMBOL(cl_lock_put);
321
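/*
 * A minimal reference-counting sketch (editorial addition; assumes the caller
 * already owns a reference to \a lock, e.g. one returned by cl_lock_peek()
 * below):
 *
 * \code
 *      cl_lock_get(lock);          // pin the lock for local use
 *      // ... inspect lock->cll_descr, pass the lock around, etc. ...
 *      cl_lock_put(env, lock);     // drop the extra reference
 * \endcode
 *
 * When the last reference goes away while the lock is in CLS_FREEING, the
 * cl_lock_put() above ends up in cl_lock_free().
 */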
322 /**
323  * Acquires an additional reference to a lock.
324  *
325  * This can be called only by caller already possessing a reference to \a
326  * lock.
327  *
328  * \see cl_object_get(), cl_page_get()
329  */
330 void cl_lock_get(struct cl_lock *lock)
331 {
332         LINVRNT(cl_lock_invariant(NULL, lock));
333         CDEBUG(D_TRACE, "acquiring reference: %d %p %lu\n",
334                cfs_atomic_read(&lock->cll_ref), lock, RETIP);
335         cfs_atomic_inc(&lock->cll_ref);
336 }
337 EXPORT_SYMBOL(cl_lock_get);
338
339 /**
340  * Acquires a reference to a lock.
341  *
342  * This is much like cl_lock_get(), except that this function can be used to
343  * acquire initial reference to the cached lock. Caller has to deal with all
344  * possible races. Use with care!
345  *
346  * \see cl_page_get_trust()
347  */
348 void cl_lock_get_trust(struct cl_lock *lock)
349 {
350         struct cl_site *site = cl_object_site(lock->cll_descr.cld_obj);
351
352         CDEBUG(D_TRACE, "acquiring trusted reference: %d %p %lu\n",
353                cfs_atomic_read(&lock->cll_ref), lock, RETIP);
354         if (cfs_atomic_inc_return(&lock->cll_ref) == 1)
355                 cfs_atomic_inc(&site->cs_locks.cs_busy);
356 }
357 EXPORT_SYMBOL(cl_lock_get_trust);
358
359 /**
360  * Helper function destroying the lock that wasn't completely initialized.
361  *
362  * Other threads can acquire references to the top-lock through its
363  * sub-locks. Hence, it cannot be cl_lock_free()-ed immediately.
364  */
365 static void cl_lock_finish(const struct lu_env *env, struct cl_lock *lock)
366 {
367         cl_lock_mutex_get(env, lock);
368         cl_lock_cancel(env, lock);
369         cl_lock_delete(env, lock);
370         cl_lock_mutex_put(env, lock);
371         cl_lock_put(env, lock);
372 }
373
374 static struct cl_lock *cl_lock_alloc(const struct lu_env *env,
375                                      struct cl_object *obj,
376                                      const struct cl_io *io,
377                                      const struct cl_lock_descr *descr)
378 {
379         struct cl_lock          *lock;
380         struct lu_object_header *head;
381         struct cl_site          *site = cl_object_site(obj);
382
383         ENTRY;
384         OBD_SLAB_ALLOC_PTR_GFP(lock, cl_lock_kmem, CFS_ALLOC_IO);
385         if (lock != NULL) {
386                 cfs_atomic_set(&lock->cll_ref, 1);
387                 lock->cll_descr = *descr;
388                 lock->cll_state = CLS_NEW;
389                 cl_object_get(obj);
390                 lock->cll_obj_ref = lu_object_ref_add(&obj->co_lu,
391                                                       "cl_lock", lock);
392                 CFS_INIT_LIST_HEAD(&lock->cll_layers);
393                 CFS_INIT_LIST_HEAD(&lock->cll_linkage);
394                 CFS_INIT_LIST_HEAD(&lock->cll_inclosure);
395                 lu_ref_init(&lock->cll_reference);
396                 lu_ref_init(&lock->cll_holders);
397                 cfs_mutex_init(&lock->cll_guard);
398                 cfs_lockdep_set_class(&lock->cll_guard, &cl_lock_guard_class);
399                 cfs_waitq_init(&lock->cll_wq);
400                 head = obj->co_lu.lo_header;
401                 cfs_atomic_inc(&site->cs_locks_state[CLS_NEW]);
402                 cfs_atomic_inc(&site->cs_locks.cs_total);
403                 cfs_atomic_inc(&site->cs_locks.cs_created);
404                 cl_lock_lockdep_init(lock);
405                 cfs_list_for_each_entry(obj, &head->loh_layers,
406                                         co_lu.lo_linkage) {
407                         int err;
408
409                         err = obj->co_ops->coo_lock_init(env, obj, lock, io);
410                         if (err != 0) {
411                                 cl_lock_finish(env, lock);
412                                 lock = ERR_PTR(err);
413                                 break;
414                         }
415                 }
416         } else
417                 lock = ERR_PTR(-ENOMEM);
418         RETURN(lock);
419 }
420
421 /**
422  * Transfer the lock into INTRANSIT state and return the original state.
423  *
424  * \pre  state: CLS_CACHED, CLS_HELD or CLS_ENQUEUED
425  * \post state: CLS_INTRANSIT
426  * \see CLS_INTRANSIT
427  */
428 enum cl_lock_state cl_lock_intransit(const struct lu_env *env,
429                                      struct cl_lock *lock)
430 {
431         enum cl_lock_state state = lock->cll_state;
432
433         LASSERT(cl_lock_is_mutexed(lock));
434         LASSERT(state != CLS_INTRANSIT);
435         LASSERTF(state >= CLS_ENQUEUED && state <= CLS_CACHED,
436                  "Malformed lock state %d.\n", state);
437
438         cl_lock_state_set(env, lock, CLS_INTRANSIT);
439         lock->cll_intransit_owner = cfs_current();
440         cl_lock_hold_add(env, lock, "intransit", cfs_current());
441         return state;
442 }
443 EXPORT_SYMBOL(cl_lock_intransit);
444
445 /**
446  * Exits the INTRANSIT state and restores the lock to its original state.
447  */
448 void cl_lock_extransit(const struct lu_env *env, struct cl_lock *lock,
449                        enum cl_lock_state state)
450 {
451         LASSERT(cl_lock_is_mutexed(lock));
452         LASSERT(lock->cll_state == CLS_INTRANSIT);
453         LASSERT(state != CLS_INTRANSIT);
454         LASSERT(lock->cll_intransit_owner == cfs_current());
455
456         lock->cll_intransit_owner = NULL;
457         cl_lock_state_set(env, lock, state);
458         cl_lock_unhold(env, lock, "intransit", cfs_current());
459 }
460 EXPORT_SYMBOL(cl_lock_extransit);
461
462 /**
463  * Checks whether the lock is in the INTRANSIT state and owned by another thread.
464  */
465 int cl_lock_is_intransit(struct cl_lock *lock)
466 {
467         LASSERT(cl_lock_is_mutexed(lock));
468         return lock->cll_state == CLS_INTRANSIT &&
469                lock->cll_intransit_owner != cfs_current();
470 }
471 EXPORT_SYMBOL(cl_lock_is_intransit);
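/*
 * Editorial sketch of how the INTRANSIT helpers above are meant to be used
 * (cl_use_try() and cl_unuse_try() below follow this shape; the "blocking
 * work" placeholder is hypothetical):
 *
 * \code
 *      enum cl_lock_state state;
 *
 *      cl_lock_mutex_get(env, lock);
 *      state = cl_lock_intransit(env, lock);   // remember original state
 *      // ... potentially blocking work; other threads observe
 *      // ... CLS_INTRANSIT and back off via cl_lock_is_intransit()
 *      cl_lock_extransit(env, lock, state);    // restore or set new state
 *      cl_lock_mutex_put(env, lock);
 * \endcode
 */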
472 /**
473  * Returns true iff lock is "suitable" for given io. E.g., locks acquired by
474  * truncate and O_APPEND cannot be reused for read/non-append-write, as they
475  * cover multiple stripes and can trigger cascading timeouts.
476  */
477 static int cl_lock_fits_into(const struct lu_env *env,
478                              const struct cl_lock *lock,
479                              const struct cl_lock_descr *need,
480                              const struct cl_io *io)
481 {
482         const struct cl_lock_slice *slice;
483
484         LINVRNT(cl_lock_invariant_trusted(env, lock));
485         ENTRY;
486         cfs_list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
487                 if (slice->cls_ops->clo_fits_into != NULL &&
488                     !slice->cls_ops->clo_fits_into(env, slice, need, io))
489                         RETURN(0);
490         }
491         RETURN(1);
492 }
493
494 static struct cl_lock *cl_lock_lookup(const struct lu_env *env,
495                                       struct cl_object *obj,
496                                       const struct cl_io *io,
497                                       const struct cl_lock_descr *need)
498 {
499         struct cl_lock          *lock;
500         struct cl_object_header *head;
501         struct cl_site          *site;
502
503         ENTRY;
504
505         head = cl_object_header(obj);
506         site = cl_object_site(obj);
507         LINVRNT_SPIN_LOCKED(&head->coh_lock_guard);
508         cfs_atomic_inc(&site->cs_locks.cs_lookup);
509         cfs_list_for_each_entry(lock, &head->coh_locks, cll_linkage) {
510                 int matched;
511
512                 matched = cl_lock_ext_match(&lock->cll_descr, need) &&
513                           lock->cll_state < CLS_FREEING &&
514                           lock->cll_error == 0 &&
515                           !(lock->cll_flags & CLF_CANCELLED) &&
516                           cl_lock_fits_into(env, lock, need, io);
517                 CDEBUG(D_DLMTRACE, "has: "DDESCR"(%d) need: "DDESCR": %d\n",
518                        PDESCR(&lock->cll_descr), lock->cll_state, PDESCR(need),
519                        matched);
520                 if (matched) {
521                         cl_lock_get_trust(lock);
522                         cfs_atomic_inc(&cl_object_site(obj)->cs_locks.cs_hit);
523                         RETURN(lock);
524                 }
525         }
526         RETURN(NULL);
527 }
528
529 /**
530  * Returns a lock matching description \a need.
531  *
532  * This is the main entry point into the cl_lock caching interface. First, a
533  * cache (implemented as a per-object linked list) is consulted. If lock is
534  * found there, it is returned immediately. Otherwise new lock is allocated
535  * and returned. In any case, additional reference to lock is acquired.
536  *
537  * \see cl_object_find(), cl_page_find()
538  */
539 static struct cl_lock *cl_lock_find(const struct lu_env *env,
540                                     const struct cl_io *io,
541                                     const struct cl_lock_descr *need)
542 {
543         struct cl_object_header *head;
544         struct cl_object        *obj;
545         struct cl_lock          *lock;
546         struct cl_site          *site;
547
548         ENTRY;
549
550         obj  = need->cld_obj;
551         head = cl_object_header(obj);
552         site = cl_object_site(obj);
553
554         cfs_spin_lock(&head->coh_lock_guard);
555         lock = cl_lock_lookup(env, obj, io, need);
556         cfs_spin_unlock(&head->coh_lock_guard);
557
558         if (lock == NULL) {
559                 lock = cl_lock_alloc(env, obj, io, need);
560                 if (!IS_ERR(lock)) {
561                         struct cl_lock *ghost;
562
563                         cfs_spin_lock(&head->coh_lock_guard);
564                         ghost = cl_lock_lookup(env, obj, io, need);
565                         if (ghost == NULL) {
566                                 cfs_list_add_tail(&lock->cll_linkage,
567                                                   &head->coh_locks);
568                                 cfs_spin_unlock(&head->coh_lock_guard);
569                                 cfs_atomic_inc(&site->cs_locks.cs_busy);
570                         } else {
571                                 cfs_spin_unlock(&head->coh_lock_guard);
572                                 /*
573                                  * Other threads can acquire references to the
574                                  * top-lock through its sub-locks. Hence, it
575                                  * cannot be cl_lock_free()-ed immediately.
576                                  */
577                                 cl_lock_finish(env, lock);
578                                 lock = ghost;
579                         }
580                 }
581         }
582         RETURN(lock);
583 }
584
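/*
 * Editorial note: cl_lock_find() above is the usual "lookup, allocate,
 * re-check" pattern.  A condensed sketch, where lookup/alloc/insert/discard
 * are shorthand for the calls made in the function itself:
 *
 * \code
 *      lock = lookup(need);                    // under coh_lock_guard
 *      if (lock == NULL) {
 *              lock  = alloc(need);            // may sleep, no spin-lock held
 *              ghost = lookup(need);           // re-check under the spin-lock
 *              if (ghost == NULL) {
 *                      insert(lock);           // we won the race
 *              } else {
 *                      discard(lock);          // lost the race
 *                      lock = ghost;
 *              }
 *      }
 * \endcode
 */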
585 /**
586  * Returns existing lock matching given description. This is similar to
587  * cl_lock_find() except that no new lock is created, and returned lock is
588  * guaranteed to be in enum cl_lock_state::CLS_HELD state.
589  */
590 struct cl_lock *cl_lock_peek(const struct lu_env *env, const struct cl_io *io,
591                              const struct cl_lock_descr *need,
592                              const char *scope, const void *source)
593 {
594         struct cl_object_header *head;
595         struct cl_object        *obj;
596         struct cl_lock          *lock;
597         int ok;
598
599         obj  = need->cld_obj;
600         head = cl_object_header(obj);
601
602         cfs_spin_lock(&head->coh_lock_guard);
603         lock = cl_lock_lookup(env, obj, io, need);
604         cfs_spin_unlock(&head->coh_lock_guard);
605
606         if (lock == NULL)
607                 return NULL;
608
609         cl_lock_mutex_get(env, lock);
610         if (lock->cll_state == CLS_INTRANSIT)
611                 cl_lock_state_wait(env, lock); /* Don't care return value. */
612         if (lock->cll_state == CLS_CACHED) {
613                 int result;
614                 result = cl_use_try(env, lock, 1);
615                 if (result < 0)
616                         cl_lock_error(env, lock, result);
617         }
618         ok = lock->cll_state == CLS_HELD;
619         if (ok) {
620                 cl_lock_hold_add(env, lock, scope, source);
621                 cl_lock_user_add(env, lock);
622                 cl_lock_put(env, lock);
623         }
624         cl_lock_mutex_put(env, lock);
625         if (!ok) {
626                 cl_lock_put(env, lock);
627                 lock = NULL;
628         }
629
630         return lock;
631 }
632 EXPORT_SYMBOL(cl_lock_peek);
633
634 /**
635  * Returns a slice within a lock, corresponding to the given layer in the
636  * device stack.
637  *
638  * \see cl_page_at()
639  */
640 const struct cl_lock_slice *cl_lock_at(const struct cl_lock *lock,
641                                        const struct lu_device_type *dtype)
642 {
643         const struct cl_lock_slice *slice;
644
645         LINVRNT(cl_lock_invariant_trusted(NULL, lock));
646         ENTRY;
647
648         cfs_list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
649                 if (slice->cls_obj->co_lu.lo_dev->ld_type == dtype)
650                         RETURN(slice);
651         }
652         RETURN(NULL);
653 }
654 EXPORT_SYMBOL(cl_lock_at);
655
656 static void cl_lock_mutex_tail(const struct lu_env *env, struct cl_lock *lock)
657 {
658         struct cl_thread_counters *counters;
659
660         counters = cl_lock_counters(env, lock);
661         lock->cll_depth++;
662         counters->ctc_nr_locks_locked++;
663         lu_ref_add(&counters->ctc_locks_locked, "cll_guard", lock);
664         cl_lock_trace(D_TRACE, env, "got mutex", lock);
665 }
666
667 /**
668  * Locks cl_lock object.
669  *
670  * This is used to manipulate cl_lock fields, and to serialize state
671  * transitions in the lock state machine.
672  *
673  * \post cl_lock_is_mutexed(lock)
674  *
675  * \see cl_lock_mutex_put()
676  */
677 void cl_lock_mutex_get(const struct lu_env *env, struct cl_lock *lock)
678 {
679         LINVRNT(cl_lock_invariant(env, lock));
680
681         if (lock->cll_guarder == cfs_current()) {
682                 LINVRNT(cl_lock_is_mutexed(lock));
683                 LINVRNT(lock->cll_depth > 0);
684         } else {
685                 struct cl_object_header *hdr;
686                 struct cl_thread_info   *info;
687                 int i;
688
689                 LINVRNT(lock->cll_guarder != cfs_current());
690                 hdr = cl_object_header(lock->cll_descr.cld_obj);
691                 /*
692                  * Check that mutices are taken in the bottom-to-top order.
693                  */
694                 info = cl_env_info(env);
695                 for (i = 0; i < hdr->coh_nesting; ++i)
696                         LASSERT(info->clt_counters[i].ctc_nr_locks_locked == 0);
697                 cfs_mutex_lock_nested(&lock->cll_guard, hdr->coh_nesting);
698                 lock->cll_guarder = cfs_current();
699                 LINVRNT(lock->cll_depth == 0);
700         }
701         cl_lock_mutex_tail(env, lock);
702 }
703 EXPORT_SYMBOL(cl_lock_mutex_get);
704
705 /**
706  * Try-locks cl_lock object.
707  *
708  * \retval 0 \a lock was successfully locked
709  *
710  * \retval -EBUSY \a lock cannot be locked right now
711  *
712  * \post ergo(result == 0, cl_lock_is_mutexed(lock))
713  *
714  * \see cl_lock_mutex_get()
715  */
716 int cl_lock_mutex_try(const struct lu_env *env, struct cl_lock *lock)
717 {
718         int result;
719
720         LINVRNT(cl_lock_invariant_trusted(env, lock));
721         ENTRY;
722
723         result = 0;
724         if (lock->cll_guarder == cfs_current()) {
725                 LINVRNT(lock->cll_depth > 0);
726                 cl_lock_mutex_tail(env, lock);
727         } else if (cfs_mutex_trylock(&lock->cll_guard)) {
728                 LINVRNT(lock->cll_depth == 0);
729                 lock->cll_guarder = cfs_current();
730                 cl_lock_mutex_tail(env, lock);
731         } else
732                 result = -EBUSY;
733         RETURN(result);
734 }
735 EXPORT_SYMBOL(cl_lock_mutex_try);
736
737 /**
738  * Unlocks cl_lock object.
739  *
740  * \pre cl_lock_is_mutexed(lock)
741  *
742  * \see cl_lock_mutex_get()
743  */
744 void cl_lock_mutex_put(const struct lu_env *env, struct cl_lock *lock)
745 {
746         struct cl_thread_counters *counters;
747
748         LINVRNT(cl_lock_invariant(env, lock));
749         LINVRNT(cl_lock_is_mutexed(lock));
750         LINVRNT(lock->cll_guarder == cfs_current());
751         LINVRNT(lock->cll_depth > 0);
752
753         counters = cl_lock_counters(env, lock);
754         LINVRNT(counters->ctc_nr_locks_locked > 0);
755
756         cl_lock_trace(D_TRACE, env, "put mutex", lock);
757         lu_ref_del(&counters->ctc_locks_locked, "cll_guard", lock);
758         counters->ctc_nr_locks_locked--;
759         if (--lock->cll_depth == 0) {
760                 lock->cll_guarder = NULL;
761                 cfs_mutex_unlock(&lock->cll_guard);
762         }
763 }
764 EXPORT_SYMBOL(cl_lock_mutex_put);
765
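/*
 * Typical usage sketch for the mutex helpers above (editorial addition):
 * state changes in this file are bracketed as
 *
 * \code
 *      cl_lock_mutex_get(env, lock);
 *      // ... read or modify lock->cll_* fields, call *_try() helpers ...
 *      cl_lock_mutex_put(env, lock);
 * \endcode
 *
 * The mutex is recursive for the owning thread (tracked via cll_depth), so
 * nested cl_lock_mutex_get() calls from the same thread are cheap.
 */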
766 /**
767  * Returns true iff lock's mutex is owned by the current thread.
768  */
769 int cl_lock_is_mutexed(struct cl_lock *lock)
770 {
771         return lock->cll_guarder == cfs_current();
772 }
773 EXPORT_SYMBOL(cl_lock_is_mutexed);
774
775 /**
776  * Returns number of cl_lock mutices held by the current thread (environment).
777  */
778 int cl_lock_nr_mutexed(const struct lu_env *env)
779 {
780         struct cl_thread_info *info;
781         int i;
782         int locked;
783
784         /*
785          * NOTE: if summation across all nesting levels (currently 2) proves
786          *       too expensive, a summary counter can be added to
787          *       struct cl_thread_info.
788          */
789         info = cl_env_info(env);
790         for (i = 0, locked = 0; i < ARRAY_SIZE(info->clt_counters); ++i)
791                 locked += info->clt_counters[i].ctc_nr_locks_locked;
792         return locked;
793 }
794 EXPORT_SYMBOL(cl_lock_nr_mutexed);
795
796 static void cl_lock_cancel0(const struct lu_env *env, struct cl_lock *lock)
797 {
798         LINVRNT(cl_lock_is_mutexed(lock));
799         LINVRNT(cl_lock_invariant(env, lock));
800         ENTRY;
801         if (!(lock->cll_flags & CLF_CANCELLED)) {
802                 const struct cl_lock_slice *slice;
803
804                 lock->cll_flags |= CLF_CANCELLED;
805                 cfs_list_for_each_entry_reverse(slice, &lock->cll_layers,
806                                                 cls_linkage) {
807                         if (slice->cls_ops->clo_cancel != NULL)
808                                 slice->cls_ops->clo_cancel(env, slice);
809                 }
810         }
811         EXIT;
812 }
813
814 static void cl_lock_delete0(const struct lu_env *env, struct cl_lock *lock)
815 {
816         struct cl_object_header    *head;
817         const struct cl_lock_slice *slice;
818
819         LINVRNT(cl_lock_is_mutexed(lock));
820         LINVRNT(cl_lock_invariant(env, lock));
821
822         ENTRY;
823         if (lock->cll_state < CLS_FREEING) {
824                 LASSERT(lock->cll_state != CLS_INTRANSIT);
825                 cl_lock_state_set(env, lock, CLS_FREEING);
826
827                 head = cl_object_header(lock->cll_descr.cld_obj);
828
829                 cfs_spin_lock(&head->coh_lock_guard);
830                 cfs_list_del_init(&lock->cll_linkage);
831
832                 cfs_spin_unlock(&head->coh_lock_guard);
833                 /*
834                  * From now on, no new references to this lock can be acquired
835                  * by cl_lock_lookup().
836                  */
837                 cfs_list_for_each_entry_reverse(slice, &lock->cll_layers,
838                                                 cls_linkage) {
839                         if (slice->cls_ops->clo_delete != NULL)
840                                 slice->cls_ops->clo_delete(env, slice);
841                 }
842                 /*
843                  * From now on, no new references to this lock can be acquired
844                  * by layer-specific means (like a pointer from struct
845                  * ldlm_lock in osc, or a pointer from top-lock to sub-lock in
846                  * lov).
847                  *
848                  * Lock will be finally freed in cl_lock_put() when last of
849                  * existing references goes away.
850                  */
851         }
852         EXIT;
853 }
854
855 /**
856  * Mod(ifie)s cl_lock::cll_holds counter for a given lock. Also, for a
857  * top-lock (nesting == 0) accounts for this modification in the per-thread
858  * debugging counters. Sub-lock holds can be released by a thread different
859  * from one that acquired it.
860  */
861 static void cl_lock_hold_mod(const struct lu_env *env, struct cl_lock *lock,
862                              int delta)
863 {
864         struct cl_thread_counters *counters;
865         enum clt_nesting_level     nesting;
866
867         lock->cll_holds += delta;
868         nesting = cl_lock_nesting(lock);
869         if (nesting == CNL_TOP) {
870                 counters = &cl_env_info(env)->clt_counters[CNL_TOP];
871                 counters->ctc_nr_held += delta;
872                 LASSERT(counters->ctc_nr_held >= 0);
873         }
874 }
875
876 /**
877  * Mod(ifie)s cl_lock::cll_users counter for a given lock. See
878  * cl_lock_hold_mod() for the explanation of the debugging code.
879  */
880 static void cl_lock_used_mod(const struct lu_env *env, struct cl_lock *lock,
881                              int delta)
882 {
883         struct cl_thread_counters *counters;
884         enum clt_nesting_level     nesting;
885
886         lock->cll_users += delta;
887         nesting = cl_lock_nesting(lock);
888         if (nesting == CNL_TOP) {
889                 counters = &cl_env_info(env)->clt_counters[CNL_TOP];
890                 counters->ctc_nr_used += delta;
891                 LASSERT(counters->ctc_nr_used >= 0);
892         }
893 }
894
895 static void cl_lock_hold_release(const struct lu_env *env, struct cl_lock *lock,
896                                  const char *scope, const void *source)
897 {
898         LINVRNT(cl_lock_is_mutexed(lock));
899         LINVRNT(cl_lock_invariant(env, lock));
900         LASSERT(lock->cll_holds > 0);
901
902         ENTRY;
903         cl_lock_trace(D_DLMTRACE, env, "hold release lock", lock);
904         lu_ref_del(&lock->cll_holders, scope, source);
905         cl_lock_hold_mod(env, lock, -1);
906         if (lock->cll_holds == 0) {
907                 if (lock->cll_descr.cld_mode == CLM_PHANTOM ||
908                     lock->cll_descr.cld_mode == CLM_GROUP)
909                         /*
910                          * If lock is still phantom or grouplock when user is
911                          * done with it---destroy the lock.
912                          */
913                         lock->cll_flags |= CLF_CANCELPEND|CLF_DOOMED;
914                 if (lock->cll_flags & CLF_CANCELPEND) {
915                         lock->cll_flags &= ~CLF_CANCELPEND;
916                         cl_lock_cancel0(env, lock);
917                 }
918                 if (lock->cll_flags & CLF_DOOMED) {
919                         /* no longer doomed: it's dead... Jim. */
920                         lock->cll_flags &= ~CLF_DOOMED;
921                         cl_lock_delete0(env, lock);
922                 }
923         }
924         EXIT;
925 }
926
927
928 /**
929  * Waits until lock state is changed.
930  *
931  * This function is called with cl_lock mutex locked, atomically releases
932  * mutex and goes to sleep, waiting for a lock state change (signaled by
933  * cl_lock_signal()), and re-acquires the mutex before return.
934  *
935  * This function is used to wait until lock state machine makes some progress
936  * and to emulate synchronous operations on top of asynchronous lock
937  * interface.
938  *
939  * \retval -EINTR wait was interrupted
940  *
941  * \retval 0 wait wasn't interrupted
942  *
943  * \pre cl_lock_is_mutexed(lock)
944  *
945  * \see cl_lock_signal()
946  */
947 int cl_lock_state_wait(const struct lu_env *env, struct cl_lock *lock)
948 {
949         cfs_waitlink_t waiter;
950         int result;
951
952         ENTRY;
953         LINVRNT(cl_lock_is_mutexed(lock));
954         LINVRNT(cl_lock_invariant(env, lock));
955         LASSERT(lock->cll_depth == 1);
956         LASSERT(lock->cll_state != CLS_FREEING); /* too late to wait */
957
958         cl_lock_trace(D_DLMTRACE, env, "state wait lock", lock);
959         result = lock->cll_error;
960         if (result == 0) {
961                 cfs_waitlink_init(&waiter);
962                 cfs_waitq_add(&lock->cll_wq, &waiter);
963                 cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
964                 cl_lock_mutex_put(env, lock);
965
966                 LASSERT(cl_lock_nr_mutexed(env) == 0);
967                 cfs_waitq_wait(&waiter, CFS_TASK_INTERRUPTIBLE);
968
969                 cl_lock_mutex_get(env, lock);
970                 cfs_set_current_state(CFS_TASK_RUNNING);
971                 cfs_waitq_del(&lock->cll_wq, &waiter);
972                 result = cfs_signal_pending() ? -EINTR : 0;
973         }
974         RETURN(result);
975 }
976 EXPORT_SYMBOL(cl_lock_state_wait);
977
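/*
 * Editorial sketch of the canonical retry loop built on top of
 * cl_lock_state_wait(); cl_enqueue_locked() and cl_wait() below have exactly
 * this shape ("some_try" stands for a non-blocking helper such as
 * cl_enqueue_try() or cl_wait_try()):
 *
 * \code
 *      do {
 *              result = some_try(env, lock, ...);
 *              if (result == CLO_WAIT) {
 *                      result = cl_lock_state_wait(env, lock);
 *                      if (result == 0)
 *                              continue;       // state changed, retry
 *              }
 *              break;
 *      } while (1);
 * \endcode
 */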
978 static void cl_lock_state_signal(const struct lu_env *env, struct cl_lock *lock,
979                                  enum cl_lock_state state)
980 {
981         const struct cl_lock_slice *slice;
982
983         ENTRY;
984         LINVRNT(cl_lock_is_mutexed(lock));
985         LINVRNT(cl_lock_invariant(env, lock));
986
987         cfs_list_for_each_entry(slice, &lock->cll_layers, cls_linkage)
988                 if (slice->cls_ops->clo_state != NULL)
989                         slice->cls_ops->clo_state(env, slice, state);
990         cfs_waitq_broadcast(&lock->cll_wq);
991         EXIT;
992 }
993
994 /**
995  * Notifies waiters that lock state changed.
996  *
997  * Wakes up all waiters sleeping in cl_lock_state_wait(), also notifies all
998  * layers about state change by calling cl_lock_operations::clo_state()
999  * top-to-bottom.
1000  */
1001 void cl_lock_signal(const struct lu_env *env, struct cl_lock *lock)
1002 {
1003         ENTRY;
1004         cl_lock_trace(D_DLMTRACE, env, "state signal lock", lock);
1005         cl_lock_state_signal(env, lock, lock->cll_state);
1006         EXIT;
1007 }
1008 EXPORT_SYMBOL(cl_lock_signal);
1009
1010 /**
1011  * Changes lock state.
1012  *
1013  * This function is invoked to notify layers that lock state changed, possibly
1014  * as a result of an asynchronous event such as call-back reception.
1015  *
1016  * \post lock->cll_state == state
1017  *
1018  * \see cl_lock_operations::clo_state()
1019  */
1020 void cl_lock_state_set(const struct lu_env *env, struct cl_lock *lock,
1021                        enum cl_lock_state state)
1022 {
1023         struct cl_site *site = cl_object_site(lock->cll_descr.cld_obj);
1024
1025         ENTRY;
1026         LASSERT(lock->cll_state <= state ||
1027                 (lock->cll_state == CLS_CACHED &&
1028                  (state == CLS_HELD || /* lock found in cache */
1029                   state == CLS_NEW  ||   /* sub-lock canceled */
1030                   state == CLS_INTRANSIT)) ||
1031                 /* lock is in transit state */
1032                 lock->cll_state == CLS_INTRANSIT);
1033
1034         if (lock->cll_state != state) {
1035                 cfs_atomic_dec(&site->cs_locks_state[lock->cll_state]);
1036                 cfs_atomic_inc(&site->cs_locks_state[state]);
1037
1038                 cl_lock_state_signal(env, lock, state);
1039                 lock->cll_state = state;
1040         }
1041         EXIT;
1042 }
1043 EXPORT_SYMBOL(cl_lock_state_set);
1044
1045 static int cl_unuse_try_internal(const struct lu_env *env, struct cl_lock *lock)
1046 {
1047         const struct cl_lock_slice *slice;
1048         int result;
1049
1050         do {
1051                 result = 0;
1052
1053                 LINVRNT(cl_lock_is_mutexed(lock));
1054                 LINVRNT(cl_lock_invariant(env, lock));
1055                 LASSERT(lock->cll_state == CLS_INTRANSIT);
1056
1057                 result = -ENOSYS;
1058                 cfs_list_for_each_entry_reverse(slice, &lock->cll_layers,
1059                                                 cls_linkage) {
1060                         if (slice->cls_ops->clo_unuse != NULL) {
1061                                 result = slice->cls_ops->clo_unuse(env, slice);
1062                                 if (result != 0)
1063                                         break;
1064                         }
1065                 }
1066                 LASSERT(result != -ENOSYS);
1067         } while (result == CLO_REPEAT);
1068
1069         return result;
1070 }
1071
1072 /**
1073  * Yanks lock from the cache (cl_lock_state::CLS_CACHED state) by calling
1074  * cl_lock_operations::clo_use() top-to-bottom to notify layers.
1075  * If \a atomic is 1, the lock is unused again on failure so that the
1076  * whole use attempt stays atomic.
1077  */
1078 int cl_use_try(const struct lu_env *env, struct cl_lock *lock, int atomic)
1079 {
1080         const struct cl_lock_slice *slice;
1081         int result;
1082         enum cl_lock_state state;
1083
1084         ENTRY;
1085         cl_lock_trace(D_DLMTRACE, env, "use lock", lock);
1086
1087         LASSERT(lock->cll_state == CLS_CACHED);
1088         if (lock->cll_error)
1089                 RETURN(lock->cll_error);
1090
1091         result = -ENOSYS;
1092         state = cl_lock_intransit(env, lock);
1093         cfs_list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
1094                 if (slice->cls_ops->clo_use != NULL) {
1095                         result = slice->cls_ops->clo_use(env, slice);
1096                         if (result != 0)
1097                                 break;
1098                 }
1099         }
1100         LASSERT(result != -ENOSYS);
1101
1102         LASSERTF(lock->cll_state == CLS_INTRANSIT, "Wrong state %d.\n",
1103                  lock->cll_state);
1104
1105         if (result == 0) {
1106                 state = CLS_HELD;
1107         } else {
1108                 if (result == -ESTALE) {
1109                         /*
1110                           * -ESTALE means the sub-lock is being cancelled
1111                           * at this moment; set the lock state back to
1112                           * NEW and ask the caller to repeat.
1113                          */
1114                         state = CLS_NEW;
1115                         result = CLO_REPEAT;
1116                 }
1117
1118                 /* @atomic means back-off-on-failure. */
1119                 if (atomic) {
1120                         int rc;
1121                         rc = cl_unuse_try_internal(env, lock);
1122                         /* Vet the results. */
1123                         if (rc < 0 && result > 0)
1124                                 result = rc;
1125                 }
1126
1127         }
1128         cl_lock_extransit(env, lock, state);
1129         RETURN(result);
1130 }
1131 EXPORT_SYMBOL(cl_use_try);
1132
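/*
 * Usage sketch (editorial): cl_lock_peek() above calls cl_use_try() with
 * \a atomic == 1, so a cached lock is either fully moved to CLS_HELD or
 * rolled back, never left half-used:
 *
 * \code
 *      cl_lock_mutex_get(env, lock);
 *      if (lock->cll_state == CLS_CACHED) {
 *              result = cl_use_try(env, lock, 1);
 *              if (result < 0)
 *                      cl_lock_error(env, lock, result);
 *      }
 *      cl_lock_mutex_put(env, lock);
 * \endcode
 */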
1133 /**
1134  * Helper for cl_enqueue_try() that calls ->clo_enqueue() across all layers
1135  * top-to-bottom.
1136  */
1137 static int cl_enqueue_kick(const struct lu_env *env,
1138                            struct cl_lock *lock,
1139                            struct cl_io *io, __u32 flags)
1140 {
1141         int result;
1142         const struct cl_lock_slice *slice;
1143
1144         ENTRY;
1145         result = -ENOSYS;
1146         cfs_list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
1147                 if (slice->cls_ops->clo_enqueue != NULL) {
1148                         result = slice->cls_ops->clo_enqueue(env,
1149                                                              slice, io, flags);
1150                         if (result != 0)
1151                                 break;
1152                 }
1153         }
1154         LASSERT(result != -ENOSYS);
1155         RETURN(result);
1156 }
1157
1158 /**
1159  * Tries to enqueue a lock.
1160  *
1161  * This function is called repeatedly by cl_enqueue() until either lock is
1162  * enqueued, or error occurs. This function does not block waiting for
1163  * network communication to complete.
1164  *
1165  * \post ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
1166  *                         lock->cll_state == CLS_HELD)
1167  *
1168  * \see cl_enqueue() cl_lock_operations::clo_enqueue()
1169  * \see cl_lock_state::CLS_ENQUEUED
1170  */
1171 int cl_enqueue_try(const struct lu_env *env, struct cl_lock *lock,
1172                    struct cl_io *io, __u32 flags)
1173 {
1174         int result;
1175
1176         ENTRY;
1177         cl_lock_trace(D_DLMTRACE, env, "enqueue lock", lock);
1178         do {
1179                 result = 0;
1180
1181                 LINVRNT(cl_lock_is_mutexed(lock));
1182
1183                 if (lock->cll_error != 0)
1184                         break;
1185                 switch (lock->cll_state) {
1186                 case CLS_NEW:
1187                         cl_lock_state_set(env, lock, CLS_QUEUING);
1188                         /* fall-through */
1189                 case CLS_QUEUING:
1190                         /* kick layers. */
1191                         result = cl_enqueue_kick(env, lock, io, flags);
1192                         if (result == 0)
1193                                 cl_lock_state_set(env, lock, CLS_ENQUEUED);
1194                         break;
1195                 case CLS_INTRANSIT:
1196                         LASSERT(cl_lock_is_intransit(lock));
1197                         result = CLO_WAIT;
1198                         break;
1199                 case CLS_CACHED:
1200                         /* yank lock from the cache. */
1201                         result = cl_use_try(env, lock, 0);
1202                         break;
1203                 case CLS_ENQUEUED:
1204                 case CLS_HELD:
1205                         result = 0;
1206                         break;
1207                 default:
1208                 case CLS_FREEING:
1209                         /*
1210                          * impossible, only held locks with increased
1211                          * ->cll_holds can be enqueued, and they cannot be
1212                          * freed.
1213                          */
1214                         LBUG();
1215                 }
1216         } while (result == CLO_REPEAT);
1217         if (result < 0)
1218                 cl_lock_error(env, lock, result);
1219         RETURN(result ?: lock->cll_error);
1220 }
1221 EXPORT_SYMBOL(cl_enqueue_try);
1222
1223 /**
1224  * Cancel the conflicting lock found during previous enqueue.
1225  *
1226  * \retval 0 conflicting lock has been canceled.
1227  * \retval -ve error code.
1228  */
1229 int cl_lock_enqueue_wait(const struct lu_env *env,
1230                          struct cl_lock *lock,
1231                          int keep_mutex)
1232 {
1233         struct cl_lock  *conflict;
1234         int              rc = 0;
1235         ENTRY;
1236
1237         LASSERT(cl_lock_is_mutexed(lock));
1238         LASSERT(lock->cll_state == CLS_QUEUING);
1239         LASSERT(lock->cll_conflict != NULL);
1240
1241         conflict = lock->cll_conflict;
1242         lock->cll_conflict = NULL;
1243
1244         cl_lock_mutex_put(env, lock);
1245         LASSERT(cl_lock_nr_mutexed(env) == 0);
1246
1247         cl_lock_mutex_get(env, conflict);
1248         cl_lock_cancel(env, conflict);
1249         cl_lock_delete(env, conflict);
1250
1251         while (conflict->cll_state != CLS_FREEING) {
1252                 rc = cl_lock_state_wait(env, conflict);
1253                 if (rc != 0)
1254                         break;
1255         }
1256         cl_lock_mutex_put(env, conflict);
1257         lu_ref_del(&conflict->cll_reference, "cancel-wait", lock);
1258         cl_lock_put(env, conflict);
1259
1260         if (keep_mutex)
1261                 cl_lock_mutex_get(env, lock);
1262
1263         LASSERT(rc <= 0);
1264         RETURN(rc);
1265 }
1266 EXPORT_SYMBOL(cl_lock_enqueue_wait);
1267
1268 static int cl_enqueue_locked(const struct lu_env *env, struct cl_lock *lock,
1269                              struct cl_io *io, __u32 enqflags)
1270 {
1271         int result;
1272
1273         ENTRY;
1274
1275         LINVRNT(cl_lock_is_mutexed(lock));
1276         LINVRNT(cl_lock_invariant(env, lock));
1277         LASSERT(lock->cll_holds > 0);
1278
1279         cl_lock_user_add(env, lock);
1280         do {
1281                 result = cl_enqueue_try(env, lock, io, enqflags);
1282                 if (result == CLO_WAIT) {
1283                         if (lock->cll_conflict != NULL)
1284                                 result = cl_lock_enqueue_wait(env, lock, 1);
1285                         else
1286                                 result = cl_lock_state_wait(env, lock);
1287                         if (result == 0)
1288                                 continue;
1289                 }
1290                 break;
1291         } while (1);
1292         if (result != 0) {
1293                 cl_lock_user_del(env, lock);
1294                 cl_lock_error(env, lock, result);
1295         }
1296         LASSERT(ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
1297                      lock->cll_state == CLS_HELD));
1298         RETURN(result);
1299 }
1300
1301 /**
1302  * Enqueues a lock.
1303  *
1304  * \pre current thread or io owns a hold on lock.
1305  *
1306  * \post ergo(result == 0, lock->users increased)
1307  * \post ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
1308  *                         lock->cll_state == CLS_HELD)
1309  */
1310 int cl_enqueue(const struct lu_env *env, struct cl_lock *lock,
1311                struct cl_io *io, __u32 enqflags)
1312 {
1313         int result;
1314
1315         ENTRY;
1316
1317         cl_lock_lockdep_acquire(env, lock, enqflags);
1318         cl_lock_mutex_get(env, lock);
1319         result = cl_enqueue_locked(env, lock, io, enqflags);
1320         cl_lock_mutex_put(env, lock);
1321         if (result != 0)
1322                 cl_lock_lockdep_release(env, lock);
1323         LASSERT(ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
1324                      lock->cll_state == CLS_HELD));
1325         RETURN(result);
1326 }
1327 EXPORT_SYMBOL(cl_enqueue);
1328
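/*
 * Editorial sketch of the synchronous lock life-cycle assembled from the
 * primitives in this file.  The caller is assumed to already own a hold on
 * \a lock, as required by the \pre of cl_enqueue() above; cl_wait() and
 * cl_unuse() are defined below:
 *
 * \code
 *      result = cl_enqueue(env, lock, io, enqflags);
 *      if (result == 0) {
 *              result = cl_wait(env, lock);    // wait until CLS_HELD
 *              if (result == 0) {
 *                      // ... do I/O under the lock ...
 *                      cl_unuse(env, lock);    // return the lock to cache
 *              }
 *      }
 * \endcode
 */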
1329 /**
1330  * Tries to unlock a lock.
1331  *
1332  * This function is called repeatedly by cl_unuse() until either lock is
1333  * unlocked, or error occurs.
1334  * cl_unuse_try is a one-shot operation, so it must NOT return CLO_WAIT.
1335  *
1336  * \pre  lock->cll_state == CLS_HELD
1337  *
1338  * \post ergo(result == 0, lock->cll_state == CLS_CACHED)
1339  *
1340  * \see cl_unuse() cl_lock_operations::clo_unuse()
1341  * \see cl_lock_state::CLS_CACHED
1342  */
1343 int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock)
1344 {
1345         int                         result;
1346         enum cl_lock_state          state = CLS_NEW;
1347
1348         ENTRY;
1349         cl_lock_trace(D_DLMTRACE, env, "unuse lock", lock);
1350
1351         LASSERT(lock->cll_state == CLS_HELD || lock->cll_state == CLS_ENQUEUED);
1352         if (lock->cll_users > 1) {
1353                 cl_lock_user_del(env, lock);
1354                 RETURN(0);
1355         }
1356
1357         /*
1358          * New lock users (->cll_users) do not prevent the unlock
1359          * from proceeding. From this point, lock eventually reaches
1360          * CLS_CACHED, is reinitialized to CLS_NEW or fails into
1361          * CLS_FREEING.
1362          */
1363         state = cl_lock_intransit(env, lock);
1364
1365         result = cl_unuse_try_internal(env, lock);
1366         LASSERT(lock->cll_state == CLS_INTRANSIT);
1367         LASSERT(result != CLO_WAIT);
1368         cl_lock_user_del(env, lock);
1369         if (result == 0 || result == -ESTALE) {
1370                 /*
1371                  * Return lock back to the cache. This is the only
1372                  * place where lock is moved into CLS_CACHED state.
1373                  *
1374                  * If one of ->clo_unuse() methods returned -ESTALE, lock
1375                  * cannot be placed into cache and has to be
1376                  * re-initialized. This happens e.g., when a sub-lock was
1377                  * canceled while unlocking was in progress.
1378                  */
1379                 if (state == CLS_HELD && result == 0)
1380                         state = CLS_CACHED;
1381                 else
1382                         state = CLS_NEW;
1383                 cl_lock_extransit(env, lock, state);
1384
1385                 /*
1386                  * Hide -ESTALE error.
1387                  * Consider a glimpse lock covering multiple stripes:
1388                  * one of its sub-locks may have returned -ENAVAIL while
1389                  * the other sub-locks are matched write locks.  The lock
1390                  * must not be marked as erroneous here, because otherwise
1391                  * some of its sub-locks would never be cancelled and
1392                  * their dirty pages would never be written to the OSTs. -jay
1393                  */
1394                 result = 0;
1395         } else {
1396                 CERROR("result = %d, this is unlikely!\n", result);
1397                 cl_lock_extransit(env, lock, state);
1398         }
1399
1400         result = result ?: lock->cll_error;
1401         if (result < 0)
1402                 cl_lock_error(env, lock, result);
1403         RETURN(result);
1404 }
1405 EXPORT_SYMBOL(cl_unuse_try);
1406
1407 static void cl_unuse_locked(const struct lu_env *env, struct cl_lock *lock)
1408 {
1409         int result;
1410         ENTRY;
1411
1412         result = cl_unuse_try(env, lock);
1413         if (result)
1414                 CL_LOCK_DEBUG(D_ERROR, env, lock, "unuse return %d\n", result);
1415
1416         EXIT;
1417 }
1418
1419 /**
1420  * Unlocks a lock.
1421  */
1422 void cl_unuse(const struct lu_env *env, struct cl_lock *lock)
1423 {
1424         ENTRY;
1425         cl_lock_mutex_get(env, lock);
1426         cl_unuse_locked(env, lock);
1427         cl_lock_mutex_put(env, lock);
1428         cl_lock_lockdep_release(env, lock);
1429         EXIT;
1430 }
1431 EXPORT_SYMBOL(cl_unuse);
1432
1433 /**
1434  * Tries to wait for a lock.
1435  *
1436  * This function is called repeatedly by cl_wait() until either lock is
1437  * granted, or error occurs. This function does not block waiting for network
1438  * communication to complete.
1439  *
1440  * \see cl_wait() cl_lock_operations::clo_wait()
1441  * \see cl_lock_state::CLS_HELD
1442  */
1443 int cl_wait_try(const struct lu_env *env, struct cl_lock *lock)
1444 {
1445         const struct cl_lock_slice *slice;
1446         int                         result;
1447
1448         ENTRY;
1449         cl_lock_trace(D_DLMTRACE, env, "wait lock try", lock);
1450         do {
1451                 LINVRNT(cl_lock_is_mutexed(lock));
1452                 LINVRNT(cl_lock_invariant(env, lock));
1453                 LASSERT(lock->cll_state == CLS_ENQUEUED ||
1454                         lock->cll_state == CLS_HELD ||
1455                         lock->cll_state == CLS_INTRANSIT);
1456                 LASSERT(lock->cll_users > 0);
1457                 LASSERT(lock->cll_holds > 0);
1458
1459                 result = 0;
1460                 if (lock->cll_error != 0)
1461                         break;
1462
1463                 if (cl_lock_is_intransit(lock)) {
1464                         result = CLO_WAIT;
1465                         break;
1466                 }
1467
1468                 if (lock->cll_state == CLS_HELD)
1469                         /* nothing to do */
1470                         break;
1471
1472                 result = -ENOSYS;
1473                 cfs_list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
1474                         if (slice->cls_ops->clo_wait != NULL) {
1475                                 result = slice->cls_ops->clo_wait(env, slice);
1476                                 if (result != 0)
1477                                         break;
1478                         }
1479                 }
1480                 LASSERT(result != -ENOSYS);
1481                 if (result == 0) {
1482                         LASSERT(lock->cll_state != CLS_INTRANSIT);
1483                         cl_lock_state_set(env, lock, CLS_HELD);
1484                 }
1485         } while (result == CLO_REPEAT);
1486         RETURN(result ?: lock->cll_error);
1487 }
1488 EXPORT_SYMBOL(cl_wait_try);
1489
1490 /**
1491  * Waits until enqueued lock is granted.
1492  *
1493  * \pre current thread or io owns a hold on the lock
1494  * \pre ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
1495  *                        lock->cll_state == CLS_HELD)
1496  *
1497  * \post ergo(result == 0, lock->cll_state == CLS_HELD)
1498  */
1499 int cl_wait(const struct lu_env *env, struct cl_lock *lock)
1500 {
1501         int result;
1502
1503         ENTRY;
1504         cl_lock_mutex_get(env, lock);
1505
1506         LINVRNT(cl_lock_invariant(env, lock));
1507         LASSERTF(lock->cll_state == CLS_ENQUEUED || lock->cll_state == CLS_HELD,
1508                  "Wrong state %d\n", lock->cll_state);
1509         LASSERT(lock->cll_holds > 0);
1510
1511         do {
1512                 result = cl_wait_try(env, lock);
1513                 if (result == CLO_WAIT) {
1514                         result = cl_lock_state_wait(env, lock);
1515                         if (result == 0)
1516                                 continue;
1517                 }
1518                 break;
1519         } while (1);
1520         if (result < 0) {
1521                 cl_lock_user_del(env, lock);
1522                 cl_lock_error(env, lock, result);
1523                 cl_lock_lockdep_release(env, lock);
1524         }
1525         cl_lock_trace(D_DLMTRACE, env, "wait lock", lock);
1526         cl_lock_mutex_put(env, lock);
1527         LASSERT(ergo(result == 0, lock->cll_state == CLS_HELD));
1528         RETURN(result);
1529 }
1530 EXPORT_SYMBOL(cl_wait);
1531
1532 /**
1533  * Executes cl_lock_operations::clo_weigh() for every layer and sums the
1534  * results (saturating at ~0UL on overflow) to estimate the lock's weight.
1535  */
1536 unsigned long cl_lock_weigh(const struct lu_env *env, struct cl_lock *lock)
1537 {
1538         const struct cl_lock_slice *slice;
1539         unsigned long pound;
1540         unsigned long ounce;
1541
1542         ENTRY;
1543         LINVRNT(cl_lock_is_mutexed(lock));
1544         LINVRNT(cl_lock_invariant(env, lock));
1545
1546         pound = 0;
1547         cfs_list_for_each_entry_reverse(slice, &lock->cll_layers, cls_linkage) {
1548                 if (slice->cls_ops->clo_weigh != NULL) {
1549                         ounce = slice->cls_ops->clo_weigh(env, slice);
1550                         pound += ounce;
1551                         if (pound < ounce) /* over-weight^Wflow */
1552                                 pound = ~0UL;
1553                 }
1554         }
1555         RETURN(pound);
1556 }
1557 EXPORT_SYMBOL(cl_lock_weigh);
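
/*
 * Illustrative sketch (not part of this file): what a layer's
 * cl_lock_operations::clo_weigh() method could look like. The "foo" layer
 * and foo_pages_covered() are hypothetical; only the method signature is
 * inferred from the call site in cl_lock_weigh() above, which sums the
 * per-layer results and saturates at ~0UL on overflow.
 *
 * \code
 * static unsigned long foo_lock_weigh(const struct lu_env *env,
 *                                     const struct cl_lock_slice *slice)
 * {
 *         // e.g., weigh the lock by the number of cached pages it covers
 *         return foo_pages_covered(env, slice);
 * }
 * \endcode
 */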
1558
1559 /**
1560  * Notifies layers that the lock description has changed.
1561  *
1562  * The server can grant the client a lock different from the one that was
1563  * requested (e.g., larger in extent). This method is called once the actually
1564  * granted lock description becomes known, so that layers can accommodate the
1565  * changed description.
1566  *
1567  * \see cl_lock_operations::clo_modify()
1568  */
1569 int cl_lock_modify(const struct lu_env *env, struct cl_lock *lock,
1570                    const struct cl_lock_descr *desc)
1571 {
1572         const struct cl_lock_slice *slice;
1573         struct cl_object           *obj = lock->cll_descr.cld_obj;
1574         struct cl_object_header    *hdr = cl_object_header(obj);
1575         int result;
1576
1577         ENTRY;
1578         cl_lock_trace(D_DLMTRACE, env, "modify lock", lock);
1579         /* don't allow object to change */
1580         LASSERT(obj == desc->cld_obj);
1581         LINVRNT(cl_lock_is_mutexed(lock));
1582         LINVRNT(cl_lock_invariant(env, lock));
1583
1584         cfs_list_for_each_entry_reverse(slice, &lock->cll_layers, cls_linkage) {
1585                 if (slice->cls_ops->clo_modify != NULL) {
1586                         result = slice->cls_ops->clo_modify(env, slice, desc);
1587                         if (result != 0)
1588                                 RETURN(result);
1589                 }
1590         }
1591         CL_LOCK_DEBUG(D_DLMTRACE, env, lock, " -> "DDESCR"@"DFID"\n",
1592                       PDESCR(desc), PFID(lu_object_fid(&desc->cld_obj->co_lu)));
1593         /*
1594          * Just replace description in place. Nothing more is needed for
1595          * now. If locks were indexed according to their extent and/or mode,
1596          * that index would have to be updated here.
1597          */
1598         cfs_spin_lock(&hdr->coh_lock_guard);
1599         lock->cll_descr = *desc;
1600         cfs_spin_unlock(&hdr->coh_lock_guard);
1601         RETURN(0);
1602 }
1603 EXPORT_SYMBOL(cl_lock_modify);
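
/*
 * Illustrative sketch (hypothetical caller, not taken from actual code):
 * updating the lock description once the actually granted extent is known.
 * The granted_start/granted_end values are made up; the requirements shown
 * (lock mutex held, unchanged cl_object) come from the assertions in
 * cl_lock_modify() above.
 *
 * \code
 * static int example_apply_grant(const struct lu_env *env,
 *                                struct cl_lock *lock,
 *                                pgoff_t granted_start, pgoff_t granted_end)
 * {
 *         struct cl_lock_descr desc;
 *
 *         LASSERT(cl_lock_is_mutexed(lock));
 *         desc = lock->cll_descr;        // keeps cld_obj and cld_mode
 *         desc.cld_start = granted_start;
 *         desc.cld_end   = granted_end;
 *         return cl_lock_modify(env, lock, &desc);
 * }
 * \endcode
 */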
1604
1605 /**
1606  * Initializes lock closure with a given origin.
1607  *
1608  * \see cl_lock_closure
1609  */
1610 void cl_lock_closure_init(const struct lu_env *env,
1611                           struct cl_lock_closure *closure,
1612                           struct cl_lock *origin, int wait)
1613 {
1614         LINVRNT(cl_lock_is_mutexed(origin));
1615         LINVRNT(cl_lock_invariant(env, origin));
1616
1617         CFS_INIT_LIST_HEAD(&closure->clc_list);
1618         closure->clc_origin = origin;
1619         closure->clc_wait   = wait;
1620         closure->clc_nr     = 0;
1621 }
1622 EXPORT_SYMBOL(cl_lock_closure_init);
1623
1624 /**
1625  * Builds a closure of \a lock.
1626  *
1627  * Building a closure consists of adding the initial lock (\a lock) to it
1628  * and calling the cl_lock_operations::clo_closure() methods of \a lock. These
1629  * methods might call cl_lock_closure_build() recursively, adding more locks
1630  * to the closure, etc.
1631  *
1632  * \see cl_lock_closure
1633  */
1634 int cl_lock_closure_build(const struct lu_env *env, struct cl_lock *lock,
1635                           struct cl_lock_closure *closure)
1636 {
1637         const struct cl_lock_slice *slice;
1638         int result;
1639
1640         ENTRY;
1641         LINVRNT(cl_lock_is_mutexed(closure->clc_origin));
1642         LINVRNT(cl_lock_invariant(env, closure->clc_origin));
1643
1644         result = cl_lock_enclosure(env, lock, closure);
1645         if (result == 0) {
1646                 cfs_list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
1647                         if (slice->cls_ops->clo_closure != NULL) {
1648                                 result = slice->cls_ops->clo_closure(env, slice,
1649                                                                      closure);
1650                                 if (result != 0)
1651                                         break;
1652                         }
1653                 }
1654         }
1655         if (result != 0)
1656                 cl_lock_disclosure(env, closure);
1657         RETURN(result);
1658 }
1659 EXPORT_SYMBOL(cl_lock_closure_build);
1660
1661 /**
1662  * Adds new lock to a closure.
1663  *
1664  * Try-locks \a lock and, if that succeeds, adds it to the closure (never more
1665  * than once). If the try-lock fails, returns CLO_REPEAT, optionally waiting
1666  * first until the next try-lock is likely to succeed.
1667  */
1668 int cl_lock_enclosure(const struct lu_env *env, struct cl_lock *lock,
1669                       struct cl_lock_closure *closure)
1670 {
1671         int result = 0;
1672         ENTRY;
1673         cl_lock_trace(D_DLMTRACE, env, "enclosure lock", lock);
1674         if (!cl_lock_mutex_try(env, lock)) {
1675                 /*
1676                  * If lock->cll_inclosure is not empty, lock is already in
1677                  * this closure.
1678                  */
1679                 if (cfs_list_empty(&lock->cll_inclosure)) {
1680                         cl_lock_get_trust(lock);
1681                         lu_ref_add(&lock->cll_reference, "closure", closure);
1682                         cfs_list_add(&lock->cll_inclosure, &closure->clc_list);
1683                         closure->clc_nr++;
1684                 } else
1685                         cl_lock_mutex_put(env, lock);
1686                 result = 0;
1687         } else {
1688                 cl_lock_disclosure(env, closure);
1689                 if (closure->clc_wait) {
1690                         cl_lock_get_trust(lock);
1691                         lu_ref_add(&lock->cll_reference, "closure-w", closure);
1692                         cl_lock_mutex_put(env, closure->clc_origin);
1693
1694                         LASSERT(cl_lock_nr_mutexed(env) == 0);
1695                         cl_lock_mutex_get(env, lock);
1696                         cl_lock_mutex_put(env, lock);
1697
1698                         cl_lock_mutex_get(env, closure->clc_origin);
1699                         lu_ref_del(&lock->cll_reference, "closure-w", closure);
1700                         cl_lock_put(env, lock);
1701                 }
1702                 result = CLO_REPEAT;
1703         }
1704         RETURN(result);
1705 }
1706 EXPORT_SYMBOL(cl_lock_enclosure);
1707
1708 /** Releases mutices of enclosed locks. */
1709 void cl_lock_disclosure(const struct lu_env *env,
1710                         struct cl_lock_closure *closure)
1711 {
1712         struct cl_lock *scan;
1713         struct cl_lock *temp;
1714
1715         cl_lock_trace(D_DLMTRACE, env, "disclosure lock", closure->clc_origin);
1716         cfs_list_for_each_entry_safe(scan, temp, &closure->clc_list,
1717                                      cll_inclosure) {
1718                 cfs_list_del_init(&scan->cll_inclosure);
1719                 cl_lock_mutex_put(env, scan);
1720                 lu_ref_del(&scan->cll_reference, "closure", closure);
1721                 cl_lock_put(env, scan);
1722                 closure->clc_nr--;
1723         }
1724         LASSERT(closure->clc_nr == 0);
1725 }
1726 EXPORT_SYMBOL(cl_lock_disclosure);
1727
1728 /** Finalizes a closure. */
1729 void cl_lock_closure_fini(struct cl_lock_closure *closure)
1730 {
1731         LASSERT(closure->clc_nr == 0);
1732         LASSERT(cfs_list_empty(&closure->clc_list));
1733 }
1734 EXPORT_SYMBOL(cl_lock_closure_fini);
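
/*
 * Illustrative sketch of the closure life-cycle (hypothetical caller; the
 * only facts used are the signatures above and the CLO_REPEAT convention of
 * cl_lock_enclosure()): initialize the closure on a mutexed origin lock,
 * try to build it, retry on CLO_REPEAT, and finalize it when done. Note
 * that cl_lock_closure_build() discloses the closure itself on failure.
 *
 * \code
 * static int example_with_closure(const struct lu_env *env,
 *                                 struct cl_lock *origin,
 *                                 struct cl_lock *other)
 * {
 *         struct cl_lock_closure closure;
 *         int rc;
 *
 *         LASSERT(cl_lock_is_mutexed(origin));
 *         do {
 *                 cl_lock_closure_init(env, &closure, origin, 1);
 *                 rc = cl_lock_closure_build(env, other, &closure);
 *                 if (rc == 0) {
 *                         // ... operate on the locks on closure.clc_list ...
 *                         cl_lock_disclosure(env, &closure);
 *                 }
 *                 cl_lock_closure_fini(&closure);
 *         } while (rc == CLO_REPEAT);
 *         return rc;
 * }
 * \endcode
 */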
1735
1736 /**
1737  * Destroys this lock. Notifies layers (bottom-to-top) that the lock is being
1738  * destroyed, then destroys it. If there are holds on the lock, destruction is
1739  * postponed until all holds are released. This is called when a decision is
1740  * made to destroy the lock in the future, e.g., when a blocking AST is
1741  * received on it or a fatal communication error happens.
1742  *
1743  * Caller must have a reference on this lock to prevent a situation where the
1744  * deleted lock lingers in memory indefinitely because nobody ever calls
1745  * cl_lock_put() to finish it.
1746  *
1747  * \pre atomic_read(&lock->cll_ref) > 0
1748  * \pre ergo(cl_lock_nesting(lock) == CNL_TOP,
1749  *           cl_lock_nr_mutexed(env) == 1)
1750  *      [i.e., if a top-lock is deleted, mutices of no other locks can be
1751  *      held, as deletion of sub-locks might require releasing a top-lock
1752  *      mutex]
1753  *
1754  * \see cl_lock_operations::clo_delete()
1755  * \see cl_lock::cll_holds
1756  */
1757 void cl_lock_delete(const struct lu_env *env, struct cl_lock *lock)
1758 {
1759         LINVRNT(cl_lock_is_mutexed(lock));
1760         LINVRNT(cl_lock_invariant(env, lock));
1761         LASSERT(ergo(cl_lock_nesting(lock) == CNL_TOP,
1762                      cl_lock_nr_mutexed(env) == 1));
1763
1764         ENTRY;
1765         cl_lock_trace(D_DLMTRACE, env, "delete lock", lock);
1766         if (lock->cll_holds == 0)
1767                 cl_lock_delete0(env, lock);
1768         else
1769                 lock->cll_flags |= CLF_DOOMED;
1770         EXIT;
1771 }
1772 EXPORT_SYMBOL(cl_lock_delete);
1773
1774 /**
1775  * Marks the lock as irrecoverably failed and marks it for destruction. This
1776  * happens when, e.g., the server fails to grant a lock to us, or a network
1777  * time-out occurs.
1778  *
1779  * \pre atomic_read(&lock->cll_ref) > 0
1780  *
1781  * \see cl_lock_delete()
1782  * \see cl_lock::cll_holds
1783  */
1784 void cl_lock_error(const struct lu_env *env, struct cl_lock *lock, int error)
1785 {
1786         LINVRNT(cl_lock_is_mutexed(lock));
1787         LINVRNT(cl_lock_invariant(env, lock));
1788
1789         ENTRY;
1790         cl_lock_trace(D_DLMTRACE, env, "set lock error", lock);
1791         if (lock->cll_error == 0 && error != 0) {
1792                 lock->cll_error = error;
1793                 cl_lock_signal(env, lock);
1794                 cl_lock_cancel(env, lock);
1795                 cl_lock_delete(env, lock);
1796         }
1797         EXIT;
1798 }
1799 EXPORT_SYMBOL(cl_lock_error);
1800
1801 /**
1802  * Cancels this lock. Notifies layers (bottom-to-top) that the lock is being
1803  * cancelled, then destroys the lock.
1804  * If there are holds on the lock, cancellation is postponed until all holds
1805  * are released.
1806  *
1807  * Cancellation notification is delivered to layers at most once.
1808  *
1809  * \see cl_lock_operations::clo_cancel()
1810  * \see cl_lock::cll_holds
1811  */
1812 void cl_lock_cancel(const struct lu_env *env, struct cl_lock *lock)
1813 {
1814         LINVRNT(cl_lock_is_mutexed(lock));
1815         LINVRNT(cl_lock_invariant(env, lock));
1816
1817         ENTRY;
1818         cl_lock_trace(D_DLMTRACE, env, "cancel lock", lock);
1819         if (lock->cll_holds == 0)
1820                 cl_lock_cancel0(env, lock);
1821         else
1822                 lock->cll_flags |= CLF_CANCELPEND;
1823         EXIT;
1824 }
1825 EXPORT_SYMBOL(cl_lock_cancel);
1826
1827 /**
1828  * Finds an existing lock covering a given page, optionally different from a
1829  * given \a except lock.
1830  */
1831 struct cl_lock *cl_lock_at_page(const struct lu_env *env, struct cl_object *obj,
1832                                 struct cl_page *page, struct cl_lock *except,
1833                                 int pending, int canceld)
1834 {
1835         struct cl_object_header *head;
1836         struct cl_lock          *scan;
1837         struct cl_lock          *lock;
1838         struct cl_lock_descr    *need;
1839
1840         ENTRY;
1841
1842         head = cl_object_header(obj);
1843         need = &cl_env_info(env)->clt_descr;
1844         lock = NULL;
1845
1846         need->cld_mode = CLM_READ; /* CLM_READ matches both READ & WRITE, but
1847                                     * not PHANTOM */
1848         need->cld_start = need->cld_end = page->cp_index;
1849         need->cld_enq_flags = 0;
1850
1851         cfs_spin_lock(&head->coh_lock_guard);
1852         /* It is fine to match any group lock since at most one can exist
1853          * with a given gid and it conflicts with all other lock modes too */
1854         cfs_list_for_each_entry(scan, &head->coh_locks, cll_linkage) {
1855                 if (scan != except &&
1856                     (scan->cll_descr.cld_mode == CLM_GROUP ||
1857                     cl_lock_ext_match(&scan->cll_descr, need)) &&
1858                     scan->cll_state >= CLS_HELD &&
1859                     scan->cll_state < CLS_FREEING &&
1860                     /*
1861                      * This check is racy as the lock can be canceled right
1862                      * after it is done, but this is fine, because page exists
1863                      * already.
1864                      */
1865                     (canceld || !(scan->cll_flags & CLF_CANCELLED)) &&
1866                     (pending || !(scan->cll_flags & CLF_CANCELPEND))) {
1867                         /* Don't increase cs_hit here since this
1868                          * is just a helper function. */
1869                         cl_lock_get_trust(scan);
1870                         lock = scan;
1871                         break;
1872                 }
1873         }
1874         cfs_spin_unlock(&head->coh_lock_guard);
1875         RETURN(lock);
1876 }
1877 EXPORT_SYMBOL(cl_lock_at_page);
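
/*
 * Illustrative sketch (hypothetical helper): checking whether a page is
 * covered by some lock other than \a lock, and dropping the reference that
 * cl_lock_at_page() takes via cl_lock_get_trust(). This mirrors how
 * cl_lock_page_list_fixup() below consumes the result.
 *
 * \code
 * static int example_page_covered_elsewhere(const struct lu_env *env,
 *                                           struct cl_object *obj,
 *                                           struct cl_page *page,
 *                                           struct cl_lock *lock)
 * {
 *         struct cl_lock *found;
 *
 *         found = cl_lock_at_page(env, obj, page, lock, 0, 0);
 *         if (found == NULL)
 *                 return 0;
 *         cl_lock_put(env, found);
 *         return 1;
 * }
 * \endcode
 */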
1878
1879 /**
1880  * Returns a list of pages protected (only) by a given lock.
1881  *
1882  * Scans the extent of the page radix tree corresponding to \a lock and queues
1883  * all pages that are not protected by locks other than \a lock into \a queue.
1884  */
1885 void cl_lock_page_list_fixup(const struct lu_env *env,
1886                              struct cl_io *io, struct cl_lock *lock,
1887                              struct cl_page_list *queue)
1888 {
1889         struct cl_page        *page;
1890         struct cl_page        *temp;
1891         struct cl_page_list   *plist = &cl_env_info(env)->clt_list;
1892
1893         LINVRNT(cl_lock_invariant(env, lock));
1894         ENTRY;
1895
1896         /* Now we have a list of cl_pages under the \a lock; we need to
1897          * check whether some of the pages are covered by another ldlm lock.
1898          * If that is the case, they don't need to be written out this time.
1899          *
1900          * For example, we have A:[0,200] & B:[100,300] PW locks on the
1901          * client, and the latter is to be canceled. This means another
1902          * client is reading/writing [200,300], since A won't be canceled.
1903          * So we actually only need to write the pages covered by [200,300].
1904          * This is safe, since [100,200] is still protected by lock A.
1905          */
1906
1907         cl_page_list_init(plist);
1908         cl_page_list_for_each_safe(page, temp, queue) {
1909                 pgoff_t                idx = page->cp_index;
1910                 struct cl_lock        *found;
1911                 struct cl_lock_descr  *descr;
1912
1913                 /* The algorithm relies on the pages being in ascending index order. */
1914                 LASSERT(ergo(&temp->cp_batch != &queue->pl_pages,
1915                         page->cp_index < temp->cp_index));
1916
1917                 found = cl_lock_at_page(env, lock->cll_descr.cld_obj,
1918                                         page, lock, 0, 0);
1919                 if (found == NULL)
1920                         continue;
1921
1922                 descr = &found->cll_descr;
1923                 cfs_list_for_each_entry_safe_from(page, temp, &queue->pl_pages,
1924                                                   cp_batch) {
1925                         idx = page->cp_index;
1926                         if (descr->cld_start > idx || descr->cld_end < idx)
1927                                 break;
1928                         cl_page_list_move(plist, queue, page);
1929                 }
1930                 cl_lock_put(env, found);
1931         }
1932
1933         /* The pages in plist are covered by other locks, don't handle them
1934          * this time.
1935          */
1936         if (io != NULL)
1937                 cl_page_list_disown(env, io, plist);
1938         cl_page_list_fini(env, plist);
1939         EXIT;
1940 }
1941 EXPORT_SYMBOL(cl_lock_page_list_fixup);
1942
1943 /**
1944  * Invalidate pages protected by the given lock, sending them out to the
1945  * server first, if necessary.
1946  *
1947  * This function does the following:
1948  *
1949  *     - collects a list of pages to be invalidated,
1950  *
1951  *     - unmaps them from the user virtual memory,
1952  *
1953  *     - sends dirty pages to the server,
1954  *
1955  *     - waits for transfer completion,
1956  *
1957  *     - discards pages, and throws them out of memory.
1958  *
1959  * If \a discard is set, pages are discarded without sending them to the
1960  * server.
1961  *
1962  * If error happens on any step, the process continues anyway (the reasoning
1963  * behind this being that lock cancellation cannot be delayed indefinitely).
1964  */
1965 int cl_lock_page_out(const struct lu_env *env, struct cl_lock *lock,
1966                      int discard)
1967 {
1968         struct cl_thread_info *info  = cl_env_info(env);
1969         struct cl_io          *io    = &info->clt_io;
1970         struct cl_2queue      *queue = &info->clt_queue;
1971         struct cl_lock_descr  *descr = &lock->cll_descr;
1972         long page_count;
1973         int nonblock = 1, resched;
1974         int result;
1975
1976         LINVRNT(cl_lock_invariant(env, lock));
1977         ENTRY;
1978
1979         io->ci_obj = cl_object_top(descr->cld_obj);
1980         result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
1981         if (result != 0)
1982                 GOTO(out, result);
1983
1984         do {
1985                 cl_2queue_init(queue);
1986                 cl_page_gang_lookup(env, descr->cld_obj, io, descr->cld_start,
1987                                     descr->cld_end, &queue->c2_qin, nonblock,
1988                                     &resched);
1989                 page_count = queue->c2_qin.pl_nr;
1990                 if (page_count > 0) {
1991                         result = cl_page_list_unmap(env, io, &queue->c2_qin);
1992                         if (!discard) {
1993                                 long timeout = 600; /* 10 minutes. */
1994                                 /* For debugging purposes: if this request
1995                                  * can't be finished within 10 minutes, we
1996                                  * hope it will notify us.
1997                                  */
1998                                 result = cl_io_submit_sync(env, io, CRT_WRITE,
1999                                                            queue, CRP_CANCEL,
2000                                                            timeout);
2001                                 if (result)
2002                                         CWARN("Failed to write %lu pages: %d\n",
2003                                               page_count, result);
2004                         }
2005                         cl_lock_page_list_fixup(env, io, lock, &queue->c2_qout);
2006                         cl_2queue_discard(env, io, queue);
2007                         cl_2queue_disown(env, io, queue);
2008                 }
2009                 cl_2queue_fini(env, queue);
2010
2011                 if (resched)
2012                         cfs_cond_resched();
2013         } while (resched || nonblock--);
2014 out:
2015         cl_io_fini(env, io);
2016         RETURN(result);
2017 }
2018 EXPORT_SYMBOL(cl_lock_page_out);
2019
2020 /**
2021  * Eliminate all locks for a given object.
2022  *
2023  * Caller has to guarantee that no lock is in active use.
2024  *
2025  * \param cancel when this is set, cl_locks_prune() cancels locks before
2026  *               destroying.
2027  */
2028 void cl_locks_prune(const struct lu_env *env, struct cl_object *obj, int cancel)
2029 {
2030         struct cl_object_header *head;
2031         struct cl_lock          *lock;
2032
2033         ENTRY;
2034         head = cl_object_header(obj);
2035         /*
2036          * If locks are destroyed without cancellation, all pages must be
2037          * already destroyed (as otherwise they will be left unprotected).
2038          */
2039         LASSERT(ergo(!cancel,
2040                      head->coh_tree.rnode == NULL && head->coh_pages == 0));
2041
2042         cfs_spin_lock(&head->coh_lock_guard);
2043         while (!cfs_list_empty(&head->coh_locks)) {
2044                 lock = container_of(head->coh_locks.next,
2045                                     struct cl_lock, cll_linkage);
2046                 cl_lock_get_trust(lock);
2047                 cfs_spin_unlock(&head->coh_lock_guard);
2048                 lu_ref_add(&lock->cll_reference, "prune", cfs_current());
2049                 cl_lock_mutex_get(env, lock);
2050                 if (lock->cll_state < CLS_FREEING) {
2051                         LASSERT(lock->cll_holds == 0);
2052                         LASSERT(lock->cll_users == 0);
2053                         if (cancel)
2054                                 cl_lock_cancel(env, lock);
2055                         cl_lock_delete(env, lock);
2056                 }
2057                 cl_lock_mutex_put(env, lock);
2058                 lu_ref_del(&lock->cll_reference, "prune", cfs_current());
2059                 cl_lock_put(env, lock);
2060                 cfs_spin_lock(&head->coh_lock_guard);
2061         }
2062         cfs_spin_unlock(&head->coh_lock_guard);
2063         EXIT;
2064 }
2065 EXPORT_SYMBOL(cl_locks_prune);
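
/*
 * Illustrative sketch (hypothetical caller): pruning the locks of an object
 * that is about to go away. Passing cancel == 0 is only legal when all
 * pages of the object are already gone, as enforced by the assertion above;
 * otherwise cancellation must be requested.
 *
 * \code
 * static void example_zap_object_locks(const struct lu_env *env,
 *                                      struct cl_object *obj,
 *                                      int pages_already_destroyed)
 * {
 *         cl_locks_prune(env, obj, !pages_already_destroyed);
 * }
 * \endcode
 */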
2066
2067 static struct cl_lock *cl_lock_hold_mutex(const struct lu_env *env,
2068                                           const struct cl_io *io,
2069                                           const struct cl_lock_descr *need,
2070                                           const char *scope, const void *source)
2071 {
2072         struct cl_lock *lock;
2073
2074         ENTRY;
2075
2076         while (1) {
2077                 lock = cl_lock_find(env, io, need);
2078                 if (IS_ERR(lock))
2079                         break;
2080                 cl_lock_mutex_get(env, lock);
2081                 if (lock->cll_state < CLS_FREEING &&
2082                     !(lock->cll_flags & CLF_CANCELLED)) {
2083                         cl_lock_hold_mod(env, lock, +1);
2084                         lu_ref_add(&lock->cll_holders, scope, source);
2085                         lu_ref_add(&lock->cll_reference, scope, source);
2086                         break;
2087                 }
2088                 cl_lock_mutex_put(env, lock);
2089                 cl_lock_put(env, lock);
2090         }
2091         RETURN(lock);
2092 }
2093
2094 /**
2095  * Returns a lock matching \a need description with a reference and a hold on
2096  * it.
2097  *
2098  * This is much like cl_lock_find(), except that cl_lock_hold() additionally
2099  * guarantees that lock is not in the CLS_FREEING state on return.
2100  */
2101 struct cl_lock *cl_lock_hold(const struct lu_env *env, const struct cl_io *io,
2102                              const struct cl_lock_descr *need,
2103                              const char *scope, const void *source)
2104 {
2105         struct cl_lock *lock;
2106
2107         ENTRY;
2108
2109         lock = cl_lock_hold_mutex(env, io, need, scope, source);
2110         if (!IS_ERR(lock))
2111                 cl_lock_mutex_put(env, lock);
2112         RETURN(lock);
2113 }
2114 EXPORT_SYMBOL(cl_lock_hold);
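
/*
 * Illustrative sketch (hypothetical caller): cl_lock_hold() returns a lock
 * with a reference and a hold, but with its mutex already released; the
 * matching cleanup used here is cl_lock_release(), which documents itself
 * below as the counterpart of cl_lock_hold(). The "example" scope string
 * is arbitrary.
 *
 * \code
 * static int example_hold_briefly(const struct lu_env *env,
 *                                 const struct cl_io *io,
 *                                 const struct cl_lock_descr *need)
 * {
 *         struct cl_lock *lock;
 *
 *         lock = cl_lock_hold(env, io, need, "example", io);
 *         if (IS_ERR(lock))
 *                 return PTR_ERR(lock);
 *         // ... inspect the lock, taking its mutex if necessary ...
 *         cl_lock_release(env, lock, "example", io);
 *         return 0;
 * }
 * \endcode
 */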
2115
2116 /**
2117  * Main high-level entry point of the cl_lock interface that finds an
2118  * existing lock, or enqueues a new one, matching the given description.
2119  */
2120 struct cl_lock *cl_lock_request(const struct lu_env *env, struct cl_io *io,
2121                                 const struct cl_lock_descr *need,
2122                                 const char *scope, const void *source)
2123 {
2124         struct cl_lock       *lock;
2125         int                   rc;
2126         __u32                 enqflags = need->cld_enq_flags;
2127
2128         ENTRY;
2129         do {
2130                 lock = cl_lock_hold_mutex(env, io, need, scope, source);
2131                 if (!IS_ERR(lock)) {
2132                         rc = cl_enqueue_locked(env, lock, io, enqflags);
2133                         if (rc == 0) {
2134                                 if (cl_lock_fits_into(env, lock, need, io)) {
2135                                         cl_lock_mutex_put(env, lock);
2136                                         cl_lock_lockdep_acquire(env,
2137                                                                 lock, enqflags);
2138                                         break;
2139                                 }
2140                                 cl_unuse_locked(env, lock);
2141                         }
2142                         cl_lock_trace(D_DLMTRACE, env, "enqueue failed", lock);
2143                         cl_lock_hold_release(env, lock, scope, source);
2144                         cl_lock_mutex_put(env, lock);
2145                         lu_ref_del(&lock->cll_reference, scope, source);
2146                         cl_lock_put(env, lock);
2147                         lock = ERR_PTR(rc);
2148                 } else
2149                         rc = PTR_ERR(lock);
2150         } while (rc == 0);
2151         RETURN(lock);
2152 }
2153 EXPORT_SYMBOL(cl_lock_request);
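
/*
 * Illustrative sketch (hypothetical caller): one plausible way to drive the
 * high-level entry point above. cl_lock_request() returns an enqueued lock
 * carrying a hold and a reference, or an ERR_PTR() on failure; the teardown
 * shown pairs it with cl_unuse() and cl_lock_release(), based on how those
 * functions undo what cl_lock_request() sets up. The descriptor values and
 * the "example" scope string are made up.
 *
 * \code
 * static int example_lock_extent(const struct lu_env *env, struct cl_io *io,
 *                                struct cl_object *obj,
 *                                pgoff_t start, pgoff_t end)
 * {
 *         struct cl_lock_descr need = {
 *                 .cld_obj       = obj,
 *                 .cld_mode      = CLM_READ,
 *                 .cld_start     = start,
 *                 .cld_end       = end,
 *                 .cld_enq_flags = 0
 *         };
 *         struct cl_lock *lock;
 *
 *         lock = cl_lock_request(env, io, &need, "example", io);
 *         if (IS_ERR(lock))
 *                 return PTR_ERR(lock);
 *         // ... access pages in [start, end] under the lock ...
 *         cl_unuse(env, lock);
 *         cl_lock_release(env, lock, "example", io);
 *         return 0;
 * }
 * \endcode
 */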
2154
2155 /**
2156  * Adds a hold to a known lock.
2157  */
2158 void cl_lock_hold_add(const struct lu_env *env, struct cl_lock *lock,
2159                       const char *scope, const void *source)
2160 {
2161         LINVRNT(cl_lock_is_mutexed(lock));
2162         LINVRNT(cl_lock_invariant(env, lock));
2163         LASSERT(lock->cll_state != CLS_FREEING);
2164
2165         ENTRY;
2166         cl_lock_hold_mod(env, lock, +1);
2167         cl_lock_get(lock);
2168         lu_ref_add(&lock->cll_holders, scope, source);
2169         lu_ref_add(&lock->cll_reference, scope, source);
2170         EXIT;
2171 }
2172 EXPORT_SYMBOL(cl_lock_hold_add);
2173
2174 /**
2175  * Releases a hold and a reference on a lock on which the caller has
2176  * already acquired a mutex.
2177  */
2178 void cl_lock_unhold(const struct lu_env *env, struct cl_lock *lock,
2179                     const char *scope, const void *source)
2180 {
2181         LINVRNT(cl_lock_invariant(env, lock));
2182         ENTRY;
2183         cl_lock_hold_release(env, lock, scope, source);
2184         lu_ref_del(&lock->cll_reference, scope, source);
2185         cl_lock_put(env, lock);
2186         EXIT;
2187 }
2188 EXPORT_SYMBOL(cl_lock_unhold);
2189
2190 /**
2191  * Releases a hold and a reference on a lock, obtained by cl_lock_hold().
2192  */
2193 void cl_lock_release(const struct lu_env *env, struct cl_lock *lock,
2194                      const char *scope, const void *source)
2195 {
2196         LINVRNT(cl_lock_invariant(env, lock));
2197         ENTRY;
2198         cl_lock_trace(D_DLMTRACE, env, "release lock", lock);
2199         cl_lock_mutex_get(env, lock);
2200         cl_lock_hold_release(env, lock, scope, source);
2201         cl_lock_mutex_put(env, lock);
2202         lu_ref_del(&lock->cll_reference, scope, source);
2203         cl_lock_put(env, lock);
2204         EXIT;
2205 }
2206 EXPORT_SYMBOL(cl_lock_release);
2207
2208 void cl_lock_user_add(const struct lu_env *env, struct cl_lock *lock)
2209 {
2210         LINVRNT(cl_lock_is_mutexed(lock));
2211         LINVRNT(cl_lock_invariant(env, lock));
2212
2213         ENTRY;
2214         cl_lock_used_mod(env, lock, +1);
2215         EXIT;
2216 }
2217 EXPORT_SYMBOL(cl_lock_user_add);
2218
2219 int cl_lock_user_del(const struct lu_env *env, struct cl_lock *lock)
2220 {
2221         LINVRNT(cl_lock_is_mutexed(lock));
2222         LINVRNT(cl_lock_invariant(env, lock));
2223         LASSERT(lock->cll_users > 0);
2224
2225         ENTRY;
2226         cl_lock_used_mod(env, lock, -1);
2227         RETURN(lock->cll_users == 0);
2228 }
2229 EXPORT_SYMBOL(cl_lock_user_del);
2230
2231 const char *cl_lock_mode_name(const enum cl_lock_mode mode)
2232 {
2233         static const char *names[] = {
2234                 [CLM_PHANTOM] = "P",
2235                 [CLM_READ]    = "R",
2236                 [CLM_WRITE]   = "W",
2237                 [CLM_GROUP]   = "G"
2238         };
2239         if (0 <= mode && mode < ARRAY_SIZE(names))
2240                 return names[mode];
2241         else
2242                 return "U";
2243 }
2244 EXPORT_SYMBOL(cl_lock_mode_name);
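
/*
 * Illustrative sketch (hypothetical debugging aid): cl_lock_mode_name() is
 * convenient for one-letter mode tags in log messages.
 *
 * \code
 * static void example_log_mode(const struct cl_lock *lock)
 * {
 *         CWARN("lock %p mode %s\n", lock,
 *               cl_lock_mode_name(lock->cll_descr.cld_mode));
 * }
 * \endcode
 */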
2245
2246 /**
2247  * Prints a human-readable representation of a lock description.
2248  */
2249 void cl_lock_descr_print(const struct lu_env *env, void *cookie,
2250                        lu_printer_t printer,
2251                        const struct cl_lock_descr *descr)
2252 {
2253         const struct lu_fid  *fid;
2254
2255         fid = lu_object_fid(&descr->cld_obj->co_lu);
2256         (*printer)(env, cookie, DDESCR"@"DFID, PDESCR(descr), PFID(fid));
2257 }
2258 EXPORT_SYMBOL(cl_lock_descr_print);
2259
2260 /**
2261  * Prints a human-readable representation of \a lock via the given \a printer.
2262  */
2263 void cl_lock_print(const struct lu_env *env, void *cookie,
2264                    lu_printer_t printer, const struct cl_lock *lock)
2265 {
2266         const struct cl_lock_slice *slice;
2267         (*printer)(env, cookie, "lock@%p[%d %d %d %d %d %08lx] ",
2268                    lock, cfs_atomic_read(&lock->cll_ref),
2269                    lock->cll_state, lock->cll_error, lock->cll_holds,
2270                    lock->cll_users, lock->cll_flags);
2271         cl_lock_descr_print(env, cookie, printer, &lock->cll_descr);
2272         (*printer)(env, cookie, " {\n");
2273
2274         cfs_list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
2275                 (*printer)(env, cookie, "    %s@%p: ",
2276                            slice->cls_obj->co_lu.lo_dev->ld_type->ldt_name,
2277                            slice);
2278                 if (slice->cls_ops->clo_print != NULL)
2279                         slice->cls_ops->clo_print(env, cookie, printer, slice);
2280                 (*printer)(env, cookie, "\n");
2281         }
2282         (*printer)(env, cookie, "} lock@%p\n", lock);
2283 }
2284 EXPORT_SYMBOL(cl_lock_print);
2285
2286 int cl_lock_init(void)
2287 {
2288         return lu_kmem_init(cl_lock_caches);
2289 }
2290
2291 void cl_lock_fini(void)
2292 {
2293         lu_kmem_fini(cl_lock_caches);
2294 }