/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_extent.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_LDLM
#ifndef __KERNEL__
# include <liblustre.h>
#else
# include <libcfs/libcfs.h>
#endif

#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd.h>
#include <obd_class.h>
#include <lustre_lib.h>

#include "ldlm_internal.h"

#define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)

/* fix up the ldlm_extent after expanding it */
static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
                                              struct ldlm_extent *new_ex,
                                              int conflicting)
{
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        __u64 req_align, mask;

        if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
                if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
                        new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
                                          new_ex->end);
        }

        if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF) {
                EXIT;
                return;
        }

        /* We need to ensure that the lock extent is properly aligned to what
         * the client requested.  We align it to the lowest common denominator
         * of the client's requested lock start and end alignment. */
        mask = 0x1000ULL;
        req_align = (req_end + 1) | req_start;
        if (req_align != 0) {
                while ((req_align & mask) == 0)
                        mask <<= 1;
        }
        mask -= 1;
        /* We can only shrink the lock, not grow it.
         * This should never cause the lock to be smaller than requested,
         * since the requested lock was already aligned on these boundaries. */
        new_ex->start = ((new_ex->start - 1) | mask) + 1;
        new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
        LASSERTF(new_ex->start <= req_start,
                 "mask "LPX64" grant start "LPU64" req start "LPU64"\n",
                 mask, new_ex->start, req_start);
        LASSERTF(new_ex->end >= req_end,
                 "mask "LPX64" grant end "LPU64" req end "LPU64"\n",
                 mask, new_ex->end, req_end);
}
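/*
 * Illustrative worked example (added for clarity, not part of the original
 * source): suppose the client requested the extent [0x3000, 0x4fff].  Then
 * req_align = (0x4fff + 1) | 0x3000 = 0x7000; the 0x1000 bit is already set,
 * so the loop does not run and mask becomes 0x0fff (4 KiB alignment).  A
 * server-expanded extent of, say, [0x2345, 0x8000] is then trimmed to
 * [0x3000, 0x7fff]: the start is rounded up and the end rounded down to the
 * alignment shared by the requested start and end, and both still cover the
 * requested range, which is exactly what the LASSERTFs above check.
 */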


static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
{
        struct ldlm_resource *res = lock->l_resource;
        cfs_time_t now = cfs_time_current();

        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_SET_CONTENTION))
                return 1;

        CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
        if (contended_locks > ldlm_res_to_ns(res)->ns_contended_locks)
                res->lr_contention_time = now;
        return cfs_time_before(now, cfs_time_add(res->lr_contention_time,
                cfs_time_seconds(ldlm_res_to_ns(res)->ns_contention_time)));
}
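/*
 * Informal summary (added for clarity, not original text): once more than
 * ns_contended_locks locks overlapping the requested region are observed,
 * the resource is stamped as contended, and it stays contended for the
 * following ns_contention_time seconds.  During that window small non-GROUP
 * requests may be denied with -EUSERS, see ldlm_extent_compat_queue() below.
 */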

struct ldlm_extent_compat_args {
        cfs_list_t *work_list;
        struct ldlm_lock *lock;
        ldlm_mode_t mode;
        int *locks;
        int *compat;
        int *conflicts;
};

static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
                                                void *data)
{
        struct ldlm_extent_compat_args *priv = data;
        struct ldlm_interval *node = to_ldlm_interval(n);
        struct ldlm_extent *extent;
        cfs_list_t *work_list = priv->work_list;
        struct ldlm_lock *lock, *enq = priv->lock;
        ldlm_mode_t mode = priv->mode;
        int count = 0;
        ENTRY;

        LASSERT(!cfs_list_empty(&node->li_group));

        cfs_list_for_each_entry(lock, &node->li_group, l_sl_policy) {
                /* the interval tree only holds granted locks */
                LASSERTF(mode == lock->l_granted_mode,
                         "mode = %s, lock->l_granted_mode = %s\n",
                         ldlm_lockname[mode],
                         ldlm_lockname[lock->l_granted_mode]);

                /* only count locks overlapping the _requested_ region as
                 * contended locks */
                if (lock->l_req_extent.end >= enq->l_req_extent.start &&
                    lock->l_req_extent.start <= enq->l_req_extent.end) {
                        count++;
                        (*priv->conflicts)++;
                }
                if (lock->l_blocking_ast)
                        ldlm_add_ast_work_item(lock, enq, work_list);
        }

        /* don't count conflicting glimpse locks */
        extent = ldlm_interval_extent(node);
        if (!(mode == LCK_PR &&
            extent->start == 0 && extent->end == OBD_OBJECT_EOF))
                *priv->locks += count;

        if (priv->compat)
                *priv->compat = 0;

        RETURN(INTERVAL_ITER_CONT);
}

static int
ldlm_extent_compat_granted_queue(cfs_list_t *queue, struct ldlm_lock *req,
                                 int *flags, ldlm_error_t *err,
                                 cfs_list_t *work_list, int *contended_locks)
{
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int compat = 1, conflicts;
        /* Using interval tree for granted locks */
        struct ldlm_interval_tree *tree;
        struct ldlm_extent_compat_args data = {.work_list = work_list,
                                       .lock = req,
                                       .locks = contended_locks,
                                       .compat = &compat,
                                       .conflicts = &conflicts };
        struct interval_node_extent ex = { .start = req_start,
                                           .end = req_end };
        int idx, rc;
        ENTRY;


        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                tree = &res->lr_itree[idx];
                if (tree->lit_root == NULL) /* empty tree, skip it */
                        continue;

                data.mode = tree->lit_mode;
                if (lockmode_compat(req_mode, tree->lit_mode)) {
                        struct ldlm_interval *node;
                        struct ldlm_extent *extent;

                        if (req_mode != LCK_GROUP)
                                continue;

                        /* group lock, grant it immediately if
                         * compatible */
                        node = to_ldlm_interval(tree->lit_root);
                        extent = ldlm_interval_extent(node);
                        if (req->l_policy_data.l_extent.gid ==
                            extent->gid)
                                RETURN(2);
                }

                if (tree->lit_mode == LCK_GROUP) {
                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                compat = -EWOULDBLOCK;
                                goto destroylock;
                        }

                        *flags |= LDLM_FL_NO_TIMEOUT;
                        if (!work_list)
                                RETURN(0);

                        /* if the work list is not NULL, add all
                           locks in the tree to the work list */
                        compat = 0;
                        interval_iterate(tree->lit_root,
                                         ldlm_extent_compat_cb, &data);
                        continue;
                }


                if (!work_list) {
                        rc = interval_is_overlapped(tree->lit_root, &ex);
                        if (rc)
                                RETURN(0);
                } else {
                        struct interval_node_extent result_ext = {
                                .start = req->l_policy_data.l_extent.start,
                                .end = req->l_policy_data.l_extent.end };

                        conflicts = 0;
                        interval_search_expand_extent(tree->lit_root, &ex,
                                                      &result_ext,
                                                      ldlm_extent_compat_cb,
                                                      &data);
                        req->l_policy_data.l_extent.start = result_ext.start;
                        req->l_policy_data.l_extent.end = result_ext.end;
                        /* for the granted queue, count incompatible locks that
                         * do not overlap the request in the traffic index */
                        req->l_traffic += tree->lit_size - conflicts;

                        if (!cfs_list_empty(work_list)) {
                                if (compat)
                                        compat = 0;
                                /* if there is at least one conflicting lock,
                                 * we do not expand to the left, since we often
                                 * continue writing to the right.
                                 */
                                req->l_policy_data.l_extent.start = req_start;
                        }
                }
        }

        RETURN(compat);
destroylock:
        cfs_list_del_init(&req->l_res_link);
        ldlm_lock_destroy_nolock(req);
        *err = compat;
        RETURN(compat);
}
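/*
 * Illustrative sketch of the expansion above (added for clarity, not part of
 * the original code, and the exact semantics of
 * interval_search_expand_extent() are assumed here): the request enters with
 * its policy extent initialized to the whole object, [0, OBD_OBJECT_EOF].
 * The search then shrinks that candidate extent around granted locks of an
 * incompatible mode.  For example, if the request is PW [0, 9999] and another
 * client holds a granted PW lock on [20480, OBD_OBJECT_EOF], that lock does
 * not overlap the request, so the candidate end is pulled back to stop short
 * of 20480; locks that do overlap are reported through
 * ldlm_extent_compat_cb() and have blocking ASTs queued instead.
 */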

static int
ldlm_extent_compat_waiting_queue(cfs_list_t *queue, struct ldlm_lock *req,
                                 int *flags, ldlm_error_t *err,
                                 cfs_list_t *work_list, int *contended_locks)
{
        cfs_list_t *tmp;
        struct ldlm_lock *lock;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int compat = 1;
        int scan = 0;
        int check_contention;
        ENTRY;

        cfs_list_for_each(tmp, queue) {
                check_contention = 1;

                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);

                if (req == lock)
                        break;

                if (unlikely(scan)) {
                        /* We only get here if we are queuing a GROUP lock
                           and met some incompatible one. The main idea of this
                           code is to insert the GROUP lock past a compatible
                           GROUP lock in the waiting queue or, if there is
                           none, then in front of the first non-GROUP lock */
                        if (lock->l_req_mode != LCK_GROUP) {
                                /* Ok, we hit a non-GROUP lock, there should be
                                   no more GROUP locks later on, so queue in
                                   front of the first non-GROUP lock */

                                ldlm_resource_insert_lock_after(lock, req);
                                cfs_list_del_init(&lock->l_res_link);
                                ldlm_resource_insert_lock_after(req, lock);
                                compat = 0;
                                break;
                        }
                        if (req->l_policy_data.l_extent.gid ==
                            lock->l_policy_data.l_extent.gid) {
                                /* found it */
                                ldlm_resource_insert_lock_after(lock, req);
                                compat = 0;
                                break;
                        }
                        continue;
                }

                /* locks are compatible, overlap doesn't matter */
                if (lockmode_compat(lock->l_req_mode, req_mode)) {
                        if (req_mode == LCK_PR &&
                            ((lock->l_policy_data.l_extent.start <=
                              req->l_policy_data.l_extent.start) &&
                             (lock->l_policy_data.l_extent.end >=
                              req->l_policy_data.l_extent.end))) {
                                /* If we met a PR lock just like us or wider,
                                   and nobody down the list conflicted with
                                   it, that means we can skip processing of
                                   the rest of the list and safely place
                                   ourselves at the end of the list, or grant
                                   (depending on whether we met a conflicting
                                   lock earlier in the list).
                                   On the first enqueue only we continue
                                   traversing if there is something conflicting
                                   down the list, because we need to make sure
                                   that something is marked as AST_SENT as
                                   well; with an empty work list we would exit
                                   on the first conflict met. */
                                /* There IS a case where such a flag is
                                   not set for a lock, yet it blocks
                                   something. Luckily for us this is
                                   only during destroy, so the lock is
                                   exclusive. So here we are safe */
                                if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
                                        RETURN(compat);
                                }
                        }

                        /* non-group locks are compatible, overlap doesn't
                           matter */
                        if (likely(req_mode != LCK_GROUP))
                                continue;

                        /* If we are trying to get a GROUP lock and there is
                           another one of this kind, we need to compare gids */
                        if (req->l_policy_data.l_extent.gid ==
                            lock->l_policy_data.l_extent.gid) {
                                /* We are scanning the queue of waiting
                                 * locks and it means the current request would
                                 * block along with the existing lock (which is
                                 * already blocked).
                                 * If we are in nonblocking mode, return
                                 * immediately */
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                }
                                /* If this group lock is compatible with another
                                 * group lock on the waiting list, they must be
                                 * together in the list, so they can be granted
                                 * at the same time.  Otherwise the later lock
                                 * can get stuck behind another, incompatible,
                                 * lock. */
                                ldlm_resource_insert_lock_after(lock, req);
                                /* Because 'lock' is not granted, we can stop
                                 * processing this queue and return immediately.
                                 * There is no need to check the rest of the
                                 * list. */
                                RETURN(0);
                        }
                }

                if (unlikely(req_mode == LCK_GROUP &&
                             (lock->l_req_mode != lock->l_granted_mode))) {
                        scan = 1;
                        compat = 0;
                        if (lock->l_req_mode != LCK_GROUP) {
                                /* Ok, we hit a non-GROUP lock, there should
                                 * be no more GROUP locks later on, so queue in
                                 * front of the first non-GROUP lock */

                                ldlm_resource_insert_lock_after(lock, req);
                                cfs_list_del_init(&lock->l_res_link);
                                ldlm_resource_insert_lock_after(req, lock);
                                break;
                        }
                        if (req->l_policy_data.l_extent.gid ==
                            lock->l_policy_data.l_extent.gid) {
                                /* found it */
                                ldlm_resource_insert_lock_after(lock, req);
                                break;
                        }
                        continue;
                }

                if (unlikely(lock->l_req_mode == LCK_GROUP)) {
                        /* If the compared lock is GROUP, then the requested
                         * one is PR/PW, so this is not compatible; the extent
                         * range does not matter */
                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                compat = -EWOULDBLOCK;
                                goto destroylock;
                        } else {
                                *flags |= LDLM_FL_NO_TIMEOUT;
                        }
                } else if (!work_list) {
                        if (lock->l_policy_data.l_extent.end < req_start ||
                            lock->l_policy_data.l_extent.start > req_end)
                                /* if a non-group lock doesn't overlap, skip it */
                                continue;
                        RETURN(0);
                } else {
                        /* for waiting locks, count all non-compatible locks
                         * in the traffic index */
                        ++req->l_traffic;
                        ++lock->l_traffic;

                        /* adjust policy */
                        if (lock->l_policy_data.l_extent.end < req_start) {
                                /*     lock            req
                                 * ------------+
                                 * ++++++      |   +++++++
                                 *      +      |   +
                                 * ++++++      |   +++++++
                                 * ------------+
                                 */
                                if (lock->l_policy_data.l_extent.end >
                                    req->l_policy_data.l_extent.start)
                                        req->l_policy_data.l_extent.start =
                                             lock->l_policy_data.l_extent.end+1;
                                continue;
                        } else if (lock->l_req_extent.end < req_start) {
                                /*     lock            req
                                 * ------------------+
                                 * ++++++          +++++++
                                 *      +          + |
                                 * ++++++          +++++++
                                 * ------------------+
                                 */
                                lock->l_policy_data.l_extent.end =
                                                          req_start - 1;
                                req->l_policy_data.l_extent.start =
                                                              req_start;
                                continue;
                        } else if (lock->l_policy_data.l_extent.start >
                                   req_end) {
                                /*  req              lock
                                 *              +--------------
                                 *  +++++++     |    +++++++
                                 *        +     |    +
                                 *  +++++++     |    +++++++
                                 *              +--------------
                                 */
                                if (lock->l_policy_data.l_extent.start <
                                    req->l_policy_data.l_extent.end)
                                        req->l_policy_data.l_extent.end =
                                           lock->l_policy_data.l_extent.start-1;
                                continue;
                        } else if (lock->l_req_extent.start > req_end) {
                                /*  req              lock
                                 *      +----------------------
                                 *  +++++++          +++++++
                                 *      | +          +
                                 *  +++++++          +++++++
                                 *      +----------------------
                                 */
                                lock->l_policy_data.l_extent.start =
                                                            req_end + 1;
                                req->l_policy_data.l_extent.end = req_end;
                                continue;
                        }
                } /* policy_adj */

                compat = 0;
                if (work_list) {
                        /* don't count conflicting glimpse locks */
                        if (lock->l_flags & LDLM_FL_HAS_INTENT)
                                check_contention = 0;

                        *contended_locks += check_contention;

                        if (lock->l_blocking_ast)
                                ldlm_add_ast_work_item(lock, req, work_list);
                }
        }

        RETURN(compat);
destroylock:
        cfs_list_del_init(&req->l_res_link);
        ldlm_lock_destroy_nolock(req);
        *err = compat;
        RETURN(compat);
}
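/*
 * Worked example of the "adjust policy" cases above (illustrative only, not
 * original text): suppose the enqueuing lock requested [8192, 12287] and its
 * candidate extent has already been expanded toward [0, EOF].  A waiting lock
 * whose candidate extent ends at 4095 (entirely left of the request) only
 * forces our candidate start up to 4096 (first diagram).  If instead the
 * waiting lock's *requested* extent ends left of 8192 but its expanded extent
 * reaches into our requested range (second diagram), both sides give ground:
 * its candidate end is clipped to 8191 and our candidate start is reset to
 * 8192.  The third and fourth diagrams are the mirror cases on the right-hand
 * side of the request.
 */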
/* Determine if the lock is compatible with all locks on the queue.
 * We stop walking the queue if we hit ourselves so we don't take
 * conflicting locks enqueued after us into account, or we'd wait forever.
 *
 * 0 if the lock is not compatible
 * 1 if the lock is compatible
 * 2 if this group lock is compatible and requires no further checking
 * negative error, such as EWOULDBLOCK for group locks
 *
 * Note: policy adjustment only happens during the first lock enqueue
 */
static int
ldlm_extent_compat_queue(cfs_list_t *queue, struct ldlm_lock *req,
                         int *flags, ldlm_error_t *err,
                         cfs_list_t *work_list, int *contended_locks)
{
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int compat = 1;
        ENTRY;

        lockmode_verify(req_mode);

        if (queue == &res->lr_granted)
                compat = ldlm_extent_compat_granted_queue(queue, req, flags,
                                                          err, work_list,
                                                          contended_locks);
        else
                compat = ldlm_extent_compat_waiting_queue(queue, req, flags,
                                                          err, work_list,
                                                          contended_locks);

        if (ldlm_check_contention(req, *contended_locks) &&
            compat == 0 &&
            (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
            req->l_req_mode != LCK_GROUP &&
            req_end - req_start <=
            ldlm_res_to_ns(req->l_resource)->ns_max_nolock_size)
                GOTO(destroylock, compat = -EUSERS);

        RETURN(compat);
destroylock:
        cfs_list_del_init(&req->l_res_link);
        ldlm_lock_destroy_nolock(req);
        *err = compat;
        RETURN(compat);
}

static void discard_bl_list(cfs_list_t *bl_list)
{
        cfs_list_t *tmp, *pos;
        ENTRY;

        cfs_list_for_each_safe(pos, tmp, bl_list) {
                struct ldlm_lock *lock =
                        cfs_list_entry(pos, struct ldlm_lock, l_bl_ast);

                cfs_list_del_init(&lock->l_bl_ast);
                LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
                lock->l_flags &= ~LDLM_FL_AST_SENT;
                LASSERT(lock->l_bl_ast_run == 0);
                LASSERT(lock->l_blocking_lock);
                LDLM_LOCK_RELEASE(lock->l_blocking_lock);
                lock->l_blocking_lock = NULL;
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

static inline void ldlm_process_extent_init(struct ldlm_lock *lock)
{
        lock->l_policy_data.l_extent.start = 0;
        lock->l_policy_data.l_extent.end = OBD_OBJECT_EOF;
}

static inline void ldlm_process_extent_fini(struct ldlm_lock *lock, int *flags)
{
        if (lock->l_traffic > 4)
                lock->l_policy_data.l_extent.start = lock->l_req_extent.start;
        ldlm_extent_internal_policy_fixup(lock,
                                          &lock->l_policy_data.l_extent,
                                          lock->l_traffic);
        if (lock->l_req_extent.start != lock->l_policy_data.l_extent.start ||
            lock->l_req_extent.end   != lock->l_policy_data.l_extent.end)
                *flags |= LDLM_FL_LOCK_CHANGED;
}
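/*
 * Summary of the two helpers above (descriptive comment added for clarity,
 * not original text): _init starts the candidate extent at the whole object,
 * [0, OBD_OBJECT_EOF]; the compat_queue passes then shrink it around other
 * locks.  _fini gives up on left-side expansion when the resource looks busy
 * (more than 4 entries in the traffic index), aligns the result via
 * ldlm_extent_internal_policy_fixup(), and sets LDLM_FL_LOCK_CHANGED when the
 * granted extent differs from the requested one.
 */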

/* If first_enq is 0 (i.e., called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent
 *   - must call this function with the ns lock held
 *
 * If first_enq is 1 (i.e., called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent
 *   - must call this function with the ns lock held once */
int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                             ldlm_error_t *err, cfs_list_t *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        CFS_LIST_HEAD(rpc_list);
        int rc, rc2;
        int contended_locks = 0;
        ENTRY;

        LASSERT(cfs_list_empty(&res->lr_converting));
        LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
                !(lock->l_flags & LDLM_AST_DISCARD_DATA));
        check_res_locked(res);
        *err = ELDLM_OK;

        if (!first_enq) {
                /* Careful observers will note that we don't handle -EWOULDBLOCK
                 * here, but it's ok for a non-obvious reason -- compat_queue
                 * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
                 * flags should always be zero here, and if that ever stops
                 * being true, we want to find out. */
                LASSERT(*flags == 0);
                rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
                                              err, NULL, &contended_locks);
                if (rc == 1) {
                        rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
                                                      flags, err, NULL,
                                                      &contended_locks);
                }
                if (rc == 0)
                        RETURN(LDLM_ITER_STOP);

                ldlm_resource_unlink_lock(lock);

                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE)) {
                        lock->l_policy_data.l_extent.start =
                                lock->l_req_extent.start;
                        lock->l_policy_data.l_extent.end =
                                lock->l_req_extent.end;
                } else {
                        ldlm_process_extent_fini(lock, flags);
                }

                ldlm_grant_lock(lock, work_list);
                RETURN(LDLM_ITER_CONTINUE);
        }

 restart:
        contended_locks = 0;

        ldlm_process_extent_init(lock);

        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
                                      &rpc_list, &contended_locks);
        if (rc < 0)
                GOTO(out, rc); /* lock was destroyed */
        if (rc == 2)
                goto grant;

        rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
                                       &rpc_list, &contended_locks);
        if (rc2 < 0)
                GOTO(out, rc = rc2); /* lock was destroyed */

        if (rc + rc2 == 2) {
        grant:
                ldlm_resource_unlink_lock(lock);
                ldlm_process_extent_fini(lock, flags);
                ldlm_grant_lock(lock, NULL);
        } else {
                /* If either of the compat_queue()s returned failure, then we
                 * have ASTs to send and must go onto the waiting list.
                 *
                 * bug 2322: we used to unlink and re-add here, which was a
                 * terrible folly -- if we goto restart, we could get
                 * re-ordered!  Causes deadlock, because ASTs aren't sent! */
                if (cfs_list_empty(&lock->l_res_link))
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                unlock_res(res);
                rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_BL_AST);

                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) &&
                    !ns_is_client(ldlm_res_to_ns(res)))
                        class_fail_export(lock->l_export);

                lock_res(res);
                if (rc == -ERESTART) {

                        /* 15715: The lock was granted and destroyed after the
                         * resource lock was dropped. The interval node was
                         * freed in ldlm_lock_destroy. Anyway, this always
                         * happens when a client is being evicted. So it would
                         * be ok to return an error. -jay */
                        if (lock->l_destroyed) {
                                *err = -EAGAIN;
                                GOTO(out, rc = -EAGAIN);
                        }

                        /* lock was granted while resource was unlocked. */
                        if (lock->l_granted_mode == lock->l_req_mode) {
                                /* bug 11300: if the lock has been granted,
                                 * break earlier because otherwise, we will go
                                 * to restart and ldlm_resource_unlink will be
                                 * called and it causes the interval node to be
                                 * freed. Then we will fail at
                                 * ldlm_extent_add_lock() */
                                *flags &= ~(LDLM_FL_BLOCK_GRANTED | LDLM_FL_BLOCK_CONV |
                                            LDLM_FL_BLOCK_WAIT);
                                GOTO(out, rc = 0);
                        }

                        GOTO(restart, -ERESTART);
                }

                *flags |= LDLM_FL_BLOCK_GRANTED;
                /* this way we force the client to wait for the lock
                 * endlessly once the lock is enqueued -bzzz */
                *flags |= LDLM_FL_NO_TIMEOUT;

        }
        RETURN(0);
out:
        if (!cfs_list_empty(&rpc_list)) {
                LASSERT(!(lock->l_flags & LDLM_AST_DISCARD_DATA));
                discard_bl_list(&rpc_list);
        }
        RETURN(rc);
}
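/*
 * High-level flow of ldlm_process_extent_lock() on the first enqueue
 * (informal summary added for clarity, not original text):
 *   1. ldlm_process_extent_init() widens the candidate extent to the whole
 *      object.
 *   2. The granted and waiting queues are scanned; each pass may shrink the
 *      candidate extent and collects blocking ASTs in rpc_list.
 *   3. If both passes report the lock compatible (rc + rc2 == 2), the lock is
 *      granted with the (possibly expanded) extent after
 *      ldlm_process_extent_fini().
 *   4. Otherwise the lock stays on the waiting list, blocking ASTs are sent
 *      with the resource unlocked, and the whole check restarts on -ERESTART.
 */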

/* When a lock is cancelled by a client, the KMS may undergo change if this
 * is the "highest lock".  This function returns the new KMS value.
 * Caller must hold ns_lock already.
 *
 * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
__u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
{
        struct ldlm_resource *res = lock->l_resource;
        cfs_list_t *tmp;
        struct ldlm_lock *lck;
        __u64 kms = 0;
        ENTRY;

        /* don't let another thread in ldlm_extent_shift_kms race in
         * just after we finish and take our lock into account in its
         * calculation of the kms */
        lock->l_flags |= LDLM_FL_KMS_IGNORE;

        cfs_list_for_each(tmp, &res->lr_granted) {
                lck = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lck->l_flags & LDLM_FL_KMS_IGNORE)
                        continue;

                if (lck->l_policy_data.l_extent.end >= old_kms)
                        RETURN(old_kms);

                /* This extent _has_ to be smaller than old_kms (checked above)
                 * so kms can only ever be smaller or the same as old_kms. */
                if (lck->l_policy_data.l_extent.end + 1 > kms)
                        kms = lck->l_policy_data.l_extent.end + 1;
        }
        LASSERTF(kms <= old_kms, "kms "LPU64" old_kms "LPU64"\n", kms, old_kms);

        RETURN(kms);
}
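/*
 * Worked example (illustrative, not original text): if locks on [0, 4095],
 * [4096, 8191] and [8192, EOF] are granted and the [8192, EOF] lock is being
 * cancelled with old_kms = 10000, the cancelled lock is marked KMS_IGNORE,
 * the remaining extents all end below old_kms, and the highest surviving end
 * is 8191, so the new KMS becomes 8192 (a lock on [x, y] covers y + 1 bytes
 * of known size).
 */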

cfs_mem_cache_t *ldlm_interval_slab;
struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
{
        struct ldlm_interval *node;
        ENTRY;

        LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
        OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, CFS_ALLOC_IO);
        if (node == NULL)
                RETURN(NULL);

        CFS_INIT_LIST_HEAD(&node->li_group);
        ldlm_interval_attach(node, lock);
        RETURN(node);
}

void ldlm_interval_free(struct ldlm_interval *node)
{
        if (node) {
                LASSERT(cfs_list_empty(&node->li_group));
                LASSERT(!interval_is_intree(&node->li_node));
                OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
        }
}

/* interval tree, for LDLM_EXTENT. */
void ldlm_interval_attach(struct ldlm_interval *n,
                          struct ldlm_lock *l)
{
        LASSERT(l->l_tree_node == NULL);
        LASSERT(l->l_resource->lr_type == LDLM_EXTENT);

        cfs_list_add_tail(&l->l_sl_policy, &n->li_group);
        l->l_tree_node = n;
}

struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l)
{
        struct ldlm_interval *n = l->l_tree_node;

        if (n == NULL)
                return NULL;

        LASSERT(!cfs_list_empty(&n->li_group));
        l->l_tree_node = NULL;
        cfs_list_del_init(&l->l_sl_policy);

        return (cfs_list_empty(&n->li_group) ? n : NULL);
}

static inline int lock_mode_to_index(ldlm_mode_t mode)
{
        int index;

        LASSERT(mode != 0);
        LASSERT(IS_PO2(mode));
        for (index = -1; mode; index++, mode >>= 1) ;
        LASSERT(index < LCK_MODE_NUM);
        return index;
}
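/*
 * Example (illustrative, not original text): lock modes are power-of-two
 * bits, so this simply returns log2(mode); a mode of 1 << 3 maps to index 3,
 * which selects the per-mode interval tree in res->lr_itree[] used below.
 */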

void ldlm_extent_add_lock(struct ldlm_resource *res,
                          struct ldlm_lock *lock)
{
        struct interval_node *found, **root;
        struct ldlm_interval *node;
        struct ldlm_extent *extent;
        int idx;

        LASSERT(lock->l_granted_mode == lock->l_req_mode);

        node = lock->l_tree_node;
        LASSERT(node != NULL);
        LASSERT(!interval_is_intree(&node->li_node));

        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);

        /* initialize the node extent */
        extent = &lock->l_policy_data.l_extent;
        interval_set(&node->li_node, extent->start, extent->end);

        root = &res->lr_itree[idx].lit_root;
        found = interval_insert(&node->li_node, root);
        if (found) { /* The policy group was found. */
                struct ldlm_interval *tmp = ldlm_interval_detach(lock);
                LASSERT(tmp != NULL);
                ldlm_interval_free(tmp);
                ldlm_interval_attach(to_ldlm_interval(found), lock);
        }
        res->lr_itree[idx].lit_size++;

        /* even though we use an interval tree to manage the extent locks,
         * we also add the locks into the grant list, for debugging purposes
         * and so on */
        ldlm_resource_add_lock(res, &res->lr_granted, lock);
}

void ldlm_extent_unlink_lock(struct ldlm_lock *lock)
{
        struct ldlm_resource *res = lock->l_resource;
        struct ldlm_interval *node = lock->l_tree_node;
        struct ldlm_interval_tree *tree;
        int idx;

        if (!node || !interval_is_intree(&node->li_node)) /* duplicate unlink */
                return;

        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        tree = &res->lr_itree[idx];

        LASSERT(tree->lit_root != NULL); /* assure the tree is not null */

        tree->lit_size--;
        node = ldlm_interval_detach(lock);
        if (node) {
                interval_erase(&node->li_node, &tree->lit_root);
                ldlm_interval_free(node);
        }
}