1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ldlm/ldlm_extent.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_LDLM
43 #ifndef __KERNEL__
44 # include <liblustre.h>
45 #else
46 # include <libcfs/libcfs.h>
47 #endif
48
49 #include <lustre_dlm.h>
50 #include <obd_support.h>
51 #include <obd.h>
52 #include <obd_class.h>
53 #include <lustre_lib.h>
54
55 #include "ldlm_internal.h"
56
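/* Upper bound on how far a contended PW/CW lock extent may be grown past the
 * requested start (an inclusive end offset of 32 MiB - 1); used by
 * ldlm_extent_internal_policy_fixup() below. */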
57 #define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)
58
59 /* fix up the ldlm_extent after it has been expanded */
60 static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
61                                               struct ldlm_extent *new_ex,
62                                               int conflicting)
63 {
64         ldlm_mode_t req_mode = req->l_req_mode;
65         __u64 req_start = req->l_req_extent.start;
66         __u64 req_end = req->l_req_extent.end;
67         __u64 req_align, mask;
68
69         if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
70                 if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
71                         new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
72                                           new_ex->end);
73         }
74
75         if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF) {
76                 EXIT;
77                 return;
78         }
79
80         /* we need to ensure that the lock extent is properly aligned to what
81          * the client requested.  We align it to the lowest-common denominator
82          * of the client's requested lock start and end alignment. */
83         mask = 0x1000ULL;
84         req_align = (req_end + 1) | req_start;
85         if (req_align != 0) {
86                 while ((req_align & mask) == 0)
87                         mask <<= 1;
88         }
89         mask -= 1;
90         /* We can only shrink the lock, not grow it.
91          * This should never cause lock to be smaller than requested,
92          * since requested lock was already aligned on these boundaries. */
93         new_ex->start = ((new_ex->start - 1) | mask) + 1;
94         new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
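        /* Illustrative example (not from the original source): with
         * req_start = 0x3000 and req_end = 0x4fff, req_align = 0x5000 | 0x3000
         * = 0x7000, the loop leaves mask = 0x1000, so mask becomes 0xfff and
         * the expanded extent is rounded inward to 4 KiB boundaries. */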
95         LASSERTF(new_ex->start <= req_start,
96                  "mask "LPX64" grant start "LPU64" req start "LPU64"\n",
97                  mask, new_ex->start, req_start);
98         LASSERTF(new_ex->end >= req_end,
99                  "mask "LPX64" grant end "LPU64" req end "LPU64"\n",
100                  mask, new_ex->end, req_end);
101 }
102
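/* Decide whether the resource should be treated as contended: seeing more
 * than ns_contended_locks conflicting locks refreshes the contention
 * timestamp, and the resource then counts as contended until
 * ns_contention_time seconds have elapsed since that timestamp. */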
103 static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
104 {
105         struct ldlm_resource *res = lock->l_resource;
106         cfs_time_t now = cfs_time_current();
107
108         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_SET_CONTENTION))
109                 return 1;
110
111         CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
112         if (contended_locks > res->lr_namespace->ns_contended_locks)
113                 res->lr_contention_time = now;
114         return cfs_time_before(now, cfs_time_add(res->lr_contention_time,
115                 cfs_time_seconds(res->lr_namespace->ns_contention_time)));
116 }
117
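/* State threaded through ldlm_extent_compat_cb() while walking an interval
 * tree: the lock being enqueued, the tree's lock mode, the blocking-AST work
 * list, and output counters for contended, compatible and conflicting locks. */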
118 struct ldlm_extent_compat_args {
119         struct list_head *work_list;
120         struct ldlm_lock *lock;
121         ldlm_mode_t mode;
122         int *locks;
123         int *compat;
124         int *conflicts;
125 };
126
127 static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
128                                                 void *data)
129 {
130         struct ldlm_extent_compat_args *priv = data;
131         struct ldlm_interval *node = to_ldlm_interval(n);
132         struct ldlm_extent *extent;
133         struct list_head *work_list = priv->work_list;
134         struct ldlm_lock *lock, *enq = priv->lock;
135         ldlm_mode_t mode = priv->mode;
136         int count = 0;
137         ENTRY;
138
139         LASSERT(!list_empty(&node->li_group));
140
141         list_for_each_entry(lock, &node->li_group, l_sl_policy) {
142                 /* the interval tree only contains granted locks */
143                 LASSERTF(mode == lock->l_granted_mode,
144                          "mode = %s, lock->l_granted_mode = %s\n",
145                          ldlm_lockname[mode],
146                          ldlm_lockname[lock->l_granted_mode]);
147                 count++;
148                 /* only count locks that overlap the _requested_ region as
149                  * contended locks */
150                 if (lock->l_req_extent.end >= enq->l_req_extent.start &&
151                     lock->l_req_extent.start <= enq->l_req_extent.end)
152                         (*priv->conflicts)++;
153                 if (lock->l_blocking_ast)
154                         ldlm_add_ast_work_item(lock, enq, work_list);
155         }
156
157         /* don't count conflicting glimpse locks */
158         extent = ldlm_interval_extent(node);
159         if (!(mode == LCK_PR &&
160             extent->start == 0 && extent->end == OBD_OBJECT_EOF))
161                 *priv->locks += count;
162
163         if (priv->compat)
164                 *priv->compat = 0;
165
166         RETURN(INTERVAL_ITER_CONT);
167 }
168
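/* Check an enqueueing extent lock against the granted queue, which is kept
 * as one interval tree per lock mode.  Returns 2 if the request is a group
 * lock matching an already granted group lock (grant immediately), 1 if it
 * is compatible with all granted locks, 0 if it must wait, or a negative
 * error such as -EWOULDBLOCK for LDLM_FL_BLOCK_NOWAIT requests. */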
169 static int
170 ldlm_extent_compat_granted_queue(struct list_head *queue, struct ldlm_lock *req,
171                                  int *flags, ldlm_error_t *err,
172                                  struct list_head *work_list,
173                                  int *contended_locks)
174 {
175         struct ldlm_resource *res = req->l_resource;
176         ldlm_mode_t req_mode = req->l_req_mode;
177         __u64 req_start = req->l_req_extent.start;
178         __u64 req_end = req->l_req_extent.end;
179         int compat = 1, conflicts;
180         /* Using interval tree for granted lock */
181         struct ldlm_interval_tree *tree;
182         struct ldlm_extent_compat_args data = {.work_list = work_list,
183                                        .lock = req,
184                                        .locks = contended_locks,
185                                        .compat = &compat,
186                                        .conflicts = &conflicts };
187         struct interval_node_extent ex = { .start = req_start,
188                                            .end = req_end };
189         int idx, rc;
190         ENTRY;
191
192         for (idx = 0; idx < LCK_MODE_NUM; idx++) {
193                 conflicts = 0;
194                 tree = &res->lr_itree[idx];
195                 if (tree->lit_root == NULL) /* empty tree, skip it */
196                         continue;
197
198                 data.mode = tree->lit_mode;
199                 if (lockmode_compat(req_mode, tree->lit_mode)) {
200                         struct ldlm_interval *node;
201                         struct ldlm_extent *extent;
202
203                         if (req_mode != LCK_GROUP)
204                                 continue;
205
206                         /* group lock, grant it immediately if
207                          * compatible */
208                         node = to_ldlm_interval(tree->lit_root);
209                         extent = ldlm_interval_extent(node);
210                         if (req->l_policy_data.l_extent.gid ==
211                             extent->gid)
212                                 RETURN(2);
213                 }
214
215                 if (tree->lit_mode == LCK_GROUP) {
216                         if (*flags & LDLM_FL_BLOCK_NOWAIT) {
217                                 compat = -EWOULDBLOCK;
218                                 goto destroylock;
219                         }
220
221                         *flags |= LDLM_FL_NO_TIMEOUT;
222                         if (!work_list)
223                                 RETURN(0);
224
225
226                         /* if the work list is not NULL, add all
227                            locks in the tree to the work list */
228                         compat = 0;
229                         interval_iterate(tree->lit_root,
230                                          ldlm_extent_compat_cb, &data);
231                         continue;
232                 }
233
234                 if (!work_list) {
235                         rc = interval_is_overlapped(tree->lit_root, &ex);
236                         if (rc)
237                                 RETURN(0);
238                 } else {
239                         struct interval_node_extent result_ext = {
240                                 .start = req->l_policy_data.l_extent.start,
241                                 .end = req->l_policy_data.l_extent.end };
242
243                         interval_search_expand_extent(tree->lit_root, &ex,
244                                                       &result_ext,
245                                                       ldlm_extent_compat_cb,
246                                                       &data);
247                         req->l_policy_data.l_extent.start = result_ext.start;
248                         req->l_policy_data.l_extent.end = result_ext.end;
249                         /* for the granted queue, count incompatible locks that
250                          * do not overlap the request in the traffic index */
251                         req->l_traffic += tree->lit_size - conflicts;
252
253                         if (!list_empty(work_list)) {
254                                 compat = 0;
255                                /* if there is at least 1 conflicting lock, we
256                                 * do not expand to the left, since we often
257                                 * continue writing to the right.
258                                 */
259                                req->l_policy_data.l_extent.start = req_start;
260                         }
261                 }
262         }
263
264         RETURN(compat);
265 destroylock:
266         list_del_init(&req->l_res_link);
267         ldlm_lock_destroy_nolock(req);
268         *err = compat;
269         RETURN(compat);
270 }
271
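/* Check an enqueueing extent lock against the waiting queue, walking it until
 * the request itself is reached.  GROUP lock requests are re-queued next to a
 * compatible GROUP lock (or ahead of the first non-GROUP lock); for other
 * modes the requested policy extent may be trimmed so it stops short of
 * waiting conflicts.  Returns 1 if compatible, 0 if the lock must wait, or a
 * negative error. */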
272 static int
273 ldlm_extent_compat_waiting_queue(struct list_head *queue, struct ldlm_lock *req,
274                                  int *flags, ldlm_error_t *err,
275                                  struct list_head *work_list,
276                                  int *contended_locks)
277 {
278         struct list_head *tmp;
279         struct ldlm_lock *lock;
280         ldlm_mode_t req_mode = req->l_req_mode;
281         __u64 req_start = req->l_req_extent.start;
282         __u64 req_end = req->l_req_extent.end;
283         int compat = 1;
284         int scan = 0;
285         int check_contention;
286         ENTRY;
287
288         list_for_each(tmp, queue) {
289                 check_contention = 1;
290
291                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
292
293                 if (req == lock)
294                         break;
295
296                 if (unlikely(scan)) {
297                         /* We only get here if we are queuing a GROUP lock
298                            and have met an incompatible one. The idea is to
299                            insert the GROUP lock after a compatible GROUP lock
300                            in the waiting queue or, if there is none, in front
301                            of the first non-GROUP lock */
302                         if (lock->l_req_mode != LCK_GROUP) {
303                                 /* OK, we hit a non-GROUP lock; there should be
304                                    no more GROUP locks later on, so queue in
305                                    front of the first non-GROUP lock */
306
307                                 ldlm_resource_insert_lock_after(lock, req);
308                                 list_del_init(&lock->l_res_link);
309                                 ldlm_resource_insert_lock_after(req, lock);
310                                 compat = 0;
311                                 break;
312                         }
313                         if (req->l_policy_data.l_extent.gid ==
314                             lock->l_policy_data.l_extent.gid) {
315                                 /* found it */
316                                 ldlm_resource_insert_lock_after(lock, req);
317                                 compat = 0;
318                                 break;
319                         }
320                         continue;
321                 }
322
323                 /* locks are compatible, overlap doesn't matter */
324                 if (lockmode_compat(lock->l_req_mode, req_mode)) {
325                         if (req_mode == LCK_PR &&
326                             ((lock->l_policy_data.l_extent.start <=
327                               req->l_policy_data.l_extent.start) &&
328                              (lock->l_policy_data.l_extent.end >=
329                               req->l_policy_data.l_extent.end))) {
330                                 /* If we met a PR lock just like us or wider,
331                                    and nobody down the list conflicted with
332                                    it, we can skip processing the rest of the
333                                    list and safely place ourselves at the end
334                                    of the list, or grant (depending on whether
335                                    we met a conflicting lock earlier in the
336                                    list).
337                                    Only on the first enqueue do we keep
338                                    traversing when something conflicts further
339                                    down the list, because we must make sure it
340                                    is marked as AST_SENT as well; with an empty
341                                    work list we would exit on the first
342                                    conflict met. */
343                                 /* There IS a case where such a flag is
344                                    not set for a lock, yet it blocks
345                                    something. Luckily for us this only
346                                    happens during destroy, so the lock is
347                                    exclusive. So here we are safe. */
348                                 if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
349                                         RETURN(compat);
350                                 }
351                         }
352
353                         /* non-group locks are compatible, overlap doesn't
354                            matter */
355                         if (likely(req_mode != LCK_GROUP))
356                                 continue;
357
358                         /* If we are trying to get a GROUP lock and there is
359                            another one of this kind, we need to compare gid */
360                         if (req->l_policy_data.l_extent.gid ==
361                             lock->l_policy_data.l_extent.gid) {
362                                 /* We are scanning the queue of waiting
363                                  * locks, which means the current request
364                                  * would block along with the existing lock
365                                  * (which is already blocked).
366                                  * If we are in non-blocking mode, return
367                                  * immediately. */
368                                 if (*flags & LDLM_FL_BLOCK_NOWAIT) {
369                                         compat = -EWOULDBLOCK;
370                                         goto destroylock;
371                                 }
372                                 /* If this group lock is compatible with another
373                                  * group lock on the waiting list, they must be
374                                  * together in the list, so they can be granted
375                                  * at the same time.  Otherwise the later lock
376                                  * can get stuck behind another, incompatible,
377                                  * lock. */
378                                 ldlm_resource_insert_lock_after(lock, req);
379                                 /* Because 'lock' is not granted, we can stop
380                                  * processing this queue and return immediately.
381                                  * There is no need to check the rest of the
382                                  * list. */
383                                 RETURN(0);
384                         }
385                 }
386
387                 if (unlikely(req_mode == LCK_GROUP &&
388                              (lock->l_req_mode != lock->l_granted_mode))) {
389                         scan = 1;
390                         compat = 0;
391                         if (lock->l_req_mode != LCK_GROUP) {
392                                 /* OK, we hit a non-GROUP lock; there should
393                                  * be no more GROUP locks later on, so queue in
394                                  * front of the first non-GROUP lock */
395
396                                 ldlm_resource_insert_lock_after(lock, req);
397                                 list_del_init(&lock->l_res_link);
398                                 ldlm_resource_insert_lock_after(req, lock);
399                                 break;
400                         }
401                         if (req->l_policy_data.l_extent.gid ==
402                             lock->l_policy_data.l_extent.gid) {
403                                 /* found it */
404                                 ldlm_resource_insert_lock_after(lock, req);
405                                 break;
406                         }
407                         continue;
408                 }
409
410                 if (unlikely(lock->l_req_mode == LCK_GROUP)) {
411                         /* If the compared lock is GROUP, the requested one is
412                          * PR/PW, so they are not compatible; the extent range
413                          * does not matter */
414                         if (*flags & LDLM_FL_BLOCK_NOWAIT) {
415                                 compat = -EWOULDBLOCK;
416                                 goto destroylock;
417                         } else {
418                                 *flags |= LDLM_FL_NO_TIMEOUT;
419                         }
420                 } else if (!work_list) {
421                         if (lock->l_policy_data.l_extent.end < req_start ||
422                             lock->l_policy_data.l_extent.start > req_end)
423                                 /* if a non-group lock doesn't overlap, skip it */
424                                 continue;
425                         RETURN(0);
426                 } else {
427                         /* for waiting locks, count all non-compatible locks in
428                          * traffic index */
429                         ++req->l_traffic;
430                         ++lock->l_traffic;
431
432                         /* adjust policy */
433                         if (lock->l_policy_data.l_extent.end < req_start) {
434                                 /*     lock            req
435                                  * ------------+
436                                  * ++++++      |   +++++++
437                                  *      +      |   +
438                                  * ++++++      |   +++++++
439                                  * ------------+
440                                  */
441                                 if (lock->l_policy_data.l_extent.end >
442                                     req->l_policy_data.l_extent.start)
443                                         req->l_policy_data.l_extent.start =
444                                              lock->l_policy_data.l_extent.end+1;
445                                 continue;
446                         } else if (lock->l_req_extent.end < req_start) {
447                                 /*     lock            req
448                                  * ------------------+
449                                  * ++++++          +++++++
450                                  *      +          + |
451                                  * ++++++          +++++++
452                                  * ------------------+
453                                  */
454                                 lock->l_policy_data.l_extent.end =
455                                                           req_start - 1;
456                                 req->l_policy_data.l_extent.start =
457                                                               req_start;
458                                 continue;
459                         } else if (lock->l_policy_data.l_extent.start >
460                                    req_end) {
461                                 /*  req              lock
462                                  *              +--------------
463                                  *  +++++++     |    +++++++
464                                  *        +     |    +
465                                  *  +++++++     |    +++++++
466                                  *              +--------------
467                                  */
468                                 if (lock->l_policy_data.l_extent.start <
469                                      req->l_policy_data.l_extent.end)
470                                         req->l_policy_data.l_extent.end =
471                                            lock->l_policy_data.l_extent.start-1;
472                                 continue;
473                         } else if (lock->l_req_extent.start > req_end) {
474                                 /*  req              lock
475                                  *      +----------------------
476                                  *  +++++++          +++++++
477                                  *      | +          +
478                                  *  +++++++          +++++++
479                                  *      +----------------------
480                                  */
481                                 lock->l_policy_data.l_extent.start =
482                                                             req_end + 1;
483                                 req->l_policy_data.l_extent.end = req_end;
484                                 continue;
485                         }
486                 } /* policy_adj */
487
488                 compat = 0;
489                 if (work_list) {
490                         /* don't count conflicting glimpse locks */
491                         if (lock->l_flags & LDLM_FL_HAS_INTENT)
492                                 check_contention = 0;
493
494                         *contended_locks += check_contention;
495
496                         if (lock->l_blocking_ast)
497                                 ldlm_add_ast_work_item(lock, req, work_list);
498                 }
499         }
500
501         RETURN(compat);
502 destroylock:
503         list_del_init(&req->l_res_link);
504         ldlm_lock_destroy_nolock(req);
505         *err = compat;
506         RETURN(compat);
507 }
508
509 /* Determine if the lock is compatible with all locks on the queue.
510  * We stop walking the queue if we hit ourselves so we don't take
511  * conflicting locks enqueued after us into account, or we'd wait forever.
512  *
513  * Returns: 0 if the lock is not compatible
514  *          1 if the lock is compatible
515  *          2 if this group lock is compatible and requires no further checking
516  *          negative error, such as -EWOULDBLOCK for group locks
517  *
518  * Note: policy adjustment only happens during the first lock enqueue procedure
519  */
520 static int
521 ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
522                          int *flags, ldlm_error_t *err,
523                          struct list_head *work_list, int *contended_locks)
524 {
525         struct ldlm_resource *res = req->l_resource;
526         ldlm_mode_t req_mode = req->l_req_mode;
527         __u64 req_start = req->l_req_extent.start;
528         __u64 req_end = req->l_req_extent.end;
529         int compat = 1;
530         ENTRY;
531
532         lockmode_verify(req_mode);
533
534         if (queue == &res->lr_granted)
535                 compat = ldlm_extent_compat_granted_queue(queue, req, flags,
536                                                           err, work_list,
537                                                           contended_locks);
538         else
539                 compat = ldlm_extent_compat_waiting_queue(queue, req, flags,
540                                                           err, work_list,
541                                                           contended_locks);
542         if (ldlm_check_contention(req, *contended_locks) &&
543             compat == 0 &&
544             (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
545             req->l_req_mode != LCK_GROUP &&
546             req_end - req_start <=
547             req->l_resource->lr_namespace->ns_max_nolock_size)
548                 GOTO(destroylock, compat = -EUSERS);
549
550         RETURN(compat);
551 destroylock:
552         list_del_init(&req->l_res_link);
553         ldlm_lock_destroy_nolock(req);
554         *err = compat;
555         RETURN(compat);
556 }
557
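/* Drop blocking-AST work items that were collected but will not be sent,
 * releasing the lock references held by each item. */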
558 static void discard_bl_list(struct list_head *bl_list)
559 {
560         struct list_head *tmp, *pos;
561         ENTRY;
562
563         list_for_each_safe(pos, tmp, bl_list) {
564                 struct ldlm_lock *lock =
565                         list_entry(pos, struct ldlm_lock, l_bl_ast);
566
567                 list_del_init(&lock->l_bl_ast);
568                 LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
569                 lock->l_flags &= ~LDLM_FL_AST_SENT;
570                 LASSERT(lock->l_bl_ast_run == 0);
571                 LASSERT(lock->l_blocking_lock);
572                 LDLM_LOCK_RELEASE(lock->l_blocking_lock);
573                 lock->l_blocking_lock = NULL;
574                 LDLM_LOCK_RELEASE(lock);
575         }
576         EXIT;
577 }
578
579 static inline void ldlm_process_extent_init(struct ldlm_lock *lock)
580 {
581         lock->l_policy_data.l_extent.start = 0;
582         lock->l_policy_data.l_extent.end = OBD_OBJECT_EOF;
583 }
584
585 static inline void ldlm_process_extent_fini(struct ldlm_lock *lock, int *flags)
586 {
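        /* If the lock saw heavy traffic (more than four conflicting locks),
         * keep the requested start and drop any downward expansion, then round
         * the grown extent to the alignment of the original request. */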
587         if (lock->l_traffic > 4)
588                 lock->l_policy_data.l_extent.start = lock->l_req_extent.start;
589         ldlm_extent_internal_policy_fixup(lock,
590                                           &lock->l_policy_data.l_extent,
591                                           lock->l_traffic);
592         if (lock->l_req_extent.start != lock->l_policy_data.l_extent.start ||
593             lock->l_req_extent.end   != lock->l_policy_data.l_extent.end)
594                 *flags |= LDLM_FL_LOCK_CHANGED;
595 }
596
597 /* If first_enq is 0 (i.e. called from ldlm_reprocess_queue):
598  *   - blocking ASTs have already been sent
599  *   - must call this function with the ns lock held
600  *
601  * If first_enq is 1 (i.e. called from ldlm_lock_enqueue):
602  *   - blocking ASTs have not been sent
603  *   - must call this function with the ns lock held once */
604 int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
605                              ldlm_error_t *err, struct list_head *work_list)
606 {
607         struct ldlm_resource *res = lock->l_resource;
608         CFS_LIST_HEAD(rpc_list);
609         int rc, rc2;
610         int contended_locks = 0;
611         ENTRY;
612
613         LASSERT(list_empty(&res->lr_converting));
614         LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
615                 !(lock->l_flags & LDLM_AST_DISCARD_DATA));
616         check_res_locked(res);
617         *err = ELDLM_OK;
618
619         if (!first_enq) {
620                 /* Careful observers will note that we don't handle -EWOULDBLOCK
621                  * here, but it's ok for a non-obvious reason -- compat_queue
622                  * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
623                  * flags should always be zero here, and if that ever stops
624                  * being true, we want to find out. */
625                 LASSERT(*flags == 0);
626                 rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
627                                               err, NULL, &contended_locks);
628                 if (rc == 1) {
629                         rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
630                                                       flags, err, NULL,
631                                                       &contended_locks);
632                 }
633                 if (rc == 0)
634                         RETURN(LDLM_ITER_STOP);
635
636                 ldlm_resource_unlink_lock(lock);
637
638                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE)) {
639                         lock->l_policy_data.l_extent.start =
640                                 lock->l_req_extent.start;
641                         lock->l_policy_data.l_extent.end =
642                                 lock->l_req_extent.end;
643                 } else {
644                         ldlm_process_extent_fini(lock, flags);
645                 }
646
647                 ldlm_grant_lock(lock, work_list);
648                 RETURN(LDLM_ITER_CONTINUE);
649         }
650
651  restart:
652         contended_locks = 0;
653
654         ldlm_process_extent_init(lock);
655
656         rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
657                                       &rpc_list, &contended_locks);
658         if (rc < 0)
659                 GOTO(out, rc); /* lock was destroyed */
660         if (rc == 2)
661                 goto grant;
662
663         rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
664                                        &rpc_list, &contended_locks);
665         if (rc2 < 0)
666                 GOTO(out, rc = rc2); /* lock was destroyed */
667
668         if (rc + rc2 == 2) {
669         grant:
670                 ldlm_resource_unlink_lock(lock);
671                 ldlm_process_extent_fini(lock, flags);
672                 ldlm_grant_lock(lock, NULL);
673         } else {
674                 /* If either of the compat_queue()s returned failure, then we
675                  * have ASTs to send and must go onto the waiting list.
676                  *
677                  * bug 2322: we used to unlink and re-add here, which was a
678                  * terrible folly -- if we goto restart, we could get
679                  * re-ordered!  Causes deadlock, because ASTs aren't sent! */
680                 if (list_empty(&lock->l_res_link))
681                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
682                 unlock_res(res);
683                 rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_BL_AST);
684
685                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) &&
686                     !ns_is_client(res->lr_namespace))
687                         class_fail_export(lock->l_export);
688
689                 lock_res(res);
690                 if (rc == -ERESTART) {
691
692                         /* 15715: The lock was granted and destroyed after
693                          * resource lock was dropped. Interval node was freed
694                          * in ldlm_lock_destroy. Anyway, this always happens
695                          * when a client is being evicted. So it would be
696                          * ok to return an error. -jay */
697                         if (lock->l_destroyed) {
698                                 *err = -EAGAIN;
699                                 GOTO(out, rc = -EAGAIN);
700                         }
701
702                         /* lock was granted while resource was unlocked. */
703                         if (lock->l_granted_mode == lock->l_req_mode) {
704                                 /* bug 11300: if the lock has been granted,
705                                  * break earlier because otherwise, we will go
706                                  * to restart and ldlm_resource_unlink will be
707                                  * called and it causes the interval node to be
708                                  * freed. Then we will fail at
709                                  * ldlm_extent_add_lock() */
710                                 *flags &= ~(LDLM_FL_BLOCK_GRANTED | LDLM_FL_BLOCK_CONV |
711                                             LDLM_FL_BLOCK_WAIT);
712                                 GOTO(out, rc = 0);
713                         }
714
715                         GOTO(restart, -ERESTART);
716                 }
717
718                 *flags |= LDLM_FL_BLOCK_GRANTED;
719                 /* this way we force client to wait for the lock
720                  * endlessly once the lock is enqueued -bzzz */
721                 *flags |= LDLM_FL_NO_TIMEOUT;
722
723         }
724         RETURN(0);
725 out:
726         if (!list_empty(&rpc_list)) {
727                 LASSERT(!(lock->l_flags & LDLM_AST_DISCARD_DATA));
728                 discard_bl_list(&rpc_list);
729         }
730         RETURN(rc);
731 }
732
733 /* When a lock is cancelled by a client, the KMS may undergo change if this
734  * is the "highest lock".  This function returns the new KMS value.
735  * Caller must hold ns_lock already.
736  *
737  * NB: A lock on [x,y] protects a KMS of up to y + 1 bytes! */
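/* Illustrative example (not from the original source): if the granted list
 * holds locks on [0, 4095] and [4096, 8191] and the lock on [4096, 8191] is
 * being cancelled, every remaining lock ends below the old KMS, so the new
 * KMS becomes 4096 (end of the highest remaining lock plus one). */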
738 __u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
739 {
740         struct ldlm_resource *res = lock->l_resource;
741         struct list_head *tmp;
742         struct ldlm_lock *lck;
743         __u64 kms = 0;
744         ENTRY;
745
746         /* don't let another thread in ldlm_extent_shift_kms race in
747          * just after we finish and take our lock into account in its
748          * calculation of the kms */
749         lock->l_flags |= LDLM_FL_KMS_IGNORE;
750
751         list_for_each(tmp, &res->lr_granted) {
752                 lck = list_entry(tmp, struct ldlm_lock, l_res_link);
753
754                 if (lck->l_flags & LDLM_FL_KMS_IGNORE)
755                         continue;
756
757                 if (lck->l_policy_data.l_extent.end >= old_kms)
758                         RETURN(old_kms);
759
760                 /* This extent _has_ to be smaller than old_kms (checked above)
761                  * so kms can only ever be smaller or the same as old_kms. */
762                 if (lck->l_policy_data.l_extent.end + 1 > kms)
763                         kms = lck->l_policy_data.l_extent.end + 1;
764         }
765         LASSERTF(kms <= old_kms, "kms "LPU64" old_kms "LPU64"\n", kms, old_kms);
766
767         RETURN(kms);
768 }
769
770 cfs_mem_cache_t *ldlm_interval_slab;
771 struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
772 {
773         struct ldlm_interval *node;
774         ENTRY;
775
776         LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
777         OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, CFS_ALLOC_IO);
778         if (node == NULL)
779                 RETURN(NULL);
780
781         CFS_INIT_LIST_HEAD(&node->li_group);
782         ldlm_interval_attach(node, lock);
783         RETURN(node);
784 }
785
786 void ldlm_interval_free(struct ldlm_interval *node)
787 {
788         if (node) {
789                 LASSERT(list_empty(&node->li_group));
790                 LASSERT(!interval_is_intree(&node->li_node));
791                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
792         }
793 }
794
795 /* interval tree, for LDLM_EXTENT. */
796 void ldlm_interval_attach(struct ldlm_interval *n,
797                           struct ldlm_lock *l)
798 {
799         LASSERT(l->l_tree_node == NULL);
800         LASSERT(l->l_resource->lr_type == LDLM_EXTENT);
801
802         list_add_tail(&l->l_sl_policy, &n->li_group);
803         l->l_tree_node = n;
804 }
805
806 struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l)
807 {
808         struct ldlm_interval *n = l->l_tree_node;
809
810         if (n == NULL)
811                 return NULL;
812
813         LASSERT(!list_empty(&n->li_group));
814         l->l_tree_node = NULL;
815         list_del_init(&l->l_sl_policy);
816
817         return (list_empty(&n->li_group) ? n : NULL);
818 }
819
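/* Map a power-of-two lock mode to its bit index into lr_itree[]; for example,
 * a mode value of 1 maps to index 0 and a value of 8 maps to index 3. */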
820 static inline int lock_mode_to_index(ldlm_mode_t mode)
821 {
822         int index;
823
824         LASSERT(mode != 0);
825         LASSERT(IS_PO2(mode));
826         for (index = -1; mode; index++, mode >>= 1) ;
827         LASSERT(index < LCK_MODE_NUM);
828         return index;
829 }
830
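/* Insert a just-granted extent lock into the interval tree matching its mode,
 * merging with an existing node when one already covers the same extent, and
 * also link the lock onto the resource's granted list. */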
831 void ldlm_extent_add_lock(struct ldlm_resource *res,
832                           struct ldlm_lock *lock)
833 {
834         struct interval_node *found, **root;
835         struct ldlm_interval *node;
836         struct ldlm_extent *extent;
837         int idx;
838
839         LASSERT(lock->l_granted_mode == lock->l_req_mode);
840
841         node = lock->l_tree_node;
842         LASSERT(node != NULL);
843         LASSERT(!interval_is_intree(&node->li_node));
844
845         idx = lock_mode_to_index(lock->l_granted_mode);
846         LASSERT(lock->l_granted_mode == 1 << idx);
847         LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);
848
849         /* initialize the node's extent */
850         extent = &lock->l_policy_data.l_extent;
851         interval_set(&node->li_node, extent->start, extent->end);
852
853         root = &res->lr_itree[idx].lit_root;
854         found = interval_insert(&node->li_node, root);
855         if (found) { /* The policy group found. */
856                 struct ldlm_interval *tmp = ldlm_interval_detach(lock);
857                 LASSERT(tmp != NULL);
858                 ldlm_interval_free(tmp);
859                 ldlm_interval_attach(to_ldlm_interval(found), lock);
860         }
861         res->lr_itree[idx].lit_size++;
862
863         /* even though we use the interval tree to manage extent locks, we
864          * also add them to the granted list, for debugging among other things */
865         ldlm_resource_add_lock(res, &res->lr_granted, lock);
866 }
867
868 void ldlm_extent_unlink_lock(struct ldlm_lock *lock)
869 {
870         struct ldlm_resource *res = lock->l_resource;
871         struct ldlm_interval *node = lock->l_tree_node;
872         struct ldlm_interval_tree *tree;
873         int idx;
874
875         if (!node || !interval_is_intree(&node->li_node)) /* duplicate unlink */
876                 return;
877
878         idx = lock_mode_to_index(lock->l_granted_mode);
879         LASSERT(lock->l_granted_mode == 1 << idx);
880         tree = &res->lr_itree[idx];
881
882         LASSERT(tree->lit_root != NULL); /* ensure the tree is not empty */
883
884         tree->lit_size--;
885         node = ldlm_interval_detach(lock);
886         if (node) {
887                 interval_erase(&node->li_node, &tree->lit_root);
888                 ldlm_interval_free(node);
889         }
890 }