From 578d08cde3abba552adbe6bcc1b88c6f1552ce10 Mon Sep 17 00:00:00 2001 From: Vitaly Fertman Date: Thu, 27 May 2010 00:51:03 +0400 Subject: [PATCH] b=21128 send cancel rpc in a separate thread i=oleg.drokin i=rahul run sync ldlm_bl_to_thread_list() in separate thread to save stack space. --- lustre/ldlm/ldlm_internal.h | 2 +- lustre/ldlm/ldlm_lockd.c | 71 +++++++++++++++++++++++++++++++++++++-------- lustre/ldlm/ldlm_request.c | 18 +++++------- 3 files changed, 67 insertions(+), 24 deletions(-) diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h index e068ba5..12dc18c 100644 --- a/lustre/ldlm/ldlm_internal.h +++ b/lustre/ldlm/ldlm_internal.h @@ -134,7 +134,7 @@ void ldlm_cancel_locks_for_export(struct obd_export *export); int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, struct ldlm_lock *lock); int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, - struct list_head *cancels, int count); + struct list_head *cancels, int count, int mode); void ldlm_handle_bl_callback(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, struct ldlm_lock *lock); diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 80b204b..a656fa8 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -133,9 +133,21 @@ struct ldlm_bl_work_item { struct ldlm_lock *blwi_lock; struct list_head blwi_head; int blwi_count; + struct completion blwi_comp; + atomic_t blwi_ref_count; }; #ifdef __KERNEL__ +static inline void ldlm_bl_work_item_get(struct ldlm_bl_work_item *blwi) +{ + atomic_inc(&blwi->blwi_ref_count); +} + +static inline void ldlm_bl_work_item_put(struct ldlm_bl_work_item *blwi) +{ + if (atomic_dec_and_test(&blwi->blwi_ref_count)) + OBD_FREE(blwi, sizeof(*blwi)); +} static inline int have_expired_locks(void) { @@ -1580,20 +1592,21 @@ static int ldlm_callback_reply(struct ptlrpc_request *req, int rc) } #ifdef __KERNEL__ -static int ldlm_bl_to_thread(struct ldlm_namespace *ns, +static int __ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_bl_work_item *blwi, struct ldlm_lock_desc *ld, struct ldlm_lock *lock, - struct list_head *cancels, int count) + struct list_head *cancels, int count, int mode) { struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool; - struct ldlm_bl_work_item *blwi; ENTRY; - if (cancels && count == 0) + if (cancels && count == 0) { + if (mode == LDLM_ASYNC) + OBD_FREE(blwi, sizeof(*blwi)); RETURN(0); + } - OBD_ALLOC(blwi, sizeof(*blwi)); - if (blwi == NULL) - RETURN(-ENOMEM); + init_completion(&blwi->blwi_comp); + atomic_set(&blwi->blwi_ref_count, 1); blwi->blwi_ns = ns; if (ld != NULL) @@ -1605,6 +1618,7 @@ static int ldlm_bl_to_thread(struct ldlm_namespace *ns, } else { blwi->blwi_lock = lock; } + spin_lock(&blp->blp_lock); if (lock && lock->l_flags & LDLM_FL_DISCARD_DATA) { /* add LDLM_FL_DISCARD_DATA requests to the priority list */ @@ -1613,28 +1627,60 @@ static int ldlm_bl_to_thread(struct ldlm_namespace *ns, /* other blocking callbacks are added to the regular list */ list_add_tail(&blwi->blwi_entry, &blp->blp_list); } - cfs_waitq_signal(&blp->blp_waitq); spin_unlock(&blp->blp_lock); + if (mode == LDLM_SYNC) { + /* keep ref count as object is on this stack for SYNC call */ + ldlm_bl_work_item_get(blwi); + cfs_waitq_signal(&blp->blp_waitq); + wait_for_completion(&blwi->blwi_comp); + } else { + cfs_waitq_signal(&blp->blp_waitq); + } + RETURN(0); } + +static int ldlm_bl_to_thread(struct ldlm_namespace *ns, + struct ldlm_lock_desc *ld, struct ldlm_lock *lock, + struct list_head *cancels, int count, int mode) +{ + ENTRY; + + if (mode == LDLM_SYNC) { + /* if it is synchronous call do minimum mem alloc, as it could + * be triggered from kernel shrinker + */ + struct ldlm_bl_work_item blwi; + memset(&blwi, 0, sizeof(blwi)); + /* have extra ref as this obj is on stack */ + RETURN(__ldlm_bl_to_thread(ns, &blwi, ld, lock, cancels, count, mode)); + } else { + struct ldlm_bl_work_item *blwi; + OBD_ALLOC(blwi, sizeof(*blwi)); + if (blwi == NULL) + RETURN(-ENOMEM); + + RETURN(__ldlm_bl_to_thread(ns, blwi, ld, lock, cancels, count, mode)); + } +} #endif int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, struct ldlm_lock *lock) { #ifdef __KERNEL__ - RETURN(ldlm_bl_to_thread(ns, ld, lock, NULL, 0)); + RETURN(ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LDLM_ASYNC)); #else RETURN(-ENOSYS); #endif } int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, - struct list_head *cancels, int count) + struct list_head *cancels, int count, int mode) { #ifdef __KERNEL__ - RETURN(ldlm_bl_to_thread(ns, ld, NULL, cancels, count)); + RETURN(ldlm_bl_to_thread(ns, ld, NULL, cancels, count, mode)); #else RETURN(-ENOSYS); #endif @@ -1977,7 +2023,8 @@ static int ldlm_bl_thread_main(void *arg) ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld, blwi->blwi_lock); } - OBD_FREE(blwi, sizeof(*blwi)); + complete(&blwi->blwi_comp); + ldlm_bl_work_item_put(blwi); } atomic_dec(&blp->blp_busy_threads); diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 773dbd6..b92c925 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -1466,7 +1466,7 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels, * in a thread and this function will return after the thread has been * asked to call the callback. when called with LDLM_SYNC the blocking * callback will be performed in this function. */ -int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t sync, +int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t mode, int flags) { CFS_LIST_HEAD(cancels); @@ -1474,19 +1474,15 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t sync, ENTRY; #ifndef __KERNEL__ - sync = LDLM_SYNC; /* force to be sync in user space */ + mode = LDLM_SYNC; /* force to be sync in user space */ #endif count = ldlm_cancel_lru_local(ns, &cancels, nr, 0, 0, flags); - if (sync == LDLM_ASYNC) { - rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count); - if (rc == 0) - RETURN(count); - } - /* If an error occured in ASYNC mode, or this is SYNC mode, - * cancel the list. */ - ldlm_cli_cancel_list(&cancels, count, NULL, 0); - RETURN(count); + rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count, mode); + if (rc == 0) + RETURN(count); + + RETURN(0); } /* Find and cancel locally unused locks found on resource, matched to the -- 1.8.3.1