1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2010, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ldlm/ldlm_extent.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  */
41
42 /**
43  * This file contains the implementation of the EXTENT lock type.
44  *
45  * The EXTENT lock type is for locking a contiguous range of values, represented
46  * by 64-bit starting and ending offsets (inclusive). There are several extent
47  * lock modes, some of which may be mutually incompatible. Extent locks are
48  * considered incompatible if their modes are incompatible and their extents
49  * intersect.  See the lock mode compatibility matrix in lustre_dlm.h.
50  */
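/*
 * For example, because the offsets are inclusive, extents [0, 4095] and
 * [4096, 8191] do not intersect, while [0, 8191] and [4096, 12287] do;
 * locks on the latter pair conflict whenever their modes are incompatible.
 */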
51
52 #define DEBUG_SUBSYSTEM S_LDLM
53 #ifndef __KERNEL__
54 # include <liblustre.h>
55 #else
56 # include <libcfs/libcfs.h>
57 #endif
58
59 #include <lustre_dlm.h>
60 #include <obd_support.h>
61 #include <obd.h>
62 #include <obd_class.h>
63 #include <lustre_lib.h>
64
65 #include "ldlm_internal.h"
66
67 #ifdef HAVE_SERVER_SUPPORT
68 # define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)
69
70 /**
71  * Fix up the ldlm_extent after expanding it.
72  *
73  * After expansion has been done, we might still want to do certain adjusting
74  * based on overall contention of the resource and the like to avoid granting
75  * overly wide locks.
76  */
77 static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
78                                               struct ldlm_extent *new_ex,
79                                               int conflicting)
80 {
81         ldlm_mode_t req_mode = req->l_req_mode;
82         __u64 req_start = req->l_req_extent.start;
83         __u64 req_end = req->l_req_extent.end;
84         __u64 req_align, mask;
85
86         if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
87                 if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
88                         new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
89                                           new_ex->end);
90         }
91
92         if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF) {
93                 EXIT;
94                 return;
95         }
96
97         /* We need to ensure that the lock extent is properly aligned to what
98          * the client requested.  It also needs to be aligned to the server
99          * page size, otherwise a single server page could be covered by two
100          * write locks. */
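        /* For example (illustrative, assuming a 4096-byte server page): a
         * request for [4096, 8191] gives req_align = 0x3000, whose lowest set
         * bit is already the page bit, so mask stays at 4096 and the grant is
         * kept page aligned; a request for [0, 0x3ffff] gives req_align =
         * 0x40000, so mask grows to 256KiB and the granted extent is trimmed
         * to 256KiB boundaries. */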
101         mask = PAGE_CACHE_SIZE;
102         req_align = (req_end + 1) | req_start;
103         if (req_align != 0 && (req_align & (mask - 1)) == 0) {
104                 while ((req_align & mask) == 0)
105                         mask <<= 1;
106         }
107         mask -= 1;
108         /* We can only shrink the lock, not grow it.
109          * This should never cause the lock to be smaller than requested,
110          * since the requested lock was already aligned on these boundaries. */
111         new_ex->start = ((new_ex->start - 1) | mask) + 1;
112         new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
113         LASSERTF(new_ex->start <= req_start,
114                  "mask "LPX64" grant start "LPU64" req start "LPU64"\n",
115                  mask, new_ex->start, req_start);
116         LASSERTF(new_ex->end >= req_end,
117                  "mask "LPX64" grant end "LPU64" req end "LPU64"\n",
118                  mask, new_ex->end, req_end);
119 }
120
121 /**
122  * Return the maximum extent that:
123  * - contains the requested extent
124  * - does not overlap existing conflicting extents outside the requested one
125  *
126  * This allows clients to request a small required extent range, but if there
127  * is no contention on the lock the full lock can be granted to the client.
128  * This avoids the need for many smaller lock requests to be granted in the
129  * common (uncontended) case.
130  *
131  * Use interval tree to expand the lock extent for granted lock.
132  */
133 static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req,
134                                                 struct ldlm_extent *new_ex)
135 {
136         struct ldlm_resource *res = req->l_resource;
137         ldlm_mode_t req_mode = req->l_req_mode;
138         __u64 req_start = req->l_req_extent.start;
139         __u64 req_end = req->l_req_extent.end;
140         struct ldlm_interval_tree *tree;
141         struct interval_node_extent limiter = { new_ex->start, new_ex->end };
142         int conflicting = 0;
143         int idx;
144         ENTRY;
145
146         lockmode_verify(req_mode);
147
148         /* Using interval tree to handle the LDLM extent granted locks. */
149         for (idx = 0; idx < LCK_MODE_NUM; idx++) {
150                 struct interval_node_extent ext = { req_start, req_end };
151
152                 tree = &res->lr_itree[idx];
153                 if (lockmode_compat(tree->lit_mode, req_mode))
154                         continue;
155
156                 conflicting += tree->lit_size;
157                 if (conflicting > 4)
158                         limiter.start = req_start;
159
160                 if (interval_is_overlapped(tree->lit_root, &ext))
161                         CDEBUG(D_INFO, 
162                                "req_mode = %d, tree->lit_mode = %d, "
163                                "tree->lit_size = %d\n",
164                                req_mode, tree->lit_mode, tree->lit_size);
165                 interval_expand(tree->lit_root, &ext, &limiter);
166                 limiter.start = max(limiter.start, ext.start);
167                 limiter.end = min(limiter.end, ext.end);
168                 if (limiter.start == req_start && limiter.end == req_end)
169                         break;
170         }
171
172         new_ex->start = limiter.start;
173         new_ex->end = limiter.end;
174         LASSERT(new_ex->start <= req_start);
175         LASSERT(new_ex->end >= req_end);
176
177         ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
178         EXIT;
179 }
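/*
 * Illustration: with no conflicting granted locks the limiter is typically
 * left at the caller's initial [0, OBD_OBJECT_EOF] guess, so the whole object
 * can be granted; once a tree holds more than 4 conflicting granted locks,
 * limiter.start is pinned to req_start and the extent no longer grows
 * downwards.
 */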
180
181 /* The purpose of this function is to return the maximum extent that:
182  * - contains the requested extent
183  * - does not overlap existing conflicting extents outside the requested
184  *   one
185  */
186 static void
187 ldlm_extent_internal_policy_waiting(struct ldlm_lock *req,
188                                     struct ldlm_extent *new_ex)
189 {
190         cfs_list_t *tmp;
191         struct ldlm_resource *res = req->l_resource;
192         ldlm_mode_t req_mode = req->l_req_mode;
193         __u64 req_start = req->l_req_extent.start;
194         __u64 req_end = req->l_req_extent.end;
195         int conflicting = 0;
196         ENTRY;
197
198         lockmode_verify(req_mode);
199
200         /* for waiting locks */
201         cfs_list_for_each(tmp, &res->lr_waiting) {
202                 struct ldlm_lock *lock;
203                 struct ldlm_extent *l_extent;
204
205                 lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);
206                 l_extent = &lock->l_policy_data.l_extent;
207
208                 /* We already hit the minimum requested size, search no more */
209                 if (new_ex->start == req_start && new_ex->end == req_end) {
210                         EXIT;
211                         return;
212                 }
213
214                 /* Don't conflict with ourselves */
215                 if (req == lock)
216                         continue;
217
218                 /* Locks are compatible, overlap doesn't matter */
219                 /* Until bug 20 is fixed, try to avoid granting overlapping
220                  * locks on one client (they take a long time to cancel) */
221                 if (lockmode_compat(lock->l_req_mode, req_mode) &&
222                     lock->l_export != req->l_export)
223                         continue;
224
225                 /* If this is a high-traffic lock, don't grow downwards at all
226                  * or grow upwards too much */
227                 ++conflicting;
228                 if (conflicting > 4)
229                         new_ex->start = req_start;
230
231                 /* If lock doesn't overlap new_ex, skip it. */
232                 if (!ldlm_extent_overlap(l_extent, new_ex))
233                         continue;
234
235                 /* The locks conflict in their requested extents and we can't
236                  * satisfy both, so ignore this one.  Either we will ping-pong
237                  * this extent (we would regardless of what extent we granted)
238                  * or the lock is unused and shouldn't limit our extent growth. */
239                 if (ldlm_extent_overlap(&lock->l_req_extent,&req->l_req_extent))
240                         continue;
241
242                 /* We grow extents downwards only as far as they don't overlap
243                  * with already-granted locks, on the assumption that clients
244                  * will be writing beyond the initial requested end and would
245                  * then need to enqueue a new lock beyond previous request.
246                  * l_req_extent->end strictly < req_start, checked above. */
247                 if (l_extent->start < req_start && new_ex->start != req_start) {
248                         if (l_extent->end >= req_start)
249                                 new_ex->start = req_start;
250                         else
251                                 new_ex->start = min(l_extent->end+1, req_start);
252                 }
253
254                 /* If we need to cancel this lock anyway because our request
255                  * overlaps the granted lock, we grow up to its requested
256                  * extent start instead of limiting this extent, assuming that
257                  * clients are writing forwards and the lock had overgrown
258                  * its extent downwards before we enqueued our request. */
259                 if (l_extent->end > req_end) {
260                         if (l_extent->start <= req_end)
261                                 new_ex->end = max(lock->l_req_extent.start - 1,
262                                                   req_end);
263                         else
264                                 new_ex->end = max(l_extent->start - 1, req_end);
265                 }
266         }
267
268         ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
269         EXIT;
270 }
271
272
273 /* In order to determine the largest possible extent we can grant, we need
274  * to scan all of the queues. */
275 static void ldlm_extent_policy(struct ldlm_resource *res,
276                                struct ldlm_lock *lock, __u64 *flags)
277 {
278         struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };
279
280         if (lock->l_export == NULL)
281                 /*
282                  * this is a local lock taken by the server (e.g., as part of
283                  * OST-side locking, or unlink handling). Expansion doesn't
284                  * make a lot of sense for local locks, because they are
285                  * dropped immediately on operation completion and would only
286                  * conflict with other threads.
287                  */
288                 return;
289
290         if (lock->l_policy_data.l_extent.start == 0 &&
291             lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
292                 /* fast-path whole file locks */
293                 return;
294
295         ldlm_extent_internal_policy_granted(lock, &new_ex);
296         ldlm_extent_internal_policy_waiting(lock, &new_ex);
297
298         if (new_ex.start != lock->l_policy_data.l_extent.start ||
299             new_ex.end != lock->l_policy_data.l_extent.end) {
300                 *flags |= LDLM_FL_LOCK_CHANGED;
301                 lock->l_policy_data.l_extent.start = new_ex.start;
302                 lock->l_policy_data.l_extent.end = new_ex.end;
303         }
304 }
305
306 static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
307 {
308         struct ldlm_resource *res = lock->l_resource;
309         cfs_time_t now = cfs_time_current();
310
311         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_SET_CONTENTION))
312                 return 1;
313
314         CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
315         if (contended_locks > ldlm_res_to_ns(res)->ns_contended_locks)
316                 res->lr_contention_time = now;
317         return cfs_time_before(now, cfs_time_add(res->lr_contention_time,
318                 cfs_time_seconds(ldlm_res_to_ns(res)->ns_contention_time)));
319 }
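/*
 * In other words: once more than ns_contended_locks conflicting locks have
 * been seen on the resource, it is treated as contended for the next
 * ns_contention_time seconds; both values come from the resource's namespace.
 */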
320
321 struct ldlm_extent_compat_args {
322         cfs_list_t *work_list;
323         struct ldlm_lock *lock;
324         ldlm_mode_t mode;
325         int *locks;
326         int *compat;
327 };
328
329 static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
330                                                 void *data)
331 {
332         struct ldlm_extent_compat_args *priv = data;
333         struct ldlm_interval *node = to_ldlm_interval(n);
334         struct ldlm_extent *extent;
335         cfs_list_t *work_list = priv->work_list;
336         struct ldlm_lock *lock, *enq = priv->lock;
337         ldlm_mode_t mode = priv->mode;
338         int count = 0;
339         ENTRY;
340
341         LASSERT(!cfs_list_empty(&node->li_group));
342
343         cfs_list_for_each_entry(lock, &node->li_group, l_sl_policy) {
344                 /* interval tree is for granted lock */
345                 LASSERTF(mode == lock->l_granted_mode,
346                          "mode = %s, lock->l_granted_mode = %s\n",
347                          ldlm_lockname[mode],
348                          ldlm_lockname[lock->l_granted_mode]);
349                 count++;
350                 if (lock->l_blocking_ast)
351                         ldlm_add_ast_work_item(lock, enq, work_list);
352         }
353
354         /* don't count conflicting glimpse locks */
355         extent = ldlm_interval_extent(node);
356         if (!(mode == LCK_PR &&
357             extent->start == 0 && extent->end == OBD_OBJECT_EOF))
358                 *priv->locks += count;
359
360         if (priv->compat)
361                 *priv->compat = 0;
362
363         RETURN(INTERVAL_ITER_CONT);
364 }
365
366 /**
367  * Determine if the lock is compatible with all locks on the queue.
368  *
369  * If \a work_list is provided, conflicting locks are linked there.
370  * If \a work_list is not provided, we exit this function on first conflict.
371  *
372  * \retval 0 if the lock is not compatible
373  * \retval 1 if the lock is compatible
374  * \retval 2 if \a req is a group lock and it is compatible and requires
375  *           no further checking
376  * \retval negative error, such as EWOULDBLOCK for group locks
377  */
378 static int
379 ldlm_extent_compat_queue(cfs_list_t *queue, struct ldlm_lock *req,
380                          __u64 *flags, ldlm_error_t *err,
381                          cfs_list_t *work_list, int *contended_locks)
382 {
383         cfs_list_t *tmp;
384         struct ldlm_lock *lock;
385         struct ldlm_resource *res = req->l_resource;
386         ldlm_mode_t req_mode = req->l_req_mode;
387         __u64 req_start = req->l_req_extent.start;
388         __u64 req_end = req->l_req_extent.end;
389         int compat = 1;
390         int scan = 0;
391         int check_contention;
392         ENTRY;
393
394         lockmode_verify(req_mode);
395
396         /* Using interval tree for granted lock */
397         if (queue == &res->lr_granted) {
398                 struct ldlm_interval_tree *tree;
399                 struct ldlm_extent_compat_args data = {.work_list = work_list,
400                                                .lock = req,
401                                                .locks = contended_locks,
402                                                .compat = &compat };
403                 struct interval_node_extent ex = { .start = req_start,
404                                                    .end = req_end };
405                 int idx, rc;
406
407                 for (idx = 0; idx < LCK_MODE_NUM; idx++) {
408                         tree = &res->lr_itree[idx];
409                         if (tree->lit_root == NULL) /* empty tree, skipped */
410                                 continue;
411
412                         data.mode = tree->lit_mode;
413                         if (lockmode_compat(req_mode, tree->lit_mode)) {
414                                 struct ldlm_interval *node;
415                                 struct ldlm_extent *extent;
416
417                                 if (req_mode != LCK_GROUP)
418                                         continue;
419
420                                 /* group lock, grant it immediately if
421                                  * compatible */
422                                 node = to_ldlm_interval(tree->lit_root);
423                                 extent = ldlm_interval_extent(node);
424                                 if (req->l_policy_data.l_extent.gid ==
425                                     extent->gid)
426                                         RETURN(2);
427                         }
428
429                         if (tree->lit_mode == LCK_GROUP) {
430                                 if (*flags & LDLM_FL_BLOCK_NOWAIT) {
431                                         compat = -EWOULDBLOCK;
432                                         goto destroylock;
433                                 }
434
435                                 *flags |= LDLM_FL_NO_TIMEOUT;
436                                 if (!work_list)
437                                         RETURN(0);
438
439                                 /* if work list is not NULL, add all
440                                    locks in the tree to the work list */
441                                 compat = 0;
442                                 interval_iterate(tree->lit_root,
443                                                  ldlm_extent_compat_cb, &data);
444                                 continue;
445                         }
446
447                         if (!work_list) {
448                                 rc = interval_is_overlapped(tree->lit_root,&ex);
449                                 if (rc)
450                                         RETURN(0);
451                         } else {
452                                 interval_search(tree->lit_root, &ex,
453                                                 ldlm_extent_compat_cb, &data);
454                                 if (!cfs_list_empty(work_list) && compat)
455                                         compat = 0;
456                         }
457                 }
458         } else { /* for waiting queue */
459                 cfs_list_for_each(tmp, queue) {
460                         check_contention = 1;
461
462                         lock = cfs_list_entry(tmp, struct ldlm_lock,
463                                               l_res_link);
464
465                         /* We stop walking the queue if we hit ourselves so
466                          * we don't take conflicting locks enqueued after us
467                          * into account, or we'd wait forever. */
468                         if (req == lock)
469                                 break;
470
471                         if (unlikely(scan)) {
472                                 /* We only get here if we are queuing a GROUP lock
473                                    and met some incompatible one. The main idea of
474                                    this code is to insert the GROUP lock past a
475                                    compatible GROUP lock in the waiting queue or, if
476                                    there is none, in front of the first non-GROUP lock */
477                                 if (lock->l_req_mode != LCK_GROUP) {
478                                         /* Ok, we hit a non-GROUP lock; there should
479                                          * be no more GROUP locks later on, so queue
480                                          * in front of the first non-GROUP lock */
481
482                                         ldlm_resource_insert_lock_after(lock, req);
483                                         cfs_list_del_init(&lock->l_res_link);
484                                         ldlm_resource_insert_lock_after(req, lock);
485                                         compat = 0;
486                                         break;
487                                 }
488                                 if (req->l_policy_data.l_extent.gid ==
489                                     lock->l_policy_data.l_extent.gid) {
490                                         /* found it */
491                                         ldlm_resource_insert_lock_after(lock, req);
492                                         compat = 0;
493                                         break;
494                                 }
495                                 continue;
496                         }
497
498                         /* locks are compatible, overlap doesn't matter */
499                         if (lockmode_compat(lock->l_req_mode, req_mode)) {
500                                 if (req_mode == LCK_PR &&
501                                     ((lock->l_policy_data.l_extent.start <=
502                                       req->l_policy_data.l_extent.start) &&
503                                      (lock->l_policy_data.l_extent.end >=
504                                       req->l_policy_data.l_extent.end))) {
505                                         /* If we met a PR lock just like us or
506                                            wider, and nobody down the list
507                                            conflicted with it, that means we
508                                            can skip processing of the rest of
509                                            the list and safely place ourselves
510                                            at the end of the list, or grant
511                                            (depending on whether we met a
512                                            conflicting lock earlier in the
513                                            list).  On first enqueue only we
514                                            keep traversing if something down
515                                            the list conflicts, because we need
516                                            to make sure that something is
517                                            marked as AST_SENT as well; in case
518                                            of an empty worklist we would exit
519                                            on the first conflict met. */
520                                         /* There IS a case where such a flag
521                                            is not set for a lock, yet it blocks
522                                            something. Luckily for us this is
523                                            only during destroy, so the lock is
524                                            exclusive. So here we are safe. */
525                                         if (!ldlm_is_ast_sent(lock))
526                                                 RETURN(compat);
527                                 }
528
529                                 /* non-group locks are compatible, overlap doesn't
530                                    matter */
531                                 if (likely(req_mode != LCK_GROUP))
532                                         continue;
533
534                                 /* If we are trying to get a GROUP lock and there is
535                                    another one of this kind, we need to compare gid */
536                                 if (req->l_policy_data.l_extent.gid ==
537                                     lock->l_policy_data.l_extent.gid) {
538                                         /* If existing lock with matched gid is granted,
539                                            we grant new one too. */
540                                         if (lock->l_req_mode == lock->l_granted_mode)
541                                                 RETURN(2);
542
543                                         /* Otherwise we are scanning the queue of
544                                          * waiting locks and it means the current
545                                          * request would block along with the existing
546                                          * lock (that is already blocked).
547                                          * If we are in nonblocking mode, return
548                                          * immediately */
549                                         if (*flags & LDLM_FL_BLOCK_NOWAIT) {
550                                                 compat = -EWOULDBLOCK;
551                                                 goto destroylock;
552                                         }
553                                         /* If this group lock is compatible with another
554                                          * group lock on the waiting list, they must be
555                                          * together in the list, so they can be granted
556                                          * at the same time.  Otherwise the later lock
557                                          * can get stuck behind another, incompatible,
558                                          * lock. */
559                                         ldlm_resource_insert_lock_after(lock, req);
560                                         /* Because 'lock' is not granted, we can stop
561                                          * processing this queue and return immediately.
562                                          * There is no need to check the rest of the
563                                          * list. */
564                                         RETURN(0);
565                                 }
566                         }
567
568                         if (unlikely(req_mode == LCK_GROUP &&
569                                      (lock->l_req_mode != lock->l_granted_mode))) {
570                                 scan = 1;
571                                 compat = 0;
572                                 if (lock->l_req_mode != LCK_GROUP) {
573                                         /* Ok, we hit a non-GROUP lock; there should be
574                                            no more GROUP locks later on, so queue in
575                                            front of the first non-GROUP lock */
576
577                                         ldlm_resource_insert_lock_after(lock, req);
578                                         cfs_list_del_init(&lock->l_res_link);
579                                         ldlm_resource_insert_lock_after(req, lock);
580                                         break;
581                                 }
582                                 if (req->l_policy_data.l_extent.gid ==
583                                     lock->l_policy_data.l_extent.gid) {
584                                         /* found it */
585                                         ldlm_resource_insert_lock_after(lock, req);
586                                         break;
587                                 }
588                                 continue;
589                         }
590
591                         if (unlikely(lock->l_req_mode == LCK_GROUP)) {
592                                 /* If the compared lock is GROUP, then the requested
593                                  * one is PR/PW so this is not compatible; the extent
594                                  * range does not matter */
595                                 if (*flags & LDLM_FL_BLOCK_NOWAIT) {
596                                         compat = -EWOULDBLOCK;
597                                         goto destroylock;
598                                 } else {
599                                         *flags |= LDLM_FL_NO_TIMEOUT;
600                                 }
601                         } else if (lock->l_policy_data.l_extent.end < req_start ||
602                                    lock->l_policy_data.l_extent.start > req_end) {
603                                 /* if a non-group lock doesn't overlap, skip it */
604                                 continue;
605                         } else if (lock->l_req_extent.end < req_start ||
606                                    lock->l_req_extent.start > req_end) {
607                                 /* false contention, the requests don't really overlap */
608                                 check_contention = 0;
609                         }
610
611                         if (!work_list)
612                                 RETURN(0);
613
614                         /* don't count conflicting glimpse locks */
615                         if (lock->l_req_mode == LCK_PR &&
616                             lock->l_policy_data.l_extent.start == 0 &&
617                             lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
618                                 check_contention = 0;
619
620                         *contended_locks += check_contention;
621
622                         compat = 0;
623                         if (lock->l_blocking_ast)
624                                 ldlm_add_ast_work_item(lock, req, work_list);
625                 }
626         }
627
628         if (ldlm_check_contention(req, *contended_locks) &&
629             compat == 0 &&
630             (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
631             req->l_req_mode != LCK_GROUP &&
632             req_end - req_start <=
633             ldlm_res_to_ns(req->l_resource)->ns_max_nolock_size)
634                 GOTO(destroylock, compat = -EUSERS);
635
636         RETURN(compat);
637 destroylock:
638         cfs_list_del_init(&req->l_res_link);
639         ldlm_lock_destroy_nolock(req);
640         *err = compat;
641         RETURN(compat);
642 }
643
644 /**
645  * Discard all AST work items from list.
646  *
647  * If for whatever reason we do not want to send ASTs to conflicting locks
648  * anymore, disassemble the list with this function.
649  */
650 static void discard_bl_list(cfs_list_t *bl_list)
651 {
652         cfs_list_t *tmp, *pos;
653         ENTRY;
654
655         cfs_list_for_each_safe(pos, tmp, bl_list) {
656                 struct ldlm_lock *lock =
657                         cfs_list_entry(pos, struct ldlm_lock, l_bl_ast);
658
659                 cfs_list_del_init(&lock->l_bl_ast);
660                 LASSERT(ldlm_is_ast_sent(lock));
661                 ldlm_clear_ast_sent(lock);
662                 LASSERT(lock->l_bl_ast_run == 0);
663                 LASSERT(lock->l_blocking_lock);
664                 LDLM_LOCK_RELEASE(lock->l_blocking_lock);
665                 lock->l_blocking_lock = NULL;
666                 LDLM_LOCK_RELEASE(lock);
667         }
668         EXIT;
669 }
670
671 /**
672  * Process a granting attempt for extent lock.
673  * Must be called with ns lock held.
674  *
675  * This function looks for any conflicts for \a lock in the granted or
676  * waiting queues. The lock is granted if no conflicts are found in
677  * either queue.
678  *
679  * If \a first_enq is 0 (ie, called from ldlm_reprocess_queue):
680  *   - blocking ASTs have already been sent
681  *
682  * If \a first_enq is 1 (ie, called from ldlm_lock_enqueue):
683  *   - blocking ASTs have not been sent yet, so list of conflicting locks
684  *     would be collected and ASTs sent.
685  */
686 int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
687                              int first_enq, ldlm_error_t *err,
688                              cfs_list_t *work_list)
689 {
690         struct ldlm_resource *res = lock->l_resource;
691         CFS_LIST_HEAD(rpc_list);
692         int rc, rc2;
693         int contended_locks = 0;
694         ENTRY;
695
696         LASSERT(cfs_list_empty(&res->lr_converting));
697         LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
698                 !ldlm_is_ast_discard_data(lock));
699         check_res_locked(res);
700         *err = ELDLM_OK;
701
702         if (!first_enq) {
703                 /* Careful observers will note that we don't handle -EWOULDBLOCK
704                  * here, but it's ok for a non-obvious reason -- compat_queue
705                  * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
706                  * flags should always be zero here, and if that ever stops
707                  * being true, we want to find out. */
708                 LASSERT(*flags == 0);
709                 rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
710                                               err, NULL, &contended_locks);
711                 if (rc == 1) {
712                         rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
713                                                       flags, err, NULL,
714                                                       &contended_locks);
715                 }
716                 if (rc == 0)
717                         RETURN(LDLM_ITER_STOP);
718
719                 ldlm_resource_unlink_lock(lock);
720
721                 if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
722                         ldlm_extent_policy(res, lock, flags);
723                 ldlm_grant_lock(lock, work_list);
724                 RETURN(LDLM_ITER_CONTINUE);
725         }
726
727  restart:
728         contended_locks = 0;
729         rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
730                                       &rpc_list, &contended_locks);
731         if (rc < 0)
732                 GOTO(out, rc); /* lock was destroyed */
733         if (rc == 2)
734                 goto grant;
735
736         rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
737                                        &rpc_list, &contended_locks);
738         if (rc2 < 0)
739                 GOTO(out, rc = rc2); /* lock was destroyed */
740
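        /* rc and rc2 are 1 when the respective queue reported no conflicts,
         * so a sum of 2 means the lock is compatible with both the granted
         * and the waiting queue and can be granted right away. */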
741         if (rc + rc2 == 2) {
742         grant:
743                 ldlm_extent_policy(res, lock, flags);
744                 ldlm_resource_unlink_lock(lock);
745                 ldlm_grant_lock(lock, NULL);
746         } else {
747                 /* If either of the compat_queue()s returned failure, then we
748                  * have ASTs to send and must go onto the waiting list.
749                  *
750                  * bug 2322: we used to unlink and re-add here, which was a
751                  * terrible folly -- if we goto restart, we could get
752                  * re-ordered!  Causes deadlock, because ASTs aren't sent! */
753                 if (cfs_list_empty(&lock->l_res_link))
754                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
755                 unlock_res(res);
756                 rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
757                                        LDLM_WORK_BL_AST);
758
759                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) &&
760                     !ns_is_client(ldlm_res_to_ns(res)))
761                         class_fail_export(lock->l_export);
762
763                 lock_res(res);
764                 if (rc == -ERESTART) {
765                         /* 15715: The lock was granted and destroyed after
766                          * resource lock was dropped. Interval node was freed
767                          * in ldlm_lock_destroy. Anyway, this always happens
768                          * when a client is being evicted. So it would be
769                          * ok to return an error. -jay */
770                         if (ldlm_is_destroyed(lock)) {
771                                 *err = -EAGAIN;
772                                 GOTO(out, rc = -EAGAIN);
773                         }
774
775                         /* lock was granted while resource was unlocked. */
776                         if (lock->l_granted_mode == lock->l_req_mode) {
777                                 /* bug 11300: if the lock has been granted,
778                                  * break earlier because otherwise, we will go
779                                  * to restart and ldlm_resource_unlink will be
780                                  * called and it causes the interval node to be
781                                  * freed. Then we will fail at
782                                  * ldlm_extent_add_lock() */
783                                 *flags &= ~LDLM_FL_BLOCKED_MASK;
784                                 GOTO(out, rc = 0);
785                         }
786
787                         GOTO(restart, rc);
788                 }
789
790                 /* this way we force client to wait for the lock
791                  * endlessly once the lock is enqueued -bzzz */
792                 *flags |= LDLM_FL_BLOCK_GRANTED | LDLM_FL_NO_TIMEOUT;
793
794         }
795         RETURN(0);
796 out:
797         if (!cfs_list_empty(&rpc_list)) {
798                 LASSERT(!ldlm_is_ast_discard_data(lock));
799                 discard_bl_list(&rpc_list);
800         }
801         RETURN(rc);
802 }
803 #endif /* HAVE_SERVER_SUPPORT */
804
805 /* When a lock is cancelled by a client, the KMS may undergo change if this
806  * is the "highest lock".  This function returns the new KMS value.
807  * Caller must hold lr_lock already.
808  *
809  * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
810 __u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
811 {
812         struct ldlm_resource *res = lock->l_resource;
813         cfs_list_t *tmp;
814         struct ldlm_lock *lck;
815         __u64 kms = 0;
816         ENTRY;
817
818         /* don't let another thread in ldlm_extent_shift_kms race in
819          * just after we finish and take our lock into account in its
820          * calculation of the kms */
821         ldlm_set_kms_ignore(lock);
822
823         cfs_list_for_each(tmp, &res->lr_granted) {
824                 lck = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);
825
826                 if (ldlm_is_kms_ignore(lck))
827                         continue;
828
829                 if (lck->l_policy_data.l_extent.end >= old_kms)
830                         RETURN(old_kms);
831
832                 /* This extent _has_ to be smaller than old_kms (checked above)
833                  * so kms can only ever be smaller or the same as old_kms. */
834                 if (lck->l_policy_data.l_extent.end + 1 > kms)
835                         kms = lck->l_policy_data.l_extent.end + 1;
836         }
837         LASSERTF(kms <= old_kms, "kms "LPU64" old_kms "LPU64"\n", kms, old_kms);
838
839         RETURN(kms);
840 }
841 EXPORT_SYMBOL(ldlm_extent_shift_kms);
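/*
 * Worked example for ldlm_extent_shift_kms(): if old_kms is 16384 and the
 * remaining granted locks (those not marked kms_ignore) end at offsets 4095
 * and 8191, neither end reaches old_kms, so the new KMS becomes
 * 8191 + 1 = 8192.
 */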
842
843 struct kmem_cache *ldlm_interval_slab;
844 struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
845 {
846         struct ldlm_interval *node;
847         ENTRY;
848
849         LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
850         OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
851         if (node == NULL)
852                 RETURN(NULL);
853
854         CFS_INIT_LIST_HEAD(&node->li_group);
855         ldlm_interval_attach(node, lock);
856         RETURN(node);
857 }
858
859 void ldlm_interval_free(struct ldlm_interval *node)
860 {
861         if (node) {
862                 LASSERT(cfs_list_empty(&node->li_group));
863                 LASSERT(!interval_is_intree(&node->li_node));
864                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
865         }
866 }
867
868 /* interval tree, for LDLM_EXTENT. */
869 void ldlm_interval_attach(struct ldlm_interval *n,
870                           struct ldlm_lock *l)
871 {
872         LASSERT(l->l_tree_node == NULL);
873         LASSERT(l->l_resource->lr_type == LDLM_EXTENT);
874
875         cfs_list_add_tail(&l->l_sl_policy, &n->li_group);
876         l->l_tree_node = n;
877 }
878
879 struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l)
880 {
881         struct ldlm_interval *n = l->l_tree_node;
882
883         if (n == NULL)
884                 return NULL;
885
886         LASSERT(!cfs_list_empty(&n->li_group));
887         l->l_tree_node = NULL;
888         cfs_list_del_init(&l->l_sl_policy);
889
890         return (cfs_list_empty(&n->li_group) ? n : NULL);
891 }
892
893 static inline int lock_mode_to_index(ldlm_mode_t mode)
894 {
895         int index;
896
897         LASSERT(mode != 0);
898         LASSERT(IS_PO2(mode));
899         for (index = -1; mode; index++, mode >>= 1) ;
900         LASSERT(index < LCK_MODE_NUM);
901         return index;
902 }
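/*
 * For example, a mode with only bit 2 set (value 0x4) maps to index 2: the
 * loop shifts the mode right until it reaches zero while counting up from -1,
 * which amounts to taking log2 of the single-bit mode value.
 */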
903
904 /** Add newly granted lock into interval tree for the resource. */
905 void ldlm_extent_add_lock(struct ldlm_resource *res,
906                           struct ldlm_lock *lock)
907 {
908         struct interval_node *found, **root;
909         struct ldlm_interval *node;
910         struct ldlm_extent *extent;
911         int idx;
912
913         LASSERT(lock->l_granted_mode == lock->l_req_mode);
914
915         node = lock->l_tree_node;
916         LASSERT(node != NULL);
917         LASSERT(!interval_is_intree(&node->li_node));
918
919         idx = lock_mode_to_index(lock->l_granted_mode);
920         LASSERT(lock->l_granted_mode == 1 << idx);
921         LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);
922
923         /* initialize the node extent */
924         extent = &lock->l_policy_data.l_extent;
925         interval_set(&node->li_node, extent->start, extent->end);
926
927         root = &res->lr_itree[idx].lit_root;
928         found = interval_insert(&node->li_node, root);
929         if (found) { /* The policy group found. */
930                 struct ldlm_interval *tmp = ldlm_interval_detach(lock);
931                 LASSERT(tmp != NULL);
932                 ldlm_interval_free(tmp);
933                 ldlm_interval_attach(to_ldlm_interval(found), lock);
934         }
935         res->lr_itree[idx].lit_size++;
936
937         /* even though we use the interval tree to manage the extent locks, we
938          * also add the locks to the granted list, for debugging purposes */
939         ldlm_resource_add_lock(res, &res->lr_granted, lock);
940 }
941
942 /** Remove cancelled lock from resource interval tree. */
943 void ldlm_extent_unlink_lock(struct ldlm_lock *lock)
944 {
945         struct ldlm_resource *res = lock->l_resource;
946         struct ldlm_interval *node = lock->l_tree_node;
947         struct ldlm_interval_tree *tree;
948         int idx;
949
950         if (!node || !interval_is_intree(&node->li_node)) /* duplicate unlink */
951                 return;
952
953         idx = lock_mode_to_index(lock->l_granted_mode);
954         LASSERT(lock->l_granted_mode == 1 << idx);
955         tree = &res->lr_itree[idx];
956
957         LASSERT(tree->lit_root != NULL); /* assert the tree is not null */
958
959         tree->lit_size--;
960         node = ldlm_interval_detach(lock);
961         if (node) {
962                 interval_erase(&node->li_node, &tree->lit_root);
963                 ldlm_interval_free(node);
964         }
965 }
966
967 void ldlm_extent_policy_wire_to_local(const ldlm_wire_policy_data_t *wpolicy,
968                                      ldlm_policy_data_t *lpolicy)
969 {
970         memset(lpolicy, 0, sizeof(*lpolicy));
971         lpolicy->l_extent.start = wpolicy->l_extent.start;
972         lpolicy->l_extent.end = wpolicy->l_extent.end;
973         lpolicy->l_extent.gid = wpolicy->l_extent.gid;
974 }
975
976 void ldlm_extent_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
977                                      ldlm_wire_policy_data_t *wpolicy)
978 {
979         memset(wpolicy, 0, sizeof(*wpolicy));
980         wpolicy->l_extent.start = lpolicy->l_extent.start;
981         wpolicy->l_extent.end = lpolicy->l_extent.end;
982         wpolicy->l_extent.gid = lpolicy->l_extent.gid;
983 }
984