From 9065e88a7257a98de247f18f997557bd7ccbcb1d Mon Sep 17 00:00:00 2001
From: alex
Date: Sun, 10 Jul 2005 23:10:17 +0000
Subject: [PATCH] b=3984
 - b_ldlm_newlocking landed. we need it to pass some CMD2 performance tests

NOTE: the new locking rules introduced by the patch are still experimental!
---
 lustre/cmobd/cm_oss_reint.c          |   8 +-
 lustre/include/linux/lustre_dlm.h    |  86 +++++++-
 lustre/include/linux/lustre_export.h |   1 +
 lustre/include/linux/lustre_lib.h    |  14 --
 lustre/ldlm/l_lock.c                 |  89 ---------
 lustre/ldlm/ldlm_extent.c            |  38 ++--
 lustre/ldlm/ldlm_flock.c             |  31 ++-
 lustre/ldlm/ldlm_inodebits.c         |  33 ++--
 lustre/ldlm/ldlm_internal.h          |  25 ++-
 lustre/ldlm/ldlm_lock.c              | 373 ++++++++++++++++++++---------------
 lustre/ldlm/ldlm_lockd.c             | 140 ++++++-------
 lustre/ldlm/ldlm_plain.c             |  31 ++-
 lustre/ldlm/ldlm_request.c           | 121 ++++++------
 lustre/ldlm/ldlm_resource.c          | 350 +++++++++++++++++++-------------
 lustre/llite/file.c                  |   8 +-
 lustre/llite/llite_lib.c             |   6 +-
 lustre/mdc/mdc_locks.c               |   4 +-
 lustre/mds/handler.c                 |  51 +++--
 lustre/mds/mds_open.c                |   4 +-
 lustre/mds/mds_reint.c               |  12 +-
 lustre/obdclass/genops.c             |   1 +
 lustre/obdfilter/filter.c            |  35 ++--
 lustre/obdfilter/filter_lvb.c        |   5 +
 lustre/osc/osc_request.c             |   4 +-
 24 files changed, 785 insertions(+), 685 deletions(-)

diff --git a/lustre/cmobd/cm_oss_reint.c b/lustre/cmobd/cm_oss_reint.c
index eff4777..fc9a9b9 100644
--- a/lustre/cmobd/cm_oss_reint.c
+++ b/lustre/cmobd/cm_oss_reint.c
@@ -27,6 +27,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -156,20 +157,21 @@ static int cache_blocking_ast(struct ldlm_lock *lock,
 }
 
         /* XXX layering violation!  -phil */
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        lock_res(lock->l_resource);
+
         /* Get this: if filter_blocking_ast is racing with ldlm_intent_policy,
          * such that filter_blocking_ast is called just before l_i_p takes the
          * ns_lock, then by the time we get the lock, we might not be the
          * correct blocking function anymore.  So check, and return early, if
          * so. */
         if (lock->l_blocking_ast != cache_blocking_ast) {
-                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+                unlock_res(lock->l_resource);
                 RETURN(0);
         }
 
         lock->l_flags |= LDLM_FL_CBPENDING;
         do_ast = (!lock->l_readers && !lock->l_writers);
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        unlock_res(lock->l_resource);
 
         if (do_ast) {
                 struct lustre_handle lockh;
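Note: the cache_blocking_ast() hunk above is the template for most of this patch.
State that used to stay stable for the whole critical section of the old namespace
lock must now be re-checked after taking the narrower per-resource lock. A minimal
sketch of that recheck idiom (my_blocking_ast is a hypothetical callback written
for illustration, not code from this patch):

    static int my_blocking_ast(struct ldlm_lock *lock,
                               struct ldlm_lock_desc *desc, void *data, int flag)
    {
            int do_ast;

            lock_res(lock->l_resource);
            /* l_blocking_ast may have been re-pointed while we raced to get
             * lr_lock; if we are no longer the registered callback, bail out */
            if (lock->l_blocking_ast != my_blocking_ast) {
                    unlock_res(lock->l_resource);
                    return 0;
            }
            lock->l_flags |= LDLM_FL_CBPENDING;
            do_ast = (!lock->l_readers && !lock->l_writers);
            unlock_res(lock->l_resource);

            /* ...if do_ast is set, cancel the lock outside lr_lock... */
            return 0;
    }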
diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h
index 74b1c52..2ad9b21 100644
--- a/lustre/include/linux/lustre_dlm.h
+++ b/lustre/include/linux/lustre_dlm.h
@@ -99,6 +99,17 @@ typedef enum {
  * list. */
 #define LDLM_FL_KMS_IGNORE     0x200000
 
+/* completion ast to be executed */
+#define LDLM_FL_CP_REQD        0x400000
+
+/* cleanup_resource has already handled the lock */
+#define LDLM_FL_CLEANED        0x800000
+
+/* optimization hint: LDLM can run blocking callback from current context
+ * w/o involving separate thread. in order to decrease cs rate -bzzz */
+#define LDLM_FL_ATOMIC_CB      0x1000000
+
+
 /* The blocking callback is overloaded to perform two functions.  These flags
  * indicate which operation should be performed. */
 #define LDLM_CB_BLOCKING 1
@@ -148,6 +159,25 @@ static inline int lockmode_compat(ldlm_mode_t exist, ldlm_mode_t new)
- */
+/*
+ * Locking rules:
+ *
+ * lr_lock
+ *
+ * lr_lock
+ *     waiting_locks_spinlock
+ *
+ * lr_lock
+ *     led_lock
+ *
+ * lr_lock
+ *     ns_unused_lock
+ *
+ * lr_lvb_sem
+ *     lr_lock
+ *
+ */
+
 struct ldlm_lock;
 struct ldlm_resource;
 struct ldlm_namespace;
@@ -168,9 +198,9 @@ struct ldlm_namespace {
         char                  *ns_name;
         __u32                  ns_client; /* is this a client-side lock tree? */
         struct list_head      *ns_hash; /* hash table for ns */
+        spinlock_t             ns_hash_lock;
         __u32                  ns_refcount; /* count of resources in the hash */
         struct list_head       ns_root_list; /* all root resources in ns */
-        struct lustre_lock     ns_lock; /* protects hash, refcount, list */
         struct list_head       ns_list_chain; /* position in global NS list */
         /*
         struct proc_dir_entry *ns_proc_dir;
@@ -178,11 +208,12 @@ struct ldlm_namespace {
 
         struct list_head       ns_unused_list; /* all root resources in ns */
         int                    ns_nr_unused;
+        spinlock_t             ns_unused_lock;
+
         unsigned int           ns_max_unused;
         unsigned long          ns_next_dump;   /* next dump time */
 
-        spinlock_t             ns_counter_lock;
-        __u64                  ns_locks;
+        atomic_t               ns_locks;
         __u64                  ns_resources;
         ldlm_res_policy        ns_policy;
         struct ldlm_valblock_ops *ns_lvbo;
@@ -212,14 +243,27 @@ typedef int (*ldlm_glimpse_callback)(struct ldlm_lock *lock, void *data);
 struct ldlm_lock {
         struct portals_handle l_handle; // must be first in the structure
         atomic_t              l_refc;
+
+        /* ldlm_lock_change_resource() can change this */
         struct ldlm_resource *l_resource;
+
+        /* set once, no need to protect it */
         struct ldlm_lock     *l_parent;
+
+        /* protected by ns_hash_lock */
         struct list_head      l_children;
         struct list_head      l_childof;
+
+        /* protected by ns_hash_lock. FIXME */
         struct list_head      l_lru;
+
+        /* protected by lr_lock */
         struct list_head      l_res_link; // position in one of three res lists
+
+        /* protected by led_lock */
         struct list_head      l_export_chain; // per-export chain of locks
 
+        /* protected by lr_lock */
         ldlm_mode_t           l_req_mode;
         ldlm_mode_t           l_granted_mode;
 
@@ -229,10 +273,14 @@ struct ldlm_lock {
 
         struct obd_export    *l_export;
         struct obd_export    *l_conn_export;
+
+        /* protected by lr_lock */
         __u32                 l_flags;
+
         struct lustre_handle  l_remote_handle;
         ldlm_policy_data_t    l_policy_data;
 
+        /* protected by lr_lock */
         __u32                 l_readers;
         __u32                 l_writers;
         __u8                  l_destroyed;
@@ -253,12 +301,20 @@ struct ldlm_lock {
         void                 *l_ast_data;
 
         /* Server-side-only members */
+
+        /* protected by elt_lock */
         struct list_head      l_pending_chain; /* callbacks pending */
         unsigned long         l_callback_timeout;
 
         __u32                 l_pid;          /* pid which created this lock */
         struct list_head      l_tmp;
+
+        /* for ldlm_add_ast_work_item() */
+        struct list_head      l_bl_ast;
+        struct list_head      l_cp_ast;
+        struct ldlm_lock     *l_blocking_lock;
+        int                   l_bl_ast_run;
 };
 
 #define LDLM_PLAIN       10
@@ -271,18 +327,21 @@ struct ldlm_lock {
 
 struct ldlm_resource {
         struct ldlm_namespace *lr_namespace;
+
+        /* protected by ns_hash_lock */
         struct list_head       lr_hash;
         struct ldlm_resource  *lr_parent;   /* 0 for a root resource */
         struct list_head       lr_children; /* list head for child resources */
         struct list_head       lr_childof;  /* part of ns_root_list if root res,
                                              * part of lr_children if child */
+        spinlock_t             lr_lock;
 
+        /* protected by lr_lock */
         struct list_head       lr_granted;
         struct list_head       lr_converting;
         struct list_head       lr_waiting;
         ldlm_mode_t            lr_most_restr;
         __u32                  lr_type; /* LDLM_PLAIN or LDLM_EXTENT */
-        struct ldlm_resource  *lr_root;
         struct ldlm_res_id     lr_name;
         atomic_t               lr_refcount;
@@ -436,7 +495,8 @@ do { \
         CDEBUG(D_DLMTRACE, "### " format "\n" , ## a)
 
 typedef int (*ldlm_processing_policy)(struct ldlm_lock *lock, int *flags,
-                                      int first_enq, ldlm_error_t *err);
+                                      int first_enq, ldlm_error_t *err,
+                                      struct list_head *work_list);
 
 /*
  * Iterators.
 */
@@ -606,4 +666,20 @@ int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
 #define IOC_LDLM_REGRESS_STOP          _IOWR('f', 43, long)
 #define IOC_LDLM_MAX_NR                43
 
+static inline void lock_res(struct ldlm_resource *res)
+{
+        spin_lock(&res->lr_lock);
+}
+
+static inline void unlock_res(struct ldlm_resource *res)
+{
+        spin_unlock(&res->lr_lock);
+}
+
+static inline void check_res_locked(struct ldlm_resource *res)
+{
+        LASSERT_SPIN_LOCKED(&res->lr_lock);
+}
+
+
 #endif
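Note: the "Locking rules" comment above reads as "outer lock first, inner lock
second": lr_lock nests outside waiting_locks_spinlock, led_lock and
ns_unused_lock, while lr_lvb_sem nests outside lr_lock. A minimal sketch of the
discipline, mirroring what ldlm_lock_destroy() does later in this patch
(detach_from_export is a hypothetical name used only here):

    /* lr_lock first, then led_lock -- never the other way around */
    static void detach_from_export(struct ldlm_lock *lock)
    {
            lock_res(lock->l_resource);
            spin_lock(&lock->l_export->exp_ldlm_data.led_lock);
            list_del_init(&lock->l_export_chain);
            spin_unlock(&lock->l_export->exp_ldlm_data.led_lock);
            unlock_res(lock->l_resource);
    }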
"\n" , ## a) typedef int (*ldlm_processing_policy)(struct ldlm_lock *lock, int *flags, - int first_enq, ldlm_error_t *err); + int first_enq, ldlm_error_t *err, + struct list_head *work_list); /* * Iterators. @@ -606,4 +666,20 @@ int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, #define IOC_LDLM_REGRESS_STOP _IOWR('f', 43, long) #define IOC_LDLM_MAX_NR 43 +static inline void lock_res(struct ldlm_resource *res) +{ + spin_lock(&res->lr_lock); +} + +static inline void unlock_res(struct ldlm_resource *res) +{ + spin_unlock(&res->lr_lock); +} + +static inline void check_res_locked(struct ldlm_resource *res) +{ + LASSERT_SPIN_LOCKED(&res->lr_lock); +} + + #endif diff --git a/lustre/include/linux/lustre_export.h b/lustre/include/linux/lustre_export.h index 1fc3263..23aeb88 100644 --- a/lustre/include/linux/lustre_export.h +++ b/lustre/include/linux/lustre_export.h @@ -39,6 +39,7 @@ struct osc_creator { struct ldlm_export_data { struct list_head led_held_locks; /* protected by namespace lock */ + spinlock_t led_lock; }; struct ec_export_data { /* echo client */ diff --git a/lustre/include/linux/lustre_lib.h b/lustre/include/linux/lustre_lib.h index c4ec73e..6eada1e 100644 --- a/lustre/include/linux/lustre_lib.h +++ b/lustre/include/linux/lustre_lib.h @@ -121,20 +121,6 @@ struct obd_client_handle { void statfs_pack(struct obd_statfs *osfs, struct kstatfs *sfs); void statfs_unpack(struct kstatfs *sfs, struct obd_statfs *osfs); -/* l_lock.c */ -struct lustre_lock { - int l_depth; - struct task_struct *l_owner; - struct semaphore l_sem; - spinlock_t l_spin; -}; - -void l_lock_init(struct lustre_lock *); -void l_lock(struct lustre_lock *); -void l_unlock(struct lustre_lock *); -int l_has_lock(struct lustre_lock *); - - /* * OBD IOCTLS */ diff --git a/lustre/ldlm/l_lock.c b/lustre/ldlm/l_lock.c index 11cd02d..746b485 100644 --- a/lustre/ldlm/l_lock.c +++ b/lustre/ldlm/l_lock.c @@ -48,92 +48,3 @@ #include #include -/* invariants: - - only the owner of the lock changes l_owner/l_depth - - if a non-owner changes or checks the variables a spin lock is taken -*/ - -void l_lock_init(struct lustre_lock *lock) -{ - sema_init(&lock->l_sem, 1); - spin_lock_init(&lock->l_spin); -} - -void l_lock(struct lustre_lock *lock) -{ - int owner = 0; - - spin_lock(&lock->l_spin); - if (lock->l_owner == current) - owner = 1; - spin_unlock(&lock->l_spin); - - /* This is safe to increment outside the spinlock because we - * can only have 1 CPU running on the current task - * (i.e. l_owner == current), regardless of the number of CPUs. 
diff --git a/lustre/ldlm/ldlm_extent.c b/lustre/ldlm/ldlm_extent.c
index 5661d73..391d493 100644
--- a/lustre/ldlm/ldlm_extent.c
+++ b/lustre/ldlm/ldlm_extent.c
@@ -156,7 +156,8 @@ static void ldlm_extent_policy(struct ldlm_resource *res,
  */
 static int
 ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
-                         int send_cbs, int *flags, ldlm_error_t *err)
+                         int *flags, ldlm_error_t *err,
+                         struct list_head *work_list)
 {
         struct list_head *tmp;
         struct ldlm_lock *lock;
@@ -275,12 +276,12 @@ ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                         continue;
                 }
 
-                if (!send_cbs)
+                if (!work_list)
                         RETURN(0);
 
                 compat = 0;
                 if (lock->l_blocking_ast)
-                        ldlm_add_ast_work_item(lock, req, NULL, 0);
+                        ldlm_add_ast_work_item(lock, req, work_list);
         }
 
         return(compat);
@@ -301,7 +302,7 @@ destroylock:
  *   - the caller has NOT initialized req->lr_tmp, so we must
  *   - must call this function with the ns lock held once */
 int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
-                             ldlm_error_t *err)
+                             ldlm_error_t *err, struct list_head *work_list)
 {
         struct ldlm_resource *res = lock->l_resource;
         struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
@@ -318,44 +319,38 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                  * flags should always be zero here, and if that ever stops
                  * being true, we want to find out. */
                 LASSERT(*flags == 0);
-                LASSERT(res->lr_tmp != NULL);
-                rc = ldlm_extent_compat_queue(&res->lr_granted, lock, 0, flags,
-                                              err);
+                rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
+                                              err, NULL);
                 if (rc == 1) {
-                        rc = ldlm_extent_compat_queue(&res->lr_waiting, lock, 0,
-                                                      flags, err);
+                        rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
                                                      flags, err, NULL);
                 }
                 if (rc == 0)
                         RETURN(LDLM_ITER_STOP);
 
                 ldlm_resource_unlink_lock(lock);
-                ldlm_extent_policy(res, lock, flags);
-                ldlm_grant_lock(lock, NULL, 0, 1);
+                ldlm_grant_lock(lock, work_list);
                 RETURN(LDLM_ITER_CONTINUE);
         }
 
 restart:
-        LASSERT(res->lr_tmp == NULL);
-        res->lr_tmp = &rpc_list;
-        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, 1, flags, err);
+        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err, &rpc_list);
         if (rc < 0)
                 GOTO(out, rc); /* lock was destroyed */
         if (rc == 2) {
-                res->lr_tmp = NULL;
                 goto grant;
         }
 
-        rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, 1, flags, err);
+        rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err, &rpc_list);
         if (rc2 < 0)
                 GOTO(out, rc = rc2); /* lock was destroyed */
-        res->lr_tmp = NULL;
 
         if (rc + rc2 == 2) {
 grant:
                 ldlm_extent_policy(res, lock, flags);
                 ldlm_resource_unlink_lock(lock);
-                ldlm_grant_lock(lock, NULL, 0, 0);
+                ldlm_grant_lock(lock, NULL);
         } else {
                 /* If either of the compat_queue()s returned failure, then we
                  * have ASTs to send and must go onto the waiting list.
@@ -365,16 +360,15 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                  * re-ordered!  Causes deadlock, because ASTs aren't sent! */
                 if (list_empty(&lock->l_res_link))
                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
-                l_unlock(&res->lr_namespace->ns_lock);
-                rc = ldlm_run_ast_work(res->lr_namespace, &rpc_list);
-                l_lock(&res->lr_namespace->ns_lock);
+                unlock_res(res);
+                rc = ldlm_run_bl_ast_work(&rpc_list);
+                lock_res(res);
                 if (rc == -ERESTART)
                         GOTO(restart, -ERESTART);
                 *flags |= LDLM_FL_BLOCK_GRANTED;
         }
         rc = 0;
 out:
-        res->lr_tmp = NULL;
         RETURN(rc);
 }
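Note: the control flow above -- queue blocking ASTs while holding lr_lock, drop
the lock to send them, re-take it and restart if anything changed -- recurs in
the plain and inodebits policies later in this patch. Condensed to its skeleton
(a sketch under the patch's own names, not the literal Lustre code):

    static void reprocess_sketch(struct ldlm_resource *res,
                                 struct ldlm_lock *lock,
                                 int *flags, ldlm_error_t *err)
    {
            struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
            int rc;

    restart:
            /* caller holds lr_lock; compat checks only queue work items */
            rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
                                          &rpc_list);
            unlock_res(res);              /* never send RPCs under a spinlock */
            rc = ldlm_run_bl_ast_work(&rpc_list);
            lock_res(res);
            if (rc == -ERESTART)          /* the world changed; re-evaluate */
                    goto restart;
    }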
diff --git a/lustre/ldlm/ldlm_flock.c b/lustre/ldlm/ldlm_flock.c
index 63fb58c..a86c021 100644
--- a/lustre/ldlm/ldlm_flock.c
+++ b/lustre/ldlm/ldlm_flock.c
@@ -119,7 +119,7 @@ restart:
 
 int
 ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
-                        ldlm_error_t *err)
+                        ldlm_error_t *err, struct list_head *work_list)
 {
         struct ldlm_resource *res = req->l_resource;
         struct ldlm_namespace *ns = res->lr_namespace;
@@ -353,7 +353,7 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                                  &new2->l_export->exp_ldlm_data.led_held_locks);
                 }
                 if (*flags == LDLM_FL_WAIT_NOREPROC)
-                        ldlm_lock_addref_internal(new2, lock->l_granted_mode);
+                        ldlm_lock_addref_internal_nolock(new2, lock->l_granted_mode);
 
                 /* insert new2 at lock */
                 ldlm_resource_add_lock(res, ownlocks, new2);
@@ -387,20 +387,16 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                                 = LIST_HEAD_INIT(rpc_list);
                         int rc;
 restart:
-                        res->lr_tmp = &rpc_list;
-                        ldlm_reprocess_queue(res, &res->lr_waiting);
-                        res->lr_tmp = NULL;
-
-                        l_unlock(&ns->ns_lock);
-                        rc = ldlm_run_ast_work(res->lr_namespace,
-                                               &rpc_list);
-                        l_lock(&ns->ns_lock);
+                        ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
+                        unlock_res(res);
+                        rc = ldlm_run_cp_ast_work(&rpc_list);
+                        lock_res(res);
                         if (rc == -ERESTART)
                                 GOTO(restart, -ERESTART);
                 }
         } else {
                 LASSERT(req->l_completion_ast);
-                ldlm_add_ast_work_item(req, NULL, NULL, 0);
+                ldlm_add_ast_work_item(req, NULL, NULL);
         }
 
@@ -495,7 +491,7 @@ granted:
         LDLM_DEBUG(lock, "client-side enqueue granted");
         ns = lock->l_resource->lr_namespace;
-        l_lock(&ns->ns_lock);
+        lock_res(lock->l_resource);
 
         /* take lock off the deadlock detection waitq. */
         list_del_init(&lock->l_flock_waitq);
@@ -526,28 +522,25 @@ granted:
                 /* We need to reprocess the lock to do merges or splits
                  * with existing locks owned by this process. */
-                ldlm_process_flock_lock(lock, &noreproc, 1, &err);
+                ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
                 if (flags == 0)
                         wake_up(&lock->l_waitq);
         }
-        l_unlock(&ns->ns_lock);
+        unlock_res(lock->l_resource);
         RETURN(0);
 }
 
 int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                             void *data, int flag)
 {
-        struct ldlm_namespace *ns;
         ENTRY;
 
         LASSERT(lock);
         LASSERT(flag == LDLM_CB_CANCELING);
 
-        ns = lock->l_resource->lr_namespace;
-
         /* take lock off the deadlock detection waitq. */
-        l_lock(&ns->ns_lock);
+        lock_res(lock->l_resource);
         list_del_init(&lock->l_flock_waitq);
-        l_unlock(&ns->ns_lock);
+        unlock_res(lock->l_resource);
         RETURN(0);
 }
"client-side enqueue granted"); ns = lock->l_resource->lr_namespace; - l_lock(&ns->ns_lock); + lock_res(lock->l_resource); /* take lock off the deadlock detection waitq. */ list_del_init(&lock->l_flock_waitq); @@ -526,28 +522,25 @@ granted: /* We need to reprocess the lock to do merges or splits * with existing locks owned by this process. */ - ldlm_process_flock_lock(lock, &noreproc, 1, &err); + ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL); if (flags == 0) wake_up(&lock->l_waitq); } - l_unlock(&ns->ns_lock); + unlock_res(lock->l_resource); RETURN(0); } int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, void *data, int flag) { - struct ldlm_namespace *ns; ENTRY; LASSERT(lock); LASSERT(flag == LDLM_CB_CANCELING); - ns = lock->l_resource->lr_namespace; - /* take lock off the deadlock detection waitq. */ - l_lock(&ns->ns_lock); + lock_res(lock->l_resource); list_del_init(&lock->l_flock_waitq); - l_unlock(&ns->ns_lock); + unlock_res(lock->l_resource); RETURN(0); } diff --git a/lustre/ldlm/ldlm_inodebits.c b/lustre/ldlm/ldlm_inodebits.c index e3511dd..56c88cf 100644 --- a/lustre/ldlm/ldlm_inodebits.c +++ b/lustre/ldlm/ldlm_inodebits.c @@ -35,7 +35,7 @@ /* Determine if the lock is compatible with all locks on the queue. */ static int ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req, - int send_cbs) + struct list_head *work_list) { struct list_head *tmp; struct ldlm_lock *lock; @@ -61,12 +61,12 @@ ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req, if (!(lock->l_policy_data.l_inodebits.bits & req_bits)) continue; - if (!send_cbs) + if (!work_list) RETURN(0); compat = 0; if (lock->l_blocking_ast) - ldlm_add_ast_work_item(lock, req, NULL, 0); + ldlm_add_ast_work_item(lock, req, work_list); } RETURN(compat); @@ -82,7 +82,8 @@ ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req, * - the caller has NOT initialized req->lr_tmp, so we must * - must call this function with the ns lock held once */ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, int *flags, - int first_enq, ldlm_error_t *err) + int first_enq, ldlm_error_t *err, + struct list_head *work_list) { struct ldlm_resource *res = lock->l_resource; struct list_head rpc_list = LIST_HEAD_INIT(rpc_list); @@ -90,27 +91,25 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, int *flags, ENTRY; LASSERT(list_empty(&res->lr_converting)); + check_res_locked(res); if (!first_enq) { - LASSERT(res->lr_tmp != NULL); - rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, 0); + LASSERT(work_list != NULL); + rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, NULL); if (!rc) RETURN(LDLM_ITER_STOP); - rc = ldlm_inodebits_compat_queue(&res->lr_waiting, lock, 0); + rc = ldlm_inodebits_compat_queue(&res->lr_waiting, lock, NULL); if (!rc) RETURN(LDLM_ITER_STOP); ldlm_resource_unlink_lock(lock); - ldlm_grant_lock(lock, NULL, 0, 1); + ldlm_grant_lock(lock, work_list); RETURN(LDLM_ITER_CONTINUE); } restart: - LASSERT(res->lr_tmp == NULL); - res->lr_tmp = &rpc_list; - rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, 1); - rc += ldlm_inodebits_compat_queue(&res->lr_waiting, lock, 1); - res->lr_tmp = NULL; + rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, &rpc_list); + rc += ldlm_inodebits_compat_queue(&res->lr_waiting, lock, &rpc_list); if (rc != 2) { /* If either of the compat_queue()s returned 0, then we @@ -121,15 +120,15 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, int *flags, * re-ordered! 
diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h
index 3a79a52..c6ee99e 100644
--- a/lustre/ldlm/ldlm_internal.h
+++ b/lustre/ldlm/ldlm_internal.h
@@ -32,10 +32,11 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync);
 /* ldlm_resource.c */
 void ldlm_resource_insert_lock_after(struct ldlm_lock *original,
                                      struct ldlm_lock *new);
+int ldlm_resource_putref_locked(struct ldlm_resource *res);
 
 /* ldlm_lock.c */
-void ldlm_grant_lock(struct ldlm_lock *lock, void *data, int datalen,
-                     int run_ast);
+void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list);
+
 struct ldlm_lock *
 ldlm_lock_create(struct ldlm_namespace *ns,
                  struct lustre_handle *parent_lock_handle, struct ldlm_res_id,
@@ -44,12 +45,15 @@ ldlm_lock_create(struct ldlm_namespace *ns,
                  __u32 lvb_len);
 ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *, struct ldlm_lock **,
                                void *cookie, int *flags);
+void ldlm_lock_addref_internal_nolock(struct ldlm_lock *, __u32 mode);
 void ldlm_lock_addref_internal(struct ldlm_lock *, __u32 mode);
 void ldlm_lock_decref_internal(struct ldlm_lock *, __u32 mode);
 void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
-                            void *data, int datalen);
-int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue);
-int ldlm_run_ast_work(struct ldlm_namespace *, struct list_head *rpc_list);
+                            struct list_head *work_list);
+int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
+                         struct list_head *work_list);
+int ldlm_run_bl_ast_work(struct list_head *rpc_list);
+int ldlm_run_cp_ast_work(struct list_head *rpc_list);
 
 /* ldlm_lockd.c */
 int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
@@ -59,19 +63,20 @@ void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
 
 /* ldlm_plain.c */
 int ldlm_process_plain_lock(struct ldlm_lock *lock, int *flags, int first_enq,
-                            ldlm_error_t *err);
+                            ldlm_error_t *err, struct list_head *work_list);
 
 /* ldlm_extent.c */
 int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
-                             ldlm_error_t *err);
+                             ldlm_error_t *err, struct list_head *work_list);
 
 /* ldlm_flock.c */
-int ldlm_process_flock_lock(struct ldlm_lock *lock, int *flags, int first_enq,
-                            ldlm_error_t *err);
+int ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
+                            ldlm_error_t *err, struct list_head *work_list);
 
 /* ldlm_inodebits.c */
 int ldlm_process_inodebits_lock(struct ldlm_lock *lock, int *flags,
-                                int first_enq, ldlm_error_t *err);
+                                int first_enq, ldlm_error_t *err,
+                                struct list_head *work);
 
 /* l_lock.c */
 void l_check_no_ns_lock(struct ldlm_namespace *ns);
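Note: every lock type now shares this five-argument policy signature, and the
final argument decides whether incompatibilities are merely detected or queued
as AST work. A sketch of the two call shapes, using names declared in this
header (the surrounding variables are assumed from the callers shown below):

    /* first enqueue: the policy manages its own rpc_list internally and
     * sends blocking ASTs itself via ldlm_run_bl_ast_work() */
    policy = ldlm_processing_policy_table[res->lr_type];
    policy(lock, flags, 1, &err, NULL);

    /* reprocessing the waiting/converting queues: the caller supplies the
     * work list and later drains it with ldlm_run_cp_ast_work() */
    rc = policy(pending, &flags, 0, &err, work_list);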
diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c
index 6d2dae8..d73b52a 100644
--- a/lustre/ldlm/ldlm_lock.c
+++ b/lustre/ldlm/ldlm_lock.c
@@ -85,7 +85,6 @@ char *ldlm_it2str(int it)
 }
 
 extern kmem_cache_t *ldlm_lock_slab;
-struct lustre_lock ldlm_handle_lock;
 
 static ldlm_processing_policy ldlm_processing_policy_table[] = {
         [LDLM_PLAIN] ldlm_process_plain_lock,
@@ -127,31 +126,33 @@ void ldlm_lock_put(struct ldlm_lock *lock)
 {
         ENTRY;
 
+        LASSERT(lock->l_resource != LP_POISON);
+        LASSERT(atomic_read(&lock->l_refc) > 0);
         if (atomic_dec_and_test(&lock->l_refc)) {
-                struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
+                struct ldlm_resource *res = lock->l_resource;
+                struct ldlm_namespace *ns = res->lr_namespace;
 
-                l_lock(&ns->ns_lock);
                 LDLM_DEBUG(lock, "final lock_put on destroyed lock, freeing");
+
+                LASSERT(lock->l_resource != LP_POISON);
+                lock_res(res);
                 LASSERT(lock->l_destroyed);
                 LASSERT(list_empty(&lock->l_res_link));
 
-                spin_lock(&ns->ns_counter_lock);
-                ns->ns_locks--;
-                spin_unlock(&ns->ns_counter_lock);
+                if (lock->l_parent)
+                        LDLM_LOCK_PUT(lock->l_parent);
+                unlock_res(res);
 
                 ldlm_resource_putref(lock->l_resource);
                 lock->l_resource = NULL;
                 if (lock->l_export)
                         class_export_put(lock->l_export);
-
-                if (lock->l_parent)
-                        LDLM_LOCK_PUT(lock->l_parent);
+                atomic_dec(&ns->ns_locks);
 
                 if (lock->l_lvb_data != NULL)
                         OBD_FREE(lock->l_lvb_data, lock->l_lvb_len);
 
                 OBD_SLAB_FREE(lock, ldlm_lock_slab, sizeof(*lock));
-                l_unlock(&ns->ns_lock);
         }
 
         EXIT;
@@ -160,14 +161,14 @@ void ldlm_lock_put(struct ldlm_lock *lock)
 void ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
 {
         ENTRY;
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        spin_lock(&lock->l_resource->lr_namespace->ns_unused_lock);
         if (!list_empty(&lock->l_lru)) {
                 LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
                 list_del_init(&lock->l_lru);
                 lock->l_resource->lr_namespace->ns_nr_unused--;
                 LASSERT(lock->l_resource->lr_namespace->ns_nr_unused >= 0);
         }
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        spin_unlock(&lock->l_resource->lr_namespace->ns_unused_lock);
         EXIT;
 }
 
@@ -179,7 +180,8 @@ void ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
 void ldlm_lock_destroy(struct ldlm_lock *lock)
 {
         ENTRY;
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+
+        lock_res(lock->l_resource);
 
         if (!list_empty(&lock->l_children)) {
                 LDLM_ERROR(lock, "still has children (%p)!",
@@ -201,13 +203,21 @@ void ldlm_lock_destroy(struct ldlm_lock *lock)
 
         if (lock->l_destroyed) {
                 LASSERT(list_empty(&lock->l_lru));
-                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+                unlock_res(lock->l_resource);
                 EXIT;
                 return;
         }
         lock->l_destroyed = 1;
 
-        list_del_init(&lock->l_export_chain);
+        if (lock->l_export) {
+                spin_lock(&lock->l_export->exp_ldlm_data.led_lock);
+                if (!list_empty(&lock->l_export_chain))
+                        list_del_init(&lock->l_export_chain);
+                spin_unlock(&lock->l_export->exp_ldlm_data.led_lock);
+        } else {
+                LASSERT(list_empty(&lock->l_export_chain));
+        }
+
         ldlm_lock_remove_from_lru(lock);
         class_handle_unhash(&lock->l_handle);
 
@@ -222,7 +232,7 @@ void ldlm_lock_destroy(struct ldlm_lock *lock)
         lock->l_completion_ast(lock, 0);
 #endif
 
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        unlock_res(lock->l_resource);
         LDLM_LOCK_PUT(lock);
         EXIT;
 }
@@ -261,17 +271,18 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
         INIT_LIST_HEAD(&lock->l_export_chain);
         INIT_LIST_HEAD(&lock->l_pending_chain);
         INIT_LIST_HEAD(&lock->l_tmp);
+        INIT_LIST_HEAD(&lock->l_bl_ast);
+        INIT_LIST_HEAD(&lock->l_cp_ast);
         init_waitqueue_head(&lock->l_waitq);
+        lock->l_blocking_lock = NULL;
 
-        spin_lock(&resource->lr_namespace->ns_counter_lock);
-        resource->lr_namespace->ns_locks++;
-        spin_unlock(&resource->lr_namespace->ns_counter_lock);
+        atomic_inc(&resource->lr_namespace->ns_locks);
 
         if (parent != NULL) {
-                l_lock(&parent->l_resource->lr_namespace->ns_lock);
+                spin_lock(&resource->lr_namespace->ns_hash_lock);
                 lock->l_parent = LDLM_LOCK_GET(parent);
                 list_add(&lock->l_childof, &parent->l_children);
-                l_unlock(&parent->l_resource->lr_namespace->ns_lock);
+                spin_unlock(&resource->lr_namespace->ns_hash_lock);
         }
 
         INIT_LIST_HEAD(&lock->l_handle.h_link);
@@ -286,11 +297,11 @@ int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
         struct ldlm_resource *oldres = lock->l_resource;
         ENTRY;
 
-        l_lock(&ns->ns_lock);
+        lock_res(oldres);
         if (memcmp(&new_resid, &lock->l_resource->lr_name,
                    sizeof(lock->l_resource->lr_name)) == 0) {
                 /* Nothing to do */
-                l_unlock(&ns->ns_lock);
+                unlock_res(oldres);
                 RETURN(0);
         }
 
@@ -307,10 +318,11 @@ int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
                 RETURN(-ENOMEM);
         }
 
+        unlock_res(oldres);
+
         /* ...and the flowers are still standing! */
         ldlm_resource_putref(oldres);
 
-        l_unlock(&ns->ns_lock);
         RETURN(0);
 }
 
@@ -343,17 +355,19 @@ struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *handle, int flags)
         ns = lock->l_resource->lr_namespace;
         LASSERT(ns != NULL);
 
-        l_lock(&ns->ns_lock);
+        lock_res(lock->l_resource);
 
         /* It's unlikely but possible that someone marked the lock as
          * destroyed after we did handle2object on it */
         if (lock->l_destroyed) {
+                unlock_res(lock->l_resource);
                 CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock);
                 LDLM_LOCK_PUT(lock);
                 GOTO(out, retval);
         }
 
         if (flags && (lock->l_flags & flags)) {
+                unlock_res(lock->l_resource);
                 LDLM_LOCK_PUT(lock);
                 GOTO(out, retval);
         }
@@ -361,10 +375,10 @@ struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *handle, int flags)
         if (flags)
                 lock->l_flags |= flags;
 
+        unlock_res(lock->l_resource);
         retval = lock;
         EXIT;
 out:
-        l_unlock(&ns->ns_lock);
         return retval;
 }
 
@@ -372,11 +386,7 @@ struct ldlm_lock *ldlm_handle2lock_ns(struct ldlm_namespace *ns,
                                       struct lustre_handle *handle)
 {
         struct ldlm_lock *retval = NULL;
-
-        l_lock(&ns->ns_lock);
         retval = __ldlm_handle2lock(handle, 0);
-        l_unlock(&ns->ns_lock);
-
         return retval;
 }
@@ -389,42 +399,46 @@ void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
                sizeof(desc->l_policy_data));
 }
 
-void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
-                            void *data, int datalen)
+void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
+                           struct list_head *work_list)
 {
-        struct ldlm_ast_work *w;
-        ENTRY;
-
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
-        if (new && (lock->l_flags & LDLM_FL_AST_SENT))
-                GOTO(out, 0);
-
-        CDEBUG(D_OTHER, "lock %p incompatible; sending blocking AST.\n", lock);
-
-        OBD_ALLOC(w, sizeof(*w));
-        if (!w) {
-                LBUG();
-                GOTO(out, 0);
-        }
-
-        w->w_data = data;
-        w->w_datalen = datalen;
-        if (new) {
+        if ((lock->l_flags & LDLM_FL_AST_SENT) == 0) {
                 LDLM_DEBUG(lock, "lock incompatible; sending blocking AST.");
                 lock->l_flags |= LDLM_FL_AST_SENT;
                 /* If the enqueuing client said so, tell the AST recipient to
                  * discard dirty data, rather than writing back. */
                 if (new->l_flags & LDLM_AST_DISCARD_DATA)
                         lock->l_flags |= LDLM_FL_DISCARD_DATA;
-                w->w_blocking = 1;
-                ldlm_lock2desc(new, &w->w_desc);
+                LASSERT(list_empty(&lock->l_bl_ast));
+                list_add(&lock->l_bl_ast, work_list);
+                LDLM_LOCK_GET(lock);
+                LASSERT(lock->l_blocking_lock == NULL);
+                lock->l_blocking_lock = LDLM_LOCK_GET(new);
         }
+}
+
+void ldlm_add_cp_work_item(struct ldlm_lock *lock, struct list_head *work_list)
+{
+        if ((lock->l_flags & LDLM_FL_CP_REQD) == 0) {
+                LDLM_DEBUG(lock, "lock granted; sending completion AST.");
+                lock->l_flags |= LDLM_FL_CP_REQD;
+                LASSERT(list_empty(&lock->l_cp_ast));
+                list_add(&lock->l_cp_ast, work_list);
+                LDLM_LOCK_GET(lock);
+        }
+}
 
-        w->w_lock = LDLM_LOCK_GET(lock);
-        list_add(&w->w_list, lock->l_resource->lr_tmp);
+/* must be called with lr_lock held */
+void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
+                            struct list_head *work_list)
+{
+        ENTRY;
+        check_res_locked(lock->l_resource);
+        if (new)
+                ldlm_add_bl_work_item(lock, new, work_list);
+        else
+                ldlm_add_cp_work_item(lock, work_list);
         EXIT;
- out:
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 }
 
 void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
@@ -436,10 +450,8 @@ void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
         LDLM_LOCK_PUT(lock);
 }
 
-/* only called for local locks */
-void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
+void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
 {
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
         ldlm_lock_remove_from_lru(lock);
         if (mode & (LCK_NL | LCK_CR | LCK_PR))
                 lock->l_readers++;
@@ -448,7 +460,14 @@ void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
         lock->l_last_used = jiffies;
         LDLM_LOCK_GET(lock);
         LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+}
+
+/* only called for local locks */
+void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
+{
+        lock_res(lock->l_resource);
+        ldlm_lock_addref_internal_nolock(lock, mode);
+        unlock_res(lock->l_resource);
 }
 
 void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
@@ -458,7 +477,8 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
 
         ns = lock->l_resource->lr_namespace;
 
-        l_lock(&ns->ns_lock);
+        lock_res(lock->l_resource);
+
         LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
         if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                 LASSERT(lock->l_readers > 0);
@@ -489,8 +509,9 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
 
                 LDLM_LOCK_GET(lock); /* dropped by bl thread */
                 ldlm_lock_remove_from_lru(lock);
-                l_unlock(&ns->ns_lock);
-                if (ldlm_bl_to_thread(ns, NULL, lock) != 0)
+                unlock_res(lock->l_resource);
+                if ((lock->l_flags & LDLM_FL_ATOMIC_CB) ||
+                    ldlm_bl_to_thread(ns, NULL, lock) != 0)
                         ldlm_handle_bl_callback(ns, NULL, lock);
         } else if (ns->ns_client == LDLM_NAMESPACE_CLIENT &&
                    !lock->l_readers && !lock->l_writers) {
@@ -498,12 +519,14 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
                  * reference, put it on the LRU. */
                 LASSERT(list_empty(&lock->l_lru));
                 LASSERT(ns->ns_nr_unused >= 0);
+                spin_lock(&ns->ns_unused_lock);
                 list_add_tail(&lock->l_lru, &ns->ns_unused_list);
                 ns->ns_nr_unused++;
-                l_unlock(&ns->ns_lock);
+                spin_unlock(&ns->ns_unused_lock);
+                unlock_res(lock->l_resource);
                 ldlm_cancel_lru(ns, LDLM_ASYNC);
         } else {
-                l_unlock(&ns->ns_lock);
+                unlock_res(lock->l_resource);
         }
 
         LDLM_LOCK_PUT(lock);    /* matches the ldlm_lock_get in addref */
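Note: the decref path above is where the new LDLM_FL_ATOMIC_CB hint pays off --
a lock marked atomic runs its blocking callback inline in the current context
instead of being handed to the blocking-AST thread, which is the "decrease cs
rate" optimization named in the flag's comment. The decision, condensed (names
as in the patch):

    /* retiring a CBPENDING lock once its last reference drops:
     * inline if ATOMIC_CB or if no bl thread could take it */
    LDLM_LOCK_GET(lock);                  /* dropped by whoever runs the AST */
    ldlm_lock_remove_from_lru(lock);
    unlock_res(lock->l_resource);
    if ((lock->l_flags & LDLM_FL_ATOMIC_CB) ||
        ldlm_bl_to_thread(ns, NULL, lock) != 0)
            ldlm_handle_bl_callback(ns, NULL, lock);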
@@ -529,9 +552,9 @@ void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode)
         LASSERT(lock != NULL);
 
         LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        lock_res(lock->l_resource);
         lock->l_flags |= LDLM_FL_CBPENDING;
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        unlock_res(lock->l_resource);
         ldlm_lock_decref_internal(lock, mode);
         LDLM_LOCK_PUT(lock);
 }
@@ -540,24 +563,25 @@ void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode)
  *   - ldlm_lock_enqueue
  *   - ldlm_reprocess_queue
  *   - ldlm_lock_convert
+ *
+ * must be called with lr_lock held
  */
-void ldlm_grant_lock(struct ldlm_lock *lock, void *data, int datalen,
-                     int run_ast)
+void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list)
 {
         struct ldlm_resource *res = lock->l_resource;
         ENTRY;
 
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        check_res_locked(res);
+
         lock->l_granted_mode = lock->l_req_mode;
         ldlm_resource_add_lock(res, &res->lr_granted, lock);
 
         if (lock->l_granted_mode < res->lr_most_restr)
                 res->lr_most_restr = lock->l_granted_mode;
 
-        if (run_ast && lock->l_completion_ast != NULL)
-                ldlm_add_ast_work_item(lock, NULL, data, datalen);
+        if (work_list && lock->l_completion_ast != NULL)
+                ldlm_add_ast_work_item(lock, NULL, work_list);
 
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
         EXIT;
 }
 
@@ -621,7 +645,7 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
                 if (flags & LDLM_FL_TEST_LOCK)
                         LDLM_LOCK_GET(lock);
                 else
-                        ldlm_lock_addref_internal(lock, mode);
+                        ldlm_lock_addref_internal_nolock(lock, mode);
                 return lock;
         }
 
@@ -630,10 +654,10 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
 
 void ldlm_lock_allow_match(struct ldlm_lock *lock)
 {
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        lock_res(lock->l_resource);
        lock->l_flags |= LDLM_FL_CAN_MATCH;
         wake_up(&lock->l_waitq);
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        unlock_res(lock->l_resource);
 }
 
 /* Can be called in two ways:
@@ -682,7 +706,7 @@ int ldlm_lock_match(struct ldlm_namespace *ns, int flags,
                 RETURN(0);
         }
 
-        l_lock(&ns->ns_lock);
+        lock_res(res);
 
         lock = search_queue(&res->lr_granted, mode, policy, old_lock, flags);
         if (lock != NULL)
@@ -698,8 +722,8 @@ int ldlm_lock_match(struct ldlm_namespace *ns, int flags,
 
         EXIT;
 out:
+        unlock_res(res);
         ldlm_resource_putref(res);
-        l_unlock(&ns->ns_lock);
 
         if (lock) {
                 ldlm_lock2handle(lock, lockh);
@@ -725,13 +749,11 @@ int ldlm_lock_match(struct ldlm_namespace *ns, int flags,
 
 out2:
         if (rc) {
-                l_lock(&ns->ns_lock);
                 LDLM_DEBUG(lock, "matched ("LPU64" "LPU64")",
                            type == LDLM_PLAIN ? res_id->name[2] :
                                 policy->l_extent.start,
                            type == LDLM_PLAIN ? res_id->name[3] :
                                 policy->l_extent.end);
-                l_unlock(&ns->ns_lock);
         } else if (!(flags & LDLM_FL_TEST_LOCK)) {/* less verbose for test-only */
                 LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res "
                                   LPU64"/"LPU64" ("LPU64" "LPU64")", ns,
@@ -837,7 +859,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
                 LASSERT(rc == ELDLM_OK);
         }
 
-        l_lock(&ns->ns_lock);
+        lock_res(lock->l_resource);
         if (local && lock->l_req_mode == lock->l_granted_mode) {
                 /* The server returned a blocked lock, but it was granted before
                  * we got a chance to actually enqueue it.  We don't need to do
@@ -869,7 +891,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
         else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
                 ldlm_resource_add_lock(res, &res->lr_waiting, lock);
         else
-                ldlm_grant_lock(lock, NULL, 0, 0);
+                ldlm_grant_lock(lock, NULL);
         GOTO(out, ELDLM_OK);
         } else if (*flags & LDLM_FL_REPLAY) {
                 if (*flags & LDLM_FL_BLOCK_CONV) {
@@ -879,22 +901,23 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                         GOTO(out, ELDLM_OK);
                 } else if (*flags & LDLM_FL_BLOCK_GRANTED) {
-                        ldlm_grant_lock(lock, NULL, 0, 0);
+                        ldlm_grant_lock(lock, NULL);
                         GOTO(out, ELDLM_OK);
                 }
                 /* If no flags, fall through to normal enqueue path. */
         }
 
         policy = ldlm_processing_policy_table[res->lr_type];
-        policy(lock, flags, 1, &rc);
+        policy(lock, flags, 1, &rc, NULL);
         EXIT;
 out:
-        l_unlock(&ns->ns_lock);
+        unlock_res(lock->l_resource);
         return rc;
 }
 
 /* Must be called with namespace taken: queue is waiting or converting. */
-int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue)
+int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
+                         struct list_head *work_list)
 {
         struct list_head *tmp, *pos;
         ldlm_processing_policy policy;
@@ -903,6 +926,8 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue)
         ldlm_error_t err;
         ENTRY;
 
+        check_res_locked(res);
+
         policy = ldlm_processing_policy_table[res->lr_type];
         LASSERT(policy);
 
@@ -913,7 +938,7 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue)
                 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
 
                 flags = 0;
-                rc = policy(pending, &flags, 0, &err);
+                rc = policy(pending, &flags, 0, &err, work_list);
                 if (rc != LDLM_ITER_CONTINUE)
                         break;
         }
@@ -921,49 +946,80 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue)
         RETURN(rc);
 }
 
-int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list)
+int ldlm_run_bl_ast_work(struct list_head *rpc_list)
 {
         struct list_head *tmp, *pos;
-        int rc, retval = 0;
+        struct ldlm_lock_desc d;
+        int rc = 0, retval = 0;
         ENTRY;
 
-        l_check_no_ns_lock(ns);
+        list_for_each_safe(tmp, pos, rpc_list) {
+                struct ldlm_lock *lock =
+                        list_entry(tmp, struct ldlm_lock, l_bl_ast);
+
+                /* nobody should touch l_bl_ast */
+                lock_res(lock->l_resource);
+                list_del_init(&lock->l_bl_ast);
+
+                LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
+                LASSERT(lock->l_bl_ast_run == 0);
+                LASSERT(lock->l_blocking_lock);
+                lock->l_bl_ast_run++;
+                unlock_res(lock->l_resource);
+
+                ldlm_lock2desc(lock->l_blocking_lock, &d);
+
+                LDLM_LOCK_PUT(lock->l_blocking_lock);
+                lock->l_blocking_lock = NULL;
+                rc = lock->l_blocking_ast(lock, &d, NULL, LDLM_CB_BLOCKING);
+
+                if (rc == -ERESTART)
+                        retval = rc;
+                else if (rc)
+                        CDEBUG(D_DLMTRACE, "Failed AST - should clean & "
+                               "disconnect client\n");
+                LDLM_LOCK_PUT(lock);
+        }
+        RETURN(retval);
+}
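Note: a detail worth spelling out in ldlm_run_bl_ast_work() above: the work-list
node (l_bl_ast) is itself protected by lr_lock, so each lock is unhooked under
lr_lock and the RPC is sent only after the lock is dropped. Reduced to the bare
idiom (a sketch; process_one() is a hypothetical stand-in for sending the AST):

    /* drain a list whose links are guarded by a per-item lock:
     * detach under the lock, do the slow work (RPC) outside it */
    while (!list_empty(rpc_list)) {
            struct ldlm_lock *lock =
                    list_entry(rpc_list->next, struct ldlm_lock, l_bl_ast);

            lock_res(lock->l_resource);
            list_del_init(&lock->l_bl_ast); /* invisible to others from here */
            unlock_res(lock->l_resource);

            process_one(lock);              /* may sleep; no spinlock held */
            LDLM_LOCK_PUT(lock);            /* ref taken when it was queued */
    }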
+
+int ldlm_run_cp_ast_work(struct list_head *rpc_list)
+{
+        struct list_head *tmp, *pos;
+        int rc = 0, retval = 0;
+        ENTRY;
+
+        /* It's possible to receive a completion AST before we've set
+         * the l_completion_ast pointer: either because the AST arrived
+         * before the reply, or simply because there's a small race
+         * window between receiving the reply and finishing the local
+         * enqueue. (bug 842)
+         *
+         * This can't happen with the blocking_ast, however, because we
+         * will never call the local blocking_ast until we drop our
+         * reader/writer reference, which we won't do until we get the
+         * reply and finish enqueueing. */
 
         list_for_each_safe(tmp, pos, rpc_list) {
-                struct ldlm_ast_work *w =
-                        list_entry(tmp, struct ldlm_ast_work, w_list);
-
-                /* It's possible to receive a completion AST before we've set
-                 * the l_completion_ast pointer: either because the AST arrived
-                 * before the reply, or simply because there's a small race
-                 * window between receiving the reply and finishing the local
-                 * enqueue. (bug 842)
-                 *
-                 * This can't happen with the blocking_ast, however, because we
-                 * will never call the local blocking_ast until we drop our
-                 * reader/writer reference, which we won't do until we get the
-                 * reply and finish enqueueing. */
-                LASSERT(w->w_lock != NULL);
-                if (w->w_blocking) {
-                        LASSERT(w->w_lock->l_blocking_ast != NULL);
-                        rc = w->w_lock->l_blocking_ast
-                                (w->w_lock, &w->w_desc, w->w_data,
-                                 LDLM_CB_BLOCKING);
-                } else if (w->w_lock->l_completion_ast != NULL) {
-                        LASSERT(w->w_lock->l_completion_ast != NULL);
-                        rc = w->w_lock->l_completion_ast(w->w_lock, w->w_flags,
-                                                         w->w_data);
-                } else {
-                        rc = 0;
-                }
+                struct ldlm_lock *lock =
+                        list_entry(tmp, struct ldlm_lock, l_cp_ast);
+
+                /* nobody should touch l_cp_ast */
+                lock_res(lock->l_resource);
+                list_del_init(&lock->l_cp_ast);
+                LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
+                lock->l_flags &= ~LDLM_FL_CP_REQD;
+                unlock_res(lock->l_resource);
+
+                if (lock->l_completion_ast != NULL)
+                        rc = lock->l_completion_ast(lock, 0, 0);
+
                 if (rc == -ERESTART)
                         retval = rc;
                 else if (rc)
                         CDEBUG(D_DLMTRACE, "Failed AST - should clean & "
                                "disconnect client\n");
-                LDLM_LOCK_PUT(w->w_lock);
-                list_del(&w->w_list);
-                OBD_FREE(w, sizeof(*w));
+                LDLM_LOCK_PUT(lock);
         }
         RETURN(retval);
 }
@@ -976,27 +1032,31 @@ static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
 
 void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
 {
+        struct list_head *tmp;
         int i, rc;
 
-        l_lock(&ns->ns_lock);
+        spin_lock(&ns->ns_hash_lock);
         for (i = 0; i < RES_HASH_SIZE; i++) {
-                struct list_head *tmp, *next;
-                list_for_each_safe(tmp, next, &(ns->ns_hash[i])) {
+                tmp = ns->ns_hash[i].next;
+                while (tmp != &(ns->ns_hash[i])) {
                         struct ldlm_resource *res =
                                 list_entry(tmp, struct ldlm_resource, lr_hash);
 
                         ldlm_resource_getref(res);
-                        l_unlock(&ns->ns_lock);
+                        spin_unlock(&ns->ns_hash_lock);
+
                         rc = reprocess_one_queue(res, NULL);
-                        l_lock(&ns->ns_lock);
-                        next = tmp->next;
-                        ldlm_resource_putref(res);
+
+                        spin_lock(&ns->ns_hash_lock);
+                        tmp = tmp->next;
+                        ldlm_resource_putref_locked(res);
+
                         if (rc == LDLM_ITER_STOP)
                                 GOTO(out, rc);
                 }
         }
 out:
-        l_unlock(&ns->ns_lock);
+        spin_unlock(&ns->ns_hash_lock);
         EXIT;
 }
@@ -1013,17 +1073,13 @@ void ldlm_reprocess_all(struct ldlm_resource *res)
         }
 
 restart:
-        l_lock(&res->lr_namespace->ns_lock);
-        res->lr_tmp = &rpc_list;
-
-        rc = ldlm_reprocess_queue(res, &res->lr_converting);
+        lock_res(res);
+        rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list);
         if (rc == LDLM_ITER_CONTINUE)
-                ldlm_reprocess_queue(res, &res->lr_waiting);
-
-        res->lr_tmp = NULL;
-        l_unlock(&res->lr_namespace->ns_lock);
+                ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
+        unlock_res(res);
 
-        rc = ldlm_run_ast_work(res->lr_namespace, &rpc_list);
+        rc = ldlm_run_cp_ast_work(&rpc_list);
         if (rc == -ERESTART) {
                 LASSERT(list_empty(&rpc_list));
                 goto restart;
@@ -1033,20 +1089,19 @@ void ldlm_reprocess_all(struct ldlm_resource *res)
 
 void ldlm_cancel_callback(struct ldlm_lock *lock)
 {
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        check_res_locked(lock->l_resource);
+
         if (!(lock->l_flags & LDLM_FL_CANCEL)) {
                 lock->l_flags |= LDLM_FL_CANCEL;
                 if (lock->l_blocking_ast) {
-                        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
-                        // l_check_no_ns_lock(lock->l_resource->lr_namespace);
+                        unlock_res(lock->l_resource);
                         lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
                                              LDLM_CB_CANCELING);
-                        return;
+                        lock_res(lock->l_resource);
                 } else {
                         LDLM_DEBUG(lock, "no blocking ast");
                 }
         }
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 }
 
 void ldlm_lock_cancel(struct ldlm_lock *lock)
@@ -1058,9 +1113,9 @@ void ldlm_lock_cancel(struct ldlm_lock *lock)
         res = lock->l_resource;
         ns = res->lr_namespace;
 
-        l_lock(&ns->ns_lock);
         ldlm_del_waiting_lock(lock);
-
+        lock_res(res);
+
         /* Please do not, no matter how tempting, remove this LBUG without
          * talking to me first. -phik */
         if (lock->l_readers || lock->l_writers) {
@@ -1071,8 +1126,10 @@ void ldlm_lock_cancel(struct ldlm_lock *lock)
 
         ldlm_cancel_callback(lock);
         ldlm_resource_unlink_lock(lock);
+        unlock_res(res);
+
         ldlm_lock_destroy(lock);
-        l_unlock(&ns->ns_lock);
+
         EXIT;
 }
 
@@ -1091,23 +1148,26 @@ int ldlm_lock_set_data(struct lustre_handle *lockh, void *data)
 
 void ldlm_cancel_locks_for_export(struct obd_export *exp)
 {
-        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
         struct ldlm_lock *lock;
         struct ldlm_resource *res;
 
-        l_lock(&ns->ns_lock);
+        spin_lock(&exp->exp_ldlm_data.led_lock);
         while(!list_empty(&exp->exp_ldlm_data.led_held_locks)) {
                 lock = list_entry(exp->exp_ldlm_data.led_held_locks.next,
                                   struct ldlm_lock, l_export_chain);
                 res = ldlm_resource_getref(lock->l_resource);
+                LDLM_LOCK_GET(lock);
+                spin_unlock(&exp->exp_ldlm_data.led_lock);
+
                 LDLM_DEBUG(lock, "export %p", exp);
                 ldlm_lock_cancel(lock);
-                l_unlock(&ns->ns_lock);
                 ldlm_reprocess_all(res);
+
                 ldlm_resource_putref(res);
-                l_lock(&ns->ns_lock);
+                LDLM_LOCK_PUT(lock);
+                spin_lock(&exp->exp_ldlm_data.led_lock);
         }
-        l_unlock(&ns->ns_lock);
+        spin_unlock(&exp->exp_ldlm_data.led_lock);
 }
 
 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
@@ -1132,7 +1192,7 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
 
         res = lock->l_resource;
         ns = res->lr_namespace;
 
-        l_lock(&ns->ns_lock);
+        lock_res(res);
 
         old_mode = lock->l_req_mode;
         lock->l_req_mode = new_mode;
@@ -1149,9 +1209,7 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
                                 *flags);
                         LBUG();
 
-                res->lr_tmp = &rpc_list;
-                ldlm_grant_lock(lock, NULL, 0, 0);
-                res->lr_tmp = NULL;
+                ldlm_grant_lock(lock, &rpc_list);
                 granted = 1;
                 /* FIXME: completion handling not with ns_lock held ! */
                 if (lock->l_completion_ast)
@@ -1161,9 +1219,7 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
                 int pflags = 0;
                 ldlm_processing_policy policy;
                 policy = ldlm_processing_policy_table[res->lr_type];
-                res->lr_tmp = &rpc_list;
-                rc = policy(lock, &pflags, 0, &err);
-                res->lr_tmp = NULL;
+                rc = policy(lock, &pflags, 0, &err, &rpc_list);
                 if (rc == LDLM_ITER_STOP) {
                         lock->l_req_mode = old_mode;
                         ldlm_resource_add_lock(res, &res->lr_granted, lock);
@@ -1173,11 +1229,10 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
                         granted = 1;
                 }
         }
-
-        l_unlock(&ns->ns_lock);
+        unlock_res(lock->l_resource);
 
         if (granted)
-                ldlm_run_ast_work(ns, &rpc_list);
+                ldlm_run_cp_ast_work(&rpc_list);
 
         RETURN(res);
 }
diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c
index 340891d..a7275d0 100644
--- a/lustre/ldlm/ldlm_lockd.c
+++ b/lustre/ldlm/ldlm_lockd.c
@@ -42,7 +42,6 @@
 
 extern kmem_cache_t *ldlm_resource_slab;
 extern kmem_cache_t *ldlm_lock_slab;
-extern struct lustre_lock ldlm_handle_lock;
 extern struct list_head ldlm_namespace_list;
 
 static DECLARE_MUTEX(ldlm_ref_sem);
@@ -361,9 +360,7 @@ static int ldlm_handle_ast_error(struct ldlm_lock *lock,
                         ldlm_lock_cancel(lock);
                         rc = -ERESTART;
                 } else {
-                        l_lock(&lock->l_resource->lr_namespace->ns_lock);
                         ldlm_del_waiting_lock(lock);
-                        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                         ldlm_failed_ast(lock, rc, ast_type);
                 }
         } else if (rc) {
@@ -405,21 +402,6 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
 
         LASSERT(lock);
 
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
-        if (lock->l_granted_mode != lock->l_req_mode) {
-                /* this blocking AST will be communicated as part of the
-                 * completion AST instead */
-                LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
-                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
-                RETURN(0);
-        }
-
-        if (lock->l_destroyed) {
-                /* What's the point? */
-                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
-                RETURN(0);
-        }
-
 #if 0
         if (LTIME_S(CURRENT_TIME) - lock->l_export->exp_last_request_time > 30){
                 ldlm_failed_ast(lock, -ETIMEDOUT, "Not-attempted blocking");
@@ -431,9 +413,24 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
                               LUSTRE_DLM_VERSION, LDLM_BL_CALLBACK,
                               1, &size, NULL);
-        if (req == NULL) {
-                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        if (req == NULL)
                 RETURN(-ENOMEM);
+
+        lock_res(lock->l_resource);
+        if (lock->l_granted_mode != lock->l_req_mode) {
+                /* this blocking AST will be communicated as part of the
+                 * completion AST instead */
+                unlock_res(lock->l_resource);
+                LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
+                ptlrpc_req_finished(req);
+                RETURN(0);
+        }
+
+        if (lock->l_destroyed) {
+                /* What's the point? */
+                unlock_res(lock->l_resource);
+                ptlrpc_req_finished(req);
+                RETURN(0);
         }
 
         body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
@@ -447,7 +444,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
 
         if (lock->l_granted_mode == lock->l_req_mode)
                 ldlm_add_waiting_lock(lock);
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        unlock_res(lock->l_resource);
 
         req->rq_send_state = LUSTRE_IMP_FULL;
         req->rq_timeout = ldlm_timeout; /* timeout for initial AST reply */
@@ -484,12 +481,12 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         if (total_enqueue_wait / 1000000 > obd_timeout)
                 LDLM_ERROR(lock, "enqueue wait took %ldus", total_enqueue_wait);
 
-        down(&lock->l_resource->lr_lvb_sem);
+        lock_res(lock->l_resource);
         if (lock->l_resource->lr_lvb_len) {
                 buffers = 2;
                 size[1] = lock->l_resource->lr_lvb_len;
         }
-        up(&lock->l_resource->lr_lvb_sem);
+        unlock_res(lock->l_resource);
 
         req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
                               LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK,
@@ -506,13 +503,12 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         if (buffers == 2) {
                 void *lvb;
 
-                down(&lock->l_resource->lr_lvb_sem);
                 lvb = lustre_msg_buf(req->rq_reqmsg, 1,
                                      lock->l_resource->lr_lvb_len);
-
+                lock_res(lock->l_resource);
                 memcpy(lvb, lock->l_resource->lr_lvb_data,
                        lock->l_resource->lr_lvb_len);
-                up(&lock->l_resource->lr_lvb_sem);
+                unlock_res(lock->l_resource);
         }
 
         LDLM_DEBUG(lock, "server preparing completion AST (after %ldus wait)",
@@ -523,12 +519,12 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         req->rq_timeout = ldlm_timeout; /* timeout for initial AST reply */
 
         /* We only send real blocking ASTs after the lock is granted */
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        lock_res(lock->l_resource);
         if (lock->l_flags & LDLM_FL_AST_SENT) {
                 body->lock_flags |= LDLM_FL_AST_SENT;
                 ldlm_add_waiting_lock(lock); /* start the lock-timeout clock */
         }
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        unlock_res(lock->l_resource);
 
         rc = ptlrpc_queue_wait(req);
         if (rc != 0)
@@ -560,9 +556,9 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
                sizeof(body->lock_handle1));
         ldlm_lock2desc(lock, &body->lock_desc);
 
-        down(&lock->l_resource->lr_lvb_sem);
+        lock_res(lock->l_resource);
         size = lock->l_resource->lr_lvb_len;
-        up(&lock->l_resource->lr_lvb_sem);
+        unlock_res(lock->l_resource);
         req->rq_replen = lustre_msg_size(1, &size);
 
         req->rq_send_state = LUSTRE_IMP_FULL;
@@ -583,20 +579,19 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
 static struct ldlm_lock *
 find_existing_lock(struct obd_export *exp, struct lustre_handle *remote_hdl)
 {
-        struct obd_device *obd = exp->exp_obd;
         struct list_head *iter;
 
-        l_lock(&obd->obd_namespace->ns_lock);
+        spin_lock(&exp->exp_ldlm_data.led_lock);
         list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
                 struct ldlm_lock *lock;
                 lock = list_entry(iter, struct ldlm_lock, l_export_chain);
                 if (lock->l_remote_handle.cookie == remote_hdl->cookie) {
                         LDLM_LOCK_GET(lock);
-                        l_unlock(&obd->obd_namespace->ns_lock);
+                        spin_unlock(&exp->exp_ldlm_data.led_lock);
                         return lock;
                 }
         }
-        l_unlock(&obd->obd_namespace->ns_lock);
+        spin_unlock(&exp->exp_ldlm_data.led_lock);
         return NULL;
 }
 
@@ -657,17 +652,16 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
         LASSERT(req->rq_export);
         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2);
 
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
         if (req->rq_export->exp_failed) {
                 LDLM_ERROR(lock,"lock on destroyed export %p\n",req->rq_export);
-                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                 GOTO(out, err = -ENOTCONN);
         }
-        lock->l_export = class_export_get(req->rq_export);
 
+        lock->l_export = class_export_get(req->rq_export);
+        spin_lock(&lock->l_export->exp_ldlm_data.led_lock);
         list_add(&lock->l_export_chain,
                  &lock->l_export->exp_ldlm_data.led_held_locks);
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        spin_unlock(&lock->l_export->exp_ldlm_data.led_lock);
 
 existing_lock:
%p\n",req->rq_export); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); GOTO(out, err = -ENOTCONN); } - lock->l_export = class_export_get(req->rq_export); + lock->l_export = class_export_get(req->rq_export); + spin_lock(&lock->l_export->exp_ldlm_data.led_lock); list_add(&lock->l_export_chain, &lock->l_export->exp_ldlm_data.led_held_locks); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + spin_unlock(&lock->l_export->exp_ldlm_data.led_lock); existing_lock: @@ -677,12 +671,12 @@ existing_lock: cookie = req; } else { int buffers = 1; - down(&lock->l_resource->lr_lvb_sem); + lock_res(lock->l_resource); if (lock->l_resource->lr_lvb_len) { size[1] = lock->l_resource->lr_lvb_len; buffers = 2; } - up(&lock->l_resource->lr_lvb_sem); + unlock_res(lock->l_resource); if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR)) GOTO(out, rc = -ENOMEM); @@ -711,13 +705,13 @@ existing_lock: /* We never send a blocking AST until the lock is granted, but * we can tell it right now */ - l_lock(&lock->l_resource->lr_namespace->ns_lock); + lock_res(lock->l_resource); if (lock->l_flags & LDLM_FL_AST_SENT) { dlm_rep->lock_flags |= LDLM_FL_AST_SENT; if (lock->l_granted_mode == lock->l_req_mode) ldlm_add_waiting_lock(lock); } - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + unlock_res(lock->l_resource); EXIT; out: @@ -732,13 +726,11 @@ existing_lock: /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this * ldlm_reprocess_all. If this moves, revisit that code. -phil */ if (lock) { - l_lock(&lock->l_resource->lr_namespace->ns_lock); LDLM_DEBUG(lock, "server-side enqueue handler, sending reply" "(err=%d, rc=%d)", err, rc); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); if (rc == 0) { - down(&lock->l_resource->lr_lvb_sem); + lock_res(lock->l_resource); size[1] = lock->l_resource->lr_lvb_len; if (size[1] > 0) { void *lvb = lustre_msg_buf(req->rq_repmsg, @@ -749,7 +741,7 @@ existing_lock: memcpy(lvb, lock->l_resource->lr_lvb_data, size[1]); } - up(&lock->l_resource->lr_lvb_sem); + unlock_res(lock->l_resource); } else { ldlm_lock_destroy(lock); } @@ -793,18 +785,14 @@ int ldlm_handle_convert(struct ptlrpc_request *req) } else { void *res = NULL; - l_lock(&lock->l_resource->lr_namespace->ns_lock); LDLM_DEBUG(lock, "server-side convert handler START"); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); do_gettimeofday(&lock->l_enqueued_time); res = ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode, (int *)&dlm_rep->lock_flags); if (res) { - l_lock(&lock->l_resource->lr_namespace->ns_lock); if (ldlm_del_waiting_lock(lock)) - CDEBUG(D_DLMTRACE, "converted waiting lock %p\n", lock); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + LDLM_DEBUG(lock, "converted waiting lock"); req->rq_status = 0; } else { req->rq_status = EDEADLOCK; @@ -814,9 +802,7 @@ int ldlm_handle_convert(struct ptlrpc_request *req) if (lock) { if (!req->rq_status) ldlm_reprocess_all(lock->l_resource); - l_lock(&lock->l_resource->lr_namespace->ns_lock); LDLM_DEBUG(lock, "server-side convert handler END"); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); LDLM_LOCK_PUT(lock); } else LDLM_DEBUG_NOLOCK("server-side convert handler END"); @@ -866,11 +852,9 @@ int ldlm_handle_cancel(struct ptlrpc_request *req) //(res, req->rq_reqmsg, 1); } - l_lock(&res->lr_namespace->ns_lock); ldlm_lock_cancel(lock); if (ldlm_del_waiting_lock(lock)) CDEBUG(D_DLMTRACE, "cancelled waiting lock %p\n", lock); - l_unlock(&res->lr_namespace->ns_lock); req->rq_status = rc; } @@ -879,9 +863,7 @@ int ldlm_handle_cancel(struct 
@@ -879,9 +863,7 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
 
         if (lock) {
                 ldlm_reprocess_all(lock->l_resource);
-                l_lock(&lock->l_resource->lr_namespace->ns_lock);
                 LDLM_DEBUG(lock, "server-side cancel handler END");
-                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                 LDLM_LOCK_PUT(lock);
         }
 
@@ -894,29 +876,25 @@ void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
         int do_ast;
         ENTRY;
 
-        l_lock(&ns->ns_lock);
         LDLM_DEBUG(lock, "client blocking AST callback handler START");
-
+
+        lock_res(lock->l_resource);
         lock->l_flags |= LDLM_FL_CBPENDING;
         do_ast = (!lock->l_readers && !lock->l_writers);
+        unlock_res(lock->l_resource);
 
         if (do_ast) {
                 LDLM_DEBUG(lock, "already unused, calling "
                            "callback (%p)", lock->l_blocking_ast);
-                if (lock->l_blocking_ast != NULL) {
-                        l_unlock(&ns->ns_lock);
-                        l_check_no_ns_lock(ns);
+                if (lock->l_blocking_ast != NULL)
                         lock->l_blocking_ast(lock, ld, lock->l_ast_data,
                                              LDLM_CB_BLOCKING);
-                        l_lock(&ns->ns_lock);
-                }
         } else {
                 LDLM_DEBUG(lock, "Lock still has references, will be"
                            " cancelled later");
         }
 
         LDLM_DEBUG(lock, "client blocking callback handler END");
-        l_unlock(&ns->ns_lock);
         LDLM_LOCK_PUT(lock);
         EXIT;
 }
@@ -926,12 +904,14 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                                     struct ldlm_request *dlm_req,
                                     struct ldlm_lock *lock)
 {
+        struct ldlm_resource *res = lock->l_resource;
         LIST_HEAD(ast_list);
         ENTRY;
 
-        l_lock(&ns->ns_lock);
         LDLM_DEBUG(lock, "client completion callback handler START");
 
+        lock_res(res);
+
         /* If we receive the completion AST before the actual enqueue returned,
          * then we might need to switch lock modes, resources, or extents. */
         if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
@@ -949,9 +929,11 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                 if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
                            &lock->l_resource->lr_name,
                            sizeof(lock->l_resource->lr_name)) != 0) {
+                        unlock_res(res);
                         ldlm_lock_change_resource(ns, lock,
                                          dlm_req->lock_desc.l_resource.lr_name);
                         LDLM_DEBUG(lock, "completion AST, new resource");
+                        lock_res(res);
                 }
 
                 if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
@@ -971,14 +953,13 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                 }
         }
 
-        lock->l_resource->lr_tmp = &ast_list;
-        ldlm_grant_lock(lock, req, sizeof(*req), 1);
-        lock->l_resource->lr_tmp = NULL;
+        ldlm_grant_lock(lock, &ast_list);
+        unlock_res(res);
+
         LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
-        l_unlock(&ns->ns_lock);
         LDLM_LOCK_PUT(lock);
 
-        ldlm_run_ast_work(ns, &ast_list);
+        ldlm_run_cp_ast_work(&ast_list);
 
         LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
                           lock);
@@ -993,15 +974,10 @@ static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
         int rc = -ENOSYS;
         ENTRY;
 
-        l_lock(&ns->ns_lock);
         LDLM_DEBUG(lock, "client glimpse AST callback handler");
 
-        if (lock->l_glimpse_ast != NULL) {
-                l_unlock(&ns->ns_lock);
-                l_check_no_ns_lock(ns);
+        if (lock->l_glimpse_ast != NULL)
                 rc = lock->l_glimpse_ast(lock, req);
-                l_lock(&ns->ns_lock);
-        }
 
         if (req->rq_repmsg != NULL) {
                 ptlrpc_reply(req);
@@ -1010,16 +986,18 @@ static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
                 ptlrpc_error(req);
         }
 
-        l_unlock(&ns->ns_lock);
+        lock_res(lock->l_resource);
         if (lock->l_granted_mode == LCK_PW &&
             !lock->l_readers && !lock->l_writers &&
             time_after(jiffies, lock->l_last_used + 10 * HZ)) {
+                unlock_res(lock->l_resource);
                 if (ldlm_bl_to_thread(ns, NULL, lock))
                         ldlm_handle_bl_callback(ns, NULL, lock);
                 EXIT;
                 return;
         }
+        unlock_res(lock->l_resource);
         LDLM_LOCK_PUT(lock);
         EXIT;
 }
@@ -1575,8 +1553,6 @@ int __init ldlm_init(void)
                 return -ENOMEM;
         }
 
-        l_lock_init(&ldlm_handle_lock);
-
         return 0;
 }
 
@@ -1661,10 +1637,6 @@ EXPORT_SYMBOL(ldlm_dump_all_namespaces);
 EXPORT_SYMBOL(ldlm_resource_get);
 EXPORT_SYMBOL(ldlm_resource_putref);
 
-/* l_lock.c */
-EXPORT_SYMBOL(l_lock);
-EXPORT_SYMBOL(l_unlock);
-
 /* ldlm_lib.c */
 EXPORT_SYMBOL(client_import_add_conn);
 EXPORT_SYMBOL(client_import_del_conn);
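Note: one structural change in ldlm_server_blocking_ast() above deserves a word.
The old ns_lock was a semaphore, so a sleeping allocation could happen while it
was held; lr_lock is a spinlock, so ptlrpc_prep_req() now runs before the lock
is taken and the granted/destroyed checks are redone afterwards. The shape of
that idiom, sketched (imp and size are assumed from the surrounding function):

    /* with a spinlock you must allocate first, then lock and re-validate,
     * releasing the allocation if the state check fails */
    struct ptlrpc_request *req;

    req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_BL_CALLBACK,
                          1, &size, NULL);          /* may sleep */
    if (req == NULL)
            RETURN(-ENOMEM);

    lock_res(lock->l_resource);                     /* atomic from here on */
    if (lock->l_granted_mode != lock->l_req_mode || lock->l_destroyed) {
            unlock_res(lock->l_resource);
            ptlrpc_req_finished(req);               /* undo the allocation */
            RETURN(0);
    }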
-ENOMEM; } - l_lock_init(&ldlm_handle_lock); - return 0; } @@ -1661,10 +1637,6 @@ EXPORT_SYMBOL(ldlm_dump_all_namespaces); EXPORT_SYMBOL(ldlm_resource_get); EXPORT_SYMBOL(ldlm_resource_putref); -/* l_lock.c */ -EXPORT_SYMBOL(l_lock); -EXPORT_SYMBOL(l_unlock); - /* ldlm_lib.c */ EXPORT_SYMBOL(client_import_add_conn); EXPORT_SYMBOL(client_import_del_conn); diff --git a/lustre/ldlm/ldlm_plain.c b/lustre/ldlm/ldlm_plain.c index 9a693e3..f8c10fd 100644 --- a/lustre/ldlm/ldlm_plain.c +++ b/lustre/ldlm/ldlm_plain.c @@ -35,7 +35,7 @@ static inline int ldlm_plain_compat_queue(struct list_head *queue, struct ldlm_lock *req, - int send_cbs) + struct list_head *work_list) { struct list_head *tmp; struct ldlm_lock *lock; @@ -54,12 +54,12 @@ ldlm_plain_compat_queue(struct list_head *queue, struct ldlm_lock *req, if (lockmode_compat(lock->l_req_mode, req_mode)) continue; - if (!send_cbs) + if (!work_list) RETURN(0); compat = 0; if (lock->l_blocking_ast) - ldlm_add_ast_work_item(lock, req, NULL, 0); + ldlm_add_ast_work_item(lock, req, work_list); } RETURN(compat); @@ -75,7 +75,7 @@ ldlm_plain_compat_queue(struct list_head *queue, struct ldlm_lock *req, * - the caller has NOT initialized req->lr_tmp, so we must * - must call this function with the ns lock held once */ int ldlm_process_plain_lock(struct ldlm_lock *lock, int *flags, int first_enq, - ldlm_error_t *err) + ldlm_error_t *err, struct list_head *work_list) { struct ldlm_resource *res = lock->l_resource; struct list_head rpc_list = LIST_HEAD_INIT(rpc_list); @@ -85,25 +85,22 @@ int ldlm_process_plain_lock(struct ldlm_lock *lock, int *flags, int first_enq, LASSERT(list_empty(&res->lr_converting)); if (!first_enq) { - LASSERT(res->lr_tmp != NULL); - rc = ldlm_plain_compat_queue(&res->lr_granted, lock, 0); + LASSERT(work_list != NULL); + rc = ldlm_plain_compat_queue(&res->lr_granted, lock, NULL); if (!rc) RETURN(LDLM_ITER_STOP); - rc = ldlm_plain_compat_queue(&res->lr_waiting, lock, 0); + rc = ldlm_plain_compat_queue(&res->lr_waiting, lock, NULL); if (!rc) RETURN(LDLM_ITER_STOP); ldlm_resource_unlink_lock(lock); - ldlm_grant_lock(lock, NULL, 0, 1); + ldlm_grant_lock(lock, work_list); RETURN(LDLM_ITER_CONTINUE); } restart: - LASSERT(res->lr_tmp == NULL); - res->lr_tmp = &rpc_list; - rc = ldlm_plain_compat_queue(&res->lr_granted, lock, 1); - rc += ldlm_plain_compat_queue(&res->lr_waiting, lock, 1); - res->lr_tmp = NULL; + rc = ldlm_plain_compat_queue(&res->lr_granted, lock, &rpc_list); + rc += ldlm_plain_compat_queue(&res->lr_waiting, lock, &rpc_list); if (rc != 2) { /* If either of the compat_queue()s returned 0, then we @@ -114,15 +111,15 @@ int ldlm_process_plain_lock(struct ldlm_lock *lock, int *flags, int first_enq, * re-ordered! Causes deadlock, because ASTs aren't sent! 
*/ if (list_empty(&lock->l_res_link)) ldlm_resource_add_lock(res, &res->lr_waiting, lock); - l_unlock(&res->lr_namespace->ns_lock); - rc = ldlm_run_ast_work(res->lr_namespace, &rpc_list); - l_lock(&res->lr_namespace->ns_lock); + unlock_res(res); + rc = ldlm_run_bl_ast_work(&rpc_list); + lock_res(res); if (rc == -ERESTART) GOTO(restart, -ERESTART); *flags |= LDLM_FL_BLOCK_GRANTED; } else { ldlm_resource_unlink_lock(lock); - ldlm_grant_lock(lock, NULL, 0, 0); + ldlm_grant_lock(lock, NULL); } RETURN(0); } diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 3bc90b0..2a9d8a8 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -177,6 +177,8 @@ static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, ldlm_lock_addref_internal(lock, mode); ldlm_lock2handle(lock, lockh); lock->l_flags |= LDLM_FL_LOCAL; + if (*flags & LDLM_FL_ATOMIC_CB) + lock->l_flags |= LDLM_FL_ATOMIC_CB; lock->l_lvb_swabber = lvb_swabber; if (policy != NULL) memcpy(&lock->l_policy_data, policy, sizeof(*policy)); @@ -212,10 +214,10 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns, struct lustre_handle *lockh, int mode) { /* Set a flag to prevent us from sending a CANCEL (bug 407) */ - l_lock(&ns->ns_lock); + lock_res(lock->l_resource); lock->l_flags |= LDLM_FL_LOCAL_ONLY; + unlock_res(lock->l_resource); LDLM_DEBUG(lock, "setting FL_LOCAL_ONLY"); - l_unlock(&ns->ns_lock); ldlm_lock_decref_and_cancel(lockh, mode); @@ -400,9 +402,9 @@ int ldlm_cli_enqueue(struct obd_export *exp, } if ((*flags) & LDLM_FL_AST_SENT) { - l_lock(&ns->ns_lock); + lock_res(lock->l_resource); lock->l_flags |= LDLM_FL_CBPENDING; - l_unlock(&ns->ns_lock); + unlock_res(lock->l_resource); LDLM_DEBUG(lock, "enqueue reply includes blocking AST"); } @@ -571,11 +573,11 @@ int ldlm_cli_cancel(struct lustre_handle *lockh) LDLM_DEBUG(lock, "client-side cancel"); /* Set this flag to prevent others from getting new references*/ - l_lock(&lock->l_resource->lr_namespace->ns_lock); + lock_res(lock->l_resource); lock->l_flags |= LDLM_FL_CBPENDING; local_only = lock->l_flags & LDLM_FL_LOCAL_ONLY; - l_unlock(&lock->l_resource->lr_namespace->ns_lock); ldlm_cancel_callback(lock); + unlock_res(lock->l_resource); if (local_only) { CDEBUG(D_INFO, "not sending request (at caller's " @@ -658,17 +660,25 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync) sync = LDLM_SYNC; /* force to be sync in user space */ #endif - l_lock(&ns->ns_lock); + spin_lock(&ns->ns_unused_lock); count = ns->ns_nr_unused - ns->ns_max_unused; if (count <= 0) { - l_unlock(&ns->ns_lock); + spin_unlock(&ns->ns_unused_lock); RETURN(0); } - list_for_each_entry_safe(lock, next, &ns->ns_unused_list, l_lru) { + while (!list_empty(&ns->ns_unused_list)) { + struct list_head *tmp = ns->ns_unused_list.next; + lock = list_entry(tmp, struct ldlm_lock, l_lru); LASSERT(!lock->l_readers && !lock->l_writers); + LDLM_LOCK_GET(lock); /* dropped by bl thread */ + spin_unlock(&ns->ns_unused_lock); + + lock_res(lock->l_resource); + ldlm_lock_remove_from_lru(lock); + /* Setting the CBPENDING flag is a little misleading, but * prevents an important race; namely, once CBPENDING is set, * the lock can accumulate no more readers/writers. 
Since @@ -676,9 +686,6 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync) * won't see this flag and call l_blocking_ast */ lock->l_flags |= LDLM_FL_CBPENDING; - LDLM_LOCK_GET(lock); /* dropped by bl thread */ - ldlm_lock_remove_from_lru(lock); - /* We can't re-add to l_lru as it confuses the refcounting in * ldlm_lock_remove_from_lru() if an AST arrives after we drop * ns_lock below. We use l_tmp and can't use l_pending_chain as @@ -687,10 +694,14 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync) if (sync != LDLM_ASYNC || ldlm_bl_to_thread(ns, NULL, lock)) list_add(&lock->l_tmp, &cblist); + unlock_res(lock->l_resource); + + spin_lock(&ns->ns_unused_lock); + if (--count == 0) break; } - l_unlock(&ns->ns_lock); + spin_unlock(&ns->ns_unused_lock); list_for_each_entry_safe(lock, next, &cblist, l_tmp) { list_del_init(&lock->l_tmp); @@ -704,9 +715,9 @@ static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns, struct ldlm_res_id res_id, int flags, void *opaque) { - struct ldlm_resource *res; struct list_head *tmp, *next, list = LIST_HEAD_INIT(list); - struct ldlm_ast_work *w; + struct ldlm_resource *res; + struct ldlm_lock *lock; ENTRY; res = ldlm_resource_get(ns, NULL, res_id, 0, 0); @@ -716,9 +727,8 @@ static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns, RETURN(0); } - l_lock(&ns->ns_lock); + lock_res(res); list_for_each(tmp, &res->lr_granted) { - struct ldlm_lock *lock; lock = list_entry(tmp, struct ldlm_lock, l_res_link); if (opaque != NULL && lock->l_ast_data != opaque) { @@ -738,31 +748,27 @@ static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns, /* See CBPENDING comment in ldlm_cancel_lru */ lock->l_flags |= LDLM_FL_CBPENDING; - OBD_ALLOC(w, sizeof(*w)); - LASSERT(w); - - w->w_lock = LDLM_LOCK_GET(lock); - - list_add(&w->w_list, &list); + LASSERT(list_empty(&lock->l_bl_ast)); + list_add(&lock->l_bl_ast, &list); + LDLM_LOCK_GET(lock); } - l_unlock(&ns->ns_lock); + unlock_res(res); list_for_each_safe(tmp, next, &list) { struct lustre_handle lockh; int rc; - w = list_entry(tmp, struct ldlm_ast_work, w_list); + lock = list_entry(tmp, struct ldlm_lock, l_bl_ast); if (flags & LDLM_FL_LOCAL_ONLY) { - ldlm_lock_cancel(w->w_lock); + ldlm_lock_cancel(lock); } else { - ldlm_lock2handle(w->w_lock, &lockh); + ldlm_lock2handle(lock, &lockh); rc = ldlm_cli_cancel(&lockh); if (rc != ELDLM_OK) CERROR("ldlm_cli_cancel: %d\n", rc); } - list_del(&w->w_list); - LDLM_LOCK_PUT(w->w_lock); - OBD_FREE(w, sizeof(*w)); + list_del_init(&lock->l_bl_ast); + LDLM_LOCK_PUT(lock); } ldlm_resource_putref(res); @@ -774,10 +780,10 @@ static inline int have_no_nsresource(struct ldlm_namespace *ns) { int no_resource = 0; - spin_lock(&ns->ns_counter_lock); + spin_lock(&ns->ns_hash_lock); if (ns->ns_resources == 0) no_resource = 1; - spin_unlock(&ns->ns_counter_lock); + spin_unlock(&ns->ns_hash_lock); RETURN(no_resource); } @@ -805,15 +811,17 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, RETURN(ldlm_cli_cancel_unused_resource(ns, *res_id, flags, opaque)); - l_lock(&ns->ns_lock); + spin_lock(&ns->ns_hash_lock); for (i = 0; i < RES_HASH_SIZE; i++) { - struct list_head *tmp, *next; - list_for_each_safe(tmp, next, &(ns->ns_hash[i])) { - int rc; + struct list_head *tmp; + tmp = ns->ns_hash[i].next; + while (tmp != &(ns->ns_hash[i])) { struct ldlm_resource *res; + int rc; + res = list_entry(tmp, struct ldlm_resource, lr_hash); ldlm_resource_getref(res); - l_unlock(&ns->ns_lock); + spin_unlock(&ns->ns_hash_lock); rc = 
ldlm_cli_cancel_unused_resource(ns, res->lr_name, flags, opaque); @@ -821,12 +829,13 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, CERROR("cancel_unused_res ("LPU64"): %d\n", res->lr_name.name[0], rc); - l_lock(&ns->ns_lock); - next = tmp->next; - ldlm_resource_putref(res); + spin_lock(&ns->ns_hash_lock); + tmp = tmp->next; + ldlm_resource_putref_locked(res); } } - l_unlock(&ns->ns_lock); + spin_unlock(&ns->ns_hash_lock); + if (flags & LDLM_FL_CONFIG_CHANGE) l_wait_event(ns->ns_waitq, have_no_nsresource(ns), &lwi); @@ -841,14 +850,13 @@ int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter, struct list_head *tmp, *next; struct ldlm_lock *lock; int rc = LDLM_ITER_CONTINUE; - struct ldlm_namespace *ns = res->lr_namespace; ENTRY; if (!res) RETURN(LDLM_ITER_CONTINUE); - l_lock(&ns->ns_lock); + lock_res(res); list_for_each_safe(tmp, next, &res->lr_granted) { lock = list_entry(tmp, struct ldlm_lock, l_res_link); @@ -870,7 +878,7 @@ int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter, GOTO(out, rc = LDLM_ITER_STOP); } out: - l_unlock(&ns->ns_lock); + unlock_res(res); RETURN(rc); } @@ -901,23 +909,28 @@ int ldlm_namespace_foreach_res(struct ldlm_namespace *ns, ldlm_res_iterator_t iter, void *closure) { int i, rc = LDLM_ITER_CONTINUE; + struct ldlm_resource *res; + struct list_head *tmp; - l_lock(&ns->ns_lock); + spin_lock(&ns->ns_hash_lock); for (i = 0; i < RES_HASH_SIZE; i++) { - struct list_head *tmp, *next; - list_for_each_safe(tmp, next, &(ns->ns_hash[i])) { - struct ldlm_resource *res = - list_entry(tmp, struct ldlm_resource, lr_hash); - + tmp = ns->ns_hash[i].next; + while (tmp != &(ns->ns_hash[i])) { + res = list_entry(tmp, struct ldlm_resource, lr_hash); ldlm_resource_getref(res); + spin_unlock(&ns->ns_hash_lock); + rc = iter(res, closure); - ldlm_resource_putref(res); + + spin_lock(&ns->ns_hash_lock); + tmp = tmp->next; + ldlm_resource_putref_locked(res); if (rc == LDLM_ITER_STOP) GOTO(out, rc); } } out: - l_unlock(&ns->ns_lock); + spin_unlock(&ns->ns_hash_lock); RETURN(rc); } @@ -941,9 +954,7 @@ void ldlm_change_cbdata(struct ldlm_namespace *ns, return; } - l_lock(&ns->ns_lock); ldlm_resource_foreach(res, iter, data); - l_unlock(&ns->ns_lock); ldlm_resource_putref(res); EXIT; } @@ -1074,7 +1085,6 @@ int ldlm_replay_locks(struct obd_import *imp) /* ensure this doesn't fall to 0 before all have been queued */ atomic_inc(&imp->imp_replay_inflight); - l_lock(&ns->ns_lock); (void)ldlm_namespace_foreach(ns, ldlm_chain_lock_for_replay, &list); list_for_each_safe(pos, next, &list) { @@ -1083,7 +1093,6 @@ int ldlm_replay_locks(struct obd_import *imp) if (rc) break; /* or try to do the rest? 
*/ } - l_unlock(&ns->ns_lock); atomic_dec(&imp->imp_replay_inflight); diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index 9cd393a..18e93f3 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -236,11 +236,10 @@ struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 client) strcpy(ns->ns_name, name); INIT_LIST_HEAD(&ns->ns_root_list); - l_lock_init(&ns->ns_lock); ns->ns_refcount = 0; ns->ns_client = client; - spin_lock_init(&ns->ns_counter_lock); - ns->ns_locks = 0; + spin_lock_init(&ns->ns_hash_lock); + atomic_set(&ns->ns_locks, 0); ns->ns_resources = 0; init_waitqueue_head(&ns->ns_waitq); @@ -251,6 +250,7 @@ struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 client) INIT_LIST_HEAD(&ns->ns_unused_list); ns->ns_nr_unused = 0; ns->ns_max_unused = LDLM_DEFAULT_LRU_SIZE; + spin_lock_init(&ns->ns_unused_lock); down(&ldlm_namespace_lock); list_add(&ns->ns_list_chain, &ldlm_namespace_list); @@ -280,15 +280,33 @@ extern struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock); static void cleanup_resource(struct ldlm_resource *res, struct list_head *q, int flags) { - struct list_head *tmp, *pos; + struct list_head *tmp; int rc = 0, client = res->lr_namespace->ns_client; int local_only = (flags & LDLM_FL_LOCAL_ONLY); ENTRY; - list_for_each_safe(tmp, pos, q) { - struct ldlm_lock *lock; - lock = list_entry(tmp, struct ldlm_lock, l_res_link); - LDLM_LOCK_GET(lock); + + do { + struct ldlm_lock *lock = NULL; + + /* first, look for a lock that has not been cleaned up yet; + * all cleaned locks are marked by the LDLM_FL_CLEANED flag */ + lock_res(res); + list_for_each(tmp, q) { + lock = list_entry(tmp, struct ldlm_lock, l_res_link); + if (lock->l_flags & LDLM_FL_CLEANED) { + lock = NULL; + continue; + } + LDLM_LOCK_GET(lock); + lock->l_flags |= LDLM_FL_CLEANED; + break; + } + + if (lock == NULL) { + unlock_res(res); + break; + } /* Set CBPENDING so nothing in the cancellation path * can match this lock */ @@ -303,6 +321,7 @@ static void cleanup_resource(struct ldlm_resource *res, struct list_head *q, * will go away ... */ /* ... without sending a CANCEL message. 
*/ lock->l_flags |= LDLM_FL_LOCAL_ONLY; + unlock_res(res); LDLM_DEBUG(lock, "setting FL_LOCAL_ONLY"); if (lock->l_completion_ast) lock->l_completion_ast(lock, 0, NULL); @@ -312,6 +331,8 @@ static void cleanup_resource(struct ldlm_resource *res, struct list_head *q, if (client) { struct lustre_handle lockh; + + unlock_res(res); ldlm_lock2handle(lock, &lockh); if (!local_only) { rc = ldlm_cli_cancel(&lockh); @@ -322,19 +343,21 @@ static void cleanup_resource(struct ldlm_resource *res, struct list_head *q, if (local_only || rc != ELDLM_OK) ldlm_lock_cancel(lock); } else { + ldlm_resource_unlink_lock(lock); + unlock_res(res); LDLM_DEBUG(lock, "Freeing a lock still held by a " "client node"); - - ldlm_resource_unlink_lock(lock); ldlm_lock_destroy(lock); } LDLM_LOCK_PUT(lock); - } + } while (1); + EXIT; } int ldlm_namespace_cleanup(struct ldlm_namespace *ns, int flags) { + struct list_head *tmp; int i; if (ns == NULL) { @@ -342,34 +365,39 @@ int ldlm_namespace_cleanup(struct ldlm_namespace *ns, int flags) return ELDLM_OK; } - l_lock(&ns->ns_lock); + /* FIXME: protect by ns_hash_lock -bzzz */ for (i = 0; i < RES_HASH_SIZE; i++) { - struct list_head *tmp, *pos; - list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) { + spin_lock(&ns->ns_hash_lock); + tmp = ns->ns_hash[i].next; + while (tmp != &(ns->ns_hash[i])) { struct ldlm_resource *res; res = list_entry(tmp, struct ldlm_resource, lr_hash); + spin_unlock(&ns->ns_hash_lock); ldlm_resource_getref(res); cleanup_resource(res, &res->lr_granted, flags); cleanup_resource(res, &res->lr_converting, flags); cleanup_resource(res, &res->lr_waiting, flags); + spin_lock(&ns->ns_hash_lock); + tmp = tmp->next; + /* XXX what a mess: don't force cleanup if we're * local_only (which is only used by recovery). In that * case, we probably still have outstanding lock refs * which reference these resources. 
-phil */ - if (!ldlm_resource_putref(res) && + if (!ldlm_resource_putref_locked(res) && !(flags & LDLM_FL_LOCAL_ONLY)) { CERROR("Resource refcount nonzero (%d) after " "lock cleanup; forcing cleanup.\n", atomic_read(&res->lr_refcount)); ldlm_resource_dump(D_ERROR, res); atomic_set(&res->lr_refcount, 1); - ldlm_resource_putref(res); + ldlm_resource_putref_locked(res); } } + spin_unlock(&ns->ns_hash_lock); } - l_unlock(&ns->ns_lock); return ELDLM_OK; } @@ -438,20 +466,43 @@ static struct ldlm_resource *ldlm_resource_new(void) INIT_LIST_HEAD(&res->lr_granted); INIT_LIST_HEAD(&res->lr_converting); INIT_LIST_HEAD(&res->lr_waiting); - sema_init(&res->lr_lvb_sem, 1); atomic_set(&res->lr_refcount, 1); + spin_lock_init(&res->lr_lock); + + /* one who creates the resource must unlock + * the semaphore after lvb initialization */ + init_MUTEX_LOCKED(&res->lr_lvb_sem); return res; } +/* must be called with hash lock held */ +static struct ldlm_resource * +ldlm_resource_find(struct ldlm_namespace *ns, struct ldlm_res_id name, __u32 hash) +{ + struct list_head *bucket, *tmp; + struct ldlm_resource *res; + + LASSERT_SPIN_LOCKED(&ns->ns_hash_lock); + bucket = ns->ns_hash + hash; + + list_for_each(tmp, bucket) { + res = list_entry(tmp, struct ldlm_resource, lr_hash); + if (memcmp(&res->lr_name, &name, sizeof(res->lr_name)) == 0) + return res; + } + + return NULL; +} + /* Args: locked namespace * Returns: newly-allocated, referenced, unlocked resource */ static struct ldlm_resource * ldlm_resource_add(struct ldlm_namespace *ns, struct ldlm_resource *parent, - struct ldlm_res_id name, __u32 type) + struct ldlm_res_id name, __u32 hash, __u32 type) { struct list_head *bucket; - struct ldlm_resource *res; + struct ldlm_resource *res, *old_res; ENTRY; LASSERTF(type >= LDLM_MIN_TYPE && type <= LDLM_MAX_TYPE, @@ -461,20 +512,31 @@ ldlm_resource_add(struct ldlm_namespace *ns, struct ldlm_resource *parent, if (!res) RETURN(NULL); - spin_lock(&ns->ns_counter_lock); - ns->ns_resources++; - spin_unlock(&ns->ns_counter_lock); - - l_lock(&ns->ns_lock); memcpy(&res->lr_name, &name, sizeof(res->lr_name)); res->lr_namespace = ns; - ns->ns_refcount++; - res->lr_type = type; res->lr_most_restr = LCK_NL; - bucket = ns->ns_hash + ldlm_hash_fn(parent, name); + spin_lock(&ns->ns_hash_lock); + old_res = ldlm_resource_find(ns, name, hash); + if (old_res) { + /* someone won the race and added the resource before */ + ldlm_resource_getref(old_res); + spin_unlock(&ns->ns_hash_lock); + OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res); + /* synchronize WRT resource creation */ + if (ns->ns_lvbo && ns->ns_lvbo->lvbo_init) { + down(&old_res->lr_lvb_sem); + up(&old_res->lr_lvb_sem); + } + RETURN(old_res); + } + + /* we won! 
let's add the resource */ + bucket = ns->ns_hash + hash; list_add(&res->lr_hash, bucket); + ns->ns_resources++; + ns->ns_refcount++; if (parent == NULL) { list_add(&res->lr_childof, &ns->ns_root_list); @@ -482,8 +544,19 @@ ldlm_resource_add(struct ldlm_namespace *ns, struct ldlm_resource *parent, res->lr_parent = parent; list_add(&res->lr_childof, &parent->lr_children); } - l_unlock(&ns->ns_lock); + spin_unlock(&ns->ns_hash_lock); + if (ns->ns_lvbo && ns->ns_lvbo->lvbo_init) { + int rc; + + OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CREATE_RESOURCE, 2); + rc = ns->ns_lvbo->lvbo_init(res); + if (rc) + CERROR("lvbo_init failed for resource " + LPU64": rc %d\n", name.name[0], rc); + /* we create resource with locked lr_lvb_sem */ + up(&res->lr_lvb_sem); + } RETURN(res); } @@ -495,55 +568,32 @@ struct ldlm_resource * ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent, struct ldlm_res_id name, __u32 type, int create) { + __u32 hash = ldlm_hash_fn(parent, name); struct ldlm_resource *res = NULL; - struct list_head *bucket, *tmp; ENTRY; LASSERT(ns != NULL); LASSERT(ns->ns_hash != NULL); LASSERT(name.name[0] != 0); - l_lock(&ns->ns_lock); - bucket = ns->ns_hash + ldlm_hash_fn(parent, name); - - list_for_each(tmp, bucket) { - res = list_entry(tmp, struct ldlm_resource, lr_hash); - - if (memcmp(&res->lr_name, &name, sizeof(res->lr_name)) == 0) { - ldlm_resource_getref(res); - l_unlock(&ns->ns_lock); - RETURN(res); + spin_lock(&ns->ns_hash_lock); + res = ldlm_resource_find(ns, name, hash); + if (res) { + ldlm_resource_getref(res); + spin_unlock(&ns->ns_hash_lock); + /* synchronize WRT resource creation */ + if (ns->ns_lvbo && ns->ns_lvbo->lvbo_init) { + down(&res->lr_lvb_sem); + up(&res->lr_lvb_sem); } + RETURN(res); } + spin_unlock(&ns->ns_hash_lock); - if (create) { - res = ldlm_resource_add(ns, parent, name, type); - if (res == NULL) - GOTO(out, NULL); - } else { - res = NULL; - } - - if (create && ns->ns_lvbo && ns->ns_lvbo->lvbo_init) { - int rc; - - /* Although this is technically a lock inversion risk (lvb_sem - * should be taken before DLM lock), this resource was just - * created, so nobody else can take the lvb_sem yet. -p */ - down(&res->lr_lvb_sem); - /* Drop the dlm lock, because lvbo_init can touch the disk */ - l_unlock(&ns->ns_lock); - OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CREATE_RESOURCE, 2); - rc = ns->ns_lvbo->lvbo_init(res); - up(&res->lr_lvb_sem); - if (rc) - CERROR("lvbo_init failed for resource " - LPU64": rc %d\n", name.name[0], rc); - } else { -out: - l_unlock(&ns->ns_lock); - } + if (create == 0) + RETURN(NULL); + res = ldlm_resource_add(ns, parent, name, hash, type); RETURN(res); } @@ -557,6 +607,60 @@ struct ldlm_resource *ldlm_resource_getref(struct ldlm_resource *res) return res; } +int __ldlm_resource_putref_final(struct ldlm_resource *res, int locked) +{ + struct ldlm_namespace *ns = res->lr_namespace; + ENTRY; + + if (!locked) + spin_lock(&ns->ns_hash_lock); + + if (atomic_read(&res->lr_refcount) != 0) { + /* We lost the race. 
*/ + if (!locked) + spin_unlock(&ns->ns_hash_lock); + RETURN(0); + } + + if (!list_empty(&res->lr_granted)) { + ldlm_resource_dump(D_ERROR, res); + LBUG(); + } + + if (!list_empty(&res->lr_converting)) { + ldlm_resource_dump(D_ERROR, res); + LBUG(); + } + + if (!list_empty(&res->lr_waiting)) { + ldlm_resource_dump(D_ERROR, res); + LBUG(); + } + + if (!list_empty(&res->lr_children)) { + ldlm_resource_dump(D_ERROR, res); + LBUG(); + } + + ns->ns_refcount--; + list_del_init(&res->lr_hash); + list_del_init(&res->lr_childof); + + ns->ns_resources--; + if (ns->ns_resources == 0) + wake_up(&ns->ns_waitq); + + if (!locked) + spin_unlock(&ns->ns_hash_lock); + + /* we just unhashed the resource, nobody should find it */ + LASSERT(atomic_read(&res->lr_refcount) == 0); + if (res->lr_lvb_data) + OBD_FREE(res->lr_lvb_data, res->lr_lvb_len); + OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res); + RETURN(1); +} + /* Returns 1 if the resource was freed, 0 if it remains. */ int ldlm_resource_putref(struct ldlm_resource *res) { @@ -568,56 +672,27 @@ int ldlm_resource_putref(struct ldlm_resource *res) LASSERT(atomic_read(&res->lr_refcount) > 0); LASSERT(atomic_read(&res->lr_refcount) < LI_POISON); - if (atomic_dec_and_test(&res->lr_refcount)) { - struct ldlm_namespace *ns = res->lr_namespace; - ENTRY; - - l_lock(&ns->ns_lock); - - if (atomic_read(&res->lr_refcount) != 0) { - /* We lost the race. */ - l_unlock(&ns->ns_lock); - RETURN(rc); - } - - if (!list_empty(&res->lr_granted)) { - ldlm_resource_dump(D_ERROR, res); - LBUG(); - } + LASSERT(atomic_read(&res->lr_refcount) >= 0); + if (atomic_dec_and_test(&res->lr_refcount)) + rc = __ldlm_resource_putref_final(res, 0); - if (!list_empty(&res->lr_converting)) { - ldlm_resource_dump(D_ERROR, res); - LBUG(); - } - - if (!list_empty(&res->lr_waiting)) { - ldlm_resource_dump(D_ERROR, res); - LBUG(); - } - - if (!list_empty(&res->lr_children)) { - ldlm_resource_dump(D_ERROR, res); - LBUG(); - } - - ns->ns_refcount--; - list_del_init(&res->lr_hash); - list_del_init(&res->lr_childof); - if (res->lr_lvb_data) - OBD_FREE(res->lr_lvb_data, res->lr_lvb_len); - l_unlock(&ns->ns_lock); + RETURN(rc); +} - OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res); +/* Returns 1 if the resource was freed, 0 if it remains. 
*/ +int ldlm_resource_putref_locked(struct ldlm_resource *res) +{ + int rc = 0; + ENTRY; - spin_lock(&ns->ns_counter_lock); - ns->ns_resources--; - if (ns->ns_resources == 0) - wake_up(&ns->ns_waitq); - spin_unlock(&ns->ns_counter_lock); + CDEBUG(D_INFO, "putref res: %p count: %d\n", res, + atomic_read(&res->lr_refcount) - 1); + LASSERT(atomic_read(&res->lr_refcount) > 0); + LASSERT(atomic_read(&res->lr_refcount) < LI_POISON); - rc = 1; - EXIT; - } + LASSERT(atomic_read(&res->lr_refcount) >= 0); + if (atomic_dec_and_test(&res->lr_refcount)) + rc = __ldlm_resource_putref_final(res, 1); RETURN(rc); } @@ -625,7 +700,7 @@ int ldlm_resource_putref(struct ldlm_resource *res) void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head, struct ldlm_lock *lock) { - l_lock(&res->lr_namespace->ns_lock); + check_res_locked(res); ldlm_resource_dump(D_OTHER, res); CDEBUG(D_OTHER, "About to add this lock:\n"); @@ -633,14 +708,12 @@ void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head, if (lock->l_destroyed) { CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n"); - goto out; + return; } LASSERT(list_empty(&lock->l_res_link)); list_add_tail(&lock->l_res_link, head); - out: - l_unlock(&res->lr_namespace->ns_lock); } void ldlm_resource_insert_lock_after(struct ldlm_lock *original, @@ -648,7 +721,7 @@ void ldlm_resource_insert_lock_after(struct ldlm_lock *original, { struct ldlm_resource *res = original->l_resource; - l_lock(&res->lr_namespace->ns_lock); + check_res_locked(res); ldlm_resource_dump(D_OTHER, res); CDEBUG(D_OTHER, "About to insert this lock after %p:\n", original); @@ -656,21 +729,17 @@ void ldlm_resource_insert_lock_after(struct ldlm_lock *original, if (new->l_destroyed) { CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n"); - goto out; + return; } LASSERT(list_empty(&new->l_res_link)); - list_add(&new->l_res_link, &original->l_res_link); - out: - l_unlock(&res->lr_namespace->ns_lock); } void ldlm_resource_unlink_lock(struct ldlm_lock *lock) { - l_lock(&lock->l_resource->lr_namespace->ns_lock); + check_res_locked(lock->l_resource); list_del_init(&lock->l_res_link); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); } void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc) @@ -701,19 +770,28 @@ void ldlm_namespace_dump(int level, struct ldlm_namespace *ns) CDEBUG(level, "--- Namespace: %s (rc: %d, client: %d)\n", ns->ns_name, ns->ns_refcount, ns->ns_client); - l_lock(&ns->ns_lock); - if (time_after(jiffies, ns->ns_next_dump)) { - list_for_each(tmp, &ns->ns_root_list) { - struct ldlm_resource *res; - res = list_entry(tmp, struct ldlm_resource, lr_childof); - - /* Once we have resources with children, this should - * really dump them recursively. 
*/ - ldlm_resource_dump(level, res); - } - ns->ns_next_dump = jiffies + 10 * HZ; + if (time_before(jiffies, ns->ns_next_dump)) + return; + + spin_lock(&ns->ns_hash_lock); + tmp = ns->ns_root_list.next; + while (tmp != &ns->ns_root_list) { + struct ldlm_resource *res; + res = list_entry(tmp, struct ldlm_resource, lr_childof); + + ldlm_resource_getref(res); + spin_unlock(&ns->ns_hash_lock); + + lock_res(res); + ldlm_resource_dump(level, res); + unlock_res(res); + + spin_lock(&ns->ns_hash_lock); + tmp = tmp->next; + ldlm_resource_putref_locked(res); } - l_unlock(&ns->ns_lock); + ns->ns_next_dump = jiffies + 10 * HZ; + spin_unlock(&ns->ns_hash_lock); } void ldlm_resource_dump(int level, struct ldlm_resource *res) diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 8ac42b3..6fc869f 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -805,8 +805,8 @@ static int ll_extent_lock_callback(struct ldlm_lock *lock, goto iput; ll_pgcache_remove_extent(inode, lsm, lock, stripe); - l_lock(&lock->l_resource->lr_namespace->ns_lock); down(&lli->lli_size_sem); + lock_res(lock->l_resource); kms = ldlm_extent_shift_kms(lock, lsm->lsm_oinfo[stripe].loi_kms); @@ -814,8 +814,8 @@ static int ll_extent_lock_callback(struct ldlm_lock *lock, LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64, lsm->lsm_oinfo[stripe].loi_kms, kms); lsm->lsm_oinfo[stripe].loi_kms = kms; + unlock_res(lock->l_resource); up(&lli->lli_size_sem); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); //ll_try_done_writing(inode); iput: iput(inode); @@ -861,16 +861,16 @@ int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data) lvb = lock->l_lvb_data; lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size; - l_lock(&lock->l_resource->lr_namespace->ns_lock); down(&inode->i_sem); + lock_res(lock->l_resource); kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size); kms = ldlm_extent_shift_kms(NULL, kms); if (lsm->lsm_oinfo[stripe].loi_kms != kms) LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64, lsm->lsm_oinfo[stripe].loi_kms, kms); lsm->lsm_oinfo[stripe].loi_kms = kms; + unlock_res(lock->l_resource); up(&inode->i_sem); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); } iput: diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 8aeb2db..8e52be4 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -977,7 +977,9 @@ int ll_process_config_update(struct ll_sb_info *sbi, int clean) struct inode *ll_inode_from_lock(struct ldlm_lock *lock) { struct inode *inode = NULL; - l_lock(&lock->l_resource->lr_namespace->ns_lock); + + /* NOTE: we depend on atomic igrab() -bzzz */ + lock_res(lock->l_resource); if (lock->l_ast_data) { struct ll_inode_info *lli = ll_i2info(lock->l_ast_data); if (lli->lli_inode_magic == LLI_INODE_MAGIC) { @@ -990,7 +992,7 @@ struct inode *ll_inode_from_lock(struct ldlm_lock *lock) inode = NULL; } } - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + unlock_res(lock->l_resource); return inode; } diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index 009df55..96b87f0 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -138,7 +138,7 @@ int mdc_set_lock_data(struct obd_export *exp, __u64 *l, void *data) lock = ldlm_handle2lock(lockh); LASSERT(lock != NULL); - l_lock(&lock->l_resource->lr_namespace->ns_lock); + lock_res(lock->l_resource); #ifdef __KERNEL__ if (lock->l_ast_data && lock->l_ast_data != data) { struct inode *new_inode = data; @@ -152,7 +152,7 @@ int mdc_set_lock_data(struct obd_export *exp, __u64 *l, void *data) } 
#endif lock->l_ast_data = data; - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + unlock_res(lock->l_resource); LDLM_LOCK_PUT(lock); EXIT; diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 54bdddb..c2ead6b 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -243,7 +243,7 @@ struct dentry *mds_id2locked_dentry(struct obd_device *obd, struct lustre_id *id struct dentry *de = mds_id2dentry(obd, id, mnt), *retval = de; ldlm_policy_data_t policy = { .l_inodebits = { lockpart } }; struct ldlm_res_id res_id = { .name = {0} }; - int flags = 0, rc; + int flags = LDLM_FL_ATOMIC_CB, rc; ENTRY; if (IS_ERR(de)) @@ -271,7 +271,7 @@ struct dentry *mds_id2locked_dentry(struct obd_device *obd, struct lustre_id *id RETURN(ERR_PTR(-ENOLCK)); } } - flags = 0; + flags = LDLM_FL_ATOMIC_CB; res_id.name[2] = full_name_hash((unsigned char *)name, namelen); @@ -804,7 +804,7 @@ int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, } /* XXX layering violation! -phil */ - l_lock(&lock->l_resource->lr_namespace->ns_lock); + lock_res(lock->l_resource); /* * get this: if mds_blocking_ast is racing with mds_intent_policy, such @@ -813,13 +813,13 @@ int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, * blocking function anymore. So check, and return early, if so. */ if (lock->l_blocking_ast != mds_blocking_ast) { - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + unlock_res(lock->l_resource); RETURN(0); } lock->l_flags |= LDLM_FL_CBPENDING; do_ast = (!lock->l_readers && !lock->l_writers); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + unlock_res(lock->l_resource); if (do_ast) { struct lustre_handle lockh; @@ -2229,8 +2229,9 @@ static int mdt_obj_create(struct ptlrpc_request *req) if (!IS_ERR(new) && new->d_inode) { struct lustre_id sid; - CWARN("mkdir() repairing is on its way: %lu/%lu\n", - (unsigned long)id_ino(&id), (unsigned long)id_gen(&id)); + CDEBUG(D_OTHER, "mkdir repairing %lu/%lu\n", + (unsigned long)id_ino(&id), + (unsigned long)id_gen(&id)); obdo_from_inode(&repbody->oa, new->d_inode, FILTER_VALID_FLAGS); @@ -2571,6 +2572,7 @@ static void mds_revoke_export_locks(struct obd_export *exp) { struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; struct list_head *locklist = &exp->exp_ldlm_data.led_held_locks; + struct list_head work; struct ldlm_lock *lock, *next; struct ldlm_lock_desc desc; @@ -2578,20 +2580,31 @@ static void mds_revoke_export_locks(struct obd_export *exp) return; ENTRY; - l_lock(&ns->ns_lock); + CERROR("implement right locking here! 
-bzzz\n"); + INIT_LIST_HEAD(&work); + spin_lock(&exp->exp_ldlm_data.led_lock); list_for_each_entry_safe(lock, next, locklist, l_export_chain) { - if (lock->l_req_mode != lock->l_granted_mode) + + lock_res(lock->l_resource); + if (lock->l_req_mode != lock->l_granted_mode) { + unlock_res(lock->l_resource); continue; + } LASSERT(lock->l_resource); if (lock->l_resource->lr_type != LDLM_IBITS && - lock->l_resource->lr_type != LDLM_PLAIN) + lock->l_resource->lr_type != LDLM_PLAIN) { + unlock_res(lock->l_resource); continue; + } - if (lock->l_flags & LDLM_FL_AST_SENT) + if (lock->l_flags & LDLM_FL_AST_SENT) { + unlock_res(lock->l_resource); continue; + } lock->l_flags |= LDLM_FL_AST_SENT; + unlock_res(lock->l_resource); /* the desc just pretends to be exclusive */ ldlm_lock2desc(lock, &desc); @@ -2600,7 +2613,8 @@ static void mds_revoke_export_locks(struct obd_export *exp) lock->l_blocking_ast(lock, &desc, NULL, LDLM_CB_BLOCKING); } - l_unlock(&ns->ns_lock); + spin_unlock(&exp->exp_ldlm_data.led_lock); + EXIT; } @@ -3766,7 +3780,7 @@ static void fixup_handle_for_resent_req(struct ptlrpc_request *req, if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)) return; - l_lock(&obd->obd_namespace->ns_lock); + spin_lock(&obd->obd_namespace->ns_hash_lock); list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) { struct ldlm_lock *lock; lock = list_entry(iter, struct ldlm_lock, l_export_chain); @@ -3779,11 +3793,11 @@ static void fixup_handle_for_resent_req(struct ptlrpc_request *req, lockh->cookie); if (old_lock) *old_lock = LDLM_LOCK_GET(lock); - l_unlock(&obd->obd_namespace->ns_lock); + spin_unlock(&obd->obd_namespace->ns_hash_lock); return; } } - l_unlock(&obd->obd_namespace->ns_lock); + spin_unlock(&obd->obd_namespace->ns_hash_lock); /* If the xid matches, then we know this is a resent request, * and allow it. 
(It's probably an OPEN, for which we don't @@ -3981,13 +3995,16 @@ static int mds_intent_policy(struct ldlm_namespace *ns, } /* Fixup the lock to be given to the client */ - l_lock(&new_lock->l_resource->lr_namespace->ns_lock); + lock_res(new_lock->l_resource); new_lock->l_readers = 0; new_lock->l_writers = 0; new_lock->l_export = class_export_get(req->rq_export); + + spin_lock(&new_lock->l_export->exp_ldlm_data.led_lock); list_add(&new_lock->l_export_chain, &new_lock->l_export->exp_ldlm_data.led_held_locks); + spin_unlock(&new_lock->l_export->exp_ldlm_data.led_lock); new_lock->l_blocking_ast = lock->l_blocking_ast; new_lock->l_completion_ast = lock->l_completion_ast; @@ -3997,8 +4014,8 @@ static int mds_intent_policy(struct ldlm_namespace *ns, new_lock->l_flags &= ~LDLM_FL_LOCAL; + unlock_res(new_lock->l_resource); LDLM_LOCK_PUT(new_lock); - l_unlock(&new_lock->l_resource->lr_namespace->ns_lock); RETURN(ELDLM_LOCK_REPLACED); } diff --git a/lustre/mds/mds_open.c b/lustre/mds/mds_open.c index 8072ef3..7910673 100644 --- a/lustre/mds/mds_open.c +++ b/lustre/mds/mds_open.c @@ -874,7 +874,7 @@ int mds_lock_new_child(struct obd_device *obd, struct inode *inode, { struct ldlm_res_id child_res_id = { .name = { inode->i_ino, 0, 1, 0 } }; struct lustre_handle lockh; - int lock_flags = 0; + int lock_flags = LDLM_FL_ATOMIC_CB; int rc; ENTRY; @@ -1311,7 +1311,7 @@ got_child: if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)) { struct ldlm_res_id child_res_id = { .name = {0}}; ldlm_policy_data_t policy; - int lock_flags = 0; + int lock_flags = LDLM_FL_ATOMIC_CB; /* LOOKUP lock will protect dentry on client -bzzz */ policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP | diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index e571b9b..fa802cf 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -1263,7 +1263,7 @@ int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id, CDEBUG(D_DLMTRACE, "lock order: "LPU64"/"LPU64"\n", res_id[0]->name[0], res_id[1]->name[0]); - flags = LDLM_FL_LOCAL_ONLY; + flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB; rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, *res_id[0], LDLM_IBITS, policies[0], lock_modes[0], &flags, mds_blocking_ast, ldlm_completion_ast, NULL, NULL, @@ -1277,7 +1277,7 @@ int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id, memcpy(handles[1], handles[0], sizeof(*(handles[1]))); ldlm_lock_addref(handles[1], lock_modes[1]); } else if (res_id[1]->name[0] != 0) { - flags = LDLM_FL_LOCAL_ONLY; + flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB; rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, *res_id[1], LDLM_IBITS, policies[1], lock_modes[1], &flags, mds_blocking_ast, @@ -1469,7 +1469,7 @@ changed: *dchildp = dchild = vchild; if (dchild->d_inode || (dchild->d_flags & DCACHE_CROSS_REF)) { - int flags = 0; + int flags = LDLM_FL_ATOMIC_CB; if (dchild->d_inode) { down(&dchild->d_inode->i_sem); @@ -1560,7 +1560,7 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds, if (name && IS_PDIROPS((*dparentp)->d_inode)) { struct ldlm_res_id res_id = { .name = {0} }; ldlm_policy_data_t policy; - int flags = 0; + int flags = LDLM_FL_ATOMIC_CB; *update_mode = mds_lock_mode_for_dir(obd, *dparentp, parent_mode); if (*update_mode) { @@ -2341,7 +2341,7 @@ static int mds_reint_link_acquire(struct mds_update_record *rec, int rc = 0, cleanup_phase = 0; struct dentry *de_src = NULL; ldlm_policy_data_t policy; - int flags = 0; + int flags = LDLM_FL_ATOMIC_CB; ENTRY; 
DEBUG_REQ(D_INODE, req, "%s: request to acquire i_nlinks "DLID4"\n", @@ -2585,7 +2585,7 @@ static int mds_reint_link(struct mds_update_record *rec, int offset, #ifdef S_PDIROPS if (IS_PDIROPS(de_tgt_dir->d_inode)) { - int flags = 0; + int flags = LDLM_FL_ATOMIC_CB; update_mode = mds_lock_mode_for_dir(obd, de_tgt_dir, LCK_EX); if (update_mode) { rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c index 09ef3c2..007a40d 100644 --- a/lustre/obdclass/genops.c +++ b/lustre/obdclass/genops.c @@ -490,6 +490,7 @@ struct obd_export *class_new_export(struct obd_device *obd) INIT_LIST_HEAD(&export->exp_outstanding_replies); /* XXX this should be in LDLM init */ INIT_LIST_HEAD(&export->exp_ldlm_data.led_held_locks); + spin_lock_init(&export->exp_ldlm_data.led_lock); INIT_LIST_HEAD(&export->exp_handle.h_link); class_handle_hash(&export->exp_handle, export_handle_addref); diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 77123c8..f9e4bf9 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -1068,20 +1068,20 @@ static int filter_blocking_ast(struct ldlm_lock *lock, } /* XXX layering violation! -phil */ - l_lock(&lock->l_resource->lr_namespace->ns_lock); + lock_res(lock->l_resource); /* Get this: if filter_blocking_ast is racing with ldlm_intent_policy, * such that filter_blocking_ast is called just before l_i_p takes the * ns_lock, then by the time we get the lock, we might not be the * correct blocking function anymore. So check, and return early, if * so. */ if (lock->l_blocking_ast != filter_blocking_ast) { - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + unlock_res(lock->l_resource); RETURN(0); } lock->l_flags |= LDLM_FL_CBPENDING; do_ast = (!lock->l_readers && !lock->l_writers); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + unlock_res(lock->l_resource); if (do_ast) { struct lustre_handle lockh; @@ -1308,25 +1308,24 @@ static int filter_intent_policy(struct ldlm_namespace *ns, lock->l_policy_data.l_extent.end = OBD_OBJECT_EOF; lock->l_req_mode = LCK_PR; - l_lock(&res->lr_namespace->ns_lock); - - res->lr_tmp = &rpc_list; - rc = policy(lock, &tmpflags, 0, &err); - res->lr_tmp = NULL; + lock_res(res); + rc = policy(lock, &tmpflags, 0, &err, &rpc_list); /* FIXME: we should change the policy function slightly, to not make * this list at all, since we just turn around and free it */ while (!list_empty(&rpc_list)) { - struct ldlm_ast_work *w = - list_entry(rpc_list.next, struct ldlm_ast_work, w_list); - list_del(&w->w_list); - LDLM_LOCK_PUT(w->w_lock); - OBD_FREE(w, sizeof(*w)); + struct ldlm_lock *wlock = + list_entry(rpc_list.next, struct ldlm_lock, l_cp_ast); + LASSERT((lock->l_flags & LDLM_FL_AST_SENT) == 0); + LASSERT(lock->l_flags & LDLM_FL_CP_REQD); + lock->l_flags &= ~LDLM_FL_CP_REQD; + list_del_init(&wlock->l_cp_ast); + LDLM_LOCK_PUT(wlock); } if (rc == LDLM_ITER_CONTINUE) { /* The lock met with no resistance; we're finished. */ - l_unlock(&res->lr_namespace->ns_lock); + unlock_res(res); RETURN(ELDLM_LOCK_REPLACED); } @@ -1334,11 +1333,9 @@ static int filter_intent_policy(struct ldlm_namespace *ns, * policy nicely created a list of all PW locks for us. We will choose * the highest of those which are larger than the size in the LVB, if * any, and perform a glimpse callback. 
*/ - down(&res->lr_lvb_sem); res_lvb = res->lr_lvb_data; LASSERT(res_lvb != NULL); *reply_lvb = *res_lvb; - up(&res->lr_lvb_sem); list_for_each(tmp, &res->lr_granted) { struct ldlm_lock *tmplock = @@ -1362,7 +1359,7 @@ static int filter_intent_policy(struct ldlm_namespace *ns, LDLM_LOCK_PUT(l); l = LDLM_LOCK_GET(tmplock); } - l_unlock(&res->lr_namespace->ns_lock); + unlock_res(res); /* There were no PW locks beyond the size in the LVB; finished. */ if (l == NULL) @@ -1382,9 +1379,9 @@ static int filter_intent_policy(struct ldlm_namespace *ns, res->lr_namespace->ns_lvbo->lvbo_update(res, NULL, 0, 1); } - down(&res->lr_lvb_sem); + lock_res(res); *reply_lvb = *res_lvb; - up(&res->lr_lvb_sem); + unlock_res(res); out: LDLM_LOCK_PUT(l); diff --git a/lustre/obdfilter/filter_lvb.c b/lustre/obdfilter/filter_lvb.c index b539f3f..b1ac24c 100644 --- a/lustre/obdfilter/filter_lvb.c +++ b/lustre/obdfilter/filter_lvb.c @@ -142,6 +142,8 @@ static int filter_lvbo_update(struct ldlm_resource *res, struct lustre_msg *m, //GOTO(out, rc = -EPROTO); GOTO(out, rc = 0); } + + lock_res(res); if (new->lvb_size > lvb->lvb_size || !increase) { CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb size: " LPU64" -> "LPU64"\n", res->lr_name.name[0], @@ -166,6 +168,7 @@ static int filter_lvbo_update(struct ldlm_resource *res, struct lustre_msg *m, lvb->lvb_ctime, new->lvb_ctime); lvb->lvb_ctime = new->lvb_ctime; } + unlock_res(res); } /* Update the LVB from the disk inode */ @@ -187,6 +190,7 @@ static int filter_lvbo_update(struct ldlm_resource *res, struct lustre_msg *m, oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP; obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS); + lock_res(res); if (dentry->d_inode->i_size > lvb->lvb_size || !increase) { CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb size from disk: " LPU64" -> %llu\n", res->lr_name.name[0], @@ -216,6 +220,7 @@ static int filter_lvbo_update(struct ldlm_resource *res, struct lustre_msg *m, LPU64" -> %lu\n", res->lr_name.name[0], lvb->lvb_blocks, dentry->d_inode->i_blocks); lvb->lvb_blocks = dentry->d_inode->i_blocks; + unlock_res(res); f_dput(dentry); out: diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 4dc4bf9..edc83ec 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -2377,7 +2377,7 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data) return; } - l_lock(&lock->l_resource->lr_namespace->ns_lock); + lock_res(lock->l_resource); #ifdef __KERNEL__ if (lock->l_ast_data && lock->l_ast_data != data) { struct inode *new_inode = data; @@ -2393,7 +2393,7 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data) } #endif lock->l_ast_data = data; - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + unlock_res(lock->l_resource); LDLM_LOCK_PUT(lock); } -- 1.8.3.1
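
Editor's note (illustration only, not part of the patch): every converted blocking-AST handler above (cache_blocking_ast, mds_blocking_ast, filter_blocking_ast) now follows the same shape: take the per-resource lock, re-check that we are still the registered callback, set LDLM_FL_CBPENDING, compute do_ast from the reader/writer counts, and drop the lock before calling out. Below is a minimal userspace sketch of that discipline, with a pthread mutex standing in for lr_lock; res_t, handle_blocking() and my_blocking_ast() are invented names for the example, not Lustre API.

#include <pthread.h>
#include <stdio.h>

#define FL_CBPENDING 0x1

typedef struct res {
        pthread_mutex_t lr_lock;   /* stands in for lock_res()/unlock_res() */
        unsigned int    flags;
        int             readers, writers;
        void          (*blocking_ast)(struct res *);
} res_t;

static void my_blocking_ast(res_t *r);

static void handle_blocking(res_t *r)
{
        int do_ast;

        pthread_mutex_lock(&r->lr_lock);
        /* we may have raced with someone re-pointing the callback;
         * bail out early, as the patched handlers do */
        if (r->blocking_ast != my_blocking_ast) {
                pthread_mutex_unlock(&r->lr_lock);
                return;
        }
        r->flags |= FL_CBPENDING;   /* no new readers/writers after this */
        do_ast = (r->readers == 0 && r->writers == 0);
        pthread_mutex_unlock(&r->lr_lock);

        /* never fire the AST with the resource lock held: the callback
         * may take lr_lock itself, or sleep */
        if (do_ast)
                r->blocking_ast(r);
}

static void my_blocking_ast(res_t *r)
{
        printf("cancelling idle lock on %p\n", (void *)r);
}

int main(void)
{
        res_t r = { .lr_lock = PTHREAD_MUTEX_INITIALIZER,
                    .blocking_ast = my_blocking_ast };
        handle_blocking(&r);
        return 0;
}

The point of computing do_ast inside the critical section but calling out after dropping it is that a spinlock-protected section can never be allowed to sleep, which is exactly why the old l_lock()-everywhere scheme had to go.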
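A second illustrative sketch: ldlm_resource_new() now creates lr_lvb_sem already held (init_MUTEX_LOCKED), ldlm_resource_add() publishes the resource in the hash under ns_hash_lock and only ups the semaphore after lvbo_init() has run, and any thread that finds an existing resource does a down()/up() pair so it can never see a half-initialized LVB. The userspace rendering below uses a POSIX semaphore initialized to 0 in place of init_MUTEX_LOCKED; resource_get(), find_locked() and the single-bucket "hash" are placeholders, not the patch's API.

#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>

struct resource {
        int   key;
        sem_t lvb_sem;          /* created "locked", like init_MUTEX_LOCKED */
        int   lvb;              /* pretend LVB payload */
        struct resource *next;
};

static pthread_mutex_t hash_lock = PTHREAD_MUTEX_INITIALIZER;
static struct resource *hash_head;      /* toy single-bucket hash */

static struct resource *find_locked(int key)
{
        struct resource *r;
        for (r = hash_head; r != NULL; r = r->next)
                if (r->key == key)
                        return r;
        return NULL;
}

/* find-or-create; the creator initializes the LVB outside the hash lock,
 * everyone else blocks on the semaphore until initialization is done */
static struct resource *resource_get(int key)
{
        struct resource *r, *old;

        pthread_mutex_lock(&hash_lock);
        r = find_locked(key);
        pthread_mutex_unlock(&hash_lock);
        if (r)
                goto wait_init;

        r = malloc(sizeof(*r));
        if (!r)
                return NULL;
        r->key = key;
        r->lvb = 0;
        sem_init(&r->lvb_sem, 0, 0);    /* 0 => held by the creator */

        pthread_mutex_lock(&hash_lock);
        old = find_locked(key);
        if (old) {                      /* someone won the race */
                pthread_mutex_unlock(&hash_lock);
                sem_destroy(&r->lvb_sem);
                free(r);
                r = old;
                goto wait_init;
        }
        r->next = hash_head;            /* we won: publish, then init */
        hash_head = r;
        pthread_mutex_unlock(&hash_lock);

        r->lvb = 42;                    /* slow lvbo_init() stand-in */
        sem_post(&r->lvb_sem);          /* up(): initialization complete */
        return r;

wait_init:
        sem_wait(&r->lvb_sem);          /* down()/up() pair: only synchronize */
        sem_post(&r->lvb_sem);
        return r;
}

int main(void)
{
        struct resource *r = resource_get(7);
        printf("key %d lvb %d\n", r->key, r->lvb);
        return 0;
}

This also explains the HEAD locking order lr_lvb_sem before lr_lock: the semaphore is taken on the creation path before anyone can hold the resource spinlock.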
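__ldlm_resource_putref_final() is the classic "atomic_dec_and_test, then re-check under the table lock" teardown: between our decrement to zero and the unhash, a concurrent ldlm_resource_get() holding ns_hash_lock may re-find the resource and take a new reference, so the freeing path must confirm the count is still zero once it owns the lock (the "We lost the race" branch above). A small self-contained sketch of the same pattern with C11 atomics; table_lock, getref() and putref() are hypothetical names for the example.

#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>

struct res {
        atomic_int refcount;
        int        hashed;      /* toy stand-in for list_del_init(&lr_hash) */
};

static pthread_mutex_t table_lock = PTHREAD_MUTEX_INITIALIZER;

/* lookup side: callers that find r through the table must hold table_lock
 * here, which is what makes the recheck in putref() sound */
static void getref(struct res *r)
{
        atomic_fetch_add(&r->refcount, 1);
}

/* returns 1 if the resource was freed, 0 if it remains */
static int putref(struct res *r)
{
        if (atomic_fetch_sub(&r->refcount, 1) != 1)
                return 0;               /* not the last reference */

        pthread_mutex_lock(&table_lock);
        if (atomic_load(&r->refcount) != 0) {
                /* we lost the race: a lookup re-took a reference after
                 * our decrement but before we could unhash the object */
                pthread_mutex_unlock(&table_lock);
                return 0;
        }
        r->hashed = 0;                  /* unhash: nobody can find it now */
        pthread_mutex_unlock(&table_lock);

        free(r);                        /* safe: unreachable and zero refs */
        return 1;
}

int main(void)
{
        struct res *r = malloc(sizeof(*r));
        atomic_init(&r->refcount, 1);
        r->hashed = 1;
        getref(r);
        printf("freed? %d\n", putref(r));       /* 0: one ref remains */
        printf("freed? %d\n", putref(r));       /* 1: last ref dropped */
        return 0;
}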
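Last sketch: ldlm_cli_cancel_unused_resource() (and the plain/extent/inodebits compat-queue helpers) no longer OBD_ALLOC a struct ldlm_ast_work per AST; the lock itself is threaded onto the work list through the new embedded l_bl_ast/l_cp_ast members, with a reference held while queued, so building the list cannot fail and needs no freeing. The miniature list_head below is a local re-implementation for the example, not kernel code, and struct lock is an invented stand-in.

#include <stddef.h>
#include <stdio.h>

/* minimal kernel-style intrusive list, just enough for the sketch */
struct list_head { struct list_head *next, *prev; };

#define LIST_HEAD_INIT(n) { &(n), &(n) }
#define container_of(ptr, type, member) \
        ((type *)((char *)(ptr) - offsetof(type, member)))

static void list_add(struct list_head *item, struct list_head *head)
{
        item->next = head->next; item->prev = head;
        head->next->prev = item; head->next = item;
}

static void list_del_init(struct list_head *e)
{
        e->prev->next = e->next; e->next->prev = e->prev;
        e->next = e->prev = e;
}

static int list_empty(const struct list_head *h) { return h->next == h; }

/* the lock embeds its own work-list linkage, so queueing an AST needs
 * no allocation (and thus cannot fail), unlike an allocated work item */
struct lock {
        int refs;                       /* stand-in for LDLM_LOCK_GET/PUT */
        struct list_head bl_ast;        /* like the new l_bl_ast */
};

int main(void)
{
        struct list_head work = LIST_HEAD_INIT(work);
        struct lock lk = { 1, LIST_HEAD_INIT(lk.bl_ast) };

        /* queue: take a reference, then link the lock itself */
        lk.refs++;
        list_add(&lk.bl_ast, &work);

        /* drain: unlink, send the AST, drop the queued reference */
        while (!list_empty(&work)) {
                struct lock *l = container_of(work.next, struct lock, bl_ast);
                list_del_init(&l->bl_ast);
                printf("blocking AST for lock with %d refs\n", l->refs);
                l->refs--;
        }
        return 0;
}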