/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_extent.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_LDLM
#ifndef __KERNEL__
# include <liblustre.h>
#else
# include <libcfs/libcfs.h>
#endif

#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd.h>
#include <lustre_lib.h>

#include "ldlm_internal.h"

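/* Cap (in bytes, minus one) on how far ldlm_extent_internal_policy_fixup()
 * below will grow a PW/CW extent past the requested start once more than 32
 * conflicting locks have been seen, so that heavily contended extents are
 * not grown too far. */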
#define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)

/* fixup the ldlm_extent after expanding */
static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
                                              struct ldlm_extent *new_ex,
                                              int conflicting)
{
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        __u64 req_align, mask;

        if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
                if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
                        new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
                                          new_ex->end);
        }

        if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF) {
                EXIT;
                return;
        }

        /* we need to ensure that the lock extent is properly aligned to what
         * the client requested.  We align it to the lowest-common denominator
         * of the client's requested lock start and end alignment. */
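        /* Illustrative example (not in the original source): for a request
         * on [0x10000, 0x1ffff], req_align = 0x20000 | 0x10000 = 0x30000,
         * the lowest set bit at or above 0x1000 is 0x10000, so mask becomes
         * 0xffff and the granted extent below is rounded inward to 64KB
         * boundaries (start rounded up, end rounded down), which still
         * contains the requested extent. */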
        mask = 0x1000ULL;
        req_align = (req_end + 1) | req_start;
        if (req_align != 0) {
                while ((req_align & mask) == 0)
                        mask <<= 1;
        }
        mask -= 1;
        /* We can only shrink the lock, not grow it.
         * This should never cause lock to be smaller than requested,
         * since requested lock was already aligned on these boundaries. */
        new_ex->start = ((new_ex->start - 1) | mask) + 1;
        new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
        LASSERTF(new_ex->start <= req_start,
                 "mask "LPX64" grant start "LPU64" req start "LPU64"\n",
                 mask, new_ex->start, req_start);
        LASSERTF(new_ex->end >= req_end,
                 "mask "LPX64" grant end "LPU64" req end "LPU64"\n",
                 mask, new_ex->end, req_end);
}

/* The purpose of this function is to return:
 * - the maximum extent
 * - containing the requested extent
 * - and not overlapping existing conflicting extents outside the requested one
 *
 * Use the interval tree to expand the lock extent for granted locks.
 */
static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req,
                                                struct ldlm_extent *new_ex)
{
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        struct ldlm_interval_tree *tree;
        struct interval_node_extent limiter = { new_ex->start, new_ex->end };
        int conflicting = 0;
        int idx;
        ENTRY;

        lockmode_verify(req_mode);

        /* use the interval tree to scan the granted extent locks */
        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                struct interval_node_extent ext = { req_start, req_end };

                tree = &res->lr_itree[idx];
                if (lockmode_compat(tree->lit_mode, req_mode))
                        continue;

                conflicting += tree->lit_size;
                if (conflicting > 4)
                        limiter.start = req_start;

                if (interval_is_overlapped(tree->lit_root, &ext))
                        CDEBUG(D_INFO,
                               "req_mode = %d, tree->lit_mode = %d, "
                               "tree->lit_size = %d\n",
                               req_mode, tree->lit_mode, tree->lit_size);
                interval_expand(tree->lit_root, &ext, &limiter);
                limiter.start = max(limiter.start, ext.start);
                limiter.end = min(limiter.end, ext.end);
                if (limiter.start == req_start && limiter.end == req_end)
                        break;
        }

        new_ex->start = limiter.start;
        new_ex->end = limiter.end;
        LASSERT(new_ex->start <= req_start);
        LASSERT(new_ex->end >= req_end);

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
        EXIT;
}

/* The purpose of this function is to return:
 * - the maximum extent
 * - containing the requested extent
 * - and not overlapping existing conflicting extents outside the requested one
 */
static void
ldlm_extent_internal_policy_waiting(struct ldlm_lock *req,
                                    struct ldlm_extent *new_ex)
{
        struct list_head *tmp;
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int conflicting = 0;
        ENTRY;

        lockmode_verify(req_mode);

        /* for waiting locks */
        list_for_each(tmp, &res->lr_waiting) {
                struct ldlm_lock *lock;
                struct ldlm_extent *l_extent;

                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
                l_extent = &lock->l_policy_data.l_extent;

                /* We already hit the minimum requested size, search no more */
                if (new_ex->start == req_start && new_ex->end == req_end) {
                        EXIT;
                        return;
                }

                /* Don't conflict with ourselves */
                if (req == lock)
                        continue;

                /* Locks are compatible, overlap doesn't matter */
                /* Until bug 20 is fixed, try to avoid granting overlapping
                 * locks on one client (they take a long time to cancel) */
                if (lockmode_compat(lock->l_req_mode, req_mode) &&
                    lock->l_export != req->l_export)
                        continue;

                /* If this is a high-traffic lock, don't grow downwards at all
                 * or grow upwards too much */
                ++conflicting;
                if (conflicting > 4)
                        new_ex->start = req_start;

                /* If lock doesn't overlap new_ex, skip it. */
                if (!ldlm_extent_overlap(l_extent, new_ex))
                        continue;

                /* Locks conflicting in requested extents and we can't satisfy
                 * both locks, so ignore it.  Either we will ping-pong this
                 * extent (we would regardless of what extent we granted) or
                 * lock is unused and it shouldn't limit our extent growth. */
                if (ldlm_extent_overlap(&lock->l_req_extent,&req->l_req_extent))
                        continue;

                /* We grow extents downwards only as far as they don't overlap
                 * with already-granted locks, on the assumption that clients
                 * will be writing beyond the initial requested end and would
                 * then need to enqueue a new lock beyond the previous request.
                 * l_req_extent->end strictly < req_start, checked above. */
                if (l_extent->start < req_start && new_ex->start != req_start) {
                        if (l_extent->end >= req_start)
                                new_ex->start = req_start;
                        else
                                new_ex->start = min(l_extent->end+1, req_start);
                }

                /* If we need to cancel this lock anyway because our request
                 * overlaps the granted lock, we grow up to its requested
                 * extent start instead of limiting this extent, assuming that
                 * clients are writing forwards and the lock had overgrown
                 * its extent downwards before we enqueued our request. */
                if (l_extent->end > req_end) {
                        if (l_extent->start <= req_end)
                                new_ex->end = max(lock->l_req_extent.start - 1,
                                                  req_end);
                        else
                                new_ex->end = max(l_extent->start - 1, req_end);
                }
        }

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
        EXIT;
}


/* In order to determine the largest possible extent we can grant, we need
 * to scan all of the queues. */
static void ldlm_extent_policy(struct ldlm_resource *res,
                               struct ldlm_lock *lock, int *flags)
{
        struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };

        if (lock->l_export == NULL)
                /*
                 * this is a local lock taken by the server (e.g., as part of
                 * OST-side locking, or unlink handling). Expansion doesn't
                 * make a lot of sense for local locks, because they are
                 * dropped immediately on operation completion and would only
                 * conflict with other threads.
                 */
                return;

        if (lock->l_policy_data.l_extent.start == 0 &&
            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                /* fast-path whole file locks */
                return;

        ldlm_extent_internal_policy_granted(lock, &new_ex);
        ldlm_extent_internal_policy_waiting(lock, &new_ex);

        if (new_ex.start != lock->l_policy_data.l_extent.start ||
            new_ex.end != lock->l_policy_data.l_extent.end) {
                *flags |= LDLM_FL_LOCK_CHANGED;
                lock->l_policy_data.l_extent.start = new_ex.start;
                lock->l_policy_data.l_extent.end = new_ex.end;
        }
}

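/* Decide whether this resource currently counts as contended.  If the number
 * of conflicting locks seen while scanning exceeds the namespace's
 * ns_contended_locks threshold, the contention timestamp is refreshed, and
 * the resource is then treated as contended for ns_contention_time seconds
 * after that timestamp. */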
static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
{
        struct ldlm_resource *res = lock->l_resource;
        cfs_time_t now = cfs_time_current();

        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_SET_CONTENTION))
                return 1;

        CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
        if (contended_locks > res->lr_namespace->ns_contended_locks)
                res->lr_contention_time = now;
        return cfs_time_before(now, cfs_time_add(res->lr_contention_time,
                cfs_time_seconds(res->lr_namespace->ns_contention_time)));
}

struct ldlm_extent_compat_args {
        struct list_head *work_list;
        struct ldlm_lock *lock;
        ldlm_mode_t mode;
        int *locks;
        int *compat;
};

static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
                                                void *data)
{
        struct ldlm_extent_compat_args *priv = data;
        struct ldlm_interval *node = to_ldlm_interval(n);
        struct ldlm_extent *extent;
        struct list_head *work_list = priv->work_list;
        struct ldlm_lock *lock, *enq = priv->lock;
        ldlm_mode_t mode = priv->mode;
        int count = 0;
        ENTRY;

        LASSERT(!list_empty(&node->li_group));

        list_for_each_entry(lock, &node->li_group, l_sl_policy) {
                /* the interval tree only contains granted locks */
                LASSERTF(mode == lock->l_granted_mode,
                         "mode = %s, lock->l_granted_mode = %s\n",
                         ldlm_lockname[mode],
                         ldlm_lockname[lock->l_granted_mode]);
                count++;
                if (lock->l_blocking_ast)
                        ldlm_add_ast_work_item(lock, enq, work_list);
        }

        /* don't count conflicting glimpse locks */
        extent = ldlm_interval_extent(node);
        if (!(mode == LCK_PR &&
            extent->start == 0 && extent->end == OBD_OBJECT_EOF))
                *priv->locks += count;

        if (priv->compat)
                *priv->compat = 0;

        RETURN(INTERVAL_ITER_CONT);
}

/* Determine if the lock is compatible with all locks on the queue.
 * We stop walking the queue if we hit ourselves so we don't take
 * conflicting locks enqueued after us into account, or we'd wait forever.
 *
 * Returns:
 *  0 if the lock is not compatible
 *  1 if the lock is compatible
 *  2 if this group lock is compatible and requires no further checking
 *  a negative error, such as -EWOULDBLOCK for group locks
 */
static int
ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                         int *flags, ldlm_error_t *err,
                         struct list_head *work_list, int *contended_locks)
{
        struct list_head *tmp;
        struct ldlm_lock *lock;
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int compat = 1;
        int scan = 0;
        int check_contention;
        ENTRY;

        lockmode_verify(req_mode);

        /* Use the interval tree for the granted queue */
        if (queue == &res->lr_granted) {
                struct ldlm_interval_tree *tree;
                struct ldlm_extent_compat_args data = {.work_list = work_list,
                                               .lock = req,
                                               .locks = contended_locks,
                                               .compat = &compat };
                struct interval_node_extent ex = { .start = req_start,
                                                   .end = req_end };
                int idx, rc;

                for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                        tree = &res->lr_itree[idx];
                        if (tree->lit_root == NULL) /* empty tree, skip it */
                                continue;

                        data.mode = tree->lit_mode;
                        if (lockmode_compat(req_mode, tree->lit_mode)) {
                                struct ldlm_interval *node;
                                struct ldlm_extent *extent;

                                if (req_mode != LCK_GROUP)
                                        continue;

                                /* group lock, grant it immediately if
                                 * compatible */
                                node = to_ldlm_interval(tree->lit_root);
                                extent = ldlm_interval_extent(node);
                                if (req->l_policy_data.l_extent.gid ==
                                    extent->gid)
                                        RETURN(2);
                        }

                        if (tree->lit_mode == LCK_GROUP) {
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                }

                                *flags |= LDLM_FL_NO_TIMEOUT;
                                if (!work_list)
                                        RETURN(0);

                                /* if the work list is not NULL, add all
                                   locks in the tree to the work list */
                                compat = 0;
                                interval_iterate(tree->lit_root,
                                                 ldlm_extent_compat_cb, &data);
                                continue;
                        }

                        if (!work_list) {
                                rc = interval_is_overlapped(tree->lit_root,&ex);
                                if (rc)
                                        RETURN(0);
                        } else {
                                interval_search(tree->lit_root, &ex,
                                                ldlm_extent_compat_cb, &data);
                                if (!list_empty(work_list) && compat)
                                        compat = 0;
                        }
                }
        } else { /* for the waiting queue */
                list_for_each(tmp, queue) {
                        check_contention = 1;

                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                        if (req == lock)
                                break;

                        if (unlikely(scan)) {
                                /* We only get here if we are queuing a GROUP
                                   lock and have met an incompatible one.  The
                                   main idea is to insert the GROUP lock after
                                   a compatible GROUP lock in the waiting
                                   queue, or, if there is none, in front of
                                   the first non-GROUP lock */
                                if (lock->l_req_mode != LCK_GROUP) {
                                        /* Ok, we hit a non-GROUP lock, there
                                         * should be no more GROUP locks later
                                         * on, queue in front of the first
                                         * non-GROUP lock */

                                        ldlm_resource_insert_lock_after(lock, req);
                                        list_del_init(&lock->l_res_link);
                                        ldlm_resource_insert_lock_after(req, lock);
                                        compat = 0;
                                        break;
                                }
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* found it */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        compat = 0;
                                        break;
                                }
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_req_mode, req_mode)) {
                                if (req_mode == LCK_PR &&
                                    ((lock->l_policy_data.l_extent.start <=
                                      req->l_policy_data.l_extent.start) &&
                                     (lock->l_policy_data.l_extent.end >=
                                      req->l_policy_data.l_extent.end))) {
                                        /* If we met a PR lock just like us or
                                           wider, and nobody down the list
                                           conflicted with it, that means we
                                           can skip processing the rest of the
                                           list and safely place ourselves at
                                           the end of the list, or grant
                                           (depending on whether we met
                                           conflicting locks earlier in the
                                           list).
                                           In case of a first enqueue only, we
                                           continue traversing if there is
                                           something conflicting down the list
                                           because we need to make sure that
                                           something is marked as AST_SENT as
                                           well; in case of an empty work list
                                           we would exit on the first conflict
                                           met. */
                                        /* There IS a case where such flag is
                                           not set for a lock, yet it blocks
                                           something. Luckily for us this is
                                           only during destroy, so lock is
                                           exclusive. So here we are safe */
                                        if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
                                                RETURN(compat);
                                        }
                                }

                                /* non-group locks are compatible, overlap
                                   doesn't matter */
                                if (likely(req_mode != LCK_GROUP))
                                        continue;

                                /* If we are trying to get a GROUP lock and
                                   there is another one of this kind, we need
                                   to compare gids */
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* If the existing lock with matching
                                           gid is granted, we grant the new
                                           one too. */
                                        if (lock->l_req_mode == lock->l_granted_mode)
                                                RETURN(2);

                                        /* Otherwise we are scanning the queue
                                         * of waiting locks and it means the
                                         * current request would block along
                                         * with the existing lock (which is
                                         * already blocked).
                                         * If we are in nonblocking mode,
                                         * return immediately */
                                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                                compat = -EWOULDBLOCK;
                                                goto destroylock;
                                        }
                                        /* If this group lock is compatible with another
                                         * group lock on the waiting list, they must be
                                         * together in the list, so they can be granted
                                         * at the same time.  Otherwise the later lock
                                         * can get stuck behind another, incompatible,
                                         * lock. */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        /* Because 'lock' is not granted, we can stop
                                         * processing this queue and return immediately.
                                         * There is no need to check the rest of the
                                         * list. */
                                        RETURN(0);
                                }
                        }

                        if (unlikely(req_mode == LCK_GROUP &&
                                     (lock->l_req_mode != lock->l_granted_mode))) {
                                scan = 1;
                                compat = 0;
                                if (lock->l_req_mode != LCK_GROUP) {
                                        /* Ok, we hit a non-GROUP lock, there should
                                           be no more GROUP locks later on, queue in
                                           front of the first non-GROUP lock */

                                        ldlm_resource_insert_lock_after(lock, req);
                                        list_del_init(&lock->l_res_link);
                                        ldlm_resource_insert_lock_after(req, lock);
                                        break;
                                }
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* found it */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        break;
                                }
                                continue;
                        }

                        if (unlikely(lock->l_req_mode == LCK_GROUP)) {
                                /* If the compared lock is GROUP, then the
                                 * requested one is PR/PW, so they are not
                                 * compatible; the extent range does not
                                 * matter */
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                } else {
                                        *flags |= LDLM_FL_NO_TIMEOUT;
                                }
                        } else if (lock->l_policy_data.l_extent.end < req_start ||
                                   lock->l_policy_data.l_extent.start > req_end) {
                                /* if a non-group lock doesn't overlap, skip it */
                                continue;
                        } else if (lock->l_req_extent.end < req_start ||
                                   lock->l_req_extent.start > req_end) {
                                /* false contention, the requests don't really
                                 * overlap */
                                check_contention = 0;
                        }

                        if (!work_list)
                                RETURN(0);

                        /* don't count conflicting glimpse locks */
                        if (lock->l_req_mode == LCK_PR &&
                            lock->l_policy_data.l_extent.start == 0 &&
                            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                                check_contention = 0;

                        *contended_locks += check_contention;

                        compat = 0;
                        if (lock->l_blocking_ast)
                                ldlm_add_ast_work_item(lock, req, work_list);
                }
        }

        if (ldlm_check_contention(req, *contended_locks) &&
            compat == 0 &&
            (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
            req->l_req_mode != LCK_GROUP &&
            req_end - req_start <=
            req->l_resource->lr_namespace->ns_max_nolock_size)
                GOTO(destroylock, compat = -EUSERS);

        RETURN(compat);
destroylock:
        list_del_init(&req->l_res_link);
        ldlm_lock_destroy_nolock(req);
        *err = compat;
        RETURN(compat);
}

static void discard_bl_list(struct list_head *bl_list)
{
        struct list_head *tmp, *pos;
        ENTRY;

        list_for_each_safe(pos, tmp, bl_list) {
                struct ldlm_lock *lock =
                        list_entry(pos, struct ldlm_lock, l_bl_ast);

                list_del_init(&lock->l_bl_ast);
                LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
                lock->l_flags &= ~LDLM_FL_AST_SENT;
                LASSERT(lock->l_bl_ast_run == 0);
                LASSERT(lock->l_blocking_lock);
                LDLM_LOCK_RELEASE(lock->l_blocking_lock);
                lock->l_blocking_lock = NULL;
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

/* If first_enq is 0 (i.e., called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent
 *   - must call this function with the ns lock held
 *
 * If first_enq is 1 (i.e., called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent
 *   - must call this function with the ns lock held once */
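/* Return values (as implemented below): when first_enq is 0 this returns
 * LDLM_ITER_STOP if the lock cannot be granted yet, or LDLM_ITER_CONTINUE
 * after granting it; when first_enq is 1 it returns 0 and reports details
 * through *err and *flags, or a negative value from the out: path if the
 * lock was destroyed. */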
int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                             ldlm_error_t *err, struct list_head *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        CFS_LIST_HEAD(rpc_list);
        int rc, rc2;
        int contended_locks = 0;
        ENTRY;

        LASSERT(list_empty(&res->lr_converting));
        LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
                !(lock->l_flags & LDLM_AST_DISCARD_DATA));
        check_res_locked(res);
        *err = ELDLM_OK;

        if (!first_enq) {
                /* Careful observers will note that we don't handle -EWOULDBLOCK
                 * here, but it's ok for a non-obvious reason -- compat_queue
                 * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
                 * flags should always be zero here, and if that ever stops
                 * being true, we want to find out. */
                LASSERT(*flags == 0);
                rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
                                              err, NULL, &contended_locks);
                if (rc == 1) {
                        rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
                                                      flags, err, NULL,
                                                      &contended_locks);
                }
                if (rc == 0)
                        RETURN(LDLM_ITER_STOP);

                ldlm_resource_unlink_lock(lock);

                if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
                        ldlm_extent_policy(res, lock, flags);
                ldlm_grant_lock(lock, work_list);
                RETURN(LDLM_ITER_CONTINUE);
        }

 restart:
        contended_locks = 0;
        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
                                      &rpc_list, &contended_locks);
        if (rc < 0)
                GOTO(out, rc); /* lock was destroyed */
        if (rc == 2)
                goto grant;

        rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
                                       &rpc_list, &contended_locks);
        if (rc2 < 0)
                GOTO(out, rc = rc2); /* lock was destroyed */

        if (rc + rc2 == 2) {
        grant:
                ldlm_extent_policy(res, lock, flags);
                ldlm_resource_unlink_lock(lock);
                ldlm_grant_lock(lock, NULL);
        } else {
                /* If either of the compat_queue()s returned failure, then we
                 * have ASTs to send and must go onto the waiting list.
                 *
                 * bug 2322: we used to unlink and re-add here, which was a
                 * terrible folly -- if we goto restart, we could get
                 * re-ordered!  Causes deadlock, because ASTs aren't sent! */
                if (list_empty(&lock->l_res_link))
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                unlock_res(res);
                rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_BL_AST);
                lock_res(res);

                if (rc == -ERESTART) {
                        /* lock was granted while the resource was unlocked. */
                        if (lock->l_granted_mode == lock->l_req_mode) {
                                /* bug 11300: if the lock has been granted,
                                 * break earlier because otherwise, we will go
                                 * to restart and ldlm_resource_unlink will be
                                 * called and it causes the interval node to be
                                 * freed. Then we will fail at
                                 * ldlm_extent_add_lock() */
                                *flags &= ~(LDLM_FL_BLOCK_GRANTED | LDLM_FL_BLOCK_CONV |
                                            LDLM_FL_BLOCK_WAIT);
                                GOTO(out, rc = 0);
                        }

                        GOTO(restart, -ERESTART);
                }

                *flags |= LDLM_FL_BLOCK_GRANTED;
                /* this way we force the client to wait for the lock
                 * endlessly once the lock is enqueued -bzzz */
                *flags |= LDLM_FL_NO_TIMEOUT;
        }
        RETURN(0);
out:
        if (!list_empty(&rpc_list)) {
                LASSERT(!(lock->l_flags & LDLM_AST_DISCARD_DATA));
                discard_bl_list(&rpc_list);
        }
        RETURN(rc);
}

/* When a lock is cancelled by a client, the KMS may undergo change if this
 * is the "highest lock".  This function returns the new KMS value.
 * Caller must hold ns_lock already.
 *
 * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
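/* Illustrative example (not from the original source): if the cancelled lock
 * covered [0, 0xfff] and old_kms is 0x1000, and the only other granted lock
 * not marked LDLM_FL_KMS_IGNORE covers [0, 0x3ff], the loop below returns a
 * new KMS of 0x400. */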
__u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
{
        struct ldlm_resource *res = lock->l_resource;
        struct list_head *tmp;
        struct ldlm_lock *lck;
        __u64 kms = 0;
        ENTRY;

        /* don't let another thread in ldlm_extent_shift_kms race in
         * just after we finish and take our lock into account in its
         * calculation of the kms */
        lock->l_flags |= LDLM_FL_KMS_IGNORE;

        list_for_each(tmp, &res->lr_granted) {
                lck = list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lck->l_flags & LDLM_FL_KMS_IGNORE)
                        continue;

                if (lck->l_policy_data.l_extent.end >= old_kms)
                        RETURN(old_kms);

                /* This extent _has_ to be smaller than old_kms (checked above)
                 * so kms can only ever be smaller or the same as old_kms. */
                if (lck->l_policy_data.l_extent.end + 1 > kms)
                        kms = lck->l_policy_data.l_extent.end + 1;
        }
        LASSERTF(kms <= old_kms, "kms "LPU64" old_kms "LPU64"\n", kms, old_kms);

        RETURN(kms);
}

cfs_mem_cache_t *ldlm_interval_slab;
struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
{
        struct ldlm_interval *node;
        ENTRY;

        LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
        OBD_SLAB_ALLOC(node, ldlm_interval_slab, CFS_ALLOC_IO, sizeof(*node));
        if (node == NULL)
                RETURN(NULL);

        CFS_INIT_LIST_HEAD(&node->li_group);
        ldlm_interval_attach(node, lock);
        RETURN(node);
}

void ldlm_interval_free(struct ldlm_interval *node)
{
        if (node) {
                LASSERT(list_empty(&node->li_group));
                OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
        }
}

/* interval tree, for LDLM_EXTENT. */
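/* Each ldlm_interval is one node in a per-mode interval tree and groups, on
 * its li_group list, the granted locks of that mode that share the same
 * extent.  Attaching links a lock into that group; detaching removes it and
 * hands the node back to the caller once the group becomes empty, so the
 * node can be freed or reused. */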
void ldlm_interval_attach(struct ldlm_interval *n,
                          struct ldlm_lock *l)
{
        LASSERT(l->l_tree_node == NULL);
        LASSERT(l->l_resource->lr_type == LDLM_EXTENT);

        list_add_tail(&l->l_sl_policy, &n->li_group);
        l->l_tree_node = n;
}

struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l)
{
        struct ldlm_interval *n = l->l_tree_node;

        if (n == NULL)
                return NULL;

        LASSERT(!list_empty(&n->li_group));
        l->l_tree_node = NULL;
        list_del_init(&l->l_sl_policy);

        return (list_empty(&n->li_group) ? n : NULL);
}

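/* Map a single lock mode bit to its index in lr_itree[]: the loop below is
 * effectively log2(mode), e.g. (illustrative) LCK_EX == 0x1 maps to index 0
 * and a mode bit of 0x8 maps to index 3. */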
static inline int lock_mode_to_index(ldlm_mode_t mode)
{
        int index;

        LASSERT(mode != 0);
        LASSERT(IS_PO2(mode));
        for (index = -1; mode; index++, mode >>= 1) ;
        LASSERT(index < LCK_MODE_NUM);
        return index;
}

void ldlm_extent_add_lock(struct ldlm_resource *res,
                          struct ldlm_lock *lock)
{
        struct interval_node *found, **root;
        struct ldlm_interval *node;
        struct ldlm_extent *extent;
        int idx;

        LASSERT(lock->l_granted_mode == lock->l_req_mode);

        node = lock->l_tree_node;
        LASSERT(node != NULL);

        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);

        /* initialize the interval node with the lock extent */
        extent = &lock->l_policy_data.l_extent;
        interval_set(&node->li_node, extent->start, extent->end);

        root = &res->lr_itree[idx].lit_root;
        found = interval_insert(&node->li_node, root);
        if (found) { /* The policy group found. */
                struct ldlm_interval *tmp = ldlm_interval_detach(lock);
                LASSERT(tmp != NULL);
                ldlm_interval_free(tmp);
                ldlm_interval_attach(to_ldlm_interval(found), lock);
        }
        res->lr_itree[idx].lit_size++;

        /* even though we use the interval tree to manage the extent locks,
         * we also add them to the granted list for debugging purposes */
        ldlm_resource_add_lock(res, &res->lr_granted, lock);
}

void ldlm_extent_unlink_lock(struct ldlm_lock *lock)
{
        struct ldlm_resource *res = lock->l_resource;
        struct ldlm_interval *node;
        struct ldlm_interval_tree *tree;
        int idx;

        if (lock->l_granted_mode != lock->l_req_mode)
                return;

        LASSERT(lock->l_tree_node != NULL);
        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        tree = &res->lr_itree[idx];

        LASSERT(tree->lit_root != NULL); /* ensure the tree is not null */

        tree->lit_size--;
        node = ldlm_interval_detach(lock);
        if (node) {
                interval_erase(&node->li_node, &tree->lit_root);
                ldlm_interval_free(node);
        }
}