1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see [sun.com URL with a
20  * copy of GPLv2].
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ldlm/ldlm_extent.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_LDLM
43 #ifndef __KERNEL__
44 # include <liblustre.h>
45 #else
46 # include <libcfs/libcfs.h>
47 #endif
48
49 #include <lustre_dlm.h>
50 #include <obd_support.h>
51 #include <obd.h>
52 #include <lustre_lib.h>
53
54 #include "ldlm_internal.h"
55
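/* cap on how far a PW/CW extent may be grown past the requested start when it
 * conflicts with many other locks (32MB - 1); see
 * ldlm_extent_internal_policy_fixup() below */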
56 #define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)
57
58 /* fixup the ldlm_extent after expanding */
59 static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
60                                               struct ldlm_extent *new_ex,
61                                               int conflicting)
62 {
63         ldlm_mode_t req_mode = req->l_req_mode;
64         __u64 req_start = req->l_req_extent.start;
65         __u64 req_end = req->l_req_extent.end;
66         __u64 req_align, mask;
67  
68         if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
69                 if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
70                         new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
71                                           new_ex->end);
72         }
73
74         if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF) {
75                 EXIT;
76                 return;
77         }
78
79         /* we need to ensure that the lock extent is properly aligned to what
80          * the client requested.  We align it to the lowest-common denominator
81          * of the client's requested lock start and end alignment. */
82         mask = 0x1000ULL;
83         req_align = (req_end + 1) | req_start;
84         if (req_align != 0) {
85                 while ((req_align & mask) == 0)
86                         mask <<= 1;
87         }
88         mask -= 1;
89         /* We can only shrink the lock, not grow it.
90          * This should never cause the lock to be smaller than requested,
91          * since the requested lock was already aligned on these boundaries. */
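        /* worked example (hypothetical values): for a request covering
         * [0x1000, 0x1fff], req_align = 0x3000 and mask = 0xfff, so a
         * tentative extent of [0x0800, 0x2b00] would be trimmed to the
         * aligned range [0x1000, 0x1fff], which still covers the request */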
92         new_ex->start = ((new_ex->start - 1) | mask) + 1;
93         new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
94         LASSERTF(new_ex->start <= req_start,
95                  "mask "LPX64" grant start "LPU64" req start "LPU64"\n",
96                  mask, new_ex->start, req_start);
97         LASSERTF(new_ex->end >= req_end,
98                  "mask "LPX64" grant end "LPU64" req end "LPU64"\n",
99                  mask, new_ex->end, req_end);
100 }
101
102 /* The purpose of this function is to return:
103  * - the maximum extent
104  * - containing the requested extent
105  * - and not overlapping existing conflicting extents outside the requested one
106  *
107  * Use the interval tree to expand the lock extent for granted locks.
108  */
109 static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req,
110                                                 struct ldlm_extent *new_ex)
111 {
112         struct ldlm_resource *res = req->l_resource;
113         ldlm_mode_t req_mode = req->l_req_mode;
114         __u64 req_start = req->l_req_extent.start;
115         __u64 req_end = req->l_req_extent.end;
116         struct ldlm_interval_tree *tree;
117         struct interval_node_extent limiter = { new_ex->start, new_ex->end };
118         int conflicting = 0;
119         int idx;
120         ENTRY;
121
122         lockmode_verify(req_mode);
123
124         /* using interval tree to handle the ldlm extent granted locks */
125         for (idx = 0; idx < LCK_MODE_NUM; idx++) {
126                 struct interval_node_extent ext = { req_start, req_end };
127
128                 tree = &res->lr_itree[idx];
129                 if (lockmode_compat(tree->lit_mode, req_mode))
130                         continue;
131
132                 conflicting += tree->lit_size;
133                 if (conflicting > 4)
134                         limiter.start = req_start;
135
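                /* expand ext as far as possible without overlapping any
                 * conflicting granted extent in this tree, then narrow the
                 * limiter so that later trees can only shrink the result */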
136                 if (interval_is_overlapped(tree->lit_root, &ext))
137                         printk("req_mode = %d, tree->lit_mode = %d, tree->lit_size = %d\n",
138                                req_mode, tree->lit_mode, tree->lit_size);
139                 interval_expand(tree->lit_root, &ext, &limiter);
140                 limiter.start = max(limiter.start, ext.start);
141                 limiter.end = min(limiter.end, ext.end);
142                 if (limiter.start == req_start && limiter.end == req_end)
143                         break;
144         }
145
146         new_ex->start = limiter.start;
147         new_ex->end = limiter.end;
148         LASSERT(new_ex->start <= req_start);
149         LASSERT(new_ex->end >= req_end);
150
151         ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
152         EXIT;
153 }
154
155 /* The purpose of this function is to return:
156  * - the maximum extent
157  * - containing the requested extent
158  * - and not overlapping existing conflicting extents outside the requested one
159  */
160 static void
161 ldlm_extent_internal_policy_waiting(struct ldlm_lock *req,
162                                     struct ldlm_extent *new_ex)
163 {
164         struct list_head *tmp;
165         struct ldlm_resource *res = req->l_resource;
166         ldlm_mode_t req_mode = req->l_req_mode;
167         __u64 req_start = req->l_req_extent.start;
168         __u64 req_end = req->l_req_extent.end;
169         int conflicting = 0;
170         ENTRY;
171
172         lockmode_verify(req_mode);
173
174         /* for waiting locks */
175         list_for_each(tmp, &res->lr_waiting) {
176                 struct ldlm_lock *lock;
177                 struct ldlm_extent *l_extent;
178
179                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
180                 l_extent = &lock->l_policy_data.l_extent;
181
182                 /* We already hit the minimum requested size, search no more */
183                 if (new_ex->start == req_start && new_ex->end == req_end) {
184                         EXIT;
185                         return;
186                 }
187
188                 /* Don't conflict with ourselves */
189                 if (req == lock)
190                         continue;
191
192                 /* Locks are compatible, overlap doesn't matter */
193                 /* Until bug 20 is fixed, try to avoid granting overlapping
194                  * locks on one client (they take a long time to cancel) */
195                 if (lockmode_compat(lock->l_req_mode, req_mode) &&
196                     lock->l_export != req->l_export)
197                         continue;
198
199                 /* If this is a high-traffic lock, don't grow downwards at all
200                  * or grow upwards too much */
201                 ++conflicting;
202                 if (conflicting > 4)
203                         new_ex->start = req_start;
204
205                 /* If lock doesn't overlap new_ex, skip it. */
206                 if (!ldlm_extent_overlap(l_extent, new_ex))
207                         continue;
208
209                 /* The locks conflict in their requested extents and we can't
210                  * satisfy both, so ignore this one.  Either we will ping-pong
211                  * this extent (as we would regardless of what extent we granted)
212                  * or the lock is unused and shouldn't limit our extent growth. */
213                 if (ldlm_extent_overlap(&lock->l_req_extent,&req->l_req_extent))
214                         continue;
215
216                 /* We grow extents downwards only as far as they don't overlap
217                  * with already-granted locks, on the assumption that clients
218                  * will be writing beyond the initial requested end and would
219                  * then need to enqueue a new lock beyond the previous request.
220                  * l_req_extent->end strictly < req_start, checked above. */
221                 if (l_extent->start < req_start && new_ex->start != req_start) {
222                         if (l_extent->end >= req_start)
223                                 new_ex->start = req_start;
224                         else
225                                 new_ex->start = min(l_extent->end+1, req_start);
226                 }
227
228                 /* If we need to cancel this lock anyway because our request
229                  * overlaps the granted lock, we grow up to its requested
230                  * extent start instead of limiting this extent, assuming that
231                  * clients are writing forwards and the lock had overgrown
232                  * its extent downwards before we enqueued our request. */
233                 if (l_extent->end > req_end) {
234                         if (l_extent->start <= req_end)
235                                 new_ex->end = max(lock->l_req_extent.start - 1,
236                                                   req_end);
237                         else
238                                 new_ex->end = max(l_extent->start - 1, req_end);
239                 }
240         }
241
242         ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
243         EXIT;
244 }
245
246
247 /* In order to determine the largest possible extent we can grant, we need
248  * to scan all of the queues. */
249 static void ldlm_extent_policy(struct ldlm_resource *res,
250                                struct ldlm_lock *lock, int *flags)
251 {
252         struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };
253
254         if (lock->l_export == NULL)
255                 /*
256                  * this is local lock taken by server (e.g., as a part of
257                  * OST-side locking, or unlink handling). Expansion doesn't
258                  * make a lot of sense for local locks, because they are
259                  * dropped immediately on operation completion and would only
260                  * conflict with other threads.
261                  */
262                 return;
263
264         if (lock->l_policy_data.l_extent.start == 0 &&
265             lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
266                 /* fast-path whole file locks */
267                 return;
268
269         ldlm_extent_internal_policy_granted(lock, &new_ex);
270         ldlm_extent_internal_policy_waiting(lock, &new_ex);
271
272         if (new_ex.start != lock->l_policy_data.l_extent.start ||
273             new_ex.end != lock->l_policy_data.l_extent.end) {
274                 *flags |= LDLM_FL_LOCK_CHANGED;
275                 lock->l_policy_data.l_extent.start = new_ex.start;
276                 lock->l_policy_data.l_extent.end = new_ex.end;
277         }
278 }
279
280 static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
281 {
282         struct ldlm_resource *res = lock->l_resource;
283         cfs_time_t now = cfs_time_current();
284
285         CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
286         if (contended_locks > res->lr_namespace->ns_contended_locks)
287                 res->lr_contention_time = now;
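        /* the resource stays "contended" for ns_contention_time seconds after
         * the contended-locks threshold was last exceeded */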
288         return cfs_time_before(now, cfs_time_add(res->lr_contention_time,
289                 cfs_time_seconds(res->lr_namespace->ns_contention_time)));
290 }
291
292 struct ldlm_extent_compat_args {
293         struct list_head *work_list;
294         struct ldlm_lock *lock;
295         ldlm_mode_t mode;
296         int *locks;
297         int *compat;
298 };
299
300 static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
301                                                 void *data)
302 {
303         struct ldlm_extent_compat_args *priv = data;
304         struct ldlm_interval *node = to_ldlm_interval(n);
305         struct ldlm_extent *extent;
306         struct list_head *work_list = priv->work_list;
307         struct ldlm_lock *lock, *enq = priv->lock;
308         ldlm_mode_t mode = priv->mode;
309         int count = 0;
310         ENTRY;
311
312         LASSERT(!list_empty(&node->li_group));
313
314         list_for_each_entry(lock, &node->li_group, l_sl_policy) {
315                 /* interval tree is for granted lock */
316                 LASSERTF(mode == lock->l_granted_mode,
317                          "mode = %s, lock->l_granted_mode = %s\n",
318                          ldlm_lockname[mode],
319                          ldlm_lockname[lock->l_granted_mode]);
320                 count++;
321                 if (lock->l_blocking_ast)
322                         ldlm_add_ast_work_item(lock, enq, work_list);
323         }
324
325         /* don't count conflicting glimpse locks */
326         extent = ldlm_interval_extent(node);
327         if (!(mode == LCK_PR &&
328             extent->start == 0 && extent->end == OBD_OBJECT_EOF))
329                 *priv->locks += count;
330
331         if (priv->compat)
332                 *priv->compat = 0;
333
334         RETURN(INTERVAL_ITER_CONT);
335 }
336
337 /* Determine if the lock is compatible with all locks on the queue.
338  * We stop walking the queue if we hit ourselves so we don't take
339  * conflicting locks enqueued after us into account, or we'd wait forever.
340  *
341  * 0 if the lock is not compatible
342  * 1 if the lock is compatible
343  * 2 if this group lock is compatible and requires no further checking
344  * negative error, such as EWOULDBLOCK for group locks
345  */
346 static int
347 ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
348                          int *flags, ldlm_error_t *err,
349                          struct list_head *work_list, int *contended_locks)
350 {
351         struct list_head *tmp;
352         struct ldlm_lock *lock;
353         struct ldlm_resource *res = req->l_resource;
354         ldlm_mode_t req_mode = req->l_req_mode;
355         __u64 req_start = req->l_req_extent.start;
356         __u64 req_end = req->l_req_extent.end;
357         int compat = 1;
358         int scan = 0;
359         int check_contention;
360         ENTRY;
361
362         lockmode_verify(req_mode);
363
364         /* Using interval tree for granted lock */
365         if (queue == &res->lr_granted) {
366                 struct ldlm_interval_tree *tree;
367                 struct ldlm_extent_compat_args data = {.work_list = work_list,
368                                                .lock = req,
369                                                .locks = contended_locks,
370                                                .compat = &compat };
371                 struct interval_node_extent ex = { .start = req_start,
372                                                    .end = req_end };
373                 int idx, rc;
374
375                 for (idx = 0; idx < LCK_MODE_NUM; idx++) {
376                         tree = &res->lr_itree[idx];
377                         if (tree->lit_root == NULL) /* empty tree, skipped */
378                                 continue;
379
380                         data.mode = tree->lit_mode;
381                         if (lockmode_compat(req_mode, tree->lit_mode)) {
382                                 struct ldlm_interval *node;
383                                 struct ldlm_extent *extent;
384
385                                 if (req_mode != LCK_GROUP)
386                                         continue;
387
388                                 /* group lock, grant it immediately if
389                                  * compatible */
390                                 node = to_ldlm_interval(tree->lit_root);
391                                 extent = ldlm_interval_extent(node);
392                                 if (req->l_policy_data.l_extent.gid ==
393                                     extent->gid)
394                                         RETURN(2);
395                         }
396
397                         if (tree->lit_mode == LCK_GROUP) {
398                                 if (*flags & LDLM_FL_BLOCK_NOWAIT) {
399                                         compat = -EWOULDBLOCK;
400                                         goto destroylock;
401                                 }
402
403                                 *flags |= LDLM_FL_NO_TIMEOUT;
404                                 if (!work_list)
405                                         RETURN(0);
406
407                                 /* if the work list is not NULL, add all
408                                    locks in the tree to the work list */
409                                 compat = 0;
410                                 interval_iterate(tree->lit_root,
411                                                  ldlm_extent_compat_cb, &data);
412                                 continue;
413                         }
414
415                         if (!work_list) {
416                                 rc = interval_is_overlapped(tree->lit_root,&ex);
417                                 if (rc)
418                                         RETURN(0);
419                         } else {
420                                 interval_search(tree->lit_root, &ex,
421                                                 ldlm_extent_compat_cb, &data);
422                                 if (!list_empty(work_list) && compat)
423                                         compat = 0;
424                         }
425                 }
426         } else { /* for waiting queue */
427                 list_for_each(tmp, queue) {
428                         check_contention = 1;
429
430                         lock = list_entry(tmp, struct ldlm_lock, l_res_link);
431
432                         if (req == lock)
433                                 break;
434
435                         if (unlikely(scan)) {
436                                 /* We only get here if we are queueing a GROUP lock
437                                    and have met an incompatible one.  The idea is to
438                                    insert the GROUP lock after the last compatible
439                                    GROUP lock in the waiting queue or, if there is
440                                    none, in front of the first non-GROUP lock */
441                                 if (lock->l_req_mode != LCK_GROUP) {
442                                         /* Ok, we hit a non-GROUP lock; there should
443                                          * be no more GROUP locks later on, so queue in
444                                          * front of the first non-GROUP lock */
445
446                                         ldlm_resource_insert_lock_after(lock, req);
447                                         list_del_init(&lock->l_res_link);
448                                         ldlm_resource_insert_lock_after(req, lock);
449                                         compat = 0;
450                                         break;
451                                 }
452                                 if (req->l_policy_data.l_extent.gid ==
453                                     lock->l_policy_data.l_extent.gid) {
454                                         /* found it */
455                                         ldlm_resource_insert_lock_after(lock, req);
456                                         compat = 0;
457                                         break;
458                                 }
459                                 continue;
460                         }
461
462                         /* locks are compatible, overlap doesn't matter */
463                         if (lockmode_compat(lock->l_req_mode, req_mode)) {
464                                 if (req_mode == LCK_PR &&
465                                     ((lock->l_policy_data.l_extent.start <=
466                                       req->l_policy_data.l_extent.start) &&
467                                      (lock->l_policy_data.l_extent.end >=
468                                       req->l_policy_data.l_extent.end))) {
469                                         /* If we met a PR lock just like us or wider,
470                                            and nobody down the list conflicted with
471                                            it, that means we can skip processing the
472                                            rest of the list and safely place ourselves
473                                            at the end of the list, or grant (depending
474                                            on whether we met conflicting locks earlier
475                                            in the list).
476                                            Only in the first-enqueue case do we keep
477                                            traversing if there is something conflicting
478                                            down the list, because we need to make sure
479                                            that something is marked as AST_SENT as well;
480                                            with an empty work list we would exit on the
481                                            first conflict met. */
482                                         /* There IS a case where such a flag is
483                                            not set for a lock, yet it blocks
484                                            something.  Luckily for us this is
485                                            only during destroy, so the lock is
486                                            exclusive.  So here we are safe */
487                                         if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
488                                                 RETURN(compat);
489                                         }
490                                 }
491
492                                 /* non-group locks are compatible, overlap doesn't
493                                    matter */
494                                 if (likely(req_mode != LCK_GROUP))
495                                         continue;
496
497                                 /* If we are trying to get a GROUP lock and there is
498                                    another one of this kind, we need to compare gid */
499                                 if (req->l_policy_data.l_extent.gid ==
500                                     lock->l_policy_data.l_extent.gid) {
501                                         /* If existing lock with matched gid is granted,
502                                            we grant new one too. */
503                                         if (lock->l_req_mode == lock->l_granted_mode)
504                                                 RETURN(2);
505
506                                         /* Otherwise we are scanning the queue of waiting
507                                          * locks, which means the current request would
508                                          * block along with the existing lock (which is
509                                          * already blocked).
510                                          * If we are in nonblocking mode, return
511                                          * immediately */
512                                         if (*flags & LDLM_FL_BLOCK_NOWAIT) {
513                                                 compat = -EWOULDBLOCK;
514                                                 goto destroylock;
515                                         }
516                                         /* If this group lock is compatible with another
517                                          * group lock on the waiting list, they must be
518                                          * together in the list, so they can be granted
519                                          * at the same time.  Otherwise the later lock
520                                          * can get stuck behind another, incompatible,
521                                          * lock. */
522                                         ldlm_resource_insert_lock_after(lock, req);
523                                         /* Because 'lock' is not granted, we can stop
524                                          * processing this queue and return immediately.
525                                          * There is no need to check the rest of the
526                                          * list. */
527                                         RETURN(0);
528                                 }
529                         }
530
531                         if (unlikely(req_mode == LCK_GROUP &&
532                                      (lock->l_req_mode != lock->l_granted_mode))) {
533                                 scan = 1;
534                                 compat = 0;
535                                 if (lock->l_req_mode != LCK_GROUP) {
536                                         /* Ok, we hit a non-GROUP lock; there should be no
537                                            more GROUP locks later on, so queue in front of
538                                            the first non-GROUP lock */
539
540                                         ldlm_resource_insert_lock_after(lock, req);
541                                         list_del_init(&lock->l_res_link);
542                                         ldlm_resource_insert_lock_after(req, lock);
543                                         break;
544                                 }
545                                 if (req->l_policy_data.l_extent.gid ==
546                                     lock->l_policy_data.l_extent.gid) {
547                                         /* found it */
548                                         ldlm_resource_insert_lock_after(lock, req);
549                                         break;
550                                 }
551                                 continue;
552                         }
553
554                         if (unlikely(lock->l_req_mode == LCK_GROUP)) {
555                                 /* If the compared lock is GROUP, then the requested one
556                                  * is PR/PW, so they are not compatible; the extent
557                                  * range does not matter */
558                                 if (*flags & LDLM_FL_BLOCK_NOWAIT) {
559                                         compat = -EWOULDBLOCK;
560                                         goto destroylock;
561                                 } else {
562                                         *flags |= LDLM_FL_NO_TIMEOUT;
563                                 }
564                         } else if (lock->l_policy_data.l_extent.end < req_start ||
565                                    lock->l_policy_data.l_extent.start > req_end) {
566                                 /* if a non-group lock doesn't overlap, skip it */
567                                 continue;
568                         } else if (lock->l_req_extent.end < req_start ||
569                                    lock->l_req_extent.start > req_end) {
570                                 /* false contention, the requests don't really overlap */
571                                 check_contention = 0;
572                         }
573
574                         if (!work_list)
575                                 RETURN(0);
576
577                         /* don't count conflicting glimpse locks */
578                         if (lock->l_req_mode == LCK_PR &&
579                             lock->l_policy_data.l_extent.start == 0 &&
580                             lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
581                                 check_contention = 0;
582
583                         *contended_locks += check_contention;
584
585                         compat = 0;
586                         if (lock->l_blocking_ast)
587                                 ldlm_add_ast_work_item(lock, req, work_list);
588                 }
589         }
590
591         if (ldlm_check_contention(req, *contended_locks) &&
592             compat == 0 &&
593             (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
594             req->l_req_mode != LCK_GROUP &&
595             req_end - req_start <=
596             req->l_resource->lr_namespace->ns_max_nolock_size)
597                 GOTO(destroylock, compat = -EUSERS);
598
599         RETURN(compat);
600 destroylock:
601         list_del_init(&req->l_res_link);
602         ldlm_lock_destroy_nolock(req);
603         *err = compat;
604         RETURN(compat);
605 }
606
607 static void discard_bl_list(struct list_head *bl_list)
608 {
609         struct list_head *tmp, *pos;
610         ENTRY;
611
612         list_for_each_safe(pos, tmp, bl_list) {
613                 struct ldlm_lock *lock =
614                         list_entry(pos, struct ldlm_lock, l_bl_ast);
615
616                 list_del_init(&lock->l_bl_ast);
617                 LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
618                 lock->l_flags &= ~LDLM_FL_AST_SENT;
619                 LASSERT(lock->l_bl_ast_run == 0);
620                 LASSERT(lock->l_blocking_lock);
621                 LDLM_LOCK_PUT(lock->l_blocking_lock);
622                 lock->l_blocking_lock = NULL;
623                 LDLM_LOCK_PUT(lock);
624         }
625         EXIT;
626 }
627
628 /* If first_enq is 0 (ie, called from ldlm_reprocess_queue):
629   *   - blocking ASTs have already been sent
630   *   - must call this function with the ns lock held
631   *
632   * If first_enq is 1 (ie, called from ldlm_lock_enqueue):
633   *   - blocking ASTs have not been sent
634   *   - must call this function with the ns lock held once */
635 int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
636                              ldlm_error_t *err, struct list_head *work_list)
637 {
638         struct ldlm_resource *res = lock->l_resource;
639         CFS_LIST_HEAD(rpc_list);
640         int rc, rc2;
641         int contended_locks = 0;
642         ENTRY;
643
644         LASSERT(list_empty(&res->lr_converting));
645         LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
646                 !(lock->l_flags & LDLM_AST_DISCARD_DATA));
647         check_res_locked(res);
648         *err = ELDLM_OK;
649
650         if (!first_enq) {
651                 /* Careful observers will note that we don't handle -EWOULDBLOCK
652                  * here, but it's ok for a non-obvious reason -- compat_queue
653                  * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
654                  * flags should always be zero here, and if that ever stops
655                  * being true, we want to find out. */
656                 LASSERT(*flags == 0);
657                 rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
658                                               err, NULL, &contended_locks);
659                 if (rc == 1) {
660                         rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
661                                                       flags, err, NULL,
662                                                       &contended_locks);
663                 }
664                 if (rc == 0)
665                         RETURN(LDLM_ITER_STOP);
666
667                 ldlm_resource_unlink_lock(lock);
668
669                 if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
670                         ldlm_extent_policy(res, lock, flags);
671                 ldlm_grant_lock(lock, work_list);
672                 RETURN(LDLM_ITER_CONTINUE);
673         }
674
675  restart:
676         contended_locks = 0;
677         rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
678                                       &rpc_list, &contended_locks);
679         if (rc < 0)
680                 GOTO(out, rc); /* lock was destroyed */
681         if (rc == 2)
682                 goto grant;
683
684         rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
685                                        &rpc_list, &contended_locks);
686         if (rc2 < 0)
687                 GOTO(out, rc = rc2); /* lock was destroyed */
688
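        /* each compat_queue() pass returns 1 when the lock is compatible with
         * that queue, so a total of 2 means the lock can be granted now */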
689         if (rc + rc2 == 2) {
690         grant:
691                 ldlm_extent_policy(res, lock, flags);
692                 ldlm_resource_unlink_lock(lock);
693                 ldlm_grant_lock(lock, NULL);
694         } else {
695                 /* If either of the compat_queue()s returned failure, then we
696                  * have ASTs to send and must go onto the waiting list.
697                  *
698                  * bug 2322: we used to unlink and re-add here, which was a
699                  * terrible folly -- if we goto restart, we could get
700                  * re-ordered!  Causes deadlock, because ASTs aren't sent! */
701                 if (list_empty(&lock->l_res_link))
702                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
703                 unlock_res(res);
704                 rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_BL_AST);
705                 lock_res(res);
706
707                 if (rc == -ERESTART) {
708                         /* lock was granted while resource was unlocked. */
709                         if (lock->l_granted_mode == lock->l_req_mode) {
710                                 /* bug 11300: if the lock has been granted,
711                                  * break out earlier, because otherwise we will
712                                  * go to restart, ldlm_resource_unlink will be
713                                  * called, the interval node will be freed, and
714                                  * we will then fail in
715                                  * ldlm_extent_add_lock() */
716                                 *flags &= ~(LDLM_FL_BLOCK_GRANTED | LDLM_FL_BLOCK_CONV |
717                                             LDLM_FL_BLOCK_WAIT);
718                                 GOTO(out, rc = 0);
719                         }
720
721                         GOTO(restart, -ERESTART);
722                 }
723
724                 *flags |= LDLM_FL_BLOCK_GRANTED;
725                 /* this way we force client to wait for the lock
726                  * endlessly once the lock is enqueued -bzzz */
727                 *flags |= LDLM_FL_NO_TIMEOUT;
728
729         }
730         RETURN(0);
731 out:
732         if (!list_empty(&rpc_list)) {
733                 LASSERT(!(lock->l_flags & LDLM_AST_DISCARD_DATA));
734                 discard_bl_list(&rpc_list);
735         }
736         RETURN(rc);
737 }
738
739 /* When a lock is cancelled by a client, the KMS may undergo change if this
740  * is the "highest lock".  This function returns the new KMS value.
741  * Caller must hold ns_lock already.
742  *
743  * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
744 __u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
745 {
746         struct ldlm_resource *res = lock->l_resource;
747         struct list_head *tmp;
748         struct ldlm_lock *lck;
749         __u64 kms = 0;
750         ENTRY;
751
752         /* don't let another thread in ldlm_extent_shift_kms race in
753          * just after we finish and take our lock into account in its
754          * calculation of the kms */
755         lock->l_flags |= LDLM_FL_KMS_IGNORE;
756
757         list_for_each(tmp, &res->lr_granted) {
758                 lck = list_entry(tmp, struct ldlm_lock, l_res_link);
759
760                 if (lck->l_flags & LDLM_FL_KMS_IGNORE)
761                         continue;
762
763                 if (lck->l_policy_data.l_extent.end >= old_kms)
764                         RETURN(old_kms);
765
766                 /* This extent _has_ to be smaller than old_kms (checked above)
767                  * so kms can only ever be smaller or the same as old_kms. */
768                 if (lck->l_policy_data.l_extent.end + 1 > kms)
769                         kms = lck->l_policy_data.l_extent.end + 1;
770         }
771         LASSERTF(kms <= old_kms, "kms "LPU64" old_kms "LPU64"\n", kms, old_kms);
772
773         RETURN(kms);
774 }
775
776 cfs_mem_cache_t *ldlm_interval_slab;
777 struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
778 {
779         struct ldlm_interval *node;
780         ENTRY;
781
782         LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
783         OBD_SLAB_ALLOC(node, ldlm_interval_slab, CFS_ALLOC_IO, sizeof(*node));
784         if (node == NULL)
785                 RETURN(NULL);
786
787         CFS_INIT_LIST_HEAD(&node->li_group);
788         ldlm_interval_attach(node, lock);
789         RETURN(node);
790 }
791
792 void ldlm_interval_free(struct ldlm_interval *node)
793 {
794         if (node) {
795                 LASSERT(list_empty(&node->li_group));
796                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
797         }
798 }
799
800 /* interval tree, for LDLM_EXTENT. */
801 void ldlm_interval_attach(struct ldlm_interval *n,
802                           struct ldlm_lock *l)
803 {
804         LASSERT(l->l_tree_node == NULL);
805         LASSERT(l->l_resource->lr_type == LDLM_EXTENT);
806
807         list_add_tail(&l->l_sl_policy, &n->li_group);
808         l->l_tree_node = n;
809 }
810
811 struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l)
812 {
813         struct ldlm_interval *n = l->l_tree_node;
814
815         if (n == NULL)
816                 return NULL;
817
818         LASSERT(!list_empty(&n->li_group));
819         l->l_tree_node = NULL;
820         list_del_init(&l->l_sl_policy);
821
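        /* hand the node back to the caller only when this was the last lock
         * attached to it, so the caller knows it may be freed */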
822         return (list_empty(&n->li_group) ? n : NULL);
823 }
824
825 static inline int lock_mode_to_index(ldlm_mode_t mode)
826 {
827         int index;
828
829         LASSERT(mode != 0);
830         LASSERT(IS_PO2(mode));
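        /* mode is a single bit (a power of two), so this loop computes its bit
         * index, i.e. log2(mode), which selects the per-mode interval tree */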
831         for (index = -1; mode; index++, mode >>= 1) ;
832         LASSERT(index < LCK_MODE_NUM);
833         return index;
834 }
835
836 void ldlm_extent_add_lock(struct ldlm_resource *res,
837                           struct ldlm_lock *lock)
838 {
839         struct interval_node *found, **root;
840         struct ldlm_interval *node;
841         struct ldlm_extent *extent;
842         int idx;
843
844         LASSERT(lock->l_granted_mode == lock->l_req_mode);
845
846         node = lock->l_tree_node;
847         LASSERT(node != NULL);
848
849         idx = lock_mode_to_index(lock->l_granted_mode);
850         LASSERT(lock->l_granted_mode == 1 << idx);
851         LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);
852
853         /* initialize the node extent */
854         extent = &lock->l_policy_data.l_extent;
855         interval_set(&node->li_node, extent->start, extent->end);
856
857         root = &res->lr_itree[idx].lit_root;
858         found = interval_insert(&node->li_node, root);
859         if (found) { /* The policy group found. */
860                 struct ldlm_interval *tmp = ldlm_interval_detach(lock);
861                 LASSERT(tmp != NULL);
862                 ldlm_interval_free(tmp);
863                 ldlm_interval_attach(to_ldlm_interval(found), lock);
864         }
865         res->lr_itree[idx].lit_size++;
866
867         /* even though we use an interval tree to manage the extent locks, we
868          * also add them to the granted list, for debugging purposes */
869         ldlm_resource_add_lock(res, &res->lr_granted, lock);
870 }
871
872 void ldlm_extent_unlink_lock(struct ldlm_lock *lock)
873 {
874         struct ldlm_resource *res = lock->l_resource;
875         struct ldlm_interval *node;
876         struct ldlm_interval_tree *tree;
877         int idx;
878
879         if (lock->l_granted_mode != lock->l_req_mode)
880                 return;
881
882         LASSERT(lock->l_tree_node != NULL);
883         idx = lock_mode_to_index(lock->l_granted_mode);
884         LASSERT(lock->l_granted_mode == 1 << idx);
885         tree = &res->lr_itree[idx];
886
887         LASSERT(tree->lit_root != NULL); /* the tree must not be empty */
888
889         tree->lit_size--;
890         node = ldlm_interval_detach(lock);
891         if (node) {
892                 interval_erase(&node->li_node, &tree->lit_root);
893                 ldlm_interval_free(node);
894         }
895 }