/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011 Whamcloud, Inc.
 *
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_extent.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_LDLM
#ifndef __KERNEL__
# include <liblustre.h>
#else
# include <libcfs/libcfs.h>
#endif

#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd.h>
#include <obd_class.h>
#include <lustre_lib.h>

#include "ldlm_internal.h"

#define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)

/* fixup the ldlm_extent after expanding */
static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
                                              struct ldlm_extent *new_ex,
                                              int conflicting)
{
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        __u64 req_align, mask;

        if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
                if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
                        new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
                                          new_ex->end);
        }

        if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF) {
                EXIT;
                return;
        }

        /* We need to ensure that the lock extent is properly aligned to what
         * the client requested.  We also need to make sure it is server page
         * size aligned, otherwise a server page can be covered by two
         * write locks. */
        mask = CFS_PAGE_SIZE;
        req_align = (req_end + 1) | req_start;
        if (req_align != 0 && (req_align & (mask - 1)) == 0) {
                while ((req_align & mask) == 0)
                        mask <<= 1;
        }
        mask -= 1;
        /* We can only shrink the lock, not grow it.
         * This should never cause the lock to be smaller than requested,
         * since the requested lock was already aligned on these boundaries. */
        new_ex->start = ((new_ex->start - 1) | mask) + 1;
        new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
        LASSERTF(new_ex->start <= req_start,
                 "mask "LPX64" grant start "LPU64" req start "LPU64"\n",
                 mask, new_ex->start, req_start);
        LASSERTF(new_ex->end >= req_end,
                 "mask "LPX64" grant end "LPU64" req end "LPU64"\n",
                 mask, new_ex->end, req_end);
}

/* The purpose of this function is to return:
 * - the maximum extent
 * - containing the requested extent
 * - and not overlapping existing conflicting extents outside the requested one
 *
 * Use interval tree to expand the lock extent for granted lock.
 */
static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req,
                                                struct ldlm_extent *new_ex)
{
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        struct ldlm_interval_tree *tree;
        struct interval_node_extent limiter = { new_ex->start, new_ex->end };
        int conflicting = 0;
        int idx;
        ENTRY;

        lockmode_verify(req_mode);

        /* using interval tree to handle the ldlm extent granted locks */
        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                struct interval_node_extent ext = { req_start, req_end };

                tree = &res->lr_itree[idx];
                if (lockmode_compat(tree->lit_mode, req_mode))
                        continue;

                conflicting += tree->lit_size;
                if (conflicting > 4)
                        limiter.start = req_start;

                if (interval_is_overlapped(tree->lit_root, &ext))
                        CDEBUG(D_INFO,
                               "req_mode = %d, tree->lit_mode = %d, "
                               "tree->lit_size = %d\n",
                               req_mode, tree->lit_mode, tree->lit_size);
                interval_expand(tree->lit_root, &ext, &limiter);
                limiter.start = max(limiter.start, ext.start);
                limiter.end = min(limiter.end, ext.end);
                if (limiter.start == req_start && limiter.end == req_end)
                        break;
        }

        new_ex->start = limiter.start;
        new_ex->end = limiter.end;
        LASSERT(new_ex->start <= req_start);
        LASSERT(new_ex->end >= req_end);

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
        EXIT;
}

/* The purpose of this function is to return:
 * - the maximum extent
 * - containing the requested extent
 * - and not overlapping existing conflicting extents outside the requested one
 */
static void
ldlm_extent_internal_policy_waiting(struct ldlm_lock *req,
                                    struct ldlm_extent *new_ex)
{
        cfs_list_t *tmp;
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int conflicting = 0;
        ENTRY;

        lockmode_verify(req_mode);

        /* for waiting locks */
        cfs_list_for_each(tmp, &res->lr_waiting) {
                struct ldlm_lock *lock;
                struct ldlm_extent *l_extent;

                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);
                l_extent = &lock->l_policy_data.l_extent;

                /* We already hit the minimum requested size, search no more */
                if (new_ex->start == req_start && new_ex->end == req_end) {
                        EXIT;
                        return;
                }

                /* Don't conflict with ourselves */
                if (req == lock)
                        continue;

                /* Locks are compatible, overlap doesn't matter */
                /* Until bug 20 is fixed, try to avoid granting overlapping
                 * locks on one client (they take a long time to cancel) */
                if (lockmode_compat(lock->l_req_mode, req_mode) &&
                    lock->l_export != req->l_export)
                        continue;

                /* If this is a high-traffic lock, don't grow downwards at all
                 * or grow upwards too much */
                ++conflicting;
                if (conflicting > 4)
                        new_ex->start = req_start;

                /* If lock doesn't overlap new_ex, skip it. */
                if (!ldlm_extent_overlap(l_extent, new_ex))
                        continue;

                /* Locks conflicting in requested extents and we can't satisfy
                 * both locks, so ignore it.  Either we will ping-pong this
                 * extent (we would regardless of what extent we granted) or
                 * lock is unused and it shouldn't limit our extent growth. */
                if (ldlm_extent_overlap(&lock->l_req_extent,&req->l_req_extent))
                        continue;

                /* We grow extents downwards only as far as they don't overlap
                 * with already-granted locks, on the assumption that clients
                 * will be writing beyond the initial requested end and would
                 * then need to enqueue a new lock beyond the previous request.
                 * l_req_extent->end strictly < req_start, checked above. */
                if (l_extent->start < req_start && new_ex->start != req_start) {
                        if (l_extent->end >= req_start)
                                new_ex->start = req_start;
                        else
                                new_ex->start = min(l_extent->end+1, req_start);
                }

                /* If we need to cancel this lock anyways because our request
                 * overlaps the granted lock, we grow up to its requested
                 * extent start instead of limiting this extent, assuming that
                 * clients are writing forwards and the lock had overgrown
                 * its extent downwards before we enqueued our request. */
                if (l_extent->end > req_end) {
                        if (l_extent->start <= req_end)
                                new_ex->end = max(lock->l_req_extent.start - 1,
                                                  req_end);
                        else
                                new_ex->end = max(l_extent->start - 1, req_end);
                }
        }

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
        EXIT;
}


/* In order to determine the largest possible extent we can grant, we need
 * to scan all of the queues. */
static void ldlm_extent_policy(struct ldlm_resource *res,
                               struct ldlm_lock *lock, int *flags)
{
        struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };

        if (lock->l_export == NULL)
                /*
                 * this is a local lock taken by the server (e.g., as a part
                 * of OST-side locking, or unlink handling).  Expansion
                 * doesn't make a lot of sense for local locks, because they
                 * are dropped immediately on operation completion and would
                 * only conflict with other threads.
                 */
                return;

        if (lock->l_policy_data.l_extent.start == 0 &&
            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                /* fast-path whole file locks */
                return;

        ldlm_extent_internal_policy_granted(lock, &new_ex);
        ldlm_extent_internal_policy_waiting(lock, &new_ex);

        if (new_ex.start != lock->l_policy_data.l_extent.start ||
            new_ex.end != lock->l_policy_data.l_extent.end) {
                *flags |= LDLM_FL_LOCK_CHANGED;
                lock->l_policy_data.l_extent.start = new_ex.start;
                lock->l_policy_data.l_extent.end = new_ex.end;
        }
}

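/* Mark the resource as contended when the number of conflicting locks exceeds
 * the namespace ns_contended_locks threshold, and report whether the resource
 * is still within its ns_contention_time window. */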
static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
{
        struct ldlm_resource *res = lock->l_resource;
        cfs_time_t now = cfs_time_current();

        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_SET_CONTENTION))
                return 1;

        CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
        if (contended_locks > ldlm_res_to_ns(res)->ns_contended_locks)
                res->lr_contention_time = now;
        return cfs_time_before(now, cfs_time_add(res->lr_contention_time,
                cfs_time_seconds(ldlm_res_to_ns(res)->ns_contention_time)));
}

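/* Helper data and interval tree callback for ldlm_extent_compat_queue():
 * for each granted lock in an overlapping interval node, add a blocking AST
 * work item (if the lock has a blocking callback), count the lock towards the
 * contention total (whole-file PR glimpse locks are skipped), and mark the
 * enqueuing request as not compatible. */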
struct ldlm_extent_compat_args {
        cfs_list_t *work_list;
        struct ldlm_lock *lock;
        ldlm_mode_t mode;
        int *locks;
        int *compat;
};

static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
                                                void *data)
{
        struct ldlm_extent_compat_args *priv = data;
        struct ldlm_interval *node = to_ldlm_interval(n);
        struct ldlm_extent *extent;
        cfs_list_t *work_list = priv->work_list;
        struct ldlm_lock *lock, *enq = priv->lock;
        ldlm_mode_t mode = priv->mode;
        int count = 0;
        ENTRY;

        LASSERT(!cfs_list_empty(&node->li_group));

        cfs_list_for_each_entry(lock, &node->li_group, l_sl_policy) {
                /* interval tree is for granted lock */
                LASSERTF(mode == lock->l_granted_mode,
                         "mode = %s, lock->l_granted_mode = %s\n",
                         ldlm_lockname[mode],
                         ldlm_lockname[lock->l_granted_mode]);
                count++;
                if (lock->l_blocking_ast)
                        ldlm_add_ast_work_item(lock, enq, work_list);
        }

        /* don't count conflicting glimpse locks */
        extent = ldlm_interval_extent(node);
        if (!(mode == LCK_PR &&
            extent->start == 0 && extent->end == OBD_OBJECT_EOF))
                *priv->locks += count;

        if (priv->compat)
                *priv->compat = 0;

        RETURN(INTERVAL_ITER_CONT);
}

/* Determine if the lock is compatible with all locks on the queue.
 * We stop walking the queue if we hit ourselves so we don't take
 * conflicting locks enqueued after us into account, or we'd wait forever.
 *
 * 0 if the lock is not compatible
 * 1 if the lock is compatible
 * 2 if this group lock is compatible and requires no further checking
 * negative error, such as EWOULDBLOCK for group locks
 */
static int
ldlm_extent_compat_queue(cfs_list_t *queue, struct ldlm_lock *req,
                         int *flags, ldlm_error_t *err,
                         cfs_list_t *work_list, int *contended_locks)
{
        cfs_list_t *tmp;
        struct ldlm_lock *lock;
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int compat = 1;
        int scan = 0;
        int check_contention;
        ENTRY;

        lockmode_verify(req_mode);

        /* Using interval tree for granted lock */
        if (queue == &res->lr_granted) {
                struct ldlm_interval_tree *tree;
                struct ldlm_extent_compat_args data = {.work_list = work_list,
                                               .lock = req,
                                               .locks = contended_locks,
                                               .compat = &compat };
                struct interval_node_extent ex = { .start = req_start,
                                                   .end = req_end };
                int idx, rc;

                for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                        tree = &res->lr_itree[idx];
                        if (tree->lit_root == NULL) /* empty tree, skip it */
                                continue;

                        data.mode = tree->lit_mode;
                        if (lockmode_compat(req_mode, tree->lit_mode)) {
                                struct ldlm_interval *node;
                                struct ldlm_extent *extent;

                                if (req_mode != LCK_GROUP)
                                        continue;

                                /* group lock, grant it immediately if
                                 * compatible */
                                node = to_ldlm_interval(tree->lit_root);
                                extent = ldlm_interval_extent(node);
                                if (req->l_policy_data.l_extent.gid ==
                                    extent->gid)
                                        RETURN(2);
                        }

                        if (tree->lit_mode == LCK_GROUP) {
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                }

                                *flags |= LDLM_FL_NO_TIMEOUT;
                                if (!work_list)
                                        RETURN(0);

                                /* if work list is not NULL, add all
                                   locks in the tree to work list */
                                compat = 0;
                                interval_iterate(tree->lit_root,
                                                 ldlm_extent_compat_cb, &data);
                                continue;
                        }

                        if (!work_list) {
                                rc = interval_is_overlapped(tree->lit_root,&ex);
                                if (rc)
                                        RETURN(0);
                        } else {
                                interval_search(tree->lit_root, &ex,
                                                ldlm_extent_compat_cb, &data);
                                if (!cfs_list_empty(work_list) && compat)
                                        compat = 0;
                        }
                }
        } else { /* for waiting queue */
                cfs_list_for_each(tmp, queue) {
                        check_contention = 1;

                        lock = cfs_list_entry(tmp, struct ldlm_lock,
                                              l_res_link);

                        if (req == lock)
                                break;

                        if (unlikely(scan)) {
                                /* We only get here if we are queuing a GROUP
                                   lock and met some incompatible one.  The
                                   main idea of this code is to insert a GROUP
                                   lock past a compatible GROUP lock in the
                                   waiting queue or, if there is none, then in
                                   front of the first non-GROUP lock */
                                if (lock->l_req_mode != LCK_GROUP) {
                                        /* Ok, we hit a non-GROUP lock, there
                                         * should be no more GROUP locks later
                                         * on, queue in front of the first
                                         * non-GROUP lock */

                                        ldlm_resource_insert_lock_after(lock, req);
                                        cfs_list_del_init(&lock->l_res_link);
                                        ldlm_resource_insert_lock_after(req, lock);
                                        compat = 0;
                                        break;
                                }
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* found it */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        compat = 0;
                                        break;
                                }
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_req_mode, req_mode)) {
                                if (req_mode == LCK_PR &&
                                    ((lock->l_policy_data.l_extent.start <=
                                      req->l_policy_data.l_extent.start) &&
                                     (lock->l_policy_data.l_extent.end >=
                                      req->l_policy_data.l_extent.end))) {
                                        /* If we met a PR lock just like us or
                                           wider, and nobody down the list
                                           conflicted with it, that means we
                                           can skip processing of the rest of
                                           the list and safely place ourselves
                                           at the end of the list, or grant
                                           (depending on whether we met
                                           conflicting locks before in the
                                           list).
                                           In case of 1st enqueue only we
                                           continue traversing if there is
                                           something conflicting down the list
                                           because we need to make sure that
                                           something is marked as AST_SENT as
                                           well; in case of an empty worklist
                                           we would exit on the first conflict
                                           met. */
                                        /* There IS a case where such flag is
                                           not set for a lock, yet it blocks
                                           something.  Luckily for us this is
                                           only during destroy, so lock is
                                           exclusive.  So here we are safe */
                                        if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
                                                RETURN(compat);
                                        }
                                }

                                /* non-group locks are compatible, overlap
                                   doesn't matter */
                                if (likely(req_mode != LCK_GROUP))
                                        continue;

                                /* If we are trying to get a GROUP lock and
                                   there is another one of this kind, we need
                                   to compare gid */
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* If the existing lock with matched
                                           gid is granted, we grant the new
                                           one too. */
                                        if (lock->l_req_mode == lock->l_granted_mode)
                                                RETURN(2);

                                        /* Otherwise we are scanning the queue
                                         * of waiting locks and it means the
                                         * current request would block along
                                         * with the existing lock (that is
                                         * already blocked).
                                         * If we are in nonblocking mode -
                                         * return immediately */
                                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                                compat = -EWOULDBLOCK;
                                                goto destroylock;
                                        }
                                        /* If this group lock is compatible with another
                                         * group lock on the waiting list, they must be
                                         * together in the list, so they can be granted
                                         * at the same time.  Otherwise the later lock
                                         * can get stuck behind another, incompatible,
                                         * lock. */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        /* Because 'lock' is not granted, we can stop
                                         * processing this queue and return immediately.
                                         * There is no need to check the rest of the
                                         * list. */
                                        RETURN(0);
                                }
                        }

                        if (unlikely(req_mode == LCK_GROUP &&
                                     (lock->l_req_mode != lock->l_granted_mode))) {
                                scan = 1;
                                compat = 0;
                                if (lock->l_req_mode != LCK_GROUP) {
                                        /* Ok, we hit a non-GROUP lock, there
                                           should be no more GROUP locks later
                                           on, queue in front of the first
                                           non-GROUP lock */

                                        ldlm_resource_insert_lock_after(lock, req);
                                        cfs_list_del_init(&lock->l_res_link);
                                        ldlm_resource_insert_lock_after(req, lock);
                                        break;
                                }
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* found it */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        break;
                                }
                                continue;
                        }

                        if (unlikely(lock->l_req_mode == LCK_GROUP)) {
                                /* If the compared lock is GROUP, then the
                                 * requested is PR/PW, so this is not
                                 * compatible; extent range does not matter */
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                } else {
                                        *flags |= LDLM_FL_NO_TIMEOUT;
                                }
                        } else if (lock->l_policy_data.l_extent.end < req_start ||
                                   lock->l_policy_data.l_extent.start > req_end) {
                                /* if a non-group lock doesn't overlap, skip it */
                                continue;
                        } else if (lock->l_req_extent.end < req_start ||
                                   lock->l_req_extent.start > req_end) {
                                /* false contention, the requests don't really
                                 * overlap */
                                check_contention = 0;
                        }

                        if (!work_list)
                                RETURN(0);

                        /* don't count conflicting glimpse locks */
                        if (lock->l_req_mode == LCK_PR &&
                            lock->l_policy_data.l_extent.start == 0 &&
                            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                                check_contention = 0;

                        *contended_locks += check_contention;

                        compat = 0;
                        if (lock->l_blocking_ast)
                                ldlm_add_ast_work_item(lock, req, work_list);
                }
        }

        if (ldlm_check_contention(req, *contended_locks) &&
            compat == 0 &&
            (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
            req->l_req_mode != LCK_GROUP &&
            req_end - req_start <=
            ldlm_res_to_ns(req->l_resource)->ns_max_nolock_size)
                GOTO(destroylock, compat = -EUSERS);

        RETURN(compat);
destroylock:
        cfs_list_del_init(&req->l_res_link);
        ldlm_lock_destroy_nolock(req);
        *err = compat;
        RETURN(compat);
}

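/* Drop the blocking AST work items collected in bl_list when they are no
 * longer going to be sent (e.g. the enqueued lock was destroyed), releasing
 * the references taken when the items were queued. */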
static void discard_bl_list(cfs_list_t *bl_list)
{
        cfs_list_t *tmp, *pos;
        ENTRY;

        cfs_list_for_each_safe(pos, tmp, bl_list) {
                struct ldlm_lock *lock =
                        cfs_list_entry(pos, struct ldlm_lock, l_bl_ast);

                cfs_list_del_init(&lock->l_bl_ast);
                LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
                lock->l_flags &= ~LDLM_FL_AST_SENT;
                LASSERT(lock->l_bl_ast_run == 0);
                LASSERT(lock->l_blocking_lock);
                LDLM_LOCK_RELEASE(lock->l_blocking_lock);
                lock->l_blocking_lock = NULL;
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

/* If first_enq is 0 (ie, called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent
 *   - must call this function with the ns lock held
 *
 * If first_enq is 1 (ie, called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent
 *   - must call this function with the ns lock held once */
int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                             ldlm_error_t *err, cfs_list_t *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        CFS_LIST_HEAD(rpc_list);
        int rc, rc2;
        int contended_locks = 0;
        ENTRY;

        LASSERT(cfs_list_empty(&res->lr_converting));
        LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
                !(lock->l_flags & LDLM_AST_DISCARD_DATA));
        check_res_locked(res);
        *err = ELDLM_OK;

        if (!first_enq) {
                /* Careful observers will note that we don't handle -EWOULDBLOCK
                 * here, but it's ok for a non-obvious reason -- compat_queue
                 * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
                 * flags should always be zero here, and if that ever stops
                 * being true, we want to find out. */
                LASSERT(*flags == 0);
                rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
                                              err, NULL, &contended_locks);
                if (rc == 1) {
                        rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
                                                      flags, err, NULL,
                                                      &contended_locks);
                }
                if (rc == 0)
                        RETURN(LDLM_ITER_STOP);

                ldlm_resource_unlink_lock(lock);

                if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
                        ldlm_extent_policy(res, lock, flags);
                ldlm_grant_lock(lock, work_list);
                RETURN(LDLM_ITER_CONTINUE);
        }

 restart:
        contended_locks = 0;
        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
                                      &rpc_list, &contended_locks);
        if (rc < 0)
                GOTO(out, rc); /* lock was destroyed */
        if (rc == 2)
                goto grant;

        rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
                                       &rpc_list, &contended_locks);
        if (rc2 < 0)
                GOTO(out, rc = rc2); /* lock was destroyed */

        if (rc + rc2 == 2) {
        grant:
                ldlm_extent_policy(res, lock, flags);
                ldlm_resource_unlink_lock(lock);
                ldlm_grant_lock(lock, NULL);
        } else {
                /* If either of the compat_queue()s returned failure, then we
                 * have ASTs to send and must go onto the waiting list.
                 *
                 * bug 2322: we used to unlink and re-add here, which was a
                 * terrible folly -- if we goto restart, we could get
                 * re-ordered!  Causes deadlock, because ASTs aren't sent! */
                if (cfs_list_empty(&lock->l_res_link))
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                unlock_res(res);
                rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
                                       LDLM_WORK_BL_AST);

                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) &&
                    !ns_is_client(ldlm_res_to_ns(res)))
                        class_fail_export(lock->l_export);

                lock_res(res);
                if (rc == -ERESTART) {

                        /* 15715: The lock was granted and destroyed after
                         * resource lock was dropped. Interval node was freed
                         * in ldlm_lock_destroy. Anyway, this always happens
                         * when a client is being evicted. So it would be
                         * ok to return an error. -jay */
                        if (lock->l_destroyed) {
                                *err = -EAGAIN;
                                GOTO(out, rc = -EAGAIN);
                        }

                        /* lock was granted while resource was unlocked. */
                        if (lock->l_granted_mode == lock->l_req_mode) {
                                /* bug 11300: if the lock has been granted,
                                 * break earlier because otherwise, we will go
                                 * to restart and ldlm_resource_unlink will be
                                 * called and it causes the interval node to be
                                 * freed. Then we will fail at
                                 * ldlm_extent_add_lock() */
                                *flags &= ~(LDLM_FL_BLOCK_GRANTED | LDLM_FL_BLOCK_CONV |
                                            LDLM_FL_BLOCK_WAIT);
                                GOTO(out, rc = 0);
                        }

                        GOTO(restart, -ERESTART);
                }

                *flags |= LDLM_FL_BLOCK_GRANTED;
                /* this way we force client to wait for the lock
                 * endlessly once the lock is enqueued -bzzz */
                *flags |= LDLM_FL_NO_TIMEOUT;

        }
        RETURN(0);
out:
        if (!cfs_list_empty(&rpc_list)) {
                LASSERT(!(lock->l_flags & LDLM_AST_DISCARD_DATA));
                discard_bl_list(&rpc_list);
        }
        RETURN(rc);
}

/* When a lock is cancelled by a client, the KMS may undergo change if this
 * is the "highest lock".  This function returns the new KMS value.
 * Caller must hold lr_lock already.
 *
 * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
__u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
{
        struct ldlm_resource *res = lock->l_resource;
        cfs_list_t *tmp;
        struct ldlm_lock *lck;
        __u64 kms = 0;
        ENTRY;

        /* don't let another thread in ldlm_extent_shift_kms race in
         * just after we finish and take our lock into account in its
         * calculation of the kms */
        lock->l_flags |= LDLM_FL_KMS_IGNORE;

        cfs_list_for_each(tmp, &res->lr_granted) {
                lck = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lck->l_flags & LDLM_FL_KMS_IGNORE)
                        continue;

                if (lck->l_policy_data.l_extent.end >= old_kms)
                        RETURN(old_kms);

                /* This extent _has_ to be smaller than old_kms (checked above)
                 * so kms can only ever be smaller or the same as old_kms. */
                if (lck->l_policy_data.l_extent.end + 1 > kms)
                        kms = lck->l_policy_data.l_extent.end + 1;
        }
        LASSERTF(kms <= old_kms, "kms "LPU64" old_kms "LPU64"\n", kms, old_kms);

        RETURN(kms);
}

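/* Slab cache for the per-lock interval tree nodes, and the allocator that
 * attaches a freshly allocated node to an extent lock. */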
cfs_mem_cache_t *ldlm_interval_slab;
struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
{
        struct ldlm_interval *node;
        ENTRY;

        LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
        OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, CFS_ALLOC_IO);
        if (node == NULL)
                RETURN(NULL);

        CFS_INIT_LIST_HEAD(&node->li_group);
        ldlm_interval_attach(node, lock);
        RETURN(node);
}

void ldlm_interval_free(struct ldlm_interval *node)
{
        if (node) {
                LASSERT(cfs_list_empty(&node->li_group));
                LASSERT(!interval_is_intree(&node->li_node));
                OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
        }
}

/* interval tree, for LDLM_EXTENT. */
void ldlm_interval_attach(struct ldlm_interval *n,
                          struct ldlm_lock *l)
{
        LASSERT(l->l_tree_node == NULL);
        LASSERT(l->l_resource->lr_type == LDLM_EXTENT);

        cfs_list_add_tail(&l->l_sl_policy, &n->li_group);
        l->l_tree_node = n;
}

struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l)
{
        struct ldlm_interval *n = l->l_tree_node;

        if (n == NULL)
                return NULL;

        LASSERT(!cfs_list_empty(&n->li_group));
        l->l_tree_node = NULL;
        cfs_list_del_init(&l->l_sl_policy);

        return (cfs_list_empty(&n->li_group) ? n : NULL);
}

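/* Convert a single-bit (power-of-two) lock mode into its index in the
 * per-resource lr_itree[] array of interval trees. */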
static inline int lock_mode_to_index(ldlm_mode_t mode)
{
        int index;

        LASSERT(mode != 0);
        LASSERT(IS_PO2(mode));
        for (index = -1; mode; index++, mode >>= 1) ;
        LASSERT(index < LCK_MODE_NUM);
        return index;
}

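/* Add a granted extent lock to the interval tree matching its lock mode.
 * If an interval with the same extent already exists, the lock joins that
 * node's policy group and its own (now unused) interval node is freed. */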
void ldlm_extent_add_lock(struct ldlm_resource *res,
                          struct ldlm_lock *lock)
{
        struct interval_node *found, **root;
        struct ldlm_interval *node;
        struct ldlm_extent *extent;
        int idx;

        LASSERT(lock->l_granted_mode == lock->l_req_mode);

        node = lock->l_tree_node;
        LASSERT(node != NULL);
        LASSERT(!interval_is_intree(&node->li_node));

        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);

        /* node extent initialize */
        extent = &lock->l_policy_data.l_extent;
        interval_set(&node->li_node, extent->start, extent->end);

        root = &res->lr_itree[idx].lit_root;
        found = interval_insert(&node->li_node, root);
        if (found) { /* The policy group found. */
                struct ldlm_interval *tmp = ldlm_interval_detach(lock);
                LASSERT(tmp != NULL);
                ldlm_interval_free(tmp);
                ldlm_interval_attach(to_ldlm_interval(found), lock);
        }
        res->lr_itree[idx].lit_size++;

        /* even though we use the interval tree to manage the extent locks,
         * we also add the locks to the granted list, for debugging purposes */
        ldlm_resource_add_lock(res, &res->lr_granted, lock);
}

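/* Remove a granted extent lock from its mode's interval tree; if the lock was
 * the last member of its interval node, erase the node from the tree and free
 * it. */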
void ldlm_extent_unlink_lock(struct ldlm_lock *lock)
{
        struct ldlm_resource *res = lock->l_resource;
        struct ldlm_interval *node = lock->l_tree_node;
        struct ldlm_interval_tree *tree;
        int idx;

        if (!node || !interval_is_intree(&node->li_node)) /* duplicate unlink */
                return;

        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        tree = &res->lr_itree[idx];

        LASSERT(tree->lit_root != NULL); /* assure the tree is not null */

        tree->lit_size--;
        node = ldlm_interval_detach(lock);
        if (node) {
                interval_erase(&node->li_node, &tree->lit_root);
                ldlm_interval_free(node);
        }
}

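/* Translate extent lock policy data between its on-the-wire and in-memory
 * (local) representations. */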
void ldlm_extent_policy_wire_to_local(const ldlm_wire_policy_data_t *wpolicy,
                                      ldlm_policy_data_t *lpolicy)
{
        memset(lpolicy, 0, sizeof(*lpolicy));
        lpolicy->l_extent.start = wpolicy->l_extent.start;
        lpolicy->l_extent.end = wpolicy->l_extent.end;
        lpolicy->l_extent.gid = wpolicy->l_extent.gid;
}

void ldlm_extent_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
                                      ldlm_wire_policy_data_t *wpolicy)
{
        memset(wpolicy, 0, sizeof(*wpolicy));
        wpolicy->l_extent.start = lpolicy->l_extent.start;
        wpolicy->l_extent.end = lpolicy->l_extent.end;
        wpolicy->l_extent.gid = lpolicy->l_extent.gid;
}