/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_extent.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_LDLM
#ifndef __KERNEL__
# include <liblustre.h>
#else
# include <libcfs/libcfs.h>
#endif

#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd.h>
#include <lustre_lib.h>

#include "ldlm_internal.h"

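/* Cap on how far a heavily contended PW/CW lock may have its end grown past
 * the requested start; see ldlm_extent_internal_policy_fixup() below. */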
#define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)

/* fixup the ldlm_extent after expanding */
static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
                                              struct ldlm_extent *new_ex,
                                              int conflicting)
{
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        __u64 req_align, mask;

        if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
                if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
                        new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
                                          new_ex->end);
        }

        if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF) {
                EXIT;
                return;
        }

        /* we need to ensure that the lock extent is properly aligned to what
         * the client requested.  We align it to the lowest common denominator
         * of the client's requested lock start and end alignment. */
        mask = 0x1000ULL;
        req_align = (req_end + 1) | req_start;
        if (req_align != 0) {
                while ((req_align & mask) == 0)
                        mask <<= 1;
        }
        mask -= 1;
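        /* The loop above picked mask so that mask + 1 is the lowest set bit
         * of (req_end + 1) | req_start that is at least 0x1000; for a
         * page-aligned request this is the common alignment of the requested
         * start and end + 1.  For example, req_start = 0x6000 and
         * req_end = 0x9fff give mask = 0x1fff, so the granted extent below is
         * trimmed inward to 8kB (0x2000) boundaries. */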
        /* We can only shrink the lock, not grow it.
         * This should never cause the lock to be smaller than requested,
         * since the requested lock was already aligned on these boundaries. */
        new_ex->start = ((new_ex->start - 1) | mask) + 1;
        new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
        LASSERTF(new_ex->start <= req_start,
                 "mask "LPX64" grant start "LPU64" req start "LPU64"\n",
                 mask, new_ex->start, req_start);
        LASSERTF(new_ex->end >= req_end,
                 "mask "LPX64" grant end "LPU64" req end "LPU64"\n",
                 mask, new_ex->end, req_end);
}

/* The purpose of this function is to return:
 * - the maximum extent
 * - containing the requested extent
 * - and not overlapping existing conflicting extents outside the requested one
 *
 * Use interval tree to expand the lock extent for granted lock.
 */
static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req,
                                                struct ldlm_extent *new_ex)
{
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        struct ldlm_interval_tree *tree;
        struct interval_node_extent limiter = { new_ex->start, new_ex->end };
        int conflicting = 0;
        int idx;
        ENTRY;

        lockmode_verify(req_mode);

        /* using interval tree to handle the ldlm extent granted locks */
        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                struct interval_node_extent ext = { req_start, req_end };

                tree = &res->lr_itree[idx];
                if (lockmode_compat(tree->lit_mode, req_mode))
                        continue;

                conflicting += tree->lit_size;
                if (conflicting > 4)
                        limiter.start = req_start;

                if (interval_is_overlapped(tree->lit_root, &ext))
                        CDEBUG(D_INFO,
                               "req_mode = %d, tree->lit_mode = %d, "
                               "tree->lit_size = %d\n",
                               req_mode, tree->lit_mode, tree->lit_size);
                interval_expand(tree->lit_root, &ext, &limiter);
                limiter.start = max(limiter.start, ext.start);
                limiter.end = min(limiter.end, ext.end);
                if (limiter.start == req_start && limiter.end == req_end)
                        break;
        }

        new_ex->start = limiter.start;
        new_ex->end = limiter.end;
        LASSERT(new_ex->start <= req_start);
        LASSERT(new_ex->end >= req_end);

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
        EXIT;
}

/* The purpose of this function is to return:
 * - the maximum extent
 * - containing the requested extent
 * - and not overlapping existing conflicting extents outside the requested one
 */
static void
ldlm_extent_internal_policy_waiting(struct ldlm_lock *req,
                                    struct ldlm_extent *new_ex)
{
        struct list_head *tmp;
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int conflicting = 0;
        ENTRY;

        lockmode_verify(req_mode);

        /* for waiting locks */
        list_for_each(tmp, &res->lr_waiting) {
                struct ldlm_lock *lock;
                struct ldlm_extent *l_extent;

                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
                l_extent = &lock->l_policy_data.l_extent;

                /* We already hit the minimum requested size, search no more */
                if (new_ex->start == req_start && new_ex->end == req_end) {
                        EXIT;
                        return;
                }

                /* Don't conflict with ourselves */
                if (req == lock)
                        continue;

                /* Locks are compatible, overlap doesn't matter */
                /* Until bug 20 is fixed, try to avoid granting overlapping
                 * locks on one client (they take a long time to cancel) */
                if (lockmode_compat(lock->l_req_mode, req_mode) &&
                    lock->l_export != req->l_export)
                        continue;

                /* If this is a high-traffic lock, don't grow downwards at all
                 * or grow upwards too much */
                ++conflicting;
                if (conflicting > 4)
                        new_ex->start = req_start;

                /* If lock doesn't overlap new_ex, skip it. */
                if (!ldlm_extent_overlap(l_extent, new_ex))
                        continue;

                /* The locks conflict in their requested extents and we can't
                 * satisfy both, so ignore this one.  Either we will ping-pong
                 * this extent (as we would regardless of what extent we
                 * granted) or the lock is unused and shouldn't limit our
                 * extent growth. */
                if (ldlm_extent_overlap(&lock->l_req_extent,&req->l_req_extent))
                        continue;

                /* We grow extents downwards only as far as they don't overlap
                 * with already-granted locks, on the assumption that clients
                 * will be writing beyond the initial requested end and would
                 * then need to enqueue a new lock beyond the previous request.
                 * l_req_extent->end strictly < req_start, checked above. */
                if (l_extent->start < req_start && new_ex->start != req_start) {
                        if (l_extent->end >= req_start)
                                new_ex->start = req_start;
                        else
                                new_ex->start = min(l_extent->end+1, req_start);
                }

                /* If we need to cancel this lock anyway because our request
                 * overlaps the granted lock, we grow up to its requested
                 * extent start instead of limiting this extent, assuming that
                 * clients are writing forwards and the lock had overgrown
                 * its extent downwards before we enqueued our request. */
                if (l_extent->end > req_end) {
                        if (l_extent->start <= req_end)
                                new_ex->end = max(lock->l_req_extent.start - 1,
                                                  req_end);
                        else
                                new_ex->end = max(l_extent->start - 1, req_end);
                }
        }

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
        EXIT;
}


/* In order to determine the largest possible extent we can grant, we need
 * to scan all of the queues. */
static void ldlm_extent_policy(struct ldlm_resource *res,
                               struct ldlm_lock *lock, int *flags)
{
        struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };

        if (lock->l_export == NULL)
                /*
                 * this is a local lock taken by the server (e.g., as part of
                 * OST-side locking, or unlink handling). Expansion doesn't
                 * make a lot of sense for local locks, because they are
                 * dropped immediately on operation completion and would only
                 * conflict with other threads.
                 */
                return;

        if (lock->l_policy_data.l_extent.start == 0 &&
            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                /* fast-path whole file locks */
                return;

        ldlm_extent_internal_policy_granted(lock, &new_ex);
        ldlm_extent_internal_policy_waiting(lock, &new_ex);

        if (new_ex.start != lock->l_policy_data.l_extent.start ||
            new_ex.end != lock->l_policy_data.l_extent.end) {
                *flags |= LDLM_FL_LOCK_CHANGED;
                lock->l_policy_data.l_extent.start = new_ex.start;
                lock->l_policy_data.l_extent.end = new_ex.end;
        }
}

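/* A resource is marked contended when a single enqueue conflicts with more
 * than ns_contended_locks other locks; enqueues during the following
 * ns_contention_time seconds are then treated as contended. */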
static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
{
        struct ldlm_resource *res = lock->l_resource;
        cfs_time_t now = cfs_time_current();

        CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
        if (contended_locks > res->lr_namespace->ns_contended_locks)
                res->lr_contention_time = now;
        return cfs_time_before(now, cfs_time_add(res->lr_contention_time,
                cfs_time_seconds(res->lr_namespace->ns_contention_time)));
}

struct ldlm_extent_compat_args {
        struct list_head *work_list;
        struct ldlm_lock *lock;
        ldlm_mode_t mode;
        int *locks;
        int *compat;
};

static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
                                                void *data)
{
        struct ldlm_extent_compat_args *priv = data;
        struct ldlm_interval *node = to_ldlm_interval(n);
        struct ldlm_extent *extent;
        struct list_head *work_list = priv->work_list;
        struct ldlm_lock *lock, *enq = priv->lock;
        ldlm_mode_t mode = priv->mode;
        int count = 0;
        ENTRY;

        LASSERT(!list_empty(&node->li_group));

        list_for_each_entry(lock, &node->li_group, l_sl_policy) {
                /* interval tree is for granted lock */
                LASSERTF(mode == lock->l_granted_mode,
                         "mode = %s, lock->l_granted_mode = %s\n",
                         ldlm_lockname[mode],
                         ldlm_lockname[lock->l_granted_mode]);
                count++;
                if (lock->l_blocking_ast)
                        ldlm_add_ast_work_item(lock, enq, work_list);
        }

        /* don't count conflicting glimpse locks */
        extent = ldlm_interval_extent(node);
        if (!(mode == LCK_PR &&
            extent->start == 0 && extent->end == OBD_OBJECT_EOF))
                *priv->locks += count;

        if (priv->compat)
                *priv->compat = 0;

        RETURN(INTERVAL_ITER_CONT);
}

/* Determine if the lock is compatible with all locks on the queue.
 * We stop walking the queue if we hit ourselves so we don't take
 * conflicting locks enqueued after us into account, or we'd wait forever.
 *
 * Returns:
 *   0 if the lock is not compatible
 *   1 if the lock is compatible
 *   2 if this group lock is compatible and requires no further checking
 *   negative error, such as -EWOULDBLOCK for group locks
 */
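/* If work_list is non-NULL, a blocking-AST work item is queued on it for each
 * conflicting lock that has a blocking AST; if work_list is NULL, the scan
 * simply stops and returns 0 at the first conflict found. */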
static int
ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                         int *flags, ldlm_error_t *err,
                         struct list_head *work_list, int *contended_locks)
{
        struct list_head *tmp;
        struct ldlm_lock *lock;
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int compat = 1;
        int scan = 0;
        int check_contention;
        ENTRY;

        lockmode_verify(req_mode);

        /* Using interval tree for granted lock */
        if (queue == &res->lr_granted) {
                struct ldlm_interval_tree *tree;
                struct ldlm_extent_compat_args data = {.work_list = work_list,
                                               .lock = req,
                                               .locks = contended_locks,
                                               .compat = &compat };
                struct interval_node_extent ex = { .start = req_start,
                                                   .end = req_end };
                int idx, rc;

                for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                        tree = &res->lr_itree[idx];
                        if (tree->lit_root == NULL) /* empty tree, skipped */
                                continue;

                        data.mode = tree->lit_mode;
                        if (lockmode_compat(req_mode, tree->lit_mode)) {
                                struct ldlm_interval *node;
                                struct ldlm_extent *extent;

                                if (req_mode != LCK_GROUP)
                                        continue;

                                /* group lock, grant it immediately if
                                 * compatible */
                                node = to_ldlm_interval(tree->lit_root);
                                extent = ldlm_interval_extent(node);
                                if (req->l_policy_data.l_extent.gid ==
                                    extent->gid)
                                        RETURN(2);
                        }

                        if (tree->lit_mode == LCK_GROUP) {
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                }

                                *flags |= LDLM_FL_NO_TIMEOUT;
                                if (!work_list)
                                        RETURN(0);

                                /* if work_list is not NULL, add all
                                   locks in the tree to the work list */
                                compat = 0;
                                interval_iterate(tree->lit_root,
                                                 ldlm_extent_compat_cb, &data);
                                continue;
                        }

                        if (!work_list) {
                                rc = interval_is_overlapped(tree->lit_root,&ex);
                                if (rc)
                                        RETURN(0);
                        } else {
                                interval_search(tree->lit_root, &ex,
                                                ldlm_extent_compat_cb, &data);
                                if (!list_empty(work_list) && compat)
                                        compat = 0;
                        }
                }
        } else { /* for waiting queue */
                list_for_each(tmp, queue) {
                        check_contention = 1;

                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                        if (req == lock)
                                break;

                        if (unlikely(scan)) {
                                /* We only get here if we are queueing a GROUP
                                   lock and have met an incompatible one.  The
                                   idea is to insert the GROUP lock after the
                                   last compatible GROUP lock in the waiting
                                   queue or, if there is none, in front of the
                                   first non-GROUP lock. */
                                if (lock->l_req_mode != LCK_GROUP) {
                                        /* Ok, we hit a non-GROUP lock; there
                                         * should be no more GROUP locks later
                                         * on, so queue in front of the first
                                         * non-GROUP lock */

                                        ldlm_resource_insert_lock_after(lock, req);
                                        list_del_init(&lock->l_res_link);
                                        ldlm_resource_insert_lock_after(req, lock);
                                        compat = 0;
                                        break;
                                }
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* found it */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        compat = 0;
                                        break;
                                }
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_req_mode, req_mode)) {
                                if (req_mode == LCK_PR &&
                                    ((lock->l_policy_data.l_extent.start <=
                                      req->l_policy_data.l_extent.start) &&
                                     (lock->l_policy_data.l_extent.end >=
                                      req->l_policy_data.l_extent.end))) {
                                        /* If we met a PR lock just like us or
                                           wider, and nobody down the list
                                           conflicted with it, we can skip
                                           processing the rest of the list and
                                           safely place ourselves at the end of
                                           the list, or grant (depending on
                                           whether we met a conflicting lock
                                           earlier in the list).
                                           In the first-enqueue case only, we
                                           keep traversing if there is something
                                           conflicting further down the list,
                                           because we need to make sure it is
                                           marked as AST_SENT as well; with an
                                           empty work list we would exit on the
                                           first conflict met. */
                                        /* There IS a case where such a flag is
                                           not set for a lock, yet it blocks
                                           something. Luckily for us this only
                                           happens during destroy, so the lock
                                           is exclusive. So here we are safe */
                                        if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
                                                RETURN(compat);
                                        }
                                }

                                /* non-group locks are compatible, overlap
                                   doesn't matter */
                                if (likely(req_mode != LCK_GROUP))
                                        continue;

                                /* If we are trying to get a GROUP lock and there is
                                   another one of this kind, we need to compare gids */
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* If an existing lock with matching gid
                                           is granted, we grant the new one too. */
                                        if (lock->l_req_mode == lock->l_granted_mode)
                                                RETURN(2);

                                        /* Otherwise we are scanning the queue
                                         * of waiting locks, which means the
                                         * current request would block along
                                         * with the existing lock (which is
                                         * already blocked).
                                         * If we are in nonblocking mode,
                                         * return immediately */
                                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                                compat = -EWOULDBLOCK;
                                                goto destroylock;
                                        }
                                        /* If this group lock is compatible with another
                                         * group lock on the waiting list, they must be
                                         * together in the list, so they can be granted
                                         * at the same time.  Otherwise the later lock
                                         * can get stuck behind another, incompatible,
                                         * lock. */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        /* Because 'lock' is not granted, we can stop
                                         * processing this queue and return immediately.
                                         * There is no need to check the rest of the
                                         * list. */
                                        RETURN(0);
                                }
                        }

                        if (unlikely(req_mode == LCK_GROUP &&
                                     (lock->l_req_mode != lock->l_granted_mode))) {
                                scan = 1;
                                compat = 0;
                                if (lock->l_req_mode != LCK_GROUP) {
                                        /* Ok, we hit a non-GROUP lock; there should
                                           be no more GROUP locks later on, so queue
                                           in front of the first non-GROUP lock */

                                        ldlm_resource_insert_lock_after(lock, req);
                                        list_del_init(&lock->l_res_link);
                                        ldlm_resource_insert_lock_after(req, lock);
                                        break;
                                }
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* found it */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        break;
                                }
                                continue;
                        }

                        if (unlikely(lock->l_req_mode == LCK_GROUP)) {
                                /* If the compared lock is GROUP, then the
                                 * requested one is PR/PW and not compatible;
                                 * the extent range does not matter */
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                } else {
                                        *flags |= LDLM_FL_NO_TIMEOUT;
                                }
                        } else if (lock->l_policy_data.l_extent.end < req_start ||
                                   lock->l_policy_data.l_extent.start > req_end) {
                                /* if a non-group lock doesn't overlap, skip it */
                                continue;
                        } else if (lock->l_req_extent.end < req_start ||
                                   lock->l_req_extent.start > req_end) {
                                /* false contention, the requests don't really
                                 * overlap */
                                check_contention = 0;
                        }

                        if (!work_list)
                                RETURN(0);

                        /* don't count conflicting glimpse locks */
                        if (lock->l_req_mode == LCK_PR &&
                            lock->l_policy_data.l_extent.start == 0 &&
                            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                                check_contention = 0;

                        *contended_locks += check_contention;

                        compat = 0;
                        if (lock->l_blocking_ast)
                                ldlm_add_ast_work_item(lock, req, work_list);
                }
        }

        if (ldlm_check_contention(req, *contended_locks) &&
            compat == 0 &&
            (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
            req->l_req_mode != LCK_GROUP &&
            req_end - req_start <=
            req->l_resource->lr_namespace->ns_max_nolock_size)
                GOTO(destroylock, compat = -EUSERS);

        RETURN(compat);
destroylock:
        list_del_init(&req->l_res_link);
        ldlm_lock_destroy_nolock(req);
        *err = compat;
        RETURN(compat);
}

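/* Drop the blocking-AST work items that were queued on bl_list but never
 * sent, releasing the references they hold; used below when an enqueued lock
 * is destroyed before its blocking ASTs could be dispatched. */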
static void discard_bl_list(struct list_head *bl_list)
{
        struct list_head *tmp, *pos;
        ENTRY;

        list_for_each_safe(pos, tmp, bl_list) {
                struct ldlm_lock *lock =
                        list_entry(pos, struct ldlm_lock, l_bl_ast);

                list_del_init(&lock->l_bl_ast);
                LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
                lock->l_flags &= ~LDLM_FL_AST_SENT;
                LASSERT(lock->l_bl_ast_run == 0);
                LASSERT(lock->l_blocking_lock);
                LDLM_LOCK_PUT(lock->l_blocking_lock);
                lock->l_blocking_lock = NULL;
                LDLM_LOCK_PUT(lock);
        }
        EXIT;
}

/* If first_enq is 0 (i.e., called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent
 *   - must call this function with the ns lock held
 *
 * If first_enq is 1 (i.e., called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent
 *   - must call this function with the ns lock held once */
int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                             ldlm_error_t *err, struct list_head *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        CFS_LIST_HEAD(rpc_list);
        int rc, rc2;
        int contended_locks = 0;
        ENTRY;

        LASSERT(list_empty(&res->lr_converting));
        LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
                !(lock->l_flags & LDLM_AST_DISCARD_DATA));
        check_res_locked(res);
        *err = ELDLM_OK;

        if (!first_enq) {
                /* Careful observers will note that we don't handle -EWOULDBLOCK
                 * here, but it's ok for a non-obvious reason -- compat_queue
                 * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
                 * flags should always be zero here, and if that ever stops
                 * being true, we want to find out. */
                LASSERT(*flags == 0);
                rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
                                              err, NULL, &contended_locks);
                if (rc == 1) {
                        rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
                                                      flags, err, NULL,
                                                      &contended_locks);
                }
                if (rc == 0)
                        RETURN(LDLM_ITER_STOP);

                ldlm_resource_unlink_lock(lock);

                if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
                        ldlm_extent_policy(res, lock, flags);
                ldlm_grant_lock(lock, work_list);
                RETURN(LDLM_ITER_CONTINUE);
        }

 restart:
        contended_locks = 0;
        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
                                      &rpc_list, &contended_locks);
        if (rc < 0)
                GOTO(out, rc); /* lock was destroyed */
        if (rc == 2)
                goto grant;

        rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
                                       &rpc_list, &contended_locks);
        if (rc2 < 0)
                GOTO(out, rc = rc2); /* lock was destroyed */

        if (rc + rc2 == 2) {
        grant:
                ldlm_extent_policy(res, lock, flags);
                ldlm_resource_unlink_lock(lock);
                ldlm_grant_lock(lock, NULL);
        } else {
                /* If either of the compat_queue()s returned failure, then we
                 * have ASTs to send and must go onto the waiting list.
                 *
                 * bug 2322: we used to unlink and re-add here, which was a
                 * terrible folly -- if we goto restart, we could get
                 * re-ordered!  Causes deadlock, because ASTs aren't sent! */
                if (list_empty(&lock->l_res_link))
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                unlock_res(res);
                rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_BL_AST);
                lock_res(res);

                if (rc == -ERESTART) {
                        /* lock was granted while resource was unlocked. */
                        if (lock->l_granted_mode == lock->l_req_mode) {
                                /* bug 11300: if the lock has been granted,
                                 * break out earlier because otherwise we would
                                 * go to restart, ldlm_resource_unlink would be
                                 * called, the interval node would be freed,
                                 * and we would then fail in
                                 * ldlm_extent_add_lock() */
                                *flags &= ~(LDLM_FL_BLOCK_GRANTED | LDLM_FL_BLOCK_CONV |
                                            LDLM_FL_BLOCK_WAIT);
                                GOTO(out, rc = 0);
                        }

                        GOTO(restart, -ERESTART);
                }

                *flags |= LDLM_FL_BLOCK_GRANTED;
                /* this way we force the client to wait for the lock
                 * endlessly once the lock is enqueued -bzzz */
                *flags |= LDLM_FL_NO_TIMEOUT;

        }
        RETURN(0);
out:
        if (!list_empty(&rpc_list)) {
                LASSERT(!(lock->l_flags & LDLM_AST_DISCARD_DATA));
                discard_bl_list(&rpc_list);
        }
        RETURN(rc);
}

/* When a lock is cancelled by a client, the KMS may undergo change if this
 * is the "highest lock".  This function returns the new KMS value.
 * Caller must hold ns_lock already.
 *
 * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
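/* For example, once the cancelled lock is flagged LDLM_FL_KMS_IGNORE below,
 * if the largest end offset among the remaining granted locks is 2047 (and
 * every remaining end is below old_kms), the new KMS becomes 2048; with no
 * remaining locks it drops to 0. */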
__u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
{
        struct ldlm_resource *res = lock->l_resource;
        struct list_head *tmp;
        struct ldlm_lock *lck;
        __u64 kms = 0;
        ENTRY;

        /* don't let another thread in ldlm_extent_shift_kms race in
         * just after we finish and take our lock into account in its
         * calculation of the kms */
        lock->l_flags |= LDLM_FL_KMS_IGNORE;

        list_for_each(tmp, &res->lr_granted) {
                lck = list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lck->l_flags & LDLM_FL_KMS_IGNORE)
                        continue;

                if (lck->l_policy_data.l_extent.end >= old_kms)
                        RETURN(old_kms);

                /* This extent _has_ to be smaller than old_kms (checked above)
                 * so kms can only ever be smaller or the same as old_kms. */
                if (lck->l_policy_data.l_extent.end + 1 > kms)
                        kms = lck->l_policy_data.l_extent.end + 1;
        }
        LASSERTF(kms <= old_kms, "kms "LPU64" old_kms "LPU64"\n", kms, old_kms);

        RETURN(kms);
}

cfs_mem_cache_t *ldlm_interval_slab;
struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
{
        struct ldlm_interval *node;
        ENTRY;

        LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
        OBD_SLAB_ALLOC(node, ldlm_interval_slab, CFS_ALLOC_IO, sizeof(*node));
        if (node == NULL)
                RETURN(NULL);

        CFS_INIT_LIST_HEAD(&node->li_group);
        ldlm_interval_attach(node, lock);
        RETURN(node);
}

void ldlm_interval_free(struct ldlm_interval *node)
{
        if (node) {
                LASSERT(list_empty(&node->li_group));
                OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
        }
}

/* interval tree, for LDLM_EXTENT. */
void ldlm_interval_attach(struct ldlm_interval *n,
                          struct ldlm_lock *l)
{
        LASSERT(l->l_tree_node == NULL);
        LASSERT(l->l_resource->lr_type == LDLM_EXTENT);

        list_add_tail(&l->l_sl_policy, &n->li_group);
        l->l_tree_node = n;
}

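/* Detach a lock from its interval node.  Returns the node if this lock was
 * the last member of its policy group (so the caller can free or reuse it),
 * or NULL if other locks still share the node. */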
struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l)
{
        struct ldlm_interval *n = l->l_tree_node;

        if (n == NULL)
                return NULL;

        LASSERT(!list_empty(&n->li_group));
        l->l_tree_node = NULL;
        list_del_init(&l->l_sl_policy);

        return (list_empty(&n->li_group) ? n : NULL);
}

static inline int lock_mode_to_index(ldlm_mode_t mode)
{
        int index;

        LASSERT(mode != 0);
        LASSERT(IS_PO2(mode));
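        /* The loop below computes log2(mode): e.g. a mode bit of 0x4 maps to
         * index 2 of the per-mode interval trees in lr_itree[]. */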
        for (index = -1; mode; index++, mode >>= 1) ;
        LASSERT(index < LCK_MODE_NUM);
        return index;
}

void ldlm_extent_add_lock(struct ldlm_resource *res,
                          struct ldlm_lock *lock)
{
        struct interval_node *found, **root;
        struct ldlm_interval *node;
        struct ldlm_extent *extent;
        int idx;

        LASSERT(lock->l_granted_mode == lock->l_req_mode);

        node = lock->l_tree_node;
        LASSERT(node != NULL);

        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);

        /* initialize the node extent */
        extent = &lock->l_policy_data.l_extent;
        interval_set(&node->li_node, extent->start, extent->end);

        root = &res->lr_itree[idx].lit_root;
        found = interval_insert(&node->li_node, root);
        if (found) { /* An identical interval already exists; join its
                      * policy group. */
                struct ldlm_interval *tmp = ldlm_interval_detach(lock);
                LASSERT(tmp != NULL);
                ldlm_interval_free(tmp);
                ldlm_interval_attach(to_ldlm_interval(found), lock);
        }
        res->lr_itree[idx].lit_size++;

        /* even though we use an interval tree to manage the extent locks, we
         * also add them to the granted list, for debugging purposes, etc. */
        ldlm_resource_add_lock(res, &res->lr_granted, lock);
}

void ldlm_extent_unlink_lock(struct ldlm_lock *lock)
{
        struct ldlm_resource *res = lock->l_resource;
        struct ldlm_interval *node;
        struct ldlm_interval_tree *tree;
        int idx;

        if (lock->l_granted_mode != lock->l_req_mode)
                return;

        LASSERT(lock->l_tree_node != NULL);
        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        tree = &res->lr_itree[idx];

        LASSERT(tree->lit_root != NULL); /* assure the tree is not null */

        tree->lit_size--;
        node = ldlm_interval_detach(lock);
        if (node) {
                interval_erase(&node->li_node, &tree->lit_root);
                ldlm_interval_free(node);
        }
}
897 }