/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_extent.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_LDLM
#ifndef __KERNEL__
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd.h>
#include <obd_class.h>
#include <lustre_lib.h>

#include "ldlm_internal.h"

#define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)

/* fixup the ldlm_extent after expanding */
static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
                                              struct ldlm_extent *new_ex,
                                              int conflicting)
{
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        __u64 req_align, mask;

        if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
                if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
                        new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
                                          new_ex->end);
        }

        if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF) {
                EXIT;
                return;
        }

        /* We need to ensure that the lock extent is properly aligned to what
         * the client requested.  We align it to the lowest common denominator
         * of the client's requested lock start and end alignment. */
        mask = 0x1000ULL;
        req_align = (req_end + 1) | req_start;
        if (req_align != 0) {
                while ((req_align & mask) == 0)
                        mask <<= 1;
        }
        mask -= 1;
        /* We can only shrink the lock, not grow it.
         * This should never cause the lock to be smaller than requested,
         * since the requested lock was already aligned on these boundaries. */
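        /* Illustrative example (hypothetical values): a request for
         * [0x1000, 0x1fff] gives req_align = 0x2000 | 0x1000 = 0x3000, the
         * loop above stops immediately and mask becomes 0xfff, i.e. 4 KiB
         * alignment.  A candidate grant of [0x800, 0x4fff] is then trimmed
         * below to [0x1000, 0x4fff], which still covers the request. */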
        new_ex->start = ((new_ex->start - 1) | mask) + 1;
        new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
        LASSERTF(new_ex->start <= req_start,
                 "mask "LPX64" grant start "LPU64" req start "LPU64"\n",
                 mask, new_ex->start, req_start);
        LASSERTF(new_ex->end >= req_end,
                 "mask "LPX64" grant end "LPU64" req end "LPU64"\n",
                 mask, new_ex->end, req_end);
}

/* The purpose of this function is to return:
 * - the maximum extent
 * - containing the requested extent
 * - and not overlapping existing conflicting extents outside the requested one
 *
 * Use the interval tree to expand the lock extent for a granted lock.
 */
static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req,
                                                struct ldlm_extent *new_ex)
{
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        struct ldlm_interval_tree *tree;
        struct interval_node_extent limiter = { new_ex->start, new_ex->end };
        int conflicting = 0;
        int idx;
        ENTRY;

        lockmode_verify(req_mode);

        /* use the interval trees to handle the granted extent locks */
        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                struct interval_node_extent ext = { req_start, req_end };

                tree = &res->lr_itree[idx];
                if (lockmode_compat(tree->lit_mode, req_mode))
                        continue;

                conflicting += tree->lit_size;
                if (conflicting > 4)
                        limiter.start = req_start;

                if (interval_is_overlapped(tree->lit_root, &ext))
                        printk("req_mode = %d, tree->lit_mode = %d, tree->lit_size = %d\n",
                               req_mode, tree->lit_mode, tree->lit_size);
                interval_expand(tree->lit_root, &ext, &limiter);
                limiter.start = max(limiter.start, ext.start);
                limiter.end = min(limiter.end, ext.end);
                if (limiter.start == req_start && limiter.end == req_end)
                        break;
        }

        new_ex->start = limiter.start;
        new_ex->end = limiter.end;
        LASSERT(new_ex->start <= req_start);
        LASSERT(new_ex->end >= req_end);

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
        EXIT;
}

/* The purpose of this function is to return:
 * - the maximum extent
 * - containing the requested extent
 * - and not overlapping existing conflicting extents outside the requested one
 */
static void
ldlm_extent_internal_policy_waiting(struct ldlm_lock *req,
                                    struct ldlm_extent *new_ex)
{
        struct list_head *tmp;
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int conflicting = 0;
        ENTRY;

        lockmode_verify(req_mode);

        /* for waiting locks */
        list_for_each(tmp, &res->lr_waiting) {
                struct ldlm_lock *lock;
                struct ldlm_extent *l_extent;

                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
                l_extent = &lock->l_policy_data.l_extent;

                /* We already hit the minimum requested size, search no more */
                if (new_ex->start == req_start && new_ex->end == req_end) {
                        EXIT;
                        return;
                }

                /* Don't conflict with ourselves */
                if (req == lock)
                        continue;

                /* Locks are compatible, overlap doesn't matter */
                /* Until bug 20 is fixed, try to avoid granting overlapping
                 * locks on one client (they take a long time to cancel) */
                if (lockmode_compat(lock->l_req_mode, req_mode) &&
                    lock->l_export != req->l_export)
                        continue;

                /* If this is a high-traffic lock, don't grow downwards at all
                 * or grow upwards too much */
                ++conflicting;
                if (conflicting > 4)
                        new_ex->start = req_start;

                /* If lock doesn't overlap new_ex, skip it. */
                if (!ldlm_extent_overlap(l_extent, new_ex))
                        continue;

                /* The locks conflict in their requested extents and we can't
                 * satisfy both, so ignore it.  Either we will ping-pong this
                 * extent (we would regardless of what extent we granted) or
                 * the lock is unused and it shouldn't limit our extent
                 * growth. */
                if (ldlm_extent_overlap(&lock->l_req_extent,&req->l_req_extent))
                        continue;

                /* We grow extents downwards only as far as they don't overlap
                 * with already-granted locks, on the assumption that clients
                 * will be writing beyond the initial requested end and would
                 * then need to enqueue a new lock beyond the previous request.
                 * l_req_extent->end strictly < req_start, checked above. */
                if (l_extent->start < req_start && new_ex->start != req_start) {
                        if (l_extent->end >= req_start)
                                new_ex->start = req_start;
                        else
                                new_ex->start = min(l_extent->end+1, req_start);
                }

                /* If we need to cancel this lock anyway because our request
                 * overlaps the granted lock, we grow up to its requested
                 * extent start instead of limiting this extent, assuming that
                 * clients are writing forwards and the lock had overgrown
                 * its extent downwards before we enqueued our request. */
                if (l_extent->end > req_end) {
                        if (l_extent->start <= req_end)
                                new_ex->end = max(lock->l_req_extent.start - 1,
                                                  req_end);
                        else
                                new_ex->end = max(l_extent->start - 1, req_end);
                }
        }

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
        EXIT;
}


/* In order to determine the largest possible extent we can grant, we need
 * to scan all of the queues. */
static void ldlm_extent_policy(struct ldlm_resource *res,
                               struct ldlm_lock *lock, int *flags)
{
        struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };

        if (lock->l_export == NULL)
                /*
                 * this is a local lock taken by the server (e.g., as a part
                 * of OST-side locking, or unlink handling). Expansion doesn't
                 * make a lot of sense for local locks, because they are
                 * dropped immediately on operation completion and would only
                 * conflict with other threads.
                 */
                return;

        if (lock->l_policy_data.l_extent.start == 0 &&
            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                /* fast-path whole file locks */
                return;

        ldlm_extent_internal_policy_granted(lock, &new_ex);
        ldlm_extent_internal_policy_waiting(lock, &new_ex);

        if (new_ex.start != lock->l_policy_data.l_extent.start ||
            new_ex.end != lock->l_policy_data.l_extent.end) {
                *flags |= LDLM_FL_LOCK_CHANGED;
                lock->l_policy_data.l_extent.start = new_ex.start;
                lock->l_policy_data.l_extent.end = new_ex.end;
        }
}

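/* A resource is marked contended when a single scan finds more than
 * ns_contended_locks conflicting locks; the contended state then lasts for
 * ns_contention_time seconds from that point. */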
static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
{
        struct ldlm_resource *res = lock->l_resource;
        cfs_time_t now = cfs_time_current();

        CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
        if (contended_locks > res->lr_namespace->ns_contended_locks)
                res->lr_contention_time = now;
        return cfs_time_before(now, cfs_time_add(res->lr_contention_time,
                cfs_time_seconds(res->lr_namespace->ns_contention_time)));
}

struct ldlm_extent_compat_args {
        struct list_head *work_list;
        struct ldlm_lock *lock;
        ldlm_mode_t mode;
        int *locks;
        int *compat;
};

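/* Interval tree callback used while scanning the granted queue: every lock
 * attached to the visited interval node gets a blocking AST work item (if it
 * has a blocking AST), whole-file PR glimpse locks are excluded from the
 * contended-locks count, and compatibility is cleared for the enqueuing
 * request. */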
static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
                                                void *data)
{
        struct ldlm_extent_compat_args *priv = data;
        struct ldlm_interval *node = to_ldlm_interval(n);
        struct ldlm_extent *extent;
        struct list_head *work_list = priv->work_list;
        struct ldlm_lock *lock, *enq = priv->lock;
        ldlm_mode_t mode = priv->mode;
        int count = 0;
        ENTRY;

        LASSERT(!list_empty(&node->li_group));

        list_for_each_entry(lock, &node->li_group, l_sl_policy) {
                /* the interval tree only holds granted locks */
                LASSERTF(mode == lock->l_granted_mode,
                         "mode = %s, lock->l_granted_mode = %s\n",
                         ldlm_lockname[mode],
                         ldlm_lockname[lock->l_granted_mode]);

                count++;
                if (lock->l_blocking_ast)
                        ldlm_add_ast_work_item(lock, enq, work_list);
        }
        LASSERT(count > 0);

        /* don't count conflicting glimpse locks */
        extent = ldlm_interval_extent(node);
        if (!(mode == LCK_PR &&
            extent->start == 0 && extent->end == OBD_OBJECT_EOF))
                *priv->locks += count;

        if (priv->compat)
                *priv->compat = 0;

        RETURN(INTERVAL_ITER_CONT);
}

/* Determine if the lock is compatible with all locks on the queue.
 * We stop walking the queue if we hit ourselves so we don't take
 * conflicting locks enqueued after us into account, or we'd wait forever.
 *
 * 0 if the lock is not compatible
 * 1 if the lock is compatible
 * 2 if this group lock is compatible and requires no further checking
 * a negative error, such as -EWOULDBLOCK for group locks
 */
static int
ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                         int *flags, ldlm_error_t *err,
                         struct list_head *work_list, int *contended_locks)
{
        struct list_head *tmp;
        struct ldlm_lock *lock;
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int compat = 1;
        int scan = 0;
        int check_contention;
        ENTRY;

        lockmode_verify(req_mode);

        /* Use the interval trees for the granted queue */
        if (queue == &res->lr_granted) {
                struct ldlm_interval_tree *tree;
                struct ldlm_extent_compat_args data = {.work_list = work_list,
                                               .lock = req,
                                               .locks = contended_locks,
                                               .compat = &compat };
                struct interval_node_extent ex = { .start = req_start,
                                                   .end = req_end };
                int idx, rc;

                for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                        tree = &res->lr_itree[idx];
                        if (tree->lit_root == NULL) /* empty tree, skipped */
                                continue;

                        data.mode = tree->lit_mode;
                        if (lockmode_compat(req_mode, tree->lit_mode)) {
                                struct ldlm_interval *node;
                                struct ldlm_extent *extent;

                                if (req_mode != LCK_GROUP)
                                        continue;

                                /* group lock, grant it immediately if
                                 * compatible */
                                node = to_ldlm_interval(tree->lit_root);
                                extent = ldlm_interval_extent(node);
                                if (req->l_policy_data.l_extent.gid ==
                                    extent->gid)
                                        RETURN(2);
                        }

                        if (tree->lit_mode == LCK_GROUP) {
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                }

                                *flags |= LDLM_FL_NO_TIMEOUT;
                                if (!work_list)
                                        RETURN(0);

                                /* if work_list is not NULL, add all
                                   locks in the tree to the work list */
                                compat = 0;
                                interval_iterate(tree->lit_root,
                                                 ldlm_extent_compat_cb, &data);
                                continue;
                        }

                        if (!work_list) {
                                rc = interval_is_overlapped(tree->lit_root,&ex);
                                if (rc)
                                        RETURN(0);
                        } else {
                                interval_search(tree->lit_root, &ex,
                                                ldlm_extent_compat_cb, &data);
                                if (!list_empty(work_list) && compat)
                                        compat = 0;
                        }
                }
        } else {
                /* for the waiting queue */
                list_for_each(tmp, queue) {
                        check_contention = 1;

                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                        if (req == lock)
                                break;

                        if (unlikely(scan)) {
                                /* We only get here if we are queuing a GROUP
                                   lock and met some incompatible one. The main
                                   idea of this code is to insert the GROUP
                                   lock past compatible GROUP locks in the
                                   waiting queue, or if there are none, then in
                                   front of the first non-GROUP lock */
                                if (lock->l_req_mode != LCK_GROUP) {
                                        /* Ok, we hit a non-GROUP lock, there
                                           should be no more GROUP locks later
                                           on, queue in front of the first
                                           non-GROUP lock */

                                        ldlm_resource_insert_lock_after(lock, req);
                                        list_del_init(&lock->l_res_link);
                                        ldlm_resource_insert_lock_after(req, lock);
                                        compat = 0;
                                        break;
                                }
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* found it */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        compat = 0;
                                        break;
                                }
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_req_mode, req_mode)) {
                                if (req_mode == LCK_PR &&
                                    ((lock->l_policy_data.l_extent.start <=
                                      req->l_policy_data.l_extent.start) &&
                                     (lock->l_policy_data.l_extent.end >=
                                      req->l_policy_data.l_extent.end))) {
                                        /* If we met a PR lock just like us or
                                           wider, and nobody down the list
                                           conflicted with it, that means we
                                           can skip processing the rest of the
                                           list and safely place ourselves at
                                           the end of the list, or grant
                                           (depending on whether we met a
                                           conflicting lock earlier in the
                                           list).
                                           On a first enqueue only, we continue
                                           traversing if there is something
                                           conflicting further down the list
                                           because we need to make sure that
                                           something is marked as AST_SENT as
                                           well; with an empty work list we
                                           would exit on the first conflict
                                           met. */
                                        /* There IS a case where such flag is
                                           not set for a lock, yet it blocks
                                           something. Luckily for us this is
                                           only during destroy, so the lock is
                                           exclusive. So here we are safe */
                                        if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
                                                RETURN(compat);
                                        }
                                }

                                /* non-group locks are compatible, overlap
                                   doesn't matter */
                                if (likely(req_mode != LCK_GROUP))
                                        continue;

                                /* If we are trying to get a GROUP lock and
                                   there is another one of this kind, we need
                                   to compare gids */
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* If the existing lock with matching
                                           gid is granted, we grant the new one
                                           too. */
                                        if (lock->l_req_mode == lock->l_granted_mode)
                                                RETURN(2);

                                        /* Otherwise we are scanning the queue
                                         * of waiting locks and it means the
                                         * current request would block along
                                         * with the existing lock (that is
                                         * already blocked).
                                         * If we are in nonblocking mode,
                                         * return immediately */
                                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                                compat = -EWOULDBLOCK;
                                                goto destroylock;
                                        }
                                        /* If this group lock is compatible with another
                                         * group lock on the waiting list, they must be
                                         * together in the list, so they can be granted
                                         * at the same time.  Otherwise the later lock
                                         * can get stuck behind another, incompatible,
                                         * lock. */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        /* Because 'lock' is not granted, we can stop
                                         * processing this queue and return immediately.
                                         * There is no need to check the rest of the
                                         * list. */
                                        RETURN(0);
                                }
                        }

                        if (unlikely(req_mode == LCK_GROUP &&
                                     (lock->l_req_mode != lock->l_granted_mode))) {
                                scan = 1;
                                compat = 0;
                                if (lock->l_req_mode != LCK_GROUP) {
                                        /* Ok, we hit a non-GROUP lock, there
                                         * should be no more GROUP locks later
                                         * on, queue in front of the first
                                         * non-GROUP lock */

                                        ldlm_resource_insert_lock_after(lock, req);
                                        list_del_init(&lock->l_res_link);
                                        ldlm_resource_insert_lock_after(req, lock);
                                        break;
                                }
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* found it */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        break;
                                }
                                continue;
                        }

                        if (unlikely(lock->l_req_mode == LCK_GROUP)) {
                                /* If the compared lock is GROUP, then the
                                 * requested one is PR/PW, so this is not
                                 * compatible; the extent range does not
                                 * matter */
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                } else {
                                        *flags |= LDLM_FL_NO_TIMEOUT;
                                }
                        } else if (lock->l_policy_data.l_extent.end < req_start ||
                                   lock->l_policy_data.l_extent.start > req_end) {
                                /* if a non-group lock doesn't overlap, skip it */
                                continue;
                        } else if (lock->l_req_extent.end < req_start ||
                                   lock->l_req_extent.start > req_end)
                                /* false contention, the requests don't really
                                 * overlap */
                                check_contention = 0;

                        if (!work_list)
                                RETURN(0);

                        /* don't count conflicting glimpse locks */
                        if (lock->l_req_mode == LCK_PR &&
                            lock->l_policy_data.l_extent.start == 0 &&
                            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                                check_contention = 0;

                        *contended_locks += check_contention;

                        compat = 0;
                        if (lock->l_blocking_ast)
                                ldlm_add_ast_work_item(lock, req, work_list);
                }
        }

        if (ldlm_check_contention(req, *contended_locks) &&
            compat == 0 &&
            (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
            req->l_req_mode != LCK_GROUP &&
            req_end - req_start <=
            req->l_resource->lr_namespace->ns_max_nolock_size)
                GOTO(destroylock, compat = -EUSERS);

        RETURN(compat);
destroylock:
        list_del_init(&req->l_res_link);
        ldlm_lock_destroy_nolock(req);
        *err = compat;
        RETURN(compat);
}

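/* Discard the blocking-AST work items collected in bl_list; used on the error
 * path when the request lock was destroyed before the ASTs could be sent. */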
static void discard_bl_list(struct list_head *bl_list)
{
        struct list_head *tmp, *pos;
        ENTRY;

        list_for_each_safe(pos, tmp, bl_list) {
                struct ldlm_lock *lock =
                        list_entry(pos, struct ldlm_lock, l_bl_ast);

                list_del_init(&lock->l_bl_ast);
                LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
                lock->l_flags &= ~LDLM_FL_AST_SENT;
                LASSERT(lock->l_bl_ast_run == 0);
                LASSERT(lock->l_blocking_lock);
                LDLM_LOCK_PUT(lock->l_blocking_lock);
                lock->l_blocking_lock = NULL;
                LDLM_LOCK_PUT(lock);
        }
        EXIT;
}

/* If first_enq is 0 (i.e., called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent
 *   - must call this function with the ns lock held
 *
 * If first_enq is 1 (i.e., called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent
 *   - must call this function with the ns lock held once */
int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                             ldlm_error_t *err, struct list_head *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        struct list_head rpc_list = CFS_LIST_HEAD_INIT(rpc_list);
        int rc, rc2;
        int contended_locks = 0;
        ENTRY;

        LASSERT(list_empty(&res->lr_converting));
        LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
                !(lock->l_flags & LDLM_AST_DISCARD_DATA));
        check_res_locked(res);
        *err = ELDLM_OK;

        if (!first_enq) {
                /* Careful observers will note that we don't handle -EWOULDBLOCK
                 * here, but it's ok for a non-obvious reason -- compat_queue
                 * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
                 * flags should always be zero here, and if that ever stops
                 * being true, we want to find out. */
                LASSERT(*flags == 0);
                rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
                                              err, NULL, &contended_locks);
                if (rc == 1) {
                        rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
                                                      flags, err, NULL,
                                                      &contended_locks);
                }
                if (rc == 0)
                        RETURN(LDLM_ITER_STOP);

                ldlm_resource_unlink_lock(lock);

                if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
                        ldlm_extent_policy(res, lock, flags);
                ldlm_grant_lock(lock, work_list);
                RETURN(LDLM_ITER_CONTINUE);
        }

 restart:
        contended_locks = 0;
        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
                                      &rpc_list, &contended_locks);
        if (rc < 0)
                GOTO(out, rc); /* lock was destroyed */
        if (rc == 2)
                goto grant;

        rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
                                       &rpc_list, &contended_locks);
        if (rc2 < 0)
                GOTO(out, rc = rc2); /* lock was destroyed */

        if (rc + rc2 == 2) {
        grant:
                ldlm_extent_policy(res, lock, flags);
                ldlm_resource_unlink_lock(lock);
                ldlm_grant_lock(lock, NULL);
        } else {
                /* If either of the compat_queue()s returned failure, then we
                 * have ASTs to send and must go onto the waiting list.
                 *
                 * bug 2322: we used to unlink and re-add here, which was a
                 * terrible folly -- if we goto restart, we could get
                 * re-ordered!  Causes deadlock, because ASTs aren't sent! */
                if (list_empty(&lock->l_res_link))
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                unlock_res(res);

                rc = ldlm_run_bl_ast_work(&rpc_list);

                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) &&
                    !ns_is_client(res->lr_namespace))
                        class_fail_export(lock->l_export);

                lock_res(res);
                if (rc == -ERESTART) {
                        /* 15715: The lock was granted and destroyed after
                         * the resource lock was dropped. The interval node
                         * was freed in ldlm_lock_destroy. Anyway, this always
                         * happens when a client is being evicted. So it would
                         * be ok to return an error. -jay */
                        if (lock->l_destroyed) {
                                *err = -EAGAIN;
                                GOTO(out, rc = -EAGAIN);
                        }

                        /* lock was granted while resource was unlocked. */
                        if (lock->l_granted_mode == lock->l_req_mode) {
                                /* bug 11300: if the lock has been granted,
                                 * break earlier because otherwise, we will go
                                 * to restart and ldlm_resource_unlink will be
                                 * called and it causes the interval node to be
                                 * freed. Then we will fail at
                                 * ldlm_extent_add_lock() */
                                *flags &= ~(LDLM_FL_BLOCK_GRANTED | LDLM_FL_BLOCK_CONV |
                                            LDLM_FL_BLOCK_WAIT);
                                GOTO(out, rc = 0);
                        }

                        GOTO(restart, -ERESTART);
                }

                *flags |= LDLM_FL_BLOCK_GRANTED;
                /* this way we force the client to wait for the lock
                 * endlessly once the lock is enqueued -bzzz */
                *flags |= LDLM_FL_NO_TIMEOUT;

        }
        RETURN(0);
out:
        if (!list_empty(&rpc_list)) {
                LASSERT(!(lock->l_flags & LDLM_AST_DISCARD_DATA));
                discard_bl_list(&rpc_list);
        }
        RETURN(rc);
}

/* When a lock is cancelled by a client, the KMS may undergo change if this
 * is the "highest lock".  This function returns the new KMS value.
 * Caller must hold ns_lock already.
 *
 * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
__u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
{
        struct ldlm_resource *res = lock->l_resource;
        struct list_head *tmp;
        struct ldlm_lock *lck;
        __u64 kms = 0;
        ENTRY;

        /* don't let another thread in ldlm_extent_shift_kms race in
         * just after we finish and take our lock into account in its
         * calculation of the kms */
        lock->l_flags |= LDLM_FL_KMS_IGNORE;

        list_for_each(tmp, &res->lr_granted) {
                lck = list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lck->l_flags & LDLM_FL_KMS_IGNORE)
                        continue;

                if (lck->l_policy_data.l_extent.end >= old_kms)
                        RETURN(old_kms);

                /* This extent _has_ to be smaller than old_kms (checked above)
                 * so kms can only ever be smaller or the same as old_kms. */
                if (lck->l_policy_data.l_extent.end + 1 > kms)
                        kms = lck->l_policy_data.l_extent.end + 1;
        }
        LASSERTF(kms <= old_kms, "kms "LPU64" old_kms "LPU64"\n", kms, old_kms);

        RETURN(kms);
}

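/* Slab cache for struct ldlm_interval, the container that links one or more
 * extent locks sharing the same mode and extent into the per-mode interval
 * tree. */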
cfs_mem_cache_t *ldlm_interval_slab;
struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
{
        struct ldlm_interval *node;
        ENTRY;

        LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
        OBD_SLAB_ALLOC(node, ldlm_interval_slab, CFS_ALLOC_IO, sizeof(*node));
        if (node == NULL)
                RETURN(NULL);

        CFS_INIT_LIST_HEAD(&node->li_group);
        ldlm_interval_attach(node, lock);
        RETURN(node);
}

void ldlm_interval_free(struct ldlm_interval *node)
{
        if (node) {
                LASSERT(list_empty(&node->li_group));
                LASSERT(!interval_is_intree(&node->li_node));
                OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
        }
}

/* interval tree, for LDLM_EXTENT. */
void ldlm_interval_attach(struct ldlm_interval *n,
                          struct ldlm_lock *l)
{
        LASSERT(l->l_tree_node == NULL);
        LASSERT(l->l_resource->lr_type == LDLM_EXTENT);

        list_add_tail(&l->l_sl_policy, &n->li_group);
        l->l_tree_node = n;
}

struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l)
{
        struct ldlm_interval *n = l->l_tree_node;

        if (n == NULL)
                return NULL;

        LASSERT(!list_empty(&n->li_group));
        l->l_tree_node = NULL;
        list_del_init(&l->l_sl_policy);

        return (list_empty(&n->li_group) ? n : NULL);
}

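/* Map a power-of-two lock mode bit to its index in res->lr_itree[], so that
 * each mode gets its own interval tree. */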
static inline int lock_mode_to_index(ldlm_mode_t mode)
{
        int index;

        LASSERT(mode != 0);
        LASSERT(IS_PO2(mode));
        for (index = -1; mode; index++, mode >>= 1) ;
        LASSERT(index < LCK_MODE_NUM);
        return index;
}

void ldlm_extent_add_lock(struct ldlm_resource *res,
                          struct ldlm_lock *lock)
{
        struct interval_node *found, **root;
        struct ldlm_interval *node;
        struct ldlm_extent *extent;
        int idx;

        LASSERT(lock->l_granted_mode == lock->l_req_mode);
        LASSERT(!lock->l_destroyed);

        node = lock->l_tree_node;
        LASSERT(node != NULL);
        LASSERT(!interval_is_intree(&node->li_node));

        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);

        /* node extent initialize */
        extent = &lock->l_policy_data.l_extent;
        interval_set(&node->li_node, extent->start, extent->end);

        root = &res->lr_itree[idx].lit_root;
        found = interval_insert(&node->li_node, root);
        if (found) { /* The policy group found. */
                struct ldlm_interval *tmp = ldlm_interval_detach(lock);
                LASSERT(tmp != NULL);
                ldlm_interval_free(tmp);
                ldlm_interval_attach(to_ldlm_interval(found), lock);
        }
        res->lr_itree[idx].lit_size++;

        /* even though we use an interval tree to manage the extent locks, we
         * also add the locks to the granted list, for debugging purposes */
        ldlm_resource_add_lock(res, &res->lr_granted, lock);
}

void ldlm_extent_unlink_lock(struct ldlm_lock *lock)
{
        struct ldlm_resource *res = lock->l_resource;
        struct ldlm_interval *node = lock->l_tree_node;
        struct ldlm_interval_tree *tree;
        int idx;

        if (!node || !interval_is_intree(&node->li_node)) /* duplicate unlink */
                return;

        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        tree = &res->lr_itree[idx];

        LASSERT(tree->lit_root != NULL); /* assure the tree is not null */

        tree->lit_size--;
        node = ldlm_interval_detach(lock);
        if (node) {
                interval_erase(&node->li_node, &tree->lit_root);
                ldlm_interval_free(node);
        }
}