1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2010, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/ldlm/ldlm_extent.c
32  *
33  * Author: Peter Braam <braam@clusterfs.com>
34  * Author: Phil Schwan <phil@clusterfs.com>
35  */
36
37 /**
38  * This file contains the implementation of the EXTENT lock type.
39  *
40  * EXTENT lock type is for locking a contiguous range of values, represented
41  * by 64-bit starting and ending offsets (inclusive). There are several extent
42  * lock modes, some of which may be mutually incompatible. Extent locks are
43  * considered incompatible if their modes are incompatible and their extents
44  * intersect.  See the lock mode compatibility matrix in lustre_dlm.h.
45  */
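/*
 * For illustration only: given the inclusive [start, end] representation
 * described above, the conflict test between two extent locks amounts to a
 * sketch like the following (extent_locks_conflict() is a hypothetical
 * helper, not a function defined in this file):
 *
 *	static bool extent_locks_conflict(const struct ldlm_lock *a,
 *					  const struct ldlm_lock *b)
 *	{
 *		const struct ldlm_extent *ea = &a->l_policy_data.l_extent;
 *		const struct ldlm_extent *eb = &b->l_policy_data.l_extent;
 *
 *		return !lockmode_compat(a->l_req_mode, b->l_req_mode) &&
 *		       ea->start <= eb->end && eb->start <= ea->end;
 *	}
 */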
46
47 #define DEBUG_SUBSYSTEM S_LDLM
48
49 #include <libcfs/libcfs.h>
50 #include <lustre_dlm.h>
51 #include <obd_support.h>
52 #include <obd.h>
53 #include <obd_class.h>
54 #include <lustre_lib.h>
55
56 #include "ldlm_internal.h"
57
58 #ifdef HAVE_SERVER_SUPPORT
59 # define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)
60
61 /**
62  * Fix up the ldlm_extent after expanding it.
63  *
64  * After expansion has been done, we might still want to make certain
65  * adjustments based on the overall contention on the resource and the
66  * like, to avoid granting overly wide locks.
67  */
68 static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
69                                               struct ldlm_extent *new_ex,
70                                               int conflicting)
71 {
72         enum ldlm_mode req_mode = req->l_req_mode;
73         __u64 req_start = req->l_req_extent.start;
74         __u64 req_end = req->l_req_extent.end;
75         __u64 req_align, mask;
76
77         if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
78                 if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
79                         new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
80                                           new_ex->end);
81         }
82
83         if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF) {
84                 EXIT;
85                 return;
86         }
87
88         /* We need to ensure that the lock extent is properly aligned to what
89          * the client requested, and that it is also server page size
90          * aligned; otherwise a server page could be covered by two
91          * write locks.
92          */
93         mask = PAGE_SIZE;
94         req_align = (req_end + 1) | req_start;
95         if (req_align != 0 && (req_align & (mask - 1)) == 0) {
96                 while ((req_align & mask) == 0)
97                         mask <<= 1;
98         }
99         mask -= 1;
100         /* We can only shrink the lock, not grow it.
101          * This should never cause the lock to be smaller than requested,
102          * since the requested lock was already aligned on these boundaries.
103          */
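        /* Worked example (assuming a 4 KiB PAGE_SIZE; the numbers are only
         * illustrative): for a request covering [4096, 8191] that was
         * expanded to [1000, 20000], req_align = 8192 | 4096 = 0x3000, the
         * loop leaves mask at 0x1000, and after mask -= 1 the rounding below
         * shrinks the grant to [4096, 16383], which is page aligned and
         * still covers the requested range.  A request aligned on a larger
         * power-of-two boundary (say [0, 65535]) would grow the mask to that
         * boundary instead, so the grant stays aligned to 64 KiB.
         */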
104         new_ex->start = ((new_ex->start - 1) | mask) + 1;
105         new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
106         LASSERTF(new_ex->start <= req_start,
107                  "mask %#llx grant start %llu req start %llu\n",
108                  mask, new_ex->start, req_start);
109         LASSERTF(new_ex->end >= req_end,
110                  "mask %#llx grant end %llu req end %llu\n",
111                  mask, new_ex->end, req_end);
112 }
113
114 /**
115  * Return the maximum extent that:
116  * - contains the requested extent
117  * - does not overlap existing conflicting extents outside the requested one
118  *
119  * This allows clients to request a small required extent range, but if there
120  * is no contention on the lock the full lock can be granted to the client.
121  * This avoids the need for many smaller lock requests to be granted in the
122  * common (uncontended) case.
123  *
124  * Use interval tree to expand the lock extent for granted lock.
125  */
126 static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req,
127                                                 struct ldlm_extent *new_ex)
128 {
129         struct ldlm_resource *res = req->l_resource;
130         enum ldlm_mode req_mode = req->l_req_mode;
131         __u64 req_start = req->l_req_extent.start;
132         __u64 req_end = req->l_req_extent.end;
133         struct ldlm_interval_tree *tree;
134         struct interval_node_extent limiter = {
135                 .start  = new_ex->start,
136                 .end    = new_ex->end,
137         };
138         int conflicting = 0;
139         int idx;
140
141         ENTRY;
142
143         lockmode_verify(req_mode);
144
145         /* Using interval tree to handle the LDLM extent granted locks. */
146         for (idx = 0; idx < LCK_MODE_NUM; idx++) {
147                 struct interval_node_extent ext = {
148                         .start  = req_start,
149                         .end    = req_end,
150                 };
151
152                 tree = &res->lr_itree[idx];
153                 if (lockmode_compat(tree->lit_mode, req_mode))
154                         continue;
155
156                 conflicting += tree->lit_size;
157                 if (conflicting > 4)
158                         limiter.start = req_start;
159
160                 if (interval_is_overlapped(tree->lit_root, &ext))
161                         CDEBUG(D_INFO,
162                                "req_mode = %d, tree->lit_mode = %d, tree->lit_size = %d\n",
163                                req_mode, tree->lit_mode, tree->lit_size);
164                 interval_expand(tree->lit_root, &ext, &limiter);
165                 limiter.start = max(limiter.start, ext.start);
166                 limiter.end = min(limiter.end, ext.end);
167                 if (limiter.start == req_start && limiter.end == req_end)
168                         break;
169         }
170
171         new_ex->start = limiter.start;
172         new_ex->end = limiter.end;
173         LASSERT(new_ex->start <= req_start);
174         LASSERT(new_ex->end >= req_end);
175
176         ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
177         EXIT;
178 }
179
180 /* The purpose of this function is to return:
181  * - the maximum extent
182  * - that contains the requested extent
183  * - and does not overlap existing conflicting extents outside the requested one
184  */
185 static void
186 ldlm_extent_internal_policy_waiting(struct ldlm_lock *req,
187                                     struct ldlm_extent *new_ex)
188 {
189         struct ldlm_resource *res = req->l_resource;
190         enum ldlm_mode req_mode = req->l_req_mode;
191         __u64 req_start = req->l_req_extent.start;
192         __u64 req_end = req->l_req_extent.end;
193         struct ldlm_lock *lock;
194         int conflicting = 0;
195
196         ENTRY;
197
198         lockmode_verify(req_mode);
199
200         /* for waiting locks */
201         list_for_each_entry(lock, &res->lr_waiting, l_res_link) {
202                 struct ldlm_extent *l_extent = &lock->l_policy_data.l_extent;
203
204                 /* We already hit the minimum requested size, search no more */
205                 if (new_ex->start == req_start && new_ex->end == req_end) {
206                         EXIT;
207                         return;
208                 }
209
210                 /* Don't conflict with ourselves */
211                 if (req == lock)
212                         continue;
213
214                 /* Locks are compatible, so overlap doesn't matter.
215                  * Until bug 20 is fixed, try to avoid granting overlapping
216                  * locks on one client (they take a long time to cancel).
217                  */
218                 if (lockmode_compat(lock->l_req_mode, req_mode) &&
219                     lock->l_export != req->l_export)
220                         continue;
221
222                 /* If this is a high-traffic lock, don't grow downwards at all
223                  * or grow upwards too much
224                  */
225                 ++conflicting;
226                 if (conflicting > 4)
227                         new_ex->start = req_start;
228
229                 /* If lock doesn't overlap new_ex, skip it. */
230                 if (!ldlm_extent_overlap(l_extent, new_ex))
231                         continue;
232
233                 /* The locks conflict in their requested extents and we can't
234                  * satisfy both, so ignore it.  Either we will ping-pong this
235                  * extent (as we would regardless of what extent we granted),
236                  * or the lock is unused and shouldn't limit our extent growth.
237                  */
238                 if (ldlm_extent_overlap(&lock->l_req_extent,
239                                          &req->l_req_extent))
240                         continue;
241
242                 /* We grow extents downwards only as far as they don't overlap
243                  * with already-granted locks, on the assumption that clients
244                  * will be writing beyond the initial requested end and would
245                  * then need to enqueue a new lock beyond the previous request.
246                  * l_req_extent->end strictly < req_start, checked above.
247                  */
248                 if (l_extent->start < req_start && new_ex->start != req_start) {
249                         if (l_extent->end >= req_start)
250                                 new_ex->start = req_start;
251                         else
252                                 new_ex->start = min(l_extent->end+1, req_start);
253                 }
254
255                 /* If we need to cancel this lock anyway because our request
256                  * overlaps the granted lock, we grow up to its requested
257                  * extent start instead of limiting this extent, assuming that
258                  * clients are writing forwards and the lock had overgrown
259                  * its extent downwards before we enqueued our request.
260                  */
261                 if (l_extent->end > req_end) {
262                         if (l_extent->start <= req_end)
263                                 new_ex->end = max(lock->l_req_extent.start - 1,
264                                                   req_end);
265                         else
266                                 new_ex->end = max(l_extent->start - 1, req_end);
267                 }
268         }
269
270         ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
271         EXIT;
272 }
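/*
 * A concrete illustration of the waiting-queue policy above (all numbers are
 * arbitrary): suppose the request covers [8192, 12287] and the candidate
 * grant is currently [0, OBD_OBJECT_EOF].  An incompatible waiting lock whose
 * policy extent is [0, 4095] and whose requested extent does not overlap ours
 * limits downward growth, so new_ex->start becomes 4096.  An incompatible
 * waiting lock whose policy extent is [20480, OBD_OBJECT_EOF] but whose own
 * request does not overlap ours caps upward growth at 20479, just below the
 * start of its extent.  The result is then aligned by
 * ldlm_extent_internal_policy_fixup().
 */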
273
274
275 /* In order to determine the largest possible extent we can grant, we need
276  * to scan all of the queues.
277  */
278 static void ldlm_extent_policy(struct ldlm_resource *res,
279                                struct ldlm_lock *lock, __u64 *flags)
280 {
281         struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };
282
283         if (lock->l_export == NULL)
284                 /* this is a local lock taken by the server (e.g., as part of
285                  * OST-side locking, or unlink handling). Expansion doesn't
286                  * make a lot of sense for local locks, because they are
287                  * dropped immediately on operation completion and would only
288                  * conflict with other threads.
289                  */
290                 return;
291
292         if (lock->l_policy_data.l_extent.start == 0 &&
293             lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
294                 /* fast-path whole file locks */
295                 return;
296
297         /* Because reprocess_queue zeroes flags and uses it to return
298          * LDLM_FL_LOCK_CHANGED, we must check for the NO_EXPANSION flag
299          * in the lock flags rather than the 'flags' argument
300          */
301         if (likely(!(lock->l_flags & LDLM_FL_NO_EXPANSION))) {
302                 ldlm_extent_internal_policy_granted(lock, &new_ex);
303                 ldlm_extent_internal_policy_waiting(lock, &new_ex);
304         } else {
305                 LDLM_DEBUG(lock, "Not expanding manually requested lock");
306                 new_ex.start = lock->l_policy_data.l_extent.start;
307                 new_ex.end = lock->l_policy_data.l_extent.end;
308                 /* In case the request is not on correct boundaries, we call
309                  * fixup. (normally called in ldlm_extent_internal_policy_*)
310                  */
311                 ldlm_extent_internal_policy_fixup(lock, &new_ex, 0);
312         }
313
314         if (!ldlm_extent_equal(&new_ex, &lock->l_policy_data.l_extent)) {
315                 *flags |= LDLM_FL_LOCK_CHANGED;
316                 lock->l_policy_data.l_extent.start = new_ex.start;
317                 lock->l_policy_data.l_extent.end = new_ex.end;
318         }
319 }
320
321 static bool ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
322 {
323         struct ldlm_resource *res = lock->l_resource;
324         time64_t now = ktime_get_seconds();
325
326         if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_SET_CONTENTION))
327                 return true;
328
329         CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
330         if (contended_locks > ldlm_res_to_ns(res)->ns_contended_locks)
331                 res->lr_contention_time = now;
332
333         return now < res->lr_contention_time +
334                 ldlm_res_to_ns(res)->ns_contention_time;
335 }
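/*
 * For example, with namespace tunables of, say, ns_contended_locks = 32 and
 * ns_contention_time = 2 (seconds), a resource that sees more than 32
 * conflicting locks during one compatibility scan is treated as contended for
 * the following two seconds; during that window, requests no larger than
 * ns_max_nolock_size may be denied with -EUSERS when
 * LDLM_FL_DENY_ON_CONTENTION is set (see the end of
 * ldlm_extent_compat_queue() below).  The tunable values here are
 * illustrative only.
 */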
336
337 struct ldlm_extent_compat_args {
338         struct list_head *work_list;
339         struct ldlm_lock *lock;
340         enum ldlm_mode mode;
341         int *locks;
342         int *compat;
343 };
344
345 static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
346                                                 void *data)
347 {
348         struct ldlm_extent_compat_args *priv = data;
349         struct ldlm_interval *node = to_ldlm_interval(n);
350         struct ldlm_extent *extent;
351         struct list_head *work_list = priv->work_list;
352         struct ldlm_lock *lock, *enq = priv->lock;
353         enum ldlm_mode mode = priv->mode;
354         int count = 0;
355
356         ENTRY;
357
358         LASSERT(!list_empty(&node->li_group));
359
360         list_for_each_entry(lock, &node->li_group, l_sl_policy) {
361                 /* the interval tree contains only granted locks */
362                 LASSERTF(mode == lock->l_granted_mode,
363                          "mode = %s, lock->l_granted_mode = %s\n",
364                          ldlm_lockname[mode],
365                          ldlm_lockname[lock->l_granted_mode]);
366                 count++;
367                 if (lock->l_blocking_ast && lock->l_granted_mode != LCK_GROUP)
368                         ldlm_add_ast_work_item(lock, enq, work_list);
369         }
370
371         /* don't count conflicting glimpse locks */
372         extent = ldlm_interval_extent(node);
373         if (!(mode == LCK_PR && extent->start == 0 &&
374             extent->end == OBD_OBJECT_EOF))
375                 *priv->locks += count;
376
377         if (priv->compat)
378                 *priv->compat = 0;
379
380         RETURN(INTERVAL_ITER_CONT);
381 }
382
383 /**
384  * Determine if the lock is compatible with all locks on the queue.
385  *
386  * If \a work_list is provided, conflicting locks are linked there.
387  * If \a work_list is not provided, we exit this function on first conflict.
388  *
389  * \retval 0 if the lock is not compatible
390  * \retval 1 if the lock is compatible
391  * \retval 2 if \a req is a group lock and it is compatible and requires
392  *           no further checking
393  * \retval negative error, such as EAGAIN for group locks
394  */
395 static int
396 ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
397                          __u64 *flags, struct list_head *work_list,
398                          int *contended_locks)
399 {
400         struct ldlm_resource *res = req->l_resource;
401         enum ldlm_mode req_mode = req->l_req_mode;
402         __u64 req_start = req->l_req_extent.start;
403         __u64 req_end = req->l_req_extent.end;
404         struct ldlm_lock *lock;
405         int check_contention;
406         int compat = 1;
407
408         ENTRY;
409
410         lockmode_verify(req_mode);
411
412         /* Use the interval tree for granted locks */
413         if (queue == &res->lr_granted) {
414                 struct ldlm_interval_tree *tree;
415                 struct ldlm_extent_compat_args data = {
416                         .work_list = work_list,
417                         .lock = req,
418                         .locks = contended_locks,
419                         .compat = &compat };
420                 struct interval_node_extent ex = {
421                         .start = req_start,
422                         .end = req_end };
423                 int idx, rc;
424
425                 for (idx = 0; idx < LCK_MODE_NUM; idx++) {
426                         tree = &res->lr_itree[idx];
427                         if (tree->lit_root == NULL) /* empty tree, skipped */
428                                 continue;
429
430                         data.mode = tree->lit_mode;
431                         if (lockmode_compat(req_mode, tree->lit_mode)) {
432                                 struct ldlm_interval *node;
433                                 struct ldlm_extent *extent;
434
435                                 if (req_mode != LCK_GROUP)
436                                         continue;
437
438                                 /* group lock, grant it immediately if
439                                  * compatible */
440                                 node = to_ldlm_interval(tree->lit_root);
441                                 extent = ldlm_interval_extent(node);
442                                 if (req->l_policy_data.l_extent.gid ==
443                                     extent->gid)
444                                         RETURN(2);
445                         }
446
447                         if (tree->lit_mode == LCK_GROUP) {
448                                 if (*flags & (LDLM_FL_BLOCK_NOWAIT |
449                                     LDLM_FL_SPECULATIVE)) {
450                                         compat = -EAGAIN;
451                                         goto destroylock;
452                                 }
453
454                                 if (!work_list)
455                                         RETURN(0);
456
457                                 /* if work list is not NULL, add all
458                                  * locks in the tree to work list
459                                  */
460                                 compat = 0;
461                                 interval_iterate(tree->lit_root,
462                                                  ldlm_extent_compat_cb, &data);
463                                 continue;
464                         }
465
466                         /* We've found a potentially blocking lock, check
467                          * compatibility.  This handles locks other than GROUP
468                          * locks, which are handled separately above.
469                          *
470                          * Locks with FL_SPECULATIVE are asynchronous requests
471                          * which must never wait behind another lock, so they
472                          * fail if any conflicting lock is found.
473                          */
474                         if (!work_list || (*flags & LDLM_FL_SPECULATIVE)) {
475                                 rc = interval_is_overlapped(tree->lit_root,
476                                                             &ex);
477                                 if (rc) {
478                                         if (!work_list) {
479                                                 RETURN(0);
480                                         } else {
481                                                 compat = -EAGAIN;
482                                                 goto destroylock;
483                                         }
484                                 }
485                         } else {
486                                 interval_search(tree->lit_root, &ex,
487                                                 ldlm_extent_compat_cb, &data);
488                                 if (!list_empty(work_list) && compat)
489                                         compat = 0;
490                         }
491                 }
492         } else { /* for waiting queue */
493                 list_for_each_entry(lock, queue, l_res_link) {
494                         check_contention = 1;
495
496                         /* We stop walking the queue if we hit ourselves so
497                          * we don't take conflicting locks enqueued after us
498                          * into account, or we'd wait forever.
499                          */
500                         if (req == lock)
501                                 break;
502
503                         /* locks are compatible, overlap doesn't matter */
504                         if (lockmode_compat(lock->l_req_mode, req_mode)) {
505                                 if (req_mode == LCK_PR &&
506                                     ((lock->l_policy_data.l_extent.start <=
507                                       req->l_policy_data.l_extent.start) &&
508                                      (lock->l_policy_data.l_extent.end >=
509                                       req->l_policy_data.l_extent.end))) {
510                                         /* If we met a PR lock just like us or
511                                          * wider, and nobody down the list
512                                          * conflicted with it, then we can skip
513                                          * processing the rest of the list and
514                                          * safely place ourselves at the end of
515                                          * the list, or grant (depending on
516                                          * whether we met any conflicting locks
517                                          * earlier in the list).  On the first
518                                          * enqueue only, we keep traversing if
519                                          * there is something conflicting down
520                                          * the list, because we need to make
521                                          * sure that something is marked as
522                                          * AST_SENT as well; with an empty
523                                          * worklist we would exit on the first
524                                          * conflict met.
525                                          */
526
527                                         /* There IS a case where such a flag is
528                                          * not set for a lock, yet it blocks
529                                          * something.  Luckily for us this is
530                                          * only during destroy, so the lock is
531                                          * exclusive; so here we are safe.
532                                          */
533                                         if (!ldlm_is_ast_sent(lock))
534                                                 RETURN(compat);
535                                 }
536
537                                 /* non-group locks are compatible,
538                                  * overlap doesn't matter
539                                  */
540                                 if (likely(req_mode != LCK_GROUP))
541                                         continue;
542
543                                 /* If we are trying to get a GROUP lock and
544                                  * there is another one of this kind, we need
545                                  * to compare gid
546                                  */
547                                 if (req->l_policy_data.l_extent.gid ==
548                                     lock->l_policy_data.l_extent.gid) {
549                                         /* If existing lock with matched gid
550                                          * is granted, we grant new one too.
551                                          */
552                                         if (ldlm_is_granted(lock))
553                                                 RETURN(2);
554
555                                         /* Otherwise we are scanning the queue
556                                          * of waiting locks and it means the
557                                          * current request would block along
558                                          * with the existing lock (which is
559                                          * already blocked).  If we are in
560                                          * nonblocking mode, return immediately.
561                                          */
562                                         if (*flags & (LDLM_FL_BLOCK_NOWAIT |
563                                                       LDLM_FL_SPECULATIVE)) {
564                                                 compat = -EAGAIN;
565                                                 goto destroylock;
566                                         }
567                                         /* If this group lock is compatible with
568                                          * another group lock on the waiting
569                                          * list, they must be together in the
570                                          * list, so they can be granted at the
571                                          * same time.  Otherwise the later lock
572                                          * can get stuck behind another,
573                                          * incompatible, lock.
574                                          */
575                                         ldlm_resource_insert_lock_after(lock,
576                                                                         req);
577                                         /* Because 'lock' is not granted, we can
578                                          * stop processing this queue and return
579                                          * immediately. There is no need to
580                                          * check the rest of the list.
581                                          */
582                                         RETURN(0);
583                                 }
584                         }
585
586                         if (unlikely(req_mode == LCK_GROUP &&
587                             !ldlm_is_granted(lock))) {
588                                 compat = 0;
589                                 if (lock->l_req_mode != LCK_GROUP) {
590                                         /* Ok, we hit non-GROUP lock, there
591                                          * should be no more GROUP locks later
592                                          * on, queue in front of first
593                                          * non-GROUP lock
594                                          */
595
596                                         ldlm_resource_insert_lock_before(lock,
597                                                                          req);
598                                         break;
599                                 }
600                                 LASSERT(req->l_policy_data.l_extent.gid !=
601                                         lock->l_policy_data.l_extent.gid);
602                                 continue;
603                         }
604
605                         if (unlikely(lock->l_req_mode == LCK_GROUP)) {
606                                 /* If compared lock is GROUP, then requested is
607                                  * PR/PW so this is not compatible; extent
608                                  * range does not matter
609                                  */
610                                 if (*flags & (LDLM_FL_BLOCK_NOWAIT |
611                                     LDLM_FL_SPECULATIVE)) {
612                                         compat = -EAGAIN;
613                                         goto destroylock;
614                                 }
615                         } else if (lock->l_policy_data.l_extent.end < req_start ||
616                                    lock->l_policy_data.l_extent.start > req_end) {
617                                 /* if a non-group lock doesn't overlap, skip it */
618                                 continue;
619                         } else if (lock->l_req_extent.end < req_start ||
620                                    lock->l_req_extent.start > req_end) {
621                                 /* false contention, the requests don't
622                                  * really overlap */
623                                 check_contention = 0;
624                         }
625
626                         if (!work_list)
627                                 RETURN(0);
628
629                         if (*flags & LDLM_FL_SPECULATIVE) {
630                                 compat = -EAGAIN;
631                                 goto destroylock;
632                         }
633
634                         /* don't count conflicting glimpse locks */
635                         if (lock->l_req_mode == LCK_PR &&
636                             lock->l_policy_data.l_extent.start == 0 &&
637                              lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
638                                 check_contention = 0;
639
640                         *contended_locks += check_contention;
641
642                         compat = 0;
643                         if (lock->l_blocking_ast &&
644                             lock->l_req_mode != LCK_GROUP)
645                                 ldlm_add_ast_work_item(lock, req, work_list);
646                 }
647         }
648
649         if (ldlm_check_contention(req, *contended_locks) &&
650             compat == 0 && (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
651              req->l_req_mode != LCK_GROUP && req_end - req_start <=
652               ldlm_res_to_ns(req->l_resource)->ns_max_nolock_size)
653                 GOTO(destroylock, compat = -EUSERS);
654
655         RETURN(compat);
656 destroylock:
657         list_del_init(&req->l_res_link);
658         ldlm_lock_destroy_nolock(req);
659         RETURN(compat);
660 }
661
662 /**
663  * This function refreshes the eviction timer for a cancelled lock.
664  * \param[in] lock              ldlm lock to refresh
665  * \param[in] arg               ldlm prolong arguments; the timeout, export,
666  *                              extent and counters are used
667  */
668 void ldlm_lock_prolong_one(struct ldlm_lock *lock,
669                            struct ldlm_prolong_args *arg)
670 {
671         timeout_t timeout;
672
673         CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_PROLONG_PAUSE, 3);
674
675         if (arg->lpa_export != lock->l_export ||
676             lock->l_flags & LDLM_FL_DESTROYED)
677                 /* ignore unrelated locks */
678                 return;
679
680         arg->lpa_locks_cnt++;
681
682         if (!(lock->l_flags & LDLM_FL_AST_SENT))
683                 /* ignore locks not being cancelled */
684                 return;
685
686         arg->lpa_blocks_cnt++;
687
688         /* OK, this is possibly a lock the user holds while doing I/O;
689          * let's refresh the eviction timer for it.
690          */
691         timeout = ldlm_bl_timeout_by_rpc(arg->lpa_req);
692         LDLM_DEBUG(lock, "refreshed to %ds", timeout);
693         ldlm_refresh_waiting_lock(lock, timeout);
694 }
695 EXPORT_SYMBOL(ldlm_lock_prolong_one);
696
697 static enum interval_iter ldlm_resource_prolong_cb(struct interval_node *n,
698                                                    void *data)
699 {
700         struct ldlm_prolong_args *arg = data;
701         struct ldlm_interval *node = to_ldlm_interval(n);
702         struct ldlm_lock *lock;
703
704         ENTRY;
705
706         LASSERT(!list_empty(&node->li_group));
707
708         list_for_each_entry(lock, &node->li_group, l_sl_policy) {
709                 ldlm_lock_prolong_one(lock, arg);
710         }
711
712         RETURN(INTERVAL_ITER_CONT);
713 }
714
715 /**
716  * Walk through the granted tree and prolong locks if they overlap the extent.
717  *
718  * \param[in] arg               prolong args
719  */
720 void ldlm_resource_prolong(struct ldlm_prolong_args *arg)
721 {
722         struct ldlm_interval_tree *tree;
723         struct ldlm_resource *res;
724         struct interval_node_extent ex = { .start = arg->lpa_extent.start,
725                                         .end = arg->lpa_extent.end };
726         int idx;
727
728         ENTRY;
729
730         res = ldlm_resource_get(arg->lpa_export->exp_obd->obd_namespace,
731                                 &arg->lpa_resid, LDLM_EXTENT, 0);
732         if (IS_ERR(res)) {
733                 CDEBUG(D_DLMTRACE, "Failed to get resource for resid %llu/%llu\n",
734                        arg->lpa_resid.name[0], arg->lpa_resid.name[1]);
735                 RETURN_EXIT;
736         }
737
738         lock_res(res);
739         for (idx = 0; idx < LCK_MODE_NUM; idx++) {
740                 tree = &res->lr_itree[idx];
741                 if (tree->lit_root == NULL) /* empty tree, skipped */
742                         continue;
743
744                 /* There is no way to check for the group ID,
745                  * so all group locks are considered valid
746                  * here, especially because the client is supposed
747                  * to check that it has such a lock before sending an RPC.
748                  */
749                 if (!(tree->lit_mode & arg->lpa_mode))
750                         continue;
751
752                 interval_search(tree->lit_root, &ex,
753                                 ldlm_resource_prolong_cb, arg);
754         }
755         unlock_res(res);
756         ldlm_resource_putref(res);
757
758         EXIT;
759 }
760 EXPORT_SYMBOL(ldlm_resource_prolong);
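/*
 * A rough sketch of how server-side request handling might use the prolong
 * helpers above while a client IO RPC is being serviced.  The variables
 * below (req, resid, io_start, io_end) are illustrative only and are not
 * defined in this file:
 *
 *	struct ldlm_prolong_args arg = { 0 };
 *
 *	arg.lpa_req    = req;                 // the RPC being serviced
 *	arg.lpa_export = req->rq_export;      // the client's export
 *	arg.lpa_resid  = resid;               // resource of the object under IO
 *	arg.lpa_extent.start = io_start;
 *	arg.lpa_extent.end   = io_end;
 *	arg.lpa_mode   = LCK_PW | LCK_GROUP;  // modes that may cover the IO
 *	ldlm_resource_prolong(&arg);
 *	// arg.lpa_blocks_cnt now counts the locks whose timers were refreshed
 */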
761
762 /**
763  * Process a granting attempt for an extent lock.
764  * Must be called with the ns lock held.
765  *
766  * This function looks for any conflicts for \a lock in the granted or
767  * waiting queues. The lock is granted if no conflicts are found in
768  * either queue.
769  */
770 int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
771                 enum ldlm_process_intention intention,
772                 enum ldlm_error *err, struct list_head *work_list)
773 {
774         struct ldlm_resource *res = lock->l_resource;
775         int rc, rc2 = 0;
776         int contended_locks = 0;
777         struct list_head *grant_work = intention == LDLM_PROCESS_ENQUEUE ?
778                                                         NULL : work_list;
779         ENTRY;
780
781         LASSERT(!ldlm_is_granted(lock));
782         LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
783                 !ldlm_is_ast_discard_data(lock));
784         check_res_locked(res);
785         *err = ELDLM_OK;
786
787         if (intention == LDLM_PROCESS_RESCAN) {
788                 /* Careful observers will note that we don't handle -EAGAIN
789                  * here, but it's ok for a non-obvious reason -- compat_queue
790                  * can only return -EAGAIN if (flags & BLOCK_NOWAIT |
791                  * SPECULATIVE). flags should always be zero here, and if that
792                  * ever stops being true, we want to find out. */
793                 LASSERT(*flags == 0);
794                 rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
795                                         NULL, &contended_locks);
796                 if (rc == 1) {
797                         rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
798                                                 flags, NULL,
799                                                 &contended_locks);
800                 }
801                 if (rc == 0)
802                         RETURN(LDLM_ITER_STOP);
803
804                 ldlm_resource_unlink_lock(lock);
805
806                 if (!CFS_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
807                         ldlm_extent_policy(res, lock, flags);
808                 ldlm_grant_lock(lock, grant_work);
809                 RETURN(LDLM_ITER_CONTINUE);
810         }
811
812         contended_locks = 0;
813         rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
814                                 work_list, &contended_locks);
815         if (rc < 0)
816                 GOTO(out, *err = rc);
817
818         if (rc != 2) {
819                 rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock,
820                                         flags, work_list,
821                                         &contended_locks);
822                 if (rc2 < 0)
823                         GOTO(out, *err = rc = rc2);
824         }
825
826         if (rc + rc2 == 2) {
827                 ldlm_extent_policy(res, lock, flags);
828                 ldlm_resource_unlink_lock(lock);
829                 ldlm_grant_lock(lock, grant_work);
830         } else {
831                 /* Adding the LDLM_FL_NO_TIMEOUT flag to force the client
832                  * to wait for the lock endlessly once the lock is
833                  * enqueued -bzzz */
834                 *flags |= LDLM_FL_NO_TIMEOUT;
835         }
836
837         RETURN(LDLM_ITER_CONTINUE);
838 out:
839         return rc;
840 }
841 #endif /* HAVE_SERVER_SUPPORT */
842
843 struct ldlm_kms_shift_args {
844         __u64   old_kms;
845         __u64   kms;
846         bool    complete;
847 };
848
849 /* Callback for interval_iterate functions, used by ldlm_extent_shift_kms */
850 static enum interval_iter ldlm_kms_shift_cb(struct interval_node *n,
851                                             void *args)
852 {
853         struct ldlm_kms_shift_args *arg = args;
854         struct ldlm_interval *node = to_ldlm_interval(n);
855         struct ldlm_lock *tmplock;
856         struct ldlm_lock *lock = NULL;
857
858         ENTRY;
859
860         /* Since all locks in an interval have the same extent, we can just
861          * use the first lock without kms_ignore set. */
862         list_for_each_entry(tmplock, &node->li_group, l_sl_policy) {
863                 if (ldlm_is_kms_ignore(tmplock))
864                         continue;
865
866                 lock = tmplock;
867
868                 break;
869         }
870
871         /* No locks in this interval without kms_ignore set */
872         if (!lock)
873                 RETURN(INTERVAL_ITER_CONT);
874
875         /* If we find a lock with a greater or equal kms, we are not the
876          * highest lock (or we share that distinction with another lock), and
877          * don't need to update KMS.  Return old_kms and stop looking. */
878         if (lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF ||
879             lock->l_policy_data.l_extent.end + 1 >= arg->old_kms) {
880                 arg->kms = arg->old_kms;
881                 arg->complete = true;
882                 RETURN(INTERVAL_ITER_STOP);
883         }
884
885         if (lock->l_policy_data.l_extent.end + 1 > arg->kms)
886                 arg->kms = lock->l_policy_data.l_extent.end + 1;
887
888         /* Since interval_iterate_reverse starts with the highest lock and
889          * works down, for PW locks, we only need to check if we should update
890          * the kms, then stop walking the tree.  PR locks are not exclusive, so
891          * the highest start does not imply the highest end and we must
892          * continue. (Only one group lock is allowed per resource, so this is
893          * irrelevant for group locks.) */
894         if (lock->l_granted_mode == LCK_PW)
895                 RETURN(INTERVAL_ITER_STOP);
896         else
897                 RETURN(INTERVAL_ITER_CONT);
898 }
899
900 /* When a lock is cancelled by a client, the KMS may undergo change if this
901  * is the "highest lock".  This function returns the new KMS value, updating
902  * it only if we were the highest lock.
903  *
904  * Caller must hold lr_lock already.
905  *
906  * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes!
907  */
908 __u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
909 {
910         struct ldlm_resource *res = lock->l_resource;
911         struct ldlm_interval_tree *tree;
912         struct ldlm_kms_shift_args args;
913         int idx = 0;
914
915         ENTRY;
916
917         args.old_kms = old_kms;
918         args.kms = 0;
919         args.complete = false;
920
921         /* don't let another thread in ldlm_extent_shift_kms race in
922          * just after we finish and take our lock into account in its
923          * calculation of the kms */
924         ldlm_set_kms_ignore(lock);
925
926         /* We iterate over the lock trees, looking for the largest kms smaller
927          * than the current one. */
928         for (idx = 0; idx < LCK_MODE_NUM; idx++) {
929                 tree = &res->lr_itree[idx];
930
931                 /* If our already known kms is >= than the highest 'end' in
932                  * this tree, we don't need to check this tree, because
933                  * the kms from a tree can be lower than in_max_high (due to
934                  * kms_ignore), but it can never be higher. */
935                 if (!tree->lit_root || args.kms >= tree->lit_root->in_max_high)
936                         continue;
937
938                 interval_iterate_reverse(tree->lit_root, ldlm_kms_shift_cb,
939                                          &args);
940
941                 /* this tells us we're not the highest lock, so we don't need
942                  * to check the remaining trees */
943                 if (args.complete)
944                         break;
945         }
946
947         LASSERTF(args.kms <= args.old_kms, "kms %llu old_kms %llu\n", args.kms,
948                  args.old_kms);
949
950         RETURN(args.kms);
951 }
952 EXPORT_SYMBOL(ldlm_extent_shift_kms);
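/*
 * A worked example (the extents are arbitrary): if granted PW locks cover
 * [0, 4095] and [4096, 16383], the KMS is 16384.  Cancelling the
 * [4096, 16383] lock shifts the KMS down to 4096, because the remaining
 * lock's end + 1 is the largest value below the old KMS.  Cancelling only
 * the [0, 4095] lock leaves the KMS at 16384, since ldlm_kms_shift_cb()
 * notices a remaining lock with end + 1 >= old_kms and stops early.
 */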
953
954 struct kmem_cache *ldlm_interval_slab;
955 static struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
956 {
957         struct ldlm_interval *node;
958
959         ENTRY;
960
961         LASSERT(lock->l_resource->lr_type == LDLM_EXTENT ||
962                 lock->l_resource->lr_type == LDLM_FLOCK);
963         OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
964         if (node == NULL)
965                 RETURN(NULL);
966
967         INIT_LIST_HEAD(&node->li_group);
968         ldlm_interval_attach(node, lock);
969         RETURN(node);
970 }
971
972 void ldlm_interval_free(struct ldlm_interval *node)
973 {
974         if (node) {
975                 LASSERT(list_empty(&node->li_group));
976                 LASSERT(!interval_is_intree(&node->li_node));
977                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
978         }
979 }
980
981 /* interval tree, for LDLM_EXTENT. */
982 void ldlm_interval_attach(struct ldlm_interval *n,
983                           struct ldlm_lock *l)
984 {
985         LASSERT(l->l_tree_node == NULL);
986         LASSERT(l->l_resource->lr_type == LDLM_EXTENT ||
987                 l->l_resource->lr_type == LDLM_FLOCK);
988
989         list_add_tail(&l->l_sl_policy, &n->li_group);
990         l->l_tree_node = n;
991 }
992
993 struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l)
994 {
995         struct ldlm_interval *n = l->l_tree_node;
996
997         if (n == NULL)
998                 return NULL;
999
1000         LASSERT(!list_empty(&n->li_group));
1001         l->l_tree_node = NULL;
1002         list_del_init(&l->l_sl_policy);
1003
1004         return list_empty(&n->li_group) ? n : NULL;
1005 }
1006
1007 static inline int ldlm_mode_to_index(enum ldlm_mode mode)
1008 {
1009         int index;
1010
1011         LASSERT(mode != 0);
1012         LASSERT(is_power_of_2(mode));
1013         index = ilog2(mode);
1014         LASSERT(index < LCK_MODE_NUM);
1015         return index;
1016 }
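/*
 * For instance, since lock modes are power-of-two flags (assuming the usual
 * values, e.g. LCK_PW == 2), a granted PW lock maps to interval tree slot
 * ilog2(2) == 1, and ldlm_extent_add_lock() below asserts that the slot's
 * lit_mode matches BIT(idx).
 */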
1017
1018 int ldlm_extent_alloc_lock(struct ldlm_lock *lock)
1019 {
1020         lock->l_tree_node = NULL;
1021
1022         if (ldlm_interval_alloc(lock) == NULL)
1023                 return -ENOMEM;
1024         return 0;
1025 }
1026
1027 /** Add a newly granted lock into the interval tree for the resource. */
1028 void ldlm_extent_add_lock(struct ldlm_resource *res,
1029                           struct ldlm_lock *lock)
1030 {
1031         struct interval_node *found, **root;
1032         struct ldlm_interval *node;
1033         struct ldlm_extent *extent;
1034         int idx, rc;
1035
1036         LASSERT(ldlm_is_granted(lock));
1037
1038         node = lock->l_tree_node;
1039         LASSERT(node != NULL);
1040         LASSERT(!interval_is_intree(&node->li_node));
1041
1042         idx = ldlm_mode_to_index(lock->l_granted_mode);
1043         LASSERT(lock->l_granted_mode == BIT(idx));
1044         LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);
1045
1046         /* initialize the node extent */
1047         extent = &lock->l_policy_data.l_extent;
1048
1049         rc = interval_set(&node->li_node, extent->start, extent->end);
1050         LASSERT(!rc);
1051
1052         root = &res->lr_itree[idx].lit_root;
1053         found = interval_insert(&node->li_node, root);
1054         if (found) { /* The policy group was found. */
1055                 struct ldlm_interval *tmp = ldlm_interval_detach(lock);
1056
1057                 LASSERT(tmp != NULL);
1058                 ldlm_interval_free(tmp);
1059                 ldlm_interval_attach(to_ldlm_interval(found), lock);
1060         }
1061         res->lr_itree[idx].lit_size++;
1062
1063         /* even though we use an interval tree to manage the extent locks, we
1064          * also add the locks to the granted list, for debugging purposes */
1065         ldlm_resource_add_lock(res, &res->lr_granted, lock);
1066
1067         if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_GRANT_CHECK)) {
1068                 struct ldlm_lock *lck;
1069
1070                 list_for_each_entry_reverse(lck, &res->lr_granted,
1071                                         l_res_link) {
1072                         if (lck == lock)
1073                                 continue;
1074                         if (lockmode_compat(lck->l_granted_mode,
1075                                         lock->l_granted_mode))
1076                                 continue;
1077                         if (ldlm_extent_overlap(&lck->l_req_extent,
1078                                                 &lock->l_req_extent)) {
1079                                 CDEBUG(D_ERROR,
1080                                        "granting conflicting lock %p %p\n",
1081                                        lck, lock);
1082                                 ldlm_resource_dump(D_ERROR, res);
1083                                 LBUG();
1084                         }
1085                 }
1086         }
1087 }
1088
1089 /** Remove a cancelled lock from the resource interval tree. */
1090 void ldlm_extent_unlink_lock(struct ldlm_lock *lock)
1091 {
1092         struct ldlm_resource *res = lock->l_resource;
1093         struct ldlm_interval *node = lock->l_tree_node;
1094         struct ldlm_interval_tree *tree;
1095         int idx;
1096
1097         if (!node || !interval_is_intree(&node->li_node)) /* duplicate unlink */
1098                 return;
1099
1100         idx = ldlm_mode_to_index(lock->l_granted_mode);
1101         LASSERT(lock->l_granted_mode == BIT(idx));
1102         tree = &res->lr_itree[idx];
1103
1104         LASSERT(tree->lit_root != NULL); /* ensure the tree is not null */
1105
1106         tree->lit_size--;
1107         node = ldlm_interval_detach(lock);
1108         if (node) {
1109                 interval_erase(&node->li_node, &tree->lit_root);
1110                 ldlm_interval_free(node);
1111         }
1112 }
1113
1114 void ldlm_extent_policy_wire_to_local(const union ldlm_wire_policy_data *wpolicy,
1115                                       union ldlm_policy_data *lpolicy)
1116 {
1117         lpolicy->l_extent.start = wpolicy->l_extent.start;
1118         lpolicy->l_extent.end = wpolicy->l_extent.end;
1119         lpolicy->l_extent.gid = wpolicy->l_extent.gid;
1120 }
1121
1122 void ldlm_extent_policy_local_to_wire(const union ldlm_policy_data *lpolicy,
1123                                       union ldlm_wire_policy_data *wpolicy)
1124 {
1125         memset(wpolicy, 0, sizeof(*wpolicy));
1126         wpolicy->l_extent.start = lpolicy->l_extent.start;
1127         wpolicy->l_extent.end = lpolicy->l_extent.end;
1128         wpolicy->l_extent.gid = lpolicy->l_extent.gid;
1129 }