/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Whamcloud, Inc.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_extent.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_LDLM
#ifndef __KERNEL__
# include <liblustre.h>
#else
# include <libcfs/libcfs.h>
#endif

#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd.h>
#include <obd_class.h>
#include <lustre_lib.h>

#include "ldlm_internal.h"

#ifdef HAVE_SERVER_SUPPORT
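/* Upper bound on how far ldlm_extent_internal_policy_fixup() will grow a
 * PW/CW lock extent past the requested start when the resource is heavily
 * contended (more than 32 conflicting locks). */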
# define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)

/* fixup the ldlm_extent after expanding */
static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
                                              struct ldlm_extent *new_ex,
                                              int conflicting)
{
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        __u64 req_align, mask;

        if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
                if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
                        new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
                                          new_ex->end);
        }

        if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF) {
                EXIT;
                return;
        }

        /* We need to ensure that the lock extent is properly aligned to what
         * the client requested. We also need to make sure it is server page
         * size aligned, otherwise a server page can be covered by two write
         * locks. */
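        /* Illustrative example (hypothetical values): with a 4KB server page
         * size, a request of [0, 0x3fffff] gives req_align = 0x400000, so the
         * mask below grows to 0x400000 and the expanded extent is trimmed to
         * 4MB boundaries; an unaligned request keeps the plain page-size
         * mask, so only page alignment is enforced. */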
        mask = CFS_PAGE_SIZE;
        req_align = (req_end + 1) | req_start;
        if (req_align != 0 && (req_align & (mask - 1)) == 0) {
                while ((req_align & mask) == 0)
                        mask <<= 1;
        }
        mask -= 1;
        /* We can only shrink the lock, not grow it.
         * This should never cause lock to be smaller than requested,
         * since requested lock was already aligned on these boundaries. */
        new_ex->start = ((new_ex->start - 1) | mask) + 1;
        new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
        LASSERTF(new_ex->start <= req_start,
                 "mask "LPX64" grant start "LPU64" req start "LPU64"\n",
                 mask, new_ex->start, req_start);
        LASSERTF(new_ex->end >= req_end,
                 "mask "LPX64" grant end "LPU64" req end "LPU64"\n",
                 mask, new_ex->end, req_end);
}

/* The purpose of this function is to return:
 * - the maximum extent
 * - containing the requested extent
 * - and not overlapping existing conflicting extents outside the requested one
 *
 * Use interval tree to expand the lock extent for granted lock.
 */
static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req,
                                                struct ldlm_extent *new_ex)
{
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        struct ldlm_interval_tree *tree;
        struct interval_node_extent limiter = { new_ex->start, new_ex->end };
        int conflicting = 0;
        int idx;
        ENTRY;

        lockmode_verify(req_mode);

        /* using interval tree to handle the ldlm extent granted locks */
        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                struct interval_node_extent ext = { req_start, req_end };

                tree = &res->lr_itree[idx];
                if (lockmode_compat(tree->lit_mode, req_mode))
                        continue;

                conflicting += tree->lit_size;
                if (conflicting > 4)
                        limiter.start = req_start;

                if (interval_is_overlapped(tree->lit_root, &ext))
                        CDEBUG(D_INFO,
                               "req_mode = %d, tree->lit_mode = %d, "
                               "tree->lit_size = %d\n",
                               req_mode, tree->lit_mode, tree->lit_size);
                interval_expand(tree->lit_root, &ext, &limiter);
                limiter.start = max(limiter.start, ext.start);
                limiter.end = min(limiter.end, ext.end);
                if (limiter.start == req_start && limiter.end == req_end)
                        break;
        }

        new_ex->start = limiter.start;
        new_ex->end = limiter.end;
        LASSERT(new_ex->start <= req_start);
        LASSERT(new_ex->end >= req_end);

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
        EXIT;
}

/* The purpose of this function is to return:
 * - the maximum extent
 * - containing the requested extent
 * - and not overlapping existing conflicting extents outside the requested one
 */
static void
ldlm_extent_internal_policy_waiting(struct ldlm_lock *req,
                                    struct ldlm_extent *new_ex)
{
        cfs_list_t *tmp;
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int conflicting = 0;
        ENTRY;

        lockmode_verify(req_mode);

        /* for waiting locks */
        cfs_list_for_each(tmp, &res->lr_waiting) {
                struct ldlm_lock *lock;
                struct ldlm_extent *l_extent;

                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);
                l_extent = &lock->l_policy_data.l_extent;

                /* We already hit the minimum requested size, search no more */
                if (new_ex->start == req_start && new_ex->end == req_end) {
                        EXIT;
                        return;
                }

                /* Don't conflict with ourselves */
                if (req == lock)
                        continue;

                /* Locks are compatible, overlap doesn't matter */
                /* Until bug 20 is fixed, try to avoid granting overlapping
                 * locks on one client (they take a long time to cancel) */
                if (lockmode_compat(lock->l_req_mode, req_mode) &&
                    lock->l_export != req->l_export)
                        continue;

                /* If this is a high-traffic lock, don't grow downwards at all
                 * or grow upwards too much */
                ++conflicting;
                if (conflicting > 4)
                        new_ex->start = req_start;

                /* If lock doesn't overlap new_ex, skip it. */
                if (!ldlm_extent_overlap(l_extent, new_ex))
                        continue;

                /* Locks conflicting in requested extents and we can't satisfy
                 * both locks, so ignore it.  Either we will ping-pong this
                 * extent (we would regardless of what extent we granted) or
                 * lock is unused and it shouldn't limit our extent growth. */
                if (ldlm_extent_overlap(&lock->l_req_extent,&req->l_req_extent))
                        continue;

                /* We grow extents downwards only as far as they don't overlap
                 * with already-granted locks, on the assumption that clients
                 * will be writing beyond the initial requested end and would
                 * then need to enqueue a new lock beyond the previous request.
                 * l_req_extent->end strictly < req_start, checked above. */
                if (l_extent->start < req_start && new_ex->start != req_start) {
                        if (l_extent->end >= req_start)
                                new_ex->start = req_start;
                        else
                                new_ex->start = min(l_extent->end+1, req_start);
                }

                /* If we need to cancel this lock anyway because our request
                 * overlaps the granted lock, we grow up to its requested
                 * extent start instead of limiting this extent, assuming that
                 * clients are writing forwards and the lock had overgrown
                 * its extent downwards before we enqueued our request. */
                if (l_extent->end > req_end) {
                        if (l_extent->start <= req_end)
                                new_ex->end = max(lock->l_req_extent.start - 1,
                                                  req_end);
                        else
                                new_ex->end = max(l_extent->start - 1, req_end);
                }
        }

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
        EXIT;
}


/* In order to determine the largest possible extent we can grant, we need
 * to scan all of the queues. */
static void ldlm_extent_policy(struct ldlm_resource *res,
                               struct ldlm_lock *lock, __u64 *flags)
{
        struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };

        if (lock->l_export == NULL)
                /*
                 * this is local lock taken by server (e.g., as a part of
                 * OST-side locking, or unlink handling). Expansion doesn't
                 * make a lot of sense for local locks, because they are
                 * dropped immediately on operation completion and would only
                 * conflict with other threads.
                 */
                return;

        if (lock->l_policy_data.l_extent.start == 0 &&
            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                /* fast-path whole file locks */
                return;

        ldlm_extent_internal_policy_granted(lock, &new_ex);
        ldlm_extent_internal_policy_waiting(lock, &new_ex);

        if (new_ex.start != lock->l_policy_data.l_extent.start ||
            new_ex.end != lock->l_policy_data.l_extent.end) {
                *flags |= LDLM_FL_LOCK_CHANGED;
                lock->l_policy_data.l_extent.start = new_ex.start;
                lock->l_policy_data.l_extent.end = new_ex.end;
        }
}

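/* Contention heuristic: once more than ns_contended_locks conflicting locks
 * are seen on a resource, lr_contention_time is stamped; the resource is then
 * treated as contended until ns_contention_time seconds have elapsed since
 * the last such stamp. */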
static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
{
        struct ldlm_resource *res = lock->l_resource;
        cfs_time_t now = cfs_time_current();

        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_SET_CONTENTION))
                return 1;

        CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
        if (contended_locks > ldlm_res_to_ns(res)->ns_contended_locks)
                res->lr_contention_time = now;
        return cfs_time_before(now, cfs_time_add(res->lr_contention_time,
                cfs_time_seconds(ldlm_res_to_ns(res)->ns_contention_time)));
}

struct ldlm_extent_compat_args {
        cfs_list_t *work_list;
        struct ldlm_lock *lock;
        ldlm_mode_t mode;
        int *locks;
        int *compat;
};

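/* Interval tree iteration callback: for every granted lock in the policy
 * group of a visited interval node, queue a blocking AST work item against
 * the enqueuing lock and count it towards contention (glimpse-style
 * PR [0, EOF] locks are not counted). */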
static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
                                                void *data)
{
        struct ldlm_extent_compat_args *priv = data;
        struct ldlm_interval *node = to_ldlm_interval(n);
        struct ldlm_extent *extent;
        cfs_list_t *work_list = priv->work_list;
        struct ldlm_lock *lock, *enq = priv->lock;
        ldlm_mode_t mode = priv->mode;
        int count = 0;
        ENTRY;

        LASSERT(!cfs_list_empty(&node->li_group));

        cfs_list_for_each_entry(lock, &node->li_group, l_sl_policy) {
                /* interval tree is for granted lock */
                LASSERTF(mode == lock->l_granted_mode,
                         "mode = %s, lock->l_granted_mode = %s\n",
                         ldlm_lockname[mode],
                         ldlm_lockname[lock->l_granted_mode]);
                count++;
                if (lock->l_blocking_ast)
                        ldlm_add_ast_work_item(lock, enq, work_list);
        }

        /* don't count conflicting glimpse locks */
        extent = ldlm_interval_extent(node);
        if (!(mode == LCK_PR &&
            extent->start == 0 && extent->end == OBD_OBJECT_EOF))
                *priv->locks += count;

        if (priv->compat)
                *priv->compat = 0;

        RETURN(INTERVAL_ITER_CONT);
}

/* Determine if the lock is compatible with all locks on the queue.
 * We stop walking the queue if we hit ourselves so we don't take
 * conflicting locks enqueued after us into account, or we'd wait forever.
 *
 * 0 if the lock is not compatible
 * 1 if the lock is compatible
 * 2 if this group lock is compatible and requires no further checking
 * negative error, such as EWOULDBLOCK for group locks
 */
static int
ldlm_extent_compat_queue(cfs_list_t *queue, struct ldlm_lock *req,
                         __u64 *flags, ldlm_error_t *err,
                         cfs_list_t *work_list, int *contended_locks)
{
        cfs_list_t *tmp;
        struct ldlm_lock *lock;
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int compat = 1;
        int scan = 0;
        int check_contention;
        ENTRY;

        lockmode_verify(req_mode);

        /* Using interval tree for granted lock */
        if (queue == &res->lr_granted) {
                struct ldlm_interval_tree *tree;
                struct ldlm_extent_compat_args data = {.work_list = work_list,
                                               .lock = req,
                                               .locks = contended_locks,
                                               .compat = &compat };
                struct interval_node_extent ex = { .start = req_start,
                                                   .end = req_end };
                int idx, rc;

                for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                        tree = &res->lr_itree[idx];
                        if (tree->lit_root == NULL) /* empty tree, skip it */
                                continue;

                        data.mode = tree->lit_mode;
                        if (lockmode_compat(req_mode, tree->lit_mode)) {
                                struct ldlm_interval *node;
                                struct ldlm_extent *extent;

                                if (req_mode != LCK_GROUP)
                                        continue;

                                /* group lock, grant it immediately if
                                 * compatible */
                                node = to_ldlm_interval(tree->lit_root);
                                extent = ldlm_interval_extent(node);
                                if (req->l_policy_data.l_extent.gid ==
                                    extent->gid)
                                        RETURN(2);
                        }

                        if (tree->lit_mode == LCK_GROUP) {
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                }

                                *flags |= LDLM_FL_NO_TIMEOUT;
                                if (!work_list)
                                        RETURN(0);

                                /* if work list is not NULL, add all
                                   locks in the tree to work list */
                                compat = 0;
                                interval_iterate(tree->lit_root,
                                                 ldlm_extent_compat_cb, &data);
                                continue;
                        }

                        if (!work_list) {
                                rc = interval_is_overlapped(tree->lit_root,&ex);
                                if (rc)
                                        RETURN(0);
                        } else {
                                interval_search(tree->lit_root, &ex,
                                                ldlm_extent_compat_cb, &data);
                                if (!cfs_list_empty(work_list) && compat)
                                        compat = 0;
                        }
                }
        } else { /* for waiting queue */
                cfs_list_for_each(tmp, queue) {
                        check_contention = 1;

                        lock = cfs_list_entry(tmp, struct ldlm_lock,
                                              l_res_link);

                        if (req == lock)
                                break;

                        if (unlikely(scan)) {
                                /* We only get here if we are queuing a GROUP lock
                                   and met some incompatible one. The main idea of
                                   this code is to insert a GROUP lock past a
                                   compatible GROUP lock in the waiting queue or,
                                   if there is not any, in front of the first
                                   non-GROUP lock */
                                if (lock->l_req_mode != LCK_GROUP) {
                                        /* Ok, we hit a non-GROUP lock, there should
                                         * be no more GROUP locks later on, queue in
                                         * front of the first non-GROUP lock */

                                        ldlm_resource_insert_lock_after(lock, req);
                                        cfs_list_del_init(&lock->l_res_link);
                                        ldlm_resource_insert_lock_after(req, lock);
                                        compat = 0;
                                        break;
                                }
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* found it */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        compat = 0;
                                        break;
                                }
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_req_mode, req_mode)) {
                                if (req_mode == LCK_PR &&
                                    ((lock->l_policy_data.l_extent.start <=
                                      req->l_policy_data.l_extent.start) &&
                                     (lock->l_policy_data.l_extent.end >=
                                      req->l_policy_data.l_extent.end))) {
                                        /* If we met a PR lock just like us or wider,
                                           and nobody down the list conflicted with
                                           it, that means we can skip processing of
                                           the rest of the list and safely place
                                           ourselves at the end of the list, or grant
                                           (depending on whether we met conflicting
                                           locks earlier in the list).
                                           In case of a first enqueue only we continue
                                           traversing if there is something conflicting
                                           down the list because we need to make sure
                                           that something is marked as AST_SENT as
                                           well; in case of an empty work list we would
                                           exit on the first conflict met. */
                                        /* There IS a case where such a flag is
                                           not set for a lock, yet it blocks
                                           something. Luckily for us this is
                                           only during destroy, so the lock is
                                           exclusive. So here we are safe */
                                        if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
                                                RETURN(compat);
                                        }
                                }

                                /* non-group locks are compatible, overlap doesn't
                                   matter */
                                if (likely(req_mode != LCK_GROUP))
                                        continue;

                                /* If we are trying to get a GROUP lock and there is
                                   another one of this kind, we need to compare gid */
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* If existing lock with matched gid is granted,
                                           we grant new one too. */
                                        if (lock->l_req_mode == lock->l_granted_mode)
                                                RETURN(2);

                                        /* Otherwise we are scanning the queue of
                                         * waiting locks and it means the current
                                         * request would block along with the
                                         * existing lock (that is already blocked).
                                         * If we are in nonblocking mode - return
                                         * immediately */
                                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                                compat = -EWOULDBLOCK;
                                                goto destroylock;
                                        }
                                        /* If this group lock is compatible with another
                                         * group lock on the waiting list, they must be
                                         * together in the list, so they can be granted
                                         * at the same time.  Otherwise the later lock
                                         * can get stuck behind another, incompatible,
                                         * lock. */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        /* Because 'lock' is not granted, we can stop
                                         * processing this queue and return immediately.
                                         * There is no need to check the rest of the
                                         * list. */
                                        RETURN(0);
                                }
                        }

                        if (unlikely(req_mode == LCK_GROUP &&
                                     (lock->l_req_mode != lock->l_granted_mode))) {
                                scan = 1;
                                compat = 0;
                                if (lock->l_req_mode != LCK_GROUP) {
                                        /* Ok, we hit a non-GROUP lock, there should be
                                           no more GROUP locks later on, queue in front
                                           of the first non-GROUP lock */

                                        ldlm_resource_insert_lock_after(lock, req);
                                        cfs_list_del_init(&lock->l_res_link);
                                        ldlm_resource_insert_lock_after(req, lock);
                                        break;
                                }
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* found it */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        break;
                                }
                                continue;
                        }

                        if (unlikely(lock->l_req_mode == LCK_GROUP)) {
                                /* If the compared lock is GROUP, then the requested
                                 * one is PR/PW so this is not compatible; the extent
                                 * range does not matter */
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                } else {
                                        *flags |= LDLM_FL_NO_TIMEOUT;
                                }
                        } else if (lock->l_policy_data.l_extent.end < req_start ||
                                   lock->l_policy_data.l_extent.start > req_end) {
                                /* if a non-group lock doesn't overlap, skip it */
                                continue;
                        } else if (lock->l_req_extent.end < req_start ||
                                   lock->l_req_extent.start > req_end) {
                                /* false contention, the requests don't really overlap */
                                check_contention = 0;
                        }

                        if (!work_list)
                                RETURN(0);

                        /* don't count conflicting glimpse locks */
                        if (lock->l_req_mode == LCK_PR &&
                            lock->l_policy_data.l_extent.start == 0 &&
                            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                                check_contention = 0;

                        *contended_locks += check_contention;

                        compat = 0;
                        if (lock->l_blocking_ast)
                                ldlm_add_ast_work_item(lock, req, work_list);
                }
        }

        if (ldlm_check_contention(req, *contended_locks) &&
            compat == 0 &&
            (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
            req->l_req_mode != LCK_GROUP &&
            req_end - req_start <=
            ldlm_res_to_ns(req->l_resource)->ns_max_nolock_size)
                GOTO(destroylock, compat = -EUSERS);

        RETURN(compat);
destroylock:
        cfs_list_del_init(&req->l_res_link);
        ldlm_lock_destroy_nolock(req);
        *err = compat;
        RETURN(compat);
}

static void discard_bl_list(cfs_list_t *bl_list)
{
        cfs_list_t *tmp, *pos;
        ENTRY;

        cfs_list_for_each_safe(pos, tmp, bl_list) {
                struct ldlm_lock *lock =
                        cfs_list_entry(pos, struct ldlm_lock, l_bl_ast);

                cfs_list_del_init(&lock->l_bl_ast);
                LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
                lock->l_flags &= ~LDLM_FL_AST_SENT;
                LASSERT(lock->l_bl_ast_run == 0);
                LASSERT(lock->l_blocking_lock);
                LDLM_LOCK_RELEASE(lock->l_blocking_lock);
                lock->l_blocking_lock = NULL;
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

/* If first_enq is 0 (ie, called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent
 *   - must call this function with the ns lock held
 *
 * If first_enq is 1 (ie, called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent
 *   - must call this function with the ns lock held once */
int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
                             int first_enq, ldlm_error_t *err,
                             cfs_list_t *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        CFS_LIST_HEAD(rpc_list);
        int rc, rc2;
        int contended_locks = 0;
        ENTRY;

        LASSERT(cfs_list_empty(&res->lr_converting));
        LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
                !(lock->l_flags & LDLM_AST_DISCARD_DATA));
        check_res_locked(res);
        *err = ELDLM_OK;

        if (!first_enq) {
                /* Careful observers will note that we don't handle -EWOULDBLOCK
                 * here, but it's ok for a non-obvious reason -- compat_queue
                 * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
                 * flags should always be zero here, and if that ever stops
                 * being true, we want to find out. */
                LASSERT(*flags == 0);
                rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
                                              err, NULL, &contended_locks);
                if (rc == 1) {
                        rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
                                                      flags, err, NULL,
                                                      &contended_locks);
                }
                if (rc == 0)
                        RETURN(LDLM_ITER_STOP);

                ldlm_resource_unlink_lock(lock);

                if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
                        ldlm_extent_policy(res, lock, flags);
                ldlm_grant_lock(lock, work_list);
                RETURN(LDLM_ITER_CONTINUE);
        }

 restart:
        contended_locks = 0;
        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
                                      &rpc_list, &contended_locks);
        if (rc < 0)
                GOTO(out, rc); /* lock was destroyed */
        if (rc == 2)
                goto grant;

        rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
                                       &rpc_list, &contended_locks);
        if (rc2 < 0)
                GOTO(out, rc = rc2); /* lock was destroyed */

        if (rc + rc2 == 2) {
        grant:
                ldlm_extent_policy(res, lock, flags);
                ldlm_resource_unlink_lock(lock);
                ldlm_grant_lock(lock, NULL);
        } else {
                /* If either of the compat_queue()s returned failure, then we
                 * have ASTs to send and must go onto the waiting list.
                 *
                 * bug 2322: we used to unlink and re-add here, which was a
                 * terrible folly -- if we goto restart, we could get
                 * re-ordered!  Causes deadlock, because ASTs aren't sent! */
                if (cfs_list_empty(&lock->l_res_link))
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                unlock_res(res);
                rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
                                       LDLM_WORK_BL_AST);

                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) &&
                    !ns_is_client(ldlm_res_to_ns(res)))
                        class_fail_export(lock->l_export);

                lock_res(res);
                if (rc == -ERESTART) {

                        /* 15715: The lock was granted and destroyed after
                         * resource lock was dropped. Interval node was freed
                         * in ldlm_lock_destroy. Anyway, this always happens
                         * when a client is being evicted. So it would be
                         * ok to return an error. -jay */
                        if (lock->l_destroyed) {
                                *err = -EAGAIN;
                                GOTO(out, rc = -EAGAIN);
                        }

                        /* lock was granted while resource was unlocked. */
                        if (lock->l_granted_mode == lock->l_req_mode) {
                                /* bug 11300: if the lock has been granted,
                                 * break earlier because otherwise, we will go
                                 * to restart and ldlm_resource_unlink will be
                                 * called and it causes the interval node to be
                                 * freed. Then we will fail at
                                 * ldlm_extent_add_lock() */
                                *flags &= ~(LDLM_FL_BLOCK_GRANTED | LDLM_FL_BLOCK_CONV |
                                            LDLM_FL_BLOCK_WAIT);
                                GOTO(out, rc = 0);
                        }

                        GOTO(restart, -ERESTART);
                }

                *flags |= LDLM_FL_BLOCK_GRANTED;
                /* this way we force client to wait for the lock
                 * endlessly once the lock is enqueued -bzzz */
                *flags |= LDLM_FL_NO_TIMEOUT;

        }
        RETURN(0);
out:
        if (!cfs_list_empty(&rpc_list)) {
                LASSERT(!(lock->l_flags & LDLM_AST_DISCARD_DATA));
                discard_bl_list(&rpc_list);
        }
        RETURN(rc);
}
#endif /* HAVE_SERVER_SUPPORT */

/* When a lock is cancelled by a client, the KMS may undergo change if this
 * is the "highest lock".  This function returns the new KMS value.
 * Caller must hold lr_lock already.
 *
 * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
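/* For example (hypothetical values): if the cancelled lock covered
 * [0, OBD_OBJECT_EOF] with old_kms of 1MB, and the highest remaining granted
 * lock not marked LDLM_FL_KMS_IGNORE ends at offset 4095, the new KMS is
 * 4096. */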
__u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
{
        struct ldlm_resource *res = lock->l_resource;
        cfs_list_t *tmp;
        struct ldlm_lock *lck;
        __u64 kms = 0;
        ENTRY;

        /* don't let another thread in ldlm_extent_shift_kms race in
         * just after we finish and take our lock into account in its
         * calculation of the kms */
        lock->l_flags |= LDLM_FL_KMS_IGNORE;

        cfs_list_for_each(tmp, &res->lr_granted) {
                lck = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lck->l_flags & LDLM_FL_KMS_IGNORE)
                        continue;

                if (lck->l_policy_data.l_extent.end >= old_kms)
                        RETURN(old_kms);

                /* This extent _has_ to be smaller than old_kms (checked above)
                 * so kms can only ever be smaller or the same as old_kms. */
                if (lck->l_policy_data.l_extent.end + 1 > kms)
                        kms = lck->l_policy_data.l_extent.end + 1;
        }
        LASSERTF(kms <= old_kms, "kms "LPU64" old_kms "LPU64"\n", kms, old_kms);

        RETURN(kms);
}
EXPORT_SYMBOL(ldlm_extent_shift_kms);

cfs_mem_cache_t *ldlm_interval_slab;
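/* Allocate an ldlm_interval node for an extent lock; the node is linked into
 * the per-mode interval tree once the lock is granted (see
 * ldlm_extent_add_lock() below). */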
struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
{
        struct ldlm_interval *node;
        ENTRY;

        LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
        OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, CFS_ALLOC_IO);
        if (node == NULL)
                RETURN(NULL);

        CFS_INIT_LIST_HEAD(&node->li_group);
        ldlm_interval_attach(node, lock);
        RETURN(node);
}

void ldlm_interval_free(struct ldlm_interval *node)
{
        if (node) {
                LASSERT(cfs_list_empty(&node->li_group));
                LASSERT(!interval_is_intree(&node->li_node));
                OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
        }
}

/* interval tree, for LDLM_EXTENT. */
void ldlm_interval_attach(struct ldlm_interval *n,
                          struct ldlm_lock *l)
{
        LASSERT(l->l_tree_node == NULL);
        LASSERT(l->l_resource->lr_type == LDLM_EXTENT);

        cfs_list_add_tail(&l->l_sl_policy, &n->li_group);
        l->l_tree_node = n;
}

struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l)
{
        struct ldlm_interval *n = l->l_tree_node;

        if (n == NULL)
                return NULL;

        LASSERT(!cfs_list_empty(&n->li_group));
        l->l_tree_node = NULL;
        cfs_list_del_init(&l->l_sl_policy);

        return (cfs_list_empty(&n->li_group) ? n : NULL);
}

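/* Map a power-of-two lock mode bit to its log2 index, used to pick the
 * per-mode interval tree in res->lr_itree[]. */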
static inline int lock_mode_to_index(ldlm_mode_t mode)
{
        int index;

        LASSERT(mode != 0);
        LASSERT(IS_PO2(mode));
        for (index = -1; mode; index++, mode >>= 1) ;
        LASSERT(index < LCK_MODE_NUM);
        return index;
}

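/* Add a granted extent lock to the resource: insert its interval node into
 * the interval tree for its mode. If a node with the identical extent already
 * exists there, the new node is freed and the lock joins the existing policy
 * group instead. */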
void ldlm_extent_add_lock(struct ldlm_resource *res,
                          struct ldlm_lock *lock)
{
        struct interval_node *found, **root;
        struct ldlm_interval *node;
        struct ldlm_extent *extent;
        int idx;

        LASSERT(lock->l_granted_mode == lock->l_req_mode);

        node = lock->l_tree_node;
        LASSERT(node != NULL);
        LASSERT(!interval_is_intree(&node->li_node));

        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);

        /* initialize the node extent */
        extent = &lock->l_policy_data.l_extent;
        interval_set(&node->li_node, extent->start, extent->end);

        root = &res->lr_itree[idx].lit_root;
        found = interval_insert(&node->li_node, root);
        if (found) { /* The policy group found. */
                struct ldlm_interval *tmp = ldlm_interval_detach(lock);
                LASSERT(tmp != NULL);
                ldlm_interval_free(tmp);
                ldlm_interval_attach(to_ldlm_interval(found), lock);
        }
        res->lr_itree[idx].lit_size++;

        /* even though we use the interval tree to manage the extent locks,
         * we also add them to the granted list for debugging purposes */
        ldlm_resource_add_lock(res, &res->lr_granted, lock);
}

void ldlm_extent_unlink_lock(struct ldlm_lock *lock)
{
        struct ldlm_resource *res = lock->l_resource;
        struct ldlm_interval *node = lock->l_tree_node;
        struct ldlm_interval_tree *tree;
        int idx;

        if (!node || !interval_is_intree(&node->li_node)) /* duplicate unlink */
                return;

        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        tree = &res->lr_itree[idx];

        LASSERT(tree->lit_root != NULL); /* assure the tree is not null */

        tree->lit_size--;
        node = ldlm_interval_detach(lock);
        if (node) {
                interval_erase(&node->li_node, &tree->lit_root);
                ldlm_interval_free(node);
        }
}

void ldlm_extent_policy_wire_to_local(const ldlm_wire_policy_data_t *wpolicy,
                                     ldlm_policy_data_t *lpolicy)
{
        memset(lpolicy, 0, sizeof(*lpolicy));
        lpolicy->l_extent.start = wpolicy->l_extent.start;
        lpolicy->l_extent.end = wpolicy->l_extent.end;
        lpolicy->l_extent.gid = wpolicy->l_extent.gid;
}

void ldlm_extent_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
                                     ldlm_wire_policy_data_t *wpolicy)
{
        memset(wpolicy, 0, sizeof(*wpolicy));
        wpolicy->l_extent.start = lpolicy->l_extent.start;
        wpolicy->l_extent.end = lpolicy->l_extent.end;
        wpolicy->l_extent.gid = lpolicy->l_extent.gid;
}