Whamcloud - gitweb
LU-3963 ldlm: convert to linux list api
[fs/lustre-release.git] / lustre / ldlm / ldlm_extent.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2010, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ldlm/ldlm_extent.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  */
41
42 /**
43  * This file contains implementation of EXTENT lock type
44  *
45  * EXTENT lock type is for locking a contiguous range of values, represented
46  * by 64-bit starting and ending offsets (inclusive). There are several extent
47  * lock modes, some of which may be mutually incompatible. Extent locks are
48  * considered incompatible if their modes are incompatible and their extents
49  * intersect.  See the lock mode compatibility matrix in lustre_dlm.h.
50  */
51
52 #define DEBUG_SUBSYSTEM S_LDLM
53 #ifndef __KERNEL__
54 # include <liblustre.h>
55 #else
56 # include <libcfs/libcfs.h>
57 #endif
58
59 #include <lustre_dlm.h>
60 #include <obd_support.h>
61 #include <obd.h>
62 #include <obd_class.h>
63 #include <lustre_lib.h>
64
65 #include "ldlm_internal.h"
66
67 #ifdef HAVE_SERVER_SUPPORT
/* Largest distance past the requested start (32M - 1) that the end of a
 * PW/CW extent may be grown to once more than 32 conflicting locks have
 * been counted; see ldlm_extent_internal_policy_fixup(). */
68 # define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)
69
70 /**
71  * Fix up the ldlm_extent after expanding it.
72  *
73  * After expansion has been done, we might still want to do certain adjusting
74  * based on overall contention of the resource and the like to avoid granting
75  * overly wide locks.
76  */
77 static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
78                                               struct ldlm_extent *new_ex,
79                                               int conflicting)
80 {
81         ldlm_mode_t req_mode = req->l_req_mode;
82         __u64 req_start = req->l_req_extent.start;
83         __u64 req_end = req->l_req_extent.end;
84         __u64 req_align, mask;
85
86         if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
87                 if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
88                         new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
89                                           new_ex->end);
90         }
91
92         if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF) {
93                 EXIT;
94                 return;
95         }
96
97         /* we need to ensure that the lock extent is properly aligned to what
98          * the client requested. Also we need to make sure it's also server
99          * page size aligned otherwise a server page can be covered by two
100          * write locks. */
101         mask = PAGE_CACHE_SIZE;
102         req_align = (req_end + 1) | req_start;
103         if (req_align != 0 && (req_align & (mask - 1)) == 0) {
104                 while ((req_align & mask) == 0)
105                         mask <<= 1;
106         }
107         mask -= 1;
108         /* We can only shrink the lock, not grow it.
109          * This should never cause lock to be smaller than requested,
110          * since requested lock was already aligned on these boundaries. */
111         new_ex->start = ((new_ex->start - 1) | mask) + 1;
112         new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
113         LASSERTF(new_ex->start <= req_start,
114                  "mask "LPX64" grant start "LPU64" req start "LPU64"\n",
115                  mask, new_ex->start, req_start);
116         LASSERTF(new_ex->end >= req_end,
117                  "mask "LPX64" grant end "LPU64" req end "LPU64"\n",
118                  mask, new_ex->end, req_end);
119 }
120
121 /**
122  * Return the maximum extent that:
123  * - contains the requested extent
124  * - does not overlap existing conflicting extents outside the requested one
125  *
126  * This allows clients to request a small required extent range, but if there
127  * is no contention on the lock the full lock can be granted to the client.
128  * This avoids the need for many smaller lock requests to be granted in the
129  * common (uncontended) case.
130  *
131  * Use interval tree to expand the lock extent for granted lock.
132  */
133 static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req,
134                                                 struct ldlm_extent *new_ex)
135 {
136         struct ldlm_resource *res = req->l_resource;
137         ldlm_mode_t req_mode = req->l_req_mode;
138         __u64 req_start = req->l_req_extent.start;
139         __u64 req_end = req->l_req_extent.end;
140         struct ldlm_interval_tree *tree;
141         struct interval_node_extent limiter = { new_ex->start, new_ex->end };
142         int conflicting = 0;
143         int idx;
144         ENTRY;
145
146         lockmode_verify(req_mode);
147
148         /* Using interval tree to handle the LDLM extent granted locks. */
149         for (idx = 0; idx < LCK_MODE_NUM; idx++) {
150                 struct interval_node_extent ext = { req_start, req_end };
151
152                 tree = &res->lr_itree[idx];
153                 if (lockmode_compat(tree->lit_mode, req_mode))
154                         continue;
155
156                 conflicting += tree->lit_size;
157                 if (conflicting > 4)
158                         limiter.start = req_start;
159
160                 if (interval_is_overlapped(tree->lit_root, &ext))
161                         CDEBUG(D_INFO, 
162                                "req_mode = %d, tree->lit_mode = %d, "
163                                "tree->lit_size = %d\n",
164                                req_mode, tree->lit_mode, tree->lit_size);
165                 interval_expand(tree->lit_root, &ext, &limiter);
166                 limiter.start = max(limiter.start, ext.start);
167                 limiter.end = min(limiter.end, ext.end);
168                 if (limiter.start == req_start && limiter.end == req_end)
169                         break;
170         }
171
172         new_ex->start = limiter.start;
173         new_ex->end = limiter.end;
174         LASSERT(new_ex->start <= req_start);
175         LASSERT(new_ex->end >= req_end);
176
177         ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
178         EXIT;
179 }
180
181 /* The purpose of this function is to return:
182  * - the maximum extent
183  * - containing the requested extent
184  * - and not overlapping existing conflicting extents outside the requested one
185  */
186 static void
187 ldlm_extent_internal_policy_waiting(struct ldlm_lock *req,
188                                     struct ldlm_extent *new_ex)
189 {
190         struct ldlm_resource *res = req->l_resource;
191         ldlm_mode_t req_mode = req->l_req_mode;
192         __u64 req_start = req->l_req_extent.start;
193         __u64 req_end = req->l_req_extent.end;
194         struct ldlm_lock *lock;
195         int conflicting = 0;
196         ENTRY;
197
198         lockmode_verify(req_mode);
199
200         /* for waiting locks */
201         list_for_each_entry(lock, &res->lr_waiting, l_res_link) {
202                 struct ldlm_extent *l_extent = &lock->l_policy_data.l_extent;
203
204                 /* We already hit the minimum requested size, search no more */
205                 if (new_ex->start == req_start && new_ex->end == req_end) {
206                         EXIT;
207                         return;
208                 }
209
210                 /* Don't conflict with ourselves */
211                 if (req == lock)
212                         continue;
213
214                 /* Locks are compatible, overlap doesn't matter */
215                 /* Until bug 20 is fixed, try to avoid granting overlapping
216                  * locks on one client (they take a long time to cancel) */
217                 if (lockmode_compat(lock->l_req_mode, req_mode) &&
218                     lock->l_export != req->l_export)
219                         continue;
220
221                 /* If this is a high-traffic lock, don't grow downwards at all
222                  * or grow upwards too much */
223                 ++conflicting;
224                 if (conflicting > 4)
225                         new_ex->start = req_start;
226
227                 /* If lock doesn't overlap new_ex, skip it. */
228                 if (!ldlm_extent_overlap(l_extent, new_ex))
229                         continue;
230
231                 /* Locks conflicting in requested extents and we can't satisfy
232                  * both locks, so ignore it.  Either we will ping-pong this
233                  * extent (we would regardless of what extent we granted) or
234                  * lock is unused and it shouldn't limit our extent growth. */
235                 if (ldlm_extent_overlap(&lock->l_req_extent,&req->l_req_extent))
236                         continue;
237
238                 /* We grow extents downwards only as far as they don't overlap
239                  * with already-granted locks, on the assumption that clients
240                  * will be writing beyond the initial requested end and would
241                  * then need to enqueue a new lock beyond previous request.
242                  * l_req_extent->end strictly < req_start, checked above. */
243                 if (l_extent->start < req_start && new_ex->start != req_start) {
244                         if (l_extent->end >= req_start)
245                                 new_ex->start = req_start;
246                         else
247                                 new_ex->start = min(l_extent->end+1, req_start);
248                 }
249
250                 /* If we need to cancel this lock anyways because our request
251                  * overlaps the granted lock, we grow up to its requested
252                  * extent start instead of limiting this extent, assuming that
253                  * clients are writing forwards and the lock had over grown
254                  * its extent downwards before we enqueued our request. */
255                 if (l_extent->end > req_end) {
256                         if (l_extent->start <= req_end)
257                                 new_ex->end = max(lock->l_req_extent.start - 1,
258                                                   req_end);
259                         else
260                                 new_ex->end = max(l_extent->start - 1, req_end);
261                 }
262         }
263
264         ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
265         EXIT;
266 }
267
268
269 /* In order to determine the largest possible extent we can grant, we need
270  * to scan all of the queues. */
271 static void ldlm_extent_policy(struct ldlm_resource *res,
272                                struct ldlm_lock *lock, __u64 *flags)
273 {
274         struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };
275
276         if (lock->l_export == NULL)
277                 /*
278                  * this is local lock taken by server (e.g., as a part of
279                  * OST-side locking, or unlink handling). Expansion doesn't
280                  * make a lot of sense for local locks, because they are
281                  * dropped immediately on operation completion and would only
282                  * conflict with other threads.
283                  */
284                 return;
285
286         if (lock->l_policy_data.l_extent.start == 0 &&
287             lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
288                 /* fast-path whole file locks */
289                 return;
290
291         ldlm_extent_internal_policy_granted(lock, &new_ex);
292         ldlm_extent_internal_policy_waiting(lock, &new_ex);
293
294         if (new_ex.start != lock->l_policy_data.l_extent.start ||
295             new_ex.end != lock->l_policy_data.l_extent.end) {
296                 *flags |= LDLM_FL_LOCK_CHANGED;
297                 lock->l_policy_data.l_extent.start = new_ex.start;
298                 lock->l_policy_data.l_extent.end = new_ex.end;
299         }
300 }
301
302 static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
303 {
304         struct ldlm_resource *res = lock->l_resource;
305         cfs_time_t now = cfs_time_current();
306
307         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_SET_CONTENTION))
308                 return 1;
309
310         CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
311         if (contended_locks > ldlm_res_to_ns(res)->ns_contended_locks)
312                 res->lr_contention_time = now;
313         return cfs_time_before(now, cfs_time_add(res->lr_contention_time,
314                 cfs_time_seconds(ldlm_res_to_ns(res)->ns_contention_time)));
315 }
316
/* Arguments handed to ldlm_extent_compat_cb() through its opaque data
 * pointer. */
317 struct ldlm_extent_compat_args {
        /* list passed to ldlm_add_ast_work_item() for every visited lock
         * that has a blocking AST */
318         struct list_head *work_list;
        /* lock being enqueued; passed along with each work item */
319         struct ldlm_lock *lock;
        /* mode that every lock of a visited node is asserted to be granted
         * in; filled in from the tree's lit_mode by the caller */
320         ldlm_mode_t mode;
        /* running count of conflicting locks; dereferenced by the callback
         * without a NULL check */
321         int *locks;
        /* when non-NULL, set to 0 by every callback invocation */
322         int *compat;
323 };
324
325 static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
326                                                 void *data)
327 {
328         struct ldlm_extent_compat_args *priv = data;
329         struct ldlm_interval *node = to_ldlm_interval(n);
330         struct ldlm_extent *extent;
331         struct list_head *work_list = priv->work_list;
332         struct ldlm_lock *lock, *enq = priv->lock;
333         ldlm_mode_t mode = priv->mode;
334         int count = 0;
335         ENTRY;
336
337         LASSERT(!list_empty(&node->li_group));
338
339         list_for_each_entry(lock, &node->li_group, l_sl_policy) {
340                 /* interval tree is for granted lock */
341                 LASSERTF(mode == lock->l_granted_mode,
342                          "mode = %s, lock->l_granted_mode = %s\n",
343                          ldlm_lockname[mode],
344                          ldlm_lockname[lock->l_granted_mode]);
345                 count++;
346                 if (lock->l_blocking_ast)
347                         ldlm_add_ast_work_item(lock, enq, work_list);
348         }
349
350         /* don't count conflicting glimpse locks */
351         extent = ldlm_interval_extent(node);
352         if (!(mode == LCK_PR &&
353             extent->start == 0 && extent->end == OBD_OBJECT_EOF))
354                 *priv->locks += count;
355
356         if (priv->compat)
357                 *priv->compat = 0;
358
359         RETURN(INTERVAL_ITER_CONT);
360 }
361
362 /**
363  * Determine if the lock is compatible with all locks on the queue.
364  *
 * If \a queue is the granted list of \a req's resource, the per-mode
 * interval trees of the resource are searched instead of the list itself.
 * Any other \a queue is walked through l_res_link and is treated as the
 * waiting queue; a GROUP request may be linked into that queue by this
 * function.
 *
365  * If \a work_list is provided, conflicting locks are linked there.
366  * If \a work_list is not provided, we exit this function on first conflict.
367  *
 * \a contended_locks is incremented for every conflicting lock that counts
 * as real contention (whole-file PR locks and locks whose requested extents
 * do not overlap the request are not counted).
 *
 * \a flags is read for LDLM_FL_BLOCK_NOWAIT and LDLM_FL_DENY_ON_CONTENTION
 * and may get LDLM_FL_NO_TIMEOUT set.
 *
368  * \retval 0 if the lock is not compatible
369  * \retval 1 if the lock is compatible
370  * \retval 2 if \a req is a group lock and it is compatible and requires
371  *           no further checking
372  * \retval negative error, such as EWOULDBLOCK for group locks
 *         or -EUSERS when the request is denied because of contention.
 *         In that case \a req has been unlinked from its resource list and
 *         destroyed with ldlm_lock_destroy_nolock(), and the same value is
 *         stored in \a err.
373  */
374 static int
375 ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
376                          __u64 *flags, ldlm_error_t *err,
377                          struct list_head *work_list, int *contended_locks)
378 {
379         struct ldlm_resource *res = req->l_resource;
380         ldlm_mode_t req_mode = req->l_req_mode;
381         __u64 req_start = req->l_req_extent.start;
382         __u64 req_end = req->l_req_extent.end;
383         struct ldlm_lock *lock;
384         int check_contention;
385         int compat = 1;
        /* set once a GROUP request has met a blocked, conflicting lock on
         * the waiting queue; from then on the walk only looks for the
         * position at which to link the request */
386         int scan = 0;
387         ENTRY;
388
389         lockmode_verify(req_mode);
390
391         /* Using interval tree for granted lock */
392         if (queue == &res->lr_granted) {
393                 struct ldlm_interval_tree *tree;
394                 struct ldlm_extent_compat_args data = {.work_list = work_list,
395                                                .lock = req,
396                                                .locks = contended_locks,
397                                                .compat = &compat };
398                 struct interval_node_extent ex = { .start = req_start,
399                                                    .end = req_end };
400                 int idx, rc;
401
402                 for (idx = 0; idx < LCK_MODE_NUM; idx++) {
403                         tree = &res->lr_itree[idx];
404                         if (tree->lit_root == NULL) /* empty tree, skipped */
405                                 continue;
406
407                         data.mode = tree->lit_mode;
408                         if (lockmode_compat(req_mode, tree->lit_mode)) {
409                                 struct ldlm_interval *node;
410                                 struct ldlm_extent *extent;
411
412                                 if (req_mode != LCK_GROUP)
413                                         continue;
414
415                                 /* group lock, grant it immediately if
416                                  * compatible */
417                                 node = to_ldlm_interval(tree->lit_root);
418                                 extent = ldlm_interval_extent(node);
419                                 if (req->l_policy_data.l_extent.gid ==
420                                     extent->gid)
421                                         RETURN(2);
                                /* gid differs: fall through and handle the
                                 * tree as a conflicting one */
422                         }
423
424                         if (tree->lit_mode == LCK_GROUP) {
425                                 if (*flags & LDLM_FL_BLOCK_NOWAIT) {
426                                         compat = -EWOULDBLOCK;
427                                         goto destroylock;
428                                 }
429
430                                 *flags |= LDLM_FL_NO_TIMEOUT;
431                                 if (!work_list)
432                                         RETURN(0);
433
434                                 /* if work list is not NULL, add all
435                                    locks in the tree to work list */
436                                 compat = 0;
437                                 interval_iterate(tree->lit_root,
438                                                  ldlm_extent_compat_cb, &data);
439                                 continue;
440                         }
441
442                         if (!work_list) {
443                                 rc = interval_is_overlapped(tree->lit_root,&ex);
444                                 if (rc)
445                                         RETURN(0);
446                         } else {
447                                 interval_search(tree->lit_root, &ex,
448                                                 ldlm_extent_compat_cb, &data);
                                /* a non-empty work list marks the request
                                 * as incompatible */
449                                 if (!list_empty(work_list) && compat)
450                                         compat = 0;
451                         }
452                 }
453         } else { /* for waiting queue */
454                 list_for_each_entry(lock, queue, l_res_link) {
455                         check_contention = 1;
456
457                         /* We stop walking the queue if we hit ourselves so
458                          * we don't take conflicting locks enqueued after us
459                          * into account, or we'd wait forever. */
460                         if (req == lock)
461                                 break;
462
463                         if (unlikely(scan)) {
464                                 /* We only get here if we are queuing GROUP lock
465                                    and met some incompatible one. The main idea of this
466                                    code is to insert GROUP lock past compatible GROUP
467                                    lock in the waiting queue or if there is not any,
468                                    then in front of first non-GROUP lock */
469                                 if (lock->l_req_mode != LCK_GROUP) {
470                                         /* Ok, we hit non-GROUP lock, there should
471                                          * be no more GROUP locks later on, queue in
472                                          * front of first non-GROUP lock */
473
474                                         ldlm_resource_insert_lock_after(lock, req);
475                                         list_del_init(&lock->l_res_link);
476                                         ldlm_resource_insert_lock_after(req, lock);
477                                         compat = 0;
478                                         break;
479                                 }
480                                 if (req->l_policy_data.l_extent.gid ==
481                                     lock->l_policy_data.l_extent.gid) {
482                                         /* found it */
483                                         ldlm_resource_insert_lock_after(lock, req);
484                                         compat = 0;
485                                         break;
486                                 }
487                                 continue;
488                         }
489
490                         /* locks are compatible, overlap doesn't matter */
491                         if (lockmode_compat(lock->l_req_mode, req_mode)) {
492                                 if (req_mode == LCK_PR &&
493                                     ((lock->l_policy_data.l_extent.start <=
494                                       req->l_policy_data.l_extent.start) &&
495                                      (lock->l_policy_data.l_extent.end >=
496                                       req->l_policy_data.l_extent.end))) {
497                                         /* If we met a PR lock just like us or
498                                            wider, and nobody down the list
499                                            conflicted with it, that means we
500                                            can skip processing of the rest of
501                                            the list and safely place ourselves
502                                            at the end of the list, or grant
503                                            (dependent if we met any conflicting
504                                            locks before in the list).  In case
505                                            of 1st enqueue only we continue
506                                            traversing if there is something
507                                            conflicting down the list because
508                                            we need to make sure that something
509                                            is marked as AST_SENT as well, in
510                                            case of empty worklist we would exit
511                                            on first conflict met. */
512                                         /* There IS a case where such flag is
513                                            not set for a lock, yet it blocks
514                                            something. Luckily for us this is
515                                            only during destroy, so lock is
516                                            exclusive. So here we are safe */
517                                         if (!ldlm_is_ast_sent(lock))
518                                                 RETURN(compat);
519                                 }
520
521                                 /* non-group locks are compatible, overlap doesn't
522                                    matter */
523                                 if (likely(req_mode != LCK_GROUP))
524                                         continue;
525
526                                 /* If we are trying to get a GROUP lock and there is
527                                    another one of this kind, we need to compare gid */
528                                 if (req->l_policy_data.l_extent.gid ==
529                                     lock->l_policy_data.l_extent.gid) {
530                                         /* If existing lock with matched gid is granted,
531                                            we grant new one too. */
532                                         if (lock->l_req_mode == lock->l_granted_mode)
533                                                 RETURN(2);
534
535                                         /* Otherwise we are scanning queue of waiting
536                                          * locks and it means current request would
537                                          * block along with existing lock (that is
538                                          * already blocked).
539                                          * If we are in nonblocking mode - return
540                                          * immediately */
541                                         if (*flags & LDLM_FL_BLOCK_NOWAIT) {
542                                                 compat = -EWOULDBLOCK;
543                                                 goto destroylock;
544                                         }
545                                         /* If this group lock is compatible with another
546                                          * group lock on the waiting list, they must be
547                                          * together in the list, so they can be granted
548                                          * at the same time.  Otherwise the later lock
549                                          * can get stuck behind another, incompatible,
550                                          * lock. */
551                                         ldlm_resource_insert_lock_after(lock, req);
552                                         /* Because 'lock' is not granted, we can stop
553                                          * processing this queue and return immediately.
554                                          * There is no need to check the rest of the
555                                          * list. */
556                                         RETURN(0);
557                                 }
558                         }
559
                        /* A GROUP request that meets a lock which is not
                         * granted yet: mark the request incompatible and
                         * switch to looking for a queue position. */
560                         if (unlikely(req_mode == LCK_GROUP &&
561                                      (lock->l_req_mode != lock->l_granted_mode))) {
562                                 scan = 1;
563                                 compat = 0;
564                                 if (lock->l_req_mode != LCK_GROUP) {
565                                         /* Ok, we hit non-GROUP lock, there should be no
566                                            more GROUP locks later on, queue in front of
567                                            first non-GROUP lock */
568
569                                         ldlm_resource_insert_lock_after(lock, req);
570                                         list_del_init(&lock->l_res_link);
571                                         ldlm_resource_insert_lock_after(req, lock);
572                                         break;
573                                 }
574                                 if (req->l_policy_data.l_extent.gid ==
575                                     lock->l_policy_data.l_extent.gid) {
576                                         /* found it */
577                                         ldlm_resource_insert_lock_after(lock, req);
578                                         break;
579                                 }
580                                 continue;
581                         }
582
583                         if (unlikely(lock->l_req_mode == LCK_GROUP)) {
584                                 /* If compared lock is GROUP, then requested is PR/PW/
585                                  * so this is not compatible; extent range does not
586                                  * matter */
587                                 if (*flags & LDLM_FL_BLOCK_NOWAIT) {
588                                         compat = -EWOULDBLOCK;
589                                         goto destroylock;
590                                 } else {
591                                         *flags |= LDLM_FL_NO_TIMEOUT;
592                                 }
593                         } else if (lock->l_policy_data.l_extent.end < req_start ||
594                                    lock->l_policy_data.l_extent.start > req_end) {
595                                 /* if a non group lock doesn't overlap skip it */
596                                 continue;
597                         } else if (lock->l_req_extent.end < req_start ||
598                                    lock->l_req_extent.start > req_end) {
599                                 /* false contention, the requests don't really overlap */
600                                 check_contention = 0;
601                         }
602
                        /* from here on 'lock' is a real conflict */
603                         if (!work_list)
604                                 RETURN(0);
605
606                         /* don't count conflicting glimpse locks */
607                         if (lock->l_req_mode == LCK_PR &&
608                             lock->l_policy_data.l_extent.start == 0 &&
609                             lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
610                                 check_contention = 0;
611
612                         *contended_locks += check_contention;
613
614                         compat = 0;
615                         if (lock->l_blocking_ast)
616                                 ldlm_add_ast_work_item(lock, req, work_list);
617                 }
618         }
619
        /* Deny the request with -EUSERS instead of leaving it blocked when
         * the resource is contended, a conflict was found, the caller asked
         * for LDLM_FL_DENY_ON_CONTENTION, the request is not a GROUP lock
         * and its extent is no longer than ns_max_nolock_size.
         * ldlm_check_contention() is evaluated first, so it runs (and may
         * refresh the contention timestamp) whatever the other terms are. */
620         if (ldlm_check_contention(req, *contended_locks) &&
621             compat == 0 &&
622             (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
623             req->l_req_mode != LCK_GROUP &&
624             req_end - req_start <=
625             ldlm_res_to_ns(req->l_resource)->ns_max_nolock_size)
626                 GOTO(destroylock, compat = -EUSERS);
627
628         RETURN(compat);
        /* error exit: 'compat' holds the negative error code; unlink the
         * request from its resource list, destroy it and report the error
         * through both \a err and the return value */
629 destroylock:
630         list_del_init(&req->l_res_link);
631         ldlm_lock_destroy_nolock(req);
632         *err = compat;
633         RETURN(compat);
634 }
635
636 /**
637  * Discard all AST work items from list.
638  *
639  * If for whatever reason we do not want to send ASTs to conflicting locks
640  * anymore, disassemble the list with this function.
641  */
642 static void discard_bl_list(struct list_head *bl_list)
643 {
644         struct list_head *tmp, *pos;
645         ENTRY;
646
647         list_for_each_safe(pos, tmp, bl_list) {
648                 struct ldlm_lock *lock =
649                         list_entry(pos, struct ldlm_lock, l_bl_ast);
650
651                 list_del_init(&lock->l_bl_ast);
652                 LASSERT(ldlm_is_ast_sent(lock));
653                 ldlm_clear_ast_sent(lock);
654                 LASSERT(lock->l_bl_ast_run == 0);
655                 LASSERT(lock->l_blocking_lock);
656                 LDLM_LOCK_RELEASE(lock->l_blocking_lock);
657                 lock->l_blocking_lock = NULL;
658                 LDLM_LOCK_RELEASE(lock);
659         }
660         EXIT;
661 }
662
663 /**
664  * Process a granting attempt for extent lock.
665  * Must be called with ns lock held.
666  *
667  * This function looks for any conflicts for \a lock in the granted or
668  * waiting queues. The lock is granted if no conflicts are found in
669  * either queue.
670  *
671  * If \a first_enq is 0 (ie, called from ldlm_reprocess_queue):
672  *   - blocking ASTs have already been sent
673  *
674  * If \a first_enq is 1 (ie, called from ldlm_lock_enqueue):
675  *   - blocking ASTs have not been sent yet, so list of conflicting locks
676  *     would be collected and ASTs sent.
677  */
678 int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
679                              int first_enq, ldlm_error_t *err,
680                              struct list_head *work_list)
681 {
682         struct ldlm_resource *res = lock->l_resource;
683         struct list_head rpc_list;
684         int rc, rc2;
685         int contended_locks = 0;
686         ENTRY;
687
688         LASSERT(lock->l_granted_mode != lock->l_req_mode);
689         LASSERT(list_empty(&res->lr_converting));
690         LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
691                 !ldlm_is_ast_discard_data(lock));
692         INIT_LIST_HEAD(&rpc_list);
693         check_res_locked(res);
694         *err = ELDLM_OK;
695
696         if (!first_enq) {
697                 /* Careful observers will note that we don't handle -EWOULDBLOCK
698                  * here, but it's ok for a non-obvious reason -- compat_queue
699                  * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
700                  * flags should always be zero here, and if that ever stops
701                  * being true, we want to find out. */
702                 LASSERT(*flags == 0);
703                 rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
704                                               err, NULL, &contended_locks);
705                 if (rc == 1) {
706                         rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
707                                                       flags, err, NULL,
708                                                       &contended_locks);
709                 }
710                 if (rc == 0)
711                         RETURN(LDLM_ITER_STOP);
712
713                 ldlm_resource_unlink_lock(lock);
714
715                 if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
716                         ldlm_extent_policy(res, lock, flags);
717                 ldlm_grant_lock(lock, work_list);
718                 RETURN(LDLM_ITER_CONTINUE);
719         }
720
721  restart:
722         contended_locks = 0;
723         rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
724                                       &rpc_list, &contended_locks);
725         if (rc < 0)
726                 GOTO(out, rc); /* lock was destroyed */
727         if (rc == 2)
728                 goto grant;
729
730         rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
731                                        &rpc_list, &contended_locks);
732         if (rc2 < 0)
733                 GOTO(out, rc = rc2); /* lock was destroyed */
734
735         if (rc + rc2 == 2) {
736         grant:
737                 ldlm_extent_policy(res, lock, flags);
738                 ldlm_resource_unlink_lock(lock);
739                 ldlm_grant_lock(lock, NULL);
740         } else {
741                 /* If either of the compat_queue()s returned failure, then we
742                  * have ASTs to send and must go onto the waiting list.
743                  *
744                  * bug 2322: we used to unlink and re-add here, which was a
745                  * terrible folly -- if we goto restart, we could get
746                  * re-ordered!  Causes deadlock, because ASTs aren't sent! */
747                 if (list_empty(&lock->l_res_link))
748                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
749                 unlock_res(res);
750                 rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
751                                        LDLM_WORK_BL_AST);
752
753                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) &&
754                     !ns_is_client(ldlm_res_to_ns(res)))
755                         class_fail_export(lock->l_export);
756
757                 lock_res(res);
758                 if (rc == -ERESTART) {
759                         /* 15715: The lock was granted and destroyed after
760                          * resource lock was dropped. Interval node was freed
761                          * in ldlm_lock_destroy. Anyway, this always happens
762                          * when a client is being evicted. So it would be
763                          * ok to return an error. -jay */
764                         if (ldlm_is_destroyed(lock)) {
765                                 *err = -EAGAIN;
766                                 GOTO(out, rc = -EAGAIN);
767                         }
768
769                         /* lock was granted while resource was unlocked. */
770                         if (lock->l_granted_mode == lock->l_req_mode) {
771                                 /* bug 11300: if the lock has been granted,
772                                  * break earlier because otherwise, we will go
773                                  * to restart and ldlm_resource_unlink will be
774                                  * called and it causes the interval node to be
775                                  * freed. Then we will fail at
776                                  * ldlm_extent_add_lock() */
777                                 *flags &= ~LDLM_FL_BLOCKED_MASK;
778                                 GOTO(out, rc = 0);
779                         }
780
781                         GOTO(restart, rc);
782                 }
783
784                 /* this way we force client to wait for the lock
785                  * endlessly once the lock is enqueued -bzzz */
786                 *flags |= LDLM_FL_BLOCK_GRANTED | LDLM_FL_NO_TIMEOUT;
787
788         }
789         RETURN(0);
790 out:
791         if (!list_empty(&rpc_list)) {
792                 LASSERT(!ldlm_is_ast_discard_data(lock));
793                 discard_bl_list(&rpc_list);
794         }
795         RETURN(rc);
796 }
797 #endif /* HAVE_SERVER_SUPPORT */
798
799 /* When a lock is cancelled by a client, the KMS may undergo change if this
800  * is the "highest lock".  This function returns the new KMS value.
801  * Caller must hold lr_lock already.
802  *
803  * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
804 __u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
805 {
806         struct ldlm_resource *res = lock->l_resource;
807         struct list_head *tmp;
808         struct ldlm_lock *lck;
809         __u64 kms = 0;
810         ENTRY;
811
812         /* don't let another thread in ldlm_extent_shift_kms race in
813          * just after we finish and take our lock into account in its
814          * calculation of the kms */
815         ldlm_set_kms_ignore(lock);
816
817         list_for_each(tmp, &res->lr_granted) {
818                 lck = list_entry(tmp, struct ldlm_lock, l_res_link);
819
820                 if (ldlm_is_kms_ignore(lck))
821                         continue;
822
823                 if (lck->l_policy_data.l_extent.end >= old_kms)
824                         RETURN(old_kms);
825
826                 /* This extent _has_ to be smaller than old_kms (checked above)
827                  * so kms can only ever be smaller or the same as old_kms. */
828                 if (lck->l_policy_data.l_extent.end + 1 > kms)
829                         kms = lck->l_policy_data.l_extent.end + 1;
830         }
831         LASSERTF(kms <= old_kms, "kms "LPU64" old_kms "LPU64"\n", kms, old_kms);
832
833         RETURN(kms);
834 }
835 EXPORT_SYMBOL(ldlm_extent_shift_kms);
836
837 struct kmem_cache *ldlm_interval_slab;
838 struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
839 {
840         struct ldlm_interval *node;
841         ENTRY;
842
843         LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
844         OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
845         if (node == NULL)
846                 RETURN(NULL);
847
848         INIT_LIST_HEAD(&node->li_group);
849         ldlm_interval_attach(node, lock);
850         RETURN(node);
851 }
852
853 void ldlm_interval_free(struct ldlm_interval *node)
854 {
855         if (node) {
856                 LASSERT(list_empty(&node->li_group));
857                 LASSERT(!interval_is_intree(&node->li_node));
858                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
859         }
860 }
861
862 /* interval tree, for LDLM_EXTENT. */
863 void ldlm_interval_attach(struct ldlm_interval *n,
864                           struct ldlm_lock *l)
865 {
866         LASSERT(l->l_tree_node == NULL);
867         LASSERT(l->l_resource->lr_type == LDLM_EXTENT);
868
869         list_add_tail(&l->l_sl_policy, &n->li_group);
870         l->l_tree_node = n;
871 }
872
873 struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l)
874 {
875         struct ldlm_interval *n = l->l_tree_node;
876
877         if (n == NULL)
878                 return NULL;
879
880         LASSERT(!list_empty(&n->li_group));
881         l->l_tree_node = NULL;
882         list_del_init(&l->l_sl_policy);
883
884         return list_empty(&n->li_group) ? n : NULL;
885 }
886
887 static inline int lock_mode_to_index(ldlm_mode_t mode)
888 {
889         int index;
890
891         LASSERT(mode != 0);
892         LASSERT(IS_PO2(mode));
893         for (index = -1; mode; index++, mode >>= 1) ;
894         LASSERT(index < LCK_MODE_NUM);
895         return index;
896 }
897
898 /** Add newly granted lock into interval tree for the resource. */
899 void ldlm_extent_add_lock(struct ldlm_resource *res,
900                           struct ldlm_lock *lock)
901 {
902         struct interval_node *found, **root;
903         struct ldlm_interval *node;
904         struct ldlm_extent *extent;
905         int idx;
906
907         LASSERT(lock->l_granted_mode == lock->l_req_mode);
908
909         node = lock->l_tree_node;
910         LASSERT(node != NULL);
911         LASSERT(!interval_is_intree(&node->li_node));
912
913         idx = lock_mode_to_index(lock->l_granted_mode);
914         LASSERT(lock->l_granted_mode == 1 << idx);
915         LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);
916
917         /* node extent initialize */
918         extent = &lock->l_policy_data.l_extent;
919         interval_set(&node->li_node, extent->start, extent->end);
920
921         root = &res->lr_itree[idx].lit_root;
922         found = interval_insert(&node->li_node, root);
923         if (found) { /* The policy group found. */
924                 struct ldlm_interval *tmp = ldlm_interval_detach(lock);
925                 LASSERT(tmp != NULL);
926                 ldlm_interval_free(tmp);
927                 ldlm_interval_attach(to_ldlm_interval(found), lock);
928         }
929         res->lr_itree[idx].lit_size++;
930
931         /* even though we use interval tree to manage the extent lock, we also
932          * add the locks into grant list, for debug purpose, .. */
933         ldlm_resource_add_lock(res, &res->lr_granted, lock);
934 }
935
936 /** Remove cancelled lock from resource interval tree. */
937 void ldlm_extent_unlink_lock(struct ldlm_lock *lock)
938 {
939         struct ldlm_resource *res = lock->l_resource;
940         struct ldlm_interval *node = lock->l_tree_node;
941         struct ldlm_interval_tree *tree;
942         int idx;
943
944         if (!node || !interval_is_intree(&node->li_node)) /* duplicate unlink */
945                 return;
946
947         idx = lock_mode_to_index(lock->l_granted_mode);
948         LASSERT(lock->l_granted_mode == 1 << idx);
949         tree = &res->lr_itree[idx];
950
951         LASSERT(tree->lit_root != NULL); /* assure the tree is not null */
952
953         tree->lit_size--;
954         node = ldlm_interval_detach(lock);
955         if (node) {
956                 interval_erase(&node->li_node, &tree->lit_root);
957                 ldlm_interval_free(node);
958         }
959 }
960
961 void ldlm_extent_policy_wire_to_local(const ldlm_wire_policy_data_t *wpolicy,
962                                      ldlm_policy_data_t *lpolicy)
963 {
964         memset(lpolicy, 0, sizeof(*lpolicy));
965         lpolicy->l_extent.start = wpolicy->l_extent.start;
966         lpolicy->l_extent.end = wpolicy->l_extent.end;
967         lpolicy->l_extent.gid = wpolicy->l_extent.gid;
968 }
969
970 void ldlm_extent_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
971                                      ldlm_wire_policy_data_t *wpolicy)
972 {
973         memset(wpolicy, 0, sizeof(*wpolicy));
974         wpolicy->l_extent.start = lpolicy->l_extent.start;
975         wpolicy->l_extent.end = lpolicy->l_extent.end;
976         wpolicy->l_extent.gid = lpolicy->l_extent.gid;
977 }
978