/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_extent.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_LDLM
#ifndef __KERNEL__
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd.h>
#include <obd_class.h>
#include <lustre_lib.h>

#include "ldlm_internal.h"

#define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)

/* fix up the ldlm_extent after expanding it */
static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
                                              struct ldlm_extent *new_ex,
                                              int conflicting)
{
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        __u64 req_align, mask;

        if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
                if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
                        new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
                                          new_ex->end);
        }

        if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF) {
                EXIT;
                return;
        }

        /* we need to ensure that the lock extent is properly aligned to what
         * the client requested.  We align it to the lowest-common denominator
         * of the client's requested lock start and end alignment. */
        mask = 0x1000ULL;
        req_align = (req_end + 1) | req_start;
        if (req_align != 0) {
                while ((req_align & mask) == 0)
                        mask <<= 1;
        }
        mask -= 1;
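        /* For example, with req_start = 0x3000 and req_end = 0x4fff,
         * req_align = 0x5000 | 0x3000 = 0x7000, whose lowest set bit is
         * 0x1000, so mask ends up as 0xfff and the granted extent below is
         * rounded inward to 4KB boundaries. */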
        /* We can only shrink the lock, not grow it.
         * This should never cause the lock to be smaller than requested,
         * since the requested lock was already aligned on these boundaries. */
        new_ex->start = ((new_ex->start - 1) | mask) + 1;
        new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
        LASSERTF(new_ex->start <= req_start,
                 "mask "LPX64" grant start "LPU64" req start "LPU64"\n",
                 mask, new_ex->start, req_start);
        LASSERTF(new_ex->end >= req_end,
                 "mask "LPX64" grant end "LPU64" req end "LPU64"\n",
                 mask, new_ex->end, req_end);
}

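/* A resource is considered contended once more than ns_contended_locks
 * conflicting locks have been seen for it; it then stays contended for the
 * following ns_contention_time seconds. */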
static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
{
        struct ldlm_resource *res = lock->l_resource;
        cfs_time_t now = cfs_time_current();

        CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
        if (contended_locks > res->lr_namespace->ns_contended_locks)
                res->lr_contention_time = now;
        return cfs_time_before(now, cfs_time_add(res->lr_contention_time,
                cfs_time_seconds(res->lr_namespace->ns_contention_time)));
}

struct ldlm_extent_compat_args {
        struct list_head *work_list;
        struct ldlm_lock *lock;
        ldlm_mode_t mode;
        int *locks;
        int *compat;
        int *conflicts;
};

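/* Interval tree callback for the granted queue: walk every granted lock
 * attached to the interval node, queue blocking ASTs, and update the
 * contended-lock and conflict counters passed in through
 * struct ldlm_extent_compat_args. */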
static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
                                                void *data)
{
        struct ldlm_extent_compat_args *priv = data;
        struct ldlm_interval *node = to_ldlm_interval(n);
        struct ldlm_extent *extent;
        struct list_head *work_list = priv->work_list;
        struct ldlm_lock *lock, *enq = priv->lock;
        ldlm_mode_t mode = priv->mode;
        int count = 0;
        ENTRY;

        LASSERT(!list_empty(&node->li_group));

        list_for_each_entry(lock, &node->li_group, l_sl_policy) {
                /* the interval tree only holds granted locks */
                LASSERTF(mode == lock->l_granted_mode,
                         "mode = %s, lock->l_granted_mode = %s\n",
                         ldlm_lockname[mode],
                         ldlm_lockname[lock->l_granted_mode]);

                count++;
                /* only count locks overlapping the _requested_ region as
                 * contended locks */
                if (lock->l_req_extent.end >= enq->l_req_extent.start &&
                    lock->l_req_extent.start <= enq->l_req_extent.end)
                        (*priv->conflicts)++;
                if (lock->l_blocking_ast)
                        ldlm_add_ast_work_item(lock, enq, work_list);
        }
        LASSERT(count > 0);

        /* don't count conflicting glimpse locks */
        extent = ldlm_interval_extent(node);
        if (!(mode == LCK_PR &&
            extent->start == 0 && extent->end == OBD_OBJECT_EOF))
                *priv->locks += count;

        if (priv->compat)
                *priv->compat = 0;

        RETURN(INTERVAL_ITER_CONT);
}

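/* Check a request against the granted queue, which is indexed by per-mode
 * interval trees rather than walked linearly.  Group locks are handled
 * specially; for other modes the request's policy extent may be expanded
 * into space not covered by conflicting granted locks. */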
static int
ldlm_extent_compat_granted_queue(struct list_head *queue, struct ldlm_lock *req,
                                 int *flags, ldlm_error_t *err,
                                 struct list_head *work_list,
                                 int *contended_locks)
{
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int compat = 1, conflicts;
        /* Using interval trees for granted locks */
        struct ldlm_interval_tree *tree;
        struct ldlm_extent_compat_args data = {.work_list = work_list,
                                       .lock = req,
                                       .locks = contended_locks,
                                       .compat = &compat,
                                       .conflicts = &conflicts };
        struct interval_node_extent ex = { .start = req_start,
                                           .end = req_end };
        int idx, rc;
        ENTRY;

        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                conflicts = 0;
                tree = &res->lr_itree[idx];
                if (tree->lit_root == NULL) /* empty tree, skip it */
                        continue;

                data.mode = tree->lit_mode;
                if (lockmode_compat(req_mode, tree->lit_mode)) {
                        struct ldlm_interval *node;
                        struct ldlm_extent *extent;

                        if (req_mode != LCK_GROUP)
                                continue;

                        /* group lock, grant it immediately if
                         * compatible */
                        node = to_ldlm_interval(tree->lit_root);
                        extent = ldlm_interval_extent(node);
                        if (req->l_policy_data.l_extent.gid ==
                            extent->gid)
                                RETURN(2);
                }

                if (tree->lit_mode == LCK_GROUP) {
                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                compat = -EWOULDBLOCK;
                                goto destroylock;
                        }

                        *flags |= LDLM_FL_NO_TIMEOUT;
                        if (!work_list)
                                RETURN(0);

                        /* if the work list is not NULL, add all
                           locks in the tree to the work list */
                        compat = 0;
                        interval_iterate(tree->lit_root,
                                         ldlm_extent_compat_cb, &data);
                        continue;
                }

                if (!work_list) {
                        rc = interval_is_overlapped(tree->lit_root, &ex);
                        if (rc)
                                RETURN(0);
                } else {
                        struct interval_node_extent result_ext = {
                                .start = req->l_policy_data.l_extent.start,
                                .end = req->l_policy_data.l_extent.end };

                        interval_search_expand_extent(tree->lit_root, &ex,
                                                      &result_ext,
                                                      ldlm_extent_compat_cb,
                                                      &data);
                        req->l_policy_data.l_extent.start = result_ext.start;
                        req->l_policy_data.l_extent.end = result_ext.end;
                        /* for granted locks, count non-compatible,
                         * non-overlapping locks in the traffic index */
                        req->l_traffic += tree->lit_size - conflicts;

                        if (!list_empty(work_list)) {
                                compat = 0;
                                /* if there is at least 1 conflicting lock, we
                                 * do not expand to the left, since we often
                                 * continue writing to the right.
                                 */
                                req->l_policy_data.l_extent.start = req_start;
                        }
                }
        }

        RETURN(compat);
destroylock:
        list_del_init(&req->l_res_link);
        ldlm_lock_destroy_nolock(req);
        *err = compat;
        RETURN(compat);
}

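/* Check a request against the waiting queue by walking it linearly up to
 * the request itself.  Group locks may be re-inserted at the proper place
 * in the queue, and on the first enqueue the policy extents of waiting
 * locks and of the request are adjusted so they do not overlap. */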
static int
ldlm_extent_compat_waiting_queue(struct list_head *queue, struct ldlm_lock *req,
                                 int *flags, ldlm_error_t *err,
                                 struct list_head *work_list,
                                 int *contended_locks)
{
        struct list_head *tmp;
        struct ldlm_lock *lock;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int compat = 1;
        int scan = 0;
        int check_contention;
        ENTRY;

        list_for_each(tmp, queue) {
                check_contention = 1;

                lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                if (req == lock)
                        break;

                if (unlikely(scan)) {
                        /* We only get here if we are queueing a GROUP lock
                           and have met some incompatible one. The idea is to
                           insert the GROUP lock after a compatible GROUP lock
                           in the waiting queue or, if there is none, in front
                           of the first non-GROUP lock */
                        if (lock->l_req_mode != LCK_GROUP) {
                                /* Ok, we hit a non-GROUP lock; there should be
                                   no more GROUP locks later on, so queue in
                                   front of the first non-GROUP lock */

                                ldlm_resource_insert_lock_after(lock, req);
                                list_del_init(&lock->l_res_link);
                                ldlm_resource_insert_lock_after(req, lock);
                                compat = 0;
                                break;
                        }
                        if (req->l_policy_data.l_extent.gid ==
                            lock->l_policy_data.l_extent.gid) {
                                /* found it */
                                ldlm_resource_insert_lock_after(lock, req);
                                compat = 0;
                                break;
                        }
                        continue;
                }

                /* locks are compatible, overlap doesn't matter */
                if (lockmode_compat(lock->l_req_mode, req_mode)) {
                        if (req_mode == LCK_PR &&
                            ((lock->l_policy_data.l_extent.start <=
                              req->l_policy_data.l_extent.start) &&
                             (lock->l_policy_data.l_extent.end >=
                              req->l_policy_data.l_extent.end))) {
                                /* If we met a PR lock just like us or wider,
                                   and nobody down the list conflicted with it,
                                   we can skip processing the rest of the list
                                   and safely place ourselves at the end of the
                                   list, or grant (depending on whether we met
                                   conflicting locks earlier in the list).
                                   Only on a first enqueue do we keep traversing
                                   when there is something conflicting down the
                                   list, because we need to make sure it is
                                   marked AST_SENT as well; with an empty work
                                   list we would exit on the first conflict
                                   met. */
                                /* There IS a case where such a flag is
                                   not set for a lock, yet it blocks
                                   something. Luckily for us this is
                                   only during destroy, so the lock is
                                   exclusive. So here we are safe */
                                if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
                                        RETURN(compat);
                                }
                        }

                        /* non-group locks are compatible, overlap doesn't
                           matter */
                        if (likely(req_mode != LCK_GROUP))
                                continue;

                        /* If we are trying to get a GROUP lock and there is
                           another one of this kind, we need to compare gids */
                        if (req->l_policy_data.l_extent.gid ==
                            lock->l_policy_data.l_extent.gid) {
                                /* We are scanning the queue of waiting
                                 * locks, which means the current request would
                                 * block along with the existing lock (that is
                                 * already blocked).
                                 * If we are in nonblocking mode, return
                                 * immediately */
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                }
                                /* If this group lock is compatible with another
                                 * group lock on the waiting list, they must be
                                 * together in the list, so they can be granted
                                 * at the same time.  Otherwise the later lock
                                 * can get stuck behind another, incompatible,
                                 * lock. */
                                ldlm_resource_insert_lock_after(lock, req);
                                /* Because 'lock' is not granted, we can stop
                                 * processing this queue and return immediately.
                                 * There is no need to check the rest of the
                                 * list. */
                                RETURN(0);
                        }
                }

                if (unlikely(req_mode == LCK_GROUP &&
                             (lock->l_req_mode != lock->l_granted_mode))) {
                        scan = 1;
                        compat = 0;
                        if (lock->l_req_mode != LCK_GROUP) {
                                /* Ok, we hit a non-GROUP lock, there should
                                 * be no more GROUP locks later on, so queue in
                                 * front of the first non-GROUP lock */

                                ldlm_resource_insert_lock_after(lock, req);
                                list_del_init(&lock->l_res_link);
                                ldlm_resource_insert_lock_after(req, lock);
                                break;
                        }
                        if (req->l_policy_data.l_extent.gid ==
                            lock->l_policy_data.l_extent.gid) {
                                /* found it */
                                ldlm_resource_insert_lock_after(lock, req);
                                break;
                        }
                        continue;
                }

                if (unlikely(lock->l_req_mode == LCK_GROUP)) {
                        /* If the compared lock is GROUP, then the requested one
                         * is PR/PW, so this is not compatible; the extent range
                         * does not matter */
                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                compat = -EWOULDBLOCK;
                                goto destroylock;
                        } else {
                                *flags |= LDLM_FL_NO_TIMEOUT;
                        }
                } else if (!work_list) {
                        if (lock->l_policy_data.l_extent.end < req_start ||
                            lock->l_policy_data.l_extent.start > req_end)
                                /* if a non-group lock doesn't overlap, skip it */
                                continue;
                        RETURN(0);
                } else {
                        /* for waiting locks, count all non-compatible locks in
                         * the traffic index */
                        ++req->l_traffic;
                        ++lock->l_traffic;

                        /* adjust policy */
                        if (lock->l_policy_data.l_extent.end < req_start) {
                                /*     lock            req
                                 * ------------+
                                 * ++++++      |   +++++++
                                 *      +      |   +
                                 * ++++++      |   +++++++
                                 * ------------+
                                 */
                                if (lock->l_policy_data.l_extent.end >
                                    req->l_policy_data.l_extent.start)
                                        req->l_policy_data.l_extent.start =
                                             lock->l_policy_data.l_extent.end+1;
                                continue;
                        } else if (lock->l_req_extent.end < req_start) {
                                /*     lock            req
                                 * ------------------+
                                 * ++++++          +++++++
                                 *      +          + |
                                 * ++++++          +++++++
                                 * ------------------+
                                 */
                                lock->l_policy_data.l_extent.end =
                                                          req_start - 1;
                                req->l_policy_data.l_extent.start =
                                                              req_start;
                                continue;
                        } else if (lock->l_policy_data.l_extent.start >
                                   req_end) {
                                /*  req              lock
                                 *              +--------------
                                 *  +++++++     |    +++++++
                                 *        +     |    +
                                 *  +++++++     |    +++++++
                                 *              +--------------
                                 */
                                if (lock->l_policy_data.l_extent.start <
                                     req->l_policy_data.l_extent.end)
                                        req->l_policy_data.l_extent.end =
                                           lock->l_policy_data.l_extent.start-1;
                                continue;
                        } else if (lock->l_req_extent.start > req_end) {
                                /*  req              lock
                                 *      +----------------------
                                 *  +++++++          +++++++
                                 *      | +          +
                                 *  +++++++          +++++++
                                 *      +----------------------
                                 */
                                lock->l_policy_data.l_extent.start =
                                                            req_end + 1;
                                req->l_policy_data.l_extent.end = req_end;
                                continue;
                        }
                } /* policy_adj */

                compat = 0;
                if (work_list) {
                        /* don't count conflicting glimpse locks */
                        if (lock->l_flags & LDLM_FL_HAS_INTENT)
                                check_contention = 0;

                        *contended_locks += check_contention;

                        if (lock->l_blocking_ast)
                                ldlm_add_ast_work_item(lock, req, work_list);
                }
        }

        RETURN(compat);
destroylock:
        list_del_init(&req->l_res_link);
        ldlm_lock_destroy_nolock(req);
        *err = compat;
        RETURN(compat);
}

/* Determine if the lock is compatible with all locks on the queue.
 * We stop walking the queue if we hit ourselves so we don't take
 * conflicting locks enqueued after us into account, or we'd wait forever.
 *
 * 0 if the lock is not compatible
 * 1 if the lock is compatible
 * 2 if this group lock is compatible and requires no further checking
 * negative error, such as EWOULDBLOCK for group locks
 *
 * Note: policy adjustment only happens during the first lock enqueue
 * procedure
 */
static int
ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                         int *flags, ldlm_error_t *err,
                         struct list_head *work_list, int *contended_locks)
{
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int compat = 1;
        ENTRY;

        lockmode_verify(req_mode);

        if (queue == &res->lr_granted)
                compat = ldlm_extent_compat_granted_queue(queue, req, flags,
                                                          err, work_list,
                                                          contended_locks);
        else
                compat = ldlm_extent_compat_waiting_queue(queue, req, flags,
                                                          err, work_list,
                                                          contended_locks);
        if (ldlm_check_contention(req, *contended_locks) &&
            compat == 0 &&
            (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
            req->l_req_mode != LCK_GROUP &&
            req_end - req_start <=
            req->l_resource->lr_namespace->ns_max_nolock_size)
                GOTO(destroylock, compat = -EUSERS);

        RETURN(compat);
destroylock:
        list_del_init(&req->l_res_link);
        ldlm_lock_destroy_nolock(req);
        *err = compat;
        RETURN(compat);
}

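/* Drop the blocking-AST work items that were collected but will not be sent
 * (for instance because the enqueue failed), releasing the lock references
 * they hold. */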
static void discard_bl_list(struct list_head *bl_list)
{
        struct list_head *tmp, *pos;
        ENTRY;

        list_for_each_safe(pos, tmp, bl_list) {
                struct ldlm_lock *lock =
                        list_entry(pos, struct ldlm_lock, l_bl_ast);

                list_del_init(&lock->l_bl_ast);
                LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
                lock->l_flags &= ~LDLM_FL_AST_SENT;
                LASSERT(lock->l_bl_ast_run == 0);
                LASSERT(lock->l_blocking_lock);
                LDLM_LOCK_PUT(lock->l_blocking_lock);
                lock->l_blocking_lock = NULL;
                LDLM_LOCK_PUT(lock);
        }
        EXIT;
}

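/* Start from the maximal extent [0, OBD_OBJECT_EOF]; the compat checks
 * shrink it around conflicting locks before the lock is granted. */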
static inline void ldlm_process_extent_init(struct ldlm_lock *lock)
{
        lock->l_policy_data.l_extent.start = 0;
        lock->l_policy_data.l_extent.end = OBD_OBJECT_EOF;
}

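/* Finalize the extent to be granted: when several other locks were counted
 * (l_traffic > 4), do not expand below the requested start; then align the
 * result and flag the lock as changed if it differs from what was
 * requested. */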
static inline void ldlm_process_extent_fini(struct ldlm_lock *lock, int *flags)
{
        if (lock->l_traffic > 4)
                lock->l_policy_data.l_extent.start = lock->l_req_extent.start;
        ldlm_extent_internal_policy_fixup(lock,
                                          &lock->l_policy_data.l_extent,
                                          lock->l_traffic);
        if (lock->l_req_extent.start != lock->l_policy_data.l_extent.start ||
            lock->l_req_extent.end   != lock->l_policy_data.l_extent.end)
                *flags |= LDLM_FL_LOCK_CHANGED;
}

/* If first_enq is 0 (i.e., called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent
 *   - must call this function with the ns lock held
 *
 * If first_enq is 1 (i.e., called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent
 *   - must call this function with the ns lock held once */
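/* Returns LDLM_ITER_STOP or LDLM_ITER_CONTINUE on the reprocess path, and
 * 0 or a negative error on the first-enqueue path (in which case the lock
 * may already have been destroyed and *err set accordingly). */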
int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                             ldlm_error_t *err, struct list_head *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        struct list_head rpc_list = CFS_LIST_HEAD_INIT(rpc_list);
        int rc, rc2;
        int contended_locks = 0;
        ENTRY;

        LASSERT(list_empty(&res->lr_converting));
        LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
                !(lock->l_flags & LDLM_AST_DISCARD_DATA));
        check_res_locked(res);
        *err = ELDLM_OK;

        if (!first_enq) {
                /* Careful observers will note that we don't handle -EWOULDBLOCK
                 * here, but it's ok for a non-obvious reason -- compat_queue
                 * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
                 * flags should always be zero here, and if that ever stops
                 * being true, we want to find out. */
                LASSERT(*flags == 0);
                rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
                                              err, NULL, &contended_locks);
                if (rc == 1) {
                        rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
                                                      flags, err, NULL,
                                                      &contended_locks);
                }
                if (rc == 0)
                        RETURN(LDLM_ITER_STOP);

                ldlm_resource_unlink_lock(lock);

                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE)) {
                        lock->l_policy_data.l_extent.start =
                                lock->l_req_extent.start;
                        lock->l_policy_data.l_extent.end =
                                lock->l_req_extent.end;
                } else {
                        ldlm_process_extent_fini(lock, flags);
                }

                ldlm_grant_lock(lock, work_list);
                RETURN(LDLM_ITER_CONTINUE);
        }

 restart:
        contended_locks = 0;

        ldlm_process_extent_init(lock);

        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
                                      &rpc_list, &contended_locks);
        if (rc < 0)
                GOTO(out, rc); /* lock was destroyed */
        if (rc == 2)
                goto grant;

        rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
                                       &rpc_list, &contended_locks);
        if (rc2 < 0)
                GOTO(out, rc = rc2); /* lock was destroyed */

        if (rc + rc2 == 2) {
        grant:
                ldlm_resource_unlink_lock(lock);
                ldlm_process_extent_fini(lock, flags);
                ldlm_grant_lock(lock, NULL);
        } else {
                /* If either of the compat_queue()s returned failure, then we
                 * have ASTs to send and must go onto the waiting list.
                 *
                 * bug 2322: we used to unlink and re-add here, which was a
                 * terrible folly -- if we goto restart, we could get
                 * re-ordered!  Causes deadlock, because ASTs aren't sent! */
                if (list_empty(&lock->l_res_link))
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                unlock_res(res);

                rc = ldlm_run_bl_ast_work(&rpc_list);

                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) &&
                    !ns_is_client(res->lr_namespace))
                        class_fail_export(lock->l_export);

                lock_res(res);
                if (rc == -ERESTART) {
                        /* bug 15715: The lock was granted and destroyed after
                         * the resource lock was dropped. The interval node was
                         * freed in ldlm_lock_destroy. Anyway, this always
                         * happens when a client is being evicted. So it would
                         * be ok to return an error. -jay */
                        if (lock->l_destroyed) {
                                *err = -EAGAIN;
                                GOTO(out, rc = -EAGAIN);
                        }

                        /* lock was granted while resource was unlocked. */
                        if (lock->l_granted_mode == lock->l_req_mode) {
                                /* bug 11300: if the lock has been granted,
                                 * break out earlier because otherwise we will
                                 * go to restart, ldlm_resource_unlink will be
                                 * called, and that causes the interval node to
                                 * be freed. Then we will fail at
                                 * ldlm_extent_add_lock() */
                                *flags &= ~(LDLM_FL_BLOCK_GRANTED | LDLM_FL_BLOCK_CONV |
                                            LDLM_FL_BLOCK_WAIT);
                                GOTO(out, rc = 0);
                        }

                        GOTO(restart, -ERESTART);
                }

                *flags |= LDLM_FL_BLOCK_GRANTED;
                /* this way we force the client to wait for the lock
                 * endlessly once the lock is enqueued -bzzz */
                *flags |= LDLM_FL_NO_TIMEOUT;
        }
        RETURN(0);
out:
        if (!list_empty(&rpc_list)) {
                LASSERT(!(lock->l_flags & LDLM_AST_DISCARD_DATA));
                discard_bl_list(&rpc_list);
        }
        RETURN(rc);
}

/* When a lock is cancelled by a client, the KMS may undergo change if this
 * is the "highest lock".  This function returns the new KMS value.
 * Caller must hold ns_lock already.
 *
 * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
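/* For example, if a lock on [0, 4095] is cancelled while old_kms is 4096
 * and the only remaining granted lock covers [0, 1023], the new KMS is
 * 1024. */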
__u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
{
        struct ldlm_resource *res = lock->l_resource;
        struct list_head *tmp;
        struct ldlm_lock *lck;
        __u64 kms = 0;
        ENTRY;

        /* don't let another thread in ldlm_extent_shift_kms race in
         * just after we finish and take our lock into account in its
         * calculation of the kms */
        lock->l_flags |= LDLM_FL_KMS_IGNORE;

        list_for_each(tmp, &res->lr_granted) {
                lck = list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lck->l_flags & LDLM_FL_KMS_IGNORE)
                        continue;

                if (lck->l_policy_data.l_extent.end >= old_kms)
                        RETURN(old_kms);

                /* This extent _has_ to be smaller than old_kms (checked above)
                 * so kms can only ever be smaller or the same as old_kms. */
                if (lck->l_policy_data.l_extent.end + 1 > kms)
                        kms = lck->l_policy_data.l_extent.end + 1;
        }
        LASSERTF(kms <= old_kms, "kms "LPU64" old_kms "LPU64"\n", kms, old_kms);

        RETURN(kms);
}

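/* Slab cache for ldlm_interval nodes.  Each granted extent lock is attached
 * to one node; locks of the same mode with an identical extent share a
 * single node. */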
cfs_mem_cache_t *ldlm_interval_slab;
struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
{
        struct ldlm_interval *node;
        ENTRY;

        LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
        OBD_SLAB_ALLOC(node, ldlm_interval_slab, CFS_ALLOC_IO, sizeof(*node));
        if (node == NULL)
                RETURN(NULL);

        CFS_INIT_LIST_HEAD(&node->li_group);
        ldlm_interval_attach(node, lock);
        RETURN(node);
}

void ldlm_interval_free(struct ldlm_interval *node)
{
        if (node) {
                LASSERT(list_empty(&node->li_group));
                LASSERT(!interval_is_intree(&node->li_node));
                OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
        }
}

/* interval tree, for LDLM_EXTENT. */
void ldlm_interval_attach(struct ldlm_interval *n,
                          struct ldlm_lock *l)
{
        LASSERT(l->l_tree_node == NULL);
        LASSERT(l->l_resource->lr_type == LDLM_EXTENT);

        list_add_tail(&l->l_sl_policy, &n->li_group);
        l->l_tree_node = n;
}

struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l)
{
        struct ldlm_interval *n = l->l_tree_node;

        if (n == NULL)
                return NULL;

        LASSERT(!list_empty(&n->li_group));
        l->l_tree_node = NULL;
        list_del_init(&l->l_sl_policy);

        return (list_empty(&n->li_group) ? n : NULL);
}

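/* Map a single-bit lock mode to its interval tree index: the index is the
 * bit position of the mode, so the lowest mode bit maps to 0. */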
static inline int lock_mode_to_index(ldlm_mode_t mode)
{
        int index;

        LASSERT(mode != 0);
        LASSERT(IS_PO2(mode));
        for (index = -1; mode; index++, mode >>= 1) ;
        LASSERT(index < LCK_MODE_NUM);
        return index;
}

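/* Insert a granted extent lock into the interval tree matching its mode,
 * reusing an existing interval node if one already covers the same extent,
 * and also add it to the resource's granted list. */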
void ldlm_extent_add_lock(struct ldlm_resource *res,
                          struct ldlm_lock *lock)
{
        struct interval_node *found, **root;
        struct ldlm_interval *node;
        struct ldlm_extent *extent;
        int idx;

        LASSERT(lock->l_granted_mode == lock->l_req_mode);
        LASSERT(!lock->l_destroyed);

        node = lock->l_tree_node;
        LASSERT(node != NULL);
        LASSERT(!interval_is_intree(&node->li_node));

        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);

        /* initialize the node extent */
        extent = &lock->l_policy_data.l_extent;
        interval_set(&node->li_node, extent->start, extent->end);

        root = &res->lr_itree[idx].lit_root;
        found = interval_insert(&node->li_node, root);
        if (found) { /* an existing policy group was found */
                struct ldlm_interval *tmp = ldlm_interval_detach(lock);
                LASSERT(tmp != NULL);
                ldlm_interval_free(tmp);
                ldlm_interval_attach(to_ldlm_interval(found), lock);
        }
        res->lr_itree[idx].lit_size++;

        /* even though we use an interval tree to manage the extent locks, we
         * also add the locks to the granted list, for debugging purposes */
        ldlm_resource_add_lock(res, &res->lr_granted, lock);
}

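/* Remove a lock from the interval tree it was added to by
 * ldlm_extent_add_lock(). */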
void ldlm_extent_unlink_lock(struct ldlm_lock *lock)
{
        struct ldlm_resource *res = lock->l_resource;
        struct ldlm_interval *node = lock->l_tree_node;
        struct ldlm_interval_tree *tree;
        int idx;

        if (!node || !interval_is_intree(&node->li_node)) /* duplicate unlink */
                return;

        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        tree = &res->lr_itree[idx];

        LASSERT(tree->lit_root != NULL); /* assure the tree is not null */

        tree->lit_size--;
        node = ldlm_interval_detach(lock);
        if (node) {
                interval_erase(&node->li_node, &tree->lit_root);
                ldlm_interval_free(node);
        }
}