Whamcloud - gitweb
LU-1347 ldlm: makes EXPORT_SYMBOL follows function body
[fs/lustre-release.git] / lustre / ldlm / ldlm_extent.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ldlm/ldlm_extent.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_LDLM
43 #ifndef __KERNEL__
44 # include <liblustre.h>
45 #else
46 # include <libcfs/libcfs.h>
47 #endif
48
49 #include <lustre_dlm.h>
50 #include <obd_support.h>
51 #include <obd.h>
52 #include <obd_class.h>
53 #include <lustre_lib.h>
54
55 #include "ldlm_internal.h"
56
57 #ifdef HAVE_SERVER_SUPPORT
58 # define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)
59
60 /* fixup the ldlm_extent after expanding */
61 static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
62                                               struct ldlm_extent *new_ex,
63                                               int conflicting)
64 {
65         ldlm_mode_t req_mode = req->l_req_mode;
66         __u64 req_start = req->l_req_extent.start;
67         __u64 req_end = req->l_req_extent.end;
68         __u64 req_align, mask;
69
70         if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
71                 if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
72                         new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
73                                           new_ex->end);
74         }
75
76         if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF) {
77                 EXIT;
78                 return;
79         }
80
81         /* we need to ensure that the lock extent is properly aligned to what
82          * the client requested. Also we need to make sure it's also server
83          * page size aligned otherwise a server page can be covered by two
84          * write locks. */
85         mask = CFS_PAGE_SIZE;
86         req_align = (req_end + 1) | req_start;
87         if (req_align != 0 && (req_align & (mask - 1)) == 0) {
88                 while ((req_align & mask) == 0)
89                         mask <<= 1;
90         }
91         mask -= 1;
92         /* We can only shrink the lock, not grow it.
93          * This should never cause lock to be smaller than requested,
94          * since requested lock was already aligned on these boundaries. */
95         new_ex->start = ((new_ex->start - 1) | mask) + 1;
96         new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
97         LASSERTF(new_ex->start <= req_start,
98                  "mask "LPX64" grant start "LPU64" req start "LPU64"\n",
99                  mask, new_ex->start, req_start);
100         LASSERTF(new_ex->end >= req_end,
101                  "mask "LPX64" grant end "LPU64" req end "LPU64"\n",
102                  mask, new_ex->end, req_end);
103 }
104
105 /* The purpose of this function is to return:
106  * - the maximum extent
107  * - containing the requested extent
108  * - and not overlapping existing conflicting extents outside the requested one
109  *
110  * Use interval tree to expand the lock extent for granted lock.
111  */
112 static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req,
113                                                 struct ldlm_extent *new_ex)
114 {
115         struct ldlm_resource *res = req->l_resource;
116         ldlm_mode_t req_mode = req->l_req_mode;
117         __u64 req_start = req->l_req_extent.start;
118         __u64 req_end = req->l_req_extent.end;
119         struct ldlm_interval_tree *tree;
120         struct interval_node_extent limiter = { new_ex->start, new_ex->end };
121         int conflicting = 0;
122         int idx;
123         ENTRY;
124
125         lockmode_verify(req_mode);
126
127         /* using interval tree to handle the ldlm extent granted locks */
128         for (idx = 0; idx < LCK_MODE_NUM; idx++) {
129                 struct interval_node_extent ext = { req_start, req_end };
130
131                 tree = &res->lr_itree[idx];
132                 if (lockmode_compat(tree->lit_mode, req_mode))
133                         continue;
134
135                 conflicting += tree->lit_size;
136                 if (conflicting > 4)
137                         limiter.start = req_start;
138
139                 if (interval_is_overlapped(tree->lit_root, &ext))
140                         CDEBUG(D_INFO, 
141                                "req_mode = %d, tree->lit_mode = %d, "
142                                "tree->lit_size = %d\n",
143                                req_mode, tree->lit_mode, tree->lit_size);
144                 interval_expand(tree->lit_root, &ext, &limiter);
145                 limiter.start = max(limiter.start, ext.start);
146                 limiter.end = min(limiter.end, ext.end);
147                 if (limiter.start == req_start && limiter.end == req_end)
148                         break;
149         }
150
151         new_ex->start = limiter.start;
152         new_ex->end = limiter.end;
153         LASSERT(new_ex->start <= req_start);
154         LASSERT(new_ex->end >= req_end);
155
156         ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
157         EXIT;
158 }
159
160 /* The purpose of this function is to return:
161  * - the maximum extent
162  * - containing the requested extent
163  * - and not overlapping existing conflicting extents outside the requested one
164  */
165 static void
166 ldlm_extent_internal_policy_waiting(struct ldlm_lock *req,
167                                     struct ldlm_extent *new_ex)
168 {
169         cfs_list_t *tmp;
170         struct ldlm_resource *res = req->l_resource;
171         ldlm_mode_t req_mode = req->l_req_mode;
172         __u64 req_start = req->l_req_extent.start;
173         __u64 req_end = req->l_req_extent.end;
174         int conflicting = 0;
175         ENTRY;
176
177         lockmode_verify(req_mode);
178
179         /* for waiting locks */
180         cfs_list_for_each(tmp, &res->lr_waiting) {
181                 struct ldlm_lock *lock;
182                 struct ldlm_extent *l_extent;
183
184                 lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);
185                 l_extent = &lock->l_policy_data.l_extent;
186
187                 /* We already hit the minimum requested size, search no more */
188                 if (new_ex->start == req_start && new_ex->end == req_end) {
189                         EXIT;
190                         return;
191                 }
192
193                 /* Don't conflict with ourselves */
194                 if (req == lock)
195                         continue;
196
197                 /* Locks are compatible, overlap doesn't matter */
198                 /* Until bug 20 is fixed, try to avoid granting overlapping
199                  * locks on one client (they take a long time to cancel) */
200                 if (lockmode_compat(lock->l_req_mode, req_mode) &&
201                     lock->l_export != req->l_export)
202                         continue;
203
204                 /* If this is a high-traffic lock, don't grow downwards at all
205                  * or grow upwards too much */
206                 ++conflicting;
207                 if (conflicting > 4)
208                         new_ex->start = req_start;
209
210                 /* If lock doesn't overlap new_ex, skip it. */
211                 if (!ldlm_extent_overlap(l_extent, new_ex))
212                         continue;
213
214                 /* Locks conflicting in requested extents and we can't satisfy
215                  * both locks, so ignore it.  Either we will ping-pong this
216                  * extent (we would regardless of what extent we granted) or
217                  * lock is unused and it shouldn't limit our extent growth. */
218                 if (ldlm_extent_overlap(&lock->l_req_extent,&req->l_req_extent))
219                         continue;
220
221                 /* We grow extents downwards only as far as they don't overlap
222                  * with already-granted locks, on the assumption that clients
223                  * will be writing beyond the initial requested end and would
224                  * then need to enqueue a new lock beyond previous request.
225                  * l_req_extent->end strictly < req_start, checked above. */
226                 if (l_extent->start < req_start && new_ex->start != req_start) {
227                         if (l_extent->end >= req_start)
228                                 new_ex->start = req_start;
229                         else
230                                 new_ex->start = min(l_extent->end+1, req_start);
231                 }
232
233                 /* If we need to cancel this lock anyways because our request
234                  * overlaps the granted lock, we grow up to its requested
235                  * extent start instead of limiting this extent, assuming that
236                  * clients are writing forwards and the lock had over grown
237                  * its extent downwards before we enqueued our request. */
238                 if (l_extent->end > req_end) {
239                         if (l_extent->start <= req_end)
240                                 new_ex->end = max(lock->l_req_extent.start - 1,
241                                                   req_end);
242                         else
243                                 new_ex->end = max(l_extent->start - 1, req_end);
244                 }
245         }
246
247         ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
248         EXIT;
249 }
250
251
252 /* In order to determine the largest possible extent we can grant, we need
253  * to scan all of the queues. */
254 static void ldlm_extent_policy(struct ldlm_resource *res,
255                                struct ldlm_lock *lock, int *flags)
256 {
257         struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };
258
259         if (lock->l_export == NULL)
260                 /*
261                  * this is local lock taken by server (e.g., as a part of
262                  * OST-side locking, or unlink handling). Expansion doesn't
263                  * make a lot of sense for local locks, because they are
264                  * dropped immediately on operation completion and would only
265                  * conflict with other threads.
266                  */
267                 return;
268
269         if (lock->l_policy_data.l_extent.start == 0 &&
270             lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
271                 /* fast-path whole file locks */
272                 return;
273
274         ldlm_extent_internal_policy_granted(lock, &new_ex);
275         ldlm_extent_internal_policy_waiting(lock, &new_ex);
276
277         if (new_ex.start != lock->l_policy_data.l_extent.start ||
278             new_ex.end != lock->l_policy_data.l_extent.end) {
279                 *flags |= LDLM_FL_LOCK_CHANGED;
280                 lock->l_policy_data.l_extent.start = new_ex.start;
281                 lock->l_policy_data.l_extent.end = new_ex.end;
282         }
283 }
284
285 static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
286 {
287         struct ldlm_resource *res = lock->l_resource;
288         cfs_time_t now = cfs_time_current();
289
290         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_SET_CONTENTION))
291                 return 1;
292
293         CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
294         if (contended_locks > ldlm_res_to_ns(res)->ns_contended_locks)
295                 res->lr_contention_time = now;
296         return cfs_time_before(now, cfs_time_add(res->lr_contention_time,
297                 cfs_time_seconds(ldlm_res_to_ns(res)->ns_contention_time)));
298 }
299
/* Arguments passed to ldlm_extent_compat_cb() through its opaque data
 * pointer. */
300 struct ldlm_extent_compat_args {
        /* list handed to ldlm_add_ast_work_item() for each conflicting lock
         * that has a blocking AST */
301         cfs_list_t *work_list;
        /* the lock being enqueued */
302         struct ldlm_lock *lock;
        /* mode of the interval tree being walked; the callback asserts that
         * every lock it visits was granted in this mode */
303         ldlm_mode_t mode;
        /* running count of contended locks, incremented by the callback */
304         int *locks;
        /* if not NULL, set to 0 by the callback */
305         int *compat;
306 };
307
308 static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
309                                                 void *data)
310 {
311         struct ldlm_extent_compat_args *priv = data;
312         struct ldlm_interval *node = to_ldlm_interval(n);
313         struct ldlm_extent *extent;
314         cfs_list_t *work_list = priv->work_list;
315         struct ldlm_lock *lock, *enq = priv->lock;
316         ldlm_mode_t mode = priv->mode;
317         int count = 0;
318         ENTRY;
319
320         LASSERT(!cfs_list_empty(&node->li_group));
321
322         cfs_list_for_each_entry(lock, &node->li_group, l_sl_policy) {
323                 /* interval tree is for granted lock */
324                 LASSERTF(mode == lock->l_granted_mode,
325                          "mode = %s, lock->l_granted_mode = %s\n",
326                          ldlm_lockname[mode],
327                          ldlm_lockname[lock->l_granted_mode]);
328                 count++;
329                 if (lock->l_blocking_ast)
330                         ldlm_add_ast_work_item(lock, enq, work_list);
331         }
332
333         /* don't count conflicting glimpse locks */
334         extent = ldlm_interval_extent(node);
335         if (!(mode == LCK_PR &&
336             extent->start == 0 && extent->end == OBD_OBJECT_EOF))
337                 *priv->locks += count;
338
339         if (priv->compat)
340                 *priv->compat = 0;
341
342         RETURN(INTERVAL_ITER_CONT);
343 }
344
345 /* Determine if the lock is compatible with all locks on the queue.
346  * We stop walking the queue if we hit ourselves so we don't take
347  * conflicting locks enqueued after us into account, or we'd wait forever.
     *
     * queue:           list to check; when it is the resource's lr_granted
     *                  list the per-mode interval trees are searched instead
     *                  of walking the list
     * req:             the lock being enqueued or reprocessed
     * flags:           LDLM_FL_BLOCK_NOWAIT and LDLM_FL_DENY_ON_CONTENTION are
     *                  read; LDLM_FL_NO_TIMEOUT may be set
     * err:             receives the error code when a negative value is
     *                  returned
     * work_list:       if NULL the function returns 0 on the first conflict;
     *                  otherwise conflicting locks with a blocking AST are
     *                  added to it
     * contended_locks: counter of contended locks, updated in place
348  *
349  * 0 if the lock is not compatible
350  * 1 if the lock is compatible
351  * 2 if this group lock is compatible and requires no further checking
352  * negative error, such as EWOULDBLOCK for group locks
     *   (-EWOULDBLOCK when LDLM_FL_BLOCK_NOWAIT is set, -EUSERS when the
     *   request is denied on contention); on a negative return req has been
     *   unlinked from its list and destroyed, and *err holds the same code
353  */
354 static int
355 ldlm_extent_compat_queue(cfs_list_t *queue, struct ldlm_lock *req,
356                          int *flags, ldlm_error_t *err,
357                          cfs_list_t *work_list, int *contended_locks)
358 {
359         cfs_list_t *tmp;
360         struct ldlm_lock *lock;
361         struct ldlm_resource *res = req->l_resource;
362         ldlm_mode_t req_mode = req->l_req_mode;
363         __u64 req_start = req->l_req_extent.start;
364         __u64 req_end = req->l_req_extent.end;
365         int compat = 1;
366         int scan = 0;
367         int check_contention;
368         ENTRY;
369
370         lockmode_verify(req_mode);
371
372         /* Using interval tree for granted lock */
373         if (queue == &res->lr_granted) {
374                 struct ldlm_interval_tree *tree;
375                 struct ldlm_extent_compat_args data = {.work_list = work_list,
376                                                .lock = req,
377                                                .locks = contended_locks,
378                                                .compat = &compat };
379                 struct interval_node_extent ex = { .start = req_start,
380                                                    .end = req_end };
381                 int idx, rc;
382
383                 for (idx = 0; idx < LCK_MODE_NUM; idx++) {
384                         tree = &res->lr_itree[idx];
385                         if (tree->lit_root == NULL) /* empty tree, skipped */
386                                 continue;
387
388                         data.mode = tree->lit_mode;
389                         if (lockmode_compat(req_mode, tree->lit_mode)) {
390                                 struct ldlm_interval *node;
391                                 struct ldlm_extent *extent;
392
393                                 if (req_mode != LCK_GROUP)
394                                         continue;
395
396                                 /* group lock, grant it immediately if
397                                  * compatible */
398                                 node = to_ldlm_interval(tree->lit_root);
399                                 extent = ldlm_interval_extent(node);
400                                 if (req->l_policy_data.l_extent.gid ==
401                                     extent->gid)
402                                         RETURN(2);
                                    /* gid differs: fall through and treat
                                     * this tree as conflicting */
403                         }
404
405                         if (tree->lit_mode == LCK_GROUP) {
406                                 if (*flags & LDLM_FL_BLOCK_NOWAIT) {
407                                         compat = -EWOULDBLOCK;
408                                         goto destroylock;
409                                 }
410
411                                 *flags |= LDLM_FL_NO_TIMEOUT;
412                                 if (!work_list)
413                                         RETURN(0);
414
415                                 /* if work list is not NULL, add all
416                                    locks in the tree to work list */
417                                 compat = 0;
418                                 interval_iterate(tree->lit_root,
419                                                  ldlm_extent_compat_cb, &data);
420                                 continue;
421                         }
422
423                         if (!work_list) {
424                                 rc = interval_is_overlapped(tree->lit_root,&ex);
425                                 if (rc)
426                                         RETURN(0);
427                         } else {
428                                 interval_search(tree->lit_root, &ex,
429                                                 ldlm_extent_compat_cb, &data);
430                                 if (!cfs_list_empty(work_list) && compat)
431                                         compat = 0;
432                         }
433                 }
434         } else { /* for waiting queue */
435                 cfs_list_for_each(tmp, queue) {
436                         check_contention = 1;
437
438                         lock = cfs_list_entry(tmp, struct ldlm_lock,
439                                               l_res_link);
440
441                         if (req == lock)
442                                 break;
443
444                         if (unlikely(scan)) {
445                                 /* We only get here if we are queuing GROUP lock
446                                    and met some incompatible one. The main idea of this
447                                    code is to insert GROUP lock past compatible GROUP
448                                    lock in the waiting queue or if there is not any,
449                                    then in front of first non-GROUP lock */
450                                 if (lock->l_req_mode != LCK_GROUP) {
451                                         /* Ok, we hit non-GROUP lock, there should
452                                          * be no more GROUP locks later on, queue in
453                                          * front of first non-GROUP lock */
454
                                            /* NOTE(review): presumably these
                                             * three calls together place req
                                             * directly in front of lock --
                                             * confirm against
                                             * ldlm_resource_insert_lock_after() */
455                                         ldlm_resource_insert_lock_after(lock, req);
456                                         cfs_list_del_init(&lock->l_res_link);
457                                         ldlm_resource_insert_lock_after(req, lock);
458                                         compat = 0;
459                                         break;
460                                 }
461                                 if (req->l_policy_data.l_extent.gid ==
462                                     lock->l_policy_data.l_extent.gid) {
463                                         /* found it */
464                                         ldlm_resource_insert_lock_after(lock, req);
465                                         compat = 0;
466                                         break;
467                                 }
468                                 continue;
469                         }
470
471                         /* locks are compatible, overlap doesn't matter */
472                         if (lockmode_compat(lock->l_req_mode, req_mode)) {
473                                 if (req_mode == LCK_PR &&
474                                     ((lock->l_policy_data.l_extent.start <=
475                                       req->l_policy_data.l_extent.start) &&
476                                      (lock->l_policy_data.l_extent.end >=
477                                       req->l_policy_data.l_extent.end))) {
478                                         /* If we met a PR lock just like us or wider,
479                                            and nobody down the list conflicted with
480                                            it, that means we can skip processing of
481                                            the rest of the list and safely place
482                                            ourselves at the end of the list, or grant
483                                            (depending on whether we met any conflicting
484                                            locks before in the list).
485                                            In case of 1st enqueue only we continue
486                                            traversing if there is something conflicting
487                                            down the list because we need to make sure
488                                            that something is marked as AST_SENT as well,
489                                            in case of empty worklist we would exit on
490                                            first conflict met. */
491                                         /* There IS a case where such flag is
492                                            not set for a lock, yet it blocks
493                                            something. Luckily for us this is
494                                            only during destroy, so lock is
495                                            exclusive. So here we are safe */
496                                         if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
497                                                 RETURN(compat);
498                                         }
499                                 }
500
501                                 /* non-group locks are compatible, overlap doesn't
502                                    matter */
503                                 if (likely(req_mode != LCK_GROUP))
504                                         continue;
505
506                                 /* If we are trying to get a GROUP lock and there is
507                                    another one of this kind, we need to compare gid */
508                                 if (req->l_policy_data.l_extent.gid ==
509                                     lock->l_policy_data.l_extent.gid) {
510                                         /* If existing lock with matched gid is granted,
511                                            we grant new one too. */
512                                         if (lock->l_req_mode == lock->l_granted_mode)
513                                                 RETURN(2);
514
515                                         /* Otherwise we are scanning queue of waiting
516                                          * locks and it means current request would
517                                          * block along with existing lock (that is
518                                          * already blocked).
519                                          * If we are in nonblocking mode - return
520                                          * immediately */
521                                         if (*flags & LDLM_FL_BLOCK_NOWAIT) {
522                                                 compat = -EWOULDBLOCK;
523                                                 goto destroylock;
524                                         }
525                                         /* If this group lock is compatible with another
526                                          * group lock on the waiting list, they must be
527                                          * together in the list, so they can be granted
528                                          * at the same time.  Otherwise the later lock
529                                          * can get stuck behind another, incompatible,
530                                          * lock. */
531                                         ldlm_resource_insert_lock_after(lock, req);
532                                         /* Because 'lock' is not granted, we can stop
533                                          * processing this queue and return immediately.
534                                          * There is no need to check the rest of the
535                                          * list. */
536                                         RETURN(0);
537                                 }
538                         }
539
540                         if (unlikely(req_mode == LCK_GROUP &&
541                                      (lock->l_req_mode != lock->l_granted_mode))) {
                                    /* from now on only look for the place to
                                     * queue req (handled by the scan branch
                                     * at the top of the loop) */
542                                 scan = 1;
543                                 compat = 0;
544                                 if (lock->l_req_mode != LCK_GROUP) {
545                                         /* Ok, we hit non-GROUP lock, there should be no
546                                            more GROUP locks later on, queue in front of
547                                            first non-GROUP lock */
548
549                                         ldlm_resource_insert_lock_after(lock, req);
550                                         cfs_list_del_init(&lock->l_res_link);
551                                         ldlm_resource_insert_lock_after(req, lock);
552                                         break;
553                                 }
554                                 if (req->l_policy_data.l_extent.gid ==
555                                     lock->l_policy_data.l_extent.gid) {
556                                         /* found it */
557                                         ldlm_resource_insert_lock_after(lock, req);
558                                         break;
559                                 }
560                                 continue;
561                         }
562
563                         if (unlikely(lock->l_req_mode == LCK_GROUP)) {
564                                 /* If compared lock is GROUP, then requested is PR/PW/
565                                  * so this is not compatible; extent range does not
566                                  * matter */
567                                 if (*flags & LDLM_FL_BLOCK_NOWAIT) {
568                                         compat = -EWOULDBLOCK;
569                                         goto destroylock;
570                                 } else {
571                                         *flags |= LDLM_FL_NO_TIMEOUT;
572                                 }
573                         } else if (lock->l_policy_data.l_extent.end < req_start ||
574                                    lock->l_policy_data.l_extent.start > req_end) {
575                                 /* if a non group lock doesn't overlap skip it */
576                                 continue;
577                         } else if (lock->l_req_extent.end < req_start ||
578                                    lock->l_req_extent.start > req_end) {
579                                 /* false contention, the requests doesn't really overlap */
580                                 check_contention = 0;
581                         }
582
                            /* a real conflict: without a work list the caller
                             * only wants to know that one exists */
583                         if (!work_list)
584                                 RETURN(0);
585
586                         /* don't count conflicting glimpse locks */
587                         if (lock->l_req_mode == LCK_PR &&
588                             lock->l_policy_data.l_extent.start == 0 &&
589                             lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
590                                 check_contention = 0;
591
592                         *contended_locks += check_contention;
593
594                         compat = 0;
595                         if (lock->l_blocking_ast)
596                                 ldlm_add_ast_work_item(lock, req, work_list);
597                 }
598         }
599
            /* Deny the request outright (-EUSERS) when the resource is
             * contended, the request conflicts, the caller asked for
             * deny-on-contention, it is not a group lock, and the requested
             * range is no larger than the namespace's ns_max_nolock_size. */
600         if (ldlm_check_contention(req, *contended_locks) &&
601             compat == 0 &&
602             (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
603             req->l_req_mode != LCK_GROUP &&
604             req_end - req_start <=
605             ldlm_res_to_ns(req->l_resource)->ns_max_nolock_size)
606                 GOTO(destroylock, compat = -EUSERS);
607
608         RETURN(compat);
609 destroylock:
            /* error path: take req off its list, destroy it and report the
             * error through both *err and the return value */
610         cfs_list_del_init(&req->l_res_link);
611         ldlm_lock_destroy_nolock(req);
612         *err = compat;
613         RETURN(compat);
614 }
615
616 static void discard_bl_list(cfs_list_t *bl_list)
617 {
618         cfs_list_t *tmp, *pos;
619         ENTRY;
620
621         cfs_list_for_each_safe(pos, tmp, bl_list) {
622                 struct ldlm_lock *lock =
623                         cfs_list_entry(pos, struct ldlm_lock, l_bl_ast);
624
625                 cfs_list_del_init(&lock->l_bl_ast);
626                 LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
627                 lock->l_flags &= ~LDLM_FL_AST_SENT;
628                 LASSERT(lock->l_bl_ast_run == 0);
629                 LASSERT(lock->l_blocking_lock);
630                 LDLM_LOCK_RELEASE(lock->l_blocking_lock);
631                 lock->l_blocking_lock = NULL;
632                 LDLM_LOCK_RELEASE(lock);
633         }
634         EXIT;
635 }
636
637 /* If first_enq is 0 (ie, called from ldlm_reprocess_queue):
638   *   - blocking ASTs have already been sent
639   *   - must call this function with the ns lock held
640   *
641   * If first_enq is 1 (ie, called from ldlm_lock_enqueue):
642   *   - blocking ASTs have not been sent
643   *   - must call this function with the ns lock held once */
644 int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
645                              ldlm_error_t *err, cfs_list_t *work_list)
646 {
647         struct ldlm_resource *res = lock->l_resource;
648         CFS_LIST_HEAD(rpc_list);
649         int rc, rc2;
650         int contended_locks = 0;
651         ENTRY;
652
653         LASSERT(cfs_list_empty(&res->lr_converting));
654         LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
655                 !(lock->l_flags & LDLM_AST_DISCARD_DATA));
656         check_res_locked(res);
657         *err = ELDLM_OK;
658
659         if (!first_enq) {
660                 /* Careful observers will note that we don't handle -EWOULDBLOCK
661                  * here, but it's ok for a non-obvious reason -- compat_queue
662                  * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
663                  * flags should always be zero here, and if that ever stops
664                  * being true, we want to find out. */
665                 LASSERT(*flags == 0);
666                 rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
667                                               err, NULL, &contended_locks);
668                 if (rc == 1) {
669                         rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
670                                                       flags, err, NULL,
671                                                       &contended_locks);
672                 }
673                 if (rc == 0)
674                         RETURN(LDLM_ITER_STOP);
675
676                 ldlm_resource_unlink_lock(lock);
677
678                 if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
679                         ldlm_extent_policy(res, lock, flags);
680                 ldlm_grant_lock(lock, work_list);
681                 RETURN(LDLM_ITER_CONTINUE);
682         }
683
684  restart:
685         contended_locks = 0;
686         rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
687                                       &rpc_list, &contended_locks);
688         if (rc < 0)
689                 GOTO(out, rc); /* lock was destroyed */
690         if (rc == 2)
691                 goto grant;
692
693         rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
694                                        &rpc_list, &contended_locks);
695         if (rc2 < 0)
696                 GOTO(out, rc = rc2); /* lock was destroyed */
697
698         if (rc + rc2 == 2) {
699         grant:
700                 ldlm_extent_policy(res, lock, flags);
701                 ldlm_resource_unlink_lock(lock);
702                 ldlm_grant_lock(lock, NULL);
703         } else {
704                 /* If either of the compat_queue()s returned failure, then we
705                  * have ASTs to send and must go onto the waiting list.
706                  *
707                  * bug 2322: we used to unlink and re-add here, which was a
708                  * terrible folly -- if we goto restart, we could get
709                  * re-ordered!  Causes deadlock, because ASTs aren't sent! */
710                 if (cfs_list_empty(&lock->l_res_link))
711                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
712                 unlock_res(res);
713                 rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
714                                        LDLM_WORK_BL_AST);
715
716                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) &&
717                     !ns_is_client(ldlm_res_to_ns(res)))
718                         class_fail_export(lock->l_export);
719
720                 lock_res(res);
721                 if (rc == -ERESTART) {
722
723                         /* 15715: The lock was granted and destroyed after
724                          * resource lock was dropped. Interval node was freed
725                          * in ldlm_lock_destroy. Anyway, this always happens
726                          * when a client is being evicted. So it would be
727                          * ok to return an error. -jay */
728                         if (lock->l_destroyed) {
729                                 *err = -EAGAIN;
730                                 GOTO(out, rc = -EAGAIN);
731                         }
732
733                         /* lock was granted while resource was unlocked. */
734                         if (lock->l_granted_mode == lock->l_req_mode) {
735                                 /* bug 11300: if the lock has been granted,
736                                  * break earlier because otherwise, we will go
737                                  * to restart and ldlm_resource_unlink will be
738                                  * called and it causes the interval node to be
739                                  * freed. Then we will fail at
740                                  * ldlm_extent_add_lock() */
741                                 *flags &= ~(LDLM_FL_BLOCK_GRANTED | LDLM_FL_BLOCK_CONV |
742                                             LDLM_FL_BLOCK_WAIT);
743                                 GOTO(out, rc = 0);
744                         }
745
746                         GOTO(restart, -ERESTART);
747                 }
748
749                 *flags |= LDLM_FL_BLOCK_GRANTED;
750                 /* this way we force client to wait for the lock
751                  * endlessly once the lock is enqueued -bzzz */
752                 *flags |= LDLM_FL_NO_TIMEOUT;
753
754         }
755         RETURN(0);
756 out:
757         if (!cfs_list_empty(&rpc_list)) {
758                 LASSERT(!(lock->l_flags & LDLM_AST_DISCARD_DATA));
759                 discard_bl_list(&rpc_list);
760         }
761         RETURN(rc);
762 }
763 #endif /* HAVE_SERVER_SUPPORT */
764
765 /* When a lock is cancelled by a client, the KMS may undergo change if this
766  * is the "highest lock".  This function returns the new KMS value.
767  * Caller must hold lr_lock already.
768  *
769  * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
770 __u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
771 {
772         struct ldlm_resource *res = lock->l_resource;
773         cfs_list_t *tmp;
774         struct ldlm_lock *lck;
775         __u64 kms = 0;
776         ENTRY;
777
778         /* don't let another thread in ldlm_extent_shift_kms race in
779          * just after we finish and take our lock into account in its
780          * calculation of the kms */
781         lock->l_flags |= LDLM_FL_KMS_IGNORE;
782
783         cfs_list_for_each(tmp, &res->lr_granted) {
784                 lck = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);
785
786                 if (lck->l_flags & LDLM_FL_KMS_IGNORE)
787                         continue;
788
789                 if (lck->l_policy_data.l_extent.end >= old_kms)
790                         RETURN(old_kms);
791
792                 /* This extent _has_ to be smaller than old_kms (checked above)
793                  * so kms can only ever be smaller or the same as old_kms. */
794                 if (lck->l_policy_data.l_extent.end + 1 > kms)
795                         kms = lck->l_policy_data.l_extent.end + 1;
796         }
797         LASSERTF(kms <= old_kms, "kms "LPU64" old_kms "LPU64"\n", kms, old_kms);
798
799         RETURN(kms);
800 }
801 EXPORT_SYMBOL(ldlm_extent_shift_kms);
802
803 cfs_mem_cache_t *ldlm_interval_slab;
804 struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
805 {
806         struct ldlm_interval *node;
807         ENTRY;
808
809         LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
810         OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, CFS_ALLOC_IO);
811         if (node == NULL)
812                 RETURN(NULL);
813
814         CFS_INIT_LIST_HEAD(&node->li_group);
815         ldlm_interval_attach(node, lock);
816         RETURN(node);
817 }
818
819 void ldlm_interval_free(struct ldlm_interval *node)
820 {
821         if (node) {
822                 LASSERT(cfs_list_empty(&node->li_group));
823                 LASSERT(!interval_is_intree(&node->li_node));
824                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
825         }
826 }
827
828 /* interval tree, for LDLM_EXTENT. */
829 void ldlm_interval_attach(struct ldlm_interval *n,
830                           struct ldlm_lock *l)
831 {
832         LASSERT(l->l_tree_node == NULL);
833         LASSERT(l->l_resource->lr_type == LDLM_EXTENT);
834
835         cfs_list_add_tail(&l->l_sl_policy, &n->li_group);
836         l->l_tree_node = n;
837 }
838
839 struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l)
840 {
841         struct ldlm_interval *n = l->l_tree_node;
842
843         if (n == NULL)
844                 return NULL;
845
846         LASSERT(!cfs_list_empty(&n->li_group));
847         l->l_tree_node = NULL;
848         cfs_list_del_init(&l->l_sl_policy);
849
850         return (cfs_list_empty(&n->li_group) ? n : NULL);
851 }
852
853 static inline int lock_mode_to_index(ldlm_mode_t mode)
854 {
855         int index;
856
857         LASSERT(mode != 0);
858         LASSERT(IS_PO2(mode));
859         for (index = -1; mode; index++, mode >>= 1) ;
860         LASSERT(index < LCK_MODE_NUM);
861         return index;
862 }
863
864 void ldlm_extent_add_lock(struct ldlm_resource *res,
865                           struct ldlm_lock *lock)
866 {
867         struct interval_node *found, **root;
868         struct ldlm_interval *node;
869         struct ldlm_extent *extent;
870         int idx;
871
872         LASSERT(lock->l_granted_mode == lock->l_req_mode);
873
874         node = lock->l_tree_node;
875         LASSERT(node != NULL);
876         LASSERT(!interval_is_intree(&node->li_node));
877
878         idx = lock_mode_to_index(lock->l_granted_mode);
879         LASSERT(lock->l_granted_mode == 1 << idx);
880         LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);
881
882         /* node extent initialize */
883         extent = &lock->l_policy_data.l_extent;
884         interval_set(&node->li_node, extent->start, extent->end);
885
886         root = &res->lr_itree[idx].lit_root;
887         found = interval_insert(&node->li_node, root);
888         if (found) { /* The policy group found. */
889                 struct ldlm_interval *tmp = ldlm_interval_detach(lock);
890                 LASSERT(tmp != NULL);
891                 ldlm_interval_free(tmp);
892                 ldlm_interval_attach(to_ldlm_interval(found), lock);
893         }
894         res->lr_itree[idx].lit_size++;
895
896         /* even though we use interval tree to manage the extent lock, we also
897          * add the locks into grant list, for debug purpose, .. */
898         ldlm_resource_add_lock(res, &res->lr_granted, lock);
899 }
900
901 void ldlm_extent_unlink_lock(struct ldlm_lock *lock)
902 {
903         struct ldlm_resource *res = lock->l_resource;
904         struct ldlm_interval *node = lock->l_tree_node;
905         struct ldlm_interval_tree *tree;
906         int idx;
907
908         if (!node || !interval_is_intree(&node->li_node)) /* duplicate unlink */
909                 return;
910
911         idx = lock_mode_to_index(lock->l_granted_mode);
912         LASSERT(lock->l_granted_mode == 1 << idx);
913         tree = &res->lr_itree[idx];
914
915         LASSERT(tree->lit_root != NULL); /* assure the tree is not null */
916
917         tree->lit_size--;
918         node = ldlm_interval_detach(lock);
919         if (node) {
920                 interval_erase(&node->li_node, &tree->lit_root);
921                 ldlm_interval_free(node);
922         }
923 }
924
925 void ldlm_extent_policy_wire_to_local(const ldlm_wire_policy_data_t *wpolicy,
926                                      ldlm_policy_data_t *lpolicy)
927 {
928         memset(lpolicy, 0, sizeof(*lpolicy));
929         lpolicy->l_extent.start = wpolicy->l_extent.start;
930         lpolicy->l_extent.end = wpolicy->l_extent.end;
931         lpolicy->l_extent.gid = wpolicy->l_extent.gid;
932 }
933
934 void ldlm_extent_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
935                                      ldlm_wire_policy_data_t *wpolicy)
936 {
937         memset(wpolicy, 0, sizeof(*wpolicy));
938         wpolicy->l_extent.start = lpolicy->l_extent.start;
939         wpolicy->l_extent.end = lpolicy->l_extent.end;
940         wpolicy->l_extent.gid = lpolicy->l_extent.gid;
941 }
942