/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
 *   Author: Peter Braam <braam@clusterfs.com>
 *   Author: Phil Schwan <phil@clusterfs.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   You may have signed or agreed to another license before downloading
 *   this software.  If so, you are bound by the terms and conditions
 *   of that agreement, and the following does not apply to you.  See the
 *   LICENSE file included with this distribution for more information.
 *
 *   If you did not agree to a different license, then this copy of Lustre
 *   is open source software; you can redistribute it and/or modify it
 *   under the terms of version 2 of the GNU General Public License as
 *   published by the Free Software Foundation.
 *
 *   In either case, Lustre is distributed in the hope that it will be
 *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   license text for more details.
 */

#define DEBUG_SUBSYSTEM S_LDLM
#ifndef __KERNEL__
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd.h>
#include <lustre_lib.h>

#include "ldlm_internal.h"

#define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)

/* fixup the ldlm_extent after expanding */
static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
                                              struct ldlm_extent *new_ex,
                                              int conflicting)
{
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        __u64 req_align, mask;

        if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
                if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
                        new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
                                          new_ex->end);
        }

        if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF) {
                EXIT;
                return;
        }

        /* we need to ensure that the lock extent is properly aligned to what
         * the client requested.  We align it to the lowest common denominator
         * of the client's requested lock start and end alignment. */
        mask = 0x1000ULL;
        req_align = (req_end + 1) | req_start;
        if (req_align != 0) {
                while ((req_align & mask) == 0)
                        mask <<= 1;
        }
        mask -= 1;
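        /* Worked example (illustrative values only): with req_start = 0x2000
         * and req_end = 0x2fff, req_align = 0x3000 | 0x2000 = 0x3000, the
         * loop above does not advance past mask = 0x1000, and after the
         * decrement mask = 0xfff, so the granted extent below is trimmed to
         * 4096-byte boundaries. */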
        /* We can only shrink the lock, not grow it.
         * This should never cause the lock to be smaller than requested,
         * since the requested lock was already aligned on these boundaries. */
        new_ex->start = ((new_ex->start - 1) | mask) + 1;
        new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
        LASSERTF(new_ex->start <= req_start,
                 "mask "LPX64" grant start "LPU64" req start "LPU64"\n",
                 mask, new_ex->start, req_start);
        LASSERTF(new_ex->end >= req_end,
                 "mask "LPX64" grant end "LPU64" req end "LPU64"\n",
                 mask, new_ex->end, req_end);
}

/* The purpose of this function is to return:
 * - the maximum extent
 * - containing the requested extent
 * - and not overlapping existing conflicting extents outside the requested one
 *
 * Use the interval tree to expand the lock extent for a granted lock.
 */
static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req,
                                                struct ldlm_extent *new_ex)
{
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        struct ldlm_interval_tree *tree;
        struct interval_node_extent limiter = { new_ex->start, new_ex->end };
        int conflicting = 0;
        int idx;
        ENTRY;

        lockmode_verify(req_mode);

        /* use the interval trees to walk the granted extent locks */
        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                struct interval_node_extent ext = { req_start, req_end };

                tree = &res->lr_itree[idx];
                if (lockmode_compat(tree->lit_mode, req_mode))
                        continue;

                conflicting += tree->lit_size;
                if (conflicting > 4)
                        limiter.start = req_start;

                if (interval_is_overlapped(tree->lit_root, &ext))
                        CDEBUG(D_DLMTRACE, "req_mode = %d, tree->lit_mode = %d, "
                               "tree->lit_size = %d\n",
                               req_mode, tree->lit_mode, tree->lit_size);
                interval_expand(tree->lit_root, &ext, &limiter);
                limiter.start = max(limiter.start, ext.start);
                limiter.end = min(limiter.end, ext.end);
                if (limiter.start == req_start && limiter.end == req_end)
                        break;
        }

        new_ex->start = limiter.start;
        new_ex->end = limiter.end;
        LASSERT(new_ex->start <= req_start);
        LASSERT(new_ex->end >= req_end);

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
        EXIT;
}

/* The purpose of this function is to return:
 * - the maximum extent
 * - containing the requested extent
 * - and not overlapping existing conflicting extents outside the requested one
 */
static void
ldlm_extent_internal_policy_waiting(struct ldlm_lock *req,
                                    struct ldlm_extent *new_ex)
{
        struct list_head *tmp;
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int conflicting = 0;
        ENTRY;

        lockmode_verify(req_mode);

        /* for waiting locks */
        list_for_each(tmp, &res->lr_waiting) {
                struct ldlm_lock *lock;
                struct ldlm_extent *l_extent;

                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
                l_extent = &lock->l_policy_data.l_extent;

                /* We already hit the minimum requested size, search no more */
                if (new_ex->start == req_start && new_ex->end == req_end) {
                        EXIT;
                        return;
                }

                /* Don't conflict with ourselves */
                if (req == lock)
                        continue;

                /* Locks are compatible, overlap doesn't matter */
                /* Until bug 20 is fixed, try to avoid granting overlapping
                 * locks on one client (they take a long time to cancel) */
                if (lockmode_compat(lock->l_req_mode, req_mode) &&
                    lock->l_export != req->l_export)
                        continue;

                /* If this is a high-traffic lock, don't grow downwards at all
                 * or grow upwards too much */
                ++conflicting;
                if (conflicting > 4)
                        new_ex->start = req_start;

                /* If the lock doesn't overlap new_ex, skip it. */
                if (!ldlm_extent_overlap(l_extent, new_ex))
                        continue;

                /* The locks conflict in their requested extents and we can't
                 * satisfy both, so ignore it.  Either we will ping-pong this
                 * extent (we would regardless of what extent we granted) or
                 * the lock is unused and shouldn't limit our extent growth. */
                if (ldlm_extent_overlap(&lock->l_req_extent,&req->l_req_extent))
                        continue;

                /* We grow extents downwards only as far as they don't overlap
                 * with already-granted locks, on the assumption that clients
                 * will be writing beyond the initial requested end and would
                 * then need to enqueue a new lock beyond the previous request.
                 * l_req_extent->end strictly < req_start, checked above. */
                if (l_extent->start < req_start && new_ex->start != req_start) {
                        if (l_extent->end >= req_start)
                                new_ex->start = req_start;
                        else
                                new_ex->start = min(l_extent->end+1, req_start);
                }

                /* If we need to cancel this lock anyway because our request
                 * overlaps the granted lock, we grow up to its requested
                 * extent start instead of limiting this extent, assuming that
                 * clients are writing forwards and the lock had overgrown
                 * its extent downwards before we enqueued our request. */
                if (l_extent->end > req_end) {
                        if (l_extent->start <= req_end)
                                new_ex->end = max(lock->l_req_extent.start - 1,
                                                  req_end);
                        else
                                new_ex->end = max(l_extent->start - 1, req_end);
                }
        }

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
        EXIT;
}


/* In order to determine the largest possible extent we can grant, we need
 * to scan all of the queues. */
static void ldlm_extent_policy(struct ldlm_resource *res,
                               struct ldlm_lock *lock, int *flags)
{
        struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };

        if (lock->l_export == NULL)
                /*
                 * this is a local lock taken by the server (e.g., as part of
                 * OST-side locking, or unlink handling).  Expansion doesn't
                 * make a lot of sense for local locks, because they are
                 * dropped immediately on operation completion and would only
                 * conflict with other threads.
                 */
                return;

        if (lock->l_policy_data.l_extent.start == 0 &&
            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                /* fast-path whole file locks */
                return;

        ldlm_extent_internal_policy_granted(lock, &new_ex);
        ldlm_extent_internal_policy_waiting(lock, &new_ex);

        if (new_ex.start != lock->l_policy_data.l_extent.start ||
            new_ex.end != lock->l_policy_data.l_extent.end) {
                *flags |= LDLM_FL_LOCK_CHANGED;
                lock->l_policy_data.l_extent.start = new_ex.start;
                lock->l_policy_data.l_extent.end = new_ex.end;
        }
}

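/* Track resource contention: once more than ns_contended_locks conflicting
 * locks are seen at enqueue time, remember the current time and treat the
 * resource as contended for the following ns_contention_time seconds. */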
static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
{
        struct ldlm_resource *res = lock->l_resource;
        cfs_time_t now = cfs_time_current();

        CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
        if (contended_locks > res->lr_namespace->ns_contended_locks)
                res->lr_contention_time = now;
        return cfs_time_before(now, cfs_time_add(res->lr_contention_time,
                cfs_time_seconds(res->lr_namespace->ns_contention_time)));
}

struct ldlm_extent_compat_args {
        struct list_head *work_list;
        struct ldlm_lock *lock;
        ldlm_mode_t mode;
        int *locks;
        int *compat;
};

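/* Interval tree callback used while scanning the granted trees: for every
 * lock in the overlapping interval's policy group, queue a blocking AST work
 * item (if the lock has a blocking AST) and count it toward the contention
 * total, except for whole-file PR glimpse locks.  Any overlap also means the
 * request is not compatible, so *compat is cleared. */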
static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
                                                void *data)
{
        struct ldlm_extent_compat_args *priv = data;
        struct ldlm_interval *node = to_ldlm_interval(n);
        struct ldlm_extent *extent;
        struct list_head *work_list = priv->work_list;
        struct ldlm_lock *lock, *enq = priv->lock;
        ldlm_mode_t mode = priv->mode;
        int count = 0;
        ENTRY;

        LASSERT(!list_empty(&node->li_group));

        list_for_each_entry(lock, &node->li_group, l_sl_policy) {
                /* the interval tree only holds granted locks */
                LASSERTF(mode == lock->l_granted_mode,
                         "mode = %s, lock->l_granted_mode = %s\n",
                         ldlm_lockname[mode],
                         ldlm_lockname[lock->l_granted_mode]);

                count++;
                if (lock->l_blocking_ast)
                        ldlm_add_ast_work_item(lock, enq, work_list);
        }
        LASSERT(count > 0);

        /* don't count conflicting glimpse locks */
        extent = ldlm_interval_extent(node);
        if (!(mode == LCK_PR &&
            extent->start == 0 && extent->end == OBD_OBJECT_EOF))
                *priv->locks += count;

        if (priv->compat)
                *priv->compat = 0;

        RETURN(INTERVAL_ITER_CONT);
}

/* Determine if the lock is compatible with all locks on the queue.
 * We stop walking the queue if we hit ourselves so we don't take
 * conflicting locks enqueued after us into account, or we'd wait forever.
 *
 * Returns:
 *  0 if the lock is not compatible
 *  1 if the lock is compatible
 *  2 if this group lock is compatible and requires no further checking
 *  a negative error, such as -EWOULDBLOCK, for group locks
 */
static int
ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                         int *flags, ldlm_error_t *err,
                         struct list_head *work_list, int *contended_locks)
{
        struct list_head *tmp;
        struct ldlm_lock *lock;
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int compat = 1;
        int scan = 0;
        int check_contention;
        ENTRY;

        lockmode_verify(req_mode);

        /* Use the interval trees for the granted queue */
        if (queue == &res->lr_granted) {
                struct ldlm_interval_tree *tree;
                struct ldlm_extent_compat_args data = {.work_list = work_list,
                                               .lock = req,
                                               .locks = contended_locks,
                                               .compat = &compat };
                struct interval_node_extent ex = { .start = req_start,
                                                   .end = req_end };
                int idx, rc;

                for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                        tree = &res->lr_itree[idx];
                        if (tree->lit_root == NULL) /* empty tree, skip it */
                                continue;

                        data.mode = tree->lit_mode;
                        if (lockmode_compat(req_mode, tree->lit_mode)) {
                                struct ldlm_interval *node;
                                struct ldlm_extent *extent;

                                if (req_mode != LCK_GROUP)
                                        continue;

                                /* group lock, grant it immediately if
                                 * compatible */
                                node = to_ldlm_interval(tree->lit_root);
                                extent = ldlm_interval_extent(node);
                                if (req->l_policy_data.l_extent.gid ==
                                    extent->gid)
                                        RETURN(2);
                        }

                        if (tree->lit_mode == LCK_GROUP) {
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                }

                                *flags |= LDLM_FL_NO_TIMEOUT;
                                if (!work_list)
                                        RETURN(0);

                                /* if the work list is not NULL, add all
                                 * locks in the tree to the work list */
                                compat = 0;
                                interval_iterate(tree->lit_root,
                                                 ldlm_extent_compat_cb, &data);
                                continue;
                        }

                        if (!work_list) {
                                rc = interval_is_overlapped(tree->lit_root,&ex);
                                if (rc)
                                        RETURN(0);
                        } else {
                                interval_search(tree->lit_root, &ex,
                                                ldlm_extent_compat_cb, &data);
                                if (!list_empty(work_list) && compat)
                                        compat = 0;
                        }
                }
        } else {
                /* the waiting queue */
                list_for_each(tmp, queue) {
                        check_contention = 1;

                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                        if (req == lock)
                                break;

                        if (unlikely(scan)) {
                                /* We only get here if we are queuing a GROUP
                                 * lock and met some incompatible one.  The
                                 * main idea of this code is to insert the
                                 * GROUP lock past a compatible GROUP lock in
                                 * the waiting queue or, if there is none, in
                                 * front of the first non-GROUP lock. */
                                if (lock->l_req_mode != LCK_GROUP) {
                                        /* Ok, we hit a non-GROUP lock; there
                                         * should be no more GROUP locks later
                                         * on, so queue in front of the first
                                         * non-GROUP lock */

                                        ldlm_resource_insert_lock_after(lock, req);
                                        list_del_init(&lock->l_res_link);
                                        ldlm_resource_insert_lock_after(req, lock);
                                        compat = 0;
                                        break;
                                }
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* found it */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        compat = 0;
                                        break;
                                }
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_req_mode, req_mode)) {
                                if (req_mode == LCK_PR &&
                                    ((lock->l_policy_data.l_extent.start <=
                                      req->l_policy_data.l_extent.start) &&
                                     (lock->l_policy_data.l_extent.end >=
                                      req->l_policy_data.l_extent.end))) {
                                        /* If we met a PR lock just like us or
                                         * wider, and nobody down the list
                                         * conflicted with it, that means we
                                         * can skip processing the rest of the
                                         * list and safely place ourselves at
                                         * the end of the list, or grant
                                         * (depending on whether we met
                                         * conflicting locks earlier in the
                                         * list).
                                         * Only on the first enqueue do we
                                         * continue traversing if there is
                                         * something conflicting down the
                                         * list, because we need to make sure
                                         * that something is marked as
                                         * AST_SENT as well; with an empty
                                         * work list we would exit on the
                                         * first conflict met. */
                                        /* There IS a case where such a flag is
                                         * not set for a lock, yet it blocks
                                         * something.  Luckily for us this is
                                         * only during destroy, so the lock is
                                         * exclusive.  So here we are safe */
                                        if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
                                                RETURN(compat);
                                        }
                                }

                                /* non-group locks are compatible, overlap
                                 * doesn't matter */
                                if (likely(req_mode != LCK_GROUP))
                                        continue;

                                /* If we are trying to get a GROUP lock and
                                 * there is another one of this kind, we need
                                 * to compare gids */
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* If an existing lock with a matching
                                         * gid is granted, we grant the new
                                         * one too. */
                                        if (lock->l_req_mode == lock->l_granted_mode)
                                                RETURN(2);

                                        /* Otherwise we are scanning the queue
                                         * of waiting locks and it means the
                                         * current request would block along
                                         * with the existing lock (which is
                                         * already blocked).
                                         * If we are in nonblocking mode,
                                         * return immediately. */
                                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                                compat = -EWOULDBLOCK;
                                                goto destroylock;
                                        }
                                        /* If this group lock is compatible with another
                                         * group lock on the waiting list, they must be
                                         * together in the list, so they can be granted
                                         * at the same time.  Otherwise the later lock
                                         * can get stuck behind another, incompatible,
                                         * lock. */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        /* Because 'lock' is not granted, we can stop
                                         * processing this queue and return immediately.
                                         * There is no need to check the rest of the
                                         * list. */
                                        RETURN(0);
                                }
                        }

                        if (unlikely(req_mode == LCK_GROUP &&
                                     (lock->l_req_mode != lock->l_granted_mode))) {
                                scan = 1;
                                compat = 0;
                                if (lock->l_req_mode != LCK_GROUP) {
                                        /* Ok, we hit a non-GROUP lock; there
                                         * should be no more GROUP locks later
                                         * on, so queue in front of the first
                                         * non-GROUP lock */

                                        ldlm_resource_insert_lock_after(lock, req);
                                        list_del_init(&lock->l_res_link);
                                        ldlm_resource_insert_lock_after(req, lock);
                                        break;
                                }
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* found it */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        break;
                                }
                                continue;
                        }

                        if (unlikely(lock->l_req_mode == LCK_GROUP)) {
                                /* If the compared lock is a GROUP lock, then
                                 * the requested lock is PR/PW, so they are
                                 * not compatible; the extent range does not
                                 * matter */
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                } else {
                                        *flags |= LDLM_FL_NO_TIMEOUT;
                                }
                        } else if (lock->l_policy_data.l_extent.end < req_start ||
                                   lock->l_policy_data.l_extent.start > req_end) {
                                /* if a non-group lock doesn't overlap, skip it */
                                continue;
                        } else if (lock->l_req_extent.end < req_start ||
                                   lock->l_req_extent.start > req_end)
                                /* false contention; the requested extents
                                 * don't really overlap */
                                check_contention = 0;

                        if (!work_list)
                                RETURN(0);

                        /* don't count conflicting glimpse locks */
                        if (lock->l_req_mode == LCK_PR &&
                            lock->l_policy_data.l_extent.start == 0 &&
                            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                                check_contention = 0;

                        *contended_locks += check_contention;

                        compat = 0;
                        if (lock->l_blocking_ast)
                                ldlm_add_ast_work_item(lock, req, work_list);
                }
        }

        if (ldlm_check_contention(req, *contended_locks) &&
            compat == 0 &&
            (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
            req->l_req_mode != LCK_GROUP &&
            req_end - req_start <=
            req->l_resource->lr_namespace->ns_max_nolock_size)
                GOTO(destroylock, compat = -EUSERS);

        RETURN(compat);
destroylock:
        list_del_init(&req->l_res_link);
        ldlm_lock_destroy_nolock(req);
        *err = compat;
        RETURN(compat);
}

static void discard_bl_list(struct list_head *bl_list)
{
        struct list_head *tmp, *pos;
        ENTRY;

        list_for_each_safe(pos, tmp, bl_list) {
                struct ldlm_lock *lock =
                        list_entry(pos, struct ldlm_lock, l_bl_ast);

                list_del_init(&lock->l_bl_ast);
                LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
                lock->l_flags &= ~LDLM_FL_AST_SENT;
                LASSERT(lock->l_bl_ast_run == 0);
                LASSERT(lock->l_blocking_lock);
                LDLM_LOCK_PUT(lock->l_blocking_lock);
                lock->l_blocking_lock = NULL;
                LDLM_LOCK_PUT(lock);
        }
        EXIT;
}

/* If first_enq is 0 (i.e., called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent
 *   - must call this function with the ns lock held
 *
 * If first_enq is 1 (i.e., called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent
 *   - must call this function with the ns lock held once */
int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                             ldlm_error_t *err, struct list_head *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        struct list_head rpc_list = CFS_LIST_HEAD_INIT(rpc_list);
        int rc, rc2;
        int contended_locks = 0;
        ENTRY;

        LASSERT(list_empty(&res->lr_converting));
        LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
                !(lock->l_flags & LDLM_AST_DISCARD_DATA));
        check_res_locked(res);
        *err = ELDLM_OK;

        if (!first_enq) {
                /* Careful observers will note that we don't handle -EWOULDBLOCK
                 * here, but it's ok for a non-obvious reason -- compat_queue
                 * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
                 * flags should always be zero here, and if that ever stops
                 * being true, we want to find out. */
                LASSERT(*flags == 0);
                rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
                                              err, NULL, &contended_locks);
                if (rc == 1) {
                        rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
                                                      flags, err, NULL,
                                                      &contended_locks);
                }
                if (rc == 0)
                        RETURN(LDLM_ITER_STOP);

                ldlm_resource_unlink_lock(lock);

                if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
                        ldlm_extent_policy(res, lock, flags);
                ldlm_grant_lock(lock, work_list);
                RETURN(LDLM_ITER_CONTINUE);
        }

 restart:
        contended_locks = 0;
        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
                                      &rpc_list, &contended_locks);
        if (rc < 0)
                GOTO(out, rc); /* lock was destroyed */
        if (rc == 2)
                goto grant;

        rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
                                       &rpc_list, &contended_locks);
        if (rc2 < 0)
                GOTO(out, rc = rc2); /* lock was destroyed */

        if (rc + rc2 == 2) {
        grant:
                ldlm_extent_policy(res, lock, flags);
                ldlm_resource_unlink_lock(lock);
                ldlm_grant_lock(lock, NULL);
        } else {
                /* If either of the compat_queue()s returned failure, then we
                 * have ASTs to send and must go onto the waiting list.
                 *
                 * bug 2322: we used to unlink and re-add here, which was a
                 * terrible folly -- if we goto restart, we could get
                 * re-ordered!  Causes deadlock, because ASTs aren't sent! */
                if (list_empty(&lock->l_res_link))
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                unlock_res(res);
                rc = ldlm_run_bl_ast_work(&rpc_list);
                lock_res(res);

                if (rc == -ERESTART) {
                        /* lock was granted while the resource was unlocked. */
                        if (lock->l_granted_mode == lock->l_req_mode) {
                                /* bug 11300: if the lock has been granted,
                                 * return early; otherwise we would go back to
                                 * restart, ldlm_resource_unlink would be
                                 * called and would free the interval node,
                                 * and we would then fail in
                                 * ldlm_extent_add_lock() */
                                *flags &= ~(LDLM_FL_BLOCK_GRANTED | LDLM_FL_BLOCK_CONV |
                                            LDLM_FL_BLOCK_WAIT);
                                GOTO(out, rc = 0);
                        }

                        GOTO(restart, -ERESTART);
                }

                *flags |= LDLM_FL_BLOCK_GRANTED;
                /* this way we force the client to wait for the lock
                 * endlessly once the lock is enqueued -bzzz */
                *flags |= LDLM_FL_NO_TIMEOUT;

        }
        RETURN(0);
out:
        if (!list_empty(&rpc_list)) {
                LASSERT(!(lock->l_flags & LDLM_AST_DISCARD_DATA));
                discard_bl_list(&rpc_list);
        }
        RETURN(rc);
}

/* When a lock is cancelled by a client, the KMS may undergo change if this
 * is the "highest lock".  This function returns the new KMS value.
 * Caller must hold ns_lock already.
 *
 * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
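/* For example (hypothetical values): if old_kms is 65536 and the highest
 * remaining granted extent not marked LDLM_FL_KMS_IGNORE ends at byte 4095,
 * the KMS shrinks to 4096. */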
__u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
{
        struct ldlm_resource *res = lock->l_resource;
        struct list_head *tmp;
        struct ldlm_lock *lck;
        __u64 kms = 0;
        ENTRY;

        /* don't let another thread in ldlm_extent_shift_kms race in
         * just after we finish and take our lock into account in its
         * calculation of the kms */
        lock->l_flags |= LDLM_FL_KMS_IGNORE;

        list_for_each(tmp, &res->lr_granted) {
                lck = list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lck->l_flags & LDLM_FL_KMS_IGNORE)
                        continue;

                if (lck->l_policy_data.l_extent.end >= old_kms)
                        RETURN(old_kms);

                /* This extent _has_ to be smaller than old_kms (checked above)
                 * so kms can only ever be smaller or the same as old_kms. */
                if (lck->l_policy_data.l_extent.end + 1 > kms)
                        kms = lck->l_policy_data.l_extent.end + 1;
        }
        LASSERTF(kms <= old_kms, "kms "LPU64" old_kms "LPU64"\n", kms, old_kms);

        RETURN(kms);
}

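/* ldlm_interval nodes are allocated from a dedicated slab cache; each node
 * represents one [start, end] interval in a per-mode tree, and its li_group
 * list collects all granted locks that share exactly that extent. */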
cfs_mem_cache_t *ldlm_interval_slab;
struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
{
        struct ldlm_interval *node;
        ENTRY;

        LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
        OBD_SLAB_ALLOC(node, ldlm_interval_slab, CFS_ALLOC_IO, sizeof(*node));
        if (node == NULL)
                RETURN(NULL);

        CFS_INIT_LIST_HEAD(&node->li_group);
        ldlm_interval_attach(node, lock);
        RETURN(node);
}

void ldlm_interval_free(struct ldlm_interval *node)
{
        if (node) {
                LASSERT(list_empty(&node->li_group));
                OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
        }
}

/* interval tree, for LDLM_EXTENT. */
void ldlm_interval_attach(struct ldlm_interval *n,
                          struct ldlm_lock *l)
{
        LASSERT(l->l_tree_node == NULL);
        LASSERT(l->l_resource->lr_type == LDLM_EXTENT);

        list_add_tail(&l->l_sl_policy, &n->li_group);
        l->l_tree_node = n;
}

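/* Detach a lock from its interval node.  Returns the node if this lock was
 * the last member of the node's policy group (so the caller can erase and
 * free it), or NULL if other locks still share the interval. */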
struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l)
{
        struct ldlm_interval *n = l->l_tree_node;

        if (n == NULL)
                return NULL;

        LASSERT(!list_empty(&n->li_group));
        l->l_tree_node = NULL;
        list_del_init(&l->l_sl_policy);

        return (list_empty(&n->li_group) ? n : NULL);
}

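/* Map a (power-of-two) lock mode bit to its interval tree index, i.e.
 * log2(mode); for example, a mode value of 0x8 maps to index 3. */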
static inline int lock_mode_to_index(ldlm_mode_t mode)
{
        int index;

        LASSERT(mode != 0);
        LASSERT(IS_PO2(mode));
        for (index = -1; mode; index++, mode >>= 1) ;
        LASSERT(index < LCK_MODE_NUM);
        return index;
}

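/* Add a granted extent lock to the interval tree for its mode.  If an
 * interval covering exactly [start, end] already exists in that tree, the
 * lock joins the existing node's policy group and its own (now redundant)
 * interval node is freed. */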
void ldlm_extent_add_lock(struct ldlm_resource *res,
                          struct ldlm_lock *lock)
{
        struct interval_node *found, **root;
        struct ldlm_interval *node;
        struct ldlm_extent *extent;
        int idx;

        LASSERT(lock->l_granted_mode == lock->l_req_mode);

        node = lock->l_tree_node;
        LASSERT(node != NULL);

        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);

        /* initialize the node's extent */
        extent = &lock->l_policy_data.l_extent;
        interval_set(&node->li_node, extent->start, extent->end);

        root = &res->lr_itree[idx].lit_root;
        found = interval_insert(&node->li_node, root);
        if (found) { /* The policy group was found. */
                struct ldlm_interval *tmp = ldlm_interval_detach(lock);
                LASSERT(tmp != NULL);
                ldlm_interval_free(tmp);
                ldlm_interval_attach(to_ldlm_interval(found), lock);
        }
        res->lr_itree[idx].lit_size++;

        /* even though we use an interval tree to manage the extent locks, we
         * also add the locks to the granted list, for debugging purposes */
        ldlm_resource_add_lock(res, &res->lr_granted, lock);
}

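/* Remove a granted extent lock from its mode's interval tree.  Locks that
 * were never granted (l_granted_mode != l_req_mode) are not in any tree and
 * are skipped. */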
void ldlm_extent_unlink_lock(struct ldlm_lock *lock)
{
        struct ldlm_resource *res = lock->l_resource;
        struct ldlm_interval *node;
        struct ldlm_interval_tree *tree;
        int idx;

        if (lock->l_granted_mode != lock->l_req_mode)
                return;

        LASSERT(lock->l_tree_node != NULL);
        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        tree = &res->lr_itree[idx];

        LASSERT(tree->lit_root != NULL); /* the tree must not be empty */

        tree->lit_size--;
        node = ldlm_interval_detach(lock);
        if (node) {
                interval_erase(&node->li_node, &tree->lit_root);
                ldlm_interval_free(node);
        }
}