X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fldlm%2Fldlm_extent.c;h=f1f88cefbd298e2456437d251c849191d9666534;hp=1880c4352dad157e65d89255eae5c651fffaca56;hb=022b102258cd85314f4fa1fb8322638cc79f4634;hpb=5621b9f70e62da7b98d556de8cae39acd07f9e97

diff --git a/lustre/ldlm/ldlm_extent.c b/lustre/ldlm/ldlm_extent.c
index 1880c43..f1f88ce 100644
--- a/lustre/ldlm/ldlm_extent.c
+++ b/lustre/ldlm/ldlm_extent.c
@@ -35,26 +35,126 @@
 #include "ldlm_internal.h"
 
+#define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)
+
+/* fix up the ldlm_extent after it has been expanded */
+static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req,
+                                              struct ldlm_extent *new_ex,
+                                              int conflicting)
+{
+        ldlm_mode_t req_mode = req->l_req_mode;
+        __u64 req_start = req->l_req_extent.start;
+        __u64 req_end = req->l_req_extent.end;
+        __u64 req_align, mask;
+
+        if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
+                if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
+                        new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
+                                          new_ex->end);
+        }
+
+        if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF) {
+                EXIT;
+                return;
+        }
+
+        /* We need to ensure that the lock extent is properly aligned to what
+         * the client requested.  We align it to the lowest common denominator
+         * of the client's requested lock start and end alignment. */
+        mask = 0x1000ULL;
+        req_align = (req_end + 1) | req_start;
+        if (req_align != 0) {
+                while ((req_align & mask) == 0)
+                        mask <<= 1;
+        }
+        mask -= 1;
+        /* We can only shrink the lock, not grow it.
+         * This should never make the lock smaller than requested, since the
+         * requested lock was already aligned on these boundaries. */
+        new_ex->start = ((new_ex->start - 1) | mask) + 1;
+        new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
+        LASSERTF(new_ex->start <= req_start,
+                 "mask "LPX64" grant start "LPU64" req start "LPU64"\n",
+                 mask, new_ex->start, req_start);
+        LASSERTF(new_ex->end >= req_end,
+                 "mask "LPX64" grant end "LPU64" req end "LPU64"\n",
+                 mask, new_ex->end, req_end);
+}
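As an aside, the alignment trick above is easier to see in isolation. The following standalone sketch (not part of the patch; the function name and sample extents are invented) mirrors the mask computation: the grant may only shrink toward boundaries that the request itself already satisfies.

#include <stdio.h>
#include <stdint.h>

/* Round [*start, *end] inward to the largest power-of-two alignment shared
 * by the requested extent's boundaries, starting from a 4KB page, mirroring
 * ldlm_extent_internal_policy_fixup() above. */
static void shrink_to_alignment(uint64_t req_start, uint64_t req_end,
                                uint64_t *start, uint64_t *end)
{
        uint64_t mask = 0x1000ULL;              /* begin at 4KB */
        uint64_t req_align = (req_end + 1) | req_start;

        if (req_align != 0)                     /* grow mask while both     */
                while ((req_align & mask) == 0) /* boundaries stay aligned  */
                        mask <<= 1;
        mask -= 1;

        *start = ((*start - 1) | mask) + 1;     /* round start up           */
        *end = ((*end + 1) & ~mask) - 1;        /* round end down           */
}

int main(void)
{
        uint64_t s = 1000, e = 199999;

        /* the request [65536, 131071] is 64KB-aligned on both ends */
        shrink_to_alignment(65536, 131071, &s, &e);
        printf("granted [%llu, %llu]\n",
               (unsigned long long)s, (unsigned long long)e);
        return 0;
}

Here mask becomes 0xffff, so the candidate grant [1000, 199999] shrinks to [65536, 196607], which still covers the request.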
+
+/* The purpose of this function is to return:
+ * - the maximum extent
+ * - containing the requested extent
+ * - and not overlapping existing conflicting extents outside the requested one
+ *
+ * Use the interval tree to expand the lock extent for granted locks.
+ */
+static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req,
+                                                struct ldlm_extent *new_ex)
+{
+        struct ldlm_resource *res = req->l_resource;
+        ldlm_mode_t req_mode = req->l_req_mode;
+        __u64 req_start = req->l_req_extent.start;
+        __u64 req_end = req->l_req_extent.end;
+        struct ldlm_interval_tree *tree;
+        struct interval_node_extent limiter = { new_ex->start, new_ex->end };
+        int conflicting = 0;
+        int idx;
+        ENTRY;
+
+        lockmode_verify(req_mode);
+
+        /* walk the granted extent locks through the interval trees */
+        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
+                struct interval_node_extent ext = { req_start, req_end };
+
+                tree = &res->lr_itree[idx];
+                if (lockmode_compat(tree->lit_mode, req_mode))
+                        continue;
+
+                conflicting += tree->lit_size;
+                if (conflicting > 4)
+                        limiter.start = req_start;
+
+                if (interval_is_overlapped(tree->lit_root, &ext))
+                        printk("req_mode = %d, tree->lit_mode = %d, "
+                               "tree->lit_size = %d\n",
+                               req_mode, tree->lit_mode, tree->lit_size);
+                interval_expand(tree->lit_root, &ext, &limiter);
+                limiter.start = max(limiter.start, ext.start);
+                limiter.end = min(limiter.end, ext.end);
+                if (limiter.start == req_start && limiter.end == req_end)
+                        break;
+        }
+
+        new_ex->start = limiter.start;
+        new_ex->end = limiter.end;
+        LASSERT(new_ex->start <= req_start);
+        LASSERT(new_ex->end >= req_end);
+
+        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
+        EXIT;
+}
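For intuition about what interval_expand() is asked to compute here, the following linear-scan model (assumed semantics only; it is not the Lustre interval tree, and all names are local to the sketch) grows a requested extent until it would touch a conflicting one:

#include <stdio.h>
#include <stdint.h>

#define EOF_EXTENT (~0ULL)      /* stands in for OBD_OBJECT_EOF */

struct ext { uint64_t start, end; };

/* Expand [req.start, req.end] to the largest extent avoiding every
 * conflicting extent; conflicts overlapping the request itself are assumed
 * absent, since such locks could not all have been granted together. */
static struct ext expand(struct ext req, const struct ext *conf, int n)
{
        struct ext out = { 0, EOF_EXTENT };
        int i;

        for (i = 0; i < n; i++) {
                if (conf[i].end < req.start && conf[i].end + 1 > out.start)
                        out.start = conf[i].end + 1;    /* conflict below */
                if (conf[i].start > req.end && conf[i].start - 1 < out.end)
                        out.end = conf[i].start - 1;    /* conflict above */
        }
        return out;
}

int main(void)
{
        struct ext conflicts[] = { { 0, 4095 }, { 1048576, EOF_EXTENT } };
        struct ext req = { 8192, 16383 };
        struct ext got = expand(req, conflicts, 2);

        /* prints [4096, 1048575]: maximal around the request */
        printf("[%llu, %llu]\n", (unsigned long long)got.start,
               (unsigned long long)got.end);
        return 0;
}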
+
 /* The purpose of this function is to return:
  * - the maximum extent
  * - containing the requested extent
  * - and not overlapping existing conflicting extents outside the requested one
  */
 static void
-ldlm_extent_internal_policy(struct list_head *queue, struct ldlm_lock *req,
-                            struct ldlm_extent *new_ex)
+ldlm_extent_internal_policy_waiting(struct ldlm_lock *req,
+                                    struct ldlm_extent *new_ex)
 {
         struct list_head *tmp;
+        struct ldlm_resource *res = req->l_resource;
         ldlm_mode_t req_mode = req->l_req_mode;
         __u64 req_start = req->l_req_extent.start;
         __u64 req_end = req->l_req_extent.end;
-        __u64 req_align, mask;
         int conflicting = 0;
         ENTRY;
 
         lockmode_verify(req_mode);
 
-        list_for_each(tmp, queue) {
+        /* for waiting locks */
+        list_for_each(tmp, &res->lr_waiting) {
                 struct ldlm_lock *lock;
                 struct ldlm_extent *l_extent;
 
@@ -85,16 +185,14 @@ ldlm_extent_internal_policy(struct list_head *queue, struct ldlm_lock *req,
                                 new_ex->start = req_start;
 
                 /* If lock doesn't overlap new_ex, skip it. */
-                if (l_extent->end < new_ex->start ||
-                    l_extent->start > new_ex->end)
+                if (!ldlm_extent_overlap(l_extent, new_ex))
                         continue;
 
                 /* Locks conflicting in requested extents and we can't satisfy
                  * both locks, so ignore it.  Either we will ping-pong this
                  * extent (we would regardless of what extent we granted) or
                  * lock is unused and it shouldn't limit our extent growth. */
-                if (lock->l_req_extent.end >= req_start &&
-                    lock->l_req_extent.start <= req_end)
+                if (ldlm_extent_overlap(&lock->l_req_extent,
+                                        &req->l_req_extent))
                         continue;
 
                 /* We grow extents downwards only as far as they don't overlap
@@ -123,43 +221,11 @@ ldlm_extent_internal_policy(struct list_head *queue, struct ldlm_lock *req,
                 }
         }
 
-#define LDLM_MAX_GROWN_EXTENT (32 * 1024 * 1024 - 1)
-        if (conflicting > 32 && (req_mode == LCK_PW || req_mode == LCK_CW)) {
-                if (req_end < req_start + LDLM_MAX_GROWN_EXTENT)
-                        new_ex->end = min(req_start + LDLM_MAX_GROWN_EXTENT,
-                                          new_ex->end);
-        }
-
-        if (new_ex->start == 0 && new_ex->end == OBD_OBJECT_EOF) {
-                EXIT;
-                return;
-        }
-
-        /* we need to ensure that the lock extent is properly aligned to what
-         * the client requested.  We align it to the lowest-common denominator
-         * of the clients requested lock start and end alignment. */
-        mask = 0x1000ULL;
-        req_align = (req_end + 1) | req_start;
-        if (req_align != 0) {
-                while ((req_align & mask) == 0)
-                        mask <<= 1;
-        }
-        mask -= 1;
-        /* We can only shrink the lock, not grow it.
-         * This should never cause lock to be smaller than requested,
-         * since requested lock was already aligned on these boundaries. */
-        new_ex->start = ((new_ex->start - 1) | mask) + 1;
-        new_ex->end = ((new_ex->end + 1) & ~mask) - 1;
-        LASSERTF(new_ex->start <= req_start,
-                 "mask "LPX64" grant start "LPU64" req start "LPU64"\n",
-                 mask, new_ex->start, req_start);
-        LASSERTF(new_ex->end >= req_end,
-                 "mask "LPX64" grant end "LPU64" req end "LPU64"\n",
-                 mask, new_ex->end, req_end);
-
+        ldlm_extent_internal_policy_fixup(req, new_ex, conflicting);
         EXIT;
 }
 
+
 /* In order to determine the largest possible extent we can grant, we need
  * to scan all of the queues. */
 static void ldlm_extent_policy(struct ldlm_resource *res,
@@ -182,8 +248,8 @@ static void ldlm_extent_policy(struct ldlm_resource *res,
                 /* fast-path whole file locks */
                 return;
 
-        ldlm_extent_internal_policy(&res->lr_granted, lock, &new_ex);
-        ldlm_extent_internal_policy(&res->lr_waiting, lock, &new_ex);
+        ldlm_extent_internal_policy_granted(lock, &new_ex);
+        ldlm_extent_internal_policy_waiting(lock, &new_ex);
 
         if (new_ex.start != lock->l_policy_data.l_extent.start ||
             new_ex.end != lock->l_policy_data.l_extent.end) {
@@ -193,6 +259,42 @@ static void ldlm_extent_policy(struct ldlm_resource *res,
         }
 }
 
+struct ldlm_extent_compat_args {
+        struct list_head *work_list;
+        struct ldlm_lock *lock;
+        ldlm_mode_t mode;
+        int *compat;
+};
+
+static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
+                                                void *data)
+{
+        struct ldlm_extent_compat_args *priv = data;
+        struct ldlm_interval *node = to_ldlm_interval(n);
+        struct list_head *work_list = priv->work_list;
+        struct ldlm_lock *lock, *enq = priv->lock;
+        ldlm_mode_t mode = priv->mode;
+        ENTRY;
+
+        LASSERT(!list_empty(&node->li_group));
+
+        list_for_each_entry(lock, &node->li_group, l_sl_policy) {
+                /* the interval tree only holds granted locks */
+                LASSERTF(mode == lock->l_granted_mode,
+                         "mode = %s, lock->l_granted_mode = %s\n",
+                         ldlm_lockname[mode],
+                         ldlm_lockname[lock->l_granted_mode]);
+
+                if (lock->l_blocking_ast)
+                        ldlm_add_ast_work_item(lock, enq, work_list);
+        }
+
+        if (priv->compat)
+                *priv->compat = 0;
+
+        RETURN(INTERVAL_ITER_CONT);
+}
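ldlm_extent_compat_cb() follows a conventional iterate-with-cookie protocol: the tree walker invokes the callback for each node, the callback returns whether to continue, and a private struct carries state. A minimal self-contained model of that protocol (all names invented for illustration):

#include <stdio.h>

/* "continue"/"stop" return values, as with enum interval_iter above */
enum iter_ret { ITER_CONT, ITER_STOP };

struct node { int value; };

typedef enum iter_ret (*iter_cb_t)(struct node *n, void *data);

/* walk an array the way interval_iterate() walks a tree */
static void iterate(struct node *nodes, int count, iter_cb_t cb, void *data)
{
        int i;

        for (i = 0; i < count; i++)
                if (cb(&nodes[i], data) == ITER_STOP)
                        break;
}

static enum iter_ret count_cb(struct node *n, void *data)
{
        int *conflicts = data;          /* the cookie carries state, like */
                                        /* struct ldlm_extent_compat_args */
        (*conflicts)++;
        printf("visited %d\n", n->value);
        return ITER_CONT;               /* keep walking, as compat_cb does */
}

int main(void)
{
        struct node nodes[] = { { 1 }, { 2 }, { 3 } };
        int conflicts = 0;

        iterate(nodes, 3, count_cb, &conflicts);
        printf("conflicts = %d\n", conflicts);
        return 0;
}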
+
 /* Determine if the lock is compatible with all locks on the queue.
  * We stop walking the queue if we hit ourselves so we don't take
  * conflicting locks enqueued after us into account, or we'd wait forever.
@@ -209,6 +311,7 @@
 {
         struct list_head *tmp;
         struct ldlm_lock *lock;
+        struct ldlm_resource *res = req->l_resource;
         ldlm_mode_t req_mode = req->l_req_mode;
         __u64 req_start = req->l_req_extent.start;
         __u64 req_end = req->l_req_extent.end;
@@ -218,6 +321,71 @@ ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
 
         lockmode_verify(req_mode);
 
+        /* use the interval trees for the granted queue */
+        if (queue == &res->lr_granted) {
+                struct ldlm_interval_tree *tree;
+                struct ldlm_extent_compat_args data = {.work_list = work_list,
+                                                       .lock = req,
+                                                       .compat = &compat };
+                struct interval_node_extent ex = { .start = req_start,
+                                                   .end = req_end };
+                int idx, rc;
+
+                for (idx = 0; idx < LCK_MODE_NUM; idx++) {
+                        tree = &res->lr_itree[idx];
+                        if (tree->lit_root == NULL) /* empty tree, skip it */
+                                continue;
+
+                        data.mode = tree->lit_mode;
+                        if (lockmode_compat(req_mode, tree->lit_mode)) {
+                                struct ldlm_interval *node;
+                                struct ldlm_extent *extent;
+
+                                if (req_mode != LCK_GROUP)
+                                        continue;
+
+                                /* group lock, grant it immediately if
+                                 * compatible */
+                                node = to_ldlm_interval(tree->lit_root);
+                                extent = ldlm_interval_extent(node);
+                                if (req->l_policy_data.l_extent.gid ==
+                                    extent->gid)
+                                        RETURN(2);
+                        }
+
+                        if (tree->lit_mode == LCK_GROUP) {
+                                if (*flags & LDLM_FL_BLOCK_NOWAIT) {
+                                        compat = -EWOULDBLOCK;
+                                        goto destroylock;
+                                }
+
+                                *flags |= LDLM_FL_NO_TIMEOUT;
+                                if (!work_list)
+                                        RETURN(0);
+
+                                /* if the work list is not NULL, add all
+                                 * locks in the tree to the work list */
+                                compat = 0;
+                                interval_iterate(tree->lit_root,
+                                                 ldlm_extent_compat_cb, &data);
+                                continue;
+                        }
+
+                        if (!work_list) {
+                                rc = interval_is_overlapped(tree->lit_root,
+                                                            &ex);
+                                if (rc)
+                                        RETURN(0);
+                        } else {
+                                interval_search(tree->lit_root, &ex,
+                                                ldlm_extent_compat_cb, &data);
+                                if (!list_empty(work_list) && compat)
+                                        compat = 0;
+                        }
+                }
+                RETURN(compat);
+        }
+
+        /* for the waiting queue */
         list_for_each(tmp, queue) {
                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
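The granted-queue branch above reduces to a small decision function per mode tree. The sketch below condenses it (the predicates stand in for the real tree and extent tests; it is an illustration, not the implementation). The return codes follow ldlm_extent_compat_queue(): 2 grants a compatible group lock immediately, 1 is compatible, 0 must wait, and -EWOULDBLOCK honors LDLM_FL_BLOCK_NOWAIT:

#include <errno.h>
#include <stdio.h>

/* Condensed decision model for one granted-mode tree. */
static int one_tree_compat(int req_is_group, int modes_compatible,
                           int tree_is_group, int same_gid,
                           int overlaps, int block_nowait)
{
        if (modes_compatible)                   /* group locks also need */
                return (req_is_group && same_gid) ? 2 : 1; /* matching gid */

        if (tree_is_group)                      /* conflicting group lock: */
                return block_nowait ? -EWOULDBLOCK : 0; /* wait or bail */

        return overlaps ? 0 : 1;                /* plain extent conflict */
}

int main(void)
{
        /* an incompatible, overlapping extent forces the request to wait */
        printf("%d\n", one_tree_compat(0, 0, 0, 0, 1, 0));   /* 0 */
        /* a group lock with a matching gid is granted immediately */
        printf("%d\n", one_tree_compat(1, 1, 1, 1, 0, 0));   /* 2 */
        return 0;
}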
@@ -444,8 +612,24 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                 unlock_res(res);
                 rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_BL_AST);
                 lock_res(res);
-                if (rc == -ERESTART)
+
+                if (rc == -ERESTART) {
+                        /* the lock was granted while the resource was
+                         * unlocked */
+                        if (lock->l_granted_mode == lock->l_req_mode) {
+                                /* bug 11300: if the lock has been granted,
+                                 * break out early; otherwise we would
+                                 * restart, ldlm_resource_unlink() would be
+                                 * called and free the interval node, and we
+                                 * would then fail in
+                                 * ldlm_extent_add_lock() */
+                                *flags &= ~(LDLM_FL_BLOCK_GRANTED |
+                                            LDLM_FL_BLOCK_CONV |
+                                            LDLM_FL_BLOCK_WAIT);
+                                GOTO(out, rc = 0);
+                        }
+
                         GOTO(restart, -ERESTART);
+                }
+
                 *flags |= LDLM_FL_BLOCK_GRANTED;
                 /* this way we force the client to wait for the lock
                  * endlessly once the lock is enqueued -bzzz */
@@ -493,3 +677,124 @@ __u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms)
 
         RETURN(kms);
 }
+
+cfs_mem_cache_t *ldlm_interval_slab;
+struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock)
+{
+        struct ldlm_interval *node;
+        ENTRY;
+
+        LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
+        OBD_SLAB_ALLOC(node, ldlm_interval_slab, CFS_ALLOC_IO, sizeof(*node));
+        if (node == NULL)
+                RETURN(NULL);
+
+        CFS_INIT_LIST_HEAD(&node->li_group);
+        ldlm_interval_attach(node, lock);
+        RETURN(node);
+}
+
+void ldlm_interval_free(struct ldlm_interval *node)
+{
+        if (node) {
+                LASSERT(list_empty(&node->li_group));
+                OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
+        }
+}
+
+/* interval tree helpers, for LDLM_EXTENT locks */
+void ldlm_interval_attach(struct ldlm_interval *n,
+                          struct ldlm_lock *l)
+{
+        LASSERT(l->l_tree_node == NULL);
+        LASSERT(l->l_resource->lr_type == LDLM_EXTENT);
+
+        list_add_tail(&l->l_sl_policy, &n->li_group);
+        l->l_tree_node = n;
+}
+
+struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l)
+{
+        struct ldlm_interval *n = l->l_tree_node;
+
+        if (n == NULL)
+                return NULL;
+
+        LASSERT(!list_empty(&n->li_group));
+        l->l_tree_node = NULL;
+        list_del_init(&l->l_sl_policy);
+
+        return (list_empty(&n->li_group) ? n : NULL);
+}
+
+static inline int lock_mode_to_index(ldlm_mode_t mode)
+{
+        int index;
+
+        LASSERT(mode != 0);
+        LASSERT(IS_PO2(mode));
+        for (index = -1; mode; index++, mode >>= 1)
+                ;
+        LASSERT(index < LCK_MODE_NUM);
+        return index;
+}
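lock_mode_to_index() is simply log2 of a power-of-two mode bit, used to pick the per-mode interval tree. A standalone sketch of the same loop (names invented), runnable outside the tree:

#include <assert.h>
#include <stdio.h>

/* Convert a single-bit lock mode to an array index: log2(mode). */
static int mode_to_index(unsigned int mode)
{
        int index;

        assert(mode != 0 && (mode & (mode - 1)) == 0); /* power of two */
        for (index = -1; mode; index++, mode >>= 1)
                ;
        return index;
}

int main(void)
{
        /* mode bits 0x1, 0x2, 0x40 map to trees 0, 1, 6 */
        printf("%d %d %d\n", mode_to_index(1), mode_to_index(2),
               mode_to_index(64));
        return 0;
}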
+
+void ldlm_extent_add_lock(struct ldlm_resource *res,
+                          struct ldlm_lock *lock)
+{
+        struct interval_node *found, **root;
+        struct ldlm_interval *node;
+        struct ldlm_extent *extent;
+        int idx;
+
+        LASSERT(lock->l_granted_mode == lock->l_req_mode);
+
+        node = lock->l_tree_node;
+        LASSERT(node != NULL);
+
+        idx = lock_mode_to_index(lock->l_granted_mode);
+        LASSERT(lock->l_granted_mode == 1 << idx);
+        LASSERT(lock->l_granted_mode == res->lr_itree[idx].lit_mode);
+
+        /* initialize the node's extent */
+        extent = &lock->l_policy_data.l_extent;
+        interval_set(&node->li_node, extent->start, extent->end);
+
+        root = &res->lr_itree[idx].lit_root;
+        found = interval_insert(&node->li_node, root);
+        if (found) { /* the policy group already exists */
+                struct ldlm_interval *tmp = ldlm_interval_detach(lock);
+                LASSERT(tmp != NULL);
+                ldlm_interval_free(tmp);
+                ldlm_interval_attach(to_ldlm_interval(found), lock);
+        }
+        res->lr_itree[idx].lit_size++;
+
+        /* Even though we use the interval tree to manage extent locks, we
+         * also add them to the granted list, for debugging purposes. */
+        ldlm_resource_add_lock(res, &res->lr_granted, lock);
+}
+
+void ldlm_extent_unlink_lock(struct ldlm_lock *lock)
+{
+        struct ldlm_resource *res = lock->l_resource;
+        struct ldlm_interval *node;
+        struct ldlm_interval_tree *tree;
+        int idx;
+
+        if (lock->l_granted_mode != lock->l_req_mode)
+                return;
+
+        LASSERT(lock->l_tree_node != NULL);
+        idx = lock_mode_to_index(lock->l_granted_mode);
+        LASSERT(lock->l_granted_mode == 1 << idx);
+        tree = &res->lr_itree[idx];
+
+        LASSERT(tree->lit_root != NULL); /* the tree must not be empty */
+
+        tree->lit_size--;
+        node = ldlm_interval_detach(lock);
+        if (node) {
+                interval_erase(&node->li_node, &tree->lit_root);
+                ldlm_interval_free(node);
+        }
+}
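Finally, the attach/detach pair above implements a node-sharing lifecycle: locks with equal extents share one interval node, and detach hands the node back only when the last lock leaves, signalling ldlm_extent_unlink_lock() to erase and free it. A toy model (a counter standing in for the li_group list; names are local to the sketch):

#include <assert.h>
#include <stdio.h>

/* One shared interval node; members counts the locks attached to it. */
struct node_group { int members; };

static void grp_attach(struct node_group *g)
{
        g->members++;                   /* like ldlm_interval_attach() */
}

/* Returns the node when the last member detaches, NULL otherwise,
 * mirroring ldlm_interval_detach()'s "empty group" contract. */
static struct node_group *grp_detach(struct node_group *g)
{
        assert(g->members > 0);
        return (--g->members == 0) ? g : NULL;
}

int main(void)
{
        struct node_group g = { 0 };
        struct node_group *last;

        grp_attach(&g);                 /* first lock on this extent */
        grp_attach(&g);                 /* a second lock shares the node */

        last = grp_detach(&g);          /* node still in use -> NULL */
        printf("%s\n", last ? "erase and free" : "keep");
        last = grp_detach(&g);          /* last lock gone -> node returned */
        printf("%s\n", last ? "erase and free" : "keep");
        return 0;
}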