Using interval tree for scalable handling of many extent locks.
[fs/lustre-release.git] / lustre / ldlm / ldlm_extent.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of the Lustre file system, http://www.lustre.org
9  *   Lustre is a trademark of Cluster File Systems, Inc.
10  *
11  *   You may have signed or agreed to another license before downloading
12  *   this software.  If so, you are bound by the terms and conditions
13  *   of that agreement, and the following does not apply to you.  See the
14  *   LICENSE file included with this distribution for more information.
15  *
16  *   If you did not agree to a different license, then this copy of Lustre
17  *   is open source software; you can redistribute it and/or modify it
18  *   under the terms of version 2 of the GNU General Public License as
19  *   published by the Free Software Foundation.
20  *
21  *   In either case, Lustre is distributed in the hope that it will be
22  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
23  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24  *   license text for more details.
25  */
26
27 #define DEBUG_SUBSYSTEM S_LDLM
28 #ifndef __KERNEL__
29 # include <liblustre.h>
30 #endif
31
32 #include <lustre_dlm.h>
33 #include <obd_support.h>
34 #include <obd.h>
35 #include <lustre_lib.h>
36
37 #include "ldlm_internal.h"
38
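/* Maximum distance the extent of a contended PW/CW lock is grown past the
 * requested start offset (32 MB - 1, i.e. the grown extent spans at most
 * 32 MB from req_start).  See ldlm_extent_internal_policy_fixup(). */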
39 #define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)
40
41 /* fixup the ldlm_extent after expanding */
42 static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
43                                               struct ldlm_extent *new_ex,
44                                               int conflicting)
45 {
46         ldlm_mode_t req_mode = req->l_req_mode;
47         __u64 req_start = req->l_req_extent.start;
48         __u64 req_end = req->l_req_extent.end;
49         __u64 req_align, mask;
50  
51         if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
52                 if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
53                         new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
54                                           new_ex->end);
55         }
56
57         if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF) {
58                 EXIT;
59                 return;
60         }
61
62         /* we need to ensure that the lock extent is properly aligned to what
63          * the client requested.  We align it to the coarsest power-of-two
64          * boundary shared by the client's requested lock start and end. */
65         mask = 0x1000ULL;
66         req_align = (req_end + 1) | req_start;
67         if (req_align != 0) {
68                 while ((req_align & mask) == 0)
69                         mask <<= 1;
70         }
71         mask -= 1;
72         /* We can only shrink the lock, not grow it.
73          * This should never cause the lock to be smaller than requested,
74          * since the requested lock was already aligned on these boundaries. */
75         new_ex->start = ((new_ex->start - 1) | mask) + 1;
76         new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
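        /* Worked example (hypothetical values): for req_start = 0x3000 and
         * req_end = 0x4fff, req_align = 0x5000 | 0x3000 = 0x7000; bit 0x1000
         * is set, so mask stays 0xfff, new_ex->start is rounded up to a
         * 0x1000 boundary and new_ex->end down to one byte before one. */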
77         LASSERTF(new_ex->start <= req_start,
78                  "mask "LPX64" grant start "LPU64" req start "LPU64"\n",
79                  mask, new_ex->start, req_start);
80         LASSERTF(new_ex->end >= req_end,
81                  "mask "LPX64" grant end "LPU64" req end "LPU64"\n",
82                  mask, new_ex->end, req_end);
83 }
84
85 /* The purpose of this function is to return:
86  * - the maximum extent
87  * - containing the requested extent
88  * - and not overlapping existing conflicting extents outside the requested one
89  *
90  * Use interval tree to expand the lock extent for granted lock.
91  */
92 static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req,
93                                                 struct ldlm_extent *new_ex)
94 {
95         struct ldlm_resource *res = req->l_resource;
96         ldlm_mode_t req_mode = req->l_req_mode;
97         __u64 req_start = req->l_req_extent.start;
98         __u64 req_end = req->l_req_extent.end;
99         struct ldlm_interval_tree *tree;
100         struct interval_node_extent limiter = { new_ex->start, new_ex->end };
101         int conflicting = 0;
102         int idx;
103         ENTRY;
104
105         lockmode_verify(req_mode);
106
107         /* use the interval trees to scan each mode's granted extent locks */
108         for (idx = 0; idx < LCK_MODE_NUM; idx++) {
109                 struct interval_node_extent ext = { req_start, req_end };
110
111                 tree = &res->lr_itree[idx];
112                 if (lockmode_compat(tree->lit_mode, req_mode))
113                         continue;
114
115                 conflicting += tree->lit_size;
116                 if (conflicting > 4)
117                         limiter.start = req_start;
118
119                 if (interval_is_overlapped(tree->lit_root, &ext))
120                         printk("req_mode = %d, tree->lit_mode = %d, tree->lit_size = %d\n",
121                                req_mode, tree->lit_mode, tree->lit_size);
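                /* expand ext outward from [req_start, req_end] as far as
                 * possible without overlapping any granted extent in this
                 * tree (bounded by limiter), then intersect limiter with the
                 * result so the bound accumulates across incompatible modes */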
122                 interval_expand(tree->lit_root, &ext, &limiter);
123                 limiter.start = max(limiter.start, ext.start);
124                 limiter.end = min(limiter.end, ext.end);
125                 if (limiter.start == req_start && limiter.end == req_end)
126                         break;
127         }
128
129         new_ex->start = limiter.start;
130         new_ex->end = limiter.end;
131         LASSERT(new_ex->start <= req_start);
132         LASSERT(new_ex->end >= req_end);
133
134         ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
135         EXIT;
136 }
137
138 /* The purpose of this function is to return:
139  * - the maximum extent
140  * - containing the requested extent
141  * - and not overlapping existing conflicting extents outside the requested one
142  */
143 static void
144 ldlm_extent_internal_policy_waiting(struct ldlm_lock *req,
145                                     struct ldlm_extent *new_ex)
146 {
147         struct list_head *tmp;
148         struct ldlm_resource *res = req->l_resource;
149         ldlm_mode_t req_mode = req->l_req_mode;
150         __u64 req_start = req->l_req_extent.start;
151         __u64 req_end = req->l_req_extent.end;
152         int conflicting = 0;
153         ENTRY;
154
155         lockmode_verify(req_mode);
156
157         /* for waiting locks */
158         list_for_each(tmp, &res->lr_waiting) {
159                 struct ldlm_lock *lock;
160                 struct ldlm_extent *l_extent;
161
162                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
163                 l_extent = &lock->l_policy_data.l_extent;
164
165                 /* We already hit the minimum requested size, search no more */
166                 if (new_ex->start == req_start && new_ex->end == req_end) {
167                         EXIT;
168                         return;
169                 }
170
171                 /* Don't conflict with ourselves */
172                 if (req == lock)
173                         continue;
174
175                 /* Locks are compatible, overlap doesn't matter */
176                 /* Until bug 20 is fixed, try to avoid granting overlapping
177                  * locks on one client (they take a long time to cancel) */
178                 if (lockmode_compat(lock->l_req_mode, req_mode) &&
179                     lock->l_export != req->l_export)
180                         continue;
181
182                 /* If this is a high-traffic lock, don't grow downwards at all
183                  * or grow upwards too much */
184                 ++conflicting;
185                 if (conflicting > 4)
186                         new_ex->start = req_start;
187
188                 /* If lock doesn't overlap new_ex, skip it. */
189                 if (!ldlm_extent_overlap(l_extent, new_ex))
190                         continue;
191
192                 /* The locks conflict in their requested extents and we can't
193                  * satisfy both, so ignore it.  Either we will ping-pong this
194                  * extent (we would regardless of what extent we granted) or
195                  * the lock is unused and shouldn't limit our extent growth. */
196                 if (ldlm_extent_overlap(&lock->l_req_extent,&req->l_req_extent))
197                         continue;
198
199                 /* We grow extents downwards only as far as they don't overlap
200                  * with already-granted locks, on the assumption that clients
201                  * will be writing beyond the initial requested end and would
202                  * then need to enqueue a new lock beyond previous request.
203                  * l_req_extent->end strictly < req_start, checked above. */
204                 if (l_extent->start < req_start && new_ex->start != req_start) {
205                         if (l_extent->end >= req_start)
206                                 new_ex->start = req_start;
207                         else
208                                 new_ex->start = min(l_extent->end+1, req_start);
209                 }
210
211                 /* If we need to cancel this lock anyway because our request
212                  * overlaps the granted lock, we grow up to its requested
213                  * extent start instead of limiting this extent, assuming that
214                  * clients are writing forwards and the lock had overgrown
215                  * its extent downwards before we enqueued our request. */
216                 if (l_extent->end > req_end) {
217                         if (l_extent->start <= req_end)
218                                 new_ex->end = max(lock->l_req_extent.start - 1,
219                                                   req_end);
220                         else
221                                 new_ex->end = max(l_extent->start - 1, req_end);
222                 }
223         }
224
225         ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
226         EXIT;
227 }
228
229
230 /* In order to determine the largest possible extent we can grant, we need
231  * to scan all of the queues. */
232 static void ldlm_extent_policy(struct ldlm_resource *res,
233                                struct ldlm_lock *lock, int *flags)
234 {
235         struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };
236
237         if (lock->l_export == NULL)
238                 /*
239                  * this is local lock taken by server (e.g., as a part of
240                  * OST-side locking, or unlink handling). Expansion doesn't
241                  * make a lot of sense for local locks, because they are
242                  * dropped immediately on operation completion and would only
243                  * conflict with other threads.
244                  */
245                 return;
246
247         if (lock->l_policy_data.l_extent.start == 0 &&
248             lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
249                 /* fast-path whole file locks */
250                 return;
251
252         ldlm_extent_internal_policy_granted(lock, &new_ex);
253         ldlm_extent_internal_policy_waiting(lock, &new_ex);
254
255         if (new_ex.start != lock->l_policy_data.l_extent.start ||
256             new_ex.end != lock->l_policy_data.l_extent.end) {
257                 *flags |= LDLM_FL_LOCK_CHANGED;
258                 lock->l_policy_data.l_extent.start = new_ex.start;
259                 lock->l_policy_data.l_extent.end = new_ex.end;
260         }
261 }
262
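/* Record the current time as the resource's contention time whenever the
 * number of conflicting locks exceeds the per-namespace threshold, and report
 * whether the resource is still inside its configured contention window. */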
263 static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
264 {
265         struct ldlm_resource *res = lock->l_resource;
266         cfs_time_t now = cfs_time_current();
267
268         CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
269         if (contended_locks > res->lr_namespace->ns_contended_locks)
270                 res->lr_contention_time = now;
271         return cfs_time_before(now, cfs_time_add(res->lr_contention_time,
272                 cfs_time_seconds(res->lr_namespace->ns_contention_time)));
273 }
274
275 struct ldlm_extent_compat_args {
276         struct list_head *work_list;
277         struct ldlm_lock *lock;
278         ldlm_mode_t mode;
279         int *locks;
280         int *compat;
281 };
282
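/* Interval-tree iteration callback: for every granted lock attached to a
 * conflicting interval node, queue a blocking AST work item against the
 * enqueuing lock, bump the contended-lock count (ignoring whole-file PR
 * glimpse locks), and clear *compat since a conflict was found. */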
283 static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
284                                                 void *data)
285 {
286         struct ldlm_extent_compat_args *priv = data;
287         struct ldlm_interval *node = to_ldlm_interval(n);
288         struct ldlm_extent *extent;
289         struct list_head *work_list = priv->work_list;
290         struct ldlm_lock *lock, *enq = priv->lock;
291         ldlm_mode_t mode = priv->mode;
292         int count = 0;
293         ENTRY;
294
295         LASSERT(!list_empty(&node->li_group));
296
297         list_for_each_entry(lock, &node->li_group, l_sl_policy) {
298                 /* the interval tree contains only granted locks */
299                 LASSERTF(mode == lock->l_granted_mode,
300                          "mode = %s, lock->l_granted_mode = %s\n",
301                          ldlm_lockname[mode],
302                          ldlm_lockname[lock->l_granted_mode]);
303
304                 count++;
305                 if (lock->l_blocking_ast)
306                         ldlm_add_ast_work_item(lock, enq, work_list);
307         }
308         LASSERT(count > 0);
309
310         /* don't count conflicting glimpse locks */
311         extent = ldlm_interval_extent(node);
312         if (!(mode == LCK_PR &&
313             extent->start == 0 && extent->end == OBD_OBJECT_EOF))
314                 *priv->locks += count;
315
316         if (priv->compat)
317                 *priv->compat = 0;
318
319         RETURN(INTERVAL_ITER_CONT);
320 }
321
322 /* Determine if the lock is compatible with all locks on the queue.
323  * We stop walking the queue if we hit ourselves so we don't take
324  * conflicting locks enqueued after us into account, or we'd wait forever.
325  *
326  * 0 if the lock is not compatible
327  * 1 if the lock is compatible
328  * 2 if this group lock is compatible and requires no further checking
329  * negative error, such as EWOULDBLOCK for group locks
330  */
331 static int
332 ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
333                          int *flags, ldlm_error_t *err,
334                          struct list_head *work_list, int *contended_locks)
335 {
336         struct list_head *tmp;
337         struct ldlm_lock *lock;
338         struct ldlm_resource *res = req->l_resource;
339         ldlm_mode_t req_mode = req->l_req_mode;
340         __u64 req_start = req->l_req_extent.start;
341         __u64 req_end = req->l_req_extent.end;
342         int compat = 1;
343         int scan = 0;
344         int check_contention;
345         ENTRY;
346
347         lockmode_verify(req_mode);
348
349         /* Use the per-mode interval trees for the granted queue */
350         if (queue == &res->lr_granted) {
351                 struct ldlm_interval_tree *tree;
352                 struct ldlm_extent_compat_args data = {.work_list = work_list,
353                                                .lock = req,
354                                                .locks = contended_locks,
355                                                .compat = &compat };
356                 struct interval_node_extent ex = { .start = req_start,
357                                                    .end = req_end };
358                 int idx, rc;
359
360                 for (idx = 0; idx < LCK_MODE_NUM; idx++) {
361                         tree = &res->lr_itree[idx];
362                         if (tree->lit_root == NULL) /* empty tree, skipped */
363                                 continue;
364
365                         data.mode = tree->lit_mode;
366                         if (lockmode_compat(req_mode, tree->lit_mode)) {
367                                 struct ldlm_interval *node;
368                                 struct ldlm_extent *extent;
369
370                                 if (req_mode != LCK_GROUP)
371                                         continue;
372
373                                 /* group lock, grant it immediately if
374                                  * compatible */
375                                 node = to_ldlm_interval(tree->lit_root);
376                                 extent = ldlm_interval_extent(node);
377                                 if (req->l_policy_data.l_extent.gid ==
378                                     extent->gid)
379                                         RETURN(2);
380                         }
381
382                         if (tree->lit_mode == LCK_GROUP) {
383                                 if (*flags & LDLM_FL_BLOCK_NOWAIT) {
384                                         compat = -EWOULDBLOCK;
385                                         goto destroylock;
386                                 }
387
388                                 *flags |= LDLM_FL_NO_TIMEOUT;
389                                 if (!work_list)
390                                         RETURN(0);
391
392                                 /* if work list is not NULL, add all
393                                    locks in the tree to work list */
394                                 compat = 0;
395                                 interval_iterate(tree->lit_root,
396                                                  ldlm_extent_compat_cb, &data);
397                                 continue;
398                         }
399
400                         if (!work_list) {
401                                 rc = interval_is_overlapped(tree->lit_root,&ex);
402                                 if (rc)
403                                         RETURN(0);
404                         } else {
405                                 interval_search(tree->lit_root, &ex,
406                                                 ldlm_extent_compat_cb, &data);
407                                 if (!list_empty(work_list) && compat)
408                                         compat = 0;
409                         }
410                 }
411                 RETURN(compat);
412         }
413
414         /* for waiting queue */
415         list_for_each(tmp, queue) {
416                 check_contention = 1;
417
418                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
419
420                 if (req == lock)
421                         break;
422
423                 if (unlikely(scan)) {
424                         /* We only get here if we are queuing a GROUP lock
425                            and have met an incompatible one. The main idea is
426                            to insert the GROUP lock after a compatible GROUP
427                            lock in the waiting queue or, if there is none,
428                            in front of the first non-GROUP lock */
429                         if (lock->l_req_mode != LCK_GROUP) {
430                                 /* Ok, we hit non-GROUP lock, there should be
431                                  * no more GROUP locks later on, queue in
432                                  * front of first non-GROUP lock */
433
434                                 ldlm_resource_insert_lock_after(lock, req);
435                                 list_del_init(&lock->l_res_link);
436                                 ldlm_resource_insert_lock_after(req, lock);
437                                 compat = 0;
438                                 break;
439                         }
440                         if (req->l_policy_data.l_extent.gid ==
441                              lock->l_policy_data.l_extent.gid) {
442                                 /* found it */
443                                 ldlm_resource_insert_lock_after(lock, req);
444                                 compat = 0;
445                                 break;
446                         }
447                         continue;
448                 }
449
450                 /* locks are compatible, overlap doesn't matter */
451                 if (lockmode_compat(lock->l_req_mode, req_mode)) {
452                         /* non-group locks are compatible, overlap doesn't
453                            matter */
454                         if (likely(req_mode != LCK_GROUP))
455                                 continue;
456
457                         /* If we are trying to get a GROUP lock and there is
458                            another one of this kind, we need to compare gid */
459                         if (req->l_policy_data.l_extent.gid ==
460                             lock->l_policy_data.l_extent.gid) {
461                                 /* If existing lock with matched gid is granted,
462                                    we grant new one too. */
463                                 if (lock->l_req_mode == lock->l_granted_mode)
464                                         RETURN(2);
465
466                                 /* Otherwise we are scanning queue of waiting
467                                  * locks and it means current request would
468                                  * block along with existing lock (that is
469                                  * already blocked).
470                                  * If we are in nonblocking mode - return
471                                  * immediately */
472                                 if (*flags & LDLM_FL_BLOCK_NOWAIT) {
473                                         compat = -EWOULDBLOCK;
474                                         goto destroylock;
475                                 }
476                                 /* If this group lock is compatible with another
477                                  * group lock on the waiting list, they must be
478                                  * together in the list, so they can be granted
479                                  * at the same time.  Otherwise the later lock
480                                  * can get stuck behind another, incompatible,
481                                  * lock. */
482                                 ldlm_resource_insert_lock_after(lock, req);
483                                 /* Because 'lock' is not granted, we can stop
484                                  * processing this queue and return immediately.
485                                  * There is no need to check the rest of the
486                                  * list. */
487                                 RETURN(0);
488                         }
489                 }
490
491                 if (unlikely(req_mode == LCK_GROUP &&
492                              (lock->l_req_mode != lock->l_granted_mode))) {
493                         scan = 1;
494                         compat = 0;
495                         if (lock->l_req_mode != LCK_GROUP) {
496                                 /* Ok, we hit non-GROUP lock, there should
497                                  * be no more GROUP locks later on, queue in
498                                  * front of first non-GROUP lock */
499
500                                 ldlm_resource_insert_lock_after(lock, req);
501                                 list_del_init(&lock->l_res_link);
502                                 ldlm_resource_insert_lock_after(req, lock);
503                                 break;
504                         }
505                         if (req->l_policy_data.l_extent.gid ==
506                              lock->l_policy_data.l_extent.gid) {
507                                 /* found it */
508                                 ldlm_resource_insert_lock_after(lock, req);
509                                 break;
510                         }
511                         continue;
512                 }
513
514                 if (unlikely(lock->l_req_mode == LCK_GROUP)) {
515                         /* If the compared lock is GROUP, then the request is
516                          * PR/PW, so this is not compatible; extent range does
517                          * not matter */
518                         if (*flags & LDLM_FL_BLOCK_NOWAIT) {
519                                 compat = -EWOULDBLOCK;
520                                 goto destroylock;
521                         } else {
522                                 *flags |= LDLM_FL_NO_TIMEOUT;
523                         }
524                 } else if (lock->l_policy_data.l_extent.end < req_start ||
525                            lock->l_policy_data.l_extent.start > req_end) {
526                         /* if a non-group lock doesn't overlap, skip it */
527                         continue;
528                 } else if (lock->l_req_extent.end < req_start ||
529                            lock->l_req_extent.start > req_end)
530                         /* false contention, the requests don't really overlap */
531                         check_contention = 0;
532
533                 if (!work_list)
534                         RETURN(0);
535
536                 /* don't count conflicting glimpse locks */
537                 if (lock->l_req_mode == LCK_PR &&
538                     lock->l_policy_data.l_extent.start == 0 &&
539                     lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
540                         check_contention = 0;
541
542                 *contended_locks += check_contention;
543
544                 compat = 0;
545                 if (lock->l_blocking_ast)
546                         ldlm_add_ast_work_item(lock, req, work_list);
547         }
548
549         if (ldlm_check_contention(req, *contended_locks) &&
550             compat == 0 &&
551             (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
552             req->l_req_mode != LCK_GROUP &&
553             req_end - req_start <=
554             req->l_resource->lr_namespace->ns_max_nolock_size)
555                 GOTO(destroylock, compat = -EUSERS);
556
557         RETURN(compat);
558 destroylock:
559         list_del_init(&req->l_res_link);
560         ldlm_lock_destroy_nolock(req);
561         *err = compat;
562         RETURN(compat);
563 }
564
565 static void discard_bl_list(struct list_head *bl_list)
566 {
567         struct list_head *tmp, *pos;
568         ENTRY;
569
570         list_for_each_safe(pos, tmp, bl_list) {
571                 struct ldlm_lock *lock =
572                         list_entry(pos, struct ldlm_lock, l_bl_ast);
573
574                 list_del_init(&lock->l_bl_ast);
575                 LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
576                 lock->l_flags &= ~LDLM_FL_AST_SENT;
577                 LASSERT(lock->l_bl_ast_run == 0);
578                 LASSERT(lock->l_blocking_lock);
579                 LDLM_LOCK_PUT(lock->l_blocking_lock);
580                 lock->l_blocking_lock = NULL;
581                 LDLM_LOCK_PUT(lock);
582         }
583         EXIT;
584 }
585
586 /* If first_enq is 0 (i.e., called from ldlm_reprocess_queue):
587  *   - blocking ASTs have already been sent
588  *   - must call this function with the ns lock held
589  *
590  * If first_enq is 1 (i.e., called from ldlm_lock_enqueue):
591  *   - blocking ASTs have not been sent
592  *   - must call this function with the ns lock held once */
593 int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
594                              ldlm_error_t *err, struct list_head *work_list)
595 {
596         struct ldlm_resource *res = lock->l_resource;
597         struct list_head rpc_list = CFS_LIST_HEAD_INIT(rpc_list);
598         int rc, rc2;
599         int contended_locks = 0;
600         ENTRY;
601
602         LASSERT(list_empty(&res->lr_converting));
603         LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
604                 !(lock->l_flags & LDLM_AST_DISCARD_DATA));
605         check_res_locked(res);
606         *err = ELDLM_OK;
607
608         if (!first_enq) {
609                 /* Careful observers will note that we don't handle -EWOULDBLOCK
610                  * here, but it's ok for a non-obvious reason -- compat_queue
611                  * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
612                  * flags should always be zero here, and if that ever stops
613                  * being true, we want to find out. */
614                 LASSERT(*flags == 0);
615                 rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
616                                               err, NULL, &contended_locks);
617                 if (rc == 1) {
618                         rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
619                                                       flags, err, NULL,
620                                                       &contended_locks);
621                 }
622                 if (rc == 0)
623                         RETURN(LDLM_ITER_STOP);
624
625                 ldlm_resource_unlink_lock(lock);
626
627                 if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
628                         ldlm_extent_policy(res, lock, flags);
629                 ldlm_grant_lock(lock, work_list);
630                 RETURN(LDLM_ITER_CONTINUE);
631         }
632
633  restart:
634         contended_locks = 0;
635         rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
636                                       &rpc_list, &contended_locks);
637         if (rc < 0)
638                 GOTO(out, rc); /* lock was destroyed */
639         if (rc == 2)
640                 goto grant;
641
642         rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
643                                        &rpc_list, &contended_locks);
644         if (rc2 < 0)
645                 GOTO(out, rc = rc2); /* lock was destroyed */
646
647         if (rc + rc2 == 2) {
648         grant:
649                 ldlm_extent_policy(res, lock, flags);
650                 ldlm_resource_unlink_lock(lock);
651                 ldlm_grant_lock(lock, NULL);
652         } else {
653                 /* If either of the compat_queue()s returned failure, then we
654                  * have ASTs to send and must go onto the waiting list.
655                  *
656                  * bug 2322: we used to unlink and re-add here, which was a
657                  * terrible folly -- if we goto restart, we could get
658                  * re-ordered!  Causes deadlock, because ASTs aren't sent! */
659                 if (list_empty(&lock->l_res_link))
660                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
661                 unlock_res(res);
662                 rc = ldlm_run_bl_ast_work(&rpc_list);
663                 lock_res(res);
664                 if (rc == -ERESTART)
665                         GOTO(restart, -ERESTART);
666                 *flags |= LDLM_FL_BLOCK_GRANTED;
667                 /* this way we force client to wait for the lock
668                  * endlessly once the lock is enqueued -bzzz */
669                 *flags |= LDLM_FL_NO_TIMEOUT;
670
671         }
672         RETURN(0);
673 out:
674         if (!list_empty(&rpc_list)) {
675                 LASSERT(!(lock->l_flags & LDLM_AST_DISCARD_DATA));
676                 discard_bl_list(&rpc_list);
677         }
678         RETURN(rc);
679 }
680
681 /* When a lock is cancelled by a client, the KMS may undergo change if this
682  * is the "highest lock".  This function returns the new KMS value.
683  * Caller must hold ns_lock already.
684  *
685  * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
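/* Illustrative example: if, after the cancelled lock is marked KMS_IGNORE,
 * the highest remaining granted extent ends at offset 4095, the new KMS is
 * 4096; if any remaining lock extends to or past old_kms, old_kms is
 * returned unchanged. */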
686 __u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
687 {
688         struct ldlm_resource *res = lock->l_resource;
689         struct list_head *tmp;
690         struct ldlm_lock *lck;
691         __u64 kms = 0;
692         ENTRY;
693
694         /* don't let another thread in ldlm_extent_shift_kms race in
695          * just after we finish and take our lock into account in its
696          * calculation of the kms */
697         lock->l_flags |= LDLM_FL_KMS_IGNORE;
698
699         list_for_each(tmp, &res->lr_granted) {
700                 lck = list_entry(tmp, struct ldlm_lock, l_res_link);
701
702                 if (lck->l_flags & LDLM_FL_KMS_IGNORE)
703                         continue;
704
705                 if (lck->l_policy_data.l_extent.end >= old_kms)
706                         RETURN(old_kms);
707
708                 /* This extent _has_ to be smaller than old_kms (checked above)
709                  * so kms can only ever be smaller or the same as old_kms. */
710                 if (lck->l_policy_data.l_extent.end + 1 > kms)
711                         kms = lck->l_policy_data.l_extent.end + 1;
712         }
713         LASSERTF(kms <= old_kms, "kms "LPU64" old_kms "LPU64"\n", kms, old_kms);
714
715         RETURN(kms);
716 }
717
718 cfs_mem_cache_t *ldlm_interval_slab;
719 struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
720 {
721         struct ldlm_interval *node;
722         ENTRY;
723
724         LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
725         OBD_SLAB_ALLOC(node, ldlm_interval_slab, CFS_ALLOC_IO, sizeof(*node));
726         if (node == NULL)
727                 RETURN(NULL);
728
729         CFS_INIT_LIST_HEAD(&node->li_group);
730         ldlm_interval_attach(node, lock);
731         RETURN(node);
732 }
733
734 void ldlm_interval_free(struct ldlm_interval *node)
735 {
736         if (node) {
737                 LASSERT(list_empty(&node->li_group));
738                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
739         }
740 }
741
742 /* interval tree, for LDLM_EXTENT. */
743 void ldlm_interval_attach(struct ldlm_interval *n,
744                           struct ldlm_lock *l)
745 {
746         LASSERT(l->l_tree_node == NULL);
747         LASSERT(l->l_resource->lr_type == LDLM_EXTENT);
748
749         list_add_tail(&l->l_sl_policy, &n->li_group);
750         l->l_tree_node = n;
751 }
752
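/* Detach the lock from its interval node.  Return the node if the lock was
 * the last member of its policy group (so the caller can erase and free it),
 * otherwise return NULL. */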
753 struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l)
754 {
755         struct ldlm_interval *n = l->l_tree_node;
756
757         if (n == NULL)
758                 return NULL;
759
760         LASSERT(!list_empty(&n->li_group));
761         l->l_tree_node = NULL;
762         list_del_init(&l->l_sl_policy);
763
764         return (list_empty(&n->li_group) ? n : NULL);
765 }
766
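/* Map a (power-of-two) lock mode to its interval-tree slot, i.e. log2 of the
 * mode bit; e.g. a mode value of 0x4 (LCK_PR) maps to index 2. */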
767 static inline int lock_mode_to_index(ldlm_mode_t mode)
768 {
769         int index;
770
771         LASSERT(mode != 0);
772         LASSERT(IS_PO2(mode));
773         for (index = -1; mode; index++, mode >>= 1) ;
774         LASSERT(index < LCK_MODE_NUM);
775         return index;
776 }
777
778 void ldlm_extent_add_lock(struct ldlm_resource *res,
779                           struct ldlm_lock *lock)
780 {
781         struct interval_node *found, **root;
782         struct ldlm_interval *node;
783         struct ldlm_extent *extent;
784         int idx;
785
786         LASSERT(lock->l_granted_mode == lock->l_req_mode);
787
788         node = lock->l_tree_node;
789         LASSERT(node != NULL);
790
791         idx = lock_mode_to_index(lock->l_granted_mode);
792         LASSERT(lock->l_granted_mode == 1 << idx);
793         LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);
794
795         /* initialize the interval node with the lock's extent */
796         extent = &lock->l_policy_data.l_extent;
797         interval_set(&node->li_node, extent->start, extent->end);
798
799         root = &res->lr_itree[idx].lit_root;
800         found = interval_insert(&node->li_node, root);
801         if (found) { /* The policy group found. */
802                 struct ldlm_interval *tmp = ldlm_interval_detach(lock);
803                 LASSERT(tmp != NULL);
804                 ldlm_interval_free(tmp);
805                 ldlm_interval_attach(to_ldlm_interval(found), lock);
806         }
807         res->lr_itree[idx].lit_size++;
808
809         /* even though we use an interval tree to manage the extent locks, we
810          * also add the lock to the granted list, for debug purposes, .. */
811         ldlm_resource_add_lock(res, &res->lr_granted, lock);
812 }
813
814 void ldlm_extent_unlink_lock(struct ldlm_lock *lock)
815 {
816         struct ldlm_resource *res = lock->l_resource;
817         struct ldlm_interval *node;
818         struct ldlm_interval_tree *tree;
819         int idx;
820
821         if (lock->l_granted_mode != lock->l_req_mode)
822                 return;
823
824         LASSERT(lock->l_tree_node != NULL);
825         idx = lock_mode_to_index(lock->l_granted_mode);
826         LASSERT(lock->l_granted_mode == 1 << idx);
827         tree = &res->lr_itree[idx];
828
829         LASSERT(tree->lit_root != NULL); /* the tree must not be empty */
830
831         tree->lit_size--;
832         node = ldlm_interval_detach(lock);
833         if (node) {
834                 interval_erase(&node->li_node, &tree->lit_root);
835                 ldlm_interval_free(node);
836         }
837 }