// SPDX-License-Identifier: GPL-2.0

/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2017, Intel Corporation.
 */

/*
 * This file is part of Lustre, http://www.lustre.org/
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

/*
 * This file contains the implementation of the EXTENT lock type.
 *
 * The EXTENT lock type is for locking a contiguous range of values,
 * represented by 64-bit starting and ending offsets (inclusive). There are
 * several extent lock modes, some of which may be mutually incompatible.
 * Extent locks are considered incompatible if their modes are incompatible
 * and their extents intersect.  See the lock mode compatibility matrix in
 * lustre_dlm.h.
 */
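
/*
 * For illustration (hypothetical offsets): a PW lock on [0, 4095] and a
 * PW lock on [4096, 8191] do not conflict because their extents do not
 * intersect, while a PW lock on [0, 8191] conflicts with both.  A PR
 * lock conflicts with a PW lock whenever their extents intersect, but
 * two PR locks never conflict regardless of overlap.
 */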

#define DEBUG_SUBSYSTEM S_LDLM

#include <libcfs/libcfs.h>
#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd.h>
#include <obd_class.h>
#include <lustre_lib.h>

#include "ldlm_internal.h"
#include <linux/interval_tree_generic.h>

#define START(node) ((node)->l_policy_data.l_extent.start)
#define LAST(node) ((node)->l_policy_data.l_extent.end)
INTERVAL_TREE_DEFINE(struct ldlm_lock, l_rb, u64, l_subtree_last,
                     START, LAST, static inline, extent);

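/*
 * Note: INTERVAL_TREE_DEFINE() above generates the static helpers
 * extent_insert(), extent_remove(), extent_iter_first() and
 * extent_iter_next(), keyed on the inclusive range
 * [START(lock), LAST(lock)].  Locks with identical extents share a
 * single node in the tree; the duplicates are chained off that node on
 * the l_same_extent list, which the helpers below walk before moving
 * on to the next tree node.
 */
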
static inline struct ldlm_lock *extent_next_lock(struct ldlm_lock *lock)
{
        lock = list_next_entry(lock, l_same_extent);
        if (RB_EMPTY_NODE(&lock->l_rb))
                return lock;
        return extent_next(lock);
}

static inline struct ldlm_lock *extent_prev_lock(struct ldlm_lock *lock)
{
        lock = list_prev_entry(lock, l_same_extent);
        if (RB_EMPTY_NODE(&lock->l_rb))
                return lock;
        return extent_prev(lock);
}

/* Insert @node into @root unless a lock with exactly the same extent is
 * already present, in which case that lock is returned and nothing is
 * inserted.
 */
static inline struct ldlm_lock *extent_insert_unique(
        struct ldlm_lock *node, struct interval_tree_root *root)
{
        struct rb_node **link, *rb_parent = NULL;
        u64 start = START(node), last = LAST(node);
        struct ldlm_lock *parent;
        bool leftmost = true;

#ifdef HAVE_INTERVAL_TREE_CACHED
        link = &root->rb_root.rb_node;
#else
        link = &root->rb_node;
#endif
        while (*link) {
                rb_parent = *link;
                parent = rb_entry(rb_parent, struct ldlm_lock, l_rb);
                if (parent->l_subtree_last < last)
                        parent->l_subtree_last = last;
                if (start == START(parent)) {
                        if (last == LAST(parent))
                                return parent;
                        if (last < LAST(parent)) {
                                link = &parent->l_rb.rb_left;
                        } else {
                                link = &parent->l_rb.rb_right;
                                leftmost = false;
                        }
                } else if (start < START(parent))
                        link = &parent->l_rb.rb_left;
                else {
                        link = &parent->l_rb.rb_right;
                        leftmost = false;
                }
        }

        node->l_subtree_last = last;
        rb_link_node(&node->l_rb, rb_parent, link);
#ifdef HAVE_INTERVAL_TREE_CACHED
        rb_insert_augmented_cached(&node->l_rb, root,
                                   leftmost, &extent_augment);
#else
        rb_insert_augmented(&node->l_rb, root,
                            &extent_augment);
#endif
        return NULL;
}

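/*
 * Splice @new into the rbtree in place of @in_tree.  The two locks are
 * expected to cover the same extent, so the tree ordering and the
 * augmented bounds of the ancestors are unaffected and no rebalancing
 * is needed; only parent/child pointers and the node colour are copied.
 */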
static inline void extent_replace(struct ldlm_lock *in_tree,
                                  struct ldlm_lock *new,
                                  struct interval_tree_root *tree)
{
        struct rb_node *p = rb_parent(&in_tree->l_rb);

        /* place 'new' in the rbtree replacing in_tree */
        new->l_rb.rb_left = in_tree->l_rb.rb_left;
        new->l_rb.rb_right = in_tree->l_rb.rb_right;

        if (new->l_rb.rb_left)
                rb_set_parent_color(new->l_rb.rb_left, &new->l_rb,
                                    rb_color(new->l_rb.rb_left));
        if (new->l_rb.rb_right)
                rb_set_parent_color(new->l_rb.rb_right, &new->l_rb,
                                    rb_color(new->l_rb.rb_right));
        rb_set_parent_color(&new->l_rb, p, rb_color(&in_tree->l_rb));

        if (!p) {
#ifdef HAVE_INTERVAL_TREE_CACHED
                tree->rb_root.rb_node = &new->l_rb;
#else
                tree->rb_node = &new->l_rb;
#endif
        } else if (p->rb_left == &in_tree->l_rb) {
                p->rb_left = &new->l_rb;
        } else {
                p->rb_right = &new->l_rb;
        }
#ifdef HAVE_INTERVAL_TREE_CACHED
        if (tree->rb_leftmost == &in_tree->l_rb)
                tree->rb_leftmost = &new->l_rb;
#endif
}

#ifdef HAVE_SERVER_SUPPORT
# define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)

/**
 * Fix up the ldlm_extent after expanding it.
 *
 * After expansion has been done, we might still want to make certain
 * adjustments based on the overall contention of the resource and the like
 * to avoid granting overly wide locks.
 */
static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
                                              struct ldlm_extent *new_ex,
                                              int conflicting)
{
        enum ldlm_mode req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        __u64 req_align, mask;

        if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
                if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
                        new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
                                          new_ex->end);
        }

        if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF) {
                EXIT;
                return;
        }

        /* We need to ensure that the lock extent is properly aligned to what
         * the client requested, and also aligned to the server page size,
         * otherwise a single server page could be covered by two write
         * locks.
         */
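        /*
         * Worked example (assuming PAGE_SIZE is 4096): for a request of
         * [0x10000, 0x1ffff], req_align = 0x20000 | 0x10000 = 0x30000, so
         * the mask below grows from 0x1000 to 0x10000 and becomes 0xffff
         * after the decrement.  An expanded extent of, say,
         * [0x5000, 0x2efff] is then trimmed to [0x10000, 0x1ffff]:
         * rounded to 64KiB boundaries and still covering the requested
         * range.
         */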
        mask = PAGE_SIZE;
        req_align = (req_end + 1) | req_start;
        if (req_align != 0 && (req_align & (mask - 1)) == 0) {
                while ((req_align & mask) == 0)
                        mask <<= 1;
        }
        mask -= 1;
        /* We can only shrink the lock, not grow it.
         * This should never cause the lock to be smaller than requested,
         * since the requested lock was already aligned on these boundaries.
         */
        new_ex->start = ((new_ex->start - 1) | mask) + 1;
        new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
        LASSERTF(new_ex->start <= req_start,
                 "mask %#llx grant start %llu req start %llu\n",
                 mask, new_ex->start, req_start);
        LASSERTF(new_ex->end >= req_end,
                 "mask %#llx grant end %llu req end %llu\n",
                 mask, new_ex->end, req_end);
}

/**
 * Return the maximum extent that:
 * - contains the requested extent
 * - does not overlap existing conflicting extents outside the requested one
 *
 * This allows clients to request a small required extent range, but if there
 * is no contention on the lock the full lock can be granted to the client.
 * This avoids the need for many smaller lock requests to be granted in the
 * common (uncontended) case.
 *
 * Uses the interval trees of granted locks to expand the lock extent.
 */
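/*
 * For example (hypothetical values): if the only granted conflicting
 * lock covers [0, 0xffff] and the request is for [0x30000, 0x3ffff],
 * the extent is not expanded backwards (new_ex->start stays at
 * req_start because the tree is non-empty), while new_ex->end may grow
 * all the way to OBD_OBJECT_EOF before the fixup step, since no
 * conflicting lock starts after the requested range.
 */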
static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req,
                                                struct ldlm_extent *new_ex)
{
        struct ldlm_resource *res = req->l_resource;
        enum ldlm_mode req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        struct ldlm_interval_tree *tree;
        int conflicting = 0;
        int idx;

        ENTRY;

        lockmode_verify(req_mode);

        /* Using interval tree to handle the LDLM extent granted locks. */
        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                tree = &res->lr_itree[idx];
                if (lockmode_compat(tree->lit_mode, req_mode))
                        continue;

                conflicting += tree->lit_size;

                if (!INTERVAL_TREE_EMPTY(&tree->lit_root)) {
                        struct ldlm_lock *lck = NULL;

                        LASSERTF(!extent_iter_first(&tree->lit_root,
                                                    req_start, req_end),
                                 "req_mode=%d, start=%llu, end=%llu\n",
                                 req_mode, req_start, req_end);
                        /*
                         * If any tree is non-empty we don't bother
                         * expanding backwards, it won't be worth
                         * the effort.
                         */
                        new_ex->start = req_start;

                        if (req_end < (U64_MAX - 1))
                                /* lck is the lock with the lowest endpoint
                                 * which covers anything after 'req'
                                 */
                                lck = extent_iter_first(&tree->lit_root,
                                                        req_end + 1, U64_MAX);
                        if (lck)
                                new_ex->end = min(new_ex->end, START(lck) - 1);
                }

                if (new_ex->start == req_start && new_ex->end == req_end)
                        break;
        }

        LASSERT(new_ex->start <= req_start);
        LASSERT(new_ex->end >= req_end);

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
        EXIT;
}

/* Return the maximum extent that contains the requested extent and does not
 * overlap existing conflicting extents outside the requested one, scanning
 * the waiting queue.
 */
static void
ldlm_extent_internal_policy_waiting(struct ldlm_lock *req,
                                    struct ldlm_extent *new_ex)
{
        struct ldlm_resource *res = req->l_resource;
        enum ldlm_mode req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        struct ldlm_lock *lock;
        int conflicting = 0;

        ENTRY;

        lockmode_verify(req_mode);

        /* for waiting locks */
        list_for_each_entry(lock, &res->lr_waiting, l_res_link) {
                struct ldlm_extent *l_extent = &lock->l_policy_data.l_extent;

                /* We already hit the minimum requested size, search no more */
                if (new_ex->start == req_start && new_ex->end == req_end) {
                        EXIT;
                        return;
                }

                /* Don't conflict with ourselves */
                if (req == lock)
                        continue;

                /* Locks are compatible, overlap doesn't matter.
                 * Until bug 20 is fixed, try to avoid granting overlapping
                 * locks on one client (they take a long time to cancel)
                 */
                if (lockmode_compat(lock->l_req_mode, req_mode) &&
                    lock->l_export != req->l_export)
                        continue;

                /* If this is a high-traffic lock, don't grow downwards at all
                 * or grow upwards too much
                 */
                ++conflicting;
                if (conflicting > 4)
                        new_ex->start = req_start;

                /* If lock doesn't overlap new_ex, skip it. */
                if (!ldlm_extent_overlap(l_extent, new_ex))
                        continue;

                /* The locks' requested extents conflict and we can't satisfy
                 * both, so ignore this lock.  Either we will ping-pong this
                 * extent (we would regardless of what extent we granted) or
                 * the lock is unused and shouldn't limit our extent growth.
                 */
                if (ldlm_extent_overlap(&lock->l_req_extent,
                                        &req->l_req_extent))
                        continue;

                /* We grow extents downwards only as far as they don't overlap
                 * with already-granted locks, on the assumption that clients
                 * will be writing beyond the initial requested end and would
                 * then need to enqueue a new lock beyond the previous request.
                 * l_req_extent->end strictly < req_start, checked above.
                 */
                if (l_extent->start < req_start && new_ex->start != req_start) {
                        if (l_extent->end >= req_start)
                                new_ex->start = req_start;
                        else
                                new_ex->start = min(l_extent->end + 1,
                                                    req_start);
                }

                /* If we need to cancel this lock anyways because our request
                 * overlaps the granted lock, we grow up to its requested
                 * extent start instead of limiting this extent, assuming that
                 * clients are writing forwards and the lock had overgrown
                 * its extent downwards before we enqueued our request.
                 */
                if (l_extent->end > req_end) {
                        if (l_extent->start <= req_end)
                                new_ex->end = max(lock->l_req_extent.start - 1,
                                                  req_end);
                        else
                                new_ex->end = max(l_extent->start - 1, req_end);
                }
        }

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
        EXIT;
}

/* In order to determine the largest possible extent we can grant, we need
 * to scan all of the queues.
 */
static void ldlm_extent_policy(struct ldlm_resource *res,
                               struct ldlm_lock *lock, __u64 *flags)
{
        struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };

        if (lock->l_export == NULL)
                /* this is a local lock taken by server (e.g., as a part of
                 * OST-side locking, or unlink handling). Expansion doesn't
                 * make a lot of sense for local locks, because they are
                 * dropped immediately on operation completion and would only
                 * conflict with other threads.
                 */
                return;

        if (lock->l_policy_data.l_extent.start == 0 &&
            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                /* fast-path whole file locks */
                return;

        /* Because reprocess_queue zeroes flags and uses it to return
         * LDLM_FL_LOCK_CHANGED, we must check for the NO_EXPANSION flag
         * in the lock flags rather than the 'flags' argument
         */
        if (likely(!(lock->l_flags & LDLM_FL_NO_EXPANSION))) {
                ldlm_extent_internal_policy_granted(lock, &new_ex);
                ldlm_extent_internal_policy_waiting(lock, &new_ex);
        } else {
                LDLM_DEBUG(lock, "Not expanding manually requested lock");
                new_ex.start = lock->l_policy_data.l_extent.start;
                new_ex.end = lock->l_policy_data.l_extent.end;
                /* In case the request is not on correct boundaries, we call
                 * fixup. (normally called in ldlm_extent_internal_policy_*)
                 */
                ldlm_extent_internal_policy_fixup(lock, &new_ex, 0);
        }

        if (!ldlm_extent_equal(&new_ex, &lock->l_policy_data.l_extent)) {
                *flags |= LDLM_FL_LOCK_CHANGED;
                lock->l_policy_data.l_extent.start = new_ex.start;
                lock->l_policy_data.l_extent.end = new_ex.end;
        }
}

static bool ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
{
        struct ldlm_resource *res = lock->l_resource;
        time64_t now = ktime_get_seconds();

        if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_SET_CONTENTION))
                return true;

        CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
        if (contended_locks > ldlm_res_to_ns(res)->ns_contended_locks)
                res->lr_contention_time = now;

        return now < res->lr_contention_time +
                ldlm_res_to_ns(res)->ns_contention_time;
}
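
/*
 * Note: ns_contended_locks and ns_contention_time, used above, are
 * per-namespace tunables.  Once a reprocessing pass sees more than
 * ns_contended_locks conflicting locks, the resource is treated as
 * contended for the next ns_contention_time seconds, during which small
 * lock requests may be denied (see the LDLM_FL_DENY_ON_CONTENTION check
 * in ldlm_extent_compat_queue() below).
 */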

struct ldlm_extent_compat_args {
        struct list_head *work_list;
        struct ldlm_lock *lock;
        enum ldlm_mode mode;
        int *locks;
        int *compat;
};

static bool ldlm_extent_compat_cb(struct ldlm_lock *lock, void *data)
{
        struct ldlm_extent_compat_args *priv = data;
        struct list_head *work_list = priv->work_list;
        struct ldlm_lock *enq = priv->lock;
        enum ldlm_mode mode = priv->mode;

        ENTRY;
        /* the interval tree only holds granted locks */
        LASSERTF(mode == lock->l_granted_mode,
                 "mode = %s, lock->l_granted_mode = %s\n",
                 ldlm_lockname[mode],
                 ldlm_lockname[lock->l_granted_mode]);
        if (lock->l_blocking_ast && lock->l_granted_mode != LCK_GROUP)
                ldlm_add_ast_work_item(lock, enq, work_list);

        /* don't count conflicting glimpse locks */
        if (!(mode == LCK_PR && lock->l_policy_data.l_extent.start == 0 &&
              lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF))
                *priv->locks += 1;

        if (priv->compat)
                *priv->compat = 0;

        RETURN(false);
}

/**
 * Determine if the lock is compatible with all locks on the queue.
 *
 * If \a work_list is provided, conflicting locks are linked there.
 * If \a work_list is not provided, we exit this function on first conflict.
 *
 * \retval 0 if the lock is not compatible
 * \retval 1 if the lock is compatible
 * \retval 2 if \a req is a group lock and it is compatible and requires
 *           no further checking
 * \retval negative error, such as -EAGAIN for group locks
 */
static int
ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                         __u64 *flags, struct list_head *work_list,
                         int *contended_locks)
{
        struct ldlm_resource *res = req->l_resource;
        enum ldlm_mode req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        struct ldlm_lock *lock;
        int check_contention;
        int compat = 1;

        ENTRY;

        lockmode_verify(req_mode);

        /* Using interval tree for granted lock */
        if (queue == &res->lr_granted) {
                struct ldlm_interval_tree *tree;
                struct ldlm_extent_compat_args data = {.work_list = work_list,
                                                       .lock = req,
                                                       .locks = contended_locks,
                                                       .compat = &compat };
                int idx;

                for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                        tree = &res->lr_itree[idx];
                        if (INTERVAL_TREE_EMPTY(&tree->lit_root))
                                continue;

                        data.mode = tree->lit_mode;
                        if (lockmode_compat(req_mode, tree->lit_mode)) {
                                struct ldlm_lock *lock;

                                if (req_mode != LCK_GROUP)
                                        continue;

                                /* group lock, grant it immediately if
                                 * compatible
                                 */
                                lock = extent_top(tree);
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid)
                                        RETURN(2);
                        }

                        if (tree->lit_mode == LCK_GROUP) {
                                if (*flags & (LDLM_FL_BLOCK_NOWAIT |
                                              LDLM_FL_SPECULATIVE)) {
                                        compat = -EAGAIN;
                                        goto destroylock;
                                }

                                if (!work_list)
                                        RETURN(0);

                                /* if work list is not NULL, add all
                                 * locks in the tree to work list.
                                 */
                                compat = 0;
                                for (lock = extent_first(tree);
                                     lock;
                                     lock = extent_next_lock(lock))
                                        ldlm_extent_compat_cb(lock, &data);
                                continue;
                        }

                        /* We've found a potentially blocking lock, check
                         * compatibility.  This handles locks other than GROUP
                         * locks, which are handled separately above.
                         *
                         * Locks with FL_SPECULATIVE are asynchronous requests
                         * which must never wait behind another lock, so they
                         * fail if any conflicting lock is found.
                         */
                        if (!work_list || (*flags & LDLM_FL_SPECULATIVE)) {
                                if (extent_iter_first(&tree->lit_root,
                                                      req_start, req_end)) {
                                        if (!work_list) {
                                                RETURN(0);
                                        } else {
                                                compat = -EAGAIN;
                                                goto destroylock;
                                        }
                                }
                        } else {
                                ldlm_extent_search(&tree->lit_root,
                                                   req_start, req_end,
                                                   ldlm_extent_compat_cb,
                                                   &data);
                                if (!list_empty(work_list) && compat)
                                        compat = 0;
                        }
                }
        } else { /* for waiting queue */
                list_for_each_entry(lock, queue, l_res_link) {
                        check_contention = 1;

                        /* We stop walking the queue if we hit ourselves so
                         * we don't take conflicting locks enqueued after us
                         * into account, or we'd wait forever.
                         */
                        if (req == lock)
                                break;

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_req_mode, req_mode)) {
                                if (req_mode == LCK_PR &&
                                    ((lock->l_policy_data.l_extent.start <=
                                      req->l_policy_data.l_extent.start) &&
                                     (lock->l_policy_data.l_extent.end >=
                                      req->l_policy_data.l_extent.end))) {
                                        /* If we met a PR lock just like us or
                                         * wider, and nobody down the list
                                         * conflicted with it, that means we
                                         * can skip processing of the rest of
                                         * the list and safely place ourselves
                                         * at the end of the list, or grant
                                         * (depending on whether we met
                                         * conflicting locks earlier in the
                                         * list).  In the case of a first
                                         * enqueue only, we continue
                                         * traversing if there is something
                                         * conflicting down the list, because
                                         * we need to make sure that something
                                         * is marked as AST_SENT as well; with
                                         * an empty worklist we would exit on
                                         * the first conflict met.
                                         */

                                        /* There IS a case where such flag is
                                         * not set for a lock, yet it blocks
                                         * something.  Luckily for us this is
                                         * only during destroy, so lock is
                                         * exclusive.  So here we are safe
                                         */
                                        if (!ldlm_is_ast_sent(lock))
                                                RETURN(compat);
                                }

                                /* non-group locks are compatible,
                                 * overlap doesn't matter
                                 */
                                if (likely(req_mode != LCK_GROUP))
                                        continue;

                                /* If we are trying to get a GROUP lock and
                                 * there is another one of this kind, we need
                                 * to compare gid
                                 */
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* If existing lock with matched gid
                                         * is granted, we grant new one too.
                                         */
                                        if (ldlm_is_granted(lock))
                                                RETURN(2);

                                        /* Otherwise we are scanning the queue
                                         * of waiting locks and it means the
                                         * current request would block along
                                         * with the existing lock (that is
                                         * already blocked).  If we are in
                                         * nonblocking mode - return
                                         * immediately.
                                         */
                                        if (*flags & (LDLM_FL_BLOCK_NOWAIT |
                                                      LDLM_FL_SPECULATIVE)) {
                                                compat = -EAGAIN;
                                                goto destroylock;
                                        }
                                        /* If this group lock is compatible with
                                         * another group lock on the waiting
                                         * list, they must be together in the
                                         * list, so they can be granted at the
                                         * same time.  Otherwise the later lock
                                         * can get stuck behind another,
                                         * incompatible, lock.
                                         */
                                        ldlm_resource_insert_lock_after(lock,
                                                                        req);
                                        /* Because 'lock' is not granted, we can
                                         * stop processing this queue and return
                                         * immediately. There is no need to
                                         * check the rest of the list.
                                         */
                                        RETURN(0);
                                }
                        }

                        if (unlikely(req_mode == LCK_GROUP &&
                                     !ldlm_is_granted(lock))) {
                                compat = 0;
                                if (lock->l_req_mode != LCK_GROUP) {
                                        /* Ok, we hit a non-GROUP lock, there
                                         * should be no more GROUP locks later
                                         * on, queue in front of the first
                                         * non-GROUP lock
                                         */
                                        ldlm_resource_insert_lock_before(lock,
                                                                         req);
                                        break;
                                }
                                LASSERT(req->l_policy_data.l_extent.gid !=
                                        lock->l_policy_data.l_extent.gid);
                                continue;
                        }

                        if (unlikely(lock->l_req_mode == LCK_GROUP)) {
                                /* If compared lock is GROUP, then requested is
                                 * PR/PW so this is not compatible; extent
                                 * range does not matter
                                 */
                                if (*flags & (LDLM_FL_BLOCK_NOWAIT |
                                              LDLM_FL_SPECULATIVE)) {
                                        compat = -EAGAIN;
                                        goto destroylock;
                                }
                        } else if (lock->l_policy_data.l_extent.end < req_start ||
                                   lock->l_policy_data.l_extent.start > req_end) {
                                /* if non group lock doesn't overlap skip it */
                                continue;
                        } else if (lock->l_req_extent.end < req_start ||
                                   lock->l_req_extent.start > req_end) {
                                /* false contention, the requests don't
                                 * really overlap
                                 */
                                check_contention = 0;
                        }

                        if (!work_list)
                                RETURN(0);

                        if (*flags & LDLM_FL_SPECULATIVE) {
                                compat = -EAGAIN;
                                goto destroylock;
                        }

                        /* don't count conflicting glimpse locks */
                        if (lock->l_req_mode == LCK_PR &&
                            lock->l_policy_data.l_extent.start == 0 &&
                            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                                check_contention = 0;

                        *contended_locks += check_contention;

                        compat = 0;
                        if (lock->l_blocking_ast &&
                            lock->l_req_mode != LCK_GROUP)
                                ldlm_add_ast_work_item(lock, req, work_list);
                }
        }

        if (ldlm_check_contention(req, *contended_locks) &&
            compat == 0 &&
            (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
            req->l_req_mode != LCK_GROUP &&
            req_end - req_start <=
            ldlm_res_to_ns(req->l_resource)->ns_max_nolock_size)
                GOTO(destroylock, compat = -EUSERS);

        RETURN(compat);
destroylock:
        list_del_init(&req->l_res_link);
        if (ldlm_is_local(req))
                ldlm_lock_decref_internal_nolock(req, req_mode);
        ldlm_lock_destroy_nolock(req);
        RETURN(compat);
}

/**
 * This function refreshes the eviction timer for a cancelled lock.
 *
 * \param[in] lock              ldlm lock for refresh
 * \param[in] arg               ldlm prolong arguments, timeout, export, extent
 *                              and counter are used
 */
void ldlm_lock_prolong_one(struct ldlm_lock *lock,
                           struct ldlm_prolong_args *arg)
{
        timeout_t timeout;

        CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_PROLONG_PAUSE, 3);

        if (arg->lpa_export != lock->l_export ||
            lock->l_flags & LDLM_FL_DESTROYED)
                /* ignore unrelated locks */
                return;

        arg->lpa_locks_cnt++;

        if (!(lock->l_flags & LDLM_FL_AST_SENT))
                /* ignore locks not being cancelled */
                return;

        arg->lpa_blocks_cnt++;

        /* OK, this is a possible lock the user holds doing I/O,
         * let's refresh the eviction timer for it.
         */
        timeout = ptlrpc_export_prolong_timeout(arg->lpa_req, false);
        LDLM_DEBUG(lock, "refreshed to %ds. ", timeout);
        ldlm_refresh_waiting_lock(lock, timeout);
}
EXPORT_SYMBOL(ldlm_lock_prolong_one);

static bool ldlm_resource_prolong_cb(struct ldlm_lock *lock, void *data)
{
        struct ldlm_prolong_args *arg = data;

        ENTRY;
        ldlm_lock_prolong_one(lock, arg);

        RETURN(false);
}

/**
 * Walk through the granted trees and prolong locks if they overlap the
 * extent.
 *
 * \param[in] arg               prolong args
 */
void ldlm_resource_prolong(struct ldlm_prolong_args *arg)
{
        struct ldlm_interval_tree *tree;
        struct ldlm_resource *res;
        int idx;

        ENTRY;

        res = ldlm_resource_get(arg->lpa_export->exp_obd->obd_namespace,
                                &arg->lpa_resid, LDLM_EXTENT, 0);
        if (IS_ERR(res)) {
                CDEBUG(D_DLMTRACE, "Failed to get resource for resid %llu/%llu\n",
                       arg->lpa_resid.name[0], arg->lpa_resid.name[1]);
                RETURN_EXIT;
        }

        lock_res(res);
        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                tree = &res->lr_itree[idx];
                if (INTERVAL_TREE_EMPTY(&tree->lit_root))
                        continue;

                /* There is no possibility to check for the groupID
                 * so all the group locks are considered as valid
                 * here, especially because the client is supposed
                 * to check it has such a lock before sending an RPC.
                 */
                if (!(tree->lit_mode & arg->lpa_mode))
                        continue;

                ldlm_extent_search(&tree->lit_root, arg->lpa_extent.start,
                                   arg->lpa_extent.end,
                                   ldlm_resource_prolong_cb, arg);
        }
        unlock_res(res);
        ldlm_resource_putref(res);

        EXIT;
}
EXPORT_SYMBOL(ldlm_resource_prolong);

/**
 * Process a granting attempt for extent lock.
 * Must be called with ns lock held.
 *
 * This function looks for any conflicts for \a lock in the granted or
 * waiting queues. The lock is granted if no conflicts are found in
 * either queue.
 */
int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
                             enum ldlm_process_intention intention,
                             enum ldlm_error *err, struct list_head *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        int rc, rc2 = 0;
        int contended_locks = 0;
        struct list_head *grant_work = intention == LDLM_PROCESS_ENQUEUE ?
                                                        NULL : work_list;
        ENTRY;

        LASSERT(!ldlm_is_granted(lock));
        LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
                !ldlm_is_ast_discard_data(lock));
        check_res_locked(res);
        *err = ELDLM_OK;

        if (intention == LDLM_PROCESS_RESCAN) {
                /* Careful observers will note that we don't handle -EAGAIN
                 * here, but it's ok for a non-obvious reason -- compat_queue
                 * can only return -EAGAIN if (flags & BLOCK_NOWAIT |
                 * SPECULATIVE). flags should always be zero here, and if that
                 * ever stops being true, we want to find out.
                 */
                LASSERT(*flags == 0);
                rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
                                              NULL, &contended_locks);
                if (rc == 1) {
                        rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
                                                      flags, NULL,
                                                      &contended_locks);
                }
                if (rc == 0)
                        RETURN(LDLM_ITER_STOP);

                ldlm_resource_unlink_lock(lock);

                if (!CFS_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
                        ldlm_extent_policy(res, lock, flags);
                ldlm_grant_lock(lock, grant_work);
                RETURN(LDLM_ITER_CONTINUE);
        }

        contended_locks = 0;
        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
                                      work_list, &contended_locks);
        if (rc < 0)
                GOTO(out, *err = rc);

        if (rc != 2) {
                rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock,
                                               flags, work_list,
                                               &contended_locks);
                if (rc2 < 0)
                        GOTO(out, *err = rc = rc2);
        }

        if (rc + rc2 == 2) {
                ldlm_extent_policy(res, lock, flags);
                ldlm_resource_unlink_lock(lock);
                ldlm_grant_lock(lock, grant_work);
        } else {
                /* Adding LDLM_FL_NO_TIMEOUT flag to granted lock to
                 * force client to wait for the lock endlessly once
                 * the lock is enqueued -bzzz
                 */
                *flags |= LDLM_FL_NO_TIMEOUT;
        }

        RETURN(LDLM_ITER_CONTINUE);
out:
        return rc;
}
#endif /* HAVE_SERVER_SUPPORT */

/* When a lock is cancelled by a client, the KMS may undergo change if this
 * is the "highest lock".  This function returns the new KMS value, updating
 * it only if we were the highest lock.
 *
 * Caller must hold lr_lock already.
 *
 * NB: A lock on [x, y] protects a KMS of up to y + 1 bytes!
 */
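/*
 * For example (hypothetical extents): with granted PW locks on
 * [0, 4095] and [0, 8191] and an old_kms of 8192, cancelling the
 * [0, 8191] lock leaves [0, 4095] as the highest lock, so the KMS
 * shrinks to 4096; cancelling the [0, 4095] lock instead would leave
 * old_kms unchanged, because the remaining lock still protects it.
 */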
__u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
{
        struct ldlm_resource *res = lock->l_resource;
        struct ldlm_interval_tree *tree;
        struct ldlm_lock *lck;
        __u64 kms = 0;
        int idx = 0;
        bool complete = false;

        ENTRY;

        /* don't let another thread in ldlm_extent_shift_kms race in
         * just after we finish and take our lock into account in its
         * calculation of the kms
         */
        ldlm_set_kms_ignore(lock);

        /* We iterate over the lock trees, looking for the largest kms
         * smaller than the current one.
         */
        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                tree = &res->lr_itree[idx];

                /* If our already known kms is >= the highest 'end' in
                 * this tree, we don't need to check this tree, because
                 * the kms from a tree can be lower than its maximum end
                 * (due to kms_ignore), but it can never be higher.
                 */
                lck = extent_top(tree);
                if (!lck || kms >= lck->l_subtree_last)
                        continue;

                for (lck = extent_last(tree);
                     lck;
                     lck = extent_prev(lck)) {
                        if (ldlm_is_kms_ignore(lck)) {
                                struct ldlm_lock *lk;

                                /* Skip this extent only if every lock
                                 * sharing it has kms_ignore set; if the
                                 * list walk completes without finding
                                 * one, lk is left non-NULL.
                                 */
                                list_for_each_entry(lk, &lck->l_same_extent,
                                                    l_same_extent)
                                        if (!ldlm_is_kms_ignore(lk)) {
                                                lk = NULL;
                                                break;
                                        }
                                if (lk)
                                        continue;
                        }

                        /* If this lock has a greater or equal kms, we are not
                         * the highest lock (or we share that distinction
                         * with another lock), and don't need to update KMS.
                         * Record old_kms and stop looking.
                         */
                        if (lck->l_policy_data.l_extent.end == OBD_OBJECT_EOF ||
                            lck->l_policy_data.l_extent.end + 1 >= old_kms) {
                                kms = old_kms;
                                complete = true;
                                break;
                        }
                        if (lck->l_policy_data.l_extent.end + 1 > kms)
                                kms = lck->l_policy_data.l_extent.end + 1;

                        /* Since we start with the highest lock and work
                         * down, for PW locks, we only need to check if we
                         * should update the kms, then stop walking the tree.
                         * PR locks are not exclusive, so the highest start
                         * does not imply the highest end and we must
                         * continue.  (Only one group lock is allowed per
                         * resource, so this is irrelevant for group locks.)
                         */
                        if (lck->l_granted_mode == LCK_PW)
                                break;
                }

                /* This tells us we're not the highest lock, so we don't need
                 * to check the remaining trees
                 */
                if (complete)
                        break;
        }

        LASSERTF(kms <= old_kms, "kms %llu old_kms %llu\n", kms, old_kms);

        RETURN(kms);
}
EXPORT_SYMBOL(ldlm_extent_shift_kms);

static inline int ldlm_mode_to_index(enum ldlm_mode mode)
{
        int index;

        LASSERT(mode != 0);
        LASSERT(is_power_of_2(mode));
        index = ilog2(mode);
        LASSERT(index < LCK_MODE_NUM);
        return index;
}
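
/*
 * Each lock mode in lustre_dlm.h is a distinct bit (LCK_EX is BIT(0),
 * LCK_PW is BIT(1), and so on), so ilog2() maps a mode to the index of
 * its per-mode interval tree; LCK_PR (BIT(2)), for example, maps to
 * lr_itree[2].
 */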

/** Add newly granted lock into interval tree for the resource. */
void ldlm_extent_add_lock(struct ldlm_resource *res,
                          struct ldlm_lock *lock)
{
        struct ldlm_interval_tree *tree;
        struct ldlm_lock *orig;
        int idx;

        LASSERT(ldlm_is_granted(lock));

        LASSERT(RB_EMPTY_NODE(&lock->l_rb));
        LASSERT(list_empty(&lock->l_same_extent));

        idx = ldlm_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == BIT(idx));
        LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);

        tree = &res->lr_itree[idx];
        orig = extent_insert_unique(lock, &tree->lit_root);
        if (orig)
                list_add(&lock->l_same_extent, &orig->l_same_extent);
        tree->lit_size++;

        /* even though we use the interval tree to manage the extent locks,
         * we also add the locks to the grant list, for debugging purposes
         */
        ldlm_resource_add_lock(res, &res->lr_granted, lock);

        if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_GRANT_CHECK)) {
                struct ldlm_lock *lck;

                list_for_each_entry_reverse(lck, &res->lr_granted, l_res_link) {
                        if (lck == lock)
                                continue;
                        if (lockmode_compat(lck->l_granted_mode,
                                            lock->l_granted_mode))
                                continue;
                        if (ldlm_extent_overlap(&lck->l_req_extent,
                                                &lock->l_req_extent)) {
                                CDEBUG(D_ERROR,
                                       "granting conflicting lock %p %p\n",
                                       lck, lock);
                                ldlm_resource_dump(D_ERROR, res);
                                LBUG();
                        }
                }
        }
}

/** Remove cancelled lock from resource interval tree. */
void ldlm_extent_unlink_lock(struct ldlm_lock *lock)
{
        struct ldlm_resource *res = lock->l_resource;
        struct ldlm_interval_tree *tree;
        int idx;

        if (RB_EMPTY_NODE(&lock->l_rb) &&
            list_empty(&lock->l_same_extent)) /* duplicate unlink */
                return;

        idx = ldlm_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == BIT(idx));
        tree = &res->lr_itree[idx];

        LASSERT(!INTERVAL_TREE_EMPTY(&tree->lit_root));

        tree->lit_size--;

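        /* Three cases: the lock was a duplicate hanging off another node,
         * the lock was in the tree with no duplicates, or the lock was in
         * the tree and a duplicate must be promoted to take its place.
         */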
        if (RB_EMPTY_NODE(&lock->l_rb)) {
                list_del_init(&lock->l_same_extent);
        } else if (list_empty(&lock->l_same_extent)) {
                extent_remove(lock, &tree->lit_root);
                RB_CLEAR_NODE(&lock->l_rb);
        } else {
                struct ldlm_lock *next = list_next_entry(lock, l_same_extent);

                list_del_init(&lock->l_same_extent);
                extent_remove(lock, &tree->lit_root);
                RB_CLEAR_NODE(&lock->l_rb);
                extent_insert(next, &tree->lit_root);
        }
}

void ldlm_extent_policy_wire_to_local(const union ldlm_wire_policy_data *wpolicy,
                                      union ldlm_policy_data *lpolicy)
{
        lpolicy->l_extent.start = wpolicy->l_extent.start;
        lpolicy->l_extent.end = wpolicy->l_extent.end;
        lpolicy->l_extent.gid = wpolicy->l_extent.gid;
}

void ldlm_extent_policy_local_to_wire(const union ldlm_policy_data *lpolicy,
                                      union ldlm_wire_policy_data *wpolicy)
{
        memset(wpolicy, 0, sizeof(*wpolicy));
        wpolicy->l_extent.start = lpolicy->l_extent.start;
        wpolicy->l_extent.end = lpolicy->l_extent.end;
        wpolicy->l_extent.gid = lpolicy->l_extent.gid;
}
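
/*
 * Call @matches on every lock in @root whose extent intersects
 * [start, end], including duplicates chained on l_same_extent, stopping
 * early once @matches returns true.
 */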
void ldlm_extent_search(struct interval_tree_root *root,
                        u64 start, u64 end,
                        bool (*matches)(struct ldlm_lock *lock, void *data),
                        void *data)
{
        struct ldlm_lock *lock, *lock2;

        for (lock = extent_iter_first(root, start, end);
             lock;
             lock = extent_iter_next(lock, start, end)) {
                if (matches(lock, data))
                        return;
                list_for_each_entry(lock2, &lock->l_same_extent, l_same_extent)
                        if (matches(lock2, data))
                                return;
        }
}