/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_extent.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

/**
 * This file contains the implementation of the EXTENT lock type.
 *
 * The EXTENT lock type is for locking a contiguous range of values,
 * represented by 64-bit starting and ending offsets (inclusive). There are
 * several extent lock modes, some of which may be mutually incompatible.
 * Extent locks are considered incompatible if their modes are incompatible
 * and their extents intersect.  See the lock mode compatibility matrix in
 * lustre_dlm.h.
 */

#define DEBUG_SUBSYSTEM S_LDLM
#ifndef __KERNEL__
# include <liblustre.h>
#else
# include <libcfs/libcfs.h>
#endif

#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd.h>
#include <obd_class.h>
#include <lustre_lib.h>

#include "ldlm_internal.h"

#ifdef HAVE_SERVER_SUPPORT
# define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)

/**
 * Fix up the ldlm_extent after expanding it.
 *
 * After expansion has been done, we might still want to make certain
 * adjustments based on the overall contention on the resource and the like,
 * to avoid granting overly wide locks.
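 *
 * A worked example (assuming a 4KB CFS_PAGE_SIZE): if the client requested
 * [0x10000, 0x1ffff] and expansion produced [0x5000, 0x29fff], both request
 * boundaries are 64KB aligned, so the mask below grows to 0x10000 and the
 * granted extent is trimmed back to the 64KB-aligned range [0x10000, 0x1ffff].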
 */
static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
                                              struct ldlm_extent *new_ex,
                                              int conflicting)
{
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        __u64 req_align, mask;

        if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
                if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
                        new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
                                          new_ex->end);
        }

        if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF) {
                EXIT;
                return;
        }

        /* We need to ensure that the lock extent is properly aligned to what
         * the client requested, and also that it is server page size aligned,
         * otherwise a single server page could end up covered by two write
         * locks. */
        mask = CFS_PAGE_SIZE;
        req_align = (req_end + 1) | req_start;
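        /* Grow the mask to the largest power of two that evenly divides both
         * the request start and (end + 1), so the granted extent stays
         * aligned to the client's own I/O boundaries. */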
        if (req_align != 0 && (req_align & (mask - 1)) == 0) {
                while ((req_align & mask) == 0)
                        mask <<= 1;
        }
        mask -= 1;
        /* We can only shrink the lock, not grow it.
         * This should never cause the lock to be smaller than requested,
         * since the requested lock was already aligned on these boundaries. */
        new_ex->start = ((new_ex->start - 1) | mask) + 1;
        new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
        LASSERTF(new_ex->start <= req_start,
                 "mask "LPX64" grant start "LPU64" req start "LPU64"\n",
                 mask, new_ex->start, req_start);
        LASSERTF(new_ex->end >= req_end,
                 "mask "LPX64" grant end "LPU64" req end "LPU64"\n",
                 mask, new_ex->end, req_end);
}

/**
 * Return the maximum extent that:
 * - contains the requested extent
 * - does not overlap existing conflicting extents outside the requested one
 *
 * This allows clients to request a small required extent range, but if there
 * is no contention on the lock the full lock can be granted to the client.
 * This avoids the need for many smaller lock requests to be granted in the
 * common (uncontended) case.
 *
 * Use the interval tree to expand the lock extent for a granted lock.
 */
static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req,
                                                struct ldlm_extent *new_ex)
{
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        struct ldlm_interval_tree *tree;
        struct interval_node_extent limiter = { new_ex->start, new_ex->end };
        int conflicting = 0;
        int idx;
        ENTRY;

        lockmode_verify(req_mode);

        /* Use the per-mode interval trees to check against the granted
         * extent locks. */
        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                struct interval_node_extent ext = { req_start, req_end };

                tree = &res->lr_itree[idx];
                if (lockmode_compat(tree->lit_mode, req_mode))
                        continue;

                conflicting += tree->lit_size;
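                /* With many conflicting locks on this resource, do not grow
                 * the granted extent downwards past the requested start. */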
                if (conflicting > 4)
                        limiter.start = req_start;

                if (interval_is_overlapped(tree->lit_root, &ext))
                        CDEBUG(D_INFO,
                               "req_mode = %d, tree->lit_mode = %d, "
                               "tree->lit_size = %d\n",
                               req_mode, tree->lit_mode, tree->lit_size);
                interval_expand(tree->lit_root, &ext, &limiter);
                limiter.start = max(limiter.start, ext.start);
                limiter.end = min(limiter.end, ext.end);
                if (limiter.start == req_start && limiter.end == req_end)
                        break;
        }

        new_ex->start = limiter.start;
        new_ex->end = limiter.end;
        LASSERT(new_ex->start <= req_start);
        LASSERT(new_ex->end >= req_end);

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
        EXIT;
}

/* The purpose of this function is to return:
 * - the maximum extent
 * - containing the requested extent
 * - and not overlapping existing conflicting extents outside the requested one
 */
static void
ldlm_extent_internal_policy_waiting(struct ldlm_lock *req,
                                    struct ldlm_extent *new_ex)
{
        cfs_list_t *tmp;
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int conflicting = 0;
        ENTRY;

        lockmode_verify(req_mode);

        /* for waiting locks */
        cfs_list_for_each(tmp, &res->lr_waiting) {
                struct ldlm_lock *lock;
                struct ldlm_extent *l_extent;

                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);
                l_extent = &lock->l_policy_data.l_extent;

                /* We already hit the minimum requested size, search no more */
                if (new_ex->start == req_start && new_ex->end == req_end) {
                        EXIT;
                        return;
                }

                /* Don't conflict with ourselves */
                if (req == lock)
                        continue;

                /* Locks are compatible, overlap doesn't matter */
                /* Until bug 20 is fixed, try to avoid granting overlapping
                 * locks on one client (they take a long time to cancel) */
                if (lockmode_compat(lock->l_req_mode, req_mode) &&
                    lock->l_export != req->l_export)
                        continue;

                /* If this is a high-traffic lock, don't grow downwards at all
                 * or grow upwards too much */
                ++conflicting;
                if (conflicting > 4)
                        new_ex->start = req_start;

                /* If lock doesn't overlap new_ex, skip it. */
                if (!ldlm_extent_overlap(l_extent, new_ex))
                        continue;

                /* The locks conflict in their requested extents and we can't
                 * satisfy both, so ignore this one.  Either we will ping-pong
                 * this extent (we would regardless of which extent we granted)
                 * or the lock is unused and shouldn't limit our extent growth. */
                if (ldlm_extent_overlap(&lock->l_req_extent,&req->l_req_extent))
                        continue;

                /* We grow extents downwards only as far as they don't overlap
                 * with already-granted locks, on the assumption that clients
                 * will be writing beyond the initial requested end and would
                 * then need to enqueue a new lock beyond the previous request.
                 * l_req_extent->end strictly < req_start, checked above. */
                if (l_extent->start < req_start && new_ex->start != req_start) {
                        if (l_extent->end >= req_start)
                                new_ex->start = req_start;
                        else
                                new_ex->start = min(l_extent->end+1, req_start);
                }

                /* If we need to cancel this lock anyway because our request
                 * overlaps the granted lock, we grow up to its requested
                 * extent start instead of limiting this extent, assuming that
                 * clients are writing forwards and the lock had overgrown
                 * its extent downwards before we enqueued our request. */
                if (l_extent->end > req_end) {
                        if (l_extent->start <= req_end)
                                new_ex->end = max(lock->l_req_extent.start - 1,
                                                  req_end);
                        else
                                new_ex->end = max(l_extent->start - 1, req_end);
                }
        }

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
        EXIT;
}


/* In order to determine the largest possible extent we can grant, we need
 * to scan all of the queues. */
static void ldlm_extent_policy(struct ldlm_resource *res,
                               struct ldlm_lock *lock, __u64 *flags)
{
        struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };

        if (lock->l_export == NULL)
                /*
                 * This is a local lock taken by the server (e.g., as a part
                 * of OST-side locking, or unlink handling). Expansion doesn't
                 * make a lot of sense for local locks, because they are
                 * dropped immediately on operation completion and would only
                 * conflict with other threads.
                 */
                return;

        if (lock->l_policy_data.l_extent.start == 0 &&
            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                /* fast-path whole file locks */
                return;

        ldlm_extent_internal_policy_granted(lock, &new_ex);
        ldlm_extent_internal_policy_waiting(lock, &new_ex);

        if (new_ex.start != lock->l_policy_data.l_extent.start ||
            new_ex.end != lock->l_policy_data.l_extent.end) {
                *flags |= LDLM_FL_LOCK_CHANGED;
                lock->l_policy_data.l_extent.start = new_ex.start;
                lock->l_policy_data.l_extent.end = new_ex.end;
        }
}

static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
{
        struct ldlm_resource *res = lock->l_resource;
        cfs_time_t now = cfs_time_current();

        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_SET_CONTENTION))
                return 1;

        CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
        if (contended_locks > ldlm_res_to_ns(res)->ns_contended_locks)
                res->lr_contention_time = now;
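        /* The resource is considered contended for ns_contention_time seconds
         * after the last time more than ns_contended_locks conflicting locks
         * were seen on it. */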
        return cfs_time_before(now, cfs_time_add(res->lr_contention_time,
                cfs_time_seconds(ldlm_res_to_ns(res)->ns_contention_time)));
}

struct ldlm_extent_compat_args {
        cfs_list_t *work_list;
        struct ldlm_lock *lock;
        ldlm_mode_t mode;
        int *locks;
        int *compat;
};

static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
                                                void *data)
{
        struct ldlm_extent_compat_args *priv = data;
        struct ldlm_interval *node = to_ldlm_interval(n);
        struct ldlm_extent *extent;
        cfs_list_t *work_list = priv->work_list;
        struct ldlm_lock *lock, *enq = priv->lock;
        ldlm_mode_t mode = priv->mode;
        int count = 0;
        ENTRY;

        LASSERT(!cfs_list_empty(&node->li_group));

        cfs_list_for_each_entry(lock, &node->li_group, l_sl_policy) {
                /* the interval tree only contains granted locks */
                LASSERTF(mode == lock->l_granted_mode,
                         "mode = %s, lock->l_granted_mode = %s\n",
                         ldlm_lockname[mode],
                         ldlm_lockname[lock->l_granted_mode]);
                count++;
                if (lock->l_blocking_ast)
                        ldlm_add_ast_work_item(lock, enq, work_list);
        }

        /* don't count conflicting glimpse locks */
        extent = ldlm_interval_extent(node);
        if (!(mode == LCK_PR &&
            extent->start == 0 && extent->end == OBD_OBJECT_EOF))
                *priv->locks += count;

        if (priv->compat)
                *priv->compat = 0;

        RETURN(INTERVAL_ITER_CONT);
}

/**
 * Determine if the lock is compatible with all locks on the queue.
 *
 * If \a work_list is provided, conflicting locks are linked there.
 * If \a work_list is not provided, we exit this function on first conflict.
 *
 * \retval 0 if the lock is not compatible
 * \retval 1 if the lock is compatible
 * \retval 2 if \a req is a group lock and it is compatible and requires
 *           no further checking
 * \retval negative error, such as EWOULDBLOCK for group locks
 */
static int
ldlm_extent_compat_queue(cfs_list_t *queue, struct ldlm_lock *req,
                         __u64 *flags, ldlm_error_t *err,
                         cfs_list_t *work_list, int *contended_locks)
{
        cfs_list_t *tmp;
        struct ldlm_lock *lock;
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int compat = 1;
        int scan = 0;
        int check_contention;
        ENTRY;

        lockmode_verify(req_mode);

        /* Use the per-mode interval trees for the granted queue */
        if (queue == &res->lr_granted) {
                struct ldlm_interval_tree *tree;
                struct ldlm_extent_compat_args data = {.work_list = work_list,
                                               .lock = req,
                                               .locks = contended_locks,
                                               .compat = &compat };
                struct interval_node_extent ex = { .start = req_start,
                                                   .end = req_end };
                int idx, rc;

                for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                        tree = &res->lr_itree[idx];
                        if (tree->lit_root == NULL) /* empty tree, skip it */
                                continue;

                        data.mode = tree->lit_mode;
                        if (lockmode_compat(req_mode, tree->lit_mode)) {
                                struct ldlm_interval *node;
                                struct ldlm_extent *extent;

                                if (req_mode != LCK_GROUP)
                                        continue;

                                /* group lock, grant it immediately if
                                 * compatible */
                                node = to_ldlm_interval(tree->lit_root);
                                extent = ldlm_interval_extent(node);
                                if (req->l_policy_data.l_extent.gid ==
                                    extent->gid)
                                        RETURN(2);
                        }

                        if (tree->lit_mode == LCK_GROUP) {
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                }

                                *flags |= LDLM_FL_NO_TIMEOUT;
                                if (!work_list)
                                        RETURN(0);

                                /* if the work list is not NULL, add all
                                 * locks in the tree to the work list */
                                compat = 0;
                                interval_iterate(tree->lit_root,
                                                 ldlm_extent_compat_cb, &data);
                                continue;
                        }

                        if (!work_list) {
                                rc = interval_is_overlapped(tree->lit_root,&ex);
                                if (rc)
                                        RETURN(0);
                        } else {
                                interval_search(tree->lit_root, &ex,
                                                ldlm_extent_compat_cb, &data);
                                if (!cfs_list_empty(work_list) && compat)
                                        compat = 0;
                        }
                }
        } else { /* for waiting queue */
                cfs_list_for_each(tmp, queue) {
                        check_contention = 1;

                        lock = cfs_list_entry(tmp, struct ldlm_lock,
                                              l_res_link);

                        /* We stop walking the queue if we hit ourselves so
                         * we don't take conflicting locks enqueued after us
                         * into account, or we'd wait forever. */
                        if (req == lock)
                                break;

                        if (unlikely(scan)) {
                                /* We only get here if we are queueing a GROUP
                                 * lock and have met an incompatible one. The
                                 * idea is to insert the GROUP lock after the
                                 * last compatible GROUP lock in the waiting
                                 * queue, or, if there is none, in front of
                                 * the first non-GROUP lock. */
                                if (lock->l_req_mode != LCK_GROUP) {
                                        /* OK, we hit a non-GROUP lock; there
                                         * should be no more GROUP locks later
                                         * on, so queue in front of the first
                                         * non-GROUP lock. */

                                        ldlm_resource_insert_lock_after(lock, req);
                                        cfs_list_del_init(&lock->l_res_link);
                                        ldlm_resource_insert_lock_after(req, lock);
                                        compat = 0;
                                        break;
                                }
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* found it */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        compat = 0;
                                        break;
                                }
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_req_mode, req_mode)) {
                                if (req_mode == LCK_PR &&
                                    ((lock->l_policy_data.l_extent.start <=
                                      req->l_policy_data.l_extent.start) &&
                                     (lock->l_policy_data.l_extent.end >=
                                      req->l_policy_data.l_extent.end))) {
                                        /* If we meet a PR lock just like us or
                                           wider, and nobody down the list
                                           conflicted with it, that means we
                                           can skip processing the rest of the
                                           list and safely place ourselves at
                                           the end of the list, or grant
                                           (depending on whether we met
                                           conflicting locks earlier in the
                                           list).  In the first-enqueue case
                                           only, we keep traversing if there is
                                           something conflicting down the list
                                           because we need to make sure that
                                           something is marked as AST_SENT as
                                           well; with an empty work list we
                                           would exit on the first conflict
                                           met. */
                                        /* There IS a case where such a flag is
                                           not set for a lock, yet it blocks
                                           something. Luckily for us this is
                                           only during destroy, so the lock is
                                           exclusive. So here we are safe. */
                                        if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
                                                RETURN(compat);
                                        }
                                }

                                /* non-group locks are compatible, overlap
                                   doesn't matter */
                                if (likely(req_mode != LCK_GROUP))
                                        continue;

                                /* If we are trying to get a GROUP lock and
                                   there is another one of this kind, we need
                                   to compare gids */
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* If the existing lock with a matching
                                           gid is granted, we grant the new one
                                           too. */
                                        if (lock->l_req_mode == lock->l_granted_mode)
                                                RETURN(2);

                                        /* Otherwise we are scanning the queue
                                         * of waiting locks, which means the
                                         * current request would block along
                                         * with the existing lock (which is
                                         * already blocked).
                                         * If we are in nonblocking mode,
                                         * return immediately. */
                                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                                compat = -EWOULDBLOCK;
                                                goto destroylock;
                                        }
                                        /* If this group lock is compatible with another
                                         * group lock on the waiting list, they must be
                                         * together in the list, so they can be granted
                                         * at the same time.  Otherwise the later lock
                                         * can get stuck behind another, incompatible,
                                         * lock. */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        /* Because 'lock' is not granted, we can stop
                                         * processing this queue and return immediately.
                                         * There is no need to check the rest of the
                                         * list. */
                                        RETURN(0);
                                }
                        }

                        if (unlikely(req_mode == LCK_GROUP &&
                                     (lock->l_req_mode != lock->l_granted_mode))) {
                                scan = 1;
                                compat = 0;
                                if (lock->l_req_mode != LCK_GROUP) {
                                        /* OK, we hit a non-GROUP lock; there
                                         * should be no more GROUP locks later
                                         * on, so queue in front of the first
                                         * non-GROUP lock. */

                                        ldlm_resource_insert_lock_after(lock, req);
                                        cfs_list_del_init(&lock->l_res_link);
                                        ldlm_resource_insert_lock_after(req, lock);
                                        break;
                                }
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* found it */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        break;
                                }
                                continue;
                        }

                        if (unlikely(lock->l_req_mode == LCK_GROUP)) {
                                /* If the compared lock is GROUP, then the
                                 * requested one is PR/PW, so they are not
                                 * compatible; the extent range does not
                                 * matter */
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                } else {
                                        *flags |= LDLM_FL_NO_TIMEOUT;
                                }
                        } else if (lock->l_policy_data.l_extent.end < req_start ||
                                   lock->l_policy_data.l_extent.start > req_end) {
                                /* if a non-group lock doesn't overlap, skip it */
                                continue;
                        } else if (lock->l_req_extent.end < req_start ||
                                   lock->l_req_extent.start > req_end) {
                                /* false contention, the requested extents
                                 * don't really overlap */
                                check_contention = 0;
                        }

                        if (!work_list)
                                RETURN(0);

                        /* don't count conflicting glimpse locks */
                        if (lock->l_req_mode == LCK_PR &&
                            lock->l_policy_data.l_extent.start == 0 &&
                            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                                check_contention = 0;

                        *contended_locks += check_contention;

                        compat = 0;
                        if (lock->l_blocking_ast)
                                ldlm_add_ast_work_item(lock, req, work_list);
                }
        }

        if (ldlm_check_contention(req, *contended_locks) &&
            compat == 0 &&
            (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
            req->l_req_mode != LCK_GROUP &&
            req_end - req_start <=
            ldlm_res_to_ns(req->l_resource)->ns_max_nolock_size)
                GOTO(destroylock, compat = -EUSERS);

        RETURN(compat);
destroylock:
        cfs_list_del_init(&req->l_res_link);
        ldlm_lock_destroy_nolock(req);
        *err = compat;
        RETURN(compat);
}

/**
 * Discard all AST work items from the list.
 *
 * If for whatever reason we do not want to send ASTs to conflicting locks
 * anymore, disassemble the list with this function.
 */
static void discard_bl_list(cfs_list_t *bl_list)
{
        cfs_list_t *tmp, *pos;
        ENTRY;

        cfs_list_for_each_safe(pos, tmp, bl_list) {
                struct ldlm_lock *lock =
                        cfs_list_entry(pos, struct ldlm_lock, l_bl_ast);

                cfs_list_del_init(&lock->l_bl_ast);
                LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
                lock->l_flags &= ~LDLM_FL_AST_SENT;
                LASSERT(lock->l_bl_ast_run == 0);
                LASSERT(lock->l_blocking_lock);
                LDLM_LOCK_RELEASE(lock->l_blocking_lock);
                lock->l_blocking_lock = NULL;
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

/**
 * Process a granting attempt for an extent lock.
 * Must be called with ns lock held.
 *
 * This function looks for any conflicts for \a lock in the granted or
 * waiting queues. The lock is granted if no conflicts are found in
 * either queue.
 *
 * If \a first_enq is 0 (i.e., called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent
 *
 * If \a first_enq is 1 (i.e., called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent yet, so the list of conflicting
 *     locks is collected and ASTs are sent.
 */
int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
                             int first_enq, ldlm_error_t *err,
                             cfs_list_t *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        CFS_LIST_HEAD(rpc_list);
        int rc, rc2;
        int contended_locks = 0;
        ENTRY;

        LASSERT(cfs_list_empty(&res->lr_converting));
        LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
                !(lock->l_flags & LDLM_AST_DISCARD_DATA));
        check_res_locked(res);
        *err = ELDLM_OK;

        if (!first_enq) {
                /* Careful observers will note that we don't handle -EWOULDBLOCK
                 * here, but it's ok for a non-obvious reason -- compat_queue
                 * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
                 * flags should always be zero here, and if that ever stops
                 * being true, we want to find out. */
                LASSERT(*flags == 0);
                rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
                                              err, NULL, &contended_locks);
                if (rc == 1) {
                        rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
                                                      flags, err, NULL,
                                                      &contended_locks);
                }
                if (rc == 0)
                        RETURN(LDLM_ITER_STOP);

                ldlm_resource_unlink_lock(lock);

                if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
                        ldlm_extent_policy(res, lock, flags);
                ldlm_grant_lock(lock, work_list);
                RETURN(LDLM_ITER_CONTINUE);
        }

 restart:
        contended_locks = 0;
        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
                                      &rpc_list, &contended_locks);
        if (rc < 0)
                GOTO(out, rc); /* lock was destroyed */
        if (rc == 2)
                goto grant;

        rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
                                       &rpc_list, &contended_locks);
        if (rc2 < 0)
                GOTO(out, rc = rc2); /* lock was destroyed */

        if (rc + rc2 == 2) {
        grant:
                ldlm_extent_policy(res, lock, flags);
                ldlm_resource_unlink_lock(lock);
                ldlm_grant_lock(lock, NULL);
        } else {
                /* If either of the compat_queue()s returned failure, then we
                 * have ASTs to send and must go onto the waiting list.
                 *
                 * bug 2322: we used to unlink and re-add here, which was a
                 * terrible folly -- if we goto restart, we could get
                 * re-ordered!  Causes deadlock, because ASTs aren't sent! */
                if (cfs_list_empty(&lock->l_res_link))
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                unlock_res(res);
                rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
                                       LDLM_WORK_BL_AST);

                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) &&
                    !ns_is_client(ldlm_res_to_ns(res)))
                        class_fail_export(lock->l_export);

                lock_res(res);
                if (rc == -ERESTART) {
                        /* 15715: The lock was granted and destroyed after
                         * resource lock was dropped. Interval node was freed
                         * in ldlm_lock_destroy. Anyway, this always happens
                         * when a client is being evicted. So it would be
                         * ok to return an error. -jay */
                        if (lock->l_destroyed) {
                                *err = -EAGAIN;
                                GOTO(out, rc = -EAGAIN);
                        }

                        /* lock was granted while resource was unlocked. */
                        if (lock->l_granted_mode == lock->l_req_mode) {
                                /* bug 11300: if the lock has been granted,
                                 * break earlier because otherwise, we will go
                                 * to restart and ldlm_resource_unlink will be
                                 * called and it causes the interval node to be
                                 * freed. Then we will fail at
                                 * ldlm_extent_add_lock() */
                                *flags &= ~(LDLM_FL_BLOCK_GRANTED | LDLM_FL_BLOCK_CONV |
                                            LDLM_FL_BLOCK_WAIT);
                                GOTO(out, rc = 0);
                        }

                        GOTO(restart, -ERESTART);
                }

                *flags |= LDLM_FL_BLOCK_GRANTED;
                /* this way we force client to wait for the lock
                 * endlessly once the lock is enqueued -bzzz */
                *flags |= LDLM_FL_NO_TIMEOUT;

        }
        RETURN(0);
out:
        if (!cfs_list_empty(&rpc_list)) {
                LASSERT(!(lock->l_flags & LDLM_AST_DISCARD_DATA));
                discard_bl_list(&rpc_list);
        }
        RETURN(rc);
}
#endif /* HAVE_SERVER_SUPPORT */

/* When a lock is cancelled by a client, the KMS may undergo change if this
 * is the "highest lock".  This function returns the new KMS value.
 * Caller must hold lr_lock already.
 *
 * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
__u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
{
        struct ldlm_resource *res = lock->l_resource;
        cfs_list_t *tmp;
        struct ldlm_lock *lck;
        __u64 kms = 0;
        ENTRY;

        /* don't let another thread in ldlm_extent_shift_kms race in
         * just after we finish and take our lock into account in its
         * calculation of the kms */
        lock->l_flags |= LDLM_FL_KMS_IGNORE;

        cfs_list_for_each(tmp, &res->lr_granted) {
                lck = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lck->l_flags & LDLM_FL_KMS_IGNORE)
                        continue;

                if (lck->l_policy_data.l_extent.end >= old_kms)
                        RETURN(old_kms);

                /* This extent _has_ to be smaller than old_kms (checked above)
                 * so kms can only ever be smaller or the same as old_kms. */
                if (lck->l_policy_data.l_extent.end + 1 > kms)
                        kms = lck->l_policy_data.l_extent.end + 1;
        }
        LASSERTF(kms <= old_kms, "kms "LPU64" old_kms "LPU64"\n", kms, old_kms);

        RETURN(kms);
}
EXPORT_SYMBOL(ldlm_extent_shift_kms);

cfs_mem_cache_t *ldlm_interval_slab;
struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
{
        struct ldlm_interval *node;
        ENTRY;

        LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
        OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, CFS_ALLOC_IO);
        if (node == NULL)
                RETURN(NULL);

        CFS_INIT_LIST_HEAD(&node->li_group);
        ldlm_interval_attach(node, lock);
        RETURN(node);
}

void ldlm_interval_free(struct ldlm_interval *node)
{
        if (node) {
                LASSERT(cfs_list_empty(&node->li_group));
                LASSERT(!interval_is_intree(&node->li_node));
                OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
        }
}

/* interval tree, for LDLM_EXTENT. */
void ldlm_interval_attach(struct ldlm_interval *n,
                          struct ldlm_lock *l)
{
        LASSERT(l->l_tree_node == NULL);
        LASSERT(l->l_resource->lr_type == LDLM_EXTENT);

        cfs_list_add_tail(&l->l_sl_policy, &n->li_group);
        l->l_tree_node = n;
}

struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l)
{
        struct ldlm_interval *n = l->l_tree_node;

        if (n == NULL)
                return NULL;

        LASSERT(!cfs_list_empty(&n->li_group));
        l->l_tree_node = NULL;
        cfs_list_del_init(&l->l_sl_policy);

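        /* Return the interval node only if this lock was the last member of
         * its policy group, so the caller can erase it from the tree and free
         * it; otherwise the node is still in use and NULL is returned. */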
        return (cfs_list_empty(&n->li_group) ? n : NULL);
}

static inline int lock_mode_to_index(ldlm_mode_t mode)
{
        int index;

        LASSERT(mode != 0);
        LASSERT(IS_PO2(mode));
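        /* mode is a single power-of-two bit; compute its bit index (log2),
         * which selects the per-mode interval tree in lr_itree[]. */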
        for (index = -1; mode; index++, mode >>= 1) ;
        LASSERT(index < LCK_MODE_NUM);
        return index;
}

/** Add newly granted lock into interval tree for the resource. */
void ldlm_extent_add_lock(struct ldlm_resource *res,
                          struct ldlm_lock *lock)
{
        struct interval_node *found, **root;
        struct ldlm_interval *node;
        struct ldlm_extent *extent;
        int idx;

        LASSERT(lock->l_granted_mode == lock->l_req_mode);

        node = lock->l_tree_node;
        LASSERT(node != NULL);
        LASSERT(!interval_is_intree(&node->li_node));

        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);

        /* initialize the node extent */
        extent = &lock->l_policy_data.l_extent;
        interval_set(&node->li_node, extent->start, extent->end);

        root = &res->lr_itree[idx].lit_root;
        found = interval_insert(&node->li_node, root);
        if (found) { /* an interval with the same extent already exists */
                struct ldlm_interval *tmp = ldlm_interval_detach(lock);
                LASSERT(tmp != NULL);
                ldlm_interval_free(tmp);
                ldlm_interval_attach(to_ldlm_interval(found), lock);
        }
        res->lr_itree[idx].lit_size++;

        /* Even though we use an interval tree to manage the extent locks, we
         * also add the locks to the granted list, for debugging purposes etc. */
        ldlm_resource_add_lock(res, &res->lr_granted, lock);
}

/** Remove cancelled lock from resource interval tree. */
void ldlm_extent_unlink_lock(struct ldlm_lock *lock)
{
        struct ldlm_resource *res = lock->l_resource;
        struct ldlm_interval *node = lock->l_tree_node;
        struct ldlm_interval_tree *tree;
        int idx;

        if (!node || !interval_is_intree(&node->li_node)) /* duplicate unlink */
                return;

        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        tree = &res->lr_itree[idx];

        LASSERT(tree->lit_root != NULL); /* the tree must not be empty */

        tree->lit_size--;
        node = ldlm_interval_detach(lock);
        if (node) {
                interval_erase(&node->li_node, &tree->lit_root);
                ldlm_interval_free(node);
        }
}

void ldlm_extent_policy_wire_to_local(const ldlm_wire_policy_data_t *wpolicy,
                                      ldlm_policy_data_t *lpolicy)
{
        memset(lpolicy, 0, sizeof(*lpolicy));
        lpolicy->l_extent.start = wpolicy->l_extent.start;
        lpolicy->l_extent.end = wpolicy->l_extent.end;
        lpolicy->l_extent.gid = wpolicy->l_extent.gid;
}

void ldlm_extent_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
                                      ldlm_wire_policy_data_t *wpolicy)
{
        memset(wpolicy, 0, sizeof(*wpolicy));
        wpolicy->l_extent.start = lpolicy->l_extent.start;
        wpolicy->l_extent.end = lpolicy->l_extent.end;
        wpolicy->l_extent.gid = lpolicy->l_extent.gid;
}