From 2250e072c37855d611aa64027945981fe2c8f4d7 Mon Sep 17 00:00:00 2001 From: Andriy Skulysh Date: Thu, 6 Jun 2019 15:22:00 +0300 Subject: [PATCH] LU-12017 ldlm: DoM truncate deadlock setxattr takes inode lock and sends reint to MDS. truncate takes MDS_INODELOCK_DOM lock and wants to acquire inode lock. MDS locks are for different bits MDS_INODELOCK_UPDATE|MDS_INODELOCK_XATTR vs MDS_INODELOCK_DOM but they block each other if some blocking lock was present earlier. If an IBITS waiting lock has no conflicts with any lock in the granted queue or any lock ahead in the waiting queue then it can be granted. Use separate waiting lists for each ibit to eliminate full lr_waiting list scan. Cray-bug-id: LUS-6970 Change-Id: I95b2ed0b1a0063b7ece5277a5ee06e2511d44e5f Signed-off-by: Andriy Skulysh Reviewed-on: https://review.whamcloud.com/35057 Tested-by: jenkins Tested-by: Maloo Reviewed-by: Mike Pershin Reviewed-by: Patrick Farrell Reviewed-by: Oleg Drokin --- lustre/include/lustre_dlm.h | 42 +++++++++++--- lustre/ldlm/ldlm_extent.c | 10 +++- lustre/ldlm/ldlm_flock.c | 2 +- lustre/ldlm/ldlm_inodebits.c | 134 +++++++++++++++++++++++++++++++++++++++++++ lustre/ldlm/ldlm_internal.h | 16 +++++- lustre/ldlm/ldlm_lock.c | 97 ++++++++++++++++++++----------- lustre/ldlm/ldlm_lockd.c | 21 +++++-- lustre/ldlm/ldlm_request.c | 4 +- lustre/ldlm/ldlm_resource.c | 97 +++++++++++++++++++++++-------- lustre/mdt/mdt_open.c | 6 +- lustre/mdt/mdt_reint.c | 2 +- lustre/tests/sanity-dom.sh | 35 +++++++++++ 12 files changed, 387 insertions(+), 79 deletions(-) diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index 6f9bb1e..cb265ae 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -651,6 +651,19 @@ struct ldlm_interval_tree { struct interval_node *lit_root; /* actual ldlm_interval */ }; +/** + * Lists of waiting locks for each inodebit type. + * A lock can be in several liq_waiting lists and it remains in lr_waiting. 
+ */ +struct ldlm_ibits_queues { + struct list_head liq_waiting[MDS_INODELOCK_NUMBITS]; +}; + +struct ldlm_ibits_node { + struct list_head lin_link[MDS_INODELOCK_NUMBITS]; + struct ldlm_lock *lock; +}; + /** Whether to track references to exports by LDLM locks. */ #define LUSTRE_TRACKS_LOCK_EXP_REFS (0) @@ -747,9 +760,12 @@ struct ldlm_lock { */ struct list_head l_res_link; /** - * Tree node for ldlm_extent. + * Internal structures per lock type. */ - struct ldlm_interval *l_tree_node; + union { + struct ldlm_interval *l_tree_node; + struct ldlm_ibits_node *l_ibits_node; + }; /** * Per export hash of locks. * Protected by per-bucket exp->exp_lock_hash locks. @@ -1005,10 +1021,14 @@ struct ldlm_resource { /** Resource name */ struct ldlm_res_id lr_name; - /** - * Interval trees (only for extent locks) for all modes of this resource - */ - struct ldlm_interval_tree *lr_itree; + union { + /** + * Interval trees (only for extent locks) for all modes of + * this resource + */ + struct ldlm_interval_tree *lr_itree; + struct ldlm_ibits_queues *lr_ibits_queues; + }; union { /** @@ -1274,6 +1294,12 @@ typedef int (*ldlm_processing_policy)(struct ldlm_lock *lock, __u64 *flags, enum ldlm_error *err, struct list_head *work_list); +typedef int (*ldlm_reprocessing_policy)(struct ldlm_resource *res, + struct list_head *queue, + struct list_head *work_list, + enum ldlm_process_intention intention, + struct ldlm_lock *hint); + /** * Return values for lock iterators. * Also used during deciding of lock grants and cancellations. 
@@ -1369,6 +1395,8 @@ struct ldlm_lock *ldlm_request_lock(struct ptlrpc_request *req); /* ldlm_lock.c */ #ifdef HAVE_SERVER_SUPPORT ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res); +ldlm_reprocessing_policy +ldlm_get_reprocessing_policy(struct ldlm_resource *res); #endif void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg); void ldlm_lock2handle(const struct ldlm_lock *lock, @@ -1520,7 +1548,7 @@ enum ldlm_mode ldlm_revalidate_lock_handle(const struct lustre_handle *lockh, __u64 *bits); void ldlm_lock_mode_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode); void ldlm_lock_cancel(struct ldlm_lock *lock); -void ldlm_reprocess_all(struct ldlm_resource *res); +void ldlm_reprocess_all(struct ldlm_resource *res, struct ldlm_lock *hint); void ldlm_reprocess_recovery_done(struct ldlm_namespace *ns); void ldlm_lock_dump_handle(int level, const struct lustre_handle *lockh); void ldlm_unlink_lock_skiplist(struct ldlm_lock *req); diff --git a/lustre/ldlm/ldlm_extent.c b/lustre/ldlm/ldlm_extent.c index 330e5b7..9e046c9 100644 --- a/lustre/ldlm/ldlm_extent.c +++ b/lustre/ldlm/ldlm_extent.c @@ -965,7 +965,7 @@ __u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms) EXPORT_SYMBOL(ldlm_extent_shift_kms); struct kmem_cache *ldlm_interval_slab; -struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock) +static struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock) { struct ldlm_interval *node; ENTRY; @@ -1025,6 +1025,14 @@ static inline int ldlm_mode_to_index(enum ldlm_mode mode) return index; } +int ldlm_extent_alloc_lock(struct ldlm_lock *lock) +{ + lock->l_tree_node = NULL; + if (ldlm_interval_alloc(lock) == NULL) + return -ENOMEM; + return 0; +} + /** Add newly granted lock into interval tree for the resource. 
*/ void ldlm_extent_add_lock(struct ldlm_resource *res, struct ldlm_lock *lock) diff --git a/lustre/ldlm/ldlm_flock.c b/lustre/ldlm/ldlm_flock.c index f848a36..be84993 100644 --- a/lustre/ldlm/ldlm_flock.c +++ b/lustre/ldlm/ldlm_flock.c @@ -581,7 +581,7 @@ reprocess: restart: ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list, - LDLM_PROCESS_RESCAN); + LDLM_PROCESS_RESCAN, NULL); unlock_res_and_lock(req); rc = ldlm_run_ast_work(ns, &rpc_list, diff --git a/lustre/ldlm/ldlm_inodebits.c b/lustre/ldlm/ldlm_inodebits.c index 4d66c20..22daed5 100644 --- a/lustre/ldlm/ldlm_inodebits.c +++ b/lustre/ldlm/ldlm_inodebits.c @@ -59,6 +59,88 @@ #ifdef HAVE_SERVER_SUPPORT /** + * It should iterate through all waiting locks on a given resource queue and + * attempt to grant them. An optimization is to check only the head + * waiting lock of each inodebit type's queue. + * + * Must be called with resource lock held. + */ +int ldlm_reprocess_inodebits_queue(struct ldlm_resource *res, + struct list_head *queue, + struct list_head *work_list, + enum ldlm_process_intention intention, + struct ldlm_lock *hint) +{ + __u64 flags; + int rc = LDLM_ITER_CONTINUE; + enum ldlm_error err; + struct list_head bl_ast_list = LIST_HEAD_INIT(bl_ast_list); + struct ldlm_ibits_queues *queues = res->lr_ibits_queues; + int i; + + ENTRY; + + check_res_locked(res); + + LASSERT(res->lr_type == LDLM_IBITS); + LASSERT(intention == LDLM_PROCESS_RESCAN || + intention == LDLM_PROCESS_RECOVERY); + + if (intention == LDLM_PROCESS_RECOVERY) + return ldlm_reprocess_queue(res, queue, work_list, intention, + NULL); + +restart: + CDEBUG(D_DLMTRACE, "--- Reprocess resource "DLDLMRES" (%p)\n", + PLDLMRES(res), res); + + for (i = 0; i < MDS_INODELOCK_NUMBITS; i++) { + struct list_head rpc_list = LIST_HEAD_INIT(rpc_list); + struct list_head *head = &queues->liq_waiting[i]; + struct ldlm_lock *pending; + struct ldlm_ibits_node *node; + + if (list_empty(head)) + continue; + if (hint && !(hint->l_policy_data.l_inodebits.bits & (1 
<< i))) + continue; + + node = list_entry(head->next, struct ldlm_ibits_node, + lin_link[i]); + + pending = node->lock; + LDLM_DEBUG(pending, "Reprocessing lock from queue %d", i); + + flags = 0; + rc = ldlm_process_inodebits_lock(pending, &flags, intention, + &err, &rpc_list); + if (ldlm_is_granted(pending)) { + list_splice(&rpc_list, work_list); + /* Try to grant more locks from current queue */ + i--; + } else { + list_splice(&rpc_list, &bl_ast_list); + } + } + + if (!list_empty(&bl_ast_list)) { + unlock_res(res); + + rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &bl_ast_list, + LDLM_WORK_BL_AST); + + lock_res(res); + if (rc == -ERESTART) + GOTO(restart, rc); + } + + if (!list_empty(&bl_ast_list)) + ldlm_discard_bl_list(&bl_ast_list); + + RETURN(rc); +} + +/** * Determine if the lock is compatible with all locks on the queue. * * If \a work_list is provided, conflicting locks are linked there. @@ -428,3 +510,55 @@ exit: LDLM_DEBUG(lock, "client lock convert END"); return rc; } + + +int ldlm_inodebits_alloc_lock(struct ldlm_lock *lock) +{ + if (ldlm_is_ns_srv(lock)) { + int i; + + OBD_SLAB_ALLOC_PTR(lock->l_ibits_node, ldlm_inodebits_slab); + if (lock->l_ibits_node == NULL) + return -ENOMEM; + for (i = 0; i < MDS_INODELOCK_NUMBITS; i++) + INIT_LIST_HEAD(&lock->l_ibits_node->lin_link[i]); + lock->l_ibits_node->lock = lock; + } else { + lock->l_ibits_node = NULL; + } + return 0; +} + +void ldlm_inodebits_add_lock(struct ldlm_resource *res, struct list_head *head, + struct ldlm_lock *lock) +{ + int i; + + if (!ldlm_is_ns_srv(lock)) + return; + + if (head == &res->lr_waiting) { + for (i = 0; i < MDS_INODELOCK_NUMBITS; i++) { + if (lock->l_policy_data.l_inodebits.bits & (1 << i)) + list_add_tail(&lock->l_ibits_node->lin_link[i], + &res->lr_ibits_queues->liq_waiting[i]); + } + } else if (head == &res->lr_granted && lock->l_ibits_node != NULL) { + for (i = 0; i < MDS_INODELOCK_NUMBITS; i++) + LASSERT(list_empty(&lock->l_ibits_node->lin_link[i])); + 
OBD_SLAB_FREE_PTR(lock->l_ibits_node, ldlm_inodebits_slab); + lock->l_ibits_node = NULL; + } +} + +void ldlm_inodebits_unlink_lock(struct ldlm_lock *lock) +{ + int i; + + ldlm_unlink_lock_skiplist(lock); + if (!ldlm_is_ns_srv(lock)) + return; + + for (i = 0; i < MDS_INODELOCK_NUMBITS; i++) + list_del_init(&lock->l_ibits_node->lin_link[i]); +} diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h index 3fac6fd..0b79fe5 100644 --- a/lustre/ldlm/ldlm_internal.h +++ b/lustre/ldlm/ldlm_internal.h @@ -120,6 +120,7 @@ extern unsigned int ldlm_enqueue_min; /* ldlm_resource.c */ extern struct kmem_cache *ldlm_resource_slab; extern struct kmem_cache *ldlm_lock_slab; +extern struct kmem_cache *ldlm_inodebits_slab; extern struct kmem_cache *ldlm_interval_tree_slab; void ldlm_resource_insert_lock_after(struct ldlm_lock *original, @@ -156,7 +157,8 @@ void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new, #ifdef HAVE_SERVER_SUPPORT int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue, struct list_head *work_list, - enum ldlm_process_intention intention); + enum ldlm_process_intention intention, + struct ldlm_lock *hint); int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags, struct list_head *rpc_list); void ldlm_discard_bl_list(struct list_head *bl_list); @@ -202,14 +204,25 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags, enum ldlm_process_intention intention, enum ldlm_error *err, struct list_head *work_list); +int ldlm_reprocess_inodebits_queue(struct ldlm_resource *res, + struct list_head *queue, + struct list_head *work_list, + enum ldlm_process_intention intention, + struct ldlm_lock *hint); /* ldlm_extent.c */ int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags, enum ldlm_process_intention intention, enum ldlm_error *err, struct list_head *work_list); #endif +int ldlm_extent_alloc_lock(struct ldlm_lock *lock); void ldlm_extent_add_lock(struct ldlm_resource *res, 
struct ldlm_lock *lock); void ldlm_extent_unlink_lock(struct ldlm_lock *lock); +int ldlm_inodebits_alloc_lock(struct ldlm_lock *lock); +void ldlm_inodebits_add_lock(struct ldlm_resource *res, struct list_head *head, + struct ldlm_lock *lock); +void ldlm_inodebits_unlink_lock(struct ldlm_lock *lock); + /* ldlm_flock.c */ int ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags, enum ldlm_process_intention intention, @@ -235,7 +248,6 @@ struct ldlm_state { extern struct kmem_cache *ldlm_interval_slab; /* slab cache for ldlm_interval */ extern void ldlm_interval_attach(struct ldlm_interval *n, struct ldlm_lock *l); extern struct ldlm_interval *ldlm_interval_detach(struct ldlm_lock *l); -extern struct ldlm_interval *ldlm_interval_alloc(struct ldlm_lock *lock); extern void ldlm_interval_free(struct ldlm_interval *node); /* this function must be called with res lock held */ static inline struct ldlm_extent * diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 73d522f..701a772 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -149,6 +149,19 @@ ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res) return ldlm_processing_policy_table[res->lr_type]; } EXPORT_SYMBOL(ldlm_get_processing_policy); + +static ldlm_reprocessing_policy ldlm_reprocessing_policy_table[] = { + [LDLM_PLAIN] = ldlm_reprocess_queue, + [LDLM_EXTENT] = ldlm_reprocess_queue, + [LDLM_FLOCK] = ldlm_reprocess_queue, + [LDLM_IBITS] = ldlm_reprocess_inodebits_queue, +}; + +ldlm_reprocessing_policy ldlm_get_reprocessing_policy(struct ldlm_resource *res) +{ + return ldlm_reprocessing_policy_table[res->lr_type]; +} + #endif /* HAVE_SERVER_SUPPORT */ void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg) @@ -203,8 +216,6 @@ void ldlm_lock_put(struct ldlm_lock *lock) lprocfs_counter_decr(ldlm_res_to_ns(res)->ns_stats, LDLM_NSS_LOCKS); lu_ref_del(&res->lr_reference, "lock", lock); - ldlm_resource_putref(res); - lock->l_resource = 
NULL; if (lock->l_export) { class_export_lock_put(lock->l_export, lock); lock->l_export = NULL; @@ -213,7 +224,15 @@ void ldlm_lock_put(struct ldlm_lock *lock) if (lock->l_lvb_data != NULL) OBD_FREE_LARGE(lock->l_lvb_data, lock->l_lvb_len); - ldlm_interval_free(ldlm_interval_detach(lock)); + if (res->lr_type == LDLM_EXTENT) { + ldlm_interval_free(ldlm_interval_detach(lock)); + } else if (res->lr_type == LDLM_IBITS) { + if (lock->l_ibits_node != NULL) + OBD_SLAB_FREE_PTR(lock->l_ibits_node, + ldlm_inodebits_slab); + } + ldlm_resource_putref(res); + lock->l_resource = NULL; lu_ref_fini(&lock->l_reference); OBD_FREE_RCU(lock, sizeof(*lock), &lock->l_handle); } @@ -1666,11 +1685,18 @@ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns, lock->l_glimpse_ast = cbs->lcs_glimpse; } - lock->l_tree_node = NULL; - /* if this is the extent lock, allocate the interval tree node */ - if (type == LDLM_EXTENT) - if (ldlm_interval_alloc(lock) == NULL) - GOTO(out, rc = -ENOMEM); + switch (type) { + case LDLM_EXTENT: + rc = ldlm_extent_alloc_lock(lock); + break; + case LDLM_IBITS: + rc = ldlm_inodebits_alloc_lock(lock); + break; + default: + rc = 0; + } + if (rc) + GOTO(out, rc); if (lvb_len) { lock->l_lvb_len = lvb_len; @@ -1699,9 +1725,10 @@ static enum ldlm_error ldlm_lock_enqueue_helper(struct ldlm_lock *lock, enum ldlm_error rc = ELDLM_OK; struct list_head rpc_list = LIST_HEAD_INIT(rpc_list); ldlm_processing_policy policy; + ENTRY; - policy = ldlm_processing_policy_table[res->lr_type]; + policy = ldlm_get_processing_policy(res); restart: policy(lock, flags, LDLM_PROCESS_ENQUEUE, &rc, &rpc_list); if (rc == ELDLM_OK && lock->l_granted_mode != lock->l_req_mode && @@ -1880,7 +1907,8 @@ out: */ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue, struct list_head *work_list, - enum ldlm_process_intention intention) + enum ldlm_process_intention intention, + struct ldlm_lock *hint) { struct list_head *tmp, *pos; ldlm_processing_policy policy; @@ 
-1888,11 +1916,12 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue, int rc = LDLM_ITER_CONTINUE; enum ldlm_error err; struct list_head bl_ast_list = LIST_HEAD_INIT(bl_ast_list); + ENTRY; check_res_locked(res); - policy = ldlm_processing_policy_table[res->lr_type]; + policy = ldlm_get_processing_policy(res); LASSERT(policy); LASSERT(intention == LDLM_PROCESS_RESCAN || intention == LDLM_PROCESS_RECOVERY); @@ -2287,20 +2316,23 @@ out: * if anything could be granted as a result of the cancellation. */ static void __ldlm_reprocess_all(struct ldlm_resource *res, - enum ldlm_process_intention intention) + enum ldlm_process_intention intention, + struct ldlm_lock *hint) { struct list_head rpc_list; #ifdef HAVE_SERVER_SUPPORT + ldlm_reprocessing_policy reprocess; struct obd_device *obd; - int rc; - ENTRY; + int rc; + + ENTRY; INIT_LIST_HEAD(&rpc_list); - /* Local lock trees don't get reprocessed. */ - if (ns_is_client(ldlm_res_to_ns(res))) { - EXIT; - return; - } + /* Local lock trees don't get reprocessed. */ + if (ns_is_client(ldlm_res_to_ns(res))) { + EXIT; + return; + } /* Disable reprocess during lock replay stage but allow during * request replay stage. 
@@ -2311,7 +2343,8 @@ static void __ldlm_reprocess_all(struct ldlm_resource *res, RETURN_EXIT; restart: lock_res(res); - ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list, intention); + reprocess = ldlm_get_reprocessing_policy(res); + reprocess(res, &res->lr_waiting, &rpc_list, intention, hint); unlock_res(res); rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list, @@ -2321,21 +2354,21 @@ restart: goto restart; } #else - ENTRY; + ENTRY; INIT_LIST_HEAD(&rpc_list); - if (!ns_is_client(ldlm_res_to_ns(res))) { - CERROR("This is client-side-only module, cannot handle " - "LDLM_NAMESPACE_SERVER resource type lock.\n"); - LBUG(); - } + if (!ns_is_client(ldlm_res_to_ns(res))) { + CERROR("This is client-side-only module, cannot handle " + "LDLM_NAMESPACE_SERVER resource type lock.\n"); + LBUG(); + } #endif - EXIT; + EXIT; } -void ldlm_reprocess_all(struct ldlm_resource *res) +void ldlm_reprocess_all(struct ldlm_resource *res, struct ldlm_lock *hint) { - __ldlm_reprocess_all(res, LDLM_PROCESS_RESCAN); + __ldlm_reprocess_all(res, LDLM_PROCESS_RESCAN, hint); } EXPORT_SYMBOL(ldlm_reprocess_all); @@ -2345,7 +2378,7 @@ static int ldlm_reprocess_res(struct cfs_hash *hs, struct cfs_hash_bd *bd, struct ldlm_resource *res = cfs_hash_object(hs, hnode); /* This is only called once after recovery done. LU-8306. 
*/ - __ldlm_reprocess_all(res, LDLM_PROCESS_RECOVERY); + __ldlm_reprocess_all(res, LDLM_PROCESS_RECOVERY, NULL); return 0; } @@ -2494,7 +2527,7 @@ static void ldlm_cancel_lock_for_export(struct obd_export *exp, ldlm_lvbo_update(res, lock, NULL, 1); ldlm_lock_cancel(lock); if (!exp->exp_obd->obd_stopping) - ldlm_reprocess_all(res); + ldlm_reprocess_all(res, lock); ldlm_resource_putref(res); ecl->ecl_loop++; @@ -2657,7 +2690,7 @@ void ldlm_lock_mode_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode) ldlm_grant_lock(lock, NULL); unlock_res_and_lock(lock); - ldlm_reprocess_all(lock->l_resource); + ldlm_reprocess_all(lock->l_resource, lock); EXIT; #endif diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 18221a7..980d2251 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -1189,7 +1189,7 @@ int ldlm_glimpse_locks(struct ldlm_resource *res, rc = ldlm_run_ast_work(ldlm_res_to_ns(res), gl_work_list, LDLM_WORK_GL_AST); if (rc == -ERESTART) - ldlm_reprocess_all(res); + ldlm_reprocess_all(res, NULL); RETURN(rc); } @@ -1543,7 +1543,7 @@ retry: if (!err && !ldlm_is_cbpending(lock) && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK) - ldlm_reprocess_all(lock->l_resource); + ldlm_reprocess_all(lock->l_resource, lock); LDLM_LOCK_RELEASE(lock); } @@ -1639,7 +1639,7 @@ int ldlm_handle_convert0(struct ptlrpc_request *req, ldlm_clear_blocking_data(lock); unlock_res_and_lock(lock); - ldlm_reprocess_all(lock->l_resource); + ldlm_reprocess_all(lock->l_resource, NULL); rc = ELDLM_OK; } @@ -1714,7 +1714,7 @@ int ldlm_request_cancel(struct ptlrpc_request *req, */ if (res != pres) { if (pres != NULL) { - ldlm_reprocess_all(pres); + ldlm_reprocess_all(pres, NULL); LDLM_RESOURCE_DELREF(pres); ldlm_resource_putref(pres); } @@ -1742,7 +1742,7 @@ int ldlm_request_cancel(struct ptlrpc_request *req, LDLM_LOCK_PUT(lock); } if (pres != NULL) { - ldlm_reprocess_all(pres); + ldlm_reprocess_all(pres, NULL); LDLM_RESOURCE_DELREF(pres); 
ldlm_resource_putref(pres); } @@ -3344,11 +3344,17 @@ int ldlm_init(void) goto out_interval; #ifdef HAVE_SERVER_SUPPORT + ldlm_inodebits_slab = kmem_cache_create("ldlm_ibits_node", + sizeof(struct ldlm_ibits_node), + 0, SLAB_HWCACHE_ALIGN, NULL); + if (ldlm_inodebits_slab == NULL) + goto out_interval_tree; + ldlm_glimpse_work_kmem = kmem_cache_create("ldlm_glimpse_work_kmem", sizeof(struct ldlm_glimpse_work), 0, 0, NULL); if (ldlm_glimpse_work_kmem == NULL) - goto out_interval_tree; + goto out_inodebits; #endif #if LUSTRE_TRACKS_LOCK_EXP_REFS @@ -3356,6 +3362,8 @@ int ldlm_init(void) #endif return 0; #ifdef HAVE_SERVER_SUPPORT +out_inodebits: + kmem_cache_destroy(ldlm_inodebits_slab); out_interval_tree: kmem_cache_destroy(ldlm_interval_tree_slab); #endif @@ -3384,6 +3392,7 @@ void ldlm_exit(void) kmem_cache_destroy(ldlm_interval_slab); kmem_cache_destroy(ldlm_interval_tree_slab); #ifdef HAVE_SERVER_SUPPORT + kmem_cache_destroy(ldlm_inodebits_slab); kmem_cache_destroy(ldlm_glimpse_work_kmem); #endif } diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index df59c6b..54100d7 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -239,7 +239,7 @@ int ldlm_completion_ast_async(struct ldlm_lock *lock, __u64 flags, void *data) LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, going forward"); - ldlm_reprocess_all(lock->l_resource); + ldlm_reprocess_all(lock->l_resource, NULL); RETURN(0); } EXPORT_SYMBOL(ldlm_completion_ast_async); @@ -1349,7 +1349,7 @@ static __u64 ldlm_cli_cancel_local(struct ldlm_lock *lock) } LDLM_DEBUG(lock, "server-side local cancel"); ldlm_lock_cancel(lock); - ldlm_reprocess_all(lock->l_resource); + ldlm_reprocess_all(lock->l_resource, lock); } RETURN(rc); diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index 311d2aa..baf935e 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -43,6 +43,7 @@ struct kmem_cache *ldlm_resource_slab, *ldlm_lock_slab; 
struct kmem_cache *ldlm_interval_tree_slab; +struct kmem_cache *ldlm_inodebits_slab; int ldlm_srv_namespace_nr = 0; int ldlm_cli_namespace_nr = 0; @@ -1401,29 +1402,59 @@ struct ldlm_namespace *ldlm_namespace_first_locked(enum ldlm_side client) struct ldlm_namespace, ns_list_chain); } +static bool ldlm_resource_extent_new(struct ldlm_resource *res) +{ + int idx; + + OBD_SLAB_ALLOC(res->lr_itree, ldlm_interval_tree_slab, + sizeof(*res->lr_itree) * LCK_MODE_NUM); + if (res->lr_itree == NULL) + return false; + /* Initialize interval trees for each lock mode. */ + for (idx = 0; idx < LCK_MODE_NUM; idx++) { + res->lr_itree[idx].lit_size = 0; + res->lr_itree[idx].lit_mode = 1 << idx; + res->lr_itree[idx].lit_root = NULL; + } + return true; +} + +static bool ldlm_resource_inodebits_new(struct ldlm_resource *res) +{ + int i; + + OBD_ALLOC_PTR(res->lr_ibits_queues); + if (res->lr_ibits_queues == NULL) + return false; + for (i = 0; i < MDS_INODELOCK_NUMBITS; i++) + INIT_LIST_HEAD(&res->lr_ibits_queues->liq_waiting[i]); + return true; +} + /** Create and initialize new resource. */ static struct ldlm_resource *ldlm_resource_new(enum ldlm_type ldlm_type) { struct ldlm_resource *res; - int idx; + bool rc; OBD_SLAB_ALLOC_PTR_GFP(res, ldlm_resource_slab, GFP_NOFS); if (res == NULL) return NULL; - if (ldlm_type == LDLM_EXTENT) { - OBD_SLAB_ALLOC(res->lr_itree, ldlm_interval_tree_slab, - sizeof(*res->lr_itree) * LCK_MODE_NUM); - if (res->lr_itree == NULL) { - OBD_SLAB_FREE_PTR(res, ldlm_resource_slab); - return NULL; - } - /* Initialize interval trees for each lock mode. 
*/ - for (idx = 0; idx < LCK_MODE_NUM; idx++) { - res->lr_itree[idx].lit_size = 0; - res->lr_itree[idx].lit_mode = 1 << idx; - res->lr_itree[idx].lit_root = NULL; - } + switch (ldlm_type) { + case LDLM_EXTENT: + rc = ldlm_resource_extent_new(res); + break; + case LDLM_IBITS: + rc = ldlm_resource_inodebits_new(res); + break; + default: + rc = true; + break; + } + if (!rc) { + OBD_SLAB_FREE_PTR(res, ldlm_resource_slab); + return NULL; } INIT_LIST_HEAD(&res->lr_granted); @@ -1441,6 +1472,20 @@ static struct ldlm_resource *ldlm_resource_new(enum ldlm_type ldlm_type) return res; } +static void ldlm_resource_free(struct ldlm_resource *res) +{ + if (res->lr_type == LDLM_EXTENT) { + if (res->lr_itree != NULL) + OBD_SLAB_FREE(res->lr_itree, ldlm_interval_tree_slab, + sizeof(*res->lr_itree) * LCK_MODE_NUM); + } else if (res->lr_type == LDLM_IBITS) { + if (res->lr_ibits_queues != NULL) + OBD_FREE_PTR(res->lr_ibits_queues); + } + + OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res); +} + /** * Return a reference to resource with given name, creating it if necessary. * Args: namespace with ns_lock unlocked @@ -1495,10 +1540,7 @@ ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent, cfs_hash_bd_unlock(ns->ns_rs_hash, &bd, 1); /* Clean lu_ref for failed resource. 
*/ lu_ref_fini(&res->lr_reference); - if (res->lr_itree != NULL) - OBD_SLAB_FREE(res->lr_itree, ldlm_interval_tree_slab, - sizeof(*res->lr_itree) * LCK_MODE_NUM); - OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res); + ldlm_resource_free(res); found: res = hlist_entry(hnode, struct ldlm_resource, lr_hash); return res; @@ -1574,10 +1616,7 @@ int ldlm_resource_putref(struct ldlm_resource *res) cfs_hash_bd_unlock(ns->ns_rs_hash, &bd, 1); if (ns->ns_lvbo && ns->ns_lvbo->lvbo_free) ns->ns_lvbo->lvbo_free(res); - if (res->lr_itree != NULL) - OBD_SLAB_FREE(res->lr_itree, ldlm_interval_tree_slab, - sizeof(*res->lr_itree) * LCK_MODE_NUM); - OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res); + ldlm_resource_free(res); return 1; } return 0; @@ -1602,6 +1641,9 @@ void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head, LASSERT(list_empty(&lock->l_res_link)); list_add_tail(&lock->l_res_link, head); + + if (res->lr_type == LDLM_IBITS) + ldlm_inodebits_add_lock(res, head, lock); } /** @@ -1635,10 +1677,17 @@ void ldlm_resource_unlink_lock(struct ldlm_lock *lock) int type = lock->l_resource->lr_type; check_res_locked(lock->l_resource); - if (type == LDLM_IBITS || type == LDLM_PLAIN) + switch (type) { + case LDLM_PLAIN: ldlm_unlink_lock_skiplist(lock); - else if (type == LDLM_EXTENT) + break; + case LDLM_EXTENT: ldlm_extent_unlink_lock(lock); + break; + case LDLM_IBITS: + ldlm_inodebits_unlink_lock(lock); + break; + } list_del_init(&lock->l_res_link); } EXPORT_SYMBOL(ldlm_resource_unlink_lock); diff --git a/lustre/mdt/mdt_open.c b/lustre/mdt/mdt_open.c index 33db7d8..28ad9ab 100644 --- a/lustre/mdt/mdt_open.c +++ b/lustre/mdt/mdt_open.c @@ -2017,7 +2017,7 @@ out_unlock: } out_reprocess: - ldlm_reprocess_all(lease->l_resource); + ldlm_reprocess_all(lease->l_resource, lease); LDLM_LOCK_PUT(lease); ma->ma_valid = 0; @@ -2180,7 +2180,7 @@ out_unlock_sem: out_obj: mdt_object_put(info->mti_env, swap_objects ? 
o1 : o2); - ldlm_reprocess_all(lease->l_resource); + ldlm_reprocess_all(lease->l_resource, lease); out_lease: LDLM_LOCK_PUT(lease); @@ -2297,7 +2297,7 @@ out_unlock: OBD_FREE(resync_ids, resync_count * sizeof(__u32)); out_reprocess: - ldlm_reprocess_all(lease->l_resource); + ldlm_reprocess_all(lease->l_resource, lease); LDLM_LOCK_PUT(lease); ma->ma_valid = 0; diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index 9970f98..c9bb061 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -1953,7 +1953,7 @@ static int mdd_migrate_close(struct mdt_thread_info *info, * cancelled, it's okay to cancel it now as we've held mot_open_sem. */ ldlm_lock_cancel(lease); - ldlm_reprocess_all(lease->l_resource); + ldlm_reprocess_all(lease->l_resource, lease); LDLM_LOCK_PUT(lease); close: diff --git a/lustre/tests/sanity-dom.sh b/lustre/tests/sanity-dom.sh index 41b29eb..0b2ecd3 100644 --- a/lustre/tests/sanity-dom.sh +++ b/lustre/tests/sanity-dom.sh @@ -106,6 +106,41 @@ test_4() { } run_test 4 "DoM: glimpse doesn't produce duplicated locks" +test_5() { + local before=$(date +%s) + local evict + + dd if=/dev/zero of=$DIR/$tfile bs=4096 count=1 || return 1 + + multiop_bg_pause $DIR/$tfile O_Ac || return 1 + setxattr=$! + + multiop_bg_pause $DIR/$tfile O_Tc || return 1 + truncate=$! 
+ + multiop $DIR2/$tfile Ow10 || return 1 + + getfattr -d $DIR2/$tfile + +#define OBD_FAIL_LLITE_TRUNCATE_INODE_PAUSE 0x1415 + $LCTL set_param fail_loc=0x80001415 fail_val=5 + kill -USR1 $truncate + sleep 1 + multiop $DIR2/$tfile Ow10 & + sleep 1 + kill -USR1 $setxattr + + wait + + evict=$(do_facet client $LCTL get_param mdc.$FSNAME-MDT*.state | + awk -F"[ [,]" '/EVICTED ]$/ { if (mx<$5) {mx=$5;} } END { print mx }') + + [ -z "$evict" ] || [[ $evict -le $before ]] || + (do_facet client $LCTL get_param mdc.$FSNAME-MDT*.state; + error "eviction happened: $evict before:$before") +} +run_test 5 "DoM truncate deadlock" + test_6() { $MULTIOP $DIR1/$tfile Oz40960w100_z200w100c & MULTIPID=$! -- 1.8.3.1