Whamcloud - gitweb
LU-8272 ldlm: Use interval tree to update kms
[fs/lustre-release.git] / lustre / ldlm / ldlm_extent.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2010, 2013, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/ldlm/ldlm_extent.c
33  *
34  * Author: Peter Braam <braam@clusterfs.com>
35  * Author: Phil Schwan <phil@clusterfs.com>
36  */
37
38 /**
39  * This file contains implementation of EXTENT lock type
40  *
41  * EXTENT lock type is for locking a contiguous range of values, represented
42  * by 64-bit starting and ending offsets (inclusive). There are several extent
43  * lock modes, some of which may be mutually incompatible. Extent locks are
44  * considered incompatible if their modes are incompatible and their extents
45  * intersect.  See the lock mode compatibility matrix in lustre_dlm.h.
46  */
47
48 #define DEBUG_SUBSYSTEM S_LDLM
49
50 #include <libcfs/libcfs.h>
51 #include <lustre_dlm.h>
52 #include <obd_support.h>
53 #include <obd.h>
54 #include <obd_class.h>
55 #include <lustre_lib.h>
56
57 #include "ldlm_internal.h"
58
59 #ifdef HAVE_SERVER_SUPPORT
60 # define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)
61
62 /**
63  * Fix up the ldlm_extent after expanding it.
64  *
65  * After expansion has been done, we might still want to do certain adjusting
66  * based on overall contention of the resource and the like to avoid granting
67  * overly wide locks.
68  */
69 static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
70                                               struct ldlm_extent *new_ex,
71                                               int conflicting)
72 {
73         enum ldlm_mode req_mode = req->l_req_mode;
74         __u64 req_start = req->l_req_extent.start;
75         __u64 req_end = req->l_req_extent.end;
76         __u64 req_align, mask;
77
78         if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
79                 if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
80                         new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
81                                           new_ex->end);
82         }
83
84         if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF) {
85                 EXIT;
86                 return;
87         }
88
89         /* we need to ensure that the lock extent is properly aligned to what
90          * the client requested. Also we need to make sure it's also server
91          * page size aligned otherwise a server page can be covered by two
92          * write locks. */
93         mask = PAGE_SIZE;
94         req_align = (req_end + 1) | req_start;
95         if (req_align != 0 && (req_align & (mask - 1)) == 0) {
96                 while ((req_align & mask) == 0)
97                         mask <<= 1;
98         }
99         mask -= 1;
100         /* We can only shrink the lock, not grow it.
101          * This should never cause lock to be smaller than requested,
102          * since requested lock was already aligned on these boundaries. */
103         new_ex->start = ((new_ex->start - 1) | mask) + 1;
104         new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
105         LASSERTF(new_ex->start <= req_start,
106                  "mask %#llx grant start %llu req start %llu\n",
107                  mask, new_ex->start, req_start);
108         LASSERTF(new_ex->end >= req_end,
109                  "mask %#llx grant end %llu req end %llu\n",
110                  mask, new_ex->end, req_end);
111 }
112
113 /**
114  * Return the maximum extent that:
115  * - contains the requested extent
116  * - does not overlap existing conflicting extents outside the requested one
117  *
118  * This allows clients to request a small required extent range, but if there
119  * is no contention on the lock the full lock can be granted to the client.
120  * This avoids the need for many smaller lock requests to be granted in the
121  * common (uncontended) case.
122  *
123  * Use interval tree to expand the lock extent for granted lock.
124  */
125 static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req,
126                                                 struct ldlm_extent *new_ex)
127 {
128         struct ldlm_resource *res = req->l_resource;
129         enum ldlm_mode req_mode = req->l_req_mode;
130         __u64 req_start = req->l_req_extent.start;
131         __u64 req_end = req->l_req_extent.end;
132         struct ldlm_interval_tree *tree;
133         struct interval_node_extent limiter = { new_ex->start, new_ex->end };
134         int conflicting = 0;
135         int idx;
136         ENTRY;
137
138         lockmode_verify(req_mode);
139
140         /* Using interval tree to handle the LDLM extent granted locks. */
141         for (idx = 0; idx < LCK_MODE_NUM; idx++) {
142                 struct interval_node_extent ext = { req_start, req_end };
143
144                 tree = &res->lr_itree[idx];
145                 if (lockmode_compat(tree->lit_mode, req_mode))
146                         continue;
147
148                 conflicting += tree->lit_size;
149                 if (conflicting > 4)
150                         limiter.start = req_start;
151
152                 if (interval_is_overlapped(tree->lit_root, &ext))
153                         CDEBUG(D_INFO, 
154                                "req_mode = %d, tree->lit_mode = %d, "
155                                "tree->lit_size = %d\n",
156                                req_mode, tree->lit_mode, tree->lit_size);
157                 interval_expand(tree->lit_root, &ext, &limiter);
158                 limiter.start = max(limiter.start, ext.start);
159                 limiter.end = min(limiter.end, ext.end);
160                 if (limiter.start == req_start && limiter.end == req_end)
161                         break;
162         }
163
164         new_ex->start = limiter.start;
165         new_ex->end = limiter.end;
166         LASSERT(new_ex->start <= req_start);
167         LASSERT(new_ex->end >= req_end);
168
169         ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
170         EXIT;
171 }
172
/* The purpose of this function is to return:
 * - the maximum extent
 * - containing the requested extent
 * - and not overlapping existing conflicting extents outside the requested one
 *
 * This variant walks the resource's waiting list (res->lr_waiting) and trims
 * \a new_ex in place against every waiting lock that limits it.
 *
 * \param[in]     req     lock being enqueued
 * \param[in,out] new_ex  candidate extent; shrunk towards req->l_req_extent
 *
 * Note that when \a new_ex has already shrunk to exactly the requested extent
 * and another list entry is visited, the function returns immediately
 * without calling ldlm_extent_internal_policy_fixup().
 */
static void
ldlm_extent_internal_policy_waiting(struct ldlm_lock *req,
                                    struct ldlm_extent *new_ex)
{
        struct ldlm_resource *res = req->l_resource;
        enum ldlm_mode req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        struct ldlm_lock *lock;
        /* number of waiting locks counted as conflicting with req */
        int conflicting = 0;
        ENTRY;

        lockmode_verify(req_mode);

        /* for waiting locks */
        list_for_each_entry(lock, &res->lr_waiting, l_res_link) {
                struct ldlm_extent *l_extent = &lock->l_policy_data.l_extent;

                /* We already hit the minimum requested size, search no more */
                if (new_ex->start == req_start && new_ex->end == req_end) {
                        EXIT;
                        return;
                }

                /* Don't conflict with ourselves */
                if (req == lock)
                        continue;

                /* Locks are compatible, overlap doesn't matter */
                /* Until bug 20 is fixed, try to avoid granting overlapping
                 * locks on one client (they take a long time to cancel) */
                /* i.e. a compatible lock is skipped only when it belongs to
                 * a different export than req */
                if (lockmode_compat(lock->l_req_mode, req_mode) &&
                    lock->l_export != req->l_export)
                        continue;

                /* If this is a high-traffic lock, don't grow downwards at all
                 * or grow upwards too much */
                ++conflicting;
                if (conflicting > 4)
                        new_ex->start = req_start;

                /* If lock doesn't overlap new_ex, skip it. */
                if (!ldlm_extent_overlap(l_extent, new_ex))
                        continue;

                /* Locks conflicting in requested extents and we can't satisfy
                 * both locks, so ignore it.  Either we will ping-pong this
                 * extent (we would regardless of what extent we granted) or
                 * lock is unused and it shouldn't limit our extent growth. */
                if (ldlm_extent_overlap(&lock->l_req_extent,&req->l_req_extent))
                        continue;

                /* We grow extents downwards only as far as they don't overlap
                 * with already-granted locks, on the assumption that clients
                 * will be writing beyond the initial requested end and would
                 * then need to enqueue a new lock beyond previous request.
                 * l_req_extent->end strictly < req_start, checked above. */
                if (l_extent->start < req_start && new_ex->start != req_start) {
                        if (l_extent->end >= req_start)
                                new_ex->start = req_start;
                        else
                                new_ex->start = min(l_extent->end+1, req_start);
                }

                /* If we need to cancel this lock anyways because our request
                 * overlaps the granted lock, we grow up to its requested
                 * extent start instead of limiting this extent, assuming that
                 * clients are writing forwards and the lock had over grown
                 * its extent downwards before we enqueued our request. */
                if (l_extent->end > req_end) {
                        if (l_extent->start <= req_end)
                                new_ex->end = max(lock->l_req_extent.start - 1,
                                                  req_end);
                        else
                                new_ex->end = max(l_extent->start - 1, req_end);
                }
        }

        /* Apply the contention cap and alignment to the trimmed extent. */
        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
        EXIT;
}
259
260
261 /* In order to determine the largest possible extent we can grant, we need
262  * to scan all of the queues. */
263 static void ldlm_extent_policy(struct ldlm_resource *res,
264                                struct ldlm_lock *lock, __u64 *flags)
265 {
266         struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };
267
268         if (lock->l_export == NULL)
269                 /*
270                  * this is local lock taken by server (e.g., as a part of
271                  * OST-side locking, or unlink handling). Expansion doesn't
272                  * make a lot of sense for local locks, because they are
273                  * dropped immediately on operation completion and would only
274                  * conflict with other threads.
275                  */
276                 return;
277
278         if (lock->l_policy_data.l_extent.start == 0 &&
279             lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
280                 /* fast-path whole file locks */
281                 return;
282
283         ldlm_extent_internal_policy_granted(lock, &new_ex);
284         ldlm_extent_internal_policy_waiting(lock, &new_ex);
285
286         if (new_ex.start != lock->l_policy_data.l_extent.start ||
287             new_ex.end != lock->l_policy_data.l_extent.end) {
288                 *flags |= LDLM_FL_LOCK_CHANGED;
289                 lock->l_policy_data.l_extent.start = new_ex.start;
290                 lock->l_policy_data.l_extent.end = new_ex.end;
291         }
292 }
293
294 static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
295 {
296         struct ldlm_resource *res = lock->l_resource;
297         cfs_time_t now = cfs_time_current();
298
299         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_SET_CONTENTION))
300                 return 1;
301
302         CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
303         if (contended_locks > ldlm_res_to_ns(res)->ns_contended_locks)
304                 res->lr_contention_time = now;
305         return cfs_time_before(now, cfs_time_add(res->lr_contention_time,
306                 cfs_time_seconds(ldlm_res_to_ns(res)->ns_contention_time)));
307 }
308
/* Arguments handed to ldlm_extent_compat_cb() through the opaque \a data
 * pointer of the interval tree walkers. */
struct ldlm_extent_compat_args {
        /* list that blocking AST work items are added to */
        struct list_head *work_list;
        /* the lock being enqueued */
        struct ldlm_lock *lock;
        /* mode of the interval tree being walked; every lock found in the
         * tree is asserted to be granted in this mode */
        enum ldlm_mode mode;
        /* counter of contended locks, incremented by the callback */
        int *locks;
        /* optional; when non-NULL the callback stores 0 here */
        int *compat;
};
316
317 static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
318                                                 void *data)
319 {
320         struct ldlm_extent_compat_args *priv = data;
321         struct ldlm_interval *node = to_ldlm_interval(n);
322         struct ldlm_extent *extent;
323         struct list_head *work_list = priv->work_list;
324         struct ldlm_lock *lock, *enq = priv->lock;
325         enum ldlm_mode mode = priv->mode;
326         int count = 0;
327         ENTRY;
328
329         LASSERT(!list_empty(&node->li_group));
330
331         list_for_each_entry(lock, &node->li_group, l_sl_policy) {
332                 /* interval tree is for granted lock */
333                 LASSERTF(mode == lock->l_granted_mode,
334                          "mode = %s, lock->l_granted_mode = %s\n",
335                          ldlm_lockname[mode],
336                          ldlm_lockname[lock->l_granted_mode]);
337                 count++;
338                 if (lock->l_blocking_ast &&
339                     lock->l_granted_mode != LCK_GROUP)
340                         ldlm_add_ast_work_item(lock, enq, work_list);
341         }
342
343         /* don't count conflicting glimpse locks */
344         extent = ldlm_interval_extent(node);
345         if (!(mode == LCK_PR &&
346             extent->start == 0 && extent->end == OBD_OBJECT_EOF))
347                 *priv->locks += count;
348
349         if (priv->compat)
350                 *priv->compat = 0;
351
352         RETURN(INTERVAL_ITER_CONT);
353 }
354
/**
 * Determine if the lock is compatible with all locks on the queue.
 *
 * If \a work_list is provided, conflicting locks are linked there.
 * If \a work_list is not provided, we exit this function on first conflict.
 *
 * When \a queue is the resource's granted list, the per-mode interval trees
 * (res->lr_itree[]) are searched instead of walking the list itself; any
 * other \a queue is walked entry by entry as a waiting queue.
 *
 * \param[in]     queue            list to check \a req against
 * \param[in]     req              lock being enqueued
 * \param[in,out] flags            LDLM_FL_BLOCK_NOWAIT and
 *                                 LDLM_FL_DENY_ON_CONTENTION are read,
 *                                 LDLM_FL_NO_TIMEOUT may be set
 * \param[out]    err              written only when a negative value is
 *                                 returned, with that same value
 * \param[in]     work_list        list collecting blocking AST work items,
 *                                 may be NULL
 * \param[in,out] contended_locks  running count of contended locks; it is
 *                                 dereferenced unconditionally
 *
 * \retval 0 if the lock is not compatible
 * \retval 1 if the lock is compatible
 * \retval 2 if \a req is a group lock and it is compatible and requires
 *           no further checking
 * \retval negative error, such as EWOULDBLOCK for group locks; in that case
 *           \a req has been unlinked from its list and destroyed
 */
static int
ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                         __u64 *flags, enum ldlm_error *err,
                         struct list_head *work_list, int *contended_locks)
{
        struct ldlm_resource *res = req->l_resource;
        enum ldlm_mode req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        struct ldlm_lock *lock;
        int check_contention;
        int compat = 1;
        /* set once a GROUP request met an incompatible waiting lock and we
         * are only looking for the place to insert req in the queue */
        int scan = 0;
        ENTRY;

        lockmode_verify(req_mode);

        /* Using interval tree for granted lock */
        if (queue == &res->lr_granted) {
                struct ldlm_interval_tree *tree;
                struct ldlm_extent_compat_args data = {.work_list = work_list,
                                               .lock = req,
                                               .locks = contended_locks,
                                               .compat = &compat };
                struct interval_node_extent ex = { .start = req_start,
                                                   .end = req_end };
                int idx, rc;

                for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                        tree = &res->lr_itree[idx];
                        if (tree->lit_root == NULL) /* empty tree, skipped */
                                continue;

                        data.mode = tree->lit_mode;
                        if (lockmode_compat(req_mode, tree->lit_mode)) {
                                struct ldlm_interval *node;
                                struct ldlm_extent *extent;

                                if (req_mode != LCK_GROUP)
                                        continue;

                                /* group lock, grant it immediately if
                                 * compatible */
                                node = to_ldlm_interval(tree->lit_root);
                                extent = ldlm_interval_extent(node);
                                if (req->l_policy_data.l_extent.gid ==
                                    extent->gid)
                                        RETURN(2);
                        }

                        if (tree->lit_mode == LCK_GROUP) {
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                }

                                *flags |= LDLM_FL_NO_TIMEOUT;
                                if (!work_list)
                                        RETURN(0);

                                /* if work list is not NULL, add all
                                   locks in the tree to work list */
                                compat = 0;
                                interval_iterate(tree->lit_root,
                                                 ldlm_extent_compat_cb, &data);
                                continue;
                        }

                        if (!work_list) {
                                /* no work list: the first overlap is enough */
                                rc = interval_is_overlapped(tree->lit_root,&ex);
                                if (rc)
                                        RETURN(0);
                        } else {
                                /* collect every overlapping granted lock */
                                interval_search(tree->lit_root, &ex,
                                                ldlm_extent_compat_cb, &data);
                                if (!list_empty(work_list) && compat)
                                        compat = 0;
                        }
                }
        } else { /* for waiting queue */
                list_for_each_entry(lock, queue, l_res_link) {
                        check_contention = 1;

                        /* We stop walking the queue if we hit ourselves so
                         * we don't take conflicting locks enqueued after us
                         * into account, or we'd wait forever. */
                        if (req == lock)
                                break;

                        if (unlikely(scan)) {
                                /* We only get here if we are queuing GROUP lock
                                   and met some incompatible one. The main idea of this
                                   code is to insert GROUP lock past compatible GROUP
                                   lock in the waiting queue or if there is not any,
                                   then in front of first non-GROUP lock */
                                if (lock->l_req_mode != LCK_GROUP) {
                                        /* Ok, we hit non-GROUP lock, there should
                                         * be no more GROUP locks later on, queue in
                                         * front of first non-GROUP lock */

                                        ldlm_resource_insert_lock_after(lock, req);
                                        list_del_init(&lock->l_res_link);
                                        ldlm_resource_insert_lock_after(req, lock);
                                        compat = 0;
                                        break;
                                }
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* found it */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        compat = 0;
                                        break;
                                }
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_req_mode, req_mode)) {
                                if (req_mode == LCK_PR &&
                                    ((lock->l_policy_data.l_extent.start <=
                                      req->l_policy_data.l_extent.start) &&
                                     (lock->l_policy_data.l_extent.end >=
                                      req->l_policy_data.l_extent.end))) {
                                        /* If we met a PR lock just like us or
                                           wider, and nobody down the list
                                           conflicted with it, that means we
                                           can skip processing of the rest of
                                           the list and safely place ourselves
                                           at the end of the list, or grant
                                           (depending on whether we met any
                                           conflicting locks before in the
                                           list).  In case of 1st enqueue only
                                           we continue traversing if there is
                                           something conflicting down the list
                                           because we need to make sure that
                                           something is marked as AST_SENT as
                                           well, in case of empty worklist we
                                           would exit on first conflict met. */
                                        /* There IS a case where such flag is
                                           not set for a lock, yet it blocks
                                           something. Luckily for us this is
                                           only during destroy, so lock is
                                           exclusive. So here we are safe */
                                        if (!ldlm_is_ast_sent(lock))
                                                RETURN(compat);
                                }

                                /* non-group locks are compatible, overlap doesn't
                                   matter */
                                if (likely(req_mode != LCK_GROUP))
                                        continue;

                                /* If we are trying to get a GROUP lock and there is
                                   another one of this kind, we need to compare gid */
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* If existing lock with matched gid is granted,
                                           we grant new one too. */
                                        if (lock->l_req_mode == lock->l_granted_mode)
                                                RETURN(2);

                                        /* Otherwise we are scanning queue of waiting
                                         * locks and it means current request would
                                         * block along with existing lock (that is
                                         * already blocked).
                                         * If we are in nonblocking mode - return
                                         * immediately */
                                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                                compat = -EWOULDBLOCK;
                                                goto destroylock;
                                        }
                                        /* If this group lock is compatible with another
                                         * group lock on the waiting list, they must be
                                         * together in the list, so they can be granted
                                         * at the same time.  Otherwise the later lock
                                         * can get stuck behind another, incompatible,
                                         * lock. */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        /* Because 'lock' is not granted, we can stop
                                         * processing this queue and return immediately.
                                         * There is no need to check the rest of the
                                         * list. */
                                        RETURN(0);
                                }
                        }

                        /* GROUP request against a lock that is not granted
                         * yet: start looking for the insertion point */
                        if (unlikely(req_mode == LCK_GROUP &&
                                     (lock->l_req_mode != lock->l_granted_mode))) {
                                scan = 1;
                                compat = 0;
                                if (lock->l_req_mode != LCK_GROUP) {
                                        /* Ok, we hit non-GROUP lock, there should be no
                                           more GROUP locks later on, queue in front of
                                           first non-GROUP lock */

                                        ldlm_resource_insert_lock_after(lock, req);
                                        list_del_init(&lock->l_res_link);
                                        ldlm_resource_insert_lock_after(req, lock);
                                        break;
                                }
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* found it */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        break;
                                }
                                continue;
                        }

                        if (unlikely(lock->l_req_mode == LCK_GROUP)) {
                                /* If compared lock is GROUP, then requested is PR/PW/
                                 * so this is not compatible; extent range does not
                                 * matter */
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                } else {
                                        *flags |= LDLM_FL_NO_TIMEOUT;
                                }
                        } else if (lock->l_policy_data.l_extent.end < req_start ||
                                   lock->l_policy_data.l_extent.start > req_end) {
                                /* if a non group lock doesn't overlap skip it */
                                continue;
                        } else if (lock->l_req_extent.end < req_start ||
                                   lock->l_req_extent.start > req_end) {
                                /* false contention, the requests don't really overlap */
                                check_contention = 0;
                        }

                        if (!work_list)
                                RETURN(0);

                        /* don't count conflicting glimpse locks */
                        if (lock->l_req_mode == LCK_PR &&
                            lock->l_policy_data.l_extent.start == 0 &&
                            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                                check_contention = 0;

                        *contended_locks += check_contention;

                        compat = 0;
                        if (lock->l_blocking_ast &&
                            lock->l_req_mode != LCK_GROUP)
                                ldlm_add_ast_work_item(lock, req, work_list);
                }
        }

        /* Refuse a small, conflicting, non-GROUP request with -EUSERS when
         * the resource is contended and the caller asked for that through
         * LDLM_FL_DENY_ON_CONTENTION. */
        if (ldlm_check_contention(req, *contended_locks) &&
            compat == 0 &&
            (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
            req->l_req_mode != LCK_GROUP &&
            req_end - req_start <=
            ldlm_res_to_ns(req->l_resource)->ns_max_nolock_size)
                GOTO(destroylock, compat = -EUSERS);

        RETURN(compat);
destroylock:
        /* error path: unlink req, destroy it and report the error */
        list_del_init(&req->l_res_link);
        ldlm_lock_destroy_nolock(req);
        *err = compat;
        RETURN(compat);
}
629
630 /**
631  * This function refresh eviction timer for cancelled lock.
632  * \param[in] lock              ldlm lock for refresh
633  * \param[in] arg               ldlm prolong arguments, timeout, export, extent
634  *                              and counter are used
635  */
636 void ldlm_lock_prolong_one(struct ldlm_lock *lock,
637                            struct ldlm_prolong_args *arg)
638 {
639         int timeout;
640
641         if (arg->lpa_export != lock->l_export ||
642             lock->l_flags & LDLM_FL_DESTROYED)
643                 /* ignore unrelated locks */
644                 return;
645
646         arg->lpa_locks_cnt++;
647
648         if (!(lock->l_flags & LDLM_FL_AST_SENT))
649                 /* ignore locks not being cancelled */
650                 return;
651
652         /* We are in the middle of the process - BL AST is sent, CANCEL
653          * is ahead. Take half of BL AT + IO AT process time.
654          */
655         timeout = arg->lpa_timeout + (ldlm_bl_timeout(lock) >> 1);
656
657         LDLM_DEBUG(lock, "refreshed to %ds.\n", timeout);
658
659         arg->lpa_blocks_cnt++;
660
661         /* OK. this is a possible lock the user holds doing I/O
662          * let's refresh eviction timer for it.
663          */
664         ldlm_refresh_waiting_lock(lock, timeout);
665 }
666 EXPORT_SYMBOL(ldlm_lock_prolong_one);
667
668 static enum interval_iter ldlm_resource_prolong_cb(struct interval_node *n,
669                                                    void *data)
670 {
671         struct ldlm_prolong_args *arg = data;
672         struct ldlm_interval *node = to_ldlm_interval(n);
673         struct ldlm_lock *lock;
674
675         ENTRY;
676
677         LASSERT(!list_empty(&node->li_group));
678
679         list_for_each_entry(lock, &node->li_group, l_sl_policy) {
680                 ldlm_lock_prolong_one(lock, arg);
681         }
682
683         RETURN(INTERVAL_ITER_CONT);
684 }
685
686 /**
687  * Walk through granted tree and prolong locks if they overlaps extent.
688  *
689  * \param[in] arg               prolong args
690  */
691 void ldlm_resource_prolong(struct ldlm_prolong_args *arg)
692 {
693         struct ldlm_interval_tree *tree;
694         struct ldlm_resource *res;
695         struct interval_node_extent ex = { .start = arg->lpa_extent.start,
696                                            .end = arg->lpa_extent.end };
697         int idx;
698
699         ENTRY;
700
701         res = ldlm_resource_get(arg->lpa_export->exp_obd->obd_namespace, NULL,
702                                 &arg->lpa_resid, LDLM_EXTENT, 0);
703         if (IS_ERR(res)) {
704                 CDEBUG(D_DLMTRACE, "Failed to get resource for resid %llu/%llu\n",
705                        arg->lpa_resid.name[0], arg->lpa_resid.name[1]);
706                 RETURN_EXIT;
707         }
708
709         lock_res(res);
710         for (idx = 0; idx < LCK_MODE_NUM; idx++) {
711                 tree = &res->lr_itree[idx];
712                 if (tree->lit_root == NULL) /* empty tree, skipped */
713                         continue;
714
715                 /* There is no possibility to check for the groupID
716                  * so all the group locks are considered as valid
717                  * here, especially because the client is supposed
718                  * to check it has such a lock before sending an RPC.
719                  */
720                 if (!(tree->lit_mode & arg->lpa_mode))
721                         continue;
722
723                 interval_search(tree->lit_root, &ex,
724                                 ldlm_resource_prolong_cb, arg);
725         }
726
727         unlock_res(res);
728         ldlm_resource_putref(res);
729
730         EXIT;
731 }
732 EXPORT_SYMBOL(ldlm_resource_prolong);
733
734
735 /**
736  * Discard all AST work items from list.
737  *
738  * If for whatever reason we do not want to send ASTs to conflicting locks
739  * anymore, disassemble the list with this function.
740  */
741 static void discard_bl_list(struct list_head *bl_list)
742 {
743         struct list_head *tmp, *pos;
744         ENTRY;
745
746         list_for_each_safe(pos, tmp, bl_list) {
747                 struct ldlm_lock *lock =
748                         list_entry(pos, struct ldlm_lock, l_bl_ast);
749
750                 list_del_init(&lock->l_bl_ast);
751                 LASSERT(ldlm_is_ast_sent(lock));
752                 ldlm_clear_ast_sent(lock);
753                 LASSERT(lock->l_bl_ast_run == 0);
754                 LASSERT(lock->l_blocking_lock);
755                 LDLM_LOCK_RELEASE(lock->l_blocking_lock);
756                 lock->l_blocking_lock = NULL;
757                 LDLM_LOCK_RELEASE(lock);
758         }
759         EXIT;
760 }
761
762 /**
763  * Process a granting attempt for extent lock.
764  * Must be called with ns lock held.
     * NOTE(review): the body asserts the *resource* lock via
     * check_res_locked(); "ns lock" above may be stale wording -- confirm.
765  *
766  * This function looks for any conflicts for \a lock in the granted or
767  * waiting queues. The lock is granted if no conflicts are found in
768  * either queue.
769  *
770  * If \a first_enq is 0 (ie, called from ldlm_reprocess_queue):
771  *   - blocking ASTs have already been sent
772  *
773  * If \a first_enq is 1 (ie, called from ldlm_lock_enqueue):
774  *   - blocking ASTs have not been sent yet, so list of conflicting locks
775  *     would be collected and ASTs sent.
     *
     * \param[in]     lock       lock to grant; must not be granted yet
     * \param[in,out] flags      enqueue flags; must be 0 when \a first_enq is 0.
     *                           On a blocked first enqueue
     *                           LDLM_FL_BLOCK_GRANTED | LDLM_FL_NO_TIMEOUT are
     *                           added; LDLM_FL_BLOCKED_MASK is cleared if the
     *                           lock turns out granted after ASTs were run.
     * \param[in]     first_enq  non-zero on first enqueue, 0 on reprocessing
     * \param[out]    err        set to ELDLM_OK on entry and to -EAGAIN if the
     *                           lock was destroyed while the resource lock
     *                           was dropped; it is also handed to
     *                           ldlm_extent_compat_queue()
     * \param[in]     work_list  passed to ldlm_grant_lock() on the
     *                           reprocessing path only
     *
     * \retval LDLM_ITER_STOP     (first_enq == 0) a conflict remains
     * \retval LDLM_ITER_CONTINUE (first_enq == 0) the lock was granted
     * \retval 0                  (first_enq != 0) lock granted or left waiting
     * \retval negative           (first_enq != 0) value taken from the compat
     *                            check, or -EAGAIN
776  */
777 int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
778                              int first_enq, enum ldlm_error *err,
779                              struct list_head *work_list)
780 {
781         struct ldlm_resource *res = lock->l_resource;
782         struct list_head rpc_list;
783         int rc, rc2;
784         int contended_locks = 0;
785         ENTRY;
786
787         LASSERT(lock->l_granted_mode != lock->l_req_mode);
788         LASSERT(list_empty(&res->lr_converting));
789         LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
790                 !ldlm_is_ast_discard_data(lock));
791         INIT_LIST_HEAD(&rpc_list);
792         check_res_locked(res);
793         *err = ELDLM_OK;
794
            /* Reprocessing path: no AST work is collected here, the compat
             * checks get a NULL work list. */
795         if (!first_enq) {
796                 /* Careful observers will note that we don't handle -EWOULDBLOCK
797                  * here, but it's ok for a non-obvious reason -- compat_queue
798                  * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
799                  * flags should always be zero here, and if that ever stops
800                  * being true, we want to find out. */
801                 LASSERT(*flags == 0);
802                 rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
803                                               err, NULL, &contended_locks);
                    /* the waiting queue is only checked when the granted
                     * queue reported no conflict */
804                 if (rc == 1) {
805                         rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
806                                                       flags, err, NULL,
807                                                       &contended_locks);
808                 }
                    /* a conflict remains: leave the lock where it is */
809                 if (rc == 0)
810                         RETURN(LDLM_ITER_STOP);
811
812                 ldlm_resource_unlink_lock(lock);
813
814                 if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
815                         ldlm_extent_policy(res, lock, flags);
816                 ldlm_grant_lock(lock, work_list);
817                 RETURN(LDLM_ITER_CONTINUE);
818         }
819
            /* First-enqueue path: conflicting locks are collected on the
             * local rpc_list so that blocking ASTs can be sent for them. */
820  restart:
821         contended_locks = 0;
822         rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
823                                       &rpc_list, &contended_locks);
824         if (rc < 0)
825                 GOTO(out, rc); /* lock was destroyed */
            /* NOTE(review): rc == 2 is presumably the compat check's way of
             * saying the waiting queue need not be examined -- confirm in
             * ldlm_extent_compat_queue(). */
826         if (rc == 2)
827                 goto grant;
828
829         rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
830                                        &rpc_list, &contended_locks);
831         if (rc2 < 0)
832                 GOTO(out, rc = rc2); /* lock was destroyed */
833
            /* both queues returned 1: no conflict anywhere */
834         if (rc + rc2 == 2) {
835         grant:
836                 ldlm_extent_policy(res, lock, flags);
837                 ldlm_resource_unlink_lock(lock);
838                 ldlm_grant_lock(lock, NULL);
839         } else {
840                 /* If either of the compat_queue()s returned failure, then we
841                  * have ASTs to send and must go onto the waiting list.
842                  *
843                  * bug 2322: we used to unlink and re-add here, which was a
844                  * terrible folly -- if we goto restart, we could get
845                  * re-ordered!  Causes deadlock, because ASTs aren't sent! */
846                 if (list_empty(&lock->l_res_link))
847                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                    /* The resource lock is dropped while the blocking ASTs
                     * run, so the state of the lock has to be looked at
                     * again once lock_res() is retaken below. */
848                 unlock_res(res);
849                 rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
850                                        LDLM_WORK_BL_AST);
851
852                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) &&
853                     !ns_is_client(ldlm_res_to_ns(res)))
854                         class_fail_export(lock->l_export);
855
856                 lock_res(res);
857                 if (rc == -ERESTART) {
858                         /* 15715: The lock was granted and destroyed after
859                          * resource lock was dropped. Interval node was freed
860                          * in ldlm_lock_destroy. Anyway, this always happens
861                          * when a client is being evicted. So it would be
862                          * ok to return an error. -jay */
863                         if (ldlm_is_destroyed(lock)) {
864                                 *err = -EAGAIN;
865                                 GOTO(out, rc = -EAGAIN);
866                         }
867
868                         /* lock was granted while resource was unlocked. */
869                         if (lock->l_granted_mode == lock->l_req_mode) {
870                                 /* bug 11300: if the lock has been granted,
871                                  * break earlier because otherwise, we will go
872                                  * to restart and ldlm_resource_unlink will be
873                                  * called and it causes the interval node to be
874                                  * freed. Then we will fail at
875                                  * ldlm_extent_add_lock() */
876                                 *flags &= ~LDLM_FL_BLOCKED_MASK;
877                                 GOTO(out, rc = 0);
878                         }
879
                            /* neither destroyed nor granted: redo both
                             * compat checks from scratch */
880                         GOTO(restart, rc);
881                 }
882
883                 /* this way we force client to wait for the lock
884                  * endlessly once the lock is enqueued -bzzz */
885                 *flags |= LDLM_FL_BLOCK_GRANTED | LDLM_FL_NO_TIMEOUT;
886
887         }
888         RETURN(0);
889 out:
            /* drop whatever AST work items are still queued on rpc_list */
890         if (!list_empty(&rpc_list)) {
891                 LASSERT(!ldlm_is_ast_discard_data(lock));
892                 discard_bl_list(&rpc_list);
893         }
894         RETURN(rc);
895 }
896 #endif /* HAVE_SERVER_SUPPORT */
897
898 struct ldlm_kms_shift_args {        /* state for ldlm_kms_shift_cb() */
899         __u64   old_kms;        /* KMS passed in to ldlm_extent_shift_kms() */
900         __u64   kms;            /* largest replacement KMS found so far */
901         bool    complete;       /* set when old_kms is kept; ends the walk */
902 };
903
904 /* Callback for interval_iterate functions, used by ldlm_extent_shift_Kms */
905 static enum interval_iter ldlm_kms_shift_cb(struct interval_node *n,
906                                             void *args)
907 {
908         struct ldlm_kms_shift_args *arg = args;
909         struct ldlm_interval *node = to_ldlm_interval(n);
910         struct ldlm_lock *tmplock;
911         struct ldlm_lock *lock = NULL;
912
913         ENTRY;
914
915         /* Since all locks in an interval have the same extent, we can just
916          * use the first lock without kms_ignore set. */
917         list_for_each_entry(tmplock, &node->li_group, l_sl_policy) {
918                 if (ldlm_is_kms_ignore(tmplock))
919                         continue;
920
921                 lock = tmplock;
922
923                 break;
924         }
925
926         /* No locks in this interval without kms_ignore set */
927         if (!lock)
928                 RETURN(INTERVAL_ITER_CONT);
929
930         /* If we find a lock with a greater or equal kms, we are not the
931          * highest lock (or we share that distinction with another lock), and
932          * don't need to update KMS.  Return old_kms and stop looking. */
933         if (lock->l_policy_data.l_extent.end >= arg->old_kms) {
934                 arg->kms = arg->old_kms;
935                 arg->complete = true;
936                 RETURN(INTERVAL_ITER_STOP);
937         }
938
939         if (lock->l_policy_data.l_extent.end + 1 > arg->kms)
940                 arg->kms = lock->l_policy_data.l_extent.end + 1;
941
942         /* Since interval_iterate_reverse starts with the highest lock and
943          * works down, for PW locks, we only need to check if we should update
944          * the kms, then stop walking the tree.  PR locks are not exclusive, so
945          * the highest start does not imply the highest end and we must
946          * continue. (Only one group lock is allowed per resource, so this is
947          * irrelevant for group locks.)*/
948         if (lock->l_granted_mode == LCK_PW)
949                 RETURN(INTERVAL_ITER_STOP);
950         else
951                 RETURN(INTERVAL_ITER_CONT);
952 }
953
954 /* When a lock is cancelled by a client, the KMS may undergo change if this
955  * is the "highest lock".  This function returns the new KMS value, updating
956  * it only if we were the highest lock.
957  *
958  * Caller must hold lr_lock already.
959  *
960  * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
961 __u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
962 {
963         struct ldlm_resource *res = lock->l_resource;
964         struct ldlm_interval_tree *tree;
965         struct ldlm_kms_shift_args args;
966         int idx = 0;
967
968         ENTRY;
969
970         args.old_kms = old_kms;
971         args.kms = 0;
972         args.complete = false;
973
974         /* don't let another thread in ldlm_extent_shift_kms race in
975          * just after we finish and take our lock into account in its
976          * calculation of the kms */
977         ldlm_set_kms_ignore(lock);
978
979         /* We iterate over the lock trees, looking for the largest kms smaller
980          * than the current one. */
981         for (idx = 0; idx < LCK_MODE_NUM; idx++) {
982                 tree = &res->lr_itree[idx];
983
984                 /* If our already known kms is >= than the highest 'end' in
985                  * this tree, we don't need to check this tree, because
986                  * the kms from a tree can be lower than in_max_high (due to
987                  * kms_ignore), but it can never be higher. */
988                 if (!tree->lit_root || args.kms >= tree->lit_root->in_max_high)
989                         continue;
990
991                 interval_iterate_reverse(tree->lit_root, ldlm_kms_shift_cb,
992                                          &args);
993
994                 /* this tells us we're not the highest lock, so we don't need
995                  * to check the remaining trees */
996                 if (args.complete)
997                         break;
998         }
999
1000         LASSERTF(args.kms <= args.old_kms, "kms %llu old_kms %llu\n", args.kms,
1001                  args.old_kms);
1002
1003         RETURN(args.kms);
1004 }
1005 EXPORT_SYMBOL(ldlm_extent_shift_kms);
1006
1007 struct kmem_cache *ldlm_interval_slab;
1008 struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
1009 {
1010         struct ldlm_interval *node;
1011         ENTRY;
1012
1013         LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
1014         OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
1015         if (node == NULL)
1016                 RETURN(NULL);
1017
1018         INIT_LIST_HEAD(&node->li_group);
1019         ldlm_interval_attach(node, lock);
1020         RETURN(node);
1021 }
1022
1023 void ldlm_interval_free(struct ldlm_interval *node)
1024 {
1025         if (node) {
1026                 LASSERT(list_empty(&node->li_group));
1027                 LASSERT(!interval_is_intree(&node->li_node));
1028                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
1029         }
1030 }
1031
1032 /* interval tree, for LDLM_EXTENT. */
1033 void ldlm_interval_attach(struct ldlm_interval *n,
1034                           struct ldlm_lock *l)
1035 {
1036         LASSERT(l->l_tree_node == NULL);
1037         LASSERT(l->l_resource->lr_type == LDLM_EXTENT);
1038
1039         list_add_tail(&l->l_sl_policy, &n->li_group);
1040         l->l_tree_node = n;
1041 }
1042
1043 struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l)
1044 {
1045         struct ldlm_interval *n = l->l_tree_node;
1046
1047         if (n == NULL)
1048                 return NULL;
1049
1050         LASSERT(!list_empty(&n->li_group));
1051         l->l_tree_node = NULL;
1052         list_del_init(&l->l_sl_policy);
1053
1054         return list_empty(&n->li_group) ? n : NULL;
1055 }
1056
1057 static inline int ldlm_mode_to_index(enum ldlm_mode mode)
1058 {
1059         int index;
1060
1061         LASSERT(mode != 0);
1062         LASSERT(IS_PO2(mode));
1063         for (index = -1; mode != 0; index++, mode >>= 1)
1064                 /* do nothing */;
1065         LASSERT(index < LCK_MODE_NUM);
1066         return index;
1067 }
1068
1069 /** Add newly granted lock into interval tree for the resource. */
1070 void ldlm_extent_add_lock(struct ldlm_resource *res,
1071                           struct ldlm_lock *lock)
1072 {
1073         struct interval_node *found, **root;
1074         struct ldlm_interval *node;
1075         struct ldlm_extent *extent;
1076         int idx, rc;
1077
1078         LASSERT(lock->l_granted_mode == lock->l_req_mode);
1079
1080         node = lock->l_tree_node;
1081         LASSERT(node != NULL);
1082         LASSERT(!interval_is_intree(&node->li_node));
1083
1084         idx = ldlm_mode_to_index(lock->l_granted_mode);
1085         LASSERT(lock->l_granted_mode == 1 << idx);
1086         LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);
1087
1088         /* node extent initialize */
1089         extent = &lock->l_policy_data.l_extent;
1090
1091         rc = interval_set(&node->li_node, extent->start, extent->end);
1092         LASSERT(!rc);
1093
1094         root = &res->lr_itree[idx].lit_root;
1095         found = interval_insert(&node->li_node, root);
1096         if (found) { /* The policy group found. */
1097                 struct ldlm_interval *tmp = ldlm_interval_detach(lock);
1098                 LASSERT(tmp != NULL);
1099                 ldlm_interval_free(tmp);
1100                 ldlm_interval_attach(to_ldlm_interval(found), lock);
1101         }
1102         res->lr_itree[idx].lit_size++;
1103
1104         /* even though we use interval tree to manage the extent lock, we also
1105          * add the locks into grant list, for debug purpose, .. */
1106         ldlm_resource_add_lock(res, &res->lr_granted, lock);
1107
1108         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_GRANT_CHECK)) {
1109                 struct ldlm_lock *lck;
1110
1111                 list_for_each_entry_reverse(lck, &res->lr_granted,
1112                                             l_res_link) {
1113                         if (lck == lock)
1114                                 continue;
1115                         if (lockmode_compat(lck->l_granted_mode,
1116                                             lock->l_granted_mode))
1117                                 continue;
1118                         if (ldlm_extent_overlap(&lck->l_req_extent,
1119                                                 &lock->l_req_extent)) {
1120                                 CDEBUG(D_ERROR, "granting conflicting lock %p "
1121                                                 "%p\n", lck, lock);
1122                                 ldlm_resource_dump(D_ERROR, res);
1123                                 LBUG();
1124                         }
1125                 }
1126         }
1127 }
1128
1129 /** Remove cancelled lock from resource interval tree. */
1130 void ldlm_extent_unlink_lock(struct ldlm_lock *lock)
1131 {
1132         struct ldlm_resource *res = lock->l_resource;
1133         struct ldlm_interval *node = lock->l_tree_node;
1134         struct ldlm_interval_tree *tree;
1135         int idx;
1136
1137         if (!node || !interval_is_intree(&node->li_node)) /* duplicate unlink */
1138                 return;
1139
1140         idx = ldlm_mode_to_index(lock->l_granted_mode);
1141         LASSERT(lock->l_granted_mode == 1 << idx);
1142         tree = &res->lr_itree[idx];
1143
1144         LASSERT(tree->lit_root != NULL); /* assure the tree is not null */
1145
1146         tree->lit_size--;
1147         node = ldlm_interval_detach(lock);
1148         if (node) {
1149                 interval_erase(&node->li_node, &tree->lit_root);
1150                 ldlm_interval_free(node);
1151         }
1152 }
1153
1154 void ldlm_extent_policy_wire_to_local(const union ldlm_wire_policy_data *wpolicy,
1155                                       union ldlm_policy_data *lpolicy)
1156 {
1157         lpolicy->l_extent.start = wpolicy->l_extent.start;
1158         lpolicy->l_extent.end = wpolicy->l_extent.end;
1159         lpolicy->l_extent.gid = wpolicy->l_extent.gid;
1160 }
1161
1162 void ldlm_extent_policy_local_to_wire(const union ldlm_policy_data *lpolicy,
1163                                       union ldlm_wire_policy_data *wpolicy)
1164 {
1165         memset(wpolicy, 0, sizeof(*wpolicy));
1166         wpolicy->l_extent.start = lpolicy->l_extent.start;
1167         wpolicy->l_extent.end = lpolicy->l_extent.end;
1168         wpolicy->l_extent.gid = lpolicy->l_extent.gid;
1169 }
1170