Whamcloud - gitweb
bbafcb10bff37992a04fe3d3da574bf001ff1987
[fs/lustre-release.git] / lustre / ldlm / ldlm_extent.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2010, 2013, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/ldlm/ldlm_extent.c
33  *
34  * Author: Peter Braam <braam@clusterfs.com>
35  * Author: Phil Schwan <phil@clusterfs.com>
36  */
37
38 /**
39  * This file contains implementation of EXTENT lock type
40  *
41  * EXTENT lock type is for locking a contiguous range of values, represented
42  * by 64-bit starting and ending offsets (inclusive). There are several extent
43  * lock modes, some of which may be mutually incompatible. Extent locks are
44  * considered incompatible if their modes are incompatible and their extents
45  * intersect.  See the lock mode compatibility matrix in lustre_dlm.h.
46  */
47
48 #define DEBUG_SUBSYSTEM S_LDLM
49
50 #include <libcfs/libcfs.h>
51 #include <lustre_dlm.h>
52 #include <obd_support.h>
53 #include <obd.h>
54 #include <obd_class.h>
55 #include <lustre_lib.h>
56
57 #include "ldlm_internal.h"
58
59 #ifdef HAVE_SERVER_SUPPORT
60 # define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)
61
62 /**
63  * Fix up the ldlm_extent after expanding it.
64  *
65  * After expansion has been done, we might still want to do certain adjusting
66  * based on overall contention of the resource and the like to avoid granting
67  * overly wide locks.
68  */
69 static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
70                                               struct ldlm_extent *new_ex,
71                                               int conflicting)
72 {
73         enum ldlm_mode req_mode = req->l_req_mode;
74         __u64 req_start = req->l_req_extent.start;
75         __u64 req_end = req->l_req_extent.end;
76         __u64 req_align, mask;
77
78         if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
79                 if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
80                         new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
81                                           new_ex->end);
82         }
83
84         if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF) {
85                 EXIT;
86                 return;
87         }
88
89         /* we need to ensure that the lock extent is properly aligned to what
90          * the client requested. Also we need to make sure it's also server
91          * page size aligned otherwise a server page can be covered by two
92          * write locks. */
93         mask = PAGE_SIZE;
94         req_align = (req_end + 1) | req_start;
95         if (req_align != 0 && (req_align & (mask - 1)) == 0) {
96                 while ((req_align & mask) == 0)
97                         mask <<= 1;
98         }
99         mask -= 1;
100         /* We can only shrink the lock, not grow it.
101          * This should never cause lock to be smaller than requested,
102          * since requested lock was already aligned on these boundaries. */
103         new_ex->start = ((new_ex->start - 1) | mask) + 1;
104         new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
105         LASSERTF(new_ex->start <= req_start,
106                  "mask %#llx grant start %llu req start %llu\n",
107                  mask, new_ex->start, req_start);
108         LASSERTF(new_ex->end >= req_end,
109                  "mask %#llx grant end %llu req end %llu\n",
110                  mask, new_ex->end, req_end);
111 }
112
113 /**
114  * Return the maximum extent that:
115  * - contains the requested extent
116  * - does not overlap existing conflicting extents outside the requested one
117  *
118  * This allows clients to request a small required extent range, but if there
119  * is no contention on the lock the full lock can be granted to the client.
120  * This avoids the need for many smaller lock requests to be granted in the
121  * common (uncontended) case.
122  *
123  * Use interval tree to expand the lock extent for granted lock.
124  */
125 static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req,
126                                                 struct ldlm_extent *new_ex)
127 {
128         struct ldlm_resource *res = req->l_resource;
129         enum ldlm_mode req_mode = req->l_req_mode;
130         __u64 req_start = req->l_req_extent.start;
131         __u64 req_end = req->l_req_extent.end;
132         struct ldlm_interval_tree *tree;
133         struct interval_node_extent limiter = { new_ex->start, new_ex->end };
134         int conflicting = 0;
135         int idx;
136         ENTRY;
137
138         lockmode_verify(req_mode);
139
140         /* Using interval tree to handle the LDLM extent granted locks. */
141         for (idx = 0; idx < LCK_MODE_NUM; idx++) {
142                 struct interval_node_extent ext = { req_start, req_end };
143
144                 tree = &res->lr_itree[idx];
145                 if (lockmode_compat(tree->lit_mode, req_mode))
146                         continue;
147
148                 conflicting += tree->lit_size;
149                 if (conflicting > 4)
150                         limiter.start = req_start;
151
152                 if (interval_is_overlapped(tree->lit_root, &ext))
153                         CDEBUG(D_INFO, 
154                                "req_mode = %d, tree->lit_mode = %d, "
155                                "tree->lit_size = %d\n",
156                                req_mode, tree->lit_mode, tree->lit_size);
157                 interval_expand(tree->lit_root, &ext, &limiter);
158                 limiter.start = max(limiter.start, ext.start);
159                 limiter.end = min(limiter.end, ext.end);
160                 if (limiter.start == req_start && limiter.end == req_end)
161                         break;
162         }
163
164         new_ex->start = limiter.start;
165         new_ex->end = limiter.end;
166         LASSERT(new_ex->start <= req_start);
167         LASSERT(new_ex->end >= req_end);
168
169         ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
170         EXIT;
171 }
172
173 /* The purpose of this function is to return:
174  * - the maximum extent
175  * - containing the requested extent
176  * - and not overlapping existing conflicting extents outside the requested one
177  */
178 static void
179 ldlm_extent_internal_policy_waiting(struct ldlm_lock *req,
180                                     struct ldlm_extent *new_ex)
181 {
182         struct ldlm_resource *res = req->l_resource;
183         enum ldlm_mode req_mode = req->l_req_mode;
184         __u64 req_start = req->l_req_extent.start;
185         __u64 req_end = req->l_req_extent.end;
186         struct ldlm_lock *lock;
187         int conflicting = 0;
188         ENTRY;
189
190         lockmode_verify(req_mode);
191
192         /* for waiting locks */
193         list_for_each_entry(lock, &res->lr_waiting, l_res_link) {
194                 struct ldlm_extent *l_extent = &lock->l_policy_data.l_extent;
195
196                 /* We already hit the minimum requested size, search no more */
197                 if (new_ex->start == req_start && new_ex->end == req_end) {
198                         EXIT;
199                         return;
200                 }
201
202                 /* Don't conflict with ourselves */
203                 if (req == lock)
204                         continue;
205
206                 /* Locks are compatible, overlap doesn't matter */
207                 /* Until bug 20 is fixed, try to avoid granting overlapping
208                  * locks on one client (they take a long time to cancel) */
209                 if (lockmode_compat(lock->l_req_mode, req_mode) &&
210                     lock->l_export != req->l_export)
211                         continue;
212
213                 /* If this is a high-traffic lock, don't grow downwards at all
214                  * or grow upwards too much */
215                 ++conflicting;
216                 if (conflicting > 4)
217                         new_ex->start = req_start;
218
219                 /* If lock doesn't overlap new_ex, skip it. */
220                 if (!ldlm_extent_overlap(l_extent, new_ex))
221                         continue;
222
223                 /* Locks conflicting in requested extents and we can't satisfy
224                  * both locks, so ignore it.  Either we will ping-pong this
225                  * extent (we would regardless of what extent we granted) or
226                  * lock is unused and it shouldn't limit our extent growth. */
227                 if (ldlm_extent_overlap(&lock->l_req_extent,&req->l_req_extent))
228                         continue;
229
230                 /* We grow extents downwards only as far as they don't overlap
231                  * with already-granted locks, on the assumption that clients
232                  * will be writing beyond the initial requested end and would
233                  * then need to enqueue a new lock beyond previous request.
234                  * l_req_extent->end strictly < req_start, checked above. */
235                 if (l_extent->start < req_start && new_ex->start != req_start) {
236                         if (l_extent->end >= req_start)
237                                 new_ex->start = req_start;
238                         else
239                                 new_ex->start = min(l_extent->end+1, req_start);
240                 }
241
242                 /* If we need to cancel this lock anyways because our request
243                  * overlaps the granted lock, we grow up to its requested
244                  * extent start instead of limiting this extent, assuming that
245                  * clients are writing forwards and the lock had over grown
246                  * its extent downwards before we enqueued our request. */
247                 if (l_extent->end > req_end) {
248                         if (l_extent->start <= req_end)
249                                 new_ex->end = max(lock->l_req_extent.start - 1,
250                                                   req_end);
251                         else
252                                 new_ex->end = max(l_extent->start - 1, req_end);
253                 }
254         }
255
256         ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
257         EXIT;
258 }
259
260
261 /* In order to determine the largest possible extent we can grant, we need
262  * to scan all of the queues. */
263 static void ldlm_extent_policy(struct ldlm_resource *res,
264                                struct ldlm_lock *lock, __u64 *flags)
265 {
266         struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };
267
268         if (lock->l_export == NULL)
269                 /*
270                  * this is local lock taken by server (e.g., as a part of
271                  * OST-side locking, or unlink handling). Expansion doesn't
272                  * make a lot of sense for local locks, because they are
273                  * dropped immediately on operation completion and would only
274                  * conflict with other threads.
275                  */
276                 return;
277
278         if (lock->l_policy_data.l_extent.start == 0 &&
279             lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
280                 /* fast-path whole file locks */
281                 return;
282
283         ldlm_extent_internal_policy_granted(lock, &new_ex);
284         ldlm_extent_internal_policy_waiting(lock, &new_ex);
285
286         if (new_ex.start != lock->l_policy_data.l_extent.start ||
287             new_ex.end != lock->l_policy_data.l_extent.end) {
288                 *flags |= LDLM_FL_LOCK_CHANGED;
289                 lock->l_policy_data.l_extent.start = new_ex.start;
290                 lock->l_policy_data.l_extent.end = new_ex.end;
291         }
292 }
293
294 static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
295 {
296         struct ldlm_resource *res = lock->l_resource;
297         cfs_time_t now = cfs_time_current();
298
299         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_SET_CONTENTION))
300                 return 1;
301
302         CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
303         if (contended_locks > ldlm_res_to_ns(res)->ns_contended_locks)
304                 res->lr_contention_time = now;
305         return cfs_time_before(now, cfs_time_add(res->lr_contention_time,
306                 cfs_time_seconds(ldlm_res_to_ns(res)->ns_contention_time)));
307 }
308
309 struct ldlm_extent_compat_args {
310         struct list_head *work_list;
311         struct ldlm_lock *lock;
312         enum ldlm_mode mode;
313         int *locks;
314         int *compat;
315 };
316
317 static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
318                                                 void *data)
319 {
320         struct ldlm_extent_compat_args *priv = data;
321         struct ldlm_interval *node = to_ldlm_interval(n);
322         struct ldlm_extent *extent;
323         struct list_head *work_list = priv->work_list;
324         struct ldlm_lock *lock, *enq = priv->lock;
325         enum ldlm_mode mode = priv->mode;
326         int count = 0;
327         ENTRY;
328
329         LASSERT(!list_empty(&node->li_group));
330
331         list_for_each_entry(lock, &node->li_group, l_sl_policy) {
332                 /* interval tree is for granted lock */
333                 LASSERTF(mode == lock->l_granted_mode,
334                          "mode = %s, lock->l_granted_mode = %s\n",
335                          ldlm_lockname[mode],
336                          ldlm_lockname[lock->l_granted_mode]);
337                 count++;
338                 if (lock->l_blocking_ast &&
339                     lock->l_granted_mode != LCK_GROUP)
340                         ldlm_add_ast_work_item(lock, enq, work_list);
341         }
342
343         /* don't count conflicting glimpse locks */
344         extent = ldlm_interval_extent(node);
345         if (!(mode == LCK_PR &&
346             extent->start == 0 && extent->end == OBD_OBJECT_EOF))
347                 *priv->locks += count;
348
349         if (priv->compat)
350                 *priv->compat = 0;
351
352         RETURN(INTERVAL_ITER_CONT);
353 }
354
355 /**
356  * Determine if the lock is compatible with all locks on the queue.
357  *
358  * If \a work_list is provided, conflicting locks are linked there.
359  * If \a work_list is not provided, we exit this function on first conflict.
360  *
361  * \retval 0 if the lock is not compatible
362  * \retval 1 if the lock is compatible
363  * \retval 2 if \a req is a group lock and it is compatible and requires
364  *           no further checking
365  * \retval negative error, such as EWOULDBLOCK for group locks
366  */
367 static int
368 ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
369                          __u64 *flags, enum ldlm_error *err,
370                          struct list_head *work_list, int *contended_locks)
371 {
372         struct ldlm_resource *res = req->l_resource;
373         enum ldlm_mode req_mode = req->l_req_mode;
374         __u64 req_start = req->l_req_extent.start;
375         __u64 req_end = req->l_req_extent.end;
376         struct ldlm_lock *lock;
377         int check_contention;
378         int compat = 1;
379         int scan = 0;
380         ENTRY;
381
382         lockmode_verify(req_mode);
383
384         /* Using interval tree for granted lock */
385         if (queue == &res->lr_granted) {
386                 struct ldlm_interval_tree *tree;
387                 struct ldlm_extent_compat_args data = {.work_list = work_list,
388                                                .lock = req,
389                                                .locks = contended_locks,
390                                                .compat = &compat };
391                 struct interval_node_extent ex = { .start = req_start,
392                                                    .end = req_end };
393                 int idx, rc;
394
395                 for (idx = 0; idx < LCK_MODE_NUM; idx++) {
396                         tree = &res->lr_itree[idx];
397                         if (tree->lit_root == NULL) /* empty tree, skipped */
398                                 continue;
399
400                         data.mode = tree->lit_mode;
401                         if (lockmode_compat(req_mode, tree->lit_mode)) {
402                                 struct ldlm_interval *node;
403                                 struct ldlm_extent *extent;
404
405                                 if (req_mode != LCK_GROUP)
406                                         continue;
407
408                                 /* group lock, grant it immediately if
409                                  * compatible */
410                                 node = to_ldlm_interval(tree->lit_root);
411                                 extent = ldlm_interval_extent(node);
412                                 if (req->l_policy_data.l_extent.gid ==
413                                     extent->gid)
414                                         RETURN(2);
415                         }
416
417                         if (tree->lit_mode == LCK_GROUP) {
418                                 if (*flags & LDLM_FL_BLOCK_NOWAIT) {
419                                         compat = -EWOULDBLOCK;
420                                         goto destroylock;
421                                 }
422
423                                 *flags |= LDLM_FL_NO_TIMEOUT;
424                                 if (!work_list)
425                                         RETURN(0);
426
427                                 /* if work list is not NULL,add all
428                                    locks in the tree to work list */
429                                 compat = 0;
430                                 interval_iterate(tree->lit_root,
431                                                  ldlm_extent_compat_cb, &data);
432                                 continue;
433                         }
434
435                         if (!work_list) {
436                                 rc = interval_is_overlapped(tree->lit_root,&ex);
437                                 if (rc)
438                                         RETURN(0);
439                         } else {
440                                 interval_search(tree->lit_root, &ex,
441                                                 ldlm_extent_compat_cb, &data);
442                                 if (!list_empty(work_list) && compat)
443                                         compat = 0;
444                         }
445                 }
446         } else { /* for waiting queue */
447                 list_for_each_entry(lock, queue, l_res_link) {
448                         check_contention = 1;
449
450                         /* We stop walking the queue if we hit ourselves so
451                          * we don't take conflicting locks enqueued after us
452                          * into account, or we'd wait forever. */
453                         if (req == lock)
454                                 break;
455
456                         if (unlikely(scan)) {
457                                 /* We only get here if we are queuing GROUP lock
458                                    and met some incompatible one. The main idea of this
459                                    code is to insert GROUP lock past compatible GROUP
460                                    lock in the waiting queue or if there is not any,
461                                    then in front of first non-GROUP lock */
462                                 if (lock->l_req_mode != LCK_GROUP) {
463                                         /* Ok, we hit non-GROUP lock, there should
464                                          * be no more GROUP locks later on, queue in
465                                          * front of first non-GROUP lock */
466
467                                         ldlm_resource_insert_lock_after(lock, req);
468                                         list_del_init(&lock->l_res_link);
469                                         ldlm_resource_insert_lock_after(req, lock);
470                                         compat = 0;
471                                         break;
472                                 }
473                                 if (req->l_policy_data.l_extent.gid ==
474                                     lock->l_policy_data.l_extent.gid) {
475                                         /* found it */
476                                         ldlm_resource_insert_lock_after(lock, req);
477                                         compat = 0;
478                                         break;
479                                 }
480                                 continue;
481                         }
482
483                         /* locks are compatible, overlap doesn't matter */
484                         if (lockmode_compat(lock->l_req_mode, req_mode)) {
485                                 if (req_mode == LCK_PR &&
486                                     ((lock->l_policy_data.l_extent.start <=
487                                       req->l_policy_data.l_extent.start) &&
488                                      (lock->l_policy_data.l_extent.end >=
489                                       req->l_policy_data.l_extent.end))) {
490                                         /* If we met a PR lock just like us or
491                                            wider, and nobody down the list
492                                            conflicted with it, that means we
493                                            can skip processing of the rest of
494                                            the list and safely place ourselves
495                                            at the end of the list, or grant
496                                            (dependent if we met an conflicting
497                                            locks before in the list).  In case
498                                            of 1st enqueue only we continue
499                                            traversing if there is something
500                                            conflicting down the list because
501                                            we need to make sure that something
502                                            is marked as AST_SENT as well, in
503                                            cse of empy worklist we would exit
504                                            on first conflict met. */
505                                         /* There IS a case where such flag is
506                                            not set for a lock, yet it blocks
507                                            something. Luckily for us this is
508                                            only during destroy, so lock is
509                                            exclusive. So here we are safe */
510                                         if (!ldlm_is_ast_sent(lock))
511                                                 RETURN(compat);
512                                 }
513
514                                 /* non-group locks are compatible, overlap doesn't
515                                    matter */
516                                 if (likely(req_mode != LCK_GROUP))
517                                         continue;
518
519                                 /* If we are trying to get a GROUP lock and there is
520                                    another one of this kind, we need to compare gid */
521                                 if (req->l_policy_data.l_extent.gid ==
522                                     lock->l_policy_data.l_extent.gid) {
523                                         /* If existing lock with matched gid is granted,
524                                            we grant new one too. */
525                                         if (lock->l_req_mode == lock->l_granted_mode)
526                                                 RETURN(2);
527
528                                         /* Otherwise we are scanning queue of waiting
529                                          * locks and it means current request would
530                                          * block along with existing lock (that is
531                                          * already blocked.
532                                          * If we are in nonblocking mode - return
533                                          * immediately */
534                                         if (*flags & LDLM_FL_BLOCK_NOWAIT) {
535                                                 compat = -EWOULDBLOCK;
536                                                 goto destroylock;
537                                         }
538                                         /* If this group lock is compatible with another
539                                          * group lock on the waiting list, they must be
540                                          * together in the list, so they can be granted
541                                          * at the same time.  Otherwise the later lock
542                                          * can get stuck behind another, incompatible,
543                                          * lock. */
544                                         ldlm_resource_insert_lock_after(lock, req);
545                                         /* Because 'lock' is not granted, we can stop
546                                          * processing this queue and return immediately.
547                                          * There is no need to check the rest of the
548                                          * list. */
549                                         RETURN(0);
550                                 }
551                         }
552
553                         if (unlikely(req_mode == LCK_GROUP &&
554                                      (lock->l_req_mode != lock->l_granted_mode))) {
555                                 scan = 1;
556                                 compat = 0;
557                                 if (lock->l_req_mode != LCK_GROUP) {
558                                         /* Ok, we hit non-GROUP lock, there should be no
559                                            more GROUP locks later on, queue in front of
560                                            first non-GROUP lock */
561
562                                         ldlm_resource_insert_lock_after(lock, req);
563                                         list_del_init(&lock->l_res_link);
564                                         ldlm_resource_insert_lock_after(req, lock);
565                                         break;
566                                 }
567                                 if (req->l_policy_data.l_extent.gid ==
568                                     lock->l_policy_data.l_extent.gid) {
569                                         /* found it */
570                                         ldlm_resource_insert_lock_after(lock, req);
571                                         break;
572                                 }
573                                 continue;
574                         }
575
576                         if (unlikely(lock->l_req_mode == LCK_GROUP)) {
577                                 /* If compared lock is GROUP, then requested is PR/PW/
578                                  * so this is not compatible; extent range does not
579                                  * matter */
580                                 if (*flags & LDLM_FL_BLOCK_NOWAIT) {
581                                         compat = -EWOULDBLOCK;
582                                         goto destroylock;
583                                 } else {
584                                         *flags |= LDLM_FL_NO_TIMEOUT;
585                                 }
586                         } else if (lock->l_policy_data.l_extent.end < req_start ||
587                                    lock->l_policy_data.l_extent.start > req_end) {
588                                 /* if a non group lock doesn't overlap skip it */
589                                 continue;
590                         } else if (lock->l_req_extent.end < req_start ||
591                                    lock->l_req_extent.start > req_end) {
592                                 /* false contention, the requests doesn't really overlap */
593                                 check_contention = 0;
594                         }
595
596                         if (!work_list)
597                                 RETURN(0);
598
599                         /* don't count conflicting glimpse locks */
600                         if (lock->l_req_mode == LCK_PR &&
601                             lock->l_policy_data.l_extent.start == 0 &&
602                             lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
603                                 check_contention = 0;
604
605                         *contended_locks += check_contention;
606
607                         compat = 0;
608                         if (lock->l_blocking_ast &&
609                             lock->l_req_mode != LCK_GROUP)
610                                 ldlm_add_ast_work_item(lock, req, work_list);
611                 }
612         }
613
614         if (ldlm_check_contention(req, *contended_locks) &&
615             compat == 0 &&
616             (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
617             req->l_req_mode != LCK_GROUP &&
618             req_end - req_start <=
619             ldlm_res_to_ns(req->l_resource)->ns_max_nolock_size)
620                 GOTO(destroylock, compat = -EUSERS);
621
622         RETURN(compat);
623 destroylock:
624         list_del_init(&req->l_res_link);
625         ldlm_lock_destroy_nolock(req);
626         *err = compat;
627         RETURN(compat);
628 }
629
630 /**
631  * This function refresh eviction timer for cancelled lock.
632  * \param[in] lock              ldlm lock for refresh
633  * \param[in] arg               ldlm prolong arguments, timeout, export, extent
634  *                              and counter are used
635  */
636 void ldlm_lock_prolong_one(struct ldlm_lock *lock,
637                            struct ldlm_prolong_args *arg)
638 {
639         int timeout;
640
641         if (arg->lpa_export != lock->l_export ||
642             lock->l_flags & LDLM_FL_DESTROYED)
643                 /* ignore unrelated locks */
644                 return;
645
646         arg->lpa_locks_cnt++;
647
648         if (!(lock->l_flags & LDLM_FL_AST_SENT))
649                 /* ignore locks not being cancelled */
650                 return;
651
652         /* We are in the middle of the process - BL AST is sent, CANCEL
653          * is ahead. Take half of BL AT + IO AT process time.
654          */
655         timeout = arg->lpa_timeout + (ldlm_bl_timeout(lock) >> 1);
656
657         LDLM_DEBUG(lock, "refreshed to %ds.\n", timeout);
658
659         arg->lpa_blocks_cnt++;
660
661         /* OK. this is a possible lock the user holds doing I/O
662          * let's refresh eviction timer for it.
663          */
664         ldlm_refresh_waiting_lock(lock, timeout);
665 }
666 EXPORT_SYMBOL(ldlm_lock_prolong_one);
667
668 static enum interval_iter ldlm_resource_prolong_cb(struct interval_node *n,
669                                                    void *data)
670 {
671         struct ldlm_prolong_args *arg = data;
672         struct ldlm_interval *node = to_ldlm_interval(n);
673         struct ldlm_lock *lock;
674
675         ENTRY;
676
677         LASSERT(!list_empty(&node->li_group));
678
679         list_for_each_entry(lock, &node->li_group, l_sl_policy) {
680                 ldlm_lock_prolong_one(lock, arg);
681         }
682
683         RETURN(INTERVAL_ITER_CONT);
684 }
685
686 /**
687  * Walk through granted tree and prolong locks if they overlaps extent.
688  *
689  * \param[in] arg               prolong args
690  */
691 void ldlm_resource_prolong(struct ldlm_prolong_args *arg)
692 {
693         struct ldlm_interval_tree *tree;
694         struct ldlm_resource *res;
695         struct interval_node_extent ex = { .start = arg->lpa_extent.start,
696                                            .end = arg->lpa_extent.end };
697         int idx;
698
699         ENTRY;
700
701         res = ldlm_resource_get(arg->lpa_export->exp_obd->obd_namespace, NULL,
702                                 &arg->lpa_resid, LDLM_EXTENT, 0);
703         if (IS_ERR(res)) {
704                 CDEBUG(D_DLMTRACE, "Failed to get resource for resid %llu/%llu\n",
705                        arg->lpa_resid.name[0], arg->lpa_resid.name[1]);
706                 RETURN_EXIT;
707         }
708
709         lock_res(res);
710         for (idx = 0; idx < LCK_MODE_NUM; idx++) {
711                 tree = &res->lr_itree[idx];
712                 if (tree->lit_root == NULL) /* empty tree, skipped */
713                         continue;
714
715                 /* There is no possibility to check for the groupID
716                  * so all the group locks are considered as valid
717                  * here, especially because the client is supposed
718                  * to check it has such a lock before sending an RPC.
719                  */
720                 if (!(tree->lit_mode & arg->lpa_mode))
721                         continue;
722
723                 interval_search(tree->lit_root, &ex,
724                                 ldlm_resource_prolong_cb, arg);
725         }
726
727         unlock_res(res);
728         ldlm_resource_putref(res);
729
730         EXIT;
731 }
732 EXPORT_SYMBOL(ldlm_resource_prolong);
733
734
735 /**
736  * Discard all AST work items from list.
737  *
738  * If for whatever reason we do not want to send ASTs to conflicting locks
739  * anymore, disassemble the list with this function.
740  */
741 static void discard_bl_list(struct list_head *bl_list)
742 {
743         struct list_head *tmp, *pos;
744         ENTRY;
745
746         list_for_each_safe(pos, tmp, bl_list) {
747                 struct ldlm_lock *lock =
748                         list_entry(pos, struct ldlm_lock, l_bl_ast);
749
750                 list_del_init(&lock->l_bl_ast);
751                 LASSERT(ldlm_is_ast_sent(lock));
752                 ldlm_clear_ast_sent(lock);
753                 LASSERT(lock->l_bl_ast_run == 0);
754                 LASSERT(lock->l_blocking_lock);
755                 LDLM_LOCK_RELEASE(lock->l_blocking_lock);
756                 lock->l_blocking_lock = NULL;
757                 LDLM_LOCK_RELEASE(lock);
758         }
759         EXIT;
760 }
761
762 /**
763  * Process a granting attempt for extent lock.
764  * Must be called with ns lock held.
765  *
766  * This function looks for any conflicts for \a lock in the granted or
767  * waiting queues. The lock is granted if no conflicts are found in
768  * either queue.
769  *
770  * If \a first_enq is 0 (ie, called from ldlm_reprocess_queue):
771  *   - blocking ASTs have already been sent
772  *
773  * If \a first_enq is 1 (ie, called from ldlm_lock_enqueue):
774  *   - blocking ASTs have not been sent yet, so list of conflicting locks
775  *     would be collected and ASTs sent.
776  */
777 int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
778                              int first_enq, enum ldlm_error *err,
779                              struct list_head *work_list)
780 {
781         struct ldlm_resource *res = lock->l_resource;
782         struct list_head rpc_list;
783         int rc, rc2;
784         int contended_locks = 0;
785         ENTRY;
786
787         LASSERT(lock->l_granted_mode != lock->l_req_mode);
788         LASSERT(list_empty(&res->lr_converting));
789         LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
790                 !ldlm_is_ast_discard_data(lock));
791         INIT_LIST_HEAD(&rpc_list);
792         check_res_locked(res);
793         *err = ELDLM_OK;
794
795         if (!first_enq) {
796                 /* Careful observers will note that we don't handle -EWOULDBLOCK
797                  * here, but it's ok for a non-obvious reason -- compat_queue
798                  * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
799                  * flags should always be zero here, and if that ever stops
800                  * being true, we want to find out. */
801                 LASSERT(*flags == 0);
802                 rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
803                                               err, NULL, &contended_locks);
804                 if (rc == 1) {
805                         rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
806                                                       flags, err, NULL,
807                                                       &contended_locks);
808                 }
809                 if (rc == 0)
810                         RETURN(LDLM_ITER_STOP);
811
812                 ldlm_resource_unlink_lock(lock);
813
814                 if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
815                         ldlm_extent_policy(res, lock, flags);
816                 ldlm_grant_lock(lock, work_list);
817                 RETURN(LDLM_ITER_CONTINUE);
818         }
819
820  restart:
821         contended_locks = 0;
822         rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
823                                       &rpc_list, &contended_locks);
824         if (rc < 0)
825                 GOTO(out, rc); /* lock was destroyed */
826         if (rc == 2)
827                 goto grant;
828
829         rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
830                                        &rpc_list, &contended_locks);
831         if (rc2 < 0)
832                 GOTO(out, rc = rc2); /* lock was destroyed */
833
834         if (rc + rc2 == 2) {
835         grant:
836                 ldlm_extent_policy(res, lock, flags);
837                 ldlm_resource_unlink_lock(lock);
838                 ldlm_grant_lock(lock, NULL);
839         } else {
840                 /* If either of the compat_queue()s returned failure, then we
841                  * have ASTs to send and must go onto the waiting list.
842                  *
843                  * bug 2322: we used to unlink and re-add here, which was a
844                  * terrible folly -- if we goto restart, we could get
845                  * re-ordered!  Causes deadlock, because ASTs aren't sent! */
846                 if (list_empty(&lock->l_res_link))
847                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
848                 unlock_res(res);
849                 rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
850                                        LDLM_WORK_BL_AST);
851
852                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) &&
853                     !ns_is_client(ldlm_res_to_ns(res)))
854                         class_fail_export(lock->l_export);
855
856                 lock_res(res);
857                 if (rc == -ERESTART) {
858                         /* 15715: The lock was granted and destroyed after
859                          * resource lock was dropped. Interval node was freed
860                          * in ldlm_lock_destroy. Anyway, this always happens
861                          * when a client is being evicted. So it would be
862                          * ok to return an error. -jay */
863                         if (ldlm_is_destroyed(lock)) {
864                                 *err = -EAGAIN;
865                                 GOTO(out, rc = -EAGAIN);
866                         }
867
868                         /* lock was granted while resource was unlocked. */
869                         if (lock->l_granted_mode == lock->l_req_mode) {
870                                 /* bug 11300: if the lock has been granted,
871                                  * break earlier because otherwise, we will go
872                                  * to restart and ldlm_resource_unlink will be
873                                  * called and it causes the interval node to be
874                                  * freed. Then we will fail at
875                                  * ldlm_extent_add_lock() */
876                                 *flags &= ~LDLM_FL_BLOCKED_MASK;
877                                 GOTO(out, rc = 0);
878                         }
879
880                         GOTO(restart, rc);
881                 }
882
883                 /* this way we force client to wait for the lock
884                  * endlessly once the lock is enqueued -bzzz */
885                 *flags |= LDLM_FL_BLOCK_GRANTED | LDLM_FL_NO_TIMEOUT;
886
887         }
888         RETURN(0);
889 out:
890         if (!list_empty(&rpc_list)) {
891                 LASSERT(!ldlm_is_ast_discard_data(lock));
892                 discard_bl_list(&rpc_list);
893         }
894         RETURN(rc);
895 }
896 #endif /* HAVE_SERVER_SUPPORT */
897
898 /* When a lock is cancelled by a client, the KMS may undergo change if this
899  * is the "highest lock".  This function returns the new KMS value.
900  * Caller must hold lr_lock already.
901  *
902  * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
903 __u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
904 {
905         struct ldlm_resource *res = lock->l_resource;
906         struct list_head *tmp;
907         struct ldlm_lock *lck;
908         __u64 kms = 0;
909         ENTRY;
910
911         /* don't let another thread in ldlm_extent_shift_kms race in
912          * just after we finish and take our lock into account in its
913          * calculation of the kms */
914         ldlm_set_kms_ignore(lock);
915
916         list_for_each(tmp, &res->lr_granted) {
917                 lck = list_entry(tmp, struct ldlm_lock, l_res_link);
918
919                 if (ldlm_is_kms_ignore(lck))
920                         continue;
921
922                 if (lck->l_policy_data.l_extent.end >= old_kms)
923                         RETURN(old_kms);
924
925                 /* This extent _has_ to be smaller than old_kms (checked above)
926                  * so kms can only ever be smaller or the same as old_kms. */
927                 if (lck->l_policy_data.l_extent.end + 1 > kms)
928                         kms = lck->l_policy_data.l_extent.end + 1;
929         }
930         LASSERTF(kms <= old_kms, "kms %llu old_kms %llu\n", kms, old_kms);
931
932         RETURN(kms);
933 }
934 EXPORT_SYMBOL(ldlm_extent_shift_kms);
935
936 struct kmem_cache *ldlm_interval_slab;
937 struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
938 {
939         struct ldlm_interval *node;
940         ENTRY;
941
942         LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
943         OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
944         if (node == NULL)
945                 RETURN(NULL);
946
947         INIT_LIST_HEAD(&node->li_group);
948         ldlm_interval_attach(node, lock);
949         RETURN(node);
950 }
951
952 void ldlm_interval_free(struct ldlm_interval *node)
953 {
954         if (node) {
955                 LASSERT(list_empty(&node->li_group));
956                 LASSERT(!interval_is_intree(&node->li_node));
957                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
958         }
959 }
960
961 /* interval tree, for LDLM_EXTENT. */
962 void ldlm_interval_attach(struct ldlm_interval *n,
963                           struct ldlm_lock *l)
964 {
965         LASSERT(l->l_tree_node == NULL);
966         LASSERT(l->l_resource->lr_type == LDLM_EXTENT);
967
968         list_add_tail(&l->l_sl_policy, &n->li_group);
969         l->l_tree_node = n;
970 }
971
972 struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l)
973 {
974         struct ldlm_interval *n = l->l_tree_node;
975
976         if (n == NULL)
977                 return NULL;
978
979         LASSERT(!list_empty(&n->li_group));
980         l->l_tree_node = NULL;
981         list_del_init(&l->l_sl_policy);
982
983         return list_empty(&n->li_group) ? n : NULL;
984 }
985
986 static inline int ldlm_mode_to_index(enum ldlm_mode mode)
987 {
988         int index;
989
990         LASSERT(mode != 0);
991         LASSERT(IS_PO2(mode));
992         for (index = -1; mode != 0; index++, mode >>= 1)
993                 /* do nothing */;
994         LASSERT(index < LCK_MODE_NUM);
995         return index;
996 }
997
998 /** Add newly granted lock into interval tree for the resource. */
999 void ldlm_extent_add_lock(struct ldlm_resource *res,
1000                           struct ldlm_lock *lock)
1001 {
1002         struct interval_node *found, **root;
1003         struct ldlm_interval *node;
1004         struct ldlm_extent *extent;
1005         int idx, rc;
1006
1007         LASSERT(lock->l_granted_mode == lock->l_req_mode);
1008
1009         node = lock->l_tree_node;
1010         LASSERT(node != NULL);
1011         LASSERT(!interval_is_intree(&node->li_node));
1012
1013         idx = ldlm_mode_to_index(lock->l_granted_mode);
1014         LASSERT(lock->l_granted_mode == 1 << idx);
1015         LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);
1016
1017         /* node extent initialize */
1018         extent = &lock->l_policy_data.l_extent;
1019
1020         rc = interval_set(&node->li_node, extent->start, extent->end);
1021         LASSERT(!rc);
1022
1023         root = &res->lr_itree[idx].lit_root;
1024         found = interval_insert(&node->li_node, root);
1025         if (found) { /* The policy group found. */
1026                 struct ldlm_interval *tmp = ldlm_interval_detach(lock);
1027                 LASSERT(tmp != NULL);
1028                 ldlm_interval_free(tmp);
1029                 ldlm_interval_attach(to_ldlm_interval(found), lock);
1030         }
1031         res->lr_itree[idx].lit_size++;
1032
1033         /* even though we use interval tree to manage the extent lock, we also
1034          * add the locks into grant list, for debug purpose, .. */
1035         ldlm_resource_add_lock(res, &res->lr_granted, lock);
1036
1037         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_GRANT_CHECK)) {
1038                 struct ldlm_lock *lck;
1039
1040                 list_for_each_entry_reverse(lck, &res->lr_granted,
1041                                             l_res_link) {
1042                         if (lck == lock)
1043                                 continue;
1044                         if (lockmode_compat(lck->l_granted_mode,
1045                                             lock->l_granted_mode))
1046                                 continue;
1047                         if (ldlm_extent_overlap(&lck->l_req_extent,
1048                                                 &lock->l_req_extent)) {
1049                                 CDEBUG(D_ERROR, "granting conflicting lock %p "
1050                                                 "%p\n", lck, lock);
1051                                 ldlm_resource_dump(D_ERROR, res);
1052                                 LBUG();
1053                         }
1054                 }
1055         }
1056 }
1057
1058 /** Remove cancelled lock from resource interval tree. */
1059 void ldlm_extent_unlink_lock(struct ldlm_lock *lock)
1060 {
1061         struct ldlm_resource *res = lock->l_resource;
1062         struct ldlm_interval *node = lock->l_tree_node;
1063         struct ldlm_interval_tree *tree;
1064         int idx;
1065
1066         if (!node || !interval_is_intree(&node->li_node)) /* duplicate unlink */
1067                 return;
1068
1069         idx = ldlm_mode_to_index(lock->l_granted_mode);
1070         LASSERT(lock->l_granted_mode == 1 << idx);
1071         tree = &res->lr_itree[idx];
1072
1073         LASSERT(tree->lit_root != NULL); /* assure the tree is not null */
1074
1075         tree->lit_size--;
1076         node = ldlm_interval_detach(lock);
1077         if (node) {
1078                 interval_erase(&node->li_node, &tree->lit_root);
1079                 ldlm_interval_free(node);
1080         }
1081 }
1082
1083 void ldlm_extent_policy_wire_to_local(const union ldlm_wire_policy_data *wpolicy,
1084                                       union ldlm_policy_data *lpolicy)
1085 {
1086         lpolicy->l_extent.start = wpolicy->l_extent.start;
1087         lpolicy->l_extent.end = wpolicy->l_extent.end;
1088         lpolicy->l_extent.gid = wpolicy->l_extent.gid;
1089 }
1090
1091 void ldlm_extent_policy_local_to_wire(const union ldlm_policy_data *lpolicy,
1092                                       union ldlm_wire_policy_data *wpolicy)
1093 {
1094         memset(wpolicy, 0, sizeof(*wpolicy));
1095         wpolicy->l_extent.start = lpolicy->l_extent.start;
1096         wpolicy->l_extent.end = lpolicy->l_extent.end;
1097         wpolicy->l_extent.gid = lpolicy->l_extent.gid;
1098 }
1099