/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
 *   Author: Peter Braam <braam@clusterfs.com>
 *   Author: Phil Schwan <phil@clusterfs.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   You may have signed or agreed to another license before downloading
 *   this software.  If so, you are bound by the terms and conditions
 *   of that agreement, and the following does not apply to you.  See the
 *   LICENSE file included with this distribution for more information.
 *
 *   If you did not agree to a different license, then this copy of Lustre
 *   is open source software; you can redistribute it and/or modify it
 *   under the terms of version 2 of the GNU General Public License as
 *   published by the Free Software Foundation.
 *
 *   In either case, Lustre is distributed in the hope that it will be
 *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   license text for more details.
 */

#define DEBUG_SUBSYSTEM S_LDLM
#ifndef __KERNEL__
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <obd_support.h>
#include <lustre_lib.h>

#include "ldlm_internal.h"

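/* Cap on how far a heavily contended (more than 32 conflicting locks)
 * PW or CW lock is grown past its requested start; see
 * ldlm_extent_internal_policy_fixup() below. */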
#define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)

/* Fix up the ldlm_extent after expanding it. */
static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
                                              struct ldlm_extent *new_ex,
                                              int conflicting)
{
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        __u64 req_align, mask;

        if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
                if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
                        new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
                                          new_ex->end);
        }

        if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF) {
                EXIT;
                return;
        }

        /* We need to ensure that the lock extent is properly aligned to what
         * the client requested.  We align it to the lowest common denominator
         * of the client's requested lock start and end alignment. */
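        /* For example, with req_start = 0x11000 and req_end = 0x12fff,
         * req_align = 0x13000 | 0x11000 = 0x13000, whose lowest set bit
         * gives mask = 0xfff, so the expanded extent is trimmed inward to
         * 4KB boundaries around the request. */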
        mask = 0x1000ULL;
        req_align = (req_end + 1) | req_start;
        if (req_align != 0) {
                while ((req_align & mask) == 0)
                        mask <<= 1;
        }
        mask -= 1;
        /* We can only shrink the lock, not grow it.
         * This should never cause the lock to be smaller than requested,
         * since the requested lock was already aligned on these boundaries. */
        new_ex->start = ((new_ex->start - 1) | mask) + 1;
        new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
        LASSERTF(new_ex->start <= req_start,
                 "mask "LPX64" grant start "LPU64" req start "LPU64"\n",
                 mask, new_ex->start, req_start);
        LASSERTF(new_ex->end >= req_end,
                 "mask "LPX64" grant end "LPU64" req end "LPU64"\n",
                 mask, new_ex->end, req_end);
}

/* The purpose of this function is to return:
 * - the maximum extent
 * - containing the requested extent
 * - and not overlapping existing conflicting extents outside the requested one
 *
 * Use the interval tree to expand the lock extent for a granted lock.
 */
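/* For example, if the request is [4096, 8191] and the only conflicting
 * granted extent is [0, 1023], the lock can be expanded to
 * [1024, OBD_OBJECT_EOF]; ldlm_extent_internal_policy_fixup() may then
 * trim it back to the request's alignment. */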
static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req,
                                                struct ldlm_extent *new_ex)
{
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        struct ldlm_interval_tree *tree;
        struct interval_node_extent limiter = { new_ex->start, new_ex->end };
        int conflicting = 0;
        int idx;
        ENTRY;

        lockmode_verify(req_mode);

        /* Use the interval trees to handle the granted extent locks. */
        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                struct interval_node_extent ext = { req_start, req_end };

                tree = &res->lr_itree[idx];
                if (lockmode_compat(tree->lit_mode, req_mode))
                        continue;

                conflicting += tree->lit_size;
                if (conflicting > 4)
                        limiter.start = req_start;

                if (interval_is_overlapped(tree->lit_root, &ext))
                        printk("req_mode = %d, tree->lit_mode = %d, tree->lit_size = %d\n",
                               req_mode, tree->lit_mode, tree->lit_size);
                interval_expand(tree->lit_root, &ext, &limiter);
                limiter.start = max(limiter.start, ext.start);
                limiter.end = min(limiter.end, ext.end);
                if (limiter.start == req_start && limiter.end == req_end)
                        break;
        }

        new_ex->start = limiter.start;
        new_ex->end = limiter.end;
        LASSERT(new_ex->start <= req_start);
        LASSERT(new_ex->end >= req_end);

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
        EXIT;
}

/* The purpose of this function is to return:
 * - the maximum extent
 * - containing the requested extent
 * - and not overlapping existing conflicting extents outside the requested one
 */
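/* For example, a request for [0, 4095] with a single conflicting waiting
 * lock on [1MB, 2MB - 1] will be expanded upward only as far as
 * [0, 1MB - 1]. */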
static void
ldlm_extent_internal_policy_waiting(struct ldlm_lock *req,
                                    struct ldlm_extent *new_ex)
{
        struct list_head *tmp;
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int conflicting = 0;
        ENTRY;

        lockmode_verify(req_mode);

        /* for waiting locks */
        list_for_each(tmp, &res->lr_waiting) {
                struct ldlm_lock *lock;
                struct ldlm_extent *l_extent;

                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
                l_extent = &lock->l_policy_data.l_extent;

                /* We already hit the minimum requested size, search no more */
                if (new_ex->start == req_start && new_ex->end == req_end) {
                        EXIT;
                        return;
                }

                /* Don't conflict with ourselves */
                if (req == lock)
                        continue;

                /* Locks are compatible, overlap doesn't matter */
                /* Until bug 20 is fixed, try to avoid granting overlapping
                 * locks on one client (they take a long time to cancel) */
                if (lockmode_compat(lock->l_req_mode, req_mode) &&
                    lock->l_export != req->l_export)
                        continue;

                /* If this is a high-traffic lock, don't grow downwards at all
                 * or grow upwards too much */
                ++conflicting;
                if (conflicting > 4)
                        new_ex->start = req_start;

                /* If lock doesn't overlap new_ex, skip it. */
                if (!ldlm_extent_overlap(l_extent, new_ex))
                        continue;

                /* The locks conflict in their requested extents and we can't
                 * satisfy both, so ignore this one.  Either we will ping-pong
                 * this extent (we would regardless of what extent we granted)
                 * or the lock is unused and it shouldn't limit our extent
                 * growth. */
                if (ldlm_extent_overlap(&lock->l_req_extent,&req->l_req_extent))
                        continue;

                /* We grow extents downwards only as far as they don't overlap
                 * with already-granted locks, on the assumption that clients
                 * will be writing beyond the initial requested end and would
                 * then need to enqueue a new lock beyond the previous request.
                 * l_req_extent->end strictly < req_start, checked above. */
                if (l_extent->start < req_start && new_ex->start != req_start) {
                        if (l_extent->end >= req_start)
                                new_ex->start = req_start;
                        else
                                new_ex->start = min(l_extent->end+1, req_start);
                }

                /* If we need to cancel this lock anyway because our request
                 * overlaps the granted lock, we grow up to its requested
                 * extent start instead of limiting this extent, assuming that
                 * clients are writing forwards and the lock had overgrown
                 * its extent downwards before we enqueued our request. */
                if (l_extent->end > req_end) {
                        if (l_extent->start <= req_end)
                                new_ex->end = max(lock->l_req_extent.start - 1,
                                                  req_end);
                        else
                                new_ex->end = max(l_extent->start - 1, req_end);
                }
        }

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
        EXIT;
}


/* In order to determine the largest possible extent we can grant, we need
 * to scan all of the queues. */
static void ldlm_extent_policy(struct ldlm_resource *res,
                               struct ldlm_lock *lock, int *flags)
{
        struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };

        if (lock->l_export == NULL)
                /*
                 * This is a local lock taken by the server (e.g., as a part
                 * of OST-side locking, or unlink handling).  Expansion doesn't
                 * make a lot of sense for local locks, because they are
                 * dropped immediately on operation completion and would only
                 * conflict with other threads.
                 */
                return;

        if (lock->l_policy_data.l_extent.start == 0 &&
            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                /* fast-path whole file locks */
                return;

        ldlm_extent_internal_policy_granted(lock, &new_ex);
        ldlm_extent_internal_policy_waiting(lock, &new_ex);

        if (new_ex.start != lock->l_policy_data.l_extent.start ||
            new_ex.end != lock->l_policy_data.l_extent.end) {
                *flags |= LDLM_FL_LOCK_CHANGED;
                lock->l_policy_data.l_extent.start = new_ex.start;
                lock->l_policy_data.l_extent.end = new_ex.end;
        }
}

struct ldlm_extent_compat_args {
        struct list_head *work_list;
        struct ldlm_lock *lock;
        ldlm_mode_t mode;
        int *compat;
};

static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
                                                void *data)
{
        struct ldlm_extent_compat_args *priv = data;
        struct ldlm_interval *node = to_ldlm_interval(n);
        struct list_head *work_list = priv->work_list;
        struct ldlm_lock *lock, *enq = priv->lock;
        ldlm_mode_t mode = priv->mode;
        ENTRY;

        LASSERT(!list_empty(&node->li_group));

        list_for_each_entry(lock, &node->li_group, l_sl_policy) {
                /* the interval tree only holds granted locks */
                LASSERTF(mode == lock->l_granted_mode,
                         "mode = %s, lock->l_granted_mode = %s\n",
                         ldlm_lockname[mode],
                         ldlm_lockname[lock->l_granted_mode]);

                if (lock->l_blocking_ast)
                        ldlm_add_ast_work_item(lock, enq, work_list);
        }

        if (priv->compat)
                *priv->compat = 0;

        RETURN(INTERVAL_ITER_CONT);
}

/* Determine if the lock is compatible with all locks on the queue.
 * We stop walking the queue if we hit ourselves so we don't take
 * conflicting locks enqueued after us into account, or we'd wait forever.
 *
 * 0 if the lock is not compatible
 * 1 if the lock is compatible
 * 2 if this group lock is compatible and requires no further checking
 * negative error, such as EWOULDBLOCK for group locks
 */
static int
ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                         int *flags, ldlm_error_t *err,
                         struct list_head *work_list)
{
        struct list_head *tmp;
        struct ldlm_lock *lock;
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int compat = 1;
        int scan = 0;
        ENTRY;

        lockmode_verify(req_mode);

        /* Use the interval trees for the granted queue. */
        if (queue == &res->lr_granted) {
                struct ldlm_interval_tree *tree;
                struct ldlm_extent_compat_args data = {.work_list = work_list,
                                               .lock = req,
                                               .compat = &compat };
                struct interval_node_extent ex = { .start = req_start,
                                                   .end = req_end };
                int idx, rc;

                for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                        tree = &res->lr_itree[idx];
                        if (tree->lit_root == NULL) /* empty tree, skip it */
                                continue;

                        data.mode = tree->lit_mode;
                        if (lockmode_compat(req_mode, tree->lit_mode)) {
                                struct ldlm_interval *node;
                                struct ldlm_extent *extent;

                                if (req_mode != LCK_GROUP)
                                        continue;

                                /* group lock, grant it immediately if
                                 * compatible */
                                node = to_ldlm_interval(tree->lit_root);
                                extent = ldlm_interval_extent(node);
                                if (req->l_policy_data.l_extent.gid ==
                                    extent->gid)
                                        RETURN(2);
                        }

                        if (tree->lit_mode == LCK_GROUP) {
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                }

                                *flags |= LDLM_FL_NO_TIMEOUT;
                                if (!work_list)
                                        RETURN(0);

                                /* if the work list is not NULL, add all
                                   locks in the tree to the work list */
                                compat = 0;
                                interval_iterate(tree->lit_root,
                                                 ldlm_extent_compat_cb, &data);
                                continue;
                        }

                        if (!work_list) {
                                rc = interval_is_overlapped(tree->lit_root,&ex);
                                if (rc)
                                        RETURN(0);
                        } else {
                                interval_search(tree->lit_root, &ex,
                                                ldlm_extent_compat_cb, &data);
                                if (!list_empty(work_list) && compat)
                                        compat = 0;
                        }
                }
                RETURN(compat);
        }

        /* for the waiting queue */
        list_for_each(tmp, queue) {
                lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                if (req == lock)
                        RETURN(compat);

                if (unlikely(scan)) {
                        /* We only get here if we are queuing a GROUP lock
                           and have met some incompatible one.  The main idea
                           of this code is to insert the GROUP lock past a
                           compatible GROUP lock in the waiting queue or, if
                           there is none, in front of the first non-GROUP
                           lock. */
                        if (lock->l_req_mode != LCK_GROUP) {
                                /* Ok, we hit a non-GROUP lock; there should
                                 * be no more GROUP locks later on, so queue
                                 * in front of the first non-GROUP lock */

                                ldlm_resource_insert_lock_after(lock, req);
                                list_del_init(&lock->l_res_link);
                                ldlm_resource_insert_lock_after(req, lock);
                                RETURN(0);
                        }
                        if (req->l_policy_data.l_extent.gid ==
                             lock->l_policy_data.l_extent.gid) {
                                /* found it */
                                ldlm_resource_insert_lock_after(lock, req);
                                RETURN(0);
                        }
                        continue;
                }

                /* locks are compatible, overlap doesn't matter */
                if (lockmode_compat(lock->l_req_mode, req_mode)) {
                        if (req_mode == LCK_PR &&
                            ((lock->l_policy_data.l_extent.start <=
                             req->l_policy_data.l_extent.start) &&
                             (lock->l_policy_data.l_extent.end >=
                              req->l_policy_data.l_extent.end))) {
                                /* If we met a PR lock just like us or wider,
                                   and nobody down the list conflicted with
                                   it, that means we can skip processing the
                                   rest of the list and safely place ourselves
                                   at the end of the list, or grant (depending
                                   on whether we met conflicting locks earlier
                                   in the list).
                                   On the first enqueue only, we keep
                                   traversing if there is something conflicting
                                   down the list, because we need to make sure
                                   that something is marked as AST_SENT as
                                   well; with an empty work list we would exit
                                   on the first conflict met. */
                                /* There IS a case where such a flag is
                                   not set for a lock, yet it blocks
                                   something.  Luckily for us this only
                                   happens during destroy, so the lock is
                                   exclusive.  So here we are safe */
                                if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
                                        RETURN(compat);
                                }
                        }

                        /* non-group locks are compatible, overlap doesn't
                           matter */
                        if (likely(req_mode != LCK_GROUP))
                                continue;

                        /* If we are trying to get a GROUP lock and there is
                           another one of this kind, we need to compare gids */
                        if (req->l_policy_data.l_extent.gid ==
                            lock->l_policy_data.l_extent.gid) {
                                /* If an existing lock with a matching gid is
                                   granted, we grant the new one too. */
                                if (lock->l_req_mode == lock->l_granted_mode)
                                        RETURN(2);

                                /* Otherwise we are scanning the queue of
                                 * waiting locks and it means the current
                                 * request would block along with the existing
                                 * lock (which is already blocked).
                                 * If we are in nonblocking mode, return
                                 * immediately */
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                }
                                /* If this group lock is compatible with another
                                 * group lock on the waiting list, they must be
                                 * together in the list, so they can be granted
                                 * at the same time.  Otherwise the later lock
                                 * can get stuck behind another, incompatible,
                                 * lock. */
                                ldlm_resource_insert_lock_after(lock, req);
                                /* Because 'lock' is not granted, we can stop
                                 * processing this queue and return immediately.
                                 * There is no need to check the rest of the
                                 * list. */
                                RETURN(0);
                        }
                }

                if (unlikely(req_mode == LCK_GROUP &&
                    (lock->l_req_mode != lock->l_granted_mode))) {
                        scan = 1;
                        compat = 0;
                        if (lock->l_req_mode != LCK_GROUP) {
                                /* Ok, we hit a non-GROUP lock; there should
                                   be no more GROUP locks later on, so queue
                                   in front of the first non-GROUP lock */

                                ldlm_resource_insert_lock_after(lock, req);
                                list_del_init(&lock->l_res_link);
                                ldlm_resource_insert_lock_after(req, lock);
                                RETURN(0);
                        }
                        if (req->l_policy_data.l_extent.gid ==
                             lock->l_policy_data.l_extent.gid) {
                                /* found it */
                                ldlm_resource_insert_lock_after(lock, req);
                                RETURN(0);
                        }
                        continue;
                }

                if (unlikely(lock->l_req_mode == LCK_GROUP)) {
                        /* If the compared lock is GROUP, then the request is
                         * PR/PW, so this is not compatible; the extent range
                         * does not matter */
                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                compat = -EWOULDBLOCK;
                                goto destroylock;
                        } else {
                                *flags |= LDLM_FL_NO_TIMEOUT;
                        }
                } else if (lock->l_policy_data.l_extent.end < req_start ||
                           lock->l_policy_data.l_extent.start > req_end) {
                        /* if a non-group lock doesn't overlap, skip it */
                        continue;
                }

                if (!work_list)
                        RETURN(0);

                compat = 0;
                if (lock->l_blocking_ast)
                        ldlm_add_ast_work_item(lock, req, work_list);
        }

        RETURN(compat);
destroylock:
        list_del_init(&req->l_res_link);
        ldlm_lock_destroy_nolock(req);
        *err = compat;
        RETURN(compat);
}

/* If first_enq is 0 (i.e., called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent
 *   - must call this function with the ns lock held
 *
 * If first_enq is 1 (i.e., called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent
 *   - must call this function with the ns lock held once */
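/* Returns LDLM_ITER_STOP or LDLM_ITER_CONTINUE when called for reprocessing
 * (first_enq == 0), and 0 or a negative error when called for the initial
 * enqueue (first_enq == 1). */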
int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                             ldlm_error_t *err, struct list_head *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        struct list_head rpc_list = CFS_LIST_HEAD_INIT(rpc_list);
        int rc, rc2;
        ENTRY;

        LASSERT(list_empty(&res->lr_converting));
        check_res_locked(res);
        *err = ELDLM_OK;

        if (!first_enq) {
                /* Careful observers will note that we don't handle -EWOULDBLOCK
                 * here, but it's ok for a non-obvious reason -- compat_queue
                 * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
                 * flags should always be zero here, and if that ever stops
                 * being true, we want to find out. */
                LASSERT(*flags == 0);
                rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
                                              err, NULL);
                if (rc == 1) {
                        rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
                                                      flags, err, NULL);
                }
                if (rc == 0)
                        RETURN(LDLM_ITER_STOP);

                ldlm_resource_unlink_lock(lock);

                if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
                        ldlm_extent_policy(res, lock, flags);
                ldlm_grant_lock(lock, work_list);
                RETURN(LDLM_ITER_CONTINUE);
        }

 restart:
        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
                                      &rpc_list);
        if (rc < 0)
                GOTO(out, rc); /* lock was destroyed */
        if (rc == 2)
                goto grant;

        rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
                                       &rpc_list);
        if (rc2 < 0)
                GOTO(out, rc = rc2); /* lock was destroyed */

        if (rc + rc2 == 2) {
        grant:
                ldlm_extent_policy(res, lock, flags);
                ldlm_resource_unlink_lock(lock);
                ldlm_grant_lock(lock, NULL);
        } else {
                /* If either of the compat_queue()s returned failure, then we
                 * have ASTs to send and must go onto the waiting list.
                 *
                 * bug 2322: we used to unlink and re-add here, which was a
                 * terrible folly -- if we goto restart, we could get
                 * re-ordered!  Causes deadlock, because ASTs aren't sent! */
                if (list_empty(&lock->l_res_link))
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                unlock_res(res);
                rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_BL_AST);
                lock_res(res);

                if (rc == -ERESTART) {
                        /* lock was granted while resource was unlocked. */
                        if (lock->l_granted_mode == lock->l_req_mode) {
                                /* bug 11300: if the lock has been granted,
                                 * break out early because otherwise we will go
                                 * to restart and ldlm_resource_unlink will be
                                 * called, and that causes the interval node to
                                 * be freed.  Then we would fail in
                                 * ldlm_extent_add_lock(). */
                                *flags &= ~(LDLM_FL_BLOCK_GRANTED | LDLM_FL_BLOCK_CONV |
                                            LDLM_FL_BLOCK_WAIT);
                                GOTO(out, rc = 0);
                        }

                        GOTO(restart, -ERESTART);
                }

                *flags |= LDLM_FL_BLOCK_GRANTED;
                /* this way we force the client to wait for the lock
                 * endlessly once the lock is enqueued -bzzz */
                *flags |= LDLM_FL_NO_TIMEOUT;
        }
        rc = 0;
out:
        RETURN(rc);
}

/* When a lock is cancelled by a client, the KMS may undergo change if this
 * is the "highest lock".  This function returns the new KMS value.
 * Caller must hold ns_lock already.
 *
 * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
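/* For example, cancelling a lock on [0, 4095] when the only other granted
 * lock covers [0, 1023] shifts the KMS from 4096 down to 1024. */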
__u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
{
        struct ldlm_resource *res = lock->l_resource;
        struct list_head *tmp;
        struct ldlm_lock *lck;
        __u64 kms = 0;
        ENTRY;

        /* don't let another thread in ldlm_extent_shift_kms race in
         * just after we finish and take our lock into account in its
         * calculation of the kms */
        lock->l_flags |= LDLM_FL_KMS_IGNORE;

        list_for_each(tmp, &res->lr_granted) {
                lck = list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lck->l_flags & LDLM_FL_KMS_IGNORE)
                        continue;

                if (lck->l_policy_data.l_extent.end >= old_kms)
                        RETURN(old_kms);

                /* This extent _has_ to be smaller than old_kms (checked above)
                 * so kms can only ever be smaller or the same as old_kms. */
                if (lck->l_policy_data.l_extent.end + 1 > kms)
                        kms = lck->l_policy_data.l_extent.end + 1;
        }
        LASSERTF(kms <= old_kms, "kms "LPU64" old_kms "LPU64"\n", kms, old_kms);

        RETURN(kms);
}

cfs_mem_cache_t *ldlm_interval_slab;
struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
{
        struct ldlm_interval *node;
        ENTRY;

        LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
        OBD_SLAB_ALLOC(node, ldlm_interval_slab, CFS_ALLOC_IO, sizeof(*node));
        if (node == NULL)
                RETURN(NULL);

        CFS_INIT_LIST_HEAD(&node->li_group);
        ldlm_interval_attach(node, lock);
        RETURN(node);
}

void ldlm_interval_free(struct ldlm_interval *node)
{
        if (node) {
                LASSERT(list_empty(&node->li_group));
                OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
        }
}

/* interval tree, for LDLM_EXTENT. */
void ldlm_interval_attach(struct ldlm_interval *n,
                          struct ldlm_lock *l)
{
        LASSERT(l->l_tree_node == NULL);
        LASSERT(l->l_resource->lr_type == LDLM_EXTENT);

        list_add_tail(&l->l_sl_policy, &n->li_group);
        l->l_tree_node = n;
}

struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l)
{
        struct ldlm_interval *n = l->l_tree_node;

        if (n == NULL)
                return NULL;

        LASSERT(!list_empty(&n->li_group));
        l->l_tree_node = NULL;
        list_del_init(&l->l_sl_policy);

        return (list_empty(&n->li_group) ? n : NULL);
}

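/* Map a power-of-two lock mode to its index in the lr_itree[] array:
 * a mode value of 1 << n yields index n. */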
static inline int lock_mode_to_index(ldlm_mode_t mode)
{
        int index;

        LASSERT(mode != 0);
        LASSERT(IS_PO2(mode));
        for (index = -1; mode; index++, mode >>= 1) ;
        LASSERT(index < LCK_MODE_NUM);
        return index;
}

void ldlm_extent_add_lock(struct ldlm_resource *res,
                          struct ldlm_lock *lock)
{
        struct interval_node *found, **root;
        struct ldlm_interval *node;
        struct ldlm_extent *extent;
        int idx;

        LASSERT(lock->l_granted_mode == lock->l_req_mode);

        node = lock->l_tree_node;
        LASSERT(node != NULL);

        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);

        /* initialize the node extent */
        extent = &lock->l_policy_data.l_extent;
        interval_set(&node->li_node, extent->start, extent->end);

        root = &res->lr_itree[idx].lit_root;
        found = interval_insert(&node->li_node, root);
        if (found) { /* The policy group was found. */
                struct ldlm_interval *tmp = ldlm_interval_detach(lock);
                LASSERT(tmp != NULL);
                ldlm_interval_free(tmp);
                ldlm_interval_attach(to_ldlm_interval(found), lock);
        }
        res->lr_itree[idx].lit_size++;

        /* even though we use the interval tree to manage the extent locks,
         * we also add the locks into the granted list, for debugging
         * purposes */
        ldlm_resource_add_lock(res, &res->lr_granted, lock);
}

void ldlm_extent_unlink_lock(struct ldlm_lock *lock)
{
        struct ldlm_resource *res = lock->l_resource;
        struct ldlm_interval *node;
        struct ldlm_interval_tree *tree;
        int idx;

        if (lock->l_granted_mode != lock->l_req_mode)
                return;

        LASSERT(lock->l_tree_node != NULL);
        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        tree = &res->lr_itree[idx];

        LASSERT(tree->lit_root != NULL); /* the tree must not be empty */

        tree->lit_size--;
        node = ldlm_interval_detach(lock);
        if (node) {
                interval_erase(&node->li_node, &tree->lit_root);
                ldlm_interval_free(node);
        }
}