From 6db79314fb82d64c090547124e65a2f89b6a5539 Mon Sep 17 00:00:00 2001 From: alex Date: Thu, 22 Oct 2009 18:32:06 +0000 Subject: [PATCH] - update from HEAD --- lustre/ChangeLog | 11 + lustre/include/cl_object.h | 45 ++- lustre/include/interval_tree.h | 12 +- lustre/include/lustre_dlm.h | 10 +- lustre/include/lustre_net.h | 3 +- lustre/include/lustre_quota.h | 31 +- lustre/include/obd.h | 21 ++ lustre/ldlm/interval_tree.c | 84 ++++- lustre/ldlm/ldlm_extent.c | 757 ++++++++++++++++++++--------------------- lustre/liblustre/file.c | 15 +- lustre/lov/lov_cl_internal.h | 4 +- lustre/lov/lov_lock.c | 55 +-- lustre/lov/lovsub_lock.c | 7 +- lustre/lvfs/fsfilt_ext3.c | 4 +- lustre/lvfs/lustre_quota_fmt.c | 2 +- lustre/mdc/mdc_reint.c | 14 +- lustre/mdc/mdc_request.c | 87 +++-- lustre/mdd/mdd_lov.c | 10 +- lustre/mdt/mdt_handler.c | 17 +- lustre/mgc/mgc_request.c | 5 + lustre/obdclass/cl_lock.c | 239 +++++++++---- lustre/obdclass/cl_object.c | 2 +- lustre/osc/osc_lock.c | 2 +- lustre/ost/ost_handler.c | 4 +- lustre/ptlrpc/client.c | 5 +- lustre/quota/quota_context.c | 6 +- lustre/quota/quota_interface.c | 7 + lustre/tests/ost-pools.sh | 25 -- lustre/tests/replay-single.sh | 4 +- lustre/tests/sanity-quota.sh | 7 +- lustre/tests/sanity.sh | 69 ++-- lustre/tests/test-framework.sh | 45 ++- 32 files changed, 945 insertions(+), 664 deletions(-) diff --git a/lustre/ChangeLog b/lustre/ChangeLog index d2fb8b8..f873fe3 100644 --- a/lustre/ChangeLog +++ b/lustre/ChangeLog @@ -15,6 +15,17 @@ tbd Sun Microsystems, Inc. * File join has been disabled in this release, refer to Bugzilla 16929. Severity : enhancement +Bugzilla : 19325 +Description: Adjust locks' extents on their first enqueue, so that at the time + they get granted, there is no need for another pass through the + queues since they are already shaped into the proper forms. + +Severity : normal +Bugzilla : 20302 +Description: Fix in ptlrpc_expire_one_request() to print the signed time + difference. + +Severity : enhancement Bugzilla : 16312 Description: Build kernels (RHEL5, OEL5 and SLES10/11) using the vendor's own kernel spec file. diff --git a/lustre/include/cl_object.h b/lustre/include/cl_object.h index 9fd2d88..7536f80 100644 --- a/lustre/include/cl_object.h +++ b/lustre/include/cl_object.h @@ -1337,15 +1337,15 @@ const char *cl_lock_mode_name(const enum cl_lock_mode mode); * | | V * | | HELD<---------+ * | | | | - * | | | | + * | | | | cl_use_try() * | | cl_unuse_try() | | * | | | | - * | | V | cached - * | +------------>UNLOCKING (*) | lock found - * | | | - * | cl_unuse_try() | | + * | | V ---+ + * | +------------>INTRANSIT (D) <--+ * | | | + * | cl_unuse_try() | | cached lock found * | | | cl_use_try() + * | | | * | V | * +------------------CACHED---------+ * | @@ -1364,6 +1364,8 @@ const char *cl_lock_mode_name(const enum cl_lock_mode mode); * * (C) is the point where Cancellation call-back is invoked. * + * (D) is the transit state which means the lock is changing. + * * Transition to FREEING state is possible from any other state in the * diagram in case of unrecoverable error. * @@ -1382,9 +1384,6 @@ const char *cl_lock_mode_name(const enum cl_lock_mode mode); * handled, and is in ENQUEUED state after enqueue to S2 has been sent (note * that in this case, sub-locks move from state to state, and top-lock remains * in the same state). - * - * Separate UNLOCKING state is needed to maintain an invariant that in HELD - * state lock is immediately ready for use. 
*/ enum cl_lock_state { /** @@ -1406,10 +1405,16 @@ enum cl_lock_state { */ CLS_HELD, /** - * Lock is in the transition from CLS_HELD to CLS_CACHED. Lock is in - * this state only while cl_unuse() is executing against it. + * This state is used to mark the lock is being used, or unused. + * We need this state because the lock may have several sublocks, + * so it's impossible to have an atomic way to bring all sublocks + * into CLS_HELD state at use case, or all sublocks to CLS_CACHED + * at unuse case. + * If a thread is referring to a lock, and it sees the lock is in this + * state, it must wait for the lock. + * See state diagram for details. */ - CLS_UNLOCKING, + CLS_INTRANSIT, /** * Lock granted, not used. */ @@ -1430,9 +1435,7 @@ enum cl_lock_flags { /** cancellation is pending for this lock. */ CLF_CANCELPEND = 1 << 1, /** destruction is pending for this lock. */ - CLF_DOOMED = 1 << 2, - /** State update is pending. */ - CLF_STATE = 1 << 3 + CLF_DOOMED = 1 << 2 }; /** @@ -1530,6 +1533,10 @@ struct cl_lock { cfs_task_t *cll_guarder; int cll_depth; + /** + * the owner for INTRANSIT state + */ + cfs_task_t *cll_intransit_owner; int cll_error; /** * Number of holds on a lock. A hold prevents a lock from being @@ -2779,6 +2786,14 @@ int cl_lock_user_del (const struct lu_env *env, struct cl_lock *lock); int cl_lock_compatible(const struct cl_lock *lock1, const struct cl_lock *lock2); +enum cl_lock_state cl_lock_intransit(const struct lu_env *env, + struct cl_lock *lock); + +void cl_lock_extransit(const struct lu_env *env, struct cl_lock *lock, + enum cl_lock_state state); + +int cl_lock_is_intransit(struct cl_lock *lock); + /** \name statemachine statemachine * Interface to lock state machine consists of 3 parts: * @@ -2819,7 +2834,7 @@ int cl_enqueue_try(const struct lu_env *env, struct cl_lock *lock, struct cl_io *io, __u32 flags); int cl_unuse_try (const struct lu_env *env, struct cl_lock *lock); int cl_wait_try (const struct lu_env *env, struct cl_lock *lock); -int cl_use_try (const struct lu_env *env, struct cl_lock *lock); +int cl_use_try (const struct lu_env *env, struct cl_lock *lock, int atomic); /** @} statemachine */ void cl_lock_signal (const struct lu_env *env, struct cl_lock *lock); diff --git a/lustre/include/interval_tree.h b/lustre/include/interval_tree.h index b50278b..5958790 100644 --- a/lustre/include/interval_tree.h +++ b/lustre/include/interval_tree.h @@ -92,7 +92,7 @@ static inline void interval_set(struct interval_node *node, * - the callback returns INTERVAL_ITER_STOP when it thinks the iteration * should be stopped. It will then cause the iteration function to return * immediately with return value INTERVAL_ITER_STOP. - * - callbacks for interval_iterate and interval_iterate_reverse: Every + * - callbacks for interval_iterate and interval_iterate_reverse: Every * nodes in the tree will be set to @node before the callback being called * - callback for interval_search: Only overlapped node will be set to @node * before the callback being called. @@ -109,17 +109,21 @@ void interval_erase(struct interval_node *node, struct interval_node **root); enum interval_iter interval_search(struct interval_node *root, struct interval_node_extent *ex, interval_callback_t func, void *data); +enum interval_iter interval_search_expand_extent(struct interval_node *root, + struct interval_node_extent *ex, + struct interval_node_extent *result_ext, + interval_callback_t func, void *data); /* Iterate every node in the tree - by reverse order or regular order. 
*/ -enum interval_iter interval_iterate(struct interval_node *root, +enum interval_iter interval_iterate(struct interval_node *root, interval_callback_t func, void *data); enum interval_iter interval_iterate_reverse(struct interval_node *root, interval_callback_t func,void *data); -void interval_expand(struct interval_node *root, +void interval_expand(struct interval_node *root, struct interval_node_extent *ext, struct interval_node_extent *limiter); -int interval_is_overlapped(struct interval_node *root, +int interval_is_overlapped(struct interval_node *root, struct interval_node_extent *ex); struct interval_node *interval_find(struct interval_node *root, struct interval_node_extent *ex); diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index 042458e..5c6ea4b 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -189,7 +189,7 @@ typedef enum { /* Flags sent in AST lock_flags to be mapped into the receiving lock. */ #define LDLM_AST_FLAGS (LDLM_FL_DISCARD_DATA) -/* +/* * -------------------------------------------------------------------------- * NOTE! Starting from this point, that is, LDLM_FL_* flags with values above * 0x80000000 will not be sent over the wire. @@ -617,6 +617,10 @@ struct ldlm_lock { struct lustre_handle l_remote_handle; ldlm_policy_data_t l_policy_data; + /* traffic index indicating how busy the resource will be, if it is + * high, the lock's granted region will not be so big lest it conflicts + * other locks, causing frequent lock cancellation and re-enqueue */ + int l_traffic; /* * Protected by lr_lock. Various counters: readers, writers, etc. @@ -640,8 +644,8 @@ struct ldlm_lock { */ cfs_waitq_t l_waitq; - /** - * Seconds. it will be updated if there is any activity related to + /** + * Seconds. it will be updated if there is any activity related to * the lock, e.g. enqueue the lock or send block AST. 
*/ cfs_time_t l_last_activity; diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index 5f5e840..52587e2 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -371,7 +371,8 @@ struct ptlrpc_request { rq_packed_final:1, /* packed final reply */ rq_hp:1, /* high priority RPC */ rq_at_linked:1, /* link into service's srv_at_array */ - rq_reply_truncate:1; + rq_reply_truncate:1, + rq_committed:1; enum rq_phase rq_phase; /* one of RQ_PHASE_* */ enum rq_phase rq_next_phase; /* one of RQ_PHASE_* to be used next */ diff --git a/lustre/include/lustre_quota.h b/lustre/include/lustre_quota.h index 3222ca3..5f92a78 100644 --- a/lustre/include/lustre_quota.h +++ b/lustre/include/lustre_quota.h @@ -381,33 +381,20 @@ static inline void lqs_getref(struct lustre_qunit_size *lqs) __lqs_getref(lqs); } -static inline void __lqs_putref(struct lustre_qunit_size *lqs, int del) +static inline void __lqs_putref(struct lustre_qunit_size *lqs) { - int count = atomic_read(&lqs->lqs_refcount); - - LASSERT(count > 0); - if (count == 1) { - CDEBUG(D_QUOTA, "lqs=%p refcount to be 0\n", lqs); - if (del) { - /* killing last ref, let's let hash table kill it */ - lustre_hash_del(lqs->lqs_ctxt->lqc_lqs_hash, - &lqs->lqs_key, &lqs->lqs_hash); - OBD_FREE_PTR(lqs); - } else { - atomic_dec(&lqs->lqs_refcount); - } - } else { - count = atomic_dec_return(&lqs->lqs_refcount); - if (count == 1) - if (atomic_dec_and_test(&lqs->lqs_ctxt->lqc_lqs)) - cfs_waitq_signal(&lqs->lqs_ctxt->lqc_lqs_waitq); - CDEBUG(D_QUOTA, "lqs=%p refcount %d\n", lqs, count); - } + LASSERT(atomic_read(&lqs->lqs_refcount) > 0); + + if (atomic_dec_return(&lqs->lqs_refcount) == 1) + if (atomic_dec_and_test(&lqs->lqs_ctxt->lqc_lqs)) + cfs_waitq_signal(&lqs->lqs_ctxt->lqc_lqs_waitq); + CDEBUG(D_QUOTA, "lqs=%p refcount %d\n", + lqs, atomic_read(&lqs->lqs_refcount)); } static inline void lqs_putref(struct lustre_qunit_size *lqs) { - __lqs_putref(lqs, 1); + __lqs_putref(lqs); } static inline void lqs_initref(struct lustre_qunit_size *lqs) diff --git a/lustre/include/obd.h b/lustre/include/obd.h index e084a10..a4e405d 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -1495,6 +1495,7 @@ struct md_open_data { struct obd_client_handle *mod_och; struct ptlrpc_request *mod_open_req; struct ptlrpc_request *mod_close_req; + atomic_t mod_refcount; }; struct lookup_intent; @@ -1682,4 +1683,24 @@ static inline struct lustre_capa *oinfo_capa(struct obd_info *oinfo) return oinfo->oi_capa; } +static inline struct md_open_data *obd_mod_alloc(void) +{ + struct md_open_data *mod; + OBD_ALLOC_PTR(mod); + if (mod == NULL) + return NULL; + atomic_set(&mod->mod_refcount, 1); + return mod; +} + +#define obd_mod_get(mod) atomic_inc(&(mod)->mod_refcount) +#define obd_mod_put(mod) \ +({ \ + if (atomic_dec_and_test(&(mod)->mod_refcount)) { \ + if ((mod)->mod_open_req) \ + ptlrpc_req_finished((mod)->mod_open_req); \ + OBD_FREE_PTR(mod); \ + } \ +}) + #endif /* __OBD_H */ diff --git a/lustre/ldlm/interval_tree.c b/lustre/ldlm/interval_tree.c index 60dcbeb..0b69afc 100644 --- a/lustre/ldlm/interval_tree.c +++ b/lustre/ldlm/interval_tree.c @@ -101,7 +101,7 @@ static inline int extent_equal(struct interval_node_extent *e1, return (e1->start == e2->start) && (e1->end == e2->end); } -static inline int extent_overlapped(struct interval_node_extent *e1, +static inline int extent_overlapped(struct interval_node_extent *e1, struct interval_node_extent *e2) { return (e1->start <= e2->end) && (e2->start <= e1->end); @@ -195,7 +195,7 @@ enum 
interval_iter interval_iterate(struct interval_node *root, struct interval_node *node; enum interval_iter rc = INTERVAL_ITER_CONT; ENTRY; - + interval_for_each(node, root) { rc = func(node, data); if (rc == INTERVAL_ITER_STOP) @@ -213,7 +213,7 @@ enum interval_iter interval_iterate_reverse(struct interval_node *root, struct interval_node *node; enum interval_iter rc = INTERVAL_ITER_CONT; ENTRY; - + interval_for_each_reverse(node, root) { rc = func(node, data); if (rc == INTERVAL_ITER_STOP) @@ -322,10 +322,10 @@ static void __rotate_right(struct interval_node *node, } while (0) /* - * Operations INSERT and DELETE, when run on a tree with n keys, - * take O(logN) time.Because they modify the tree, the result - * may violate the red-black properties.To restore these properties, - * we must change the colors of some of the nodes in the tree + * Operations INSERT and DELETE, when run on a tree with n keys, + * take O(logN) time.Because they modify the tree, the result + * may violate the red-black properties.To restore these properties, + * we must change the colors of some of the nodes in the tree * and also change the pointer structure. */ static void interval_insert_color(struct interval_node *node, @@ -384,7 +384,7 @@ static void interval_insert_color(struct interval_node *node, struct interval_node *interval_insert(struct interval_node *node, struct interval_node **root) - + { struct interval_node **p, *parent = NULL; ENTRY; @@ -402,7 +402,7 @@ struct interval_node *interval_insert(struct interval_node *node, if (node_compare(node, parent) < 0) p = &parent->in_left; - else + else p = &parent->in_right; } @@ -499,8 +499,8 @@ static void interval_erase_color(struct interval_node *node, EXIT; } -/* - * if the @max_high value of @node is changed, this function traverse a path +/* + * if the @max_high value of @node is changed, this function traverse a path * from node up to the root to update max_high for the whole tree. */ static void update_maxhigh(struct interval_node *node, @@ -656,13 +656,13 @@ enum interval_iter interval_search(struct interval_node *node, node = node->in_right; continue; } - } + } parent = node->in_parent; while (parent) { if (node_is_left_child(node) && parent->in_right) { - /* If we ever got the left, it means that the + /* If we ever got the left, it means that the * parent met ext->endend < interval_low(node)) { + if (result_ext->end > interval_low(node) - 1) + result_ext->end = interval_low(node) - 1; + if (node->in_left) { + node = node->in_left; + continue; + } + } else if (ext->start > node->in_max_high) { + if (result_ext->start < node->in_max_high + 1) + result_ext->start = node->in_max_high + 1; + } else { + if (extent_overlapped(ext, &node->in_extent)) { + rc = func(node, data); + if (rc == INTERVAL_ITER_STOP) + break; + } + + if (node->in_left) { + node = node->in_left; + continue; + } + if (node->in_right) { + node = node->in_right; + continue; + } + } + + parent = node->in_parent; + while (parent) { + if (node_is_left_child(node) && parent->in_right) { + node = parent->in_right; + break; + } + node = parent; + parent = node->in_parent; + } + if (parent == NULL) + break; + } + return rc; +} + static enum interval_iter interval_overlap_cb(struct interval_node *n, void *args) { @@ -723,7 +777,7 @@ EXPORT_SYMBOL(interval_is_overlapped); * return res; * } * - * It's much easy to eliminate the recursion, see interval_search for + * It's much easy to eliminate the recursion, see interval_search for * an example. 
-jay */ static inline __u64 interval_expand_low(struct interval_node *root, __u64 low) @@ -741,7 +795,7 @@ static inline __u64 interval_expand_high(struct interval_node *node, __u64 high) while (node != NULL) { if (node->in_max_high < high) break; - + if (interval_low(node) > high) { result = interval_low(node) - 1; node = node->in_left; diff --git a/lustre/ldlm/ldlm_extent.c b/lustre/ldlm/ldlm_extent.c index 03172d6..44442f2 100644 --- a/lustre/ldlm/ldlm_extent.c +++ b/lustre/ldlm/ldlm_extent.c @@ -100,186 +100,6 @@ static void ldlm_extent_internal_policy_fixup(struct ldlm_lock *req, mask, new_ex->end, req_end); } -/* The purpose of this function is to return: - * - the maximum extent - * - containing the requested extent - * - and not overlapping existing conflicting extents outside the requested one - * - * Use interval tree to expand the lock extent for granted lock. - */ -static void ldlm_extent_internal_policy_granted(struct ldlm_lock *req, - struct ldlm_extent *new_ex) -{ - struct ldlm_resource *res = req->l_resource; - ldlm_mode_t req_mode = req->l_req_mode; - __u64 req_start = req->l_req_extent.start; - __u64 req_end = req->l_req_extent.end; - struct ldlm_interval_tree *tree; - struct interval_node_extent limiter = { new_ex->start, new_ex->end }; - int conflicting = 0; - int idx; - ENTRY; - - lockmode_verify(req_mode); - - /* using interval tree to handle the ldlm extent granted locks */ - for (idx = 0; idx < LCK_MODE_NUM; idx++) { - struct interval_node_extent ext = { req_start, req_end }; - - tree = &res->lr_itree[idx]; - if (lockmode_compat(tree->lit_mode, req_mode)) - continue; - - conflicting += tree->lit_size; - if (conflicting > 4) - limiter.start = req_start; - - if (interval_is_overlapped(tree->lit_root, &ext)) - CDEBUG(D_INFO, - "req_mode = %d, tree->lit_mode = %d, " - "tree->lit_size = %d\n", - req_mode, tree->lit_mode, tree->lit_size); - interval_expand(tree->lit_root, &ext, &limiter); - limiter.start = max(limiter.start, ext.start); - limiter.end = min(limiter.end, ext.end); - if (limiter.start == req_start && limiter.end == req_end) - break; - } - - new_ex->start = limiter.start; - new_ex->end = limiter.end; - LASSERT(new_ex->start <= req_start); - LASSERT(new_ex->end >= req_end); - - ldlm_extent_internal_policy_fixup(req, new_ex, conflicting); - EXIT; -} - -/* The purpose of this function is to return: - * - the maximum extent - * - containing the requested extent - * - and not overlapping existing conflicting extents outside the requested one - */ -static void -ldlm_extent_internal_policy_waiting(struct ldlm_lock *req, - struct ldlm_extent *new_ex) -{ - struct list_head *tmp; - struct ldlm_resource *res = req->l_resource; - ldlm_mode_t req_mode = req->l_req_mode; - __u64 req_start = req->l_req_extent.start; - __u64 req_end = req->l_req_extent.end; - int conflicting = 0; - ENTRY; - - lockmode_verify(req_mode); - - /* for waiting locks */ - list_for_each(tmp, &res->lr_waiting) { - struct ldlm_lock *lock; - struct ldlm_extent *l_extent; - - lock = list_entry(tmp, struct ldlm_lock, l_res_link); - l_extent = &lock->l_policy_data.l_extent; - - /* We already hit the minimum requested size, search no more */ - if (new_ex->start == req_start && new_ex->end == req_end) { - EXIT; - return; - } - - /* Don't conflict with ourselves */ - if (req == lock) - continue; - - /* Locks are compatible, overlap doesn't matter */ - /* Until bug 20 is fixed, try to avoid granting overlapping - * locks on one client (they take a long time to cancel) */ - if 
(lockmode_compat(lock->l_req_mode, req_mode) && - lock->l_export != req->l_export) - continue; - - /* If this is a high-traffic lock, don't grow downwards at all - * or grow upwards too much */ - ++conflicting; - if (conflicting > 4) - new_ex->start = req_start; - - /* If lock doesn't overlap new_ex, skip it. */ - if (!ldlm_extent_overlap(l_extent, new_ex)) - continue; - - /* Locks conflicting in requested extents and we can't satisfy - * both locks, so ignore it. Either we will ping-pong this - * extent (we would regardless of what extent we granted) or - * lock is unused and it shouldn't limit our extent growth. */ - if (ldlm_extent_overlap(&lock->l_req_extent,&req->l_req_extent)) - continue; - - /* We grow extents downwards only as far as they don't overlap - * with already-granted locks, on the assumption that clients - * will be writing beyond the initial requested end and would - * then need to enqueue a new lock beyond previous request. - * l_req_extent->end strictly < req_start, checked above. */ - if (l_extent->start < req_start && new_ex->start != req_start) { - if (l_extent->end >= req_start) - new_ex->start = req_start; - else - new_ex->start = min(l_extent->end+1, req_start); - } - - /* If we need to cancel this lock anyways because our request - * overlaps the granted lock, we grow up to its requested - * extent start instead of limiting this extent, assuming that - * clients are writing forwards and the lock had over grown - * its extent downwards before we enqueued our request. */ - if (l_extent->end > req_end) { - if (l_extent->start <= req_end) - new_ex->end = max(lock->l_req_extent.start - 1, - req_end); - else - new_ex->end = max(l_extent->start - 1, req_end); - } - } - - ldlm_extent_internal_policy_fixup(req, new_ex, conflicting); - EXIT; -} - - -/* In order to determine the largest possible extent we can grant, we need - * to scan all of the queues. */ -static void ldlm_extent_policy(struct ldlm_resource *res, - struct ldlm_lock *lock, int *flags) -{ - struct ldlm_extent new_ex = { .start = 0, .end = OBD_OBJECT_EOF }; - - if (lock->l_export == NULL) - /* - * this is local lock taken by server (e.g., as a part of - * OST-side locking, or unlink handling). Expansion doesn't - * make a lot of sense for local locks, because they are - * dropped immediately on operation completion and would only - * conflict with other threads. 
- */ - return; - - if (lock->l_policy_data.l_extent.start == 0 && - lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF) - /* fast-path whole file locks */ - return; - - ldlm_extent_internal_policy_granted(lock, &new_ex); - ldlm_extent_internal_policy_waiting(lock, &new_ex); - - if (new_ex.start != lock->l_policy_data.l_extent.start || - new_ex.end != lock->l_policy_data.l_extent.end) { - *flags |= LDLM_FL_LOCK_CHANGED; - lock->l_policy_data.l_extent.start = new_ex.start; - lock->l_policy_data.l_extent.end = new_ex.end; - } -} - static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks) { struct ldlm_resource *res = lock->l_resource; @@ -301,6 +121,7 @@ struct ldlm_extent_compat_args { ldlm_mode_t mode; int *locks; int *compat; + int *conflicts; }; static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n, @@ -324,6 +145,11 @@ static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n, ldlm_lockname[mode], ldlm_lockname[lock->l_granted_mode]); count++; + /* only count _requested_ region overlapped locks as contended + * locks */ + if (lock->l_req_extent.end >= enq->l_req_extent.start && + lock->l_req_extent.start <= enq->l_req_extent.end) + (*priv->conflicts)++; if (lock->l_blocking_ast) ldlm_add_ast_work_item(lock, enq, work_list); } @@ -340,260 +166,379 @@ static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n, RETURN(INTERVAL_ITER_CONT); } -/* Determine if the lock is compatible with all locks on the queue. - * We stop walking the queue if we hit ourselves so we don't take - * conflicting locks enqueued after us into accound, or we'd wait forever. - * - * 0 if the lock is not compatible - * 1 if the lock is compatible - * 2 if this group lock is compatible and requires no further checking - * negative error, such as EWOULDBLOCK for group locks - */ static int -ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req, - int *flags, ldlm_error_t *err, - struct list_head *work_list, int *contended_locks) +ldlm_extent_compat_granted_queue(struct list_head *queue, struct ldlm_lock *req, + int *flags, ldlm_error_t *err, + struct list_head *work_list, + int *contended_locks) { - struct list_head *tmp; - struct ldlm_lock *lock; struct ldlm_resource *res = req->l_resource; ldlm_mode_t req_mode = req->l_req_mode; __u64 req_start = req->l_req_extent.start; __u64 req_end = req->l_req_extent.end; - int compat = 1; - int scan = 0; - int check_contention; + int compat = 1, conflicts; + /* Using interval tree for granted lock */ + struct ldlm_interval_tree *tree; + struct ldlm_extent_compat_args data = {.work_list = work_list, + .lock = req, + .locks = contended_locks, + .compat = &compat, + .conflicts = &conflicts }; + struct interval_node_extent ex = { .start = req_start, + .end = req_end }; + int idx, rc; ENTRY; - lockmode_verify(req_mode); + for (idx = 0; idx < LCK_MODE_NUM; idx++) { + conflicts = 0; + tree = &res->lr_itree[idx]; + if (tree->lit_root == NULL) /* empty tree, skipped */ + continue; - /* Using interval tree for granted lock */ - if (queue == &res->lr_granted) { - struct ldlm_interval_tree *tree; - struct ldlm_extent_compat_args data = {.work_list = work_list, - .lock = req, - .locks = contended_locks, - .compat = &compat }; - struct interval_node_extent ex = { .start = req_start, - .end = req_end }; - int idx, rc; - - for (idx = 0; idx < LCK_MODE_NUM; idx++) { - tree = &res->lr_itree[idx]; - if (tree->lit_root == NULL) /* empty tree, skipped */ + data.mode = tree->lit_mode; + if (lockmode_compat(req_mode, 
tree->lit_mode)) { + struct ldlm_interval *node; + struct ldlm_extent *extent; + + if (req_mode != LCK_GROUP) continue; - data.mode = tree->lit_mode; - if (lockmode_compat(req_mode, tree->lit_mode)) { - struct ldlm_interval *node; - struct ldlm_extent *extent; - - if (req_mode != LCK_GROUP) - continue; - - /* group lock, grant it immediately if - * compatible */ - node = to_ldlm_interval(tree->lit_root); - extent = ldlm_interval_extent(node); - if (req->l_policy_data.l_extent.gid == - extent->gid) - RETURN(2); + /* group lock, grant it immediately if + * compatible */ + node = to_ldlm_interval(tree->lit_root); + extent = ldlm_interval_extent(node); + if (req->l_policy_data.l_extent.gid == + extent->gid) + RETURN(2); + } + + if (tree->lit_mode == LCK_GROUP) { + if (*flags & LDLM_FL_BLOCK_NOWAIT) { + compat = -EWOULDBLOCK; + goto destroylock; } - if (tree->lit_mode == LCK_GROUP) { - if (*flags & LDLM_FL_BLOCK_NOWAIT) { - compat = -EWOULDBLOCK; - goto destroylock; - } + *flags |= LDLM_FL_NO_TIMEOUT; + if (!work_list) + RETURN(0); - *flags |= LDLM_FL_NO_TIMEOUT; - if (!work_list) - RETURN(0); - /* if work list is not NULL,add all - locks in the tree to work list */ - compat = 0; - interval_iterate(tree->lit_root, - ldlm_extent_compat_cb, &data); - continue; - } + /* if work list is not NULL,add all + locks in the tree to work list */ + compat = 0; + interval_iterate(tree->lit_root, + ldlm_extent_compat_cb, &data); + continue; + } - if (!work_list) { - rc = interval_is_overlapped(tree->lit_root,&ex); - if (rc) - RETURN(0); - } else { - interval_search(tree->lit_root, &ex, - ldlm_extent_compat_cb, &data); - if (!list_empty(work_list) && compat) - compat = 0; + if (!work_list) { + rc = interval_is_overlapped(tree->lit_root, &ex); + if (rc) + RETURN(0); + } else { + struct interval_node_extent result_ext = { + .start = req->l_policy_data.l_extent.start, + .end = req->l_policy_data.l_extent.end }; + + interval_search_expand_extent(tree->lit_root, &ex, + &result_ext, + ldlm_extent_compat_cb, + &data); + req->l_policy_data.l_extent.start = result_ext.start; + req->l_policy_data.l_extent.end = result_ext.end; + /* for granted locks, count non-compatible not overlapping + * locks in traffic index */ + req->l_traffic += tree->lit_size - conflicts; + + if (!list_empty(work_list)) { + compat = 0; + /* if there is at least 1 conflicting lock, we + * do not expand to the left, since we often + * continue writing to the right. + */ + req->l_policy_data.l_extent.start = req_start; } } - } else { /* for waiting queue */ - list_for_each(tmp, queue) { - check_contention = 1; + } - lock = list_entry(tmp, struct ldlm_lock, l_res_link); + RETURN(compat); +destroylock: + list_del_init(&req->l_res_link); + ldlm_lock_destroy_nolock(req); + *err = compat; + RETURN(compat); +} - if (req == lock) - break; +static int +ldlm_extent_compat_waiting_queue(struct list_head *queue, struct ldlm_lock *req, + int *flags, ldlm_error_t *err, + struct list_head *work_list, + int *contended_locks) +{ + struct list_head *tmp; + struct ldlm_lock *lock; + ldlm_mode_t req_mode = req->l_req_mode; + __u64 req_start = req->l_req_extent.start; + __u64 req_end = req->l_req_extent.end; + int compat = 1; + int scan = 0; + int check_contention; + ENTRY; - if (unlikely(scan)) { - /* We only get here if we are queuing GROUP lock - and met some incompatible one. 
The main idea of this - code is to insert GROUP lock past compatible GROUP - lock in the waiting queue or if there is not any, - then in front of first non-GROUP lock */ - if (lock->l_req_mode != LCK_GROUP) { - /* Ok, we hit non-GROUP lock, there should - * be no more GROUP locks later on, queue in - * front of first non-GROUP lock */ - - ldlm_resource_insert_lock_after(lock, req); - list_del_init(&lock->l_res_link); - ldlm_resource_insert_lock_after(req, lock); - compat = 0; - break; - } - if (req->l_policy_data.l_extent.gid == - lock->l_policy_data.l_extent.gid) { - /* found it */ - ldlm_resource_insert_lock_after(lock, req); - compat = 0; - break; - } - continue; - } + list_for_each(tmp, queue) { + check_contention = 1; - /* locks are compatible, overlap doesn't matter */ - if (lockmode_compat(lock->l_req_mode, req_mode)) { - if (req_mode == LCK_PR && - ((lock->l_policy_data.l_extent.start <= - req->l_policy_data.l_extent.start) && - (lock->l_policy_data.l_extent.end >= - req->l_policy_data.l_extent.end))) { - /* If we met a PR lock just like us or wider, - and nobody down the list conflicted with - it, that means we can skip processing of - the rest of the list and safely place - ourselves at the end of the list, or grant - (dependent if we met an conflicting locks - before in the list). - In case of 1st enqueue only we continue - traversing if there is something conflicting - down the list because we need to make sure - that something is marked as AST_SENT as well, - in cse of empy worklist we would exit on - first conflict met. */ - /* There IS a case where such flag is - not set for a lock, yet it blocks - something. Luckily for us this is - only during destroy, so lock is - exclusive. So here we are safe */ - if (!(lock->l_flags & LDLM_FL_AST_SENT)) { - RETURN(compat); - } - } + lock = list_entry(tmp, struct ldlm_lock, l_res_link); - /* non-group locks are compatible, overlap doesn't - matter */ - if (likely(req_mode != LCK_GROUP)) - continue; - - /* If we are trying to get a GROUP lock and there is - another one of this kind, we need to compare gid */ - if (req->l_policy_data.l_extent.gid == - lock->l_policy_data.l_extent.gid) { - /* If existing lock with matched gid is granted, - we grant new one too. */ - if (lock->l_req_mode == lock->l_granted_mode) - RETURN(2); - - /* Otherwise we are scanning queue of waiting - * locks and it means current request would - * block along with existing lock (that is - * already blocked. - * If we are in nonblocking mode - return - * immediately */ - if (*flags & LDLM_FL_BLOCK_NOWAIT) { - compat = -EWOULDBLOCK; - goto destroylock; - } - /* If this group lock is compatible with another - * group lock on the waiting list, they must be - * together in the list, so they can be granted - * at the same time. Otherwise the later lock - * can get stuck behind another, incompatible, - * lock. */ - ldlm_resource_insert_lock_after(lock, req); - /* Because 'lock' is not granted, we can stop - * processing this queue and return immediately. - * There is no need to check the rest of the - * list. */ - RETURN(0); - } - } + if (req == lock) + break; - if (unlikely(req_mode == LCK_GROUP && - (lock->l_req_mode != lock->l_granted_mode))) { - scan = 1; + if (unlikely(scan)) { + /* We only get here if we are queuing GROUP lock + and met some incompatible one. 
The main idea of this + code is to insert GROUP lock past compatible GROUP + lock in the waiting queue or if there is not any, + then in front of first non-GROUP lock */ + if (lock->l_req_mode != LCK_GROUP) { + /* Ok, we hit non-GROUP lock, there should be no + more GROUP locks later on, queue in front of + first non-GROUP lock */ + + ldlm_resource_insert_lock_after(lock, req); + list_del_init(&lock->l_res_link); + ldlm_resource_insert_lock_after(req, lock); compat = 0; - if (lock->l_req_mode != LCK_GROUP) { - /* Ok, we hit non-GROUP lock, there should be no - more GROUP locks later on, queue in front of - first non-GROUP lock */ - - ldlm_resource_insert_lock_after(lock, req); - list_del_init(&lock->l_res_link); - ldlm_resource_insert_lock_after(req, lock); - break; - } - if (req->l_policy_data.l_extent.gid == - lock->l_policy_data.l_extent.gid) { - /* found it */ - ldlm_resource_insert_lock_after(lock, req); - break; + break; + } + if (req->l_policy_data.l_extent.gid == + lock->l_policy_data.l_extent.gid) { + /* found it */ + ldlm_resource_insert_lock_after(lock, req); + compat = 0; + break; + } + continue; + } + + /* locks are compatible, overlap doesn't matter */ + if (lockmode_compat(lock->l_req_mode, req_mode)) { + if (req_mode == LCK_PR && + ((lock->l_policy_data.l_extent.start <= + req->l_policy_data.l_extent.start) && + (lock->l_policy_data.l_extent.end >= + req->l_policy_data.l_extent.end))) { + /* If we met a PR lock just like us or wider, + and nobody down the list conflicted with + it, that means we can skip processing of + the rest of the list and safely place + ourselves at the end of the list, or grant + (dependent if we met an conflicting locks + before in the list). + In case of 1st enqueue only we continue + traversing if there is something conflicting + down the list because we need to make sure + that something is marked as AST_SENT as well, + in cse of empy worklist we would exit on + first conflict met. */ + /* There IS a case where such flag is + not set for a lock, yet it blocks + something. Luckily for us this is + only during destroy, so lock is + exclusive. So here we are safe */ + if (!(lock->l_flags & LDLM_FL_AST_SENT)) { + RETURN(compat); } - continue; } - if (unlikely(lock->l_req_mode == LCK_GROUP)) { - /* If compared lock is GROUP, then requested is PR/PW/ - * so this is not compatible; extent range does not - * matter */ + /* non-group locks are compatible, overlap doesn't + matter */ + if (likely(req_mode != LCK_GROUP)) + continue; + + /* If we are trying to get a GROUP lock and there is + another one of this kind, we need to compare gid */ + if (req->l_policy_data.l_extent.gid == + lock->l_policy_data.l_extent.gid) { + /* We are scanning queue of waiting + * locks and it means current request would + * block along with existing lock (that is + * already blocked. + * If we are in nonblocking mode - return + * immediately */ if (*flags & LDLM_FL_BLOCK_NOWAIT) { compat = -EWOULDBLOCK; goto destroylock; - } else { - *flags |= LDLM_FL_NO_TIMEOUT; } - } else if (lock->l_policy_data.l_extent.end < req_start || - lock->l_policy_data.l_extent.start > req_end) { + /* If this group lock is compatible with another + * group lock on the waiting list, they must be + * together in the list, so they can be granted + * at the same time. Otherwise the later lock + * can get stuck behind another, incompatible, + * lock. */ + ldlm_resource_insert_lock_after(lock, req); + /* Because 'lock' is not granted, we can stop + * processing this queue and return immediately. 
+ * There is no need to check the rest of the + * list. */ + RETURN(0); + } + } + + if (unlikely(req_mode == LCK_GROUP && + (lock->l_req_mode != lock->l_granted_mode))) { + scan = 1; + compat = 0; + if (lock->l_req_mode != LCK_GROUP) { + /* Ok, we hit non-GROUP lock, there should + * be no more GROUP locks later on, queue in + * front of first non-GROUP lock */ + + ldlm_resource_insert_lock_after(lock, req); + list_del_init(&lock->l_res_link); + ldlm_resource_insert_lock_after(req, lock); + break; + } + if (req->l_policy_data.l_extent.gid == + lock->l_policy_data.l_extent.gid) { + /* found it */ + ldlm_resource_insert_lock_after(lock, req); + break; + } + continue; + } + + if (unlikely(lock->l_req_mode == LCK_GROUP)) { + /* If compared lock is GROUP, then requested is PR/PW/ + * so this is not compatible; extent range does not + * matter */ + if (*flags & LDLM_FL_BLOCK_NOWAIT) { + compat = -EWOULDBLOCK; + goto destroylock; + } else { + *flags |= LDLM_FL_NO_TIMEOUT; + } + } else if (!work_list) { + if (lock->l_policy_data.l_extent.end < req_start || + lock->l_policy_data.l_extent.start > req_end) /* if a non group lock doesn't overlap skip it */ continue; - } else if (lock->l_req_extent.end < req_start || - lock->l_req_extent.start > req_end) { - /* false contention, the requests doesn't really overlap */ - check_contention = 0; + RETURN(0); + } else { + /* for waiting locks, count all non-compatible locks in + * traffic index */ + ++req->l_traffic; + ++lock->l_traffic; + + /* adjust policy */ + if (lock->l_policy_data.l_extent.end < req_start) { + /* lock req + * ------------+ + * ++++++ | +++++++ + * + | + + * ++++++ | +++++++ + * ------------+ + */ + if (lock->l_policy_data.l_extent.end > + req->l_policy_data.l_extent.start) + req->l_policy_data.l_extent.start = + lock->l_policy_data.l_extent.end+1; + continue; + } else if (lock->l_req_extent.end < req_start) { + /* lock req + * ------------------+ + * ++++++ +++++++ + * + + | + * ++++++ +++++++ + * ------------------+ + */ + lock->l_policy_data.l_extent.end = + req_start - 1; + req->l_policy_data.l_extent.start = + req_start; + continue; + } else if (lock->l_policy_data.l_extent.start > + req_end) { + /* req lock + * +-------------- + * +++++++ | +++++++ + * + | + + * +++++++ | +++++++ + * +-------------- + */ + if (lock->l_policy_data.l_extent.start < + req->l_policy_data.l_extent.end) + req->l_policy_data.l_extent.end = + lock->l_policy_data.l_extent.start-1; + continue; + } else if (lock->l_req_extent.start > req_end) { + /* req lock + * +---------------------- + * +++++++ +++++++ + * | + + + * +++++++ +++++++ + * +---------------------- + */ + lock->l_policy_data.l_extent.start = + req_end + 1; + req->l_policy_data.l_extent.end=req_end; + continue; } + } /* policy_adj */ - if (!work_list) - RETURN(0); - + compat = 0; + if (work_list) { /* don't count conflicting glimpse locks */ - if (lock->l_req_mode == LCK_PR && - lock->l_policy_data.l_extent.start == 0 && - lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF) + if (lock->l_flags & LDLM_FL_HAS_INTENT) check_contention = 0; *contended_locks += check_contention; - compat = 0; if (lock->l_blocking_ast) ldlm_add_ast_work_item(lock, req, work_list); } } + RETURN(compat); +destroylock: + list_del_init(&req->l_res_link); + ldlm_lock_destroy_nolock(req); + *err = compat; + RETURN(compat); +} + +/* Determine if the lock is compatible with all locks on the queue. 
+ * We stop walking the queue if we hit ourselves so we don't take + * conflicting locks enqueued after us into accound, or we'd wait forever. + * + * 0 if the lock is not compatible + * 1 if the lock is compatible + * 2 if this group lock is compatible and requires no further checking + * negative error, such as EWOULDBLOCK for group locks + * + * Note: policy adjustment only happends during the 1st lock enqueue procedure + */ +static int +ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req, + int *flags, ldlm_error_t *err, + struct list_head *work_list, int *contended_locks) +{ + struct ldlm_resource *res = req->l_resource; + ldlm_mode_t req_mode = req->l_req_mode; + __u64 req_start = req->l_req_extent.start; + __u64 req_end = req->l_req_extent.end; + int compat = 1; + ENTRY; + + lockmode_verify(req_mode); + + if (queue == &res->lr_granted) + compat = ldlm_extent_compat_granted_queue(queue, req, flags, + err, work_list, + contended_locks); + else + compat = ldlm_extent_compat_waiting_queue(queue, req, flags, + err, work_list, + contended_locks); if (ldlm_check_contention(req, *contended_locks) && compat == 0 && (*flags & LDLM_FL_DENY_ON_CONTENTION) && @@ -631,6 +576,24 @@ static void discard_bl_list(struct list_head *bl_list) EXIT; } +static inline void ldlm_process_extent_init(struct ldlm_lock *lock) +{ + lock->l_policy_data.l_extent.start = 0; + lock->l_policy_data.l_extent.end = OBD_OBJECT_EOF; +} + +static inline void ldlm_process_extent_fini(struct ldlm_lock *lock, int *flags) +{ + if (lock->l_traffic > 4) + lock->l_policy_data.l_extent.start = lock->l_req_extent.start; + ldlm_extent_internal_policy_fixup(lock, + &lock->l_policy_data.l_extent, + lock->l_traffic); + if (lock->l_req_extent.start != lock->l_policy_data.l_extent.start || + lock->l_req_extent.end != lock->l_policy_data.l_extent.end) + *flags |= LDLM_FL_LOCK_CHANGED; +} + /* If first_enq is 0 (ie, called from ldlm_reprocess_queue): * - blocking ASTs have already been sent * - must call this function with the ns lock held @@ -672,14 +635,24 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, ldlm_resource_unlink_lock(lock); - if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE)) - ldlm_extent_policy(res, lock, flags); + if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE)) { + lock->l_policy_data.l_extent.start = + lock->l_req_extent.start; + lock->l_policy_data.l_extent.end = + lock->l_req_extent.end; + } else { + ldlm_process_extent_fini(lock, flags); + } + ldlm_grant_lock(lock, work_list); RETURN(LDLM_ITER_CONTINUE); } restart: contended_locks = 0; + + ldlm_process_extent_init(lock); + rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err, &rpc_list, &contended_locks); if (rc < 0) @@ -694,8 +667,8 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, if (rc + rc2 == 2) { grant: - ldlm_extent_policy(res, lock, flags); ldlm_resource_unlink_lock(lock); + ldlm_process_extent_fini(lock, flags); ldlm_grant_lock(lock, NULL); } else { /* If either of the compat_queue()s returned failure, then we @@ -712,7 +685,7 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) && !ns_is_client(res->lr_namespace)) class_fail_export(lock->l_export); - + lock_res(res); if (rc == -ERESTART) { diff --git a/lustre/liblustre/file.c b/lustre/liblustre/file.c index 545a216..b1e65f0 100644 --- a/lustre/liblustre/file.c +++ b/lustre/liblustre/file.c @@ -237,14 +237,9 @@ int 
llu_iop_open(struct pnode *pnode, int flags, mode_t mode) fd = lli->lli_file_data; lsm = lli->lli_smd; - if (lsm == NULL) { - if (fd->fd_flags & O_LOV_DELAY_CREATE) { - CDEBUG(D_INODE, "object creation was delayed\n"); - GOTO(out_release, rc); - } - } - fd->fd_flags &= ~O_LOV_DELAY_CREATE; - + if (lsm) + flags &= ~O_LOV_DELAY_CREATE; + /*XXX: open_flags are overwritten and the previous ones are lost */ lli->lli_open_flags = flags & ~(O_CREAT | O_EXCL | O_TRUNC); out_release: @@ -397,7 +392,7 @@ int llu_md_close(struct obd_export *md_exp, struct inode *inode) op_data.op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET | ATTR_MTIME_SET | ATTR_CTIME_SET; - if (fd->fd_flags & FMODE_WRITE) { + if (lli->lli_open_flags & FMODE_WRITE) { struct llu_sb_info *sbi = llu_i2sbi(inode); if (!(sbi->ll_lco.lco_flags & OBD_CONNECT_SOM) || !S_ISREG(llu_i2stat(inode)->st_mode)) { @@ -431,7 +426,7 @@ int llu_md_close(struct obd_export *md_exp, struct inode *inode) if (rc == -EAGAIN) { /* We are the last writer, so the MDS has instructed us to get * the file size and any write cookies, then close again. */ - LASSERT(fd->fd_flags & FMODE_WRITE); + LASSERT(lli->lli_open_flags & FMODE_WRITE); rc = llu_sizeonmds_update(inode, &och->och_fh, op_data.op_ioepoch); if (rc) { diff --git a/lustre/lov/lov_cl_internal.h b/lustre/lov/lov_cl_internal.h index 5401f6b..dbc63dc 100644 --- a/lustre/lov/lov_cl_internal.h +++ b/lustre/lov/lov_cl_internal.h @@ -283,9 +283,9 @@ struct lov_lock { unsigned lls_nr_filled; /** * Set when sub-lock was canceled, while top-lock was being - * unlocked. + * used, or unused. */ - int lls_unuse_race; + int lls_cancel_race:1; /** * An array of sub-locks * diff --git a/lustre/lov/lov_lock.c b/lustre/lov/lov_lock.c index e2b1520..bc6ab44 100644 --- a/lustre/lov/lov_lock.c +++ b/lustre/lov/lov_lock.c @@ -49,6 +49,8 @@ static struct cl_lock_closure *lov_closure_get(const struct lu_env *env, struct cl_lock *parent); +static int lov_lock_unuse(const struct lu_env *env, + const struct cl_lock_slice *slice); /***************************************************************************** * * Lov lock operations. @@ -226,6 +228,7 @@ static int lov_sublock_lock(const struct lu_env *env, LASSERT(link != NULL); lov_lock_unlink(env, link, sublock); lov_sublock_unlock(env, sublock, closure, NULL); + lck->lls_cancel_race = 1; result = CLO_REPEAT; } else if (lsep) { struct lov_sublock_env *subenv; @@ -644,7 +647,7 @@ static int lov_lock_unuse(const struct lu_env *env, /* top-lock state cannot change concurrently, because single * thread (one that released the last hold) carries unlocking * to the completion. 
*/ - LASSERT(slice->cls_lock->cll_state == CLS_UNLOCKING); + LASSERT(slice->cls_lock->cll_state == CLS_INTRANSIT); lls = &lck->lls_sub[i]; sub = lls->sub_lock; if (sub == NULL) @@ -653,7 +656,7 @@ static int lov_lock_unuse(const struct lu_env *env, sublock = sub->lss_cl.cls_lock; rc = lov_sublock_lock(env, lck, lls, closure, &subenv); if (rc == 0) { - if (lck->lls_sub[i].sub_flags & LSF_HELD) { + if (lls->sub_flags & LSF_HELD) { LASSERT(sublock->cll_state == CLS_HELD); rc = cl_unuse_try(subenv->lse_env, sublock); if (rc != CLO_WAIT) @@ -666,8 +669,9 @@ static int lov_lock_unuse(const struct lu_env *env, if (result < 0) break; } - if (result == 0 && lck->lls_unuse_race) { - lck->lls_unuse_race = 0; + + if (result == 0 && lck->lls_cancel_race) { + lck->lls_cancel_race = 0; result = -ESTALE; } cl_lock_closure_fini(closure); @@ -721,7 +725,7 @@ static int lov_lock_use(const struct lu_env *env, int result; int i; - LASSERT(slice->cls_lock->cll_state == CLS_CACHED); + LASSERT(slice->cls_lock->cll_state == CLS_INTRANSIT); ENTRY; for (result = 0, i = 0; i < lck->lls_nr; ++i) { @@ -731,37 +735,48 @@ static int lov_lock_use(const struct lu_env *env, struct lov_lock_sub *lls; struct lov_sublock_env *subenv; - if (slice->cls_lock->cll_state != CLS_CACHED) { - /* see comment in lov_lock_enqueue(). */ - LASSERT(i > 0 && result != 0); - break; - } - /* - * if a sub-lock was destroyed while top-lock was in - * CLS_CACHED state, top-lock would have been moved into - * CLS_NEW state, so all sub-locks have to be in place. - */ + LASSERT(slice->cls_lock->cll_state == CLS_INTRANSIT); + lls = &lck->lls_sub[i]; sub = lls->sub_lock; - LASSERT(sub != NULL); + if (sub == NULL) { + /* + * Sub-lock might have been canceled, while top-lock was + * cached. + */ + result = -ESTALE; + break; + } + sublock = sub->lss_cl.cls_lock; rc = lov_sublock_lock(env, lck, lls, closure, &subenv); if (rc == 0) { LASSERT(sublock->cll_state != CLS_FREEING); lov_sublock_hold(env, lck, i); if (sublock->cll_state == CLS_CACHED) { - rc = cl_use_try(subenv->lse_env, sublock); + rc = cl_use_try(subenv->lse_env, sublock, 0); if (rc != 0) rc = lov_sublock_release(env, lck, i, 1, rc); - } else - rc = 0; + } lov_sublock_unlock(env, sub, closure, subenv); } result = lov_subresult(result, rc); if (result != 0) break; } + + if (lck->lls_cancel_race) { + /* + * If there is unlocking happened at the same time, then + * sublock_lock state should be FREEING, and lov_sublock_lock + * should return CLO_REPEAT. In this case, it should return + * ESTALE, and up layer should reset the lock state to be NEW. + */ + lck->lls_cancel_race = 0; + LASSERT(result != 0); + result = -ESTALE; + } cl_lock_closure_fini(closure); RETURN(result); } @@ -984,7 +999,7 @@ static void lov_lock_delete(const struct lu_env *env, sublock = lsl->lss_cl.cls_lock; rc = lov_sublock_lock(env, lck, lls, closure, NULL); if (rc == 0) { - if (lck->lls_sub[i].sub_flags & LSF_HELD) + if (lls->sub_flags & LSF_HELD) lov_sublock_release(env, lck, i, 1, 0); if (sublock->cll_state < CLS_FREEING) { struct lov_lock_link *link; diff --git a/lustre/lov/lovsub_lock.c b/lustre/lov/lovsub_lock.c index c97cb35..e4ff065 100644 --- a/lustre/lov/lovsub_lock.c +++ b/lustre/lov/lovsub_lock.c @@ -344,7 +344,7 @@ static int lovsub_lock_delete_one(const struct lu_env *env, case CLS_FREEING: cl_lock_signal(env, parent); break; - case CLS_UNLOCKING: + case CLS_INTRANSIT: /* * Here lies a problem: a sub-lock is canceled while top-lock * is being unlocked. 
Top-lock cannot be moved into CLS_NEW @@ -356,13 +356,14 @@ static int lovsub_lock_delete_one(const struct lu_env *env, * to be reused immediately). Nor can we wait for top-lock * state to change, because this can be synchronous to the * current thread. - * + * * We know for sure that lov_lock_unuse() will be called at * least one more time to finish un-using, so leave a mark on * the top-lock, that will be seen by the next call to * lov_lock_unuse(). */ - lov->lls_unuse_race = 1; + if (cl_lock_is_intransit(parent)) + lov->lls_cancel_race = 1; break; case CLS_CACHED: /* diff --git a/lustre/lvfs/fsfilt_ext3.c b/lustre/lvfs/fsfilt_ext3.c index a51213e..35f03a3 100644 --- a/lustre/lvfs/fsfilt_ext3.c +++ b/lustre/lvfs/fsfilt_ext3.c @@ -2089,7 +2089,7 @@ static int fsfilt_ext3_quotainfo(struct lustre_quota_info *lqi, int type, if (lqi->qi_files[type] == NULL) { CERROR("operate qinfo before it's enabled!\n"); - RETURN(-EIO); + RETURN(-ESRCH); } switch (cmd) { @@ -2132,7 +2132,7 @@ static int fsfilt_ext3_dquot(struct lustre_dquot *dquot, int cmd) if (dquot->dq_info->qi_files[dquot->dq_type] == NULL) { CERROR("operate dquot before it's enabled!\n"); - RETURN(-EIO); + RETURN(-ESRCH); } switch (cmd) { diff --git a/lustre/lvfs/lustre_quota_fmt.c b/lustre/lvfs/lustre_quota_fmt.c index 0488c2c..937e496 100644 --- a/lustre/lvfs/lustre_quota_fmt.c +++ b/lustre/lvfs/lustre_quota_fmt.c @@ -843,7 +843,7 @@ int lustre_read_dquot(struct lustre_dquot *dquot) /* Invalidated quota? */ if (!dquot->dq_info || !(filp = dquot->dq_info->qi_files[type])) { CDEBUG(D_ERROR, "VFS: Quota invalidated while reading!\n"); - return -EIO; + return -ESRCH; } version = dquot->dq_info->qi_version; diff --git a/lustre/mdc/mdc_reint.c b/lustre/mdc/mdc_reint.c index 58b97de..ecab6c5 100644 --- a/lustre/mdc/mdc_reint.c +++ b/lustre/mdc/mdc_reint.c @@ -176,7 +176,7 @@ int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data, { LASSERT(*mod == NULL); - OBD_ALLOC_PTR(*mod); + *mod = obd_mod_alloc(); if (*mod == NULL) { DEBUG_REQ(D_ERROR, req, "Can't allocate " "md_open_data"); @@ -185,6 +185,13 @@ int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data, req->rq_cb_data = *mod; (*mod)->mod_open_req = req; req->rq_commit_cb = mdc_commit_open; + /** + * Take an extra reference on \var mod, it protects \var + * mod from being freed on eviction (commit callback is + * called despite rq_replay flag). + * Will be put on mdc_done_writing(). + */ + obd_mod_get(*mod); } } @@ -209,8 +216,11 @@ int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data, rc = 0; } *request = req; - if (rc && req->rq_commit_cb) + if (rc && req->rq_commit_cb) { + /* Put an extra reference on \var mod on error case. */ + obd_mod_put(*mod); req->rq_commit_cb(req); + } RETURN(rc); } diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index c4de6cf..a7a1bb4 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -678,11 +678,26 @@ void mdc_commit_open(struct ptlrpc_request *req) if (mod == NULL) return; - if (mod->mod_och != NULL) - mod->mod_och->och_mod = NULL; + /** + * No need to touch md_open_data::mod_och, it holds a reference on + * \var mod and will zero references to each other, \var mod will be + * freed after that when md_open_data::mod_och will put the reference. 
+ */ - OBD_FREE(mod, sizeof(*mod)); + /** + * Do not let open request to disappear as it still may be needed + * for close rpc to happen (it may happen on evict only, otherwise + * ptlrpc_request::rq_replay does not let mdc_commit_open() to be + * called), just mark this rpc as committed to distinguish these 2 + * cases, see mdc_close() for details. The open request reference will + * be put along with freeing \var mod. + */ + ptlrpc_request_addref(req); + spin_lock(&req->rq_lock); + req->rq_committed = 1; + spin_unlock(&req->rq_lock); req->rq_cb_data = NULL; + obd_mod_put(mod); } int mdc_set_open_replay_data(struct obd_export *exp, @@ -707,13 +722,22 @@ int mdc_set_open_replay_data(struct obd_export *exp, /* Only if the import is replayable, we set replay_open data */ if (och && imp->imp_replayable) { - OBD_ALLOC_PTR(mod); + mod = obd_mod_alloc(); if (mod == NULL) { DEBUG_REQ(D_ERROR, open_req, "Can't allocate md_open_data"); RETURN(0); } + /** + * Take a reference on \var mod, to be freed on mdc_close(). + * It protects \var mod from being freed on eviction (commit + * callback is called despite rq_replay flag). + * Another reference for \var och. + */ + obd_mod_get(mod); + obd_mod_get(mod); + spin_lock(&open_req->rq_lock); och->och_mod = mod; mod->mod_och = och; @@ -743,17 +767,12 @@ int mdc_clear_open_replay_data(struct obd_export *exp, struct md_open_data *mod = och->och_mod; ENTRY; - /* - * Don't free the structure now (it happens in mdc_commit_open(), after - * we're sure we won't need to fix up the close request in the future), - * but make sure that replay doesn't poke at the och, which is about to - * be freed. - */ - LASSERT(mod != LP_POISON); - if (mod != NULL) - mod->mod_och = NULL; + LASSERT(mod != LP_POISON && mod != NULL); + mod->mod_och = NULL; och->och_mod = NULL; + obd_mod_put(mod); + RETURN(0); } @@ -786,10 +805,12 @@ int mdc_close(struct obd_export *exp, struct md_op_data *op_data, /* Ensure that this close's handle is fixed up during replay. */ if (likely(mod != NULL)) { - LASSERTF(mod->mod_open_req->rq_type != LI_POISON, + LASSERTF(mod->mod_open_req != NULL && + mod->mod_open_req->rq_type != LI_POISON, "POISONED open %p!\n", mod->mod_open_req); mod->mod_close_req = req; + DEBUG_REQ(D_HA, mod->mod_open_req, "matched open"); /* We no longer want to preserve this open for replay even * though the open was committed. b=3632, b=3633 */ @@ -837,13 +858,20 @@ int mdc_close(struct obd_export *exp, struct md_op_data *op_data, * server failed before close was sent. Let's check if mod * exists and return no error in that case */ - if (mod && (mod->mod_open_req == NULL)) - rc = 0; + if (mod) { + LASSERT(mod->mod_open_req != NULL); + if (mod->mod_open_req->rq_committed) + rc = 0; + } } - if (rc != 0 && mod) - mod->mod_close_req = NULL; - + if (mod) { + if (rc != 0) + mod->mod_close_req = NULL; + /* Since now, mod is accessed through open_req only, + * thus close req does not keep a reference on mod anymore. */ + obd_mod_put(mod); + } *request = req; RETURN(rc); } @@ -869,7 +897,8 @@ int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data, } if (mod != NULL) { - LASSERTF(mod->mod_open_req->rq_type != LI_POISON, + LASSERTF(mod->mod_open_req != NULL && + mod->mod_open_req->rq_type != LI_POISON, "POISONED setattr %p!\n", mod->mod_open_req); mod->mod_close_req = req; @@ -894,10 +923,20 @@ int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data, * committed and server failed before close was sent. 
* Let's check if mod exists and return no error in that case */ - if (mod && (mod->mod_open_req == NULL)) - rc = 0; + if (mod) { + LASSERT(mod->mod_open_req != NULL); + if (mod->mod_open_req->rq_committed) + rc = 0; + } } + if (mod) { + if (rc != 0) + mod->mod_close_req = NULL; + /* Since now, mod is accessed through setattr req only, + * thus DW req does not keep a reference on mod anymore. */ + obd_mod_put(mod); + } ptlrpc_req_finished(req); RETURN(rc); } @@ -1437,7 +1476,7 @@ static int mdc_pin(struct obd_export *exp, const struct lu_fid *fid, handle->och_fh = body->handle; handle->och_magic = OBD_CLIENT_HANDLE_MAGIC; - OBD_ALLOC_PTR(handle->och_mod); + handle->och_mod = obd_mod_alloc(); if (handle->och_mod == NULL) { DEBUG_REQ(D_ERROR, req, "can't allocate md_open_data"); GOTO(err_out, rc = -ENOMEM); @@ -1480,7 +1519,7 @@ static int mdc_unpin(struct obd_export *exp, struct obd_client_handle *handle, ptlrpc_req_finished(req); ptlrpc_req_finished(handle->och_mod->mod_open_req); - OBD_FREE(handle->och_mod, sizeof(*handle->och_mod)); + obd_mod_put(handle->och_mod); RETURN(rc); } diff --git a/lustre/mdd/mdd_lov.c b/lustre/mdd/mdd_lov.c index 1b6785b..bccc702 100644 --- a/lustre/mdd/mdd_lov.c +++ b/lustre/mdd/mdd_lov.c @@ -414,9 +414,13 @@ int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd, if (spec->no_create != 0) { *lmm = (struct lov_mds_md *)spec->u.sp_ea.eadata; *lmm_size = spec->u.sp_ea.eadatalen; - LASSERT(*lmm_size == lov_mds_md_size((*lmm)->lmm_stripe_count, - (*lmm)->lmm_magic)); - RETURN(0); + if (*lmm_size == lov_mds_md_size((*lmm)->lmm_stripe_count, + (*lmm)->lmm_magic)) { + RETURN(0); + } else { + CERROR("incorrect lsm received during recovery\n"); + RETURN(-EPROTO); + } } if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 67e5d8f..967a4bd 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -1176,6 +1176,7 @@ static int mdt_sendpage(struct mdt_thread_info *info, struct lu_rdpg *rdpg) { struct ptlrpc_request *req = mdt_info_req(info); + struct obd_export *exp = req->rq_export; struct ptlrpc_bulk_desc *desc; struct l_wait_info *lwi = &info->mti_u.rdpg.mti_wait_info; int tmpcount; @@ -1212,8 +1213,10 @@ static int mdt_sendpage(struct mdt_thread_info *info, if (timeout < 0) CERROR("Req deadline already passed %lu (now: %lu)\n", req->rq_deadline, cfs_time_current_sec()); - *lwi = LWI_TIMEOUT(cfs_time_seconds(max(timeout, 1)), NULL, NULL); - rc = l_wait_event(desc->bd_waitq, !ptlrpc_server_bulk_active(desc), lwi); + *lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(max(timeout, 1)), + cfs_time_seconds(1), NULL, NULL); + rc = l_wait_event(desc->bd_waitq, !ptlrpc_server_bulk_active(desc) || + exp->exp_failed || exp->exp_abort_active_req, lwi); LASSERT (rc == 0 || rc == -ETIMEDOUT); if (rc == 0) { @@ -1221,16 +1224,18 @@ static int mdt_sendpage(struct mdt_thread_info *info, desc->bd_nob_transferred == rdpg->rp_count) GOTO(free_desc, rc); - rc = -ETIMEDOUT; /* XXX should this be a different errno? */ + rc = -ETIMEDOUT; + if (exp->exp_abort_active_req || exp->exp_failed) + GOTO(abort_bulk, rc); } DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s", (rc == -ETIMEDOUT) ? 
"timeout" : "network error", desc->bd_nob_transferred, rdpg->rp_count, - req->rq_export->exp_client_uuid.uuid, - req->rq_export->exp_connection->c_remote_uuid.uuid); + exp->exp_client_uuid.uuid, + exp->exp_connection->c_remote_uuid.uuid); - class_fail_export(req->rq_export); + class_fail_export(exp); EXIT; abort_bulk: diff --git a/lustre/mgc/mgc_request.c b/lustre/mgc/mgc_request.c index b308f36..46ddd8c 100644 --- a/lustre/mgc/mgc_request.c +++ b/lustre/mgc/mgc_request.c @@ -374,6 +374,10 @@ static void do_requeue(struct config_llog_data *cld) { LASSERT(atomic_read(&cld->cld_refcount) > 0); + /* Do not run mgc_process_log on a disconnected export or an + export which is being disconnected. Take the client + semaphore to make the check non-racy. */ + down_read(&cld->cld_mgcexp->exp_obd->u.cli.cl_sem); if (cld->cld_mgcexp->exp_obd->u.cli.cl_conn_count != 0) { CDEBUG(D_MGC, "updating log %s\n", cld->cld_logname); mgc_process_log(cld->cld_mgcexp->exp_obd, cld); @@ -381,6 +385,7 @@ static void do_requeue(struct config_llog_data *cld) CDEBUG(D_MGC, "disconnecting, won't update log %s\n", cld->cld_logname); } + up_read(&cld->cld_mgcexp->exp_obd->u.cli.cl_sem); /* Whether we enqueued again or not in mgc_process_log, we're done * with the ref from the old enqueue */ diff --git a/lustre/obdclass/cl_lock.c b/lustre/obdclass/cl_lock.c index 5316965..5b9ca93 100644 --- a/lustre/obdclass/cl_lock.c +++ b/lustre/obdclass/cl_lock.c @@ -399,6 +399,57 @@ static struct cl_lock *cl_lock_alloc(const struct lu_env *env, } /** + * Transfer the lock into INTRANSIT state and return the original state. + * + * \pre state: CLS_CACHED, CLS_HELD or CLS_ENQUEUED + * \post state: CLS_INTRANSIT + * \see CLS_INTRANSIT + */ +enum cl_lock_state cl_lock_intransit(const struct lu_env *env, + struct cl_lock *lock) +{ + enum cl_lock_state state = lock->cll_state; + + LASSERT(cl_lock_is_mutexed(lock)); + LASSERT(state != CLS_INTRANSIT); + LASSERTF(state >= CLS_ENQUEUED && state <= CLS_CACHED, + "Malformed lock state %d.\n", state); + + cl_lock_state_set(env, lock, CLS_INTRANSIT); + lock->cll_intransit_owner = cfs_current(); + cl_lock_hold_add(env, lock, "intransit", cfs_current()); + return state; +} +EXPORT_SYMBOL(cl_lock_intransit); + +/** + * Exit the intransit state and restore the lock state to the original state + */ +void cl_lock_extransit(const struct lu_env *env, struct cl_lock *lock, + enum cl_lock_state state) +{ + LASSERT(cl_lock_is_mutexed(lock)); + LASSERT(lock->cll_state == CLS_INTRANSIT); + LASSERT(state != CLS_INTRANSIT); + LASSERT(lock->cll_intransit_owner == cfs_current()); + + lock->cll_intransit_owner = NULL; + cl_lock_state_set(env, lock, state); + cl_lock_unhold(env, lock, "intransit", cfs_current()); +} +EXPORT_SYMBOL(cl_lock_extransit); + +/** + * Checking whether the lock is intransit state + */ +int cl_lock_is_intransit(struct cl_lock *lock) +{ + LASSERT(cl_lock_is_mutexed(lock)); + return lock->cll_state == CLS_INTRANSIT && + lock->cll_intransit_owner != cfs_current(); +} +EXPORT_SYMBOL(cl_lock_is_intransit); +/** * Returns true iff lock is "suitable" for given io. E.g., locks acquired by * truncate and O_APPEND cannot be reused for read/non-append-write, as they * cover multiple stripes and can trigger cascading timeouts. 
@@ -524,6 +575,7 @@ struct cl_lock *cl_lock_peek(const struct lu_env *env, const struct cl_io *io, struct cl_object_header *head; struct cl_object *obj; struct cl_lock *lock; + int ok; obj = need->cld_obj; head = cl_object_header(obj); @@ -532,24 +584,30 @@ struct cl_lock *cl_lock_peek(const struct lu_env *env, const struct cl_io *io, lock = cl_lock_lookup(env, obj, io, need); spin_unlock(&head->coh_lock_guard); - if (lock != NULL) { - int ok; + if (lock == NULL) + return NULL; - cl_lock_mutex_get(env, lock); - if (lock->cll_state == CLS_CACHED) - cl_use_try(env, lock); - ok = lock->cll_state == CLS_HELD; - if (ok) { - cl_lock_hold_add(env, lock, scope, source); - cl_lock_user_add(env, lock); - cl_lock_put(env, lock); - } - cl_lock_mutex_put(env, lock); - if (!ok) { - cl_lock_put(env, lock); - lock = NULL; - } + cl_lock_mutex_get(env, lock); + if (lock->cll_state == CLS_INTRANSIT) + cl_lock_state_wait(env, lock); /* Don't care return value. */ + if (lock->cll_state == CLS_CACHED) { + int result; + result = cl_use_try(env, lock, 1); + if (result < 0) + cl_lock_error(env, lock, result); } + ok = lock->cll_state == CLS_HELD; + if (ok) { + cl_lock_hold_add(env, lock, scope, source); + cl_lock_user_add(env, lock); + cl_lock_put(env, lock); + } + cl_lock_mutex_put(env, lock); + if (!ok) { + cl_lock_put(env, lock); + lock = NULL; + } + return lock; } EXPORT_SYMBOL(cl_lock_peek); @@ -666,7 +724,7 @@ int cl_lock_mutex_try(const struct lu_env *env, struct cl_lock *lock) EXPORT_SYMBOL(cl_lock_mutex_try); /** - * Unlocks cl_lock object. + {* Unlocks cl_lock object. * * \pre cl_lock_is_mutexed(lock) * @@ -885,7 +943,7 @@ int cl_lock_state_wait(const struct lu_env *env, struct cl_lock *lock) LASSERT(lock->cll_state != CLS_FREEING); /* too late to wait */ result = lock->cll_error; - if (result == 0 && !(lock->cll_flags & CLF_STATE)) { + if (result == 0) { cfs_waitlink_init(&waiter); cfs_waitq_add(&lock->cll_wq, &waiter); set_current_state(CFS_TASK_INTERRUPTIBLE); @@ -899,7 +957,6 @@ int cl_lock_state_wait(const struct lu_env *env, struct cl_lock *lock) cfs_waitq_del(&lock->cll_wq, &waiter); result = cfs_signal_pending() ? 
-EINTR : 0; } - lock->cll_flags &= ~CLF_STATE; RETURN(result); } EXPORT_SYMBOL(cl_lock_state_wait); @@ -916,7 +973,6 @@ static void cl_lock_state_signal(const struct lu_env *env, struct cl_lock *lock, list_for_each_entry(slice, &lock->cll_layers, cls_linkage) if (slice->cls_ops->clo_state != NULL) slice->cls_ops->clo_state(env, slice, state); - lock->cll_flags |= CLF_STATE; cfs_waitq_broadcast(&lock->cll_wq); EXIT; } @@ -955,9 +1011,10 @@ void cl_lock_state_set(const struct lu_env *env, struct cl_lock *lock, LASSERT(lock->cll_state <= state || (lock->cll_state == CLS_CACHED && (state == CLS_HELD || /* lock found in cache */ - state == CLS_NEW /* sub-lock canceled */)) || - /* sub-lock canceled during unlocking */ - (lock->cll_state == CLS_UNLOCKING && state == CLS_NEW)); + state == CLS_NEW || /* sub-lock canceled */ + state == CLS_INTRANSIT)) || + /* lock is in transit state */ + lock->cll_state == CLS_INTRANSIT); if (lock->cll_state != state) { atomic_dec(&site->cs_locks_state[lock->cll_state]); @@ -970,17 +1027,54 @@ void cl_lock_state_set(const struct lu_env *env, struct cl_lock *lock, } EXPORT_SYMBOL(cl_lock_state_set); +static int cl_unuse_try_internal(const struct lu_env *env, struct cl_lock *lock) +{ + const struct cl_lock_slice *slice; + int result; + + do { + result = 0; + + if (lock->cll_error != 0) + break; + + LINVRNT(cl_lock_is_mutexed(lock)); + LINVRNT(cl_lock_invariant(env, lock)); + LASSERT(lock->cll_state == CLS_INTRANSIT); + LASSERT(lock->cll_users > 0); + LASSERT(lock->cll_holds > 0); + + result = -ENOSYS; + list_for_each_entry_reverse(slice, &lock->cll_layers, + cls_linkage) { + if (slice->cls_ops->clo_unuse != NULL) { + result = slice->cls_ops->clo_unuse(env, slice); + if (result != 0) + break; + } + } + LASSERT(result != -ENOSYS); + } while (result == CLO_REPEAT); + + return result ?: lock->cll_error; +} + /** * Yanks lock from the cache (cl_lock_state::CLS_CACHED state) by calling * cl_lock_operations::clo_use() top-to-bottom to notify layers. + * @atomic = 1, it must unuse the lock to recovery the lock to keep the + * use process atomic */ -int cl_use_try(const struct lu_env *env, struct cl_lock *lock) +int cl_use_try(const struct lu_env *env, struct cl_lock *lock, int atomic) { - int result; const struct cl_lock_slice *slice; + int result; + enum cl_lock_state state; ENTRY; result = -ENOSYS; + + state = cl_lock_intransit(env, lock); list_for_each_entry(slice, &lock->cll_layers, cls_linkage) { if (slice->cls_ops->clo_use != NULL) { result = slice->cls_ops->clo_use(env, slice); @@ -989,8 +1083,43 @@ int cl_use_try(const struct lu_env *env, struct cl_lock *lock) } } LASSERT(result != -ENOSYS); - if (result == 0) - cl_lock_state_set(env, lock, CLS_HELD); + + LASSERT(lock->cll_state == CLS_INTRANSIT); + + if (result == 0) { + state = CLS_HELD; + } else { + if (result == -ESTALE) { + /* + * ESTALE means sublock being cancelled + * at this time, and set lock state to + * be NEW here and ask the caller to repeat. + */ + state = CLS_NEW; + result = CLO_REPEAT; + } + + /* @atomic means back-off-on-failure. */ + if (atomic) { + int rc; + + do { + rc = cl_unuse_try_internal(env, lock); + if (rc == 0) + break; + if (rc == CLO_WAIT) + rc = cl_lock_state_wait(env, lock); + if (rc < 0) + break; + } while(1); + + /* Vet the results. 
*/ + if (rc < 0 && result > 0) + result = rc; + } + + } + cl_lock_extransit(env, lock, state); RETURN(result); } EXPORT_SYMBOL(cl_use_try); @@ -1056,14 +1185,13 @@ int cl_enqueue_try(const struct lu_env *env, struct cl_lock *lock, if (result == 0) cl_lock_state_set(env, lock, CLS_ENQUEUED); break; - case CLS_UNLOCKING: - /* wait until unlocking finishes, and enqueue lock - * afresh. */ + case CLS_INTRANSIT: + LASSERT(cl_lock_is_intransit(lock)); result = CLO_WAIT; break; case CLS_CACHED: /* yank lock from the cache. */ - result = cl_use_try(env, lock); + result = cl_use_try(env, lock, 0); break; case CLS_ENQUEUED: case CLS_HELD: @@ -1150,7 +1278,7 @@ EXPORT_SYMBOL(cl_enqueue); * This function is called repeatedly by cl_unuse() until either lock is * unlocked, or error occurs. * - * \pre lock->cll_state <= CLS_HELD || lock->cll_state == CLS_UNLOCKING + * \pre lock->cll_state <= CLS_HELD || cl_lock_is_intransit(lock) * * \post ergo(result == 0, lock->cll_state == CLS_CACHED) * @@ -1159,11 +1287,11 @@ EXPORT_SYMBOL(cl_enqueue); */ int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock) { - const struct cl_lock_slice *slice; int result; + enum cl_lock_state state = CLS_NEW; ENTRY; - if (lock->cll_state != CLS_UNLOCKING) { + if (lock->cll_state != CLS_INTRANSIT) { if (lock->cll_users > 1) { cl_lock_user_del(env, lock); RETURN(0); @@ -1174,31 +1302,11 @@ int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock) * CLS_CACHED, is reinitialized to CLS_NEW or fails into * CLS_FREEING. */ - cl_lock_state_set(env, lock, CLS_UNLOCKING); + state = cl_lock_intransit(env, lock); } - do { - result = 0; - - if (lock->cll_error != 0) - break; - - LINVRNT(cl_lock_is_mutexed(lock)); - LINVRNT(cl_lock_invariant(env, lock)); - LASSERT(lock->cll_state == CLS_UNLOCKING); - LASSERT(lock->cll_users > 0); - LASSERT(lock->cll_holds > 0); - result = -ENOSYS; - list_for_each_entry_reverse(slice, &lock->cll_layers, - cls_linkage) { - if (slice->cls_ops->clo_unuse != NULL) { - result = slice->cls_ops->clo_unuse(env, slice); - if (result != 0) - break; - } - } - LASSERT(result != -ENOSYS); - } while (result == CLO_REPEAT); + result = cl_unuse_try_internal(env, lock); + LASSERT(lock->cll_state == CLS_INTRANSIT); if (result != CLO_WAIT) /* * Once there is no more need to iterate ->clo_unuse() calls, @@ -1208,8 +1316,6 @@ int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock) */ cl_lock_user_del(env, lock); if (result == 0 || result == -ESTALE) { - enum cl_lock_state state; - /* * Return lock back to the cache. This is the only * place where lock is moved into CLS_CACHED state. @@ -1220,7 +1326,7 @@ int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock) * canceled while unlocking was in progress. */ state = result == 0 ? CLS_CACHED : CLS_NEW; - cl_lock_state_set(env, lock, state); + cl_lock_extransit(env, lock, state); /* * Hide -ESTALE error. @@ -1232,7 +1338,11 @@ int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock) * pages won't be written to OSTs. 
-jay */ result = 0; + } else { + CWARN("result = %d, this is unlikely!\n", result); + cl_lock_extransit(env, lock, state); } + result = result ?: lock->cll_error; if (result < 0) cl_lock_error(env, lock, result); @@ -1292,13 +1402,20 @@ int cl_wait_try(const struct lu_env *env, struct cl_lock *lock) LINVRNT(cl_lock_is_mutexed(lock)); LINVRNT(cl_lock_invariant(env, lock)); LASSERT(lock->cll_state == CLS_ENQUEUED || - lock->cll_state == CLS_HELD); + lock->cll_state == CLS_HELD || + lock->cll_state == CLS_INTRANSIT); LASSERT(lock->cll_users > 0); LASSERT(lock->cll_holds > 0); result = 0; if (lock->cll_error != 0) break; + + if (cl_lock_is_intransit(lock)) { + result = CLO_WAIT; + break; + } + if (lock->cll_state == CLS_HELD) /* nothing to do */ break; diff --git a/lustre/obdclass/cl_object.c b/lustre/obdclass/cl_object.c index 7ca27c6..4838a0a 100644 --- a/lustre/obdclass/cl_object.c +++ b/lustre/obdclass/cl_object.c @@ -480,7 +480,7 @@ int cl_site_stats_print(const struct cl_site *site, char *page, int count) [CLS_QUEUING] = "q", [CLS_ENQUEUED] = "e", [CLS_HELD] = "h", - [CLS_UNLOCKING] = "u", + [CLS_INTRANSIT] = "t", [CLS_CACHED] = "c", [CLS_FREEING] = "f" }; diff --git a/lustre/osc/osc_lock.c b/lustre/osc/osc_lock.c index 2cc82bc..6ca2014 100644 --- a/lustre/osc/osc_lock.c +++ b/lustre/osc/osc_lock.c @@ -1361,7 +1361,7 @@ static int osc_lock_use(const struct lu_env *env, * cl_lock mutex. */ lock = slice->cls_lock; - LASSERT(lock->cll_state == CLS_CACHED); + LASSERT(lock->cll_state == CLS_INTRANSIT); LASSERT(lock->cll_users > 0); /* set a flag for osc_dlm_blocking_ast0() to signal the * lock.*/ diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index 4747e12..2279a30 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -770,7 +770,7 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti) do { long timeoutl = req->rq_deadline - cfs_time_current_sec(); - cfs_duration_t timeout = (timeoutl <= 0 || rc) ? + cfs_duration_t timeout = timeoutl <= 0 ? CFS_TICK : cfs_time_seconds(timeoutl); lwi = LWI_TIMEOUT_INTERVAL(timeout, cfs_time_seconds(1), @@ -1016,7 +1016,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) do { long timeoutl = req->rq_deadline - cfs_time_current_sec(); - cfs_duration_t timeout = (timeoutl <= 0 || rc) ? + cfs_duration_t timeout = timeoutl <= 0 ? CFS_TICK : cfs_time_seconds(timeoutl); lwi = LWI_TIMEOUT_INTERVAL(timeout, cfs_time_seconds(1), ost_bulk_timeout, desc); diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 20b575e..73864f3 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -1520,8 +1520,9 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink) req->rq_timedout = 1; spin_unlock(&req->rq_lock); - DEBUG_REQ(D_WARNING, req, "Request x"LPU64" sent from %s to NID %s " - CFS_DURATION_T"s ago has %s (limit "CFS_DURATION_T"s).\n", + DEBUG_REQ(req->rq_fake ? D_INFO : D_WARNING, req, + "Request x"LPU64" sent from %s to NID %s "CFS_DURATION_T"s " + "ago has %s ("CFS_DURATION_T"s prior to deadline).\n", req->rq_xid, imp ? imp->imp_obd->obd_name : "", imp ? 
libcfs_nid2str(imp->imp_connection->c_peer.nid) : "", cfs_time_sub(cfs_time_current_sec(), req->rq_sent), diff --git a/lustre/quota/quota_context.c b/lustre/quota/quota_context.c index 440ddf8..9dd70e8 100644 --- a/lustre/quota/quota_context.c +++ b/lustre/quota/quota_context.c @@ -830,6 +830,10 @@ static int dqacq_interpret(const struct lu_env *env, rc = -EPROTO; } + if (unlikely(rc == -ESRCH)) + CERROR("quota for %s has been enabled by master, but disabled " + "by slave.\n", QDATA_IS_GRP(qdata) ? "group" : "user"); + rc = dqacq_completion(obd, qctxt, qdata, rc, lustre_msg_get_opc(req->rq_reqmsg)); @@ -1590,7 +1594,7 @@ lqs_put(struct hlist_node *hnode) hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); ENTRY; - __lqs_putref(q, 0); + __lqs_putref(q); RETURN(q); } diff --git a/lustre/quota/quota_interface.c b/lustre/quota/quota_interface.c index 1a8076b..df2039a 100644 --- a/lustre/quota/quota_interface.c +++ b/lustre/quota/quota_interface.c @@ -461,6 +461,13 @@ static int quota_chk_acq_common(struct obd_device *obd, const unsigned int id[], break; } + /* Related quota has been disabled by master, but enabled by + * slave, do not try again. */ + if (unlikely(rc == -ESRCH)) { + CERROR("mismatched quota configuration, stop try.\n"); + break; + } + /* -EBUSY and others, wait a second and try again */ if (rc < 0) { cfs_waitq_t waitq; diff --git a/lustre/tests/ost-pools.sh b/lustre/tests/ost-pools.sh index 0f2cb62..0d0d99c 100644 --- a/lustre/tests/ost-pools.sh +++ b/lustre/tests/ost-pools.sh @@ -188,31 +188,6 @@ drain_pool() { ||error "Failed to remove targets from pool: $pool" } -destroy_pool_int() { - OSTS=$(do_facet $SINGLEMDS lctl pool_list $1 | \ - awk '$1 !~ /^Pool:/ {print $1}') - for ost in $OSTS - do - do_facet $SINGLEMDS lctl pool_remove $1 $ost - done - do_facet $SINGLEMDS lctl pool_destroy $1 -} - -destroy_pool() { - local RC - - do_facet $SINGLEMDS lctl pool_list $FSNAME.$1 - RC=$? - [[ $RC -ne 0 ]] && return $RC - - destroy_pool_int $FSNAME.$1 - RC=$? 
- [[ $RC -ne 0 ]] && return $RC - - wait_update $HOSTNAME "lctl get_param -n lov.$FSNAME-*.pools.$1 \ - 2>/dev/null || echo foo" "foo" && return 0 -} - add_pool() { local pool=$1 local osts=$2 diff --git a/lustre/tests/replay-single.sh b/lustre/tests/replay-single.sh index b407cc2..8c8df2d 100755 --- a/lustre/tests/replay-single.sh +++ b/lustre/tests/replay-single.sh @@ -499,9 +499,9 @@ test_20c() { # bug 10480 df -P $DIR || df -P $DIR || true # reconnect kill -USR1 $pid - test -s $DIR/$tfile || error "File was truncated" - wait $pid || return 1 + [ -s $DIR/$tfile ] || error "File was truncated" + return 0 } run_test 20c "check that client eviction does not affect file content" diff --git a/lustre/tests/sanity-quota.sh b/lustre/tests/sanity-quota.sh index 20e0894..2219e4b 100644 --- a/lustre/tests/sanity-quota.sh +++ b/lustre/tests/sanity-quota.sh @@ -52,8 +52,6 @@ DIRECTIO=${DIRECTIO:-$LUSTRE/tests/directio} [ $MDSCOUNT -gt 1 ] && skip "CMD case" && exit 0 -unset ENABLE_QUOTA - remote_mds_nodsh && skip "remote MDS with nodsh" && exit 0 remote_ost_nodsh && skip "remote OST with nodsh" && exit 0 @@ -66,11 +64,15 @@ QUOTALOG=${TESTSUITELOG:-$TMP/$(basename $0 .sh).log} DIR=${DIR:-$MOUNT} DIR2=${DIR2:-$MOUNT2} +QUOTA_AUTO_OLD=$QUOTA_AUTO +export QUOTA_AUTO=0 + check_and_setup_lustre if [ x"$(som_check)" = x"enabled" ]; then echo "Som is enabled, Quota is temporary conflicts with it" check_and_cleanup_lustre + export QUOTA_AUTO=$QUOTA_AUTO_OLD exit 0 fi @@ -2134,5 +2136,6 @@ log "cleanup: ======================================================" cd $ORIG_PWD check_and_cleanup_lustre echo '=========================== finished ===============================' +export QUOTA_AUTO=$QUOTA_AUTO_OLD [ -f "$QUOTALOG" ] && cat $QUOTALOG && grep -q FAIL $QUOTALOG && exit 1 || true echo "$0: completed" diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index 69ef6a9..d8c6cb5 100644 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -5319,8 +5319,8 @@ run_test 121 "read cancel race =========" test_123a() { # was test 123, statahead(bug 11401) SLOWOK=0 if [ -z "$(grep "processor.*: 1" /proc/cpuinfo)" ]; then - log "testing on UP system. Performance may be not as good as expected." - SLOWOK=1 + log "testing on UP system. Performance may be not as good as expected." + SLOWOK=1 fi rm -rf $DIR/$tdir @@ -5352,49 +5352,32 @@ test_123a() { # was test 123, statahead(bug 11401) etime=`date +%s` delta_sa=$((etime - stime)) log "ls $i files with statahead: $delta_sa sec" - lctl get_param -n llite.*.statahead_stats + lctl get_param -n llite.*.statahead_stats ewrong=`lctl get_param -n llite.*.statahead_stats | grep "statahead wrong:" | awk '{print $3}'` - if [ $swrong -lt $ewrong ]; then - log "statahead was stopped, maybe too many locks held!" - fi - + [ $swrong -lt $ewrong ] && log "statahead was stopped, maybe too many locks held!" [ $delta -eq 0 -o $delta_sa -eq 0 ] && continue if [ $((delta_sa * 100)) -gt $((delta * 105)) -a $delta_sa -gt $((delta + 2)) ]; then + max=`lctl get_param -n llite.*.statahead_max | head -n 1` + lctl set_param -n llite.*.statahead_max 0 + lctl get_param llite.*.statahead_max + cancel_lru_locks mdc + cancel_lru_locks osc + stime=`date +%s` + time ls -l $DIR/$tdir | wc -l + etime=`date +%s` + delta=$((etime - stime)) + log "ls $i files again without statahead: $delta sec" + lctl set_param llite.*.statahead_max=$max + if [ $((delta_sa * 100)) -gt $((delta * 105)) -a $delta_sa -gt $((delta + 2)) ]; then if [ $SLOWOK -eq 0 ]; then error "ls $i files is slower with statahead!" 
- debugsave - - lctl set_param debug=-1 - max=`lctl get_param -n llite.*.statahead_max | head -n 1` - lctl set_param -n llite.*.statahead_max 0 - lctl get_param llite.*.statahead_max - cancel_lru_locks mdc - cancel_lru_locks osc - $LCTL clear - stime=`date +%s` - time ls -l $DIR/$tdir | wc -l - etime=`date +%s` - error "ls $i files (again) without statahead: $((etime - stime)) sec" - - lctl set_param debug=-1 - lctl set_param llite.*.statahead_max=$max - lctl get_param -n llite.*.statahead_max | grep '[0-9]' - cancel_lru_locks mdc - cancel_lru_locks osc - $LCTL clear - stime=`date +%s` - time ls -l $DIR/$tdir | wc -l - etime=`date +%s` - error "ls $i files (again) with statahead: $((etime - stime)) sec" - lctl get_param -n llite.*.statahead_stats - - debugrestore else log "ls $i files is slower with statahead!" fi break + fi fi [ $delta -gt 20 ] && break @@ -6743,11 +6726,20 @@ check_file_in_pool() return 0 } +cleanup_200 () { + trap 0 + destroy_pool $POOL +} + test_200a() { remote_mgs_nodsh && skip "remote MGS with nodsh" && return do_facet mgs $LCTL pool_new $FSNAME.$POOL - # get param should return err until pool is created - wait_update $HOSTNAME "lctl get_param -n lov.$FSNAME-*.pools.$POOL 2>/dev/null || echo foo" "" || error "Pool creation of $POOL failed" + + trap cleanup_200 EXIT + CLEANUP_200=yes + + # get param should return err until pool is created + wait_update $HOSTNAME "lctl get_param -n lov.$FSNAME-*.pools.$POOL 2>/dev/null || echo foo" "" || error "Pool creation of $POOL failed" [ $($LFS pool_list $FSNAME | grep -c $POOL) -eq 1 ] || error "$POOL not in lfs pool_list" } run_test 200a "Create new pool ==========================================" @@ -6847,11 +6839,14 @@ test_201c() { # was 200i remote_mgs_nodsh && skip "remote MGS with nodsh" && return do_facet mgs $LCTL pool_destroy $FSNAME.$POOL # get param should return err once pool is gone - wait_update $HOSTNAME "lctl get_param -n lov.$FSNAME-*.pools.$POOL 2>/dev/null || echo foo" "foo" && return 0 + wait_update $HOSTNAME "lctl get_param -n lov.$FSNAME-*.pools.$POOL 2>/dev/null || + echo foo" "foo" && unset CLEANUP_200 && trap 0 && return 0 error "Pool $FSNAME.$POOL is not destroyed" } run_test 201c "Remove a pool ============================================" +[ "$CLEANUP_200" ] && cleanup_200 + test_212() { size=`date +%s` size=$((size % 8192 + 1)) diff --git a/lustre/tests/test-framework.sh b/lustre/tests/test-framework.sh index 0f09652..4c871dd 100644 --- a/lustre/tests/test-framework.sh +++ b/lustre/tests/test-framework.sh @@ -15,6 +15,7 @@ export GSS=false export GSS_KRB5=false export GSS_PIPEFS=false export IDENTITY_UPCALL=default +export QUOTA_AUTO=1 #export PDSH="pdsh -S -Rssh -w" @@ -600,9 +601,7 @@ restore_quota_type () { setup_quota(){ local mntpt=$1 - # We need: - # 1. run quotacheck only if quota is off - # 2. 
save the original quota_type params, restore them after testing + # We need save the original quota_type params, and restore them after testing # Suppose that quota type the same on mds and ost local quota_type=$(quota_type | grep MDT | cut -d "=" -f2) @@ -611,6 +610,9 @@ setup_quota(){ if [ "$quota_type" != "$QUOTA_TYPE" ]; then export old_QUOTA_TYPE=$quota_type quota_save_version $QUOTA_TYPE + else + qtype=$(tr -c -d "ug" <<< $QUOTA_TYPE) + $LFS quotacheck -$qtype $mntpt || error "quotacheck has failed for $type" fi local quota_usrs=$QUOTA_USERS @@ -1907,9 +1909,16 @@ init_param_vars () { if [ x"$(som_check)" = x"enabled" ]; then ENABLE_QUOTA="" + echo "disable quota temporary when SOM enabled" fi - if [ "$ENABLE_QUOTA" ]; then - setup_quota $MOUNT || return 2 + if [ $QUOTA_AUTO -ne 0 ]; then + if [ "$ENABLE_QUOTA" ]; then + echo "enable quota as required" + setup_quota $MOUNT || return 2 + else + echo "disable quota as required" + $LFS quotaoff -ug $MOUNT > /dev/null 2>&1 + fi fi } @@ -3213,6 +3222,32 @@ oos_full() { return $OSCFULL } + +destroy_pool_int() { + local ost + local OSTS=$(do_facet $SINGLEMDS lctl pool_list $1 | \ + awk '$1 !~ /^Pool:/ {print $1}') + for ost in $OSTS; do + do_facet mgs lctl pool_remove $1 $ost + done + do_facet mgs lctl pool_destroy $1 +} + +destroy_pool() { + local RC + + do_facet $SINGLEMDS lctl pool_list $FSNAME.$1 + RC=$? + [[ $RC -ne 0 ]] && return $RC + + destroy_pool_int $FSNAME.$1 + RC=$? + [[ $RC -ne 0 ]] && return $RC + + wait_update $HOSTNAME "lctl get_param -n lov.$FSNAME-*.pools.$1 \ + 2>/dev/null || echo foo" "foo" && return 0 +} + gather_logs () { local list=$1 -- 1.8.3.1
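
Editor's note: the sketches below are appended after the patch body (past the "-- " signature, so they are not part of the diff itself) to illustrate three of the mechanisms the patch introduces.

The cl_lock.c hunks above replace CLS_UNLOCKING with a CLS_INTRANSIT state that records an owning thread: other threads that observe INTRANSIT must wait, while the owner is allowed to keep driving the transition. Below is a minimal, standalone model of that ownership rule. All names (toy_lock, tl_*) are simplified stand-ins for illustration only, not the real cl_lock API, and the real code of course does this under the lock's mutex.

/* Simplified model of the CLS_INTRANSIT ownership rule; illustrative only. */
#include <assert.h>
#include <pthread.h>
#include <stdio.h>

enum tl_state { TL_NEW, TL_ENQUEUED, TL_HELD, TL_INTRANSIT, TL_CACHED };

struct toy_lock {
        enum tl_state tl_state;
        pthread_t     tl_transit_owner;   /* valid only in TL_INTRANSIT */
};

/* Enter the transit state, remembering which thread owns the transition
 * and returning the previous state so it can be restored on failure. */
static enum tl_state toy_lock_intransit(struct toy_lock *lk)
{
        enum tl_state old = lk->tl_state;

        assert(old != TL_INTRANSIT);
        lk->tl_state         = TL_INTRANSIT;
        lk->tl_transit_owner = pthread_self();
        return old;
}

/* Leave the transit state, installing the final state. */
static void toy_lock_extransit(struct toy_lock *lk, enum tl_state state)
{
        assert(lk->tl_state == TL_INTRANSIT);
        assert(pthread_equal(lk->tl_transit_owner, pthread_self()));
        lk->tl_state = state;
}

/* "In transit" from the caller's point of view: other threads must wait,
 * the owning thread may proceed. */
static int toy_lock_is_intransit(const struct toy_lock *lk)
{
        return lk->tl_state == TL_INTRANSIT &&
               !pthread_equal(lk->tl_transit_owner, pthread_self());
}

int main(void)
{
        struct toy_lock lk = { .tl_state = TL_CACHED };
        enum tl_state saved;

        saved = toy_lock_intransit(&lk);        /* start a "use" transition */
        printf("in transit (owner's view): %d\n", toy_lock_is_intransit(&lk));
        toy_lock_extransit(&lk, TL_HELD);       /* the use succeeded */
        printf("final state: %d (was %d)\n", lk.tl_state, saved);
        return 0;
}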
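
The new "atomic" flag of cl_use_try() means back-off-on-failure: layers are used top-to-bottom, and if one of them fails the lock is rolled back so the caller finds it unchanged. The real code delegates the roll-back to cl_unuse_try_internal() and may loop on CLO_REPEAT/CLO_WAIT; the sketch below only shows the generic all-or-nothing shape of the idea, with a hypothetical layer table rather than cl_lock_slice operations.

/* Standalone sketch of an all-or-nothing "use" over a stack of layers. */
#include <stdio.h>

struct layer {
        const char *name;
        int (*use)(void);
        int (*unuse)(void);
};

static int ok_use(void)   { return 0; }
static int ok_unuse(void) { return 0; }
static int bad_use(void)  { return -1; }    /* simulate a cancelled sublock */

static int use_all_or_nothing(struct layer *l, int n)
{
        int i, rc = 0;

        for (i = 0; i < n; i++) {
                rc = l[i].use();
                if (rc != 0)
                        break;
        }
        if (rc != 0) {
                /* Back off in reverse order so the caller sees no change. */
                while (--i >= 0)
                        l[i].unuse();
        }
        return rc;
}

int main(void)
{
        struct layer layers[] = {
                { "top",    ok_use,  ok_unuse },
                { "bottom", bad_use, ok_unuse },
        };
        int rc = use_all_or_nothing(layers, 2);

        printf("atomic use returned %d; lock left as it was found\n", rc);
        return 0;
}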
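
The mdc_request.c hunks turn md_open_data into a reference-counted object: per the comments in the patch, mdc_set_open_replay_data() takes one reference to be dropped by mdc_close() (or mdc_done_writing()) and another for the och, dropped by mdc_clear_open_replay_data(), while the reference from allocation is dropped by mdc_commit_open(). The standalone model below assumes the allocation helper starts the count at 1 (obd_mod_alloc() itself is not shown in this hunk); mod_get/mod_put are simplified stand-ins, not the obd_mod_* API.

/* Illustrative model of the md_open_data reference counting lifecycle. */
#include <stdio.h>
#include <stdlib.h>

struct mod {
        int refcount;
};

static struct mod *mod_alloc(void)
{
        struct mod *m = calloc(1, sizeof(*m));

        if (m != NULL)
                m->refcount = 1;   /* assumed: dropped by the commit callback */
        return m;
}

static void mod_get(struct mod *m)
{
        m->refcount++;
}

static void mod_put(struct mod *m)
{
        if (--m->refcount == 0) {
                printf("last reference dropped, freeing mod\n");
                free(m);
        }
}

int main(void)
{
        struct mod *m = mod_alloc();

        if (m == NULL)
                return 1;

        mod_get(m);        /* set_open_replay_data: ref for the close path */
        mod_get(m);        /* set_open_replay_data: ref for the och        */

        mod_put(m);        /* clear_open_replay_data: och reference gone   */
        mod_put(m);        /* close / done_writing: close reference gone   */
        mod_put(m);        /* commit of the open request: final reference  */
        return 0;
}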