lustre/ldlm/ldlm_extent.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
 *   Author: Peter Braam <braam@clusterfs.com>
 *   Author: Phil Schwan <phil@clusterfs.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   You may have signed or agreed to another license before downloading
 *   this software.  If so, you are bound by the terms and conditions
 *   of that agreement, and the following does not apply to you.  See the
 *   LICENSE file included with this distribution for more information.
 *
 *   If you did not agree to a different license, then this copy of Lustre
 *   is open source software; you can redistribute it and/or modify it
 *   under the terms of version 2 of the GNU General Public License as
 *   published by the Free Software Foundation.
 *
 *   In either case, Lustre is distributed in the hope that it will be
 *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   license text for more details.
 */

#define DEBUG_SUBSYSTEM S_LDLM
#ifndef __KERNEL__
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd.h>
#include <lustre_lib.h>

#include "ldlm_internal.h"

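/* Cap (32MB - 1) on how far the end of a grown PW/CW extent may exceed the
 * requested start when the resource has many conflicting locks; see
 * ldlm_extent_internal_policy_fixup() below. */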
#define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)

/* Fix up the ldlm_extent after expanding it: cap excessive growth and align
 * it to the boundaries of the original request. */
static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
                                              struct ldlm_extent *new_ex,
                                              int conflicting)
{
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        __u64 req_align, mask;

        if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
                if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
                        new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
                                          new_ex->end);
        }

        if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF) {
                EXIT;
                return;
        }

        /* We need to ensure that the lock extent is properly aligned to what
         * the client requested.  We align it to the common alignment of the
         * client's requested lock start and end (at least a 4kB boundary). */
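        /* For example, with req_start = 0x6000 and req_end = 0x7fff,
         * (req_end + 1) | req_start = 0xe000; its lowest set bit at or above
         * the initial 0x1000 is 0x2000, so mask becomes 0x1fff and the
         * granted extent is trimmed inward to 8kB boundaries. */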
        mask = 0x1000ULL;
        req_align = (req_end + 1) | req_start;
        if (req_align != 0) {
                while ((req_align & mask) == 0)
                        mask <<= 1;
        }
        mask -= 1;
        /* We can only shrink the lock, not grow it.
         * This should never cause the lock to be smaller than requested,
         * since the requested lock was already aligned on these boundaries. */
        new_ex->start = ((new_ex->start - 1) | mask) + 1;
        new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
        LASSERTF(new_ex->start <= req_start,
                 "mask "LPX64" grant start "LPU64" req start "LPU64"\n",
                 mask, new_ex->start, req_start);
        LASSERTF(new_ex->end >= req_end,
                 "mask "LPX64" grant end "LPU64" req end "LPU64"\n",
                 mask, new_ex->end, req_end);
}

/* The purpose of this function is to return:
 * - the maximum extent
 * - containing the requested extent
 * - and not overlapping existing conflicting extents outside the requested one
 *
 * Use the interval tree to expand the lock extent for granted locks.
 */
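/* Each res->lr_itree[idx] holds the granted locks of a single lock mode, so
 * only trees whose mode conflicts with the request need to be walked when
 * shrinking the candidate extent. */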
static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req,
                                                struct ldlm_extent *new_ex)
{
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        struct ldlm_interval_tree *tree;
        struct interval_node_extent limiter = { new_ex->start, new_ex->end };
        int conflicting = 0;
        int idx;
        ENTRY;

        lockmode_verify(req_mode);

        /* using interval tree to handle the ldlm extent granted locks */
        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                struct interval_node_extent ext = { req_start, req_end };

                tree = &res->lr_itree[idx];
                if (lockmode_compat(tree->lit_mode, req_mode))
                        continue;

                conflicting += tree->lit_size;
                if (conflicting > 4)
                        limiter.start = req_start;

                if (interval_is_overlapped(tree->lit_root, &ext))
                        printk("req_mode = %d, tree->lit_mode = %d, tree->lit_size = %d\n",
                               req_mode, tree->lit_mode, tree->lit_size);
                interval_expand(tree->lit_root, &ext, &limiter);
                limiter.start = max(limiter.start, ext.start);
                limiter.end = min(limiter.end, ext.end);
                if (limiter.start == req_start && limiter.end == req_end)
                        break;
        }

        new_ex->start = limiter.start;
        new_ex->end = limiter.end;
        LASSERT(new_ex->start <= req_start);
        LASSERT(new_ex->end >= req_end);

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
        EXIT;
}

/* The purpose of this function is to return:
 * - the maximum extent
 * - containing the requested extent
 * - and not overlapping existing conflicting extents outside the requested one
 */
static void
ldlm_extent_internal_policy_waiting(struct ldlm_lock *req,
                                    struct ldlm_extent *new_ex)
{
        struct list_head *tmp;
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int conflicting = 0;
        ENTRY;

        lockmode_verify(req_mode);

        /* for waiting locks */
        list_for_each(tmp, &res->lr_waiting) {
                struct ldlm_lock *lock;
                struct ldlm_extent *l_extent;

                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
                l_extent = &lock->l_policy_data.l_extent;

                /* We already hit the minimum requested size, search no more */
                if (new_ex->start == req_start && new_ex->end == req_end) {
                        EXIT;
                        return;
                }

                /* Don't conflict with ourselves */
                if (req == lock)
                        continue;

                /* Locks are compatible, overlap doesn't matter */
                /* Until bug 20 is fixed, try to avoid granting overlapping
                 * locks on one client (they take a long time to cancel) */
                if (lockmode_compat(lock->l_req_mode, req_mode) &&
                    lock->l_export != req->l_export)
                        continue;

                /* If this is a high-traffic lock, don't grow downwards at all
                 * or grow upwards too much */
                ++conflicting;
                if (conflicting > 4)
                        new_ex->start = req_start;

                /* If lock doesn't overlap new_ex, skip it. */
                if (!ldlm_extent_overlap(l_extent, new_ex))
                        continue;

                /* The lock's requested extent conflicts with ours and we
                 * can't satisfy both, so ignore it.  Either we will ping-pong
                 * this extent (we would regardless of what extent we granted)
                 * or the lock is unused and shouldn't limit our extent
                 * growth. */
                if (ldlm_extent_overlap(&lock->l_req_extent,&req->l_req_extent))
                        continue;

                /* We grow extents downwards only as far as they don't overlap
                 * with already-granted locks, on the assumption that clients
                 * will be writing beyond the initial requested end and would
                 * then need to enqueue a new lock past the previous request.
                 * l_req_extent->end strictly < req_start, checked above. */
                if (l_extent->start < req_start && new_ex->start != req_start) {
                        if (l_extent->end >= req_start)
                                new_ex->start = req_start;
                        else
                                new_ex->start = min(l_extent->end+1, req_start);
                }

                /* If we need to cancel this lock anyway because our request
                 * overlaps the granted lock, we grow up to its requested
                 * extent start instead of limiting this extent, assuming that
                 * clients are writing forwards and the lock had overgrown
                 * its extent downwards before we enqueued our request. */
                if (l_extent->end > req_end) {
                        if (l_extent->start <= req_end)
                                new_ex->end = max(lock->l_req_extent.start - 1,
                                                  req_end);
                        else
                                new_ex->end = max(l_extent->start - 1, req_end);
                }
        }

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
        EXIT;
}


/* In order to determine the largest possible extent we can grant, we need
 * to scan all of the queues. */
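/* The granted queue is scanned via the per-mode interval trees and the
 * waiting queue via a plain list walk; if the resulting extent differs from
 * the one requested, LDLM_FL_LOCK_CHANGED is set so the caller knows the
 * lock's extent was changed. */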
static void ldlm_extent_policy(struct ldlm_resource *res,
                               struct ldlm_lock *lock, int *flags)
{
        struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };

        if (lock->l_export == NULL)
                /*
                 * this is a local lock taken by the server (e.g., as a part
                 * of OST-side locking, or unlink handling).  Expansion
                 * doesn't make a lot of sense for local locks, because they
                 * are dropped immediately on operation completion and would
                 * only conflict with other threads.
                 */
                return;

        if (lock->l_policy_data.l_extent.start == 0 &&
            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                /* fast-path whole file locks */
                return;

        ldlm_extent_internal_policy_granted(lock, &new_ex);
        ldlm_extent_internal_policy_waiting(lock, &new_ex);

        if (new_ex.start != lock->l_policy_data.l_extent.start ||
            new_ex.end != lock->l_policy_data.l_extent.end) {
                *flags |= LDLM_FL_LOCK_CHANGED;
                lock->l_policy_data.l_extent.start = new_ex.start;
                lock->l_policy_data.l_extent.end = new_ex.end;
        }
}

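/* A resource is considered contended once a request sees more than
 * ns_contended_locks conflicting locks; it stays contended for
 * ns_contention_time seconds after the last such observation. */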
static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
{
        struct ldlm_resource *res = lock->l_resource;
        cfs_time_t now = cfs_time_current();

        CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
        if (contended_locks > res->lr_namespace->ns_contended_locks)
                res->lr_contention_time = now;
        return cfs_time_before(now, cfs_time_add(res->lr_contention_time,
                cfs_time_seconds(res->lr_namespace->ns_contention_time)));
}

struct ldlm_extent_compat_args {
        struct list_head *work_list;
        struct ldlm_lock *lock;
        ldlm_mode_t mode;
        int *locks;
        int *compat;
};

static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
                                                void *data)
{
        struct ldlm_extent_compat_args *priv = data;
        struct ldlm_interval *node = to_ldlm_interval(n);
        struct ldlm_extent *extent;
        struct list_head *work_list = priv->work_list;
        struct ldlm_lock *lock, *enq = priv->lock;
        ldlm_mode_t mode = priv->mode;
        int count = 0;
        ENTRY;

        LASSERT(!list_empty(&node->li_group));

        list_for_each_entry(lock, &node->li_group, l_sl_policy) {
                /* the interval tree holds granted locks only */
                LASSERTF(mode == lock->l_granted_mode,
                         "mode = %s, lock->l_granted_mode = %s\n",
                         ldlm_lockname[mode],
                         ldlm_lockname[lock->l_granted_mode]);

                count++;
                if (lock->l_blocking_ast)
                        ldlm_add_ast_work_item(lock, enq, work_list);
        }
        LASSERT(count > 0);

        /* don't count conflicting glimpse locks */
        extent = ldlm_interval_extent(node);
        if (!(mode == LCK_PR &&
            extent->start == 0 && extent->end == OBD_OBJECT_EOF))
                *priv->locks += count;

        if (priv->compat)
                *priv->compat = 0;

        RETURN(INTERVAL_ITER_CONT);
}

/* Determine if the lock is compatible with all locks on the queue.
 * We stop walking the queue if we hit ourselves so we don't take
 * conflicting locks enqueued after us into account, or we'd wait forever.
 *
 * 0 if the lock is not compatible
 * 1 if the lock is compatible
 * 2 if this group lock is compatible and requires no further checking
 * negative error, such as EWOULDBLOCK for group locks
 */
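/* When work_list is non-NULL (the first-enqueue case in
 * ldlm_process_extent_lock()), conflicting locks that have a blocking AST
 * are queued on work_list and *contended_locks is updated for the
 * contention heuristic. */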
static int
ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                         int *flags, ldlm_error_t *err,
                         struct list_head *work_list, int *contended_locks)
{
        struct list_head *tmp;
        struct ldlm_lock *lock;
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int compat = 1;
        int scan = 0;
        int check_contention;
        ENTRY;

        lockmode_verify(req_mode);

        /* Using interval tree for granted lock */
        if (queue == &res->lr_granted) {
                struct ldlm_interval_tree *tree;
                struct ldlm_extent_compat_args data = {.work_list = work_list,
                                               .lock = req,
                                               .locks = contended_locks,
                                               .compat = &compat };
                struct interval_node_extent ex = { .start = req_start,
                                                   .end = req_end };
                int idx, rc;

                for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                        tree = &res->lr_itree[idx];
                        if (tree->lit_root == NULL) /* empty tree, skipped */
                                continue;

                        data.mode = tree->lit_mode;
                        if (lockmode_compat(req_mode, tree->lit_mode)) {
                                struct ldlm_interval *node;
                                struct ldlm_extent *extent;

                                if (req_mode != LCK_GROUP)
                                        continue;

                                /* group lock, grant it immediately if
                                 * compatible */
                                node = to_ldlm_interval(tree->lit_root);
                                extent = ldlm_interval_extent(node);
                                if (req->l_policy_data.l_extent.gid ==
                                    extent->gid)
                                        RETURN(2);
                        }

                        if (tree->lit_mode == LCK_GROUP) {
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                }

                                *flags |= LDLM_FL_NO_TIMEOUT;
                                if (!work_list)
                                        RETURN(0);

                                /* if the work list is not NULL, add all
                                   locks in the tree to the work list */
                                compat = 0;
                                interval_iterate(tree->lit_root,
                                                 ldlm_extent_compat_cb, &data);
                                continue;
                        }

                        if (!work_list) {
                                rc = interval_is_overlapped(tree->lit_root,&ex);
                                if (rc)
                                        RETURN(0);
                        } else {
                                interval_search(tree->lit_root, &ex,
                                                ldlm_extent_compat_cb, &data);
                                if (!list_empty(work_list) && compat)
                                        compat = 0;
                        }
                }
                RETURN(compat);
        }

        /* for waiting queue */
        list_for_each(tmp, queue) {
                check_contention = 1;

                lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                if (req == lock)
                        break;

                if (unlikely(scan)) {
                        /* We only get here if we are queueing a GROUP lock
                           and have met some incompatible one.  The main idea
                           of this code is to insert the GROUP lock after a
                           compatible GROUP lock in the waiting queue or, if
                           there is none, in front of the first non-GROUP
                           lock. */
                        if (lock->l_req_mode != LCK_GROUP) {
                                /* Ok, we hit a non-GROUP lock; there should be
                                   no more GROUP locks later on, so queue in
                                   front of the first non-GROUP lock */

                                ldlm_resource_insert_lock_after(lock, req);
                                list_del_init(&lock->l_res_link);
                                ldlm_resource_insert_lock_after(req, lock);
                                compat = 0;
                                break;
                        }
                        if (req->l_policy_data.l_extent.gid ==
                             lock->l_policy_data.l_extent.gid) {
                                /* found it */
                                ldlm_resource_insert_lock_after(lock, req);
                                compat = 0;
                                break;
                        }
                        continue;
                }

                /* locks are compatible, overlap doesn't matter */
                if (lockmode_compat(lock->l_req_mode, req_mode)) {
                        if (req_mode == LCK_PR &&
                            ((lock->l_policy_data.l_extent.start <=
                             req->l_policy_data.l_extent.start) &&
                             (lock->l_policy_data.l_extent.end >=
                              req->l_policy_data.l_extent.end))) {
                                /* If we met a PR lock just like us or wider,
                                   and nobody down the list conflicted with
                                   it, that means we can skip processing of
                                   the rest of the list and safely place
                                   ourselves at the end of the list, or grant
                                   (depending on whether we met conflicting
                                   locks earlier in the list).
                                   Only in the case of the 1st enqueue do we
                                   continue traversing when there is something
                                   conflicting down the list, because we need
                                   to make sure that something is marked as
                                   AST_SENT as well; with an empty work list
                                   we would exit on the first conflict met. */
                                /* There IS a case where such a flag is
                                   not set for a lock, yet it blocks
                                   something.  Luckily for us this is
                                   only during destroy, so the lock is
                                   exclusive.  So here we are safe. */
                                if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
                                        RETURN(compat);
                                }
                        }

                        /* non-group locks are compatible, overlap doesn't
                           matter */
                        if (likely(req_mode != LCK_GROUP))
                                continue;

                        /* If we are trying to get a GROUP lock and there is
                           another one of this kind, we need to compare gids */
                        if (req->l_policy_data.l_extent.gid ==
                            lock->l_policy_data.l_extent.gid) {
                                /* If an existing lock with a matching gid is
                                   granted, we grant the new one too. */
                                if (lock->l_req_mode == lock->l_granted_mode)
                                        RETURN(2);

                                /* Otherwise we are scanning the queue of
                                 * waiting locks, which means the current
                                 * request would block along with the existing
                                 * lock (which is already blocked).
                                 * If we are in nonblocking mode, return
                                 * immediately. */
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                }
                                /* If this group lock is compatible with another
                                 * group lock on the waiting list, they must be
                                 * together in the list, so they can be granted
                                 * at the same time.  Otherwise the later lock
                                 * can get stuck behind another, incompatible,
                                 * lock. */
                                ldlm_resource_insert_lock_after(lock, req);
                                /* Because 'lock' is not granted, we can stop
                                 * processing this queue and return immediately.
                                 * There is no need to check the rest of the
                                 * list. */
                                RETURN(0);
                        }
                }

                if (unlikely(req_mode == LCK_GROUP &&
                             (lock->l_req_mode != lock->l_granted_mode))) {
                        scan = 1;
                        compat = 0;
                        if (lock->l_req_mode != LCK_GROUP) {
                                /* Ok, we hit a non-GROUP lock; there should
                                 * be no more GROUP locks later on, so queue in
                                 * front of the first non-GROUP lock */

                                ldlm_resource_insert_lock_after(lock, req);
                                list_del_init(&lock->l_res_link);
                                ldlm_resource_insert_lock_after(req, lock);
                                break;
                        }
                        if (req->l_policy_data.l_extent.gid ==
                             lock->l_policy_data.l_extent.gid) {
                                /* found it */
                                ldlm_resource_insert_lock_after(lock, req);
                                break;
                        }
                        continue;
                }

                if (unlikely(lock->l_req_mode == LCK_GROUP)) {
                        /* If the compared lock is a GROUP lock, then the
                         * requested one is PR/PW and hence not compatible;
                         * the extent range does not matter */
                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                compat = -EWOULDBLOCK;
                                goto destroylock;
                        } else {
                                *flags |= LDLM_FL_NO_TIMEOUT;
                        }
                } else if (lock->l_policy_data.l_extent.end < req_start ||
                           lock->l_policy_data.l_extent.start > req_end) {
                        /* if a non-group lock doesn't overlap, skip it */
                        continue;
                } else if (lock->l_req_extent.end < req_start ||
                           lock->l_req_extent.start > req_end)
                        /* false contention: the requests don't really
                         * overlap */
                        check_contention = 0;

                if (!work_list)
                        RETURN(0);

                /* don't count conflicting glimpse locks */
                if (lock->l_req_mode == LCK_PR &&
                    lock->l_policy_data.l_extent.start == 0 &&
                    lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                        check_contention = 0;

                *contended_locks += check_contention;

                compat = 0;
                if (lock->l_blocking_ast)
                        ldlm_add_ast_work_item(lock, req, work_list);
        }

        if (ldlm_check_contention(req, *contended_locks) &&
            compat == 0 &&
            (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
            req->l_req_mode != LCK_GROUP &&
            req_end - req_start <=
            req->l_resource->lr_namespace->ns_max_nolock_size)
                GOTO(destroylock, compat = -EUSERS);

        RETURN(compat);
destroylock:
        list_del_init(&req->l_res_link);
        ldlm_lock_destroy_nolock(req);
        *err = compat;
        RETURN(compat);
}

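/* Throw away the blocking-AST work items collected for a lock that ended up
 * being destroyed, dropping the references each work item holds. */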
static void discard_bl_list(struct list_head *bl_list)
{
        struct list_head *tmp, *pos;
        ENTRY;

        list_for_each_safe(pos, tmp, bl_list) {
                struct ldlm_lock *lock =
                        list_entry(pos, struct ldlm_lock, l_bl_ast);

                list_del_init(&lock->l_bl_ast);
                LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
                lock->l_flags &= ~LDLM_FL_AST_SENT;
                LASSERT(lock->l_bl_ast_run == 0);
                LASSERT(lock->l_blocking_lock);
                LDLM_LOCK_PUT(lock->l_blocking_lock);
                lock->l_blocking_lock = NULL;
                LDLM_LOCK_PUT(lock);
        }
        EXIT;
}

/* If first_enq is 0 (i.e., called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent
 *   - must call this function with the ns lock held
 *
 * If first_enq is 1 (i.e., called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent
 *   - must call this function with the ns lock held once */
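/* Returns LDLM_ITER_STOP or LDLM_ITER_CONTINUE in the reprocessing case, and
 * 0 (with the lock either granted or left on the waiting list) or a negative
 * error in the first-enqueue case. */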
int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                             ldlm_error_t *err, struct list_head *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        struct list_head rpc_list = CFS_LIST_HEAD_INIT(rpc_list);
        int rc, rc2;
        int contended_locks = 0;
        ENTRY;

        LASSERT(list_empty(&res->lr_converting));
        LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
                !(lock->l_flags & LDLM_AST_DISCARD_DATA));
        check_res_locked(res);
        *err = ELDLM_OK;

        if (!first_enq) {
                /* Careful observers will note that we don't handle -EWOULDBLOCK
                 * here, but it's ok for a non-obvious reason -- compat_queue
                 * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
                 * flags should always be zero here, and if that ever stops
                 * being true, we want to find out. */
                LASSERT(*flags == 0);
                rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
                                              err, NULL, &contended_locks);
                if (rc == 1) {
                        rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
                                                      flags, err, NULL,
                                                      &contended_locks);
                }
                if (rc == 0)
                        RETURN(LDLM_ITER_STOP);

                ldlm_resource_unlink_lock(lock);

                if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
                        ldlm_extent_policy(res, lock, flags);
                ldlm_grant_lock(lock, work_list);
                RETURN(LDLM_ITER_CONTINUE);
        }

 restart:
        contended_locks = 0;
        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
                                      &rpc_list, &contended_locks);
        if (rc < 0)
                GOTO(out, rc); /* lock was destroyed */
        if (rc == 2)
                goto grant;

        rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
                                       &rpc_list, &contended_locks);
        if (rc2 < 0)
                GOTO(out, rc = rc2); /* lock was destroyed */

        if (rc + rc2 == 2) {
        grant:
                ldlm_extent_policy(res, lock, flags);
                ldlm_resource_unlink_lock(lock);
                ldlm_grant_lock(lock, NULL);
        } else {
                /* If either of the compat_queue()s returned failure, then we
                 * have ASTs to send and must go onto the waiting list.
                 *
                 * bug 2322: we used to unlink and re-add here, which was a
                 * terrible folly -- if we goto restart, we could get
                 * re-ordered!  Causes deadlock, because ASTs aren't sent! */
                if (list_empty(&lock->l_res_link))
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                unlock_res(res);
                rc = ldlm_run_bl_ast_work(&rpc_list);
                lock_res(res);

                if (rc == -ERESTART) {
                        /* lock was granted while resource was unlocked. */
                        if (lock->l_granted_mode == lock->l_req_mode) {
                                /* bug 11300: if the lock has been granted,
                                 * bail out early, because otherwise we will go
                                 * to restart, ldlm_resource_unlink will be
                                 * called, the interval node will be freed, and
                                 * we will then fail in
                                 * ldlm_extent_add_lock() */
                                *flags &= ~(LDLM_FL_BLOCK_GRANTED | LDLM_FL_BLOCK_CONV |
                                            LDLM_FL_BLOCK_WAIT);
                                GOTO(out, rc = 0);
                        }

                        GOTO(restart, -ERESTART);
                }

                *flags |= LDLM_FL_BLOCK_GRANTED;
                /* this way we force the client to wait for the lock
                 * endlessly once the lock is enqueued -bzzz */
                *flags |= LDLM_FL_NO_TIMEOUT;

        }
        RETURN(0);
out:
        if (!list_empty(&rpc_list)) {
                LASSERT(!(lock->l_flags & LDLM_AST_DISCARD_DATA));
                discard_bl_list(&rpc_list);
        }
        RETURN(rc);
}

/* When a lock is cancelled by a client, the KMS may undergo change if this
 * is the "highest lock".  This function returns the new KMS value.
 * Caller must hold ns_lock already.
 *
 * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
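/* (The KMS here is the client's "known minimum size" of the object, derived
 * from the extents of the locks it still holds.) */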
__u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
{
        struct ldlm_resource *res = lock->l_resource;
        struct list_head *tmp;
        struct ldlm_lock *lck;
        __u64 kms = 0;
        ENTRY;

        /* don't let another thread in ldlm_extent_shift_kms race with us
         * just after we finish and take our lock into account in its
         * calculation of the kms */
        lock->l_flags |= LDLM_FL_KMS_IGNORE;

        list_for_each(tmp, &res->lr_granted) {
                lck = list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lck->l_flags & LDLM_FL_KMS_IGNORE)
                        continue;

                if (lck->l_policy_data.l_extent.end >= old_kms)
                        RETURN(old_kms);

                /* This extent _has_ to be smaller than old_kms (checked above)
                 * so kms can only ever be smaller or the same as old_kms. */
                if (lck->l_policy_data.l_extent.end + 1 > kms)
                        kms = lck->l_policy_data.l_extent.end + 1;
        }
        LASSERTF(kms <= old_kms, "kms "LPU64" old_kms "LPU64"\n", kms, old_kms);

        RETURN(kms);
}

cfs_mem_cache_t *ldlm_interval_slab;
struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
{
        struct ldlm_interval *node;
        ENTRY;

        LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
        OBD_SLAB_ALLOC(node, ldlm_interval_slab, CFS_ALLOC_IO, sizeof(*node));
        if (node == NULL)
                RETURN(NULL);

        CFS_INIT_LIST_HEAD(&node->li_group);
        ldlm_interval_attach(node, lock);
        RETURN(node);
}

void ldlm_interval_free(struct ldlm_interval *node)
{
        if (node) {
                LASSERT(list_empty(&node->li_group));
                OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
        }
}

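/* Locks whose extents are identical share a single ldlm_interval node; each
 * lock is linked onto the node's li_group list via l_sl_policy. */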
/* interval tree, for LDLM_EXTENT. */
void ldlm_interval_attach(struct ldlm_interval *n,
                          struct ldlm_lock *l)
{
        LASSERT(l->l_tree_node == NULL);
        LASSERT(l->l_resource->lr_type == LDLM_EXTENT);

        list_add_tail(&l->l_sl_policy, &n->li_group);
        l->l_tree_node = n;
}

struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l)
{
        struct ldlm_interval *n = l->l_tree_node;

        if (n == NULL)
                return NULL;

        LASSERT(!list_empty(&n->li_group));
        l->l_tree_node = NULL;
        list_del_init(&l->l_sl_policy);

        return (list_empty(&n->li_group) ? n : NULL);
}

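/* Convert a lock mode bitmask (a single power of two) into the index of its
 * interval tree, i.e. its log2: for example, a mode value of 0x8 maps to
 * index 3. */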
static inline int lock_mode_to_index(ldlm_mode_t mode)
{
        int index;

        LASSERT(mode != 0);
        LASSERT(IS_PO2(mode));
        for (index = -1; mode; index++, mode >>= 1) ;
        LASSERT(index < LCK_MODE_NUM);
        return index;
}

void ldlm_extent_add_lock(struct ldlm_resource *res,
                          struct ldlm_lock *lock)
{
        struct interval_node *found, **root;
        struct ldlm_interval *node;
        struct ldlm_extent *extent;
        int idx;

        LASSERT(lock->l_granted_mode == lock->l_req_mode);

        node = lock->l_tree_node;
        LASSERT(node != NULL);

        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);

        /* initialize the node's extent from the lock */
        extent = &lock->l_policy_data.l_extent;
        interval_set(&node->li_node, extent->start, extent->end);

        root = &res->lr_itree[idx].lit_root;
        found = interval_insert(&node->li_node, root);
        if (found) { /* a node with this extent already exists */
                struct ldlm_interval *tmp = ldlm_interval_detach(lock);
                LASSERT(tmp != NULL);
                ldlm_interval_free(tmp);
                ldlm_interval_attach(to_ldlm_interval(found), lock);
        }
        res->lr_itree[idx].lit_size++;

        /* even though we use the interval tree to manage the extent locks,
         * we also add them to the granted list for debugging purposes etc. */
        ldlm_resource_add_lock(res, &res->lr_granted, lock);
}

void ldlm_extent_unlink_lock(struct ldlm_lock *lock)
{
        struct ldlm_resource *res = lock->l_resource;
        struct ldlm_interval *node;
        struct ldlm_interval_tree *tree;
        int idx;

        if (lock->l_granted_mode != lock->l_req_mode)
                return;

        LASSERT(lock->l_tree_node != NULL);
        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        tree = &res->lr_itree[idx];

        LASSERT(tree->lit_root != NULL); /* assure the tree is not empty */

        tree->lit_size--;
        node = ldlm_interval_detach(lock);
        if (node) {
                interval_erase(&node->li_node, &tree->lit_root);
                ldlm_interval_free(node);
        }
}