lustre/ldlm/ldlm_extent.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
 *   Author: Peter Braam <braam@clusterfs.com>
 *   Author: Phil Schwan <phil@clusterfs.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   You may have signed or agreed to another license before downloading
 *   this software.  If so, you are bound by the terms and conditions
 *   of that agreement, and the following does not apply to you.  See the
 *   LICENSE file included with this distribution for more information.
 *
 *   If you did not agree to a different license, then this copy of Lustre
 *   is open source software; you can redistribute it and/or modify it
 *   under the terms of version 2 of the GNU General Public License as
 *   published by the Free Software Foundation.
 *
 *   In either case, Lustre is distributed in the hope that it will be
 *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   license text for more details.
 */

#define DEBUG_SUBSYSTEM S_LDLM
#ifndef __KERNEL__
# include <liblustre.h>
#else
# include <libcfs/libcfs.h>
#endif

#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd.h>
#include <lustre_lib.h>

#include "ldlm_internal.h"

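/* When more than 32 conflicting locks exist, a PW/CW lock's extent is not
 * grown more than this far past its requested start; see
 * ldlm_extent_internal_policy_fixup(). */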
#define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)

/* fixup the ldlm_extent after expanding */
static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
                                              struct ldlm_extent *new_ex,
                                              int conflicting)
{
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        __u64 req_align, mask;

        if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
                if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
                        new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
                                          new_ex->end);
        }

        if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF) {
                EXIT;
                return;
        }

        /* we need to ensure that the lock extent is properly aligned to what
         * the client requested.  We align it to the lowest common denominator
         * of the client's requested lock start and end alignment. */
        mask = 0x1000ULL;
        req_align = (req_end + 1) | req_start;
        if (req_align != 0) {
                while ((req_align & mask) == 0)
                        mask <<= 1;
        }
        mask -= 1;
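        /* Example: a request for [0, 0xffff] gives req_align = 0x10000, so
         * mask becomes 0xffff and the expanded extent below is trimmed inward
         * to 64KB boundaries. */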
        /* We can only shrink the lock, not grow it.
         * This should never cause the lock to be smaller than requested,
         * since the requested lock was already aligned on these boundaries. */
        new_ex->start = ((new_ex->start - 1) | mask) + 1;
        new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
        LASSERTF(new_ex->start <= req_start,
                 "mask "LPX64" grant start "LPU64" req start "LPU64"\n",
                 mask, new_ex->start, req_start);
        LASSERTF(new_ex->end >= req_end,
                 "mask "LPX64" grant end "LPU64" req end "LPU64"\n",
                 mask, new_ex->end, req_end);
}

/* The purpose of this function is to return:
 * - the maximum extent
 * - containing the requested extent
 * - and not overlapping existing conflicting extents outside the requested one
 *
 * Use the interval tree to expand the lock extent for granted locks.
 */
static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req,
                                                struct ldlm_extent *new_ex)
{
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        struct ldlm_interval_tree *tree;
        struct interval_node_extent limiter = { new_ex->start, new_ex->end };
        int conflicting = 0;
        int idx;
        ENTRY;

        lockmode_verify(req_mode);

        /* use the per-mode interval trees to handle the granted extent locks */
        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                struct interval_node_extent ext = { req_start, req_end };

                tree = &res->lr_itree[idx];
                if (lockmode_compat(tree->lit_mode, req_mode))
                        continue;

                conflicting += tree->lit_size;
                if (conflicting > 4)
                        limiter.start = req_start;

                if (interval_is_overlapped(tree->lit_root, &ext))
                        printk("req_mode = %d, tree->lit_mode = %d, tree->lit_size = %d\n",
                               req_mode, tree->lit_mode, tree->lit_size);
                interval_expand(tree->lit_root, &ext, &limiter);
                limiter.start = max(limiter.start, ext.start);
                limiter.end = min(limiter.end, ext.end);
                if (limiter.start == req_start && limiter.end == req_end)
                        break;
        }

        new_ex->start = limiter.start;
        new_ex->end = limiter.end;
        LASSERT(new_ex->start <= req_start);
        LASSERT(new_ex->end >= req_end);

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
        EXIT;
}

/* The purpose of this function is to return:
 * - the maximum extent
 * - containing the requested extent
 * - and not overlapping existing conflicting extents outside the requested one
 */
static void
ldlm_extent_internal_policy_waiting(struct ldlm_lock *req,
                                    struct ldlm_extent *new_ex)
{
        struct list_head *tmp;
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int conflicting = 0;
        ENTRY;

        lockmode_verify(req_mode);

        /* for waiting locks */
        list_for_each(tmp, &res->lr_waiting) {
                struct ldlm_lock *lock;
                struct ldlm_extent *l_extent;

                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
                l_extent = &lock->l_policy_data.l_extent;

                /* We already hit the minimum requested size, search no more */
                if (new_ex->start == req_start && new_ex->end == req_end) {
                        EXIT;
                        return;
                }

                /* Don't conflict with ourselves */
                if (req == lock)
                        continue;

                /* Locks are compatible, overlap doesn't matter */
                /* Until bug 20 is fixed, try to avoid granting overlapping
                 * locks on one client (they take a long time to cancel) */
                if (lockmode_compat(lock->l_req_mode, req_mode) &&
                    lock->l_export != req->l_export)
                        continue;

                /* If this is a high-traffic lock, don't grow downwards at all
                 * or grow upwards too much */
                ++conflicting;
                if (conflicting > 4)
                        new_ex->start = req_start;

                /* If lock doesn't overlap new_ex, skip it. */
                if (!ldlm_extent_overlap(l_extent, new_ex))
                        continue;

                /* The locks conflict in their requested extents and we can't
                 * satisfy both, so ignore it.  Either we will ping-pong this
                 * extent (we would regardless of what extent we granted) or
                 * the lock is unused and it shouldn't limit our extent growth. */
                if (ldlm_extent_overlap(&lock->l_req_extent, &req->l_req_extent))
                        continue;

                /* We grow extents downwards only as far as they don't overlap
                 * with already-granted locks, on the assumption that clients
                 * will be writing beyond the initial requested end and would
                 * then need to enqueue a new lock beyond the previous request.
                 * l_req_extent->end strictly < req_start, checked above. */
                if (l_extent->start < req_start && new_ex->start != req_start) {
                        if (l_extent->end >= req_start)
                                new_ex->start = req_start;
                        else
                                new_ex->start = min(l_extent->end + 1, req_start);
                }

                /* If we need to cancel this lock anyways because our request
                 * overlaps the granted lock, we grow up to its requested
                 * extent start instead of limiting this extent, assuming that
                 * clients are writing forwards and the lock had overgrown
                 * its extent downwards before we enqueued our request. */
                if (l_extent->end > req_end) {
                        if (l_extent->start <= req_end)
                                new_ex->end = max(lock->l_req_extent.start - 1,
                                                  req_end);
                        else
                                new_ex->end = max(l_extent->start - 1, req_end);
                }
        }

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
        EXIT;
}

/* In order to determine the largest possible extent we can grant, we need
 * to scan all of the queues. */
static void ldlm_extent_policy(struct ldlm_resource *res,
                               struct ldlm_lock *lock, int *flags)
{
        struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };

        if (lock->l_export == NULL)
                /*
                 * this is a local lock taken by the server (e.g., as a part of
                 * OST-side locking, or unlink handling). Expansion doesn't
                 * make a lot of sense for local locks, because they are
                 * dropped immediately on operation completion and would only
                 * conflict with other threads.
                 */
                return;

        if (lock->l_policy_data.l_extent.start == 0 &&
            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                /* fast-path whole file locks */
                return;

        ldlm_extent_internal_policy_granted(lock, &new_ex);
        ldlm_extent_internal_policy_waiting(lock, &new_ex);

        if (new_ex.start != lock->l_policy_data.l_extent.start ||
            new_ex.end != lock->l_policy_data.l_extent.end) {
                *flags |= LDLM_FL_LOCK_CHANGED;
                lock->l_policy_data.l_extent.start = new_ex.start;
                lock->l_policy_data.l_extent.end = new_ex.end;
        }
}

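/* A resource is considered contended once more than ns_contended_locks
 * conflicting locks have been seen on it; it stays contended for
 * ns_contention_time seconds after that point. */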
static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
{
        struct ldlm_resource *res = lock->l_resource;
        cfs_time_t now = cfs_time_current();

        CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
        if (contended_locks > res->lr_namespace->ns_contended_locks)
                res->lr_contention_time = now;
        return cfs_time_before(now, cfs_time_add(res->lr_contention_time,
                cfs_time_seconds(res->lr_namespace->ns_contention_time)));
}

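/* Callback argument bundle passed to ldlm_extent_compat_cb() when walking a
 * granted-lock interval tree via interval_search()/interval_iterate(). */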
struct ldlm_extent_compat_args {
        struct list_head *work_list;
        struct ldlm_lock *lock;
        ldlm_mode_t mode;
        int *locks;
        int *compat;
};

static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
                                                void *data)
{
        struct ldlm_extent_compat_args *priv = data;
        struct ldlm_interval *node = to_ldlm_interval(n);
        struct ldlm_extent *extent;
        struct list_head *work_list = priv->work_list;
        struct ldlm_lock *lock, *enq = priv->lock;
        ldlm_mode_t mode = priv->mode;
        int count = 0;
        ENTRY;

        LASSERT(!list_empty(&node->li_group));

        list_for_each_entry(lock, &node->li_group, l_sl_policy) {
                /* the interval tree holds only granted locks */
                LASSERTF(mode == lock->l_granted_mode,
                         "mode = %s, lock->l_granted_mode = %s\n",
                         ldlm_lockname[mode],
                         ldlm_lockname[lock->l_granted_mode]);
                count++;
                if (lock->l_blocking_ast)
                        ldlm_add_ast_work_item(lock, enq, work_list);
        }

        /* don't count conflicting glimpse locks */
        extent = ldlm_interval_extent(node);
        if (!(mode == LCK_PR &&
            extent->start == 0 && extent->end == OBD_OBJECT_EOF))
                *priv->locks += count;

        if (priv->compat)
                *priv->compat = 0;

        RETURN(INTERVAL_ITER_CONT);
}

/* Determine if the lock is compatible with all locks on the queue.
 * We stop walking the queue if we hit ourselves so we don't take
 * conflicting locks enqueued after us into account, or we'd wait forever.
 *
 * 0 if the lock is not compatible
 * 1 if the lock is compatible
 * 2 if this group lock is compatible and requires no further checking
 * negative error, such as EWOULDBLOCK for group locks
 */
static int
ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                         int *flags, ldlm_error_t *err,
                         struct list_head *work_list, int *contended_locks)
{
        struct list_head *tmp;
        struct ldlm_lock *lock;
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int compat = 1;
        int scan = 0;
        int check_contention;
        ENTRY;

        lockmode_verify(req_mode);

        /* Using the interval tree for granted locks */
        if (queue == &res->lr_granted) {
                struct ldlm_interval_tree *tree;
                struct ldlm_extent_compat_args data = {.work_list = work_list,
                                               .lock = req,
                                               .locks = contended_locks,
                                               .compat = &compat };
                struct interval_node_extent ex = { .start = req_start,
                                                   .end = req_end };
                int idx, rc;

                for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                        tree = &res->lr_itree[idx];
                        if (tree->lit_root == NULL) /* empty tree, skip it */
                                continue;

                        data.mode = tree->lit_mode;
                        if (lockmode_compat(req_mode, tree->lit_mode)) {
                                struct ldlm_interval *node;
                                struct ldlm_extent *extent;

                                if (req_mode != LCK_GROUP)
                                        continue;

                                /* group lock, grant it immediately if
                                 * compatible */
                                node = to_ldlm_interval(tree->lit_root);
                                extent = ldlm_interval_extent(node);
                                if (req->l_policy_data.l_extent.gid ==
                                    extent->gid)
                                        RETURN(2);
                        }

                        if (tree->lit_mode == LCK_GROUP) {
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                }

                                *flags |= LDLM_FL_NO_TIMEOUT;
                                if (!work_list)
                                        RETURN(0);

                                /* if the work list is not NULL, add all locks
                                 * in the tree to the work list */
                                compat = 0;
                                interval_iterate(tree->lit_root,
                                                 ldlm_extent_compat_cb, &data);
                                continue;
                        }

                        if (!work_list) {
                                rc = interval_is_overlapped(tree->lit_root, &ex);
                                if (rc)
                                        RETURN(0);
                        } else {
                                interval_search(tree->lit_root, &ex,
                                                ldlm_extent_compat_cb, &data);
                                if (!list_empty(work_list) && compat)
                                        compat = 0;
                        }
                }
        } else { /* for waiting queue */
                list_for_each(tmp, queue) {
                        check_contention = 1;

                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                        if (req == lock)
                                break;

                        if (unlikely(scan)) {
                                /* We only get here if we are queuing a GROUP
                                 * lock and met some incompatible one. The main
                                 * idea of this code is to insert the GROUP lock
                                 * past a compatible GROUP lock in the waiting
                                 * queue or, if there is none, then in front of
                                 * the first non-GROUP lock. */
                                if (lock->l_req_mode != LCK_GROUP) {
                                        /* Ok, we hit a non-GROUP lock, there
                                         * should be no more GROUP locks later
                                         * on, queue in front of the first
                                         * non-GROUP lock */

                                        ldlm_resource_insert_lock_after(lock, req);
                                        list_del_init(&lock->l_res_link);
                                        ldlm_resource_insert_lock_after(req, lock);
                                        compat = 0;
                                        break;
                                }
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* found it */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        compat = 0;
                                        break;
                                }
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_req_mode, req_mode)) {
                                if (req_mode == LCK_PR &&
                                    ((lock->l_policy_data.l_extent.start <=
                                      req->l_policy_data.l_extent.start) &&
                                     (lock->l_policy_data.l_extent.end >=
                                      req->l_policy_data.l_extent.end))) {
                                        /* If we met a PR lock just like us or
                                           wider, and nobody down the list
                                           conflicted with it, that means we
                                           can skip processing of the rest of
                                           the list and safely place ourselves
                                           at the end of the list, or grant
                                           (depending on whether we met
                                           conflicting locks earlier in the
                                           list).
                                           In case of 1st enqueue only we
                                           continue traversing if there is
                                           something conflicting down the list
                                           because we need to make sure that
                                           something is marked as AST_SENT as
                                           well; in case of an empty work list
                                           we would exit on the first conflict
                                           met. */
                                        /* There IS a case where such flag is
                                           not set for a lock, yet it blocks
                                           something. Luckily for us this is
                                           only during destroy, so lock is
                                           exclusive. So here we are safe */
                                        if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
                                                RETURN(compat);
                                        }
                                }

                                /* non-group locks are compatible, overlap
                                   doesn't matter */
                                if (likely(req_mode != LCK_GROUP))
                                        continue;

                                /* If we are trying to get a GROUP lock and
                                   there is another one of this kind, we need
                                   to compare gid */
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* If the existing lock with matched gid
                                           is granted, we grant the new one too. */
                                        if (lock->l_req_mode == lock->l_granted_mode)
                                                RETURN(2);

                                        /* Otherwise we are scanning the queue
                                         * of waiting locks and it means the
                                         * current request would block along
                                         * with the existing lock (that is
                                         * already blocked).
                                         * If we are in nonblocking mode -
                                         * return immediately */
                                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                                compat = -EWOULDBLOCK;
                                                goto destroylock;
                                        }
                                        /* If this group lock is compatible with another
                                         * group lock on the waiting list, they must be
                                         * together in the list, so they can be granted
                                         * at the same time.  Otherwise the later lock
                                         * can get stuck behind another, incompatible,
                                         * lock. */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        /* Because 'lock' is not granted, we can stop
                                         * processing this queue and return immediately.
                                         * There is no need to check the rest of the
                                         * list. */
                                        RETURN(0);
                                }
                        }

                        if (unlikely(req_mode == LCK_GROUP &&
                                     (lock->l_req_mode != lock->l_granted_mode))) {
                                scan = 1;
                                compat = 0;
                                if (lock->l_req_mode != LCK_GROUP) {
                                        /* Ok, we hit a non-GROUP lock, there
                                           should be no more GROUP locks later
                                           on, queue in front of the first
                                           non-GROUP lock */

                                        ldlm_resource_insert_lock_after(lock, req);
                                        list_del_init(&lock->l_res_link);
                                        ldlm_resource_insert_lock_after(req, lock);
                                        break;
                                }
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* found it */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        break;
                                }
                                continue;
                        }

                        if (unlikely(lock->l_req_mode == LCK_GROUP)) {
                                /* If the compared lock is GROUP, then the
                                 * requested one is PR/PW, so this is not
                                 * compatible; the extent range does not
                                 * matter */
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                } else {
                                        *flags |= LDLM_FL_NO_TIMEOUT;
                                }
                        } else if (lock->l_policy_data.l_extent.end < req_start ||
                                   lock->l_policy_data.l_extent.start > req_end) {
                                /* if a non-group lock doesn't overlap, skip it */
                                continue;
                        } else if (lock->l_req_extent.end < req_start ||
                                   lock->l_req_extent.start > req_end) {
                                /* false contention, the requests don't really
                                 * overlap */
                                check_contention = 0;
                        }

                        if (!work_list)
                                RETURN(0);

                        /* don't count conflicting glimpse locks */
                        if (lock->l_req_mode == LCK_PR &&
                            lock->l_policy_data.l_extent.start == 0 &&
                            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                                check_contention = 0;

                        *contended_locks += check_contention;

                        compat = 0;
                        if (lock->l_blocking_ast)
                                ldlm_add_ast_work_item(lock, req, work_list);
                }
        }

        if (ldlm_check_contention(req, *contended_locks) &&
            compat == 0 &&
            (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
            req->l_req_mode != LCK_GROUP &&
            req_end - req_start <=
            req->l_resource->lr_namespace->ns_max_nolock_size)
                GOTO(destroylock, compat = -EUSERS);

        RETURN(compat);
destroylock:
        list_del_init(&req->l_res_link);
        ldlm_lock_destroy_nolock(req);
        *err = compat;
        RETURN(compat);
}

static void discard_bl_list(struct list_head *bl_list)
{
        struct list_head *tmp, *pos;
        ENTRY;

        list_for_each_safe(pos, tmp, bl_list) {
                struct ldlm_lock *lock =
                        list_entry(pos, struct ldlm_lock, l_bl_ast);

                list_del_init(&lock->l_bl_ast);
                LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
                lock->l_flags &= ~LDLM_FL_AST_SENT;
                LASSERT(lock->l_bl_ast_run == 0);
                LASSERT(lock->l_blocking_lock);
                LDLM_LOCK_PUT(lock->l_blocking_lock);
                lock->l_blocking_lock = NULL;
                LDLM_LOCK_PUT(lock);
        }
        EXIT;
}

/* If first_enq is 0 (i.e., called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent
 *   - must call this function with the ns lock held
 *
 * If first_enq is 1 (i.e., called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent
 *   - must call this function with the ns lock held once */
int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                             ldlm_error_t *err, struct list_head *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        CFS_LIST_HEAD(rpc_list);
        int rc, rc2;
        int contended_locks = 0;
        ENTRY;

        LASSERT(list_empty(&res->lr_converting));
        LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
                !(lock->l_flags & LDLM_AST_DISCARD_DATA));
        check_res_locked(res);
        *err = ELDLM_OK;

        if (!first_enq) {
                /* Careful observers will note that we don't handle -EWOULDBLOCK
                 * here, but it's ok for a non-obvious reason -- compat_queue
                 * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
                 * flags should always be zero here, and if that ever stops
                 * being true, we want to find out. */
                LASSERT(*flags == 0);
                rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
                                              err, NULL, &contended_locks);
                if (rc == 1) {
                        rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
                                                      flags, err, NULL,
                                                      &contended_locks);
                }
                if (rc == 0)
                        RETURN(LDLM_ITER_STOP);

                ldlm_resource_unlink_lock(lock);

                if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
                        ldlm_extent_policy(res, lock, flags);
                ldlm_grant_lock(lock, work_list);
                RETURN(LDLM_ITER_CONTINUE);
        }

 restart:
        contended_locks = 0;
        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
                                      &rpc_list, &contended_locks);
        if (rc < 0)
                GOTO(out, rc); /* lock was destroyed */
        if (rc == 2)
                goto grant;

        rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
                                       &rpc_list, &contended_locks);
        if (rc2 < 0)
                GOTO(out, rc = rc2); /* lock was destroyed */

        if (rc + rc2 == 2) {
        grant:
                ldlm_extent_policy(res, lock, flags);
                ldlm_resource_unlink_lock(lock);
                ldlm_grant_lock(lock, NULL);
        } else {
                /* If either of the compat_queue()s returned failure, then we
                 * have ASTs to send and must go onto the waiting list.
                 *
                 * bug 2322: we used to unlink and re-add here, which was a
                 * terrible folly -- if we goto restart, we could get
                 * re-ordered!  Causes deadlock, because ASTs aren't sent! */
                if (list_empty(&lock->l_res_link))
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                unlock_res(res);
                rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_BL_AST);
                lock_res(res);

                if (rc == -ERESTART) {
                        /* lock was granted while resource was unlocked. */
                        if (lock->l_granted_mode == lock->l_req_mode) {
                                /* bug 11300: if the lock has been granted,
                                 * break earlier because otherwise, we will go
                                 * to restart and ldlm_resource_unlink will be
                                 * called and it causes the interval node to be
                                 * freed. Then we will fail at
                                 * ldlm_extent_add_lock() */
                                *flags &= ~(LDLM_FL_BLOCK_GRANTED | LDLM_FL_BLOCK_CONV |
                                            LDLM_FL_BLOCK_WAIT);
                                GOTO(out, rc = 0);
                        }

                        GOTO(restart, -ERESTART);
                }

                *flags |= LDLM_FL_BLOCK_GRANTED;
                /* this way we force the client to wait for the lock
                 * endlessly once the lock is enqueued -bzzz */
                *flags |= LDLM_FL_NO_TIMEOUT;
        }
        RETURN(0);
out:
        if (!list_empty(&rpc_list)) {
                LASSERT(!(lock->l_flags & LDLM_AST_DISCARD_DATA));
                discard_bl_list(&rpc_list);
        }
        RETURN(rc);
}

/* When a lock is cancelled by a client, the KMS may undergo change if this
 * is the "highest lock".  This function returns the new KMS value.
 * Caller must hold ns_lock already.
 *
 * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
__u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
{
        struct ldlm_resource *res = lock->l_resource;
        struct list_head *tmp;
        struct ldlm_lock *lck;
        __u64 kms = 0;
        ENTRY;

        /* don't let another thread in ldlm_extent_shift_kms race in
         * just after we finish and take our lock into account in its
         * calculation of the kms */
        lock->l_flags |= LDLM_FL_KMS_IGNORE;

        list_for_each(tmp, &res->lr_granted) {
                lck = list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lck->l_flags & LDLM_FL_KMS_IGNORE)
                        continue;

                if (lck->l_policy_data.l_extent.end >= old_kms)
                        RETURN(old_kms);

                /* This extent _has_ to be smaller than old_kms (checked above)
                 * so kms can only ever be smaller or the same as old_kms. */
                if (lck->l_policy_data.l_extent.end + 1 > kms)
                        kms = lck->l_policy_data.l_extent.end + 1;
        }
        LASSERTF(kms <= old_kms, "kms "LPU64" old_kms "LPU64"\n", kms, old_kms);

        RETURN(kms);
}

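/* slab cache for struct ldlm_interval; each granted extent lock is attached
 * to one of these interval-tree nodes (see ldlm_interval_attach()). */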
cfs_mem_cache_t *ldlm_interval_slab;
struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
{
        struct ldlm_interval *node;
        ENTRY;

        LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
        OBD_SLAB_ALLOC(node, ldlm_interval_slab, CFS_ALLOC_IO, sizeof(*node));
        if (node == NULL)
                RETURN(NULL);

        CFS_INIT_LIST_HEAD(&node->li_group);
        ldlm_interval_attach(node, lock);
        RETURN(node);
}

void ldlm_interval_free(struct ldlm_interval *node)
{
        if (node) {
                LASSERT(list_empty(&node->li_group));
                OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
        }
}

/* interval tree, for LDLM_EXTENT. */
void ldlm_interval_attach(struct ldlm_interval *n,
                          struct ldlm_lock *l)
{
        LASSERT(l->l_tree_node == NULL);
        LASSERT(l->l_resource->lr_type == LDLM_EXTENT);

        list_add_tail(&l->l_sl_policy, &n->li_group);
        l->l_tree_node = n;
}

struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l)
{
        struct ldlm_interval *n = l->l_tree_node;

        if (n == NULL)
                return NULL;

        LASSERT(!list_empty(&n->li_group));
        l->l_tree_node = NULL;
        list_del_init(&l->l_sl_policy);

        return (list_empty(&n->li_group) ? n : NULL);
}

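/* Map a power-of-two lock mode to its bit index, used to pick the
 * corresponding per-mode interval tree in res->lr_itree[]. */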
static inline int lock_mode_to_index(ldlm_mode_t mode)
{
        int index;

        LASSERT(mode != 0);
        LASSERT(IS_PO2(mode));
        for (index = -1; mode; index++, mode >>= 1) ;
        LASSERT(index < LCK_MODE_NUM);
        return index;
}

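/* Insert a granted extent lock into the interval tree that matches its mode,
 * merging it into an existing node if one already covers the same extent. */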
void ldlm_extent_add_lock(struct ldlm_resource *res,
                          struct ldlm_lock *lock)
{
        struct interval_node *found, **root;
        struct ldlm_interval *node;
        struct ldlm_extent *extent;
        int idx;

        LASSERT(lock->l_granted_mode == lock->l_req_mode);

        node = lock->l_tree_node;
        LASSERT(node != NULL);

        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);

        /* initialize the node's extent */
        extent = &lock->l_policy_data.l_extent;
        interval_set(&node->li_node, extent->start, extent->end);

        root = &res->lr_itree[idx].lit_root;
        found = interval_insert(&node->li_node, root);
        if (found) { /* The policy group found. */
                struct ldlm_interval *tmp = ldlm_interval_detach(lock);
                LASSERT(tmp != NULL);
                ldlm_interval_free(tmp);
                ldlm_interval_attach(to_ldlm_interval(found), lock);
        }
        res->lr_itree[idx].lit_size++;

        /* even though we use the interval tree to manage the extent locks, we
         * also add the locks into the grant list, for debugging purposes */
        ldlm_resource_add_lock(res, &res->lr_granted, lock);
}

void ldlm_extent_unlink_lock(struct ldlm_lock *lock)
{
        struct ldlm_resource *res = lock->l_resource;
        struct ldlm_interval *node;
        struct ldlm_interval_tree *tree;
        int idx;

        if (lock->l_granted_mode != lock->l_req_mode)
                return;

        LASSERT(lock->l_tree_node != NULL);
        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        tree = &res->lr_itree[idx];

        LASSERT(tree->lit_root != NULL); /* assure the tree is not null */

        tree->lit_size--;
        node = ldlm_interval_detach(lock);
        if (node) {
                interval_erase(&node->li_node, &tree->lit_root);
                ldlm_interval_free(node);
        }
}