1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011 Whamcloud, Inc.
33  *
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  *
39  * lustre/ldlm/ldlm_extent.c
40  *
41  * Author: Peter Braam <braam@clusterfs.com>
42  * Author: Phil Schwan <phil@clusterfs.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_LDLM
46 #ifndef __KERNEL__
47 # include <liblustre.h>
48 #else
49 # include <libcfs/libcfs.h>
50 #endif
51
52 #include <lustre_dlm.h>
53 #include <obd_support.h>
54 #include <obd.h>
55 #include <obd_class.h>
56 #include <lustre_lib.h>
57
58 #include "ldlm_internal.h"
59
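/* Cap on how far a heavily contended (more than 32 conflicting locks) PW or
 * CW lock is grown past its requested start; see
 * ldlm_extent_internal_policy_fixup(). */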
60 #define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)
61
62 /* fixup the ldlm_extent after expanding */
63 static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
64                                               struct ldlm_extent *new_ex,
65                                               int conflicting)
66 {
67         ldlm_mode_t req_mode = req->l_req_mode;
68         __u64 req_start = req->l_req_extent.start;
69         __u64 req_end = req->l_req_extent.end;
70         __u64 req_align, mask;
71
72         if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
73                 if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
74                         new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
75                                           new_ex->end);
76         }
77
78         if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF) {
79                 EXIT;
80                 return;
81         }
82
83         /* we need to ensure that the lock extent is properly aligned to what
84          * the client requested.  We align it to the lowest common denominator
85          * of the client's requested lock start and end alignment. */
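        /* For example, a request for [0x11000, 0x11fff] gives
         * req_align = 0x12000 | 0x11000 = 0x13000, so the loop below leaves
         * mask = 0x1000 - 1 = 0xfff and the expanded extent is trimmed inward
         * to 4KB boundaries; a request aligned on 64KB boundaries would yield
         * mask = 0xffff and 64KB trimming instead. */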
86         mask = 0x1000ULL;
87         req_align = (req_end + 1) | req_start;
88         if (req_align != 0) {
89                 while ((req_align & mask) == 0)
90                         mask <<= 1;
91         }
92         mask -= 1;
93         /* We can only shrink the lock, not grow it.
94          * This should never cause the lock to be smaller than requested,
95          * since the requested lock was already aligned on these boundaries. */
96         new_ex->start = ((new_ex->start - 1) | mask) + 1;
97         new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
98         LASSERTF(new_ex->start <= req_start,
99                  "mask "LPX64" grant start "LPU64" req start "LPU64"\n",
100                  mask, new_ex->start, req_start);
101         LASSERTF(new_ex->end >= req_end,
102                  "mask "LPX64" grant end "LPU64" req end "LPU64"\n",
103                  mask, new_ex->end, req_end);
104 }
105
106 /* The purpose of this function is to return:
107  * - the maximum extent
108  * - containing the requested extent
109  * - and not overlapping existing conflicting extents outside the requested one
110  *
111  * Use the interval trees of granted locks to expand the lock extent.
112  */
113 static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req,
114                                                 struct ldlm_extent *new_ex)
115 {
116         struct ldlm_resource *res = req->l_resource;
117         ldlm_mode_t req_mode = req->l_req_mode;
118         __u64 req_start = req->l_req_extent.start;
119         __u64 req_end = req->l_req_extent.end;
120         struct ldlm_interval_tree *tree;
121         struct interval_node_extent limiter = { new_ex->start, new_ex->end };
122         int conflicting = 0;
123         int idx;
124         ENTRY;
125
126         lockmode_verify(req_mode);
127
128         /* walk the per-mode interval trees of granted extent locks */
129         for (idx = 0; idx < LCK_MODE_NUM; idx++) {
130                 struct interval_node_extent ext = { req_start, req_end };
131
132                 tree = &res->lr_itree[idx];
133                 if (lockmode_compat(tree->lit_mode, req_mode))
134                         continue;
135
136                 conflicting += tree->lit_size;
137                 if (conflicting > 4)
138                         limiter.start = req_start;
139
140                 if (interval_is_overlapped(tree->lit_root, &ext))
141                         CDEBUG(D_INFO, 
142                                "req_mode = %d, tree->lit_mode = %d, "
143                                "tree->lit_size = %d\n",
144                                req_mode, tree->lit_mode, tree->lit_size);
145                 interval_expand(tree->lit_root, &ext, &limiter);
146                 limiter.start = max(limiter.start, ext.start);
147                 limiter.end = min(limiter.end, ext.end);
148                 if (limiter.start == req_start && limiter.end == req_end)
149                         break;
150         }
151
152         new_ex->start = limiter.start;
153         new_ex->end = limiter.end;
154         LASSERT(new_ex->start <= req_start);
155         LASSERT(new_ex->end >= req_end);
156
157         ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
158         EXIT;
159 }
160
161 /* The purpose of this function is to return:
162  * - the maximum extent
163  * - containing the requested extent
164  * - and not overlapping existing conflicting extents outside the requested one
165  */
166 static void
167 ldlm_extent_internal_policy_waiting(struct ldlm_lock *req,
168                                     struct ldlm_extent *new_ex)
169 {
170         cfs_list_t *tmp;
171         struct ldlm_resource *res = req->l_resource;
172         ldlm_mode_t req_mode = req->l_req_mode;
173         __u64 req_start = req->l_req_extent.start;
174         __u64 req_end = req->l_req_extent.end;
175         int conflicting = 0;
176         ENTRY;
177
178         lockmode_verify(req_mode);
179
180         /* for waiting locks */
181         cfs_list_for_each(tmp, &res->lr_waiting) {
182                 struct ldlm_lock *lock;
183                 struct ldlm_extent *l_extent;
184
185                 lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);
186                 l_extent = &lock->l_policy_data.l_extent;
187
188                 /* We already hit the minimum requested size, search no more */
189                 if (new_ex->start == req_start && new_ex->end == req_end) {
190                         EXIT;
191                         return;
192                 }
193
194                 /* Don't conflict with ourselves */
195                 if (req == lock)
196                         continue;
197
198                 /* Locks are compatible, overlap doesn't matter */
199                 /* Until bug 20 is fixed, try to avoid granting overlapping
200                  * locks on one client (they take a long time to cancel) */
201                 if (lockmode_compat(lock->l_req_mode, req_mode) &&
202                     lock->l_export != req->l_export)
203                         continue;
204
205                 /* If this is a high-traffic lock, don't grow downwards at all
206                  * or grow upwards too much */
207                 ++conflicting;
208                 if (conflicting > 4)
209                         new_ex->start = req_start;
210
211                 /* If lock doesn't overlap new_ex, skip it. */
212                 if (!ldlm_extent_overlap(l_extent, new_ex))
213                         continue;
214
215                 /* The locks conflict in their requested extents and we can't
216                  * satisfy both, so ignore it.  Either we will ping-pong this
217                  * extent (we would regardless of what extent we granted) or the
218                  * lock is unused and shouldn't limit our extent growth. */
219                 if (ldlm_extent_overlap(&lock->l_req_extent,&req->l_req_extent))
220                         continue;
221
222                 /* We grow extents downwards only as far as they don't overlap
223                  * with already-granted locks, on the assumption that clients
224                  * will be writing beyond the initial requested end and would
225                  * then need to enqueue a new lock beyond the previous request.
226                  * l_req_extent->end strictly < req_start, checked above. */
227                 if (l_extent->start < req_start && new_ex->start != req_start) {
228                         if (l_extent->end >= req_start)
229                                 new_ex->start = req_start;
230                         else
231                                 new_ex->start = min(l_extent->end+1, req_start);
232                 }
233
234                 /* If we need to cancel this lock anyway because our request
235                  * overlaps the granted lock, we grow up to its requested
236                  * extent start instead of limiting this extent, assuming that
237                  * clients are writing forwards and the lock had overgrown
238                  * its extent downwards before we enqueued our request. */
239                 if (l_extent->end > req_end) {
240                         if (l_extent->start <= req_end)
241                                 new_ex->end = max(lock->l_req_extent.start - 1,
242                                                   req_end);
243                         else
244                                 new_ex->end = max(l_extent->start - 1, req_end);
245                 }
246         }
247
248         ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
249         EXIT;
250 }
251
252
253 /* In order to determine the largest possible extent we can grant, we need
254  * to scan all of the queues. */
255 static void ldlm_extent_policy(struct ldlm_resource *res,
256                                struct ldlm_lock *lock, int *flags)
257 {
258         struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };
259
260         if (lock->l_export == NULL)
261                 /*
262                  * this is local lock taken by server (e.g., as a part of
263                  * OST-side locking, or unlink handling). Expansion doesn't
264                  * make a lot of sense for local locks, because they are
265                  * dropped immediately on operation completion and would only
266                  * conflict with other threads.
267                  */
268                 return;
269
270         if (lock->l_policy_data.l_extent.start == 0 &&
271             lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
272                 /* fast-path whole file locks */
273                 return;
274
275         ldlm_extent_internal_policy_granted(lock, &new_ex);
276         ldlm_extent_internal_policy_waiting(lock, &new_ex);
277
278         if (new_ex.start != lock->l_policy_data.l_extent.start ||
279             new_ex.end != lock->l_policy_data.l_extent.end) {
280                 *flags |= LDLM_FL_LOCK_CHANGED;
281                 lock->l_policy_data.l_extent.start = new_ex.start;
282                 lock->l_policy_data.l_extent.end = new_ex.end;
283         }
284 }
285
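/* Decide whether this resource should be treated as contended: once the
 * number of conflicting locks exceeds the namespace's ns_contended_locks
 * threshold, lr_contention_time is refreshed and contention is reported for
 * the following ns_contention_time seconds. */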
286 static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
287 {
288         struct ldlm_resource *res = lock->l_resource;
289         cfs_time_t now = cfs_time_current();
290
291         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_SET_CONTENTION))
292                 return 1;
293
294         CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
295         if (contended_locks > ldlm_res_to_ns(res)->ns_contended_locks)
296                 res->lr_contention_time = now;
297         return cfs_time_before(now, cfs_time_add(res->lr_contention_time,
298                 cfs_time_seconds(ldlm_res_to_ns(res)->ns_contention_time)));
299 }
300
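/* Context for ldlm_extent_compat_cb(), invoked for each interval node
 * visited while checking a granted-lock tree: it queues blocking-AST work
 * items on work_list, adds the number of contended locks to *locks (skipping
 * whole-file PR glimpse locks) and clears *compat to flag a conflict. */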
301 struct ldlm_extent_compat_args {
302         cfs_list_t *work_list;
303         struct ldlm_lock *lock;
304         ldlm_mode_t mode;
305         int *locks;
306         int *compat;
307 };
308
309 static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
310                                                 void *data)
311 {
312         struct ldlm_extent_compat_args *priv = data;
313         struct ldlm_interval *node = to_ldlm_interval(n);
314         struct ldlm_extent *extent;
315         cfs_list_t *work_list = priv->work_list;
316         struct ldlm_lock *lock, *enq = priv->lock;
317         ldlm_mode_t mode = priv->mode;
318         int count = 0;
319         ENTRY;
320
321         LASSERT(!cfs_list_empty(&node->li_group));
322
323         cfs_list_for_each_entry(lock, &node->li_group, l_sl_policy) {
324                 /* interval tree is for granted lock */
325                 LASSERTF(mode == lock->l_granted_mode,
326                          "mode = %s, lock->l_granted_mode = %s\n",
327                          ldlm_lockname[mode],
328                          ldlm_lockname[lock->l_granted_mode]);
329                 count++;
330                 if (lock->l_blocking_ast)
331                         ldlm_add_ast_work_item(lock, enq, work_list);
332         }
333
334         /* don't count conflicting glimpse locks */
335         extent = ldlm_interval_extent(node);
336         if (!(mode == LCK_PR &&
337             extent->start == 0 && extent->end == OBD_OBJECT_EOF))
338                 *priv->locks += count;
339
340         if (priv->compat)
341                 *priv->compat = 0;
342
343         RETURN(INTERVAL_ITER_CONT);
344 }
345
346 /* Determine if the lock is compatible with all locks on the queue.
347  * We stop walking the queue if we hit ourselves so we don't take
348  * conflicting locks enqueued after us into account, or we'd wait forever.
349  *
350  * 0 if the lock is not compatible
351  * 1 if the lock is compatible
352  * 2 if this group lock is compatible and requires no further checking
353  * negative error, such as EWOULDBLOCK for group locks
354  */
355 static int
356 ldlm_extent_compat_queue(cfs_list_t *queue, struct ldlm_lock *req,
357                          int *flags, ldlm_error_t *err,
358                          cfs_list_t *work_list, int *contended_locks)
359 {
360         cfs_list_t *tmp;
361         struct ldlm_lock *lock;
362         struct ldlm_resource *res = req->l_resource;
363         ldlm_mode_t req_mode = req->l_req_mode;
364         __u64 req_start = req->l_req_extent.start;
365         __u64 req_end = req->l_req_extent.end;
366         int compat = 1;
367         int scan = 0;
368         int check_contention;
369         ENTRY;
370
371         lockmode_verify(req_mode);
372
373         /* Use the per-mode interval trees for granted locks */
374         if (queue == &res->lr_granted) {
375                 struct ldlm_interval_tree *tree;
376                 struct ldlm_extent_compat_args data = {.work_list = work_list,
377                                                .lock = req,
378                                                .locks = contended_locks,
379                                                .compat = &compat };
380                 struct interval_node_extent ex = { .start = req_start,
381                                                    .end = req_end };
382                 int idx, rc;
383
384                 for (idx = 0; idx < LCK_MODE_NUM; idx++) {
385                         tree = &res->lr_itree[idx];
386                         if (tree->lit_root == NULL) /* empty tree, skipped */
387                                 continue;
388
389                         data.mode = tree->lit_mode;
390                         if (lockmode_compat(req_mode, tree->lit_mode)) {
391                                 struct ldlm_interval *node;
392                                 struct ldlm_extent *extent;
393
394                                 if (req_mode != LCK_GROUP)
395                                         continue;
396
397                                 /* group lock, grant it immediately if
398                                  * compatible */
399                                 node = to_ldlm_interval(tree->lit_root);
400                                 extent = ldlm_interval_extent(node);
401                                 if (req->l_policy_data.l_extent.gid ==
402                                     extent->gid)
403                                         RETURN(2);
404                         }
405
406                         if (tree->lit_mode == LCK_GROUP) {
407                                 if (*flags & LDLM_FL_BLOCK_NOWAIT) {
408                                         compat = -EWOULDBLOCK;
409                                         goto destroylock;
410                                 }
411
412                                 *flags |= LDLM_FL_NO_TIMEOUT;
413                                 if (!work_list)
414                                         RETURN(0);
415
416                                 /* if work list is not NULL, add all
417                                    locks in the tree to the work list */
418                                 compat = 0;
419                                 interval_iterate(tree->lit_root,
420                                                  ldlm_extent_compat_cb, &data);
421                                 continue;
422                         }
423
424                         if (!work_list) {
425                                 rc = interval_is_overlapped(tree->lit_root,&ex);
426                                 if (rc)
427                                         RETURN(0);
428                         } else {
429                                 interval_search(tree->lit_root, &ex,
430                                                 ldlm_extent_compat_cb, &data);
431                                 if (!cfs_list_empty(work_list) && compat)
432                                         compat = 0;
433                         }
434                 }
435         } else { /* for waiting queue */
436                 cfs_list_for_each(tmp, queue) {
437                         check_contention = 1;
438
439                         lock = cfs_list_entry(tmp, struct ldlm_lock,
440                                               l_res_link);
441
442                         if (req == lock)
443                                 break;
444
445                         if (unlikely(scan)) {
446                                 /* We only get here if we are queuing a GROUP lock
447                                    and met some incompatible one. The main idea of this
448                                    code is to insert the GROUP lock after a compatible GROUP
449                                    lock in the waiting queue or, if there is none,
450                                    in front of the first non-GROUP lock. */
451                                 if (lock->l_req_mode != LCK_GROUP) {
452                                         /* Ok, we hit a non-GROUP lock; there should
453                                          * be no more GROUP locks later on, so queue in
454                                          * front of the first non-GROUP lock. */
455
456                                         ldlm_resource_insert_lock_after(lock, req);
457                                         cfs_list_del_init(&lock->l_res_link);
458                                         ldlm_resource_insert_lock_after(req, lock);
459                                         compat = 0;
460                                         break;
461                                 }
462                                 if (req->l_policy_data.l_extent.gid ==
463                                     lock->l_policy_data.l_extent.gid) {
464                                         /* found it */
465                                         ldlm_resource_insert_lock_after(lock, req);
466                                         compat = 0;
467                                         break;
468                                 }
469                                 continue;
470                         }
471
472                         /* locks are compatible, overlap doesn't matter */
473                         if (lockmode_compat(lock->l_req_mode, req_mode)) {
474                                 if (req_mode == LCK_PR &&
475                                     ((lock->l_policy_data.l_extent.start <=
476                                       req->l_policy_data.l_extent.start) &&
477                                      (lock->l_policy_data.l_extent.end >=
478                                       req->l_policy_data.l_extent.end))) {
479                                         /* If we met a PR lock just like us or wider,
480                                            and nobody down the list conflicted with
481                                            it, that means we can skip processing of
482                                            the rest of the list and safely place
483                                            ourselves at the end of the list, or grant
484                                            (depending on whether we met a conflicting
485                                            lock earlier in the list).
486                                            Only in the case of a first enqueue do we
487                                            continue traversing if there is something
488                                            conflicting down the list, because we need to
489                                            make sure that something is marked as AST_SENT
490                                            as well; with an empty work list we would exit
491                                            on the first conflict met. */
492                                         /* There IS a case where such a flag is
493                                            not set for a lock, yet it blocks
494                                            something. Luckily for us this is
495                                            only during destroy, so the lock is
496                                            exclusive. So here we are safe. */
497                                         if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
498                                                 RETURN(compat);
499                                         }
500                                 }
501
502                                 /* non-group locks are compatible, overlap doesn't
503                                    matter */
504                                 if (likely(req_mode != LCK_GROUP))
505                                         continue;
506
507                                 /* If we are trying to get a GROUP lock and there is
508                                    another one of this kind, we need to compare gid */
509                                 if (req->l_policy_data.l_extent.gid ==
510                                     lock->l_policy_data.l_extent.gid) {
511                                         /* If existing lock with matched gid is granted,
512                                            we grant new one too. */
513                                         if (lock->l_req_mode == lock->l_granted_mode)
514                                                 RETURN(2);
515
516                                         /* Otherwise we are scanning the queue of waiting
517                                          * locks and it means the current request would
518                                          * block along with the existing lock (which is
519                                          * already blocked).
520                                          * If we are in non-blocking mode, return
521                                          * immediately. */
522                                         if (*flags & LDLM_FL_BLOCK_NOWAIT) {
523                                                 compat = -EWOULDBLOCK;
524                                                 goto destroylock;
525                                         }
526                                         /* If this group lock is compatible with another
527                                          * group lock on the waiting list, they must be
528                                          * together in the list, so they can be granted
529                                          * at the same time.  Otherwise the later lock
530                                          * can get stuck behind another, incompatible,
531                                          * lock. */
532                                         ldlm_resource_insert_lock_after(lock, req);
533                                         /* Because 'lock' is not granted, we can stop
534                                          * processing this queue and return immediately.
535                                          * There is no need to check the rest of the
536                                          * list. */
537                                         RETURN(0);
538                                 }
539                         }
540
541                         if (unlikely(req_mode == LCK_GROUP &&
542                                      (lock->l_req_mode != lock->l_granted_mode))) {
543                                 scan = 1;
544                                 compat = 0;
545                                 if (lock->l_req_mode != LCK_GROUP) {
546                                         /* Ok, we hit a non-GROUP lock; there should be no
547                                            more GROUP locks later on, so queue in front of
548                                            the first non-GROUP lock. */
549
550                                         ldlm_resource_insert_lock_after(lock, req);
551                                         cfs_list_del_init(&lock->l_res_link);
552                                         ldlm_resource_insert_lock_after(req, lock);
553                                         break;
554                                 }
555                                 if (req->l_policy_data.l_extent.gid ==
556                                     lock->l_policy_data.l_extent.gid) {
557                                         /* found it */
558                                         ldlm_resource_insert_lock_after(lock, req);
559                                         break;
560                                 }
561                                 continue;
562                         }
563
564                         if (unlikely(lock->l_req_mode == LCK_GROUP)) {
565                                 /* If the compared lock is GROUP, then the requested one
566                                  * is PR/PW, so this is not compatible; extent range does
567                                  * not matter. */
568                                 if (*flags & LDLM_FL_BLOCK_NOWAIT) {
569                                         compat = -EWOULDBLOCK;
570                                         goto destroylock;
571                                 } else {
572                                         *flags |= LDLM_FL_NO_TIMEOUT;
573                                 }
574                         } else if (lock->l_policy_data.l_extent.end < req_start ||
575                                    lock->l_policy_data.l_extent.start > req_end) {
576                                 /* if a non-group lock doesn't overlap, skip it */
577                                 continue;
578                         } else if (lock->l_req_extent.end < req_start ||
579                                    lock->l_req_extent.start > req_end) {
580                                 /* false contention, the requests don't really overlap */
581                                 check_contention = 0;
582                         }
583
584                         if (!work_list)
585                                 RETURN(0);
586
587                         /* don't count conflicting glimpse locks */
588                         if (lock->l_req_mode == LCK_PR &&
589                             lock->l_policy_data.l_extent.start == 0 &&
590                             lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
591                                 check_contention = 0;
592
593                         *contended_locks += check_contention;
594
595                         compat = 0;
596                         if (lock->l_blocking_ast)
597                                 ldlm_add_ast_work_item(lock, req, work_list);
598                 }
599         }
600
601         if (ldlm_check_contention(req, *contended_locks) &&
602             compat == 0 &&
603             (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
604             req->l_req_mode != LCK_GROUP &&
605             req_end - req_start <=
606             ldlm_res_to_ns(req->l_resource)->ns_max_nolock_size)
607                 GOTO(destroylock, compat = -EUSERS);
608
609         RETURN(compat);
610 destroylock:
611         cfs_list_del_init(&req->l_res_link);
612         ldlm_lock_destroy_nolock(req);
613         *err = compat;
614         RETURN(compat);
615 }
616
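/* Drop the blocking-AST work items accumulated on bl_list without sending
 * them, releasing the references taken when the items were queued. */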
617 static void discard_bl_list(cfs_list_t *bl_list)
618 {
619         cfs_list_t *tmp, *pos;
620         ENTRY;
621
622         cfs_list_for_each_safe(pos, tmp, bl_list) {
623                 struct ldlm_lock *lock =
624                         cfs_list_entry(pos, struct ldlm_lock, l_bl_ast);
625
626                 cfs_list_del_init(&lock->l_bl_ast);
627                 LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
628                 lock->l_flags &= ~LDLM_FL_AST_SENT;
629                 LASSERT(lock->l_bl_ast_run == 0);
630                 LASSERT(lock->l_blocking_lock);
631                 LDLM_LOCK_RELEASE(lock->l_blocking_lock);
632                 lock->l_blocking_lock = NULL;
633                 LDLM_LOCK_RELEASE(lock);
634         }
635         EXIT;
636 }
637
638 /* If first_enq is 0 (i.e., called from ldlm_reprocess_queue):
639  *   - blocking ASTs have already been sent
640  *   - must call this function with the ns lock held
641  *
642  * If first_enq is 1 (i.e., called from ldlm_lock_enqueue):
643  *   - blocking ASTs have not been sent
644  *   - must call this function with the ns lock held once */
645 int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
646                              ldlm_error_t *err, cfs_list_t *work_list)
647 {
648         struct ldlm_resource *res = lock->l_resource;
649         CFS_LIST_HEAD(rpc_list);
650         int rc, rc2;
651         int contended_locks = 0;
652         ENTRY;
653
654         LASSERT(cfs_list_empty(&res->lr_converting));
655         LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
656                 !(lock->l_flags & LDLM_AST_DISCARD_DATA));
657         check_res_locked(res);
658         *err = ELDLM_OK;
659
660         if (!first_enq) {
661                 /* Careful observers will note that we don't handle -EWOULDBLOCK
662                  * here, but it's ok for a non-obvious reason -- compat_queue
663                  * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
664                  * flags should always be zero here, and if that ever stops
665                  * being true, we want to find out. */
666                 LASSERT(*flags == 0);
667                 rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
668                                               err, NULL, &contended_locks);
669                 if (rc == 1) {
670                         rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
671                                                       flags, err, NULL,
672                                                       &contended_locks);
673                 }
674                 if (rc == 0)
675                         RETURN(LDLM_ITER_STOP);
676
677                 ldlm_resource_unlink_lock(lock);
678
679                 if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
680                         ldlm_extent_policy(res, lock, flags);
681                 ldlm_grant_lock(lock, work_list);
682                 RETURN(LDLM_ITER_CONTINUE);
683         }
684
685  restart:
686         contended_locks = 0;
687         rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
688                                       &rpc_list, &contended_locks);
689         if (rc < 0)
690                 GOTO(out, rc); /* lock was destroyed */
691         if (rc == 2)
692                 goto grant;
693
694         rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
695                                        &rpc_list, &contended_locks);
696         if (rc2 < 0)
697                 GOTO(out, rc = rc2); /* lock was destroyed */
698
699         if (rc + rc2 == 2) {
700         grant:
701                 ldlm_extent_policy(res, lock, flags);
702                 ldlm_resource_unlink_lock(lock);
703                 ldlm_grant_lock(lock, NULL);
704         } else {
705                 /* If either of the compat_queue()s returned failure, then we
706                  * have ASTs to send and must go onto the waiting list.
707                  *
708                  * bug 2322: we used to unlink and re-add here, which was a
709                  * terrible folly -- if we goto restart, we could get
710                  * re-ordered!  Causes deadlock, because ASTs aren't sent! */
711                 if (cfs_list_empty(&lock->l_res_link))
712                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
713                 unlock_res(res);
714                 rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
715                                        LDLM_WORK_BL_AST);
716
717                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) &&
718                     !ns_is_client(ldlm_res_to_ns(res)))
719                         class_fail_export(lock->l_export);
720
721                 lock_res(res);
722                 if (rc == -ERESTART) {
723
724                         /* 15715: The lock was granted and destroyed after
725                          * resource lock was dropped. Interval node was freed
726                          * in ldlm_lock_destroy. Anyway, this always happens
727                          * when a client is being evicted. So it would be
728                          * ok to return an error. -jay */
729                         if (lock->l_destroyed) {
730                                 *err = -EAGAIN;
731                                 GOTO(out, rc = -EAGAIN);
732                         }
733
734                         /* lock was granted while resource was unlocked. */
735                         if (lock->l_granted_mode == lock->l_req_mode) {
736                                 /* bug 11300: if the lock has been granted,
737                                  * break out earlier because otherwise we will go
738                                  * to restart, ldlm_resource_unlink will be
739                                  * called, and that will cause the interval node
740                                  * to be freed.  Then we will fail at
741                                  * ldlm_extent_add_lock(). */
742                                 *flags &= ~(LDLM_FL_BLOCK_GRANTED | LDLM_FL_BLOCK_CONV |
743                                             LDLM_FL_BLOCK_WAIT);
744                                 GOTO(out, rc = 0);
745                         }
746
747                         GOTO(restart, -ERESTART);
748                 }
749
750                 *flags |= LDLM_FL_BLOCK_GRANTED;
751                 /* this way we force the client to wait for the lock
752                  * endlessly once the lock is enqueued -bzzz */
753                 *flags |= LDLM_FL_NO_TIMEOUT;
754
755         }
756         RETURN(0);
757 out:
758         if (!cfs_list_empty(&rpc_list)) {
759                 LASSERT(!(lock->l_flags & LDLM_AST_DISCARD_DATA));
760                 discard_bl_list(&rpc_list);
761         }
762         RETURN(rc);
763 }
764
765 /* When a lock is cancelled by a client, the KMS may undergo change if this
766  * is the "highest lock".  This function returns the new KMS value.
767  * Caller must hold lr_lock already.
768  *
769  * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
770 __u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
771 {
772         struct ldlm_resource *res = lock->l_resource;
773         cfs_list_t *tmp;
774         struct ldlm_lock *lck;
775         __u64 kms = 0;
776         ENTRY;
777
778         /* don't let another thread in ldlm_extent_shift_kms race in
779          * just after we finish and take our lock into account in its
780          * calculation of the kms */
781         lock->l_flags |= LDLM_FL_KMS_IGNORE;
782
783         cfs_list_for_each(tmp, &res->lr_granted) {
784                 lck = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);
785
786                 if (lck->l_flags & LDLM_FL_KMS_IGNORE)
787                         continue;
788
789                 if (lck->l_policy_data.l_extent.end >= old_kms)
790                         RETURN(old_kms);
791
792                 /* This extent _has_ to be smaller than old_kms (checked above)
793                  * so kms can only ever be smaller or the same as old_kms. */
794                 if (lck->l_policy_data.l_extent.end + 1 > kms)
795                         kms = lck->l_policy_data.l_extent.end + 1;
796         }
797         LASSERTF(kms <= old_kms, "kms "LPU64" old_kms "LPU64"\n", kms, old_kms);
798
799         RETURN(kms);
800 }
801
802 cfs_mem_cache_t *ldlm_interval_slab;
803 struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
804 {
805         struct ldlm_interval *node;
806         ENTRY;
807
808         LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
809         OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, CFS_ALLOC_IO);
810         if (node == NULL)
811                 RETURN(NULL);
812
813         CFS_INIT_LIST_HEAD(&node->li_group);
814         ldlm_interval_attach(node, lock);
815         RETURN(node);
816 }
817
818 void ldlm_interval_free(struct ldlm_interval *node)
819 {
820         if (node) {
821                 LASSERT(cfs_list_empty(&node->li_group));
822                 LASSERT(!interval_is_intree(&node->li_node));
823                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
824         }
825 }
826
827 /* interval tree, for LDLM_EXTENT. */
828 void ldlm_interval_attach(struct ldlm_interval *n,
829                           struct ldlm_lock *l)
830 {
831         LASSERT(l->l_tree_node == NULL);
832         LASSERT(l->l_resource->lr_type == LDLM_EXTENT);
833
834         cfs_list_add_tail(&l->l_sl_policy, &n->li_group);
835         l->l_tree_node = n;
836 }
837
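/* Detach a lock from its interval node.  Returns the node if this was the
 * last lock attached to it (so the caller can erase and free it), or NULL if
 * other locks still share the node. */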
838 struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l)
839 {
840         struct ldlm_interval *n = l->l_tree_node;
841
842         if (n == NULL)
843                 return NULL;
844
845         LASSERT(!cfs_list_empty(&n->li_group));
846         l->l_tree_node = NULL;
847         cfs_list_del_init(&l->l_sl_policy);
848
849         return (cfs_list_empty(&n->li_group) ? n : NULL);
850 }
851
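/* Map a single lock mode bit to its index in lr_itree[], i.e. log2 of the
 * mode: for example, a mode of 0x4 maps to index 2. */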
852 static inline int lock_mode_to_index(ldlm_mode_t mode)
853 {
854         int index;
855
856         LASSERT(mode != 0);
857         LASSERT(IS_PO2(mode));
858         for (index = -1; mode; index++, mode >>= 1) ;
859         LASSERT(index < LCK_MODE_NUM);
860         return index;
861 }
862
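/* Add a just-granted extent lock to the interval tree matching its mode,
 * reusing an existing interval node if one already covers the same extent,
 * and also to the resource's granted list. */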
863 void ldlm_extent_add_lock(struct ldlm_resource *res,
864                           struct ldlm_lock *lock)
865 {
866         struct interval_node *found, **root;
867         struct ldlm_interval *node;
868         struct ldlm_extent *extent;
869         int idx;
870
871         LASSERT(lock->l_granted_mode == lock->l_req_mode);
872
873         node = lock->l_tree_node;
874         LASSERT(node != NULL);
875         LASSERT(!interval_is_intree(&node->li_node));
876
877         idx = lock_mode_to_index(lock->l_granted_mode);
878         LASSERT(lock->l_granted_mode == 1 << idx);
879         LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);
880
881         /* initialize the node's extent */
882         extent = &lock->l_policy_data.l_extent;
883         interval_set(&node->li_node, extent->start, extent->end);
884
885         root = &res->lr_itree[idx].lit_root;
886         found = interval_insert(&node->li_node, root);
887         if (found) { /* The policy group found. */
888                 struct ldlm_interval *tmp = ldlm_interval_detach(lock);
889                 LASSERT(tmp != NULL);
890                 ldlm_interval_free(tmp);
891                 ldlm_interval_attach(to_ldlm_interval(found), lock);
892         }
893         res->lr_itree[idx].lit_size++;
894
895         /* even though we use an interval tree to manage extent locks, we also
896          * add the lock to the granted list, for debugging purposes */
897         ldlm_resource_add_lock(res, &res->lr_granted, lock);
898 }
899
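/* Remove an extent lock from its mode's interval tree, erasing and freeing
 * the interval node if this lock was the last one attached to it. */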
900 void ldlm_extent_unlink_lock(struct ldlm_lock *lock)
901 {
902         struct ldlm_resource *res = lock->l_resource;
903         struct ldlm_interval *node = lock->l_tree_node;
904         struct ldlm_interval_tree *tree;
905         int idx;
906
907         if (!node || !interval_is_intree(&node->li_node)) /* duplicate unlink */
908                 return;
909
910         idx = lock_mode_to_index(lock->l_granted_mode);
911         LASSERT(lock->l_granted_mode == 1 << idx);
912         tree = &res->lr_itree[idx];
913
914         LASSERT(tree->lit_root != NULL); /* ensure the tree is not empty */
915
916         tree->lit_size--;
917         node = ldlm_interval_detach(lock);
918         if (node) {
919                 interval_erase(&node->li_node, &tree->lit_root);
920                 ldlm_interval_free(node);
921         }
922 }
923
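/* Translate the extent lock policy between its on-wire and in-memory
 * representations; for extent locks this is a field-by-field copy. */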
924 void ldlm_extent_policy_wire_to_local(const ldlm_wire_policy_data_t *wpolicy,
925                                      ldlm_policy_data_t *lpolicy)
926 {
927         memset(lpolicy, 0, sizeof(*lpolicy));
928         lpolicy->l_extent.start = wpolicy->l_extent.start;
929         lpolicy->l_extent.end = wpolicy->l_extent.end;
930         lpolicy->l_extent.gid = wpolicy->l_extent.gid;
931 }
932
933 void ldlm_extent_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
934                                      ldlm_wire_policy_data_t *wpolicy)
935 {
936         memset(wpolicy, 0, sizeof(*wpolicy));
937         wpolicy->l_extent.start = lpolicy->l_extent.start;
938         wpolicy->l_extent.end = lpolicy->l_extent.end;
939         wpolicy->l_extent.gid = lpolicy->l_extent.gid;
940 }
941