/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_extent.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_LDLM
#ifndef __KERNEL__
# include <liblustre.h>
#else
# include <libcfs/libcfs.h>
#endif

#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd.h>
#include <obd_class.h>
#include <lustre_lib.h>

#include "ldlm_internal.h"

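/* Cap (32MB - 1) on how far ldlm_extent_internal_policy_fixup() will let a
 * contended PW/CW grant grow past the requested start; see the check on
 * 'conflicting' below. */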
#define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)

/* Fix up the ldlm_extent after expanding it. */
static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
                                              struct ldlm_extent *new_ex,
                                              int conflicting)
{
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        __u64 req_align, mask;

        if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
                if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
                        new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
                                          new_ex->end);
        }

        if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF) {
                EXIT;
                return;
        }

        /* We need to ensure that the lock extent is properly aligned to what
         * the client requested.  Align it to the lowest common denominator
         * of the client's requested lock start and end alignment. */
        mask = 0x1000ULL;
        req_align = (req_end + 1) | req_start;
        if (req_align != 0) {
                while ((req_align & mask) == 0)
                        mask <<= 1;
        }
        mask -= 1;
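        /* For example (hypothetical numbers, assuming the usual 4K-aligned
         * client I/O): req_start = 0x4000 and req_end = 0x7fff give
         * req_align = 0x8000 | 0x4000 = 0xc000, whose lowest set bit is
         * 0x4000, so mask = 0x3fff and the grant below is trimmed to 16K
         * boundaries, which still cover [req_start, req_end]. */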
        /* We can only shrink the lock, not grow it.
         * This should never cause the lock to be smaller than requested,
         * since the requested lock was already aligned on these boundaries. */
        new_ex->start = ((new_ex->start - 1) | mask) + 1;
        new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
        LASSERTF(new_ex->start <= req_start,
                 "mask "LPX64" grant start "LPU64" req start "LPU64"\n",
                 mask, new_ex->start, req_start);
        LASSERTF(new_ex->end >= req_end,
                 "mask "LPX64" grant end "LPU64" req end "LPU64"\n",
                 mask, new_ex->end, req_end);
}

/* The purpose of this function is to return:
 * - the maximum extent
 * - containing the requested extent
 * - and not overlapping existing conflicting extents outside the requested one
 *
 * Use the interval trees to expand the extent of a granted lock.
 */
static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req,
                                                struct ldlm_extent *new_ex)
{
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        struct ldlm_interval_tree *tree;
        struct interval_node_extent limiter = { new_ex->start, new_ex->end };
        int conflicting = 0;
        int idx;
        ENTRY;

        lockmode_verify(req_mode);

        /* use the interval trees to handle the granted extent locks */
        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                struct interval_node_extent ext = { req_start, req_end };

                tree = &res->lr_itree[idx];
                if (lockmode_compat(tree->lit_mode, req_mode))
                        continue;

                conflicting += tree->lit_size;
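                /* As with the waiting-queue policy below: if the extent is
                 * contended, don't grow the grant downwards at all. */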
                if (conflicting > 4)
                        limiter.start = req_start;

                if (interval_is_overlapped(tree->lit_root, &ext))
                        CDEBUG(D_INFO,
                               "req_mode = %d, tree->lit_mode = %d, "
                               "tree->lit_size = %d\n",
                               req_mode, tree->lit_mode, tree->lit_size);
                interval_expand(tree->lit_root, &ext, &limiter);
                limiter.start = max(limiter.start, ext.start);
                limiter.end = min(limiter.end, ext.end);
                if (limiter.start == req_start && limiter.end == req_end)
                        break;
        }

        new_ex->start = limiter.start;
        new_ex->end = limiter.end;
        LASSERT(new_ex->start <= req_start);
        LASSERT(new_ex->end >= req_end);

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
        EXIT;
}

/* The purpose of this function is to return:
 * - the maximum extent
 * - containing the requested extent
 * - and not overlapping existing conflicting extents outside the requested one
 */
static void
ldlm_extent_internal_policy_waiting(struct ldlm_lock *req,
                                    struct ldlm_extent *new_ex)
{
        cfs_list_t *tmp;
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int conflicting = 0;
        ENTRY;

        lockmode_verify(req_mode);

        /* for waiting locks */
        cfs_list_for_each(tmp, &res->lr_waiting) {
                struct ldlm_lock *lock;
                struct ldlm_extent *l_extent;

                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);
                l_extent = &lock->l_policy_data.l_extent;

                /* We already hit the minimum requested size, search no more */
                if (new_ex->start == req_start && new_ex->end == req_end) {
                        EXIT;
                        return;
                }

                /* Don't conflict with ourselves */
                if (req == lock)
                        continue;

                /* Locks are compatible, overlap doesn't matter */
                /* Until bug 20 is fixed, try to avoid granting overlapping
                 * locks on one client (they take a long time to cancel) */
                if (lockmode_compat(lock->l_req_mode, req_mode) &&
                    lock->l_export != req->l_export)
                        continue;

                /* If this is a high-traffic lock, don't grow downwards at all
                 * or grow upwards too much */
                ++conflicting;
                if (conflicting > 4)
                        new_ex->start = req_start;

                /* If lock doesn't overlap new_ex, skip it. */
                if (!ldlm_extent_overlap(l_extent, new_ex))
                        continue;

                /* Locks conflicting in requested extents and we can't satisfy
                 * both locks, so ignore it.  Either we will ping-pong this
                 * extent (we would regardless of what extent we granted) or
                 * lock is unused and it shouldn't limit our extent growth. */
                if (ldlm_extent_overlap(&lock->l_req_extent, &req->l_req_extent))
                        continue;

                /* We grow extents downwards only as far as they don't overlap
                 * with already-granted locks, on the assumption that clients
                 * will be writing beyond the initial requested end and would
                 * then need to enqueue a new lock beyond the previous request.
                 * l_req_extent->end strictly < req_start, checked above. */
                if (l_extent->start < req_start && new_ex->start != req_start) {
                        if (l_extent->end >= req_start)
                                new_ex->start = req_start;
                        else
                                new_ex->start = min(l_extent->end + 1, req_start);
                }

                /* If we need to cancel this lock anyway because our request
                 * overlaps the granted lock, we grow up to its requested
                 * extent start instead of limiting this extent, assuming that
                 * clients are writing forwards and the lock had overgrown
                 * its extent downwards before we enqueued our request. */
                if (l_extent->end > req_end) {
                        if (l_extent->start <= req_end)
                                new_ex->end = max(lock->l_req_extent.start - 1,
                                                  req_end);
                        else
                                new_ex->end = max(l_extent->start - 1, req_end);
                }
        }

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
        EXIT;
}


/* In order to determine the largest possible extent we can grant, we need
 * to scan all of the queues. */
static void ldlm_extent_policy(struct ldlm_resource *res,
                               struct ldlm_lock *lock, int *flags)
{
        struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };

        if (lock->l_export == NULL)
                /*
                 * This is a local lock taken by the server (e.g., as a part
                 * of OST-side locking, or unlink handling). Expansion doesn't
                 * make a lot of sense for local locks, because they are
                 * dropped immediately on operation completion and would only
                 * conflict with other threads.
                 */
                return;

        if (lock->l_policy_data.l_extent.start == 0 &&
            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                /* fast-path whole file locks */
                return;

        ldlm_extent_internal_policy_granted(lock, &new_ex);
        ldlm_extent_internal_policy_waiting(lock, &new_ex);

        if (new_ex.start != lock->l_policy_data.l_extent.start ||
            new_ex.end != lock->l_policy_data.l_extent.end) {
                *flags |= LDLM_FL_LOCK_CHANGED;
                lock->l_policy_data.l_extent.start = new_ex.start;
                lock->l_policy_data.l_extent.end = new_ex.end;
        }
}

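/* Track whether this resource is contended: seeing more than
 * ns_contended_locks conflicting locks refreshes lr_contention_time, and the
 * resource is then considered contended for ns_contention_time seconds after
 * the last such observation. */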
static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
{
        struct ldlm_resource *res = lock->l_resource;
        cfs_time_t now = cfs_time_current();

        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_SET_CONTENTION))
                return 1;

        CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
        if (contended_locks > res->lr_namespace->ns_contended_locks)
                res->lr_contention_time = now;
        return cfs_time_before(now, cfs_time_add(res->lr_contention_time,
                cfs_time_seconds(res->lr_namespace->ns_contention_time)));
}

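/* State passed to ldlm_extent_compat_cb() via interval_search() and
 * interval_iterate(): 'locks' accumulates the contended-lock count, and
 * '*compat' (when non-NULL) is cleared, since the callback only runs for
 * conflicting extents. */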
struct ldlm_extent_compat_args {
        cfs_list_t *work_list;
        struct ldlm_lock *lock;
        ldlm_mode_t mode;
        int *locks;
        int *compat;
};

static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
                                                void *data)
{
        struct ldlm_extent_compat_args *priv = data;
        struct ldlm_interval *node = to_ldlm_interval(n);
        struct ldlm_extent *extent;
        cfs_list_t *work_list = priv->work_list;
        struct ldlm_lock *lock, *enq = priv->lock;
        ldlm_mode_t mode = priv->mode;
        int count = 0;
        ENTRY;

        LASSERT(!cfs_list_empty(&node->li_group));

        cfs_list_for_each_entry(lock, &node->li_group, l_sl_policy) {
                /* the interval tree only holds granted locks */
                LASSERTF(mode == lock->l_granted_mode,
                         "mode = %s, lock->l_granted_mode = %s\n",
                         ldlm_lockname[mode],
                         ldlm_lockname[lock->l_granted_mode]);
                count++;
                if (lock->l_blocking_ast)
                        ldlm_add_ast_work_item(lock, enq, work_list);
        }

        /* don't count conflicting glimpse locks */
        extent = ldlm_interval_extent(node);
        if (!(mode == LCK_PR &&
            extent->start == 0 && extent->end == OBD_OBJECT_EOF))
                *priv->locks += count;

        if (priv->compat)
                *priv->compat = 0;

        RETURN(INTERVAL_ITER_CONT);
}

/* Determine if the lock is compatible with all locks on the queue.
 * We stop walking the queue if we hit ourselves so we don't take
 * conflicting locks enqueued after us into account, or we'd wait forever.
 *
 * Returns 0 if the lock is not compatible,
 *         1 if the lock is compatible,
 *         2 if this group lock is compatible and requires no further checking,
 *         or a negative error such as -EWOULDBLOCK for group locks.
 */
static int
ldlm_extent_compat_queue(cfs_list_t *queue, struct ldlm_lock *req,
                         int *flags, ldlm_error_t *err,
                         cfs_list_t *work_list, int *contended_locks)
{
        cfs_list_t *tmp;
        struct ldlm_lock *lock;
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int compat = 1;
        int scan = 0;
        int check_contention;
        ENTRY;

        lockmode_verify(req_mode);

        /* use the interval trees for the granted queue */
        if (queue == &res->lr_granted) {
                struct ldlm_interval_tree *tree;
                struct ldlm_extent_compat_args data = {.work_list = work_list,
                                                       .lock = req,
                                                       .locks = contended_locks,
                                                       .compat = &compat };
                struct interval_node_extent ex = { .start = req_start,
                                                   .end = req_end };
                int idx, rc;

                for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                        tree = &res->lr_itree[idx];
                        if (tree->lit_root == NULL) /* empty tree, skip it */
                                continue;

                        data.mode = tree->lit_mode;
                        if (lockmode_compat(req_mode, tree->lit_mode)) {
                                struct ldlm_interval *node;
                                struct ldlm_extent *extent;

                                if (req_mode != LCK_GROUP)
                                        continue;

                                /* group lock, grant it immediately if
                                 * compatible */
                                node = to_ldlm_interval(tree->lit_root);
                                extent = ldlm_interval_extent(node);
                                if (req->l_policy_data.l_extent.gid ==
                                    extent->gid)
                                        RETURN(2);
                        }

                        if (tree->lit_mode == LCK_GROUP) {
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                }

                                *flags |= LDLM_FL_NO_TIMEOUT;
                                if (!work_list)
                                        RETURN(0);

                                /* if the work list is not NULL, add all
                                 * locks in the tree to the work list */
                                compat = 0;
                                interval_iterate(tree->lit_root,
                                                 ldlm_extent_compat_cb, &data);
                                continue;
                        }

                        if (!work_list) {
                                rc = interval_is_overlapped(tree->lit_root, &ex);
                                if (rc)
                                        RETURN(0);
                        } else {
                                interval_search(tree->lit_root, &ex,
                                                ldlm_extent_compat_cb, &data);
                                if (!cfs_list_empty(work_list) && compat)
                                        compat = 0;
                        }
                }
        } else { /* for the waiting queue */
                cfs_list_for_each(tmp, queue) {
                        check_contention = 1;

                        lock = cfs_list_entry(tmp, struct ldlm_lock,
                                              l_res_link);

                        if (req == lock)
                                break;

                        if (unlikely(scan)) {
                                /* We only get here if we are queueing a GROUP
                                 * lock and met some incompatible one. The main
                                 * idea of this code is to insert a GROUP lock
                                 * past compatible GROUP locks in the waiting
                                 * queue or, if there are none, in front of the
                                 * first non-GROUP lock. */
                                if (lock->l_req_mode != LCK_GROUP) {
                                        /* Ok, we hit a non-GROUP lock; there
                                         * should be no more GROUP locks later
                                         * on, so queue in front of the first
                                         * non-GROUP lock. */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        cfs_list_del_init(&lock->l_res_link);
                                        ldlm_resource_insert_lock_after(req, lock);
                                        compat = 0;
                                        break;
                                }
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* found it */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        compat = 0;
                                        break;
                                }
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_req_mode, req_mode)) {
                                if (req_mode == LCK_PR &&
                                    ((lock->l_policy_data.l_extent.start <=
                                      req->l_policy_data.l_extent.start) &&
                                     (lock->l_policy_data.l_extent.end >=
                                      req->l_policy_data.l_extent.end))) {
                                        /* If we met a PR lock just like us or
                                         * wider, and nobody down the list
                                         * conflicted with it, we can skip
                                         * processing the rest of the list and
                                         * safely place ourselves at its end,
                                         * or grant (depending on whether we
                                         * met a conflicting lock earlier in
                                         * the list).  Only on first enqueue
                                         * do we keep traversing when something
                                         * down the list conflicts, because we
                                         * need to make sure it is marked
                                         * AST_SENT as well; with an empty work
                                         * list we would exit on the first
                                         * conflict met. */
                                        /* There IS a case where such a flag is
                                         * not set for a lock, yet it blocks
                                         * something. Luckily for us this is
                                         * only during destroy, so the lock is
                                         * exclusive. So here we are safe. */
                                        if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
                                                RETURN(compat);
                                        }
                                }

                                /* non-group locks are compatible, overlap
                                 * doesn't matter */
                                if (likely(req_mode != LCK_GROUP))
                                        continue;

                                /* If we are trying to get a GROUP lock and
                                 * there is another one of this kind, we need
                                 * to compare gids */
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* If the existing lock with matching
                                         * gid is granted, we grant the new
                                         * one too. */
                                        if (lock->l_req_mode == lock->l_granted_mode)
                                                RETURN(2);

                                        /* Otherwise we are scanning the queue
                                         * of waiting locks, and it means the
                                         * current request would block along
                                         * with the existing lock (which is
                                         * already blocked).
                                         * If we are in nonblocking mode,
                                         * return immediately. */
                                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                                compat = -EWOULDBLOCK;
                                                goto destroylock;
                                        }
                                        /* If this group lock is compatible with another
                                         * group lock on the waiting list, they must be
                                         * together in the list, so they can be granted
                                         * at the same time.  Otherwise the later lock
                                         * can get stuck behind another, incompatible,
                                         * lock. */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        /* Because 'lock' is not granted, we can stop
                                         * processing this queue and return immediately.
                                         * There is no need to check the rest of the
                                         * list. */
                                        RETURN(0);
                                }
                        }

                        if (unlikely(req_mode == LCK_GROUP &&
                                     (lock->l_req_mode != lock->l_granted_mode))) {
                                scan = 1;
                                compat = 0;
                                if (lock->l_req_mode != LCK_GROUP) {
                                        /* Ok, we hit a non-GROUP lock; there
                                         * should be no more GROUP locks later
                                         * on, so queue in front of the first
                                         * non-GROUP lock. */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        cfs_list_del_init(&lock->l_res_link);
                                        ldlm_resource_insert_lock_after(req, lock);
                                        break;
                                }
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* found it */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        break;
                                }
                                continue;
                        }

                        if (unlikely(lock->l_req_mode == LCK_GROUP)) {
                                /* If the compared lock is GROUP, then the
                                 * requested one is PR/PW, so this is not
                                 * compatible; the extent range does not
                                 * matter */
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                } else {
                                        *flags |= LDLM_FL_NO_TIMEOUT;
                                }
                        } else if (lock->l_policy_data.l_extent.end < req_start ||
                                   lock->l_policy_data.l_extent.start > req_end) {
                                /* if a non-GROUP lock doesn't overlap, skip it */
                                continue;
                        } else if (lock->l_req_extent.end < req_start ||
                                   lock->l_req_extent.start > req_end) {
                                /* false contention, the requests don't really
                                 * overlap */
                                check_contention = 0;
                        }

                        if (!work_list)
                                RETURN(0);

                        /* don't count conflicting glimpse locks */
                        if (lock->l_req_mode == LCK_PR &&
                            lock->l_policy_data.l_extent.start == 0 &&
                            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                                check_contention = 0;

                        *contended_locks += check_contention;

                        compat = 0;
                        if (lock->l_blocking_ast)
                                ldlm_add_ast_work_item(lock, req, work_list);
                }
        }

        if (ldlm_check_contention(req, *contended_locks) &&
            compat == 0 &&
            (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
            req->l_req_mode != LCK_GROUP &&
            req_end - req_start <=
            req->l_resource->lr_namespace->ns_max_nolock_size)
                GOTO(destroylock, compat = -EUSERS);

        RETURN(compat);
destroylock:
        cfs_list_del_init(&req->l_res_link);
        ldlm_lock_destroy_nolock(req);
        *err = compat;
        RETURN(compat);
}

static void discard_bl_list(cfs_list_t *bl_list)
{
        cfs_list_t *tmp, *pos;
        ENTRY;

        cfs_list_for_each_safe(pos, tmp, bl_list) {
                struct ldlm_lock *lock =
                        cfs_list_entry(pos, struct ldlm_lock, l_bl_ast);

                cfs_list_del_init(&lock->l_bl_ast);
                LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
                lock->l_flags &= ~LDLM_FL_AST_SENT;
                LASSERT(lock->l_bl_ast_run == 0);
                LASSERT(lock->l_blocking_lock);
                LDLM_LOCK_RELEASE(lock->l_blocking_lock);
                lock->l_blocking_lock = NULL;
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

/* If first_enq is 0 (i.e., called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent
 *   - must call this function with the ns lock held
 *
 * If first_enq is 1 (i.e., called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent
 *   - must call this function with the ns lock held once */
int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                             ldlm_error_t *err, cfs_list_t *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        CFS_LIST_HEAD(rpc_list);
        int rc, rc2;
        int contended_locks = 0;
        ENTRY;

        LASSERT(cfs_list_empty(&res->lr_converting));
        LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
                !(lock->l_flags & LDLM_AST_DISCARD_DATA));
        check_res_locked(res);
        *err = ELDLM_OK;

        if (!first_enq) {
                /* Careful observers will note that we don't handle -EWOULDBLOCK
                 * here, but it's ok for a non-obvious reason -- compat_queue
                 * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
                 * flags should always be zero here, and if that ever stops
                 * being true, we want to find out. */
                LASSERT(*flags == 0);
                rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
                                              err, NULL, &contended_locks);
                if (rc == 1) {
                        rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
                                                      flags, err, NULL,
                                                      &contended_locks);
                }
                if (rc == 0)
                        RETURN(LDLM_ITER_STOP);

                ldlm_resource_unlink_lock(lock);

                if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
                        ldlm_extent_policy(res, lock, flags);
                ldlm_grant_lock(lock, work_list);
                RETURN(LDLM_ITER_CONTINUE);
        }

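        /* First enqueue: check both queues, collecting blocking ASTs on
         * rpc_list; they are sent below with the resource unlocked, and we
         * restart from here if the queues changed in the meantime. */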
 restart:
        contended_locks = 0;
        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
                                      &rpc_list, &contended_locks);
        if (rc < 0)
                GOTO(out, rc); /* lock was destroyed */
        if (rc == 2)
                goto grant;

        rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
                                       &rpc_list, &contended_locks);
        if (rc2 < 0)
                GOTO(out, rc = rc2); /* lock was destroyed */

        if (rc + rc2 == 2) {
        grant:
                ldlm_extent_policy(res, lock, flags);
                ldlm_resource_unlink_lock(lock);
                ldlm_grant_lock(lock, NULL);
        } else {
                /* If either of the compat_queue()s returned failure, then we
                 * have ASTs to send and must go onto the waiting list.
                 *
                 * bug 2322: we used to unlink and re-add here, which was a
                 * terrible folly -- if we goto restart, we could get
                 * re-ordered!  Causes deadlock, because ASTs aren't sent! */
                if (cfs_list_empty(&lock->l_res_link))
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                unlock_res(res);
                rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_BL_AST);

                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) &&
                    !ns_is_client(res->lr_namespace))
                        class_fail_export(lock->l_export);

                lock_res(res);
                if (rc == -ERESTART) {
                        /* bug 15715: The lock was granted and destroyed after
                         * the resource lock was dropped. The interval node
                         * was freed in ldlm_lock_destroy. Anyway, this always
                         * happens when a client is being evicted. So it would
                         * be ok to return an error. -jay */
                        if (lock->l_destroyed) {
                                *err = -EAGAIN;
                                GOTO(out, rc = -EAGAIN);
                        }

                        /* lock was granted while resource was unlocked. */
                        if (lock->l_granted_mode == lock->l_req_mode) {
                                /* bug 11300: if the lock has been granted,
                                 * break earlier because otherwise, we will go
                                 * to restart and ldlm_resource_unlink will be
                                 * called and it causes the interval node to be
                                 * freed. Then we will fail at
                                 * ldlm_extent_add_lock() */
                                *flags &= ~(LDLM_FL_BLOCK_GRANTED | LDLM_FL_BLOCK_CONV |
                                            LDLM_FL_BLOCK_WAIT);
                                GOTO(out, rc = 0);
                        }

                        GOTO(restart, -ERESTART);
                }

                *flags |= LDLM_FL_BLOCK_GRANTED;
                /* this way we force the client to wait for the lock
                 * endlessly once the lock is enqueued -bzzz */
                *flags |= LDLM_FL_NO_TIMEOUT;
        }
        RETURN(0);
out:
        if (!cfs_list_empty(&rpc_list)) {
                LASSERT(!(lock->l_flags & LDLM_AST_DISCARD_DATA));
                discard_bl_list(&rpc_list);
        }
        RETURN(rc);
}

/* When a lock is cancelled by a client, the KMS may undergo change if this
 * is the "highest lock".  This function returns the new KMS value.
 * Caller must hold ns_lock already.
 *
 * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
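/* For example (hypothetical numbers): with granted locks on [0, 0xfff] and
 * [0, 0x3fff] and old_kms = 0x4000, cancelling the second lock shifts the
 * KMS down to 0x1000, the end of the remaining lock plus one. */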
__u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
{
        struct ldlm_resource *res = lock->l_resource;
        cfs_list_t *tmp;
        struct ldlm_lock *lck;
        __u64 kms = 0;
        ENTRY;

        /* don't let another thread in ldlm_extent_shift_kms race in
         * just after we finish and take our lock into account in its
         * calculation of the kms */
        lock->l_flags |= LDLM_FL_KMS_IGNORE;

        cfs_list_for_each(tmp, &res->lr_granted) {
                lck = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lck->l_flags & LDLM_FL_KMS_IGNORE)
                        continue;

                if (lck->l_policy_data.l_extent.end >= old_kms)
                        RETURN(old_kms);

                /* This extent _has_ to be smaller than old_kms (checked above)
                 * so kms can only ever be smaller or the same as old_kms. */
                if (lck->l_policy_data.l_extent.end + 1 > kms)
                        kms = lck->l_policy_data.l_extent.end + 1;
        }
        LASSERTF(kms <= old_kms, "kms "LPU64" old_kms "LPU64"\n", kms, old_kms);

        RETURN(kms);
}

cfs_mem_cache_t *ldlm_interval_slab;
struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
{
        struct ldlm_interval *node;
        ENTRY;

        LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
        OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, CFS_ALLOC_IO);
        if (node == NULL)
                RETURN(NULL);

        CFS_INIT_LIST_HEAD(&node->li_group);
        ldlm_interval_attach(node, lock);
        RETURN(node);
}

void ldlm_interval_free(struct ldlm_interval *node)
{
        if (node) {
                LASSERT(cfs_list_empty(&node->li_group));
                LASSERT(!interval_is_intree(&node->li_node));
                OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
        }
}

/* interval tree, for LDLM_EXTENT. */
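/* Locks with identical extents share a single interval node: attach adds a
 * lock to the node's li_group, and detach removes it, handing the node back
 * to the caller (for freeing) only when the group becomes empty. */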
void ldlm_interval_attach(struct ldlm_interval *n,
                          struct ldlm_lock *l)
{
        LASSERT(l->l_tree_node == NULL);
        LASSERT(l->l_resource->lr_type == LDLM_EXTENT);

        cfs_list_add_tail(&l->l_sl_policy, &n->li_group);
        l->l_tree_node = n;
}

struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l)
{
        struct ldlm_interval *n = l->l_tree_node;

        if (n == NULL)
                return NULL;

        LASSERT(!cfs_list_empty(&n->li_group));
        l->l_tree_node = NULL;
        cfs_list_del_init(&l->l_sl_policy);

        return (cfs_list_empty(&n->li_group) ? n : NULL);
}

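/* Each resource keeps one interval tree per lock mode; the tree index for a
 * (power-of-two) mode bit is simply its log2, computed by the loop below. */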
static inline int lock_mode_to_index(ldlm_mode_t mode)
{
        int index;

        LASSERT(mode != 0);
        LASSERT(IS_PO2(mode));
        for (index = -1; mode; index++, mode >>= 1) ;
        LASSERT(index < LCK_MODE_NUM);
        return index;
}

void ldlm_extent_add_lock(struct ldlm_resource *res,
                          struct ldlm_lock *lock)
{
        struct interval_node *found, **root;
        struct ldlm_interval *node;
        struct ldlm_extent *extent;
        int idx;

        LASSERT(lock->l_granted_mode == lock->l_req_mode);

        node = lock->l_tree_node;
        LASSERT(node != NULL);
        LASSERT(!interval_is_intree(&node->li_node));

        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);

        /* initialize the node's extent */
        extent = &lock->l_policy_data.l_extent;
        interval_set(&node->li_node, extent->start, extent->end);

        root = &res->lr_itree[idx].lit_root;
        found = interval_insert(&node->li_node, root);
        if (found) { /* The policy group was found. */
                struct ldlm_interval *tmp = ldlm_interval_detach(lock);
                LASSERT(tmp != NULL);
                ldlm_interval_free(tmp);
                ldlm_interval_attach(to_ldlm_interval(found), lock);
        }
        res->lr_itree[idx].lit_size++;

        /* even though we use the interval tree to manage the extent locks,
         * we also add the locks into the granted list, for debugging
         * purposes */
        ldlm_resource_add_lock(res, &res->lr_granted, lock);
}

void ldlm_extent_unlink_lock(struct ldlm_lock *lock)
{
        struct ldlm_resource *res = lock->l_resource;
        struct ldlm_interval *node = lock->l_tree_node;
        struct ldlm_interval_tree *tree;
        int idx;

        if (!node || !interval_is_intree(&node->li_node)) /* duplicate unlink */
                return;

        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        tree = &res->lr_itree[idx];

        LASSERT(tree->lit_root != NULL); /* assure the tree is not null */

        tree->lit_size--;
        node = ldlm_interval_detach(lock);
        if (node) {
                interval_erase(&node->li_node, &tree->lit_root);
                ldlm_interval_free(node);
        }
}