/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_extent.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_LDLM
#ifndef __KERNEL__
# include <liblustre.h>
#else
# include <libcfs/libcfs.h>
#endif

#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd.h>
#include <obd_class.h>
#include <lustre_lib.h>

#include "ldlm_internal.h"

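/* When many conflicting PW/CW locks exist, ldlm_extent_internal_policy_fixup()
 * below caps how far a granted write extent may be grown past the requested
 * start (32MB - 1). */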
#define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)

/* fixup the ldlm_extent after expanding */
static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
                                              struct ldlm_extent *new_ex,
                                              int conflicting)
{
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        __u64 req_align, mask;

        if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
                if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
                        new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
                                          new_ex->end);
        }

        if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF) {
                EXIT;
                return;
        }

        /* We need to ensure that the lock extent is properly aligned to what
         * the client requested.  Align it to the lowest common denominator
         * of the client's requested lock start and end alignment. */
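        /* A worked example (hypothetical offsets): req_start = 0x2000 and
         * req_end = 0x5fff give req_align = 0x6000 | 0x2000 = 0x6000, so the
         * loop below stops with mask = 0x2000; after mask -= 1 the expanded
         * extent is trimmed to 8K boundaries. */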
        mask = 0x1000ULL;
        req_align = (req_end + 1) | req_start;
        if (req_align != 0) {
                while ((req_align & mask) == 0)
                        mask <<= 1;
        }
        mask -= 1;
        /* We can only shrink the lock, not grow it.
         * This should never cause lock to be smaller than requested,
         * since requested lock was already aligned on these boundaries. */
        new_ex->start = ((new_ex->start - 1) | mask) + 1;
        new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
        LASSERTF(new_ex->start <= req_start,
                 "mask "LPX64" grant start "LPU64" req start "LPU64"\n",
                 mask, new_ex->start, req_start);
        LASSERTF(new_ex->end >= req_end,
                 "mask "LPX64" grant end "LPU64" req end "LPU64"\n",
                 mask, new_ex->end, req_end);
}

/* The purpose of this function is to return:
 * - the maximum extent
 * - containing the requested extent
 * - and not overlapping existing conflicting extents outside the requested one
 *
 * Use interval tree to expand the lock extent for granted lock.
 */
static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req,
                                                struct ldlm_extent *new_ex)
{
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        struct ldlm_interval_tree *tree;
        struct interval_node_extent limiter = { new_ex->start, new_ex->end };
        int conflicting = 0;
        int idx;
        ENTRY;

        lockmode_verify(req_mode);

        /* use the interval tree of each lock mode to scan the granted
         * extent locks */
        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                struct interval_node_extent ext = { req_start, req_end };

                tree = &res->lr_itree[idx];
                if (lockmode_compat(tree->lit_mode, req_mode))
                        continue;

                conflicting += tree->lit_size;
                if (conflicting > 4)
                        limiter.start = req_start;

                if (interval_is_overlapped(tree->lit_root, &ext))
                        CDEBUG(D_INFO,
                               "req_mode = %d, tree->lit_mode = %d, "
                               "tree->lit_size = %d\n",
                               req_mode, tree->lit_mode, tree->lit_size);
                interval_expand(tree->lit_root, &ext, &limiter);
                limiter.start = max(limiter.start, ext.start);
                limiter.end = min(limiter.end, ext.end);
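                /* Once the limiter has shrunk to exactly the requested
                 * extent there is no room left to expand, so stop scanning
                 * the remaining trees. */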
                if (limiter.start == req_start && limiter.end == req_end)
                        break;
        }

        new_ex->start = limiter.start;
        new_ex->end = limiter.end;
        LASSERT(new_ex->start <= req_start);
        LASSERT(new_ex->end >= req_end);

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
        EXIT;
}

/* The purpose of this function is to return:
 * - the maximum extent
 * - containing the requested extent
 * - and not overlapping existing conflicting extents outside the requested one
 */
static void
ldlm_extent_internal_policy_waiting(struct ldlm_lock *req,
                                    struct ldlm_extent *new_ex)
{
        struct list_head *tmp;
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int conflicting = 0;
        ENTRY;

        lockmode_verify(req_mode);

        /* for waiting locks */
        list_for_each(tmp, &res->lr_waiting) {
                struct ldlm_lock *lock;
                struct ldlm_extent *l_extent;

                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
                l_extent = &lock->l_policy_data.l_extent;

                /* We already hit the minimum requested size, search no more */
                if (new_ex->start == req_start && new_ex->end == req_end) {
                        EXIT;
                        return;
                }

                /* Don't conflict with ourselves */
                if (req == lock)
                        continue;

                /* Locks are compatible, overlap doesn't matter */
                /* Until bug 20 is fixed, try to avoid granting overlapping
                 * locks on one client (they take a long time to cancel) */
                if (lockmode_compat(lock->l_req_mode, req_mode) &&
                    lock->l_export != req->l_export)
                        continue;

                /* If this is a high-traffic lock, don't grow downwards at all
                 * or grow upwards too much */
                ++conflicting;
                if (conflicting > 4)
                        new_ex->start = req_start;

                /* If lock doesn't overlap new_ex, skip it. */
                if (!ldlm_extent_overlap(l_extent, new_ex))
                        continue;

                /* The locks conflict in their requested extents and we can't
                 * satisfy both, so ignore it.  Either we will ping-pong this
                 * extent (as we would regardless of what extent we granted)
                 * or the lock is unused and shouldn't limit our extent
                 * growth. */
                if (ldlm_extent_overlap(&lock->l_req_extent,&req->l_req_extent))
                        continue;

                /* We grow extents downwards only as far as they don't overlap
                 * with already-granted locks, on the assumption that clients
                 * will be writing beyond the initial requested end and would
                 * then need to enqueue a new lock beyond the previous request.
                 * l_req_extent->end is strictly < req_start, checked above. */
                if (l_extent->start < req_start && new_ex->start != req_start) {
                        if (l_extent->end >= req_start)
                                new_ex->start = req_start;
                        else
                                new_ex->start = min(l_extent->end+1, req_start);
                }

                /* If we need to cancel this lock anyway because our request
                 * overlaps the granted lock, we grow up to its requested
                 * extent start instead of limiting this extent, assuming that
                 * clients are writing forwards and the lock had overgrown
                 * its extent downwards before we enqueued our request. */
                if (l_extent->end > req_end) {
                        if (l_extent->start <= req_end)
                                new_ex->end = max(lock->l_req_extent.start - 1,
                                                  req_end);
                        else
                                new_ex->end = max(l_extent->start - 1, req_end);
                }
        }

        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
        EXIT;
}


/* In order to determine the largest possible extent we can grant, we need
 * to scan all of the queues. */
static void ldlm_extent_policy(struct ldlm_resource *res,
                               struct ldlm_lock *lock, int *flags)
{
        struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF };

        if (lock->l_export == NULL)
                /*
                 * this is local lock taken by server (e.g., as a part of
                 * OST-side locking, or unlink handling). Expansion doesn't
                 * make a lot of sense for local locks, because they are
                 * dropped immediately on operation completion and would only
                 * conflict with other threads.
                 */
                return;

        if (lock->l_policy_data.l_extent.start == 0 &&
            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                /* fast-path whole file locks */
                return;

        ldlm_extent_internal_policy_granted(lock, &new_ex);
        ldlm_extent_internal_policy_waiting(lock, &new_ex);

        if (new_ex.start != lock->l_policy_data.l_extent.start ||
            new_ex.end != lock->l_policy_data.l_extent.end) {
                *flags |= LDLM_FL_LOCK_CHANGED;
                lock->l_policy_data.l_extent.start = new_ex.start;
                lock->l_policy_data.l_extent.end = new_ex.end;
        }
}

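/* A resource is treated as contended once a single scan finds more than
 * ns_contended_locks conflicting locks; it then stays contended for
 * ns_contention_time seconds.  ldlm_extent_compat_queue() uses this to deny
 * small LDLM_FL_DENY_ON_CONTENTION requests with -EUSERS. */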
static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
{
        struct ldlm_resource *res = lock->l_resource;
        cfs_time_t now = cfs_time_current();

        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_SET_CONTENTION))
                return 1;

        CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
        if (contended_locks > res->lr_namespace->ns_contended_locks)
                res->lr_contention_time = now;
        return cfs_time_before(now, cfs_time_add(res->lr_contention_time,
                cfs_time_seconds(res->lr_namespace->ns_contention_time)));
}

struct ldlm_extent_compat_args {
        struct list_head *work_list;
        struct ldlm_lock *lock;
        ldlm_mode_t mode;
        int *locks;
        int *compat;
};

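/* Interval-tree iteration callback: for every granted lock in a conflicting
 * interval, queue a blocking AST work item against the enqueuing lock, count
 * it towards the contention total (whole-file PR glimpse locks are not
 * counted), and mark the request incompatible. */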
static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
                                                void *data)
{
        struct ldlm_extent_compat_args *priv = data;
        struct ldlm_interval *node = to_ldlm_interval(n);
        struct ldlm_extent *extent;
        struct list_head *work_list = priv->work_list;
        struct ldlm_lock *lock, *enq = priv->lock;
        ldlm_mode_t mode = priv->mode;
        int count = 0;
        ENTRY;

        LASSERT(!list_empty(&node->li_group));

        list_for_each_entry(lock, &node->li_group, l_sl_policy) {
                /* interval tree is for granted lock */
                LASSERTF(mode == lock->l_granted_mode,
                         "mode = %s, lock->l_granted_mode = %s\n",
                         ldlm_lockname[mode],
                         ldlm_lockname[lock->l_granted_mode]);
                count++;
                if (lock->l_blocking_ast)
                        ldlm_add_ast_work_item(lock, enq, work_list);
        }

        /* don't count conflicting glimpse locks */
        extent = ldlm_interval_extent(node);
        if (!(mode == LCK_PR &&
            extent->start == 0 && extent->end == OBD_OBJECT_EOF))
                *priv->locks += count;

        if (priv->compat)
                *priv->compat = 0;

        RETURN(INTERVAL_ITER_CONT);
}

/* Determine if the lock is compatible with all locks on the queue.
 * We stop walking the queue if we hit ourselves so we don't take
 * conflicting locks enqueued after us into account, or we'd wait forever.
 *
 * Returns:
 *  0 if the lock is not compatible
 *  1 if the lock is compatible
 *  2 if this group lock is compatible and requires no further checking
 *  negative error, such as EWOULDBLOCK for group locks
 */
static int
ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                         int *flags, ldlm_error_t *err,
                         struct list_head *work_list, int *contended_locks)
{
        struct list_head *tmp;
        struct ldlm_lock *lock;
        struct ldlm_resource *res = req->l_resource;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_start = req->l_req_extent.start;
        __u64 req_end = req->l_req_extent.end;
        int compat = 1;
        int scan = 0;
        int check_contention;
        ENTRY;

        lockmode_verify(req_mode);

        /* Using interval tree for granted lock */
        if (queue == &res->lr_granted) {
                struct ldlm_interval_tree *tree;
                struct ldlm_extent_compat_args data = {.work_list = work_list,
                                               .lock = req,
                                               .locks = contended_locks,
                                               .compat = &compat };
                struct interval_node_extent ex = { .start = req_start,
                                                   .end = req_end };
                int idx, rc;

                for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                        tree = &res->lr_itree[idx];
                        if (tree->lit_root == NULL) /* empty tree, skipped */
                                continue;

                        data.mode = tree->lit_mode;
                        if (lockmode_compat(req_mode, tree->lit_mode)) {
                                struct ldlm_interval *node;
                                struct ldlm_extent *extent;

                                if (req_mode != LCK_GROUP)
                                        continue;

                                /* group lock, grant it immediately if
                                 * compatible */
                                node = to_ldlm_interval(tree->lit_root);
                                extent = ldlm_interval_extent(node);
                                if (req->l_policy_data.l_extent.gid ==
                                    extent->gid)
                                        RETURN(2);
                        }

                        if (tree->lit_mode == LCK_GROUP) {
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                }

                                *flags |= LDLM_FL_NO_TIMEOUT;
                                if (!work_list)
                                        RETURN(0);

                                /* if work list is not NULL, add all
                                   locks in the tree to work list */
                                compat = 0;
                                interval_iterate(tree->lit_root,
                                                 ldlm_extent_compat_cb, &data);
                                continue;
                        }

                        if (!work_list) {
                                rc = interval_is_overlapped(tree->lit_root,&ex);
                                if (rc)
                                        RETURN(0);
                        } else {
                                interval_search(tree->lit_root, &ex,
                                                ldlm_extent_compat_cb, &data);
                                if (!list_empty(work_list) && compat)
                                        compat = 0;
                        }
                }
        } else { /* for waiting queue */
                list_for_each(tmp, queue) {
                        check_contention = 1;

                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                        if (req == lock)
                                break;

                        if (unlikely(scan)) {
                                /* We only get here if we are queuing a GROUP
                                   lock and have met an incompatible one. The
                                   main idea of this code is to insert the
                                   GROUP lock past a compatible GROUP lock in
                                   the waiting queue or, if there is none, in
                                   front of the first non-GROUP lock. */
                                if (lock->l_req_mode != LCK_GROUP) {
                                        /* We hit a non-GROUP lock, so there
                                         * should be no more GROUP locks later
                                         * on; queue in front of the first
                                         * non-GROUP lock. */

                                        ldlm_resource_insert_lock_after(lock, req);
                                        list_del_init(&lock->l_res_link);
                                        ldlm_resource_insert_lock_after(req, lock);
                                        compat = 0;
                                        break;
                                }
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* found it */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        compat = 0;
                                        break;
                                }
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_req_mode, req_mode)) {
                                if (req_mode == LCK_PR &&
                                    ((lock->l_policy_data.l_extent.start <=
                                      req->l_policy_data.l_extent.start) &&
                                     (lock->l_policy_data.l_extent.end >=
                                      req->l_policy_data.l_extent.end))) {
                                        /* If we met a PR lock just like us or
                                           wider, and nobody down the list
                                           conflicted with it, we can skip
                                           processing the rest of the list and
                                           safely place ourselves at the end of
                                           the list, or grant (depending on
                                           whether we met conflicting locks
                                           earlier in the list).
                                           On first enqueue only, we keep
                                           traversing if there is something
                                           conflicting down the list because we
                                           need to make sure something gets
                                           marked as AST_SENT as well; with an
                                           empty worklist we would exit on the
                                           first conflict met. */
                                        /* There IS a case where such a flag is
                                           not set for a lock, yet it blocks
                                           something. Luckily for us this is
                                           only during destroy, so the lock is
                                           exclusive. So here we are safe. */
                                        if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
                                                RETURN(compat);
                                        }
                                }

                                /* non-group locks are compatible, overlap doesn't
                                   matter */
                                if (likely(req_mode != LCK_GROUP))
                                        continue;

                                /* If we are trying to get a GROUP lock and there is
                                   another one of this kind, we need to compare gid */
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* If existing lock with matched gid is granted,
                                           we grant new one too. */
                                        if (lock->l_req_mode == lock->l_granted_mode)
                                                RETURN(2);

                                        /* Otherwise we are scanning the queue
                                         * of waiting locks, which means the
                                         * current request would block along
                                         * with the existing lock (which is
                                         * already blocked).
                                         * If we are in nonblocking mode,
                                         * return immediately. */
                                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                                compat = -EWOULDBLOCK;
                                                goto destroylock;
                                        }
                                        /* If this group lock is compatible with another
                                         * group lock on the waiting list, they must be
                                         * together in the list, so they can be granted
                                         * at the same time.  Otherwise the later lock
                                         * can get stuck behind another, incompatible,
                                         * lock. */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        /* Because 'lock' is not granted, we can stop
                                         * processing this queue and return immediately.
                                         * There is no need to check the rest of the
                                         * list. */
                                        RETURN(0);
                                }
                        }

                        if (unlikely(req_mode == LCK_GROUP &&
                                     (lock->l_req_mode != lock->l_granted_mode))) {
                                scan = 1;
                                compat = 0;
                                if (lock->l_req_mode != LCK_GROUP) {
                                        /* We hit a non-GROUP lock, so there should be
                                           no more GROUP locks later on; queue in front
                                           of the first non-GROUP lock. */

                                        ldlm_resource_insert_lock_after(lock, req);
                                        list_del_init(&lock->l_res_link);
                                        ldlm_resource_insert_lock_after(req, lock);
                                        break;
                                }
                                if (req->l_policy_data.l_extent.gid ==
                                    lock->l_policy_data.l_extent.gid) {
                                        /* found it */
                                        ldlm_resource_insert_lock_after(lock, req);
                                        break;
                                }
                                continue;
                        }

                        if (unlikely(lock->l_req_mode == LCK_GROUP)) {
                                /* If the compared lock is GROUP, then the requested
                                 * one is PR/PW, so this is not compatible; the
                                 * extent range does not matter */
                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                        compat = -EWOULDBLOCK;
                                        goto destroylock;
                                } else {
                                        *flags |= LDLM_FL_NO_TIMEOUT;
                                }
                        } else if (lock->l_policy_data.l_extent.end < req_start ||
                                   lock->l_policy_data.l_extent.start > req_end) {
                                /* if a non-group lock doesn't overlap, skip it */
                                continue;
                        } else if (lock->l_req_extent.end < req_start ||
                                   lock->l_req_extent.start > req_end) {
                                /* false contention, the requests don't really overlap */
                                check_contention = 0;
                        }

                        if (!work_list)
                                RETURN(0);

                        /* don't count conflicting glimpse locks */
                        if (lock->l_req_mode == LCK_PR &&
                            lock->l_policy_data.l_extent.start == 0 &&
                            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
                                check_contention = 0;

                        *contended_locks += check_contention;

                        compat = 0;
                        if (lock->l_blocking_ast)
                                ldlm_add_ast_work_item(lock, req, work_list);
                }
        }

        if (ldlm_check_contention(req, *contended_locks) &&
            compat == 0 &&
            (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
            req->l_req_mode != LCK_GROUP &&
            req_end - req_start <=
            req->l_resource->lr_namespace->ns_max_nolock_size)
                GOTO(destroylock, compat = -EUSERS);

        RETURN(compat);
destroylock:
        list_del_init(&req->l_res_link);
        ldlm_lock_destroy_nolock(req);
        *err = compat;
        RETURN(compat);
}

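/* Drop the blocking-AST work items collected in bl_list without sending them;
 * used below when the enqueuing lock is destroyed before the ASTs go out. */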
static void discard_bl_list(struct list_head *bl_list)
{
        struct list_head *tmp, *pos;
        ENTRY;

        list_for_each_safe(pos, tmp, bl_list) {
                struct ldlm_lock *lock =
                        list_entry(pos, struct ldlm_lock, l_bl_ast);

                list_del_init(&lock->l_bl_ast);
                LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
                lock->l_flags &= ~LDLM_FL_AST_SENT;
                LASSERT(lock->l_bl_ast_run == 0);
                LASSERT(lock->l_blocking_lock);
                LDLM_LOCK_RELEASE(lock->l_blocking_lock);
                lock->l_blocking_lock = NULL;
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

/* If first_enq is 0 (ie, called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent
 *   - must call this function with the ns lock held
 *
 * If first_enq is 1 (ie, called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent
 *   - must call this function with the ns lock held once */
int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                             ldlm_error_t *err, struct list_head *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        CFS_LIST_HEAD(rpc_list);
        int rc, rc2;
        int contended_locks = 0;
        ENTRY;

        LASSERT(list_empty(&res->lr_converting));
        LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
                !(lock->l_flags & LDLM_AST_DISCARD_DATA));
        check_res_locked(res);
        *err = ELDLM_OK;

        if (!first_enq) {
                /* Careful observers will note that we don't handle -EWOULDBLOCK
                 * here, but it's ok for a non-obvious reason -- compat_queue
                 * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
                 * flags should always be zero here, and if that ever stops
                 * being true, we want to find out. */
                LASSERT(*flags == 0);
                rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
                                              err, NULL, &contended_locks);
                if (rc == 1) {
                        rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
                                                      flags, err, NULL,
                                                      &contended_locks);
                }
                if (rc == 0)
                        RETURN(LDLM_ITER_STOP);

                ldlm_resource_unlink_lock(lock);

                if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
                        ldlm_extent_policy(res, lock, flags);
                ldlm_grant_lock(lock, work_list);
                RETURN(LDLM_ITER_CONTINUE);
        }

 restart:
        contended_locks = 0;
        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
                                      &rpc_list, &contended_locks);
        if (rc < 0)
                GOTO(out, rc); /* lock was destroyed */
        if (rc == 2)
                goto grant;

        rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
                                       &rpc_list, &contended_locks);
        if (rc2 < 0)
                GOTO(out, rc = rc2); /* lock was destroyed */

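        /* each compat_queue() call above returns 1 when its queue is fully
         * compatible, so a sum of 2 means the lock can be granted right away */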
        if (rc + rc2 == 2) {
        grant:
                ldlm_extent_policy(res, lock, flags);
                ldlm_resource_unlink_lock(lock);
                ldlm_grant_lock(lock, NULL);
        } else {
                /* If either of the compat_queue()s returned failure, then we
                 * have ASTs to send and must go onto the waiting list.
                 *
                 * bug 2322: we used to unlink and re-add here, which was a
                 * terrible folly -- if we goto restart, we could get
                 * re-ordered!  Causes deadlock, because ASTs aren't sent! */
                if (list_empty(&lock->l_res_link))
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                unlock_res(res);
                rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_BL_AST);

                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) &&
                    !ns_is_client(res->lr_namespace))
                        class_fail_export(lock->l_export);

                lock_res(res);
                if (rc == -ERESTART) {

                        /* 15715: The lock was granted and destroyed after
                         * resource lock was dropped. Interval node was freed
                         * in ldlm_lock_destroy. Anyway, this always happens
                         * when a client is being evicted. So it would be
                         * ok to return an error. -jay */
                        if (lock->l_destroyed) {
                                *err = -EAGAIN;
                                GOTO(out, rc = -EAGAIN);
                        }

                        /* lock was granted while resource was unlocked. */
                        if (lock->l_granted_mode == lock->l_req_mode) {
                                /* bug 11300: if the lock has been granted,
                                 * break earlier because otherwise, we will go
                                 * to restart and ldlm_resource_unlink will be
                                 * called and it causes the interval node to be
                                 * freed. Then we will fail at
                                 * ldlm_extent_add_lock() */
                                *flags &= ~(LDLM_FL_BLOCK_GRANTED | LDLM_FL_BLOCK_CONV |
                                            LDLM_FL_BLOCK_WAIT);
                                GOTO(out, rc = 0);
                        }

                        GOTO(restart, -ERESTART);
                }

                *flags |= LDLM_FL_BLOCK_GRANTED;
                /* this way we force client to wait for the lock
                 * endlessly once the lock is enqueued -bzzz */
                *flags |= LDLM_FL_NO_TIMEOUT;

        }
        RETURN(0);
out:
        if (!list_empty(&rpc_list)) {
                LASSERT(!(lock->l_flags & LDLM_AST_DISCARD_DATA));
                discard_bl_list(&rpc_list);
        }
        RETURN(rc);
}

/* When a lock is cancelled by a client, the KMS may undergo change if this
 * is the "highest lock".  This function returns the new KMS value.
 * Caller must hold ns_lock already.
 *
 * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
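/* For example (hypothetical offsets): if the cancelled lock covered
 * [0, 1048575] and the highest remaining granted extent ends at 8191,
 * the KMS drops from 1048576 to 8192. */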
__u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
{
        struct ldlm_resource *res = lock->l_resource;
        struct list_head *tmp;
        struct ldlm_lock *lck;
        __u64 kms = 0;
        ENTRY;

        /* don't let another thread in ldlm_extent_shift_kms race in
         * just after we finish and take our lock into account in its
         * calculation of the kms */
        lock->l_flags |= LDLM_FL_KMS_IGNORE;

        list_for_each(tmp, &res->lr_granted) {
                lck = list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lck->l_flags & LDLM_FL_KMS_IGNORE)
                        continue;

                if (lck->l_policy_data.l_extent.end >= old_kms)
                        RETURN(old_kms);

                /* This extent _has_ to be smaller than old_kms (checked above)
                 * so kms can only ever be smaller or the same as old_kms. */
                if (lck->l_policy_data.l_extent.end + 1 > kms)
                        kms = lck->l_policy_data.l_extent.end + 1;
        }
        LASSERTF(kms <= old_kms, "kms "LPU64" old_kms "LPU64"\n", kms, old_kms);

        RETURN(kms);
}

cfs_mem_cache_t *ldlm_interval_slab;
struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
{
        struct ldlm_interval *node;
        ENTRY;

        LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
        OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, CFS_ALLOC_IO);
        if (node == NULL)
                RETURN(NULL);

        CFS_INIT_LIST_HEAD(&node->li_group);
        ldlm_interval_attach(node, lock);
        RETURN(node);
}

void ldlm_interval_free(struct ldlm_interval *node)
{
        if (node) {
                LASSERT(list_empty(&node->li_group));
                LASSERT(!interval_is_intree(&node->li_node));
                OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
        }
}

/* interval tree, for LDLM_EXTENT. */
void ldlm_interval_attach(struct ldlm_interval *n,
                          struct ldlm_lock *l)
{
        LASSERT(l->l_tree_node == NULL);
        LASSERT(l->l_resource->lr_type == LDLM_EXTENT);

        list_add_tail(&l->l_sl_policy, &n->li_group);
        l->l_tree_node = n;
}

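/* Detach the lock from its interval node's policy group.  The node is
 * returned only when the group becomes empty, so the caller knows it may
 * free the node (otherwise other locks still share it). */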
struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l)
{
        struct ldlm_interval *n = l->l_tree_node;

        if (n == NULL)
                return NULL;

        LASSERT(!list_empty(&n->li_group));
        l->l_tree_node = NULL;
        list_del_init(&l->l_sl_policy);

        return (list_empty(&n->li_group) ? n : NULL);
}

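/* Map a power-of-two lock mode to its bit index, e.g. a mode of 0x4 maps to
 * index 2; used to select the per-mode interval tree in res->lr_itree[]. */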
static inline int lock_mode_to_index(ldlm_mode_t mode)
{
        int index;

        LASSERT(mode != 0);
        LASSERT(IS_PO2(mode));
        for (index = -1; mode; index++, mode >>= 1) ;
        LASSERT(index < LCK_MODE_NUM);
        return index;
}

void ldlm_extent_add_lock(struct ldlm_resource *res,
                          struct ldlm_lock *lock)
{
        struct interval_node *found, **root;
        struct ldlm_interval *node;
        struct ldlm_extent *extent;
        int idx;

        LASSERT(lock->l_granted_mode == lock->l_req_mode);

        node = lock->l_tree_node;
        LASSERT(node != NULL);
        LASSERT(!interval_is_intree(&node->li_node));

        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);

        /* node extent initialize */
        extent = &lock->l_policy_data.l_extent;
        interval_set(&node->li_node, extent->start, extent->end);

        root = &res->lr_itree[idx].lit_root;
        found = interval_insert(&node->li_node, root);
        if (found) { /* The policy group found. */
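                /* Another granted lock already covers the identical extent:
                 * free this lock's private interval node and attach the lock
                 * to the existing node's policy group instead. */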
                struct ldlm_interval *tmp = ldlm_interval_detach(lock);
                LASSERT(tmp != NULL);
                ldlm_interval_free(tmp);
                ldlm_interval_attach(to_ldlm_interval(found), lock);
        }
        res->lr_itree[idx].lit_size++;

        /* even though we use the interval tree to manage extent locks, we
         * also add them to the granted list, e.g. for debugging purposes */
        ldlm_resource_add_lock(res, &res->lr_granted, lock);
}

void ldlm_extent_unlink_lock(struct ldlm_lock *lock)
{
        struct ldlm_resource *res = lock->l_resource;
        struct ldlm_interval *node = lock->l_tree_node;
        struct ldlm_interval_tree *tree;
        int idx;

        if (!node || !interval_is_intree(&node->li_node)) /* duplicate unlink */
                return;

        idx = lock_mode_to_index(lock->l_granted_mode);
        LASSERT(lock->l_granted_mode == 1 << idx);
        tree = &res->lr_itree[idx];

        LASSERT(tree->lit_root != NULL); /* assure the tree is not null */

        tree->lit_size--;
        node = ldlm_interval_detach(lock);
        if (node) {
                interval_erase(&node->li_node, &tree->lit_root);
                ldlm_interval_free(node);
        }
}