From: alex Date: Mon, 19 Jun 2006 10:29:26 +0000 (+0000) Subject: b=10088 X-Git-Tag: v1_7_100~1^90~8^2~102 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=e73dee98c921e4888e8247b1024c940b137e2d7d;p=fs%2Flustre-release.git b=10088 - fine-grained locking for LDLM landed --- diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index c08a379..eff1ecd 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -114,6 +114,21 @@ typedef enum { /* Flags sent in AST lock_flags to be mapped into the receiving lock. */ #define LDLM_AST_FLAGS (LDLM_FL_DISCARD_DATA) +/* completion ast to be executed */ +#define LDLM_FL_CP_REQD 0x1000000 + +/* cleanup_resource has already handled the lock */ +#define LDLM_FL_CLEANED 0x2000000 + +/* optimization hint: LDLM can run blocking callback from current context + * w/o involving separate thread. in order to decrease cs rate */ +#define LDLM_FL_ATOMIC_CB 0x4000000 + +/* while this flag is set, the lock can't change resource */ +#define LDLM_FL_LOCK_PROTECT 0x8000000 +#define LDLM_FL_LOCK_PROTECT_BIT 27 + + /* The blocking callback is overloaded to perform two functions. These flags * indicate which operation should be performed. */ #define LDLM_CB_BLOCKING 1 @@ -155,6 +170,25 @@ static inline int lockmode_compat(ldlm_mode_t exist, ldlm_mode_t new) - */ +/* + * Locking rules: + * + * lr_lock + * + * lr_lock + * waiting_locks_spinlock + * + * lr_lock + * led_lock + * + * lr_lock + * ns_unused_lock + * + * lr_lvb_sem + * lr_lock + * + */ + struct ldlm_lock; struct ldlm_resource; struct ldlm_namespace; @@ -173,22 +207,24 @@ struct ldlm_namespace { char *ns_name; __u32 ns_client; /* is this a client-side lock tree? */ struct list_head *ns_hash; /* hash table for ns */ - cfs_waitq_t ns_refcount_waitq; /* for cleanup */ - atomic_t ns_refcount; /* count of resources in the hash */ + spinlock_t ns_hash_lock; + __u32 ns_refcount; /* count of resources in the hash */ struct list_head ns_root_list; /* all root resources in ns */ - struct lustre_lock ns_lock; /* protects hash, refcount, list */ struct list_head ns_list_chain; /* position in global NS list */ struct list_head ns_unused_list; /* all root resources in ns */ int ns_nr_unused; + spinlock_t ns_unused_lock; + unsigned int ns_max_unused; cfs_time_t ns_next_dump; /* next debug dump, jiffies */ - spinlock_t ns_counter_lock; - __u64 ns_locks; + atomic_t ns_locks; + __u64 ns_resources; ldlm_res_policy ns_policy; struct ldlm_valblock_ops *ns_lvbo; - void *ns_lvbp; + void *ns_lvbp; + cfs_waitq_t ns_waitq; }; /* @@ -213,14 +249,27 @@ typedef int (*ldlm_glimpse_callback)(struct ldlm_lock *lock, void *data); struct ldlm_lock { struct portals_handle l_handle; // must be first in the structure atomic_t l_refc; + + /* ldlm_lock_change_resource() can change this */ struct ldlm_resource *l_resource; + + /* set once, no need to protect it */ struct ldlm_lock *l_parent; + + /* protected by ns_hash_lock */ struct list_head l_children; struct list_head l_childof; + + /* protected by ns_hash_lock. 
FIXME */ struct list_head l_lru; + + /* protected by lr_lock */ struct list_head l_res_link; // position in one of three res lists + + /* protected by led_lock */ struct list_head l_export_chain; // per-export chain of locks + /* protected by lr_lock */ ldlm_mode_t l_req_mode; ldlm_mode_t l_granted_mode; @@ -230,10 +279,14 @@ struct ldlm_lock { struct obd_export *l_export; struct obd_export *l_conn_export; + + /* protected by lr_lock */ __u32 l_flags; + struct lustre_handle l_remote_handle; ldlm_policy_data_t l_policy_data; + /* protected by lr_lock */ __u32 l_readers; __u32 l_writers; __u8 l_destroyed; @@ -254,26 +307,40 @@ struct ldlm_lock { void *l_ast_data; /* Server-side-only members */ + + /* protected by elt_lock */ struct list_head l_pending_chain; /* callbacks pending */ cfs_time_t l_callback_timeout; /* jiffies */ __u32 l_pid; /* pid which created this lock */ + __u32 l_pidb; /* who holds LOCK_PROTECT_BIT */ + + struct list_head l_tmp; + + /* for ldlm_add_ast_work_item() */ + struct list_head l_bl_ast; + struct list_head l_cp_ast; + struct ldlm_lock *l_blocking_lock; + int l_bl_ast_run; }; struct ldlm_resource { struct ldlm_namespace *lr_namespace; + + /* protected by ns_hash_lock */ struct list_head lr_hash; struct ldlm_resource *lr_parent; /* 0 for a root resource */ struct list_head lr_children; /* list head for child resources */ struct list_head lr_childof; /* part of ns_root_list if root res, * part of lr_children if child */ + spinlock_t lr_lock; + /* protected by lr_lock */ struct list_head lr_granted; struct list_head lr_converting; struct list_head lr_waiting; ldlm_mode_t lr_most_restr; ldlm_type_t lr_type; /* LDLM_{PLAIN,EXTENT,FLOCK} */ - struct ldlm_resource *lr_root; struct ldlm_res_id lr_name; atomic_t lr_refcount; @@ -421,7 +488,8 @@ do { \ CDEBUG(D_DLMTRACE, "### " format "\n" , ## a) typedef int (*ldlm_processing_policy)(struct ldlm_lock *lock, int *flags, - int first_enq, ldlm_error_t *err); + int first_enq, ldlm_error_t *err, + struct list_head *work_list); /* * Iterators. 
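
The new locking-rules comment above lists outer/inner lock pairs (the nesting indentation is lost in this plain-text dump): lr_lock is taken before waiting_locks_spinlock, led_lock and ns_unused_lock, while lr_lvb_sem is taken outside lr_lock. A minimal sketch of that ordering for the LRU case, using only the helpers and fields this patch introduces (the function itself is hypothetical, not a hunk of the patch; the real bookkeeping lives in ldlm_lock_decref_internal()):

/* Sketch only: illustrates the documented nesting lr_lock -> ns_unused_lock. */
static void lru_add_sketch(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = lock->l_resource->lr_namespace;

        lock_res(lock->l_resource);             /* outer: lr_lock */
        spin_lock(&ns->ns_unused_lock);         /* inner: ns_unused_lock */
        list_add_tail(&lock->l_lru, &ns->ns_unused_list);
        ns->ns_nr_unused++;
        spin_unlock(&ns->ns_unused_lock);
        unlock_res(lock->l_resource);           /* release in reverse order */
}
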
@@ -473,7 +541,7 @@ void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh); struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *, int flags); void ldlm_cancel_callback(struct ldlm_lock *); int ldlm_lock_set_data(struct lustre_handle *, void *data); -void ldlm_lock_remove_from_lru(struct ldlm_lock *); +int ldlm_lock_remove_from_lru(struct ldlm_lock *); struct ldlm_lock *ldlm_handle2lock_ns(struct ldlm_namespace *, struct lustre_handle *); @@ -594,4 +662,36 @@ void intent_set_disposition(struct ldlm_reply *rep, int flag); #define IOC_LDLM_REGRESS_STOP _IOWR('f', 43, long) #define IOC_LDLM_MAX_NR 43 +static inline void lock_res(struct ldlm_resource *res) +{ + spin_lock(&res->lr_lock); +} + +static inline void unlock_res(struct ldlm_resource *res) +{ + spin_unlock(&res->lr_lock); +} + +static inline void check_res_locked(struct ldlm_resource *res) +{ + LASSERT_SPIN_LOCKED(&res->lr_lock); +} +#ifdef __KERNEL__ +static inline void lock_bitlock(struct ldlm_lock *lock) +{ + bit_spin_lock(LDLM_FL_LOCK_PROTECT_BIT, (void *) &lock->l_flags); + LASSERT(lock->l_pidb == 0); + lock->l_pidb = current->pid; +} + +static inline void unlock_bitlock(struct ldlm_lock *lock) +{ + LASSERT(lock->l_pidb == current->pid); + lock->l_pidb = 0; + bit_spin_unlock(LDLM_FL_LOCK_PROTECT_BIT, (void *) &lock->l_flags); +} +#endif +struct ldlm_resource * lock_res_and_lock(struct ldlm_lock *lock); +void unlock_res_and_lock(struct ldlm_lock *lock); + #endif diff --git a/lustre/include/lustre_export.h b/lustre/include/lustre_export.h index f6e3f36..2590fec 100644 --- a/lustre/include/lustre_export.h +++ b/lustre/include/lustre_export.h @@ -34,6 +34,7 @@ struct osc_creator { struct ldlm_export_data { struct list_head led_held_locks; /* protected by namespace lock */ + spinlock_t led_lock; }; struct ec_export_data { /* echo client */ diff --git a/lustre/ldlm/l_lock.c b/lustre/ldlm/l_lock.c index 7601bce..63f2ad532 100644 --- a/lustre/ldlm/l_lock.c +++ b/lustre/ldlm/l_lock.c @@ -33,112 +33,39 @@ #include #include -/* invariants: - - only the owner of the lock changes l_owner/l_depth - - if a non-owner changes or checks the variables a spin lock is taken -*/ - -void l_lock_init(struct lustre_lock *lock) -{ - sema_init(&lock->l_sem, 1); - spin_lock_init(&lock->l_spin); -} - -void l_lock(struct lustre_lock *lock) +/* + * ldlm locking uses resource to serialize access to locks + * but there is a case when we change resource of lock upon + * enqueue reply. we rely on that lock->l_resource = new_res + * is atomic + */ +struct ldlm_resource * lock_res_and_lock(struct ldlm_lock *lock) { - int owner = 0; - - spin_lock(&lock->l_spin); - if (lock->l_owner == cfs_current()) - owner = 1; - spin_unlock(&lock->l_spin); - - /* This is safe to increment outside the spinlock because we - * can only have 1 CPU running on the current task - * (i.e. l_owner == current), regardless of the number of CPUs. 
- */ - if (owner) { - ++lock->l_depth; - } else { - mutex_down(&lock->l_sem); - spin_lock(&lock->l_spin); - lock->l_owner = cfs_current(); - lock->l_depth = 0; - spin_unlock(&lock->l_spin); - } + struct ldlm_resource *res = lock->l_resource; + + if (!res->lr_namespace->ns_client) { + /* on server-side resource of lock doesn't change */ + lock_res(res); + return res; + } + + lock_bitlock(lock); + res = lock->l_resource; + lock_res(res); + return res; } -void l_unlock(struct lustre_lock *lock) +void unlock_res_and_lock(struct ldlm_lock *lock) { - LASSERTF(lock->l_owner == cfs_current(), "lock %p, current %p\n", - lock->l_owner, cfs_current()); - LASSERTF(lock->l_depth >= 0, "depth %d\n", lock->l_depth); + struct ldlm_resource *res = lock->l_resource; - spin_lock(&lock->l_spin); - if (--lock->l_depth < 0) { - lock->l_owner = NULL; - spin_unlock(&lock->l_spin); - mutex_up(&lock->l_sem); + if (!res->lr_namespace->ns_client) { + /* on server-side resource of lock doesn't change */ + unlock_res(res); return; } - spin_unlock(&lock->l_spin); -} - -int l_has_lock(struct lustre_lock *lock) -{ - int depth = -1, owner = 0; - - spin_lock(&lock->l_spin); - if (lock->l_owner == cfs_current()) { - depth = lock->l_depth; - owner = 1; - } - spin_unlock(&lock->l_spin); - if (depth >= 0) - CDEBUG(D_INFO, "lock_depth: %d\n", depth); - return owner; + unlock_res(res); + unlock_bitlock(lock); } -#ifdef __KERNEL__ -void l_check_ns_lock(struct ldlm_namespace *ns) -{ - static cfs_time_t next_msg; - - if (!l_has_lock(&ns->ns_lock) && cfs_time_after(cfs_time_current(), next_msg)) { - CERROR("namespace %s lock not held when it should be; tell " - "phil\n", ns->ns_name); - libcfs_debug_dumpstack(NULL); - next_msg = cfs_time_shift(60); - } -} - -void l_check_no_ns_lock(struct ldlm_namespace *ns) -{ - static cfs_time_t next_msg; - - if (l_has_lock(&ns->ns_lock) && cfs_time_after(cfs_time_current(), next_msg)) { - CERROR("namespace %s lock held illegally; tell phil\n", - ns->ns_name); - libcfs_debug_dumpstack(NULL); - next_msg = cfs_time_shift(60); - } -} - -#else -void l_check_ns_lock(struct ldlm_namespace *ns) -{ - if (!l_has_lock(&ns->ns_lock)) { - CERROR("namespace %s lock not held when it should be; tell " - "phil\n", ns->ns_name); - } -} - -void l_check_no_ns_lock(struct ldlm_namespace *ns) -{ - if (l_has_lock(&ns->ns_lock)) { - CERROR("namespace %s lock held illegally; tell phil\n", - ns->ns_name); - } -} -#endif /* __KERNEL__ */ diff --git a/lustre/ldlm/ldlm_extent.c b/lustre/ldlm/ldlm_extent.c index 0f279e8..77970c3 100644 --- a/lustre/ldlm/ldlm_extent.c +++ b/lustre/ldlm/ldlm_extent.c @@ -174,7 +174,8 @@ static void ldlm_extent_policy(struct ldlm_resource *res, */ static int ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req, - int send_cbs, int *flags, ldlm_error_t *err) + int *flags, ldlm_error_t *err, + struct list_head *work_list) { struct list_head *tmp; struct ldlm_lock *lock; @@ -298,18 +299,18 @@ ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req, continue; } - if (!send_cbs) + if (!work_list) RETURN(0); compat = 0; if (lock->l_blocking_ast) - ldlm_add_ast_work_item(lock, req, NULL, 0); + ldlm_add_ast_work_item(lock, req, work_list); } RETURN(compat); destroylock: list_del_init(&req->l_res_link); - ldlm_lock_destroy(req); + ldlm_lock_destroy_nolock(req); *err = compat; RETURN(compat); } @@ -324,7 +325,7 @@ destroylock: * - the caller has NOT initialized req->lr_tmp, so we must * - must call this function with the ns lock held once */ int 
ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, - ldlm_error_t *err) + ldlm_error_t *err, struct list_head *work_list) { struct ldlm_resource *res = lock->l_resource; struct list_head rpc_list = CFS_LIST_HEAD_INIT(rpc_list); @@ -332,6 +333,7 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, ENTRY; LASSERT(list_empty(&res->lr_converting)); + check_res_locked(res); *err = ELDLM_OK; if (!first_enq) { @@ -341,12 +343,11 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, * flags should always be zero here, and if that ever stops * being true, we want to find out. */ LASSERT(*flags == 0); - LASSERT(res->lr_tmp != NULL); - rc = ldlm_extent_compat_queue(&res->lr_granted, lock, 0, flags, - err); + rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, + err, NULL); if (rc == 1) { - rc = ldlm_extent_compat_queue(&res->lr_waiting, lock, 0, - flags, err); + rc = ldlm_extent_compat_queue(&res->lr_waiting, lock, + flags, err, NULL); } if (rc == 0) RETURN(LDLM_ITER_STOP); @@ -354,31 +355,26 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, ldlm_resource_unlink_lock(lock); ldlm_extent_policy(res, lock, flags); - ldlm_grant_lock(lock, NULL, 0, 1); + ldlm_grant_lock(lock, work_list); RETURN(LDLM_ITER_CONTINUE); } restart: - LASSERT(res->lr_tmp == NULL); - res->lr_tmp = &rpc_list; - rc = ldlm_extent_compat_queue(&res->lr_granted, lock, 1, flags, err); + rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err, &rpc_list); if (rc < 0) GOTO(out, rc); /* lock was destroyed */ - if (rc == 2) { - res->lr_tmp = NULL; + if (rc == 2) goto grant; - } - rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, 1, flags, err); + rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err, &rpc_list); if (rc2 < 0) GOTO(out, rc = rc2); /* lock was destroyed */ - res->lr_tmp = NULL; if (rc + rc2 == 2) { grant: ldlm_extent_policy(res, lock, flags); ldlm_resource_unlink_lock(lock); - ldlm_grant_lock(lock, NULL, 0, 0); + ldlm_grant_lock(lock, NULL); } else { /* If either of the compat_queue()s returned failure, then we * have ASTs to send and must go onto the waiting list. @@ -388,9 +384,9 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, * re-ordered! Causes deadlock, because ASTs aren't sent! 
*/ if (list_empty(&lock->l_res_link)) ldlm_resource_add_lock(res, &res->lr_waiting, lock); - l_unlock(&res->lr_namespace->ns_lock); - rc = ldlm_run_ast_work(res->lr_namespace, &rpc_list); - l_lock(&res->lr_namespace->ns_lock); + unlock_res(res); + rc = ldlm_run_bl_ast_work(&rpc_list); + lock_res(res); if (rc == -ERESTART) GOTO(restart, -ERESTART); *flags |= LDLM_FL_BLOCK_GRANTED; @@ -401,7 +397,6 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, } rc = 0; out: - res->lr_tmp = NULL; RETURN(rc); } diff --git a/lustre/ldlm/ldlm_flock.c b/lustre/ldlm/ldlm_flock.c index c86ee5c..2f837f2 100644 --- a/lustre/ldlm/ldlm_flock.c +++ b/lustre/ldlm/ldlm_flock.c @@ -91,7 +91,7 @@ ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, int flags) ldlm_lock_decref_internal(lock, mode); } - ldlm_lock_destroy(lock); + ldlm_lock_destroy_nolock(lock); EXIT; } @@ -124,7 +124,7 @@ restart: int ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, - ldlm_error_t *err) + ldlm_error_t *err, struct list_head *work_list) { struct ldlm_resource *res = req->l_resource; struct ldlm_namespace *ns = res->lr_namespace; @@ -393,20 +393,18 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, = CFS_LIST_HEAD_INIT(rpc_list); int rc; restart: - res->lr_tmp = &rpc_list; - ldlm_reprocess_queue(res, &res->lr_waiting); - res->lr_tmp = NULL; - - l_unlock(&ns->ns_lock); - rc = ldlm_run_ast_work(res->lr_namespace, - &rpc_list); - l_lock(&ns->ns_lock); + ldlm_reprocess_queue(res, &res->lr_waiting, + &rpc_list); + + unlock_res(res); + rc = ldlm_run_bl_ast_work(&rpc_list); + lock_res(res); if (rc == -ERESTART) GOTO(restart, -ERESTART); } } else { LASSERT(req->l_completion_ast); - ldlm_add_ast_work_item(req, NULL, NULL, 0); + ldlm_add_ast_work_item(req, NULL, work_list); } } @@ -500,7 +498,7 @@ granted: LDLM_DEBUG(lock, "client-side enqueue granted"); ns = lock->l_resource->lr_namespace; - l_lock(&ns->ns_lock); + lock_res(lock->l_resource); /* take lock off the deadlock detection waitq. */ list_del_init(&lock->l_flock_waitq); @@ -531,11 +529,11 @@ granted: /* We need to reprocess the lock to do merges or splits * with existing locks owned by this process. */ - ldlm_process_flock_lock(lock, &noreproc, 1, &err); + ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL); if (flags == 0) cfs_waitq_signal(&lock->l_waitq); } - l_unlock(&ns->ns_lock); + unlock_res(lock->l_resource); RETURN(0); } EXPORT_SYMBOL(ldlm_flock_completion_ast); @@ -552,8 +550,8 @@ int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, ns = lock->l_resource->lr_namespace; /* take lock off the deadlock detection waitq. */ - l_lock(&ns->ns_lock); + lock_res_and_lock(lock); list_del_init(&lock->l_flock_waitq); - l_unlock(&ns->ns_lock); + unlock_res_and_lock(lock); RETURN(0); } diff --git a/lustre/ldlm/ldlm_inodebits.c b/lustre/ldlm/ldlm_inodebits.c index 8c473dd..da1823d 100644 --- a/lustre/ldlm/ldlm_inodebits.c +++ b/lustre/ldlm/ldlm_inodebits.c @@ -35,7 +35,7 @@ /* Determine if the lock is compatible with all locks on the queue. 
*/ static int ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req, - int send_cbs) + struct list_head *work_list) { struct list_head *tmp; struct ldlm_lock *lock; @@ -61,12 +61,12 @@ ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req, if (!(lock->l_policy_data.l_inodebits.bits & req_bits)) continue; - if (!send_cbs) + if (!work_list) RETURN(0); compat = 0; if (lock->l_blocking_ast) - ldlm_add_ast_work_item(lock, req, NULL, 0); + ldlm_add_ast_work_item(lock, req, work_list); } RETURN(compat); @@ -82,7 +82,8 @@ ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req, * - the caller has NOT initialized req->lr_tmp, so we must * - must call this function with the ns lock held once */ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, int *flags, - int first_enq, ldlm_error_t *err) + int first_enq, ldlm_error_t *err, + struct list_head *work_list) { struct ldlm_resource *res = lock->l_resource; struct list_head rpc_list = CFS_LIST_HEAD_INIT(rpc_list); @@ -90,27 +91,25 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, int *flags, ENTRY; LASSERT(list_empty(&res->lr_converting)); + check_res_locked(res); if (!first_enq) { - LASSERT(res->lr_tmp != NULL); - rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, 0); + LASSERT(work_list != NULL); + rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, NULL); if (!rc) RETURN(LDLM_ITER_STOP); - rc = ldlm_inodebits_compat_queue(&res->lr_waiting, lock, 0); + rc = ldlm_inodebits_compat_queue(&res->lr_waiting, lock, NULL); if (!rc) RETURN(LDLM_ITER_STOP); ldlm_resource_unlink_lock(lock); - ldlm_grant_lock(lock, NULL, 0, 1); + ldlm_grant_lock(lock, work_list); RETURN(LDLM_ITER_CONTINUE); } restart: - LASSERT(res->lr_tmp == NULL); - res->lr_tmp = &rpc_list; - rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, 1); - rc += ldlm_inodebits_compat_queue(&res->lr_waiting, lock, 1); - res->lr_tmp = NULL; + rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, &rpc_list); + rc += ldlm_inodebits_compat_queue(&res->lr_waiting, lock, &rpc_list); if (rc != 2) { /* If either of the compat_queue()s returned 0, then we @@ -121,15 +120,15 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, int *flags, * re-ordered! Causes deadlock, because ASTs aren't sent! 
*/ if (list_empty(&lock->l_res_link)) ldlm_resource_add_lock(res, &res->lr_waiting, lock); - l_unlock(&res->lr_namespace->ns_lock); - rc = ldlm_run_ast_work(res->lr_namespace, &rpc_list); - l_lock(&res->lr_namespace->ns_lock); + unlock_res(res); + rc = ldlm_run_bl_ast_work(&rpc_list); + lock_res(res); if (rc == -ERESTART) GOTO(restart, -ERESTART); *flags |= LDLM_FL_BLOCK_GRANTED; } else { ldlm_resource_unlink_lock(lock); - ldlm_grant_lock(lock, NULL, 0, 0); + ldlm_grant_lock(lock, NULL); } RETURN(0); } diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h index f9f6c43..fbc9c18 100644 --- a/lustre/ldlm/ldlm_internal.h +++ b/lustre/ldlm/ldlm_internal.h @@ -11,12 +11,12 @@ typedef enum { int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync); /* ldlm_resource.c */ +int ldlm_resource_putref_locked(struct ldlm_resource *res); void ldlm_resource_insert_lock_after(struct ldlm_lock *original, struct ldlm_lock *new); /* ldlm_lock.c */ -void ldlm_grant_lock(struct ldlm_lock *lock, void *data, int datalen, - int run_ast); +void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list); struct ldlm_lock * ldlm_lock_create(struct ldlm_namespace *ns, struct lustre_handle *parent_lock_handle, struct ldlm_res_id, @@ -28,9 +28,13 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *, struct ldlm_lock **, void ldlm_lock_addref_internal(struct ldlm_lock *, __u32 mode); void ldlm_lock_decref_internal(struct ldlm_lock *, __u32 mode); void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new, - void *data, int datalen); -int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue); -int ldlm_run_ast_work(struct ldlm_namespace *, struct list_head *rpc_list); + struct list_head *work_list); +int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue, + struct list_head *work_list); +int ldlm_run_bl_ast_work(struct list_head *rpc_list); +int ldlm_run_cp_ast_work(struct list_head *rpc_list); +int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock); +void ldlm_lock_destroy_nolock(struct ldlm_lock *lock); /* ldlm_lockd.c */ int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, @@ -40,19 +44,20 @@ void ldlm_handle_bl_callback(struct ldlm_namespace *ns, /* ldlm_plain.c */ int ldlm_process_plain_lock(struct ldlm_lock *lock, int *flags, int first_enq, - ldlm_error_t *err); + ldlm_error_t *err, struct list_head *work_list); /* ldlm_extent.c */ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, - ldlm_error_t *err); + ldlm_error_t *err, struct list_head *work_list); /* ldlm_flock.c */ -int ldlm_process_flock_lock(struct ldlm_lock *lock, int *flags, int first_enq, - ldlm_error_t *err); +int ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, + ldlm_error_t *err, struct list_head *work_list); /* ldlm_inodebits.c */ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, int *flags, - int first_enq, ldlm_error_t *err); + int first_enq, ldlm_error_t *err, + struct list_head *work_list); /* l_lock.c */ void l_check_ns_lock(struct ldlm_namespace *ns); diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 4d813d6..b99d074 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -83,7 +83,6 @@ char *ldlm_it2str(int it) } extern cfs_mem_cache_t *ldlm_lock_slab; -struct lustre_lock ldlm_handle_lock; static ldlm_processing_policy ldlm_processing_policy_table[] = { [LDLM_PLAIN] ldlm_process_plain_lock, @@ -125,50 +124,59 @@ void 
ldlm_lock_put(struct ldlm_lock *lock) { ENTRY; + LASSERT(lock->l_resource != LP_POISON); + LASSERT(atomic_read(&lock->l_refc) > 0); if (atomic_dec_and_test(&lock->l_refc)) { - struct ldlm_namespace *ns = lock->l_resource->lr_namespace; - struct obd_export *export = NULL; + struct ldlm_resource *res; - l_lock(&ns->ns_lock); LDLM_DEBUG(lock, "final lock_put on destroyed lock, freeing it."); + + lock_res_and_lock(lock); + res = lock->l_resource; LASSERT(lock->l_destroyed); LASSERT(list_empty(&lock->l_res_link)); - spin_lock(&ns->ns_counter_lock); - ns->ns_locks--; - spin_unlock(&ns->ns_counter_lock); - - ldlm_resource_putref(lock->l_resource); - lock->l_resource = NULL; - export = lock->l_export; - if (lock->l_parent) LDLM_LOCK_PUT(lock->l_parent); + unlock_res_and_lock(lock); + + atomic_dec(&res->lr_namespace->ns_locks); + ldlm_resource_putref(res); + lock->l_resource = NULL; + if (lock->l_export) + class_export_put(lock->l_export); if (lock->l_lvb_data != NULL) OBD_FREE(lock->l_lvb_data, lock->l_lvb_len); OBD_SLAB_FREE(lock, ldlm_lock_slab, sizeof(*lock)); - l_unlock(&ns->ns_lock); - if (export) - class_export_put(export); } EXIT; } -void ldlm_lock_remove_from_lru(struct ldlm_lock *lock) +int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock) { - ENTRY; - l_lock(&lock->l_resource->lr_namespace->ns_lock); + int rc = 0; if (!list_empty(&lock->l_lru)) { LASSERT(lock->l_resource->lr_type != LDLM_FLOCK); list_del_init(&lock->l_lru); lock->l_resource->lr_namespace->ns_nr_unused--; LASSERT(lock->l_resource->lr_namespace->ns_nr_unused >= 0); + rc = 1; } - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + return rc; +} + +int ldlm_lock_remove_from_lru(struct ldlm_lock *lock) +{ + int rc; + ENTRY; + spin_lock(&lock->l_resource->lr_namespace->ns_unused_lock); + rc = ldlm_lock_remove_from_lru_nolock(lock); + spin_unlock(&lock->l_resource->lr_namespace->ns_unused_lock); EXIT; + return rc; } /* This used to have a 'strict' flag, which recovery would use to mark an @@ -176,10 +184,9 @@ void ldlm_lock_remove_from_lru(struct ldlm_lock *lock) * shall explain why it's gone: with the new hash table scheme, once you call * ldlm_lock_destroy, you can never drop your final references on this lock. * Because it's not in the hash table anymore. 
-phil */ -void ldlm_lock_destroy(struct ldlm_lock *lock) +int ldlm_lock_destroy_internal(struct ldlm_lock *lock) { ENTRY; - l_lock(&lock->l_resource->lr_namespace->ns_lock); if (!list_empty(&lock->l_children)) { LDLM_ERROR(lock, "still has children (%p)!", @@ -201,13 +208,17 @@ void ldlm_lock_destroy(struct ldlm_lock *lock) if (lock->l_destroyed) { LASSERT(list_empty(&lock->l_lru)); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); EXIT; - return; + return 0; } lock->l_destroyed = 1; + if (lock->l_export) + spin_lock(&lock->l_export->exp_ldlm_data.led_lock); list_del_init(&lock->l_export_chain); + if (lock->l_export) + spin_unlock(&lock->l_export->exp_ldlm_data.led_lock); + ldlm_lock_remove_from_lru(lock); class_handle_unhash(&lock->l_handle); @@ -221,9 +232,32 @@ void ldlm_lock_destroy(struct ldlm_lock *lock) if (lock->l_export && lock->l_completion_ast) lock->l_completion_ast(lock, 0); #endif + EXIT; + return 1; +} - l_unlock(&lock->l_resource->lr_namespace->ns_lock); - LDLM_LOCK_PUT(lock); +void ldlm_lock_destroy(struct ldlm_lock *lock) +{ + int first; + ENTRY; + lock_res_and_lock(lock); + first = ldlm_lock_destroy_internal(lock); + unlock_res_and_lock(lock); + + /* drop reference from hashtable only for first destroy */ + if (first) + LDLM_LOCK_PUT(lock); + EXIT; +} + +void ldlm_lock_destroy_nolock(struct ldlm_lock *lock) +{ + int first; + ENTRY; + first = ldlm_lock_destroy_internal(lock); + /* drop reference from hashtable only for first destroy */ + if (first) + LDLM_LOCK_PUT(lock); EXIT; } @@ -256,22 +290,24 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent, atomic_set(&lock->l_refc, 2); CFS_INIT_LIST_HEAD(&lock->l_children); - CFS_INIT_LIST_HEAD(&lock->l_childof); CFS_INIT_LIST_HEAD(&lock->l_res_link); CFS_INIT_LIST_HEAD(&lock->l_lru); CFS_INIT_LIST_HEAD(&lock->l_export_chain); CFS_INIT_LIST_HEAD(&lock->l_pending_chain); + CFS_INIT_LIST_HEAD(&lock->l_tmp); + CFS_INIT_LIST_HEAD(&lock->l_bl_ast); + CFS_INIT_LIST_HEAD(&lock->l_cp_ast); cfs_waitq_init(&lock->l_waitq); + lock->l_blocking_lock = NULL; + lock->l_pidb = 0; - spin_lock(&resource->lr_namespace->ns_counter_lock); - resource->lr_namespace->ns_locks++; - spin_unlock(&resource->lr_namespace->ns_counter_lock); + atomic_inc(&resource->lr_namespace->ns_locks); if (parent != NULL) { - l_lock(&parent->l_resource->lr_namespace->ns_lock); + spin_lock(&resource->lr_namespace->ns_hash_lock); lock->l_parent = LDLM_LOCK_GET(parent); list_add(&lock->l_childof, &parent->l_children); - l_unlock(&parent->l_resource->lr_namespace->ns_lock); + spin_unlock(&resource->lr_namespace->ns_hash_lock); } CFS_INIT_LIST_HEAD(&lock->l_handle.h_link); @@ -284,13 +320,17 @@ int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock, struct ldlm_res_id new_resid) { struct ldlm_resource *oldres = lock->l_resource; + struct ldlm_resource *newres; + int type; ENTRY; - l_lock(&ns->ns_lock); + LASSERT(ns->ns_client != 0); + + lock_res_and_lock(lock); if (memcmp(&new_resid, &lock->l_resource->lr_name, sizeof(lock->l_resource->lr_name)) == 0) { /* Nothing to do */ - l_unlock(&ns->ns_lock); + unlock_res_and_lock(lock); RETURN(0); } @@ -299,17 +339,27 @@ int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock, /* This function assumes that the lock isn't on any lists */ LASSERT(list_empty(&lock->l_res_link)); - lock->l_resource = ldlm_resource_get(ns, NULL, new_resid, - lock->l_resource->lr_type, 1); - if (lock->l_resource == NULL) { + type = oldres->lr_type; + unlock_res_and_lock(lock); + + newres = 
ldlm_resource_get(ns, NULL, new_resid, type, 1); + if (newres == NULL) { LBUG(); RETURN(-ENOMEM); } + lock_res_and_lock(lock); + LASSERT(memcmp(&new_resid, &lock->l_resource->lr_name, + sizeof(lock->l_resource->lr_name)) != 0); + lock_res(newres); + lock->l_resource = newres; + unlock_res(newres); + unlock_res(oldres); + unlock_bitlock(lock); + /* ...and the flowers are still standing! */ ldlm_resource_putref(oldres); - l_unlock(&ns->ns_lock); RETURN(0); } @@ -342,17 +392,19 @@ struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *handle, int flags) ns = lock->l_resource->lr_namespace; LASSERT(ns != NULL); - l_lock(&ns->ns_lock); + lock_res_and_lock(lock); /* It's unlikely but possible that someone marked the lock as * destroyed after we did handle2object on it */ if (lock->l_destroyed) { + unlock_res_and_lock(lock); CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock); LDLM_LOCK_PUT(lock); GOTO(out, retval); } if (flags && (lock->l_flags & flags)) { + unlock_res_and_lock(lock); LDLM_LOCK_PUT(lock); GOTO(out, retval); } @@ -360,10 +412,10 @@ struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *handle, int flags) if (flags) lock->l_flags |= flags; + unlock_res_and_lock(lock); retval = lock; EXIT; out: - l_unlock(&ns->ns_lock); return retval; } @@ -371,11 +423,7 @@ struct ldlm_lock *ldlm_handle2lock_ns(struct ldlm_namespace *ns, struct lustre_handle *handle) { struct ldlm_lock *retval = NULL; - - l_lock(&ns->ns_lock); retval = __ldlm_handle2lock(handle, 0); - l_unlock(&ns->ns_lock); - return retval; } @@ -425,42 +473,46 @@ void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc) } } -void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new, - void *data, int datalen) +void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new, + struct list_head *work_list) { - struct ldlm_ast_work *w; - ENTRY; - - l_lock(&lock->l_resource->lr_namespace->ns_lock); - if (new && (lock->l_flags & LDLM_FL_AST_SENT)) - GOTO(out, 0); - - CDEBUG(D_OTHER, "lock %p incompatible; sending blocking AST.\n", lock); - - OBD_ALLOC(w, sizeof(*w)); - if (!w) { - LBUG(); - GOTO(out, 0); - } - - w->w_data = data; - w->w_datalen = datalen; - if (new) { + if ((lock->l_flags & LDLM_FL_AST_SENT) == 0) { LDLM_DEBUG(lock, "lock incompatible; sending blocking AST."); lock->l_flags |= LDLM_FL_AST_SENT; /* If the enqueuing client said so, tell the AST recipient to * discard dirty data, rather than writing back. 
*/ if (new->l_flags & LDLM_AST_DISCARD_DATA) lock->l_flags |= LDLM_FL_DISCARD_DATA; - w->w_blocking = 1; - ldlm_lock2desc(new, &w->w_desc); + LASSERT(list_empty(&lock->l_bl_ast)); + list_add(&lock->l_bl_ast, work_list); + LDLM_LOCK_GET(lock); + LASSERT(lock->l_blocking_lock == NULL); + lock->l_blocking_lock = LDLM_LOCK_GET(new); + } +} + +void ldlm_add_cp_work_item(struct ldlm_lock *lock, struct list_head *work_list) +{ + if ((lock->l_flags & LDLM_FL_CP_REQD) == 0) { + lock->l_flags |= LDLM_FL_CP_REQD; + LDLM_DEBUG(lock, "lock granted; sending completion AST."); + LASSERT(list_empty(&lock->l_cp_ast)); + list_add(&lock->l_cp_ast, work_list); + LDLM_LOCK_GET(lock); } +} - w->w_lock = LDLM_LOCK_GET(lock); - list_add(&w->w_list, lock->l_resource->lr_tmp); +/* must be called with lr_lock held */ +void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new, + struct list_head *work_list) +{ + ENTRY; + check_res_locked(lock->l_resource); + if (new) + ldlm_add_bl_work_item(lock, new, work_list); + else + ldlm_add_cp_work_item(lock, work_list); EXIT; - out: - l_unlock(&lock->l_resource->lr_namespace->ns_lock); } void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode) @@ -472,10 +524,8 @@ void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode) LDLM_LOCK_PUT(lock); } -/* only called for local locks */ -void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode) +void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode) { - l_lock(&lock->l_resource->lr_namespace->ns_lock); ldlm_lock_remove_from_lru(lock); if (mode & (LCK_NL | LCK_CR | LCK_PR)) lock->l_readers++; @@ -484,7 +534,14 @@ void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode) lock->l_last_used = cfs_time_current(); LDLM_LOCK_GET(lock); LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); +} + +/* only called for local locks */ +void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode) +{ + lock_res_and_lock(lock); + ldlm_lock_addref_internal_nolock(lock, mode); + unlock_res_and_lock(lock); } void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) @@ -492,8 +549,10 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) struct ldlm_namespace *ns; ENTRY; + lock_res_and_lock(lock); + ns = lock->l_resource->lr_namespace; - l_lock(&ns->ns_lock); + LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]); if (mode & (LCK_NL | LCK_CR | LCK_PR)) { LASSERT(lock->l_readers > 0); @@ -524,8 +583,9 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) LDLM_LOCK_GET(lock); /* dropped by bl thread */ ldlm_lock_remove_from_lru(lock); - l_unlock(&ns->ns_lock); - if (ldlm_bl_to_thread(ns, NULL, lock) != 0) + unlock_res_and_lock(lock); + if ((lock->l_flags & LDLM_FL_ATOMIC_CB) || + ldlm_bl_to_thread(ns, NULL, lock) != 0) ldlm_handle_bl_callback(ns, NULL, lock); } else if (ns->ns_client == LDLM_NAMESPACE_CLIENT && !lock->l_readers && !lock->l_writers && @@ -534,12 +594,14 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) * reference, put it on the LRU. 
*/ LASSERT(list_empty(&lock->l_lru)); LASSERT(ns->ns_nr_unused >= 0); + spin_lock(&ns->ns_unused_lock); list_add_tail(&lock->l_lru, &ns->ns_unused_list); ns->ns_nr_unused++; - l_unlock(&ns->ns_lock); + spin_unlock(&ns->ns_unused_lock); + unlock_res_and_lock(lock); ldlm_cancel_lru(ns, LDLM_ASYNC); } else { - l_unlock(&ns->ns_lock); + unlock_res_and_lock(lock); } LDLM_LOCK_PUT(lock); /* matches the ldlm_lock_get in addref */ @@ -564,10 +626,10 @@ void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode) LASSERT(lock != NULL); - l_lock(&lock->l_resource->lr_namespace->ns_lock); LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]); + lock_res_and_lock(lock); lock->l_flags |= LDLM_FL_CBPENDING; - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + unlock_res_and_lock(lock); ldlm_lock_decref_internal(lock, mode); LDLM_LOCK_PUT(lock); } @@ -576,24 +638,25 @@ void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode) * - ldlm_lock_enqueue * - ldlm_reprocess_queue * - ldlm_lock_convert + * + * must be called with lr_lock held */ -void ldlm_grant_lock(struct ldlm_lock *lock, void *data, int datalen, - int run_ast) +void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list) { struct ldlm_resource *res = lock->l_resource; ENTRY; - l_lock(&lock->l_resource->lr_namespace->ns_lock); + check_res_locked(res); + lock->l_granted_mode = lock->l_req_mode; ldlm_resource_add_lock(res, &res->lr_granted, lock); if (lock->l_granted_mode < res->lr_most_restr) res->lr_most_restr = lock->l_granted_mode; - if (run_ast && lock->l_completion_ast != NULL) - ldlm_add_ast_work_item(lock, NULL, data, datalen); + if (work_list && lock->l_completion_ast != NULL) + ldlm_add_ast_work_item(lock, NULL, work_list); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); EXIT; } @@ -657,7 +720,7 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode, if (flags & LDLM_FL_TEST_LOCK) LDLM_LOCK_GET(lock); else - ldlm_lock_addref_internal(lock, mode); + ldlm_lock_addref_internal_nolock(lock, mode); return lock; } @@ -666,10 +729,10 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode, void ldlm_lock_allow_match(struct ldlm_lock *lock) { - l_lock(&lock->l_resource->lr_namespace->ns_lock); + lock_res_and_lock(lock); lock->l_flags |= LDLM_FL_CAN_MATCH; cfs_waitq_signal(&lock->l_waitq); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + unlock_res_and_lock(lock); } /* Can be called in two ways: @@ -718,7 +781,7 @@ int ldlm_lock_match(struct ldlm_namespace *ns, int flags, RETURN(0); } - l_lock(&ns->ns_lock); + lock_res(res); lock = search_queue(&res->lr_granted, mode, policy, old_lock, flags); if (lock != NULL) @@ -734,8 +797,8 @@ int ldlm_lock_match(struct ldlm_namespace *ns, int flags, EXIT; out: + unlock_res(res); ldlm_resource_putref(res); - l_unlock(&ns->ns_lock); if (lock) { ldlm_lock2handle(lock, lockh); @@ -764,13 +827,11 @@ int ldlm_lock_match(struct ldlm_namespace *ns, int flags, } out2: if (rc) { - l_lock(&ns->ns_lock); LDLM_DEBUG(lock, "matched ("LPU64" "LPU64")", (type == LDLM_PLAIN || type == LDLM_IBITS) ? res_id->name[2] : policy->l_extent.start, (type == LDLM_PLAIN || type == LDLM_IBITS) ? 
res_id->name[3] : policy->l_extent.end); - l_unlock(&ns->ns_lock); } else if (!(flags & LDLM_FL_TEST_LOCK)) {/*less verbose for test-only*/ LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res " LPU64"/"LPU64" ("LPU64" "LPU64")", ns, @@ -874,7 +935,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns, } } - l_lock(&ns->ns_lock); + lock_res_and_lock(lock); if (local && lock->l_req_mode == lock->l_granted_mode) { /* The server returned a blocked lock, but it was granted before * we got a chance to actually enqueue it. We don't need to do @@ -906,7 +967,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns, else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED)) ldlm_resource_add_lock(res, &res->lr_waiting, lock); else - ldlm_grant_lock(lock, NULL, 0, 0); + ldlm_grant_lock(lock, NULL); GOTO(out, ELDLM_OK); } else if (*flags & LDLM_FL_REPLAY) { if (*flags & LDLM_FL_BLOCK_CONV) { @@ -916,22 +977,23 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns, ldlm_resource_add_lock(res, &res->lr_waiting, lock); GOTO(out, ELDLM_OK); } else if (*flags & LDLM_FL_BLOCK_GRANTED) { - ldlm_grant_lock(lock, NULL, 0, 0); + ldlm_grant_lock(lock, NULL); GOTO(out, ELDLM_OK); } /* If no flags, fall through to normal enqueue path. */ } policy = ldlm_processing_policy_table[res->lr_type]; - policy(lock, flags, 1, &rc); + policy(lock, flags, 1, &rc, NULL); GOTO(out, rc); out: - l_unlock(&ns->ns_lock); + unlock_res_and_lock(lock); return rc; } /* Must be called with namespace taken: queue is waiting or converting. */ -int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue) +int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue, + struct list_head *work_list) { struct list_head *tmp, *pos; ldlm_processing_policy policy; @@ -940,6 +1002,8 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue) ldlm_error_t err; ENTRY; + check_res_locked(res); + policy = ldlm_processing_policy_table[res->lr_type]; LASSERT(policy); @@ -950,7 +1014,7 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue) CDEBUG(D_INFO, "Reprocessing lock %p\n", pending); flags = 0; - rc = policy(pending, &flags, 0, &err); + rc = policy(pending, &flags, 0, &err, work_list); if (rc != LDLM_ITER_CONTINUE) break; } @@ -958,49 +1022,79 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue) RETURN(rc); } -int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list) +int ldlm_run_bl_ast_work(struct list_head *rpc_list) { struct list_head *tmp, *pos; - int rc, retval = 0; + struct ldlm_lock_desc d; + int rc = 0, retval = 0; ENTRY; - l_check_no_ns_lock(ns); - list_for_each_safe(tmp, pos, rpc_list) { - struct ldlm_ast_work *w = - list_entry(tmp, struct ldlm_ast_work, w_list); - - /* It's possible to receive a completion AST before we've set - * the l_completion_ast pointer: either because the AST arrived - * before the reply, or simply because there's a small race - * window between receiving the reply and finishing the local - * enqueue. (bug 842) - * - * This can't happen with the blocking_ast, however, because we - * will never call the local blocking_ast until we drop our - * reader/writer reference, which we won't do until we get the - * reply and finish enqueueing. 
*/ - LASSERT(w->w_lock != NULL); - if (w->w_blocking) { - LASSERT(w->w_lock->l_blocking_ast != NULL); - rc = w->w_lock->l_blocking_ast - (w->w_lock, &w->w_desc, w->w_data, - LDLM_CB_BLOCKING); - } else if (w->w_lock->l_completion_ast != NULL) { - rc = w->w_lock->l_completion_ast(w->w_lock, w->w_flags, - w->w_data); - } else { - rc = 0; - } + struct ldlm_lock *lock = + list_entry(tmp, struct ldlm_lock, l_bl_ast); + + /* nobody should touch l_bl_ast */ + lock_res_and_lock(lock); + list_del_init(&lock->l_bl_ast); + + LASSERT(lock->l_flags & LDLM_FL_AST_SENT); + LASSERT(lock->l_bl_ast_run == 0); + LASSERT(lock->l_blocking_lock); + lock->l_bl_ast_run++; + unlock_res_and_lock(lock); + + ldlm_lock2desc(lock->l_blocking_lock, &d); + + LDLM_LOCK_PUT(lock->l_blocking_lock); + lock->l_blocking_lock = NULL; + rc = lock->l_blocking_ast(lock, &d, NULL, LDLM_CB_BLOCKING); + + if (rc == -ERESTART) + retval = rc; + else if (rc) + CDEBUG(D_DLMTRACE, "Failed AST - should clean & " + "disconnect client\n"); + LDLM_LOCK_PUT(lock); + } + RETURN(retval); +} +int ldlm_run_cp_ast_work(struct list_head *rpc_list) +{ + struct list_head *tmp, *pos; + int rc = 0, retval = 0; + ENTRY; + + /* It's possible to receive a completion AST before we've set + * the l_completion_ast pointer: either because the AST arrived + * before the reply, or simply because there's a small race + * window between receiving the reply and finishing the local + * enqueue. (bug 842) + * + * This can't happen with the blocking_ast, however, because we + * will never call the local blocking_ast until we drop our + * reader/writer reference, which we won't do until we get the + * reply and finish enqueueing. */ + + list_for_each_safe(tmp, pos, rpc_list) { + struct ldlm_lock *lock = + list_entry(tmp, struct ldlm_lock, l_cp_ast); + + /* nobody should touch l_cp_ast */ + lock_res_and_lock(lock); + list_del_init(&lock->l_cp_ast); + LASSERT(lock->l_flags & LDLM_FL_CP_REQD); + lock->l_flags &= ~LDLM_FL_CP_REQD; + unlock_res_and_lock(lock); + + if (lock->l_completion_ast != NULL) + rc = lock->l_completion_ast(lock, 0, 0); if (rc == -ERESTART) retval = rc; else if (rc) CDEBUG(D_DLMTRACE, "Failed AST - should clean & " "disconnect client\n"); - LDLM_LOCK_PUT(w->w_lock); - list_del(&w->w_list); - OBD_FREE(w, sizeof(*w)); + LDLM_LOCK_PUT(lock); } RETURN(retval); } @@ -1013,28 +1107,32 @@ static int reprocess_one_queue(struct ldlm_resource *res, void *closure) void ldlm_reprocess_all_ns(struct ldlm_namespace *ns) { + struct list_head *tmp; int i, rc; ENTRY; - l_lock(&ns->ns_lock); + spin_lock(&ns->ns_hash_lock); for (i = 0; i < RES_HASH_SIZE; i++) { - struct list_head *tmp, *next; - list_for_each_safe(tmp, next, &(ns->ns_hash[i])) { + tmp = ns->ns_hash[i].next; + while (tmp != &(ns->ns_hash[i])) { struct ldlm_resource *res = list_entry(tmp, struct ldlm_resource, lr_hash); ldlm_resource_getref(res); - l_unlock(&ns->ns_lock); + spin_unlock(&ns->ns_hash_lock); + rc = reprocess_one_queue(res, NULL); - l_lock(&ns->ns_lock); - next = tmp->next; - ldlm_resource_putref(res); + + spin_lock(&ns->ns_hash_lock); + tmp = tmp->next; + ldlm_resource_putref_locked(res); + if (rc == LDLM_ITER_STOP) GOTO(out, rc); } } out: - l_unlock(&ns->ns_lock); + spin_unlock(&ns->ns_hash_lock); EXIT; } @@ -1051,17 +1149,13 @@ void ldlm_reprocess_all(struct ldlm_resource *res) } restart: - l_lock(&res->lr_namespace->ns_lock); - res->lr_tmp = &rpc_list; - - rc = ldlm_reprocess_queue(res, &res->lr_converting); + lock_res(res); + rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list); 
if (rc == LDLM_ITER_CONTINUE) - ldlm_reprocess_queue(res, &res->lr_waiting); + ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list); + unlock_res(res); - res->lr_tmp = NULL; - l_unlock(&res->lr_namespace->ns_lock); - - rc = ldlm_run_ast_work(res->lr_namespace, &rpc_list); + rc = ldlm_run_cp_ast_work(&rpc_list); if (rc == -ERESTART) { LASSERT(list_empty(&rpc_list)); goto restart; @@ -1071,23 +1165,19 @@ void ldlm_reprocess_all(struct ldlm_resource *res) void ldlm_cancel_callback(struct ldlm_lock *lock) { - struct ldlm_namespace *ns; - - ns = lock->l_resource->lr_namespace; - l_lock(&ns->ns_lock); + check_res_locked(lock->l_resource); if (!(lock->l_flags & LDLM_FL_CANCEL)) { lock->l_flags |= LDLM_FL_CANCEL; if (lock->l_blocking_ast) { - l_unlock(&ns->ns_lock); // l_check_no_ns_lock(ns); + unlock_res_and_lock(lock); lock->l_blocking_ast(lock, NULL, lock->l_ast_data, LDLM_CB_CANCELING); - return; + lock_res_and_lock(lock); } else { LDLM_DEBUG(lock, "no blocking ast"); } } - l_unlock(&ns->ns_lock); } void ldlm_lock_cancel(struct ldlm_lock *lock) @@ -1096,12 +1186,12 @@ void ldlm_lock_cancel(struct ldlm_lock *lock) struct ldlm_namespace *ns; ENTRY; + ldlm_del_waiting_lock(lock); + lock_res_and_lock(lock); + res = lock->l_resource; ns = res->lr_namespace; - l_lock(&ns->ns_lock); - ldlm_del_waiting_lock(lock); - /* Please do not, no matter how tempting, remove this LBUG without * talking to me first. -phik */ if (lock->l_readers || lock->l_writers) { @@ -1112,8 +1202,10 @@ void ldlm_lock_cancel(struct ldlm_lock *lock) ldlm_cancel_callback(lock); ldlm_resource_unlink_lock(lock); + unlock_res_and_lock(lock); + ldlm_lock_destroy(lock); - l_unlock(&ns->ns_lock); + EXIT; } @@ -1132,23 +1224,26 @@ int ldlm_lock_set_data(struct lustre_handle *lockh, void *data) void ldlm_cancel_locks_for_export(struct obd_export *exp) { - struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; struct ldlm_lock *lock; struct ldlm_resource *res; - l_lock(&ns->ns_lock); + spin_lock(&exp->exp_ldlm_data.led_lock); while(!list_empty(&exp->exp_ldlm_data.led_held_locks)) { lock = list_entry(exp->exp_ldlm_data.led_held_locks.next, struct ldlm_lock, l_export_chain); res = ldlm_resource_getref(lock->l_resource); + LDLM_LOCK_GET(lock); + spin_unlock(&exp->exp_ldlm_data.led_lock); + LDLM_DEBUG(lock, "export %p", exp); ldlm_lock_cancel(lock); - l_unlock(&ns->ns_lock); ldlm_reprocess_all(res); + ldlm_resource_putref(res); - l_lock(&ns->ns_lock); + LDLM_LOCK_PUT(lock); + spin_lock(&exp->exp_ldlm_data.led_lock); } - l_unlock(&ns->ns_lock); + spin_unlock(&exp->exp_ldlm_data.led_lock); } struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode, @@ -1170,11 +1265,11 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode, LASSERTF(new_mode == LCK_PW && lock->l_granted_mode == LCK_PR, "new_mode %u, granted %u\n", new_mode, lock->l_granted_mode); + lock_res_and_lock(lock); + res = lock->l_resource; ns = res->lr_namespace; - l_lock(&ns->ns_lock); - old_mode = lock->l_req_mode; lock->l_req_mode = new_mode; ldlm_resource_unlink_lock(lock); @@ -1190,9 +1285,7 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode, *flags); LBUG(); - res->lr_tmp = &rpc_list; - ldlm_grant_lock(lock, NULL, 0, 0); - res->lr_tmp = NULL; + ldlm_grant_lock(lock, &rpc_list); granted = 1; /* FIXME: completion handling not with ns_lock held ! 
*/ if (lock->l_completion_ast) @@ -1202,9 +1295,7 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode, int pflags = 0; ldlm_processing_policy policy; policy = ldlm_processing_policy_table[res->lr_type]; - res->lr_tmp = &rpc_list; - rc = policy(lock, &pflags, 0, &err); - res->lr_tmp = NULL; + rc = policy(lock, &pflags, 0, &err, &rpc_list); if (rc == LDLM_ITER_STOP) { lock->l_req_mode = old_mode; ldlm_resource_add_lock(res, &res->lr_granted, lock); @@ -1214,11 +1305,10 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode, granted = 1; } } - - l_unlock(&ns->ns_lock); + unlock_res_and_lock(lock); if (granted) - ldlm_run_ast_work(ns, &rpc_list); + ldlm_run_cp_ast_work(&rpc_list); RETURN(res); } diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 95c44dc..034219e 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -280,7 +280,6 @@ static int ldlm_add_waiting_lock(struct ldlm_lock *lock) { int ret; - l_check_ns_lock(lock->l_resource->lr_namespace); LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)); spin_lock_bh(&waiting_locks_spinlock); @@ -340,8 +339,6 @@ int ldlm_del_waiting_lock(struct ldlm_lock *lock) { int ret; - l_check_ns_lock(lock->l_resource->lr_namespace); - if (lock->l_export == NULL) { /* We don't have a "waiting locks list" on clients. */ LDLM_DEBUG(lock, "client lock: no-op"); @@ -363,8 +360,6 @@ int ldlm_del_waiting_lock(struct ldlm_lock *lock) */ int ldlm_refresh_waiting_lock(struct ldlm_lock *lock) { - l_check_ns_lock(lock->l_resource->lr_namespace); - if (lock->l_export == NULL) { /* We don't have a "waiting locks list" on clients. */ LDLM_DEBUG(lock, "client lock: no-op"); @@ -446,13 +441,10 @@ static int ldlm_handle_ast_error(struct ldlm_lock *lock, ldlm_lock_cancel(lock); rc = -ERESTART; } else { - l_lock(&lock->l_resource->lr_namespace->ns_lock); ldlm_del_waiting_lock(lock); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); ldlm_failed_ast(lock, rc, ast_type); } } else if (rc) { - l_lock(&lock->l_resource->lr_namespace->ns_lock); if (rc == -EINVAL) LDLM_DEBUG(lock, "client (nid %s) returned %d" " from %s AST - normal race", @@ -466,7 +458,6 @@ static int ldlm_handle_ast_error(struct ldlm_lock *lock, lustre_msg_get_status(req->rq_repmsg) : 0, ast_type); ldlm_lock_cancel(lock); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); /* Server-side AST functions are called from ldlm_reprocess_all, * which needs to be told to please restart its reprocessing. */ rc = -ERESTART; @@ -493,25 +484,34 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, LASSERT(lock); - l_lock(&lock->l_resource->lr_namespace->ns_lock); + req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse, + LUSTRE_DLM_VERSION, LDLM_BL_CALLBACK, 2, size, + NULL); + if (req == NULL) + RETURN(-ENOMEM); + + lock_res(lock->l_resource); if (lock->l_granted_mode != lock->l_req_mode) { /* this blocking AST will be communicated as part of the * completion AST instead */ + unlock_res(lock->l_resource); + ptlrpc_req_finished(req); LDLM_DEBUG(lock, "lock not granted, not sending blocking AST"); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); RETURN(0); } if (lock->l_destroyed) { /* What's the point? 
*/ - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + unlock_res(lock->l_resource); + ptlrpc_req_finished(req); RETURN(0); } #if 0 if (CURRENT_SECONDS - lock->l_export->exp_last_request_time > 30){ + unlock_res(lock->l_resource); + ptlrpc_req_finished(req); ldlm_failed_ast(lock, -ETIMEDOUT, "Not-attempted blocking"); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); RETURN(-ETIMEDOUT); } #endif @@ -519,14 +519,6 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) instant_cancel = 1; - req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse, - LUSTRE_DLM_VERSION, LDLM_BL_CALLBACK, 2, size, - NULL); - if (req == NULL) { - l_unlock(&lock->l_resource->lr_namespace->ns_lock); - RETURN(-ENOMEM); - } - body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body)); body->lock_handle1 = lock->l_remote_handle; body->lock_desc = *desc; @@ -534,12 +526,13 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, LDLM_DEBUG(lock, "server preparing blocking AST"); ptlrpc_req_set_repsize(req, 1, NULL); - if (instant_cancel) + if (instant_cancel) { + unlock_res(lock->l_resource); ldlm_lock_cancel(lock); - else if (lock->l_granted_mode == lock->l_req_mode) + } else if (lock->l_granted_mode == lock->l_req_mode) { ldlm_add_waiting_lock(lock); - - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + unlock_res(lock->l_resource); + } req->rq_send_state = LUSTRE_IMP_FULL; req->rq_timeout = ldlm_get_rq_timeout(ldlm_timeout, obd_timeout); /* timeout for initial AST reply */ @@ -581,12 +574,12 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) LDLM_ERROR(lock, "enqueue wait took %luus from %lu", total_enqueue_wait, lock->l_enqueued_time.tv_sec); - mutex_down(&lock->l_resource->lr_lvb_sem); + lock_res_and_lock(lock); if (lock->l_resource->lr_lvb_len) { size[DLM_REQ_REC_OFF] = lock->l_resource->lr_lvb_len; buffers = 3; } - mutex_up(&lock->l_resource->lr_lvb_sem); + unlock_res_and_lock(lock); req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse, LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK, buffers, @@ -602,12 +595,12 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) if (buffers == 3) { void *lvb; - mutex_down(&lock->l_resource->lr_lvb_sem); lvb = lustre_msg_buf(req->rq_reqmsg, DLM_REQ_REC_OFF, lock->l_resource->lr_lvb_len); + lock_res_and_lock(lock); memcpy(lvb, lock->l_resource->lr_lvb_data, lock->l_resource->lr_lvb_len); - mutex_up(&lock->l_resource->lr_lvb_sem); + unlock_res_and_lock(lock); } LDLM_DEBUG(lock, "server preparing completion AST (after %ldus wait)", @@ -618,7 +611,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) req->rq_timeout = ldlm_get_rq_timeout(ldlm_timeout, obd_timeout); /* timeout for initial AST reply */ /* We only send real blocking ASTs after the lock is granted */ - l_lock(&lock->l_resource->lr_namespace->ns_lock); + lock_res_and_lock(lock); if (lock->l_flags & LDLM_FL_AST_SENT) { body->lock_flags |= LDLM_FL_AST_SENT; @@ -629,14 +622,16 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) * that would not only cancel the lock, but will also remove * it from waiting list */ if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) { + unlock_res_and_lock(lock); ldlm_lock_cancel(lock); instant_cancel = 1; + lock_res_and_lock(lock); } else { ldlm_add_waiting_lock(lock); /* start the lock-timeout clock */ } } - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + unlock_res_and_lock(lock); rc = ptlrpc_queue_wait(req); if (rc != 
0) @@ -673,9 +668,10 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data) body->lock_handle1 = lock->l_remote_handle; ldlm_lock2desc(lock, &body->lock_desc); - mutex_down(&lock->l_resource->lr_lvb_sem); + lock_res_and_lock(lock); size[REPLY_REC_OFF] = lock->l_resource->lr_lvb_len; - mutex_up(&lock->l_resource->lr_lvb_sem); + unlock_res_and_lock(lock); + res = lock->l_resource; ptlrpc_req_set_repsize(req, 2, size); req->rq_send_state = LUSTRE_IMP_FULL; @@ -696,20 +692,19 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data) static struct ldlm_lock * find_existing_lock(struct obd_export *exp, struct lustre_handle *remote_hdl) { - struct obd_device *obd = exp->exp_obd; struct list_head *iter; - l_lock(&obd->obd_namespace->ns_lock); + spin_lock(&exp->exp_ldlm_data.led_lock); list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) { struct ldlm_lock *lock; lock = list_entry(iter, struct ldlm_lock, l_export_chain); if (lock->l_remote_handle.cookie == remote_hdl->cookie) { LDLM_LOCK_GET(lock); - l_unlock(&obd->obd_namespace->ns_lock); + spin_unlock(&exp->exp_ldlm_data.led_lock); return lock; } } - l_unlock(&obd->obd_namespace->ns_lock); + spin_unlock(&exp->exp_ldlm_data.led_lock); return NULL; } @@ -817,18 +812,17 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req, LDLM_DEBUG(lock, "server-side enqueue handler, new lock created"); OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2); - l_lock(&lock->l_resource->lr_namespace->ns_lock); /* Don't enqueue a lock onto the export if it has already * been evicted. Cancel it now instead. (bug 3822) */ if (req->rq_export->exp_failed) { LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); GOTO(out, rc = -ENOTCONN); } lock->l_export = class_export_get(req->rq_export); + spin_lock(&lock->l_export->exp_ldlm_data.led_lock); list_add(&lock->l_export_chain, &lock->l_export->exp_ldlm_data.led_held_locks); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + spin_unlock(&lock->l_export->exp_ldlm_data.led_lock); existing_lock: @@ -839,12 +833,12 @@ existing_lock: } else { int buffers = 2; - mutex_down(&lock->l_resource->lr_lvb_sem); + lock_res_and_lock(lock); if (lock->l_resource->lr_lvb_len) { size[DLM_REPLY_REC_OFF] = lock->l_resource->lr_lvb_len; buffers = 3; } - mutex_up(&lock->l_resource->lr_lvb_sem); + unlock_res_and_lock(lock); if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR)) GOTO(out, rc = -ENOMEM); @@ -872,7 +866,7 @@ existing_lock: /* We never send a blocking AST until the lock is granted, but * we can tell it right now */ - l_lock(&lock->l_resource->lr_namespace->ns_lock); + lock_res_and_lock(lock); /* Now take into account flags to be inherited from original lock request both in reply to client and in our own lock flags. 
*/ @@ -887,9 +881,11 @@ existing_lock: rc = -ENOTCONN; } else if (lock->l_flags & LDLM_FL_AST_SENT) { dlm_rep->lock_flags |= LDLM_FL_AST_SENT; - if (dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK) + if (dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK) { + unlock_res_and_lock(lock); ldlm_lock_cancel(lock); - else if (lock->l_granted_mode == lock->l_req_mode) + lock_res_and_lock(lock); + } else if (lock->l_granted_mode == lock->l_req_mode) ldlm_add_waiting_lock(lock); } /* Make sure we never ever grant usual metadata locks to liblustre @@ -917,7 +913,7 @@ existing_lock: } } - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + unlock_res_and_lock(lock); EXIT; out: @@ -932,13 +928,11 @@ existing_lock: /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this * ldlm_reprocess_all. If this moves, revisit that code. -phil */ if (lock) { - l_lock(&lock->l_resource->lr_namespace->ns_lock); LDLM_DEBUG(lock, "server-side enqueue handler, sending reply" "(err=%d, rc=%d)", err, rc); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); if (rc == 0) { - mutex_down(&lock->l_resource->lr_lvb_sem); + lock_res_and_lock(lock); size[DLM_REPLY_REC_OFF] = lock->l_resource->lr_lvb_len; if (size[DLM_REPLY_REC_OFF] > 0) { void *lvb = lustre_msg_buf(req->rq_repmsg, @@ -950,10 +944,12 @@ existing_lock: memcpy(lvb, lock->l_resource->lr_lvb_data, size[DLM_REPLY_REC_OFF]); } - mutex_up(&lock->l_resource->lr_lvb_sem); + unlock_res_and_lock(lock); } else { + lock_res_and_lock(lock); ldlm_resource_unlink_lock(lock); - ldlm_lock_destroy(lock); + ldlm_lock_destroy_nolock(lock); + unlock_res_and_lock(lock); } if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK) @@ -998,19 +994,16 @@ int ldlm_handle_convert(struct ptlrpc_request *req) if (!lock) { req->rq_status = EINVAL; } else { - void *res; - l_lock(&lock->l_resource->lr_namespace->ns_lock); + void *res = NULL; + LDLM_DEBUG(lock, "server-side convert handler START"); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + do_gettimeofday(&lock->l_enqueued_time); res = ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode, &dlm_rep->lock_flags); if (res) { - l_lock(&lock->l_resource->lr_namespace->ns_lock); if (ldlm_del_waiting_lock(lock)) - CDEBUG(D_DLMTRACE,"converted waiting lock %p\n", - lock); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + LDLM_DEBUG(lock, "converted waiting lock"); req->rq_status = 0; } else { req->rq_status = EDEADLOCK; @@ -1020,9 +1013,7 @@ int ldlm_handle_convert(struct ptlrpc_request *req) if (lock) { if (!req->rq_status) ldlm_reprocess_all(lock->l_resource); - l_lock(&lock->l_resource->lr_namespace->ns_lock); LDLM_DEBUG(lock, "server-side convert handler END"); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); LDLM_LOCK_PUT(lock); } else LDLM_DEBUG_NOLOCK("server-side convert handler END"); @@ -1063,10 +1054,8 @@ int ldlm_handle_cancel(struct ptlrpc_request *req) dlm_req->lock_handle1.cookie); req->rq_status = ESTALE; } else { - res = lock->l_resource; - l_lock(&res->lr_namespace->ns_lock); LDLM_DEBUG(lock, "server-side cancel handler START"); - l_unlock(&res->lr_namespace->ns_lock); + res = lock->l_resource; if (res && res->lr_namespace->ns_lvbo && res->lr_namespace->ns_lvbo->lvbo_update) { (void)res->lr_namespace->ns_lvbo->lvbo_update @@ -1074,11 +1063,9 @@ int ldlm_handle_cancel(struct ptlrpc_request *req) //(res, req->rq_reqmsg, 1); } - l_lock(&res->lr_namespace->ns_lock); ldlm_lock_cancel(lock); if (ldlm_del_waiting_lock(lock)) CDEBUG(D_DLMTRACE, "cancelled waiting lock %p\n", lock); - 
l_unlock(&res->lr_namespace->ns_lock); req->rq_status = rc; } @@ -1087,9 +1074,7 @@ int ldlm_handle_cancel(struct ptlrpc_request *req) if (lock) { ldlm_reprocess_all(lock->l_resource); - l_lock(&lock->l_resource->lr_namespace->ns_lock); LDLM_DEBUG(lock, "server-side cancel handler END"); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); LDLM_LOCK_PUT(lock); } @@ -1102,33 +1087,29 @@ void ldlm_handle_bl_callback(struct ldlm_namespace *ns, int do_ast; ENTRY; - l_lock(&ns->ns_lock); LDLM_DEBUG(lock, "client blocking AST callback handler START"); + lock_res_and_lock(lock); lock->l_flags |= LDLM_FL_CBPENDING; if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) lock->l_flags |= LDLM_FL_CANCEL; do_ast = (!lock->l_readers && !lock->l_writers); + unlock_res_and_lock(lock); if (do_ast) { LDLM_DEBUG(lock, "already unused, calling " "callback (%p)", lock->l_blocking_ast); - if (lock->l_blocking_ast != NULL) { - l_unlock(&ns->ns_lock); - l_check_no_ns_lock(ns); + if (lock->l_blocking_ast != NULL) lock->l_blocking_ast(lock, ld, lock->l_ast_data, LDLM_CB_BLOCKING); - l_lock(&ns->ns_lock); - } } else { LDLM_DEBUG(lock, "Lock still has references, will be" " cancelled later"); } LDLM_DEBUG(lock, "client blocking callback handler END"); - l_unlock(&ns->ns_lock); LDLM_LOCK_PUT(lock); EXIT; } @@ -1141,9 +1122,10 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, CFS_LIST_HEAD(ast_list); ENTRY; - l_lock(&ns->ns_lock); LDLM_DEBUG(lock, "client completion callback handler START"); + lock_res_and_lock(lock); + /* If we receive the completion AST before the actual enqueue returned, * then we might need to switch lock modes, resources, or extents. */ if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) { @@ -1160,9 +1142,12 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, if (memcmp(&dlm_req->lock_desc.l_resource.lr_name, &lock->l_resource->lr_name, sizeof(lock->l_resource->lr_name)) != 0) { + unlock_res_and_lock(lock); ldlm_lock_change_resource(ns, lock, dlm_req->lock_desc.l_resource.lr_name); LDLM_DEBUG(lock, "completion AST, new resource"); + CERROR("change resource!\n"); + lock_res_and_lock(lock); } if (dlm_req->lock_flags & LDLM_FL_AST_SENT) { @@ -1182,17 +1167,16 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, } } - lock->l_resource->lr_tmp = &ast_list; - ldlm_grant_lock(lock, req, sizeof(*req), 1); - lock->l_resource->lr_tmp = NULL; + ldlm_grant_lock(lock, &ast_list); + unlock_res_and_lock(lock); + LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work"); - l_unlock(&ns->ns_lock); - LDLM_LOCK_PUT(lock); - ldlm_run_ast_work(ns, &ast_list); + ldlm_run_cp_ast_work(&ast_list); LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)", lock); + LDLM_LOCK_PUT(lock); EXIT; } @@ -1204,15 +1188,10 @@ static void ldlm_handle_gl_callback(struct ptlrpc_request *req, int rc = -ENOSYS; ENTRY; - l_lock(&ns->ns_lock); LDLM_DEBUG(lock, "client glimpse AST callback handler"); - if (lock->l_glimpse_ast != NULL) { - l_unlock(&ns->ns_lock); - l_check_no_ns_lock(ns); + if (lock->l_glimpse_ast != NULL) rc = lock->l_glimpse_ast(lock, req); - l_lock(&ns->ns_lock); - } if (req->rq_repmsg != NULL) { ptlrpc_reply(req); @@ -1221,17 +1200,19 @@ static void ldlm_handle_gl_callback(struct ptlrpc_request *req, ptlrpc_error(req); } - l_unlock(&ns->ns_lock); + lock_res_and_lock(lock); if (lock->l_granted_mode == LCK_PW && !lock->l_readers && !lock->l_writers && cfs_time_after(cfs_time_current(), cfs_time_add(lock->l_last_used, cfs_time_seconds(10)))) { + 
unlock_res_and_lock(lock); if (ldlm_bl_to_thread(ns, NULL, lock)) ldlm_handle_bl_callback(ns, NULL, lock); + EXIT; return; } - + unlock_res_and_lock(lock); LDLM_LOCK_PUT(lock); EXIT; } @@ -1384,7 +1365,9 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) } /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */ + lock_res_and_lock(lock); lock->l_flags |= (dlm_req->lock_flags & LDLM_AST_FLAGS); + unlock_res_and_lock(lock); /* We want the ost thread to get this reply so that it can respond * to ost requests (write cache writeback) that might be triggered @@ -1750,8 +1733,6 @@ int __init ldlm_init(void) return -ENOMEM; } - l_lock_init(&ldlm_handle_lock); - return 0; } @@ -1832,10 +1813,7 @@ EXPORT_SYMBOL(ldlm_namespace_dump); EXPORT_SYMBOL(ldlm_dump_all_namespaces); EXPORT_SYMBOL(ldlm_resource_get); EXPORT_SYMBOL(ldlm_resource_putref); - -/* l_lock.c */ -EXPORT_SYMBOL(l_lock); -EXPORT_SYMBOL(l_unlock); +EXPORT_SYMBOL(ldlm_resource_unlink_lock); /* ldlm_lib.c */ EXPORT_SYMBOL(client_import_add_conn); @@ -1854,3 +1832,8 @@ EXPORT_SYMBOL(target_queue_recovery_request); EXPORT_SYMBOL(target_handle_ping); EXPORT_SYMBOL(target_handle_disconnect); EXPORT_SYMBOL(target_queue_final_reply); + +/* l_lock.c */ +EXPORT_SYMBOL(lock_res_and_lock); +EXPORT_SYMBOL(unlock_res_and_lock); + diff --git a/lustre/ldlm/ldlm_plain.c b/lustre/ldlm/ldlm_plain.c index 38b0c40..69b1104 100644 --- a/lustre/ldlm/ldlm_plain.c +++ b/lustre/ldlm/ldlm_plain.c @@ -38,7 +38,7 @@ static inline int ldlm_plain_compat_queue(struct list_head *queue, struct ldlm_lock *req, - int send_cbs) + struct list_head *work_list) { struct list_head *tmp; struct ldlm_lock *lock; @@ -57,12 +57,12 @@ ldlm_plain_compat_queue(struct list_head *queue, struct ldlm_lock *req, if (lockmode_compat(lock->l_req_mode, req_mode)) continue; - if (!send_cbs) + if (!work_list) RETURN(0); compat = 0; if (lock->l_blocking_ast) - ldlm_add_ast_work_item(lock, req, NULL, 0); + ldlm_add_ast_work_item(lock, req, work_list); } RETURN(compat); @@ -78,7 +78,7 @@ ldlm_plain_compat_queue(struct list_head *queue, struct ldlm_lock *req, * - the caller has NOT initialized req->lr_tmp, so we must * - must call this function with the ns lock held once */ int ldlm_process_plain_lock(struct ldlm_lock *lock, int *flags, int first_enq, - ldlm_error_t *err) + ldlm_error_t *err, struct list_head *work_list) { struct ldlm_resource *res = lock->l_resource; struct list_head rpc_list = CFS_LIST_HEAD_INIT(rpc_list); @@ -88,25 +88,22 @@ int ldlm_process_plain_lock(struct ldlm_lock *lock, int *flags, int first_enq, LASSERT(list_empty(&res->lr_converting)); if (!first_enq) { - LASSERT(res->lr_tmp != NULL); - rc = ldlm_plain_compat_queue(&res->lr_granted, lock, 0); + LASSERT(work_list != NULL); + rc = ldlm_plain_compat_queue(&res->lr_granted, lock, NULL); if (!rc) RETURN(LDLM_ITER_STOP); - rc = ldlm_plain_compat_queue(&res->lr_waiting, lock, 0); + rc = ldlm_plain_compat_queue(&res->lr_waiting, lock, NULL); if (!rc) RETURN(LDLM_ITER_STOP); ldlm_resource_unlink_lock(lock); - ldlm_grant_lock(lock, NULL, 0, 1); + ldlm_grant_lock(lock, work_list); RETURN(LDLM_ITER_CONTINUE); } restart: - LASSERT(res->lr_tmp == NULL); - res->lr_tmp = &rpc_list; - rc = ldlm_plain_compat_queue(&res->lr_granted, lock, 1); - rc += ldlm_plain_compat_queue(&res->lr_waiting, lock, 1); - res->lr_tmp = NULL; + rc = ldlm_plain_compat_queue(&res->lr_granted, lock, &rpc_list); + rc += ldlm_plain_compat_queue(&res->lr_waiting, lock, &rpc_list); if (rc != 2) { /* If either of the compat_queue()s returned 0, then 
we @@ -117,15 +114,15 @@ int ldlm_process_plain_lock(struct ldlm_lock *lock, int *flags, int first_enq, * re-ordered! Causes deadlock, because ASTs aren't sent! */ if (list_empty(&lock->l_res_link)) ldlm_resource_add_lock(res, &res->lr_waiting, lock); - l_unlock(&res->lr_namespace->ns_lock); - rc = ldlm_run_ast_work(res->lr_namespace, &rpc_list); - l_lock(&res->lr_namespace->ns_lock); + unlock_res(res); + rc = ldlm_run_bl_ast_work(&rpc_list); + lock_res(res); if (rc == -ERESTART) GOTO(restart, -ERESTART); *flags |= LDLM_FL_BLOCK_GRANTED; } else { ldlm_resource_unlink_lock(lock); - ldlm_grant_lock(lock, NULL, 0, 0); + ldlm_grant_lock(lock, NULL); } RETURN(0); } diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index ead2009..2cbd590 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -166,20 +166,20 @@ int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, RETURN(0); } - l_lock(&lock->l_resource->lr_namespace->ns_lock); + lock_res_and_lock(lock); /* Get this: if ldlm_blocking_ast is racing with intent_policy, such * that ldlm_blocking_ast is called just before intent_policy method * takes the ns_lock, then by the time we get the lock, we might not * be the correct blocking function anymore. So check, and return * early, if so. */ if (lock->l_blocking_ast != ldlm_blocking_ast) { - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + unlock_res_and_lock(lock); RETURN(0); } lock->l_flags |= LDLM_FL_CBPENDING; do_ast = (!lock->l_readers && !lock->l_writers); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + unlock_res_and_lock(lock); if (do_ast) { struct lustre_handle lockh; @@ -251,8 +251,12 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, struct ldlm_res_id res_id, ldlm_lock_addref_internal(lock, mode); ldlm_lock2handle(lock, lockh); + lock_res_and_lock(lock); lock->l_flags |= LDLM_FL_LOCAL; + if (*flags & LDLM_FL_ATOMIC_CB) + lock->l_flags |= LDLM_FL_ATOMIC_CB; lock->l_lvb_swabber = lvb_swabber; + unlock_res_and_lock(lock); if (policy != NULL) lock->l_policy_data = *policy; if (type == LDLM_EXTENT) @@ -286,10 +290,10 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns, struct lustre_handle *lockh, int mode) { /* Set a flag to prevent us from sending a CANCEL (bug 407) */ - l_lock(&ns->ns_lock); + lock_res_and_lock(lock); lock->l_flags |= LDLM_FL_LOCAL_ONLY; + unlock_res_and_lock(lock); LDLM_DEBUG(lock, "setting FL_LOCAL_ONLY"); - l_unlock(&ns->ns_lock); ldlm_lock_decref_and_cancel(lockh, mode); @@ -353,14 +357,14 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, /* lock enqueued on the server */ cleanup_phase = 0; - l_lock(&ns->ns_lock); + lock_res_and_lock(lock); lock->l_remote_handle = reply->lock_handle; *flags = reply->lock_flags; lock->l_flags |= reply->lock_flags & LDLM_INHERIT_FLAGS; /* move NO_TIMEOUT flag to the lock to force ldlm_lock_match() * to wait with no timeout as well */ lock->l_flags |= reply->lock_flags & LDLM_FL_NO_TIMEOUT; - l_unlock(&ns->ns_lock); + unlock_res_and_lock(lock); CDEBUG(D_INFO, "local: %p, remote cookie: "LPX64", flags: 0x%x\n", lock, reply->lock_handle.cookie, *flags); @@ -406,9 +410,9 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, * because it cannot handle asynchronous ASTs robustly (see * bug 7311). 
*/ (LIBLUSTRE_CLIENT && type == LDLM_EXTENT)) { - l_lock(&ns->ns_lock); + lock_res_and_lock(lock); lock->l_flags |= LDLM_FL_CBPENDING; - l_unlock(&ns->ns_lock); + unlock_res_and_lock(lock); LDLM_DEBUG(lock, "enqueue reply includes blocking AST"); } @@ -703,12 +707,12 @@ int ldlm_cli_cancel(struct lustre_handle *lockh) LDLM_DEBUG(lock, "client-side cancel"); /* Set this flag to prevent others from getting new references*/ - l_lock(&lock->l_resource->lr_namespace->ns_lock); + lock_res_and_lock(lock); lock->l_flags |= LDLM_FL_CBPENDING; local_only = (lock->l_flags & (LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK)); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); ldlm_cancel_callback(lock); + unlock_res_and_lock(lock); if (local_only) { CDEBUG(D_INFO, "not sending request (at caller's " @@ -797,17 +801,32 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync) sync = LDLM_SYNC; /* force to be sync in user space */ #endif - l_lock(&ns->ns_lock); + spin_lock(&ns->ns_unused_lock); count = ns->ns_nr_unused - ns->ns_max_unused; if (count <= 0) { - l_unlock(&ns->ns_lock); + spin_unlock(&ns->ns_unused_lock); RETURN(0); } - list_for_each_entry_safe(lock, next, &ns->ns_unused_list, l_lru) { + while (!list_empty(&ns->ns_unused_list)) { + struct list_head *tmp = ns->ns_unused_list.next; + lock = list_entry(tmp, struct ldlm_lock, l_lru); LASSERT(!lock->l_readers && !lock->l_writers); + LDLM_LOCK_GET(lock); /* dropped by bl thread */ + spin_unlock(&ns->ns_unused_lock); + + lock_res_and_lock(lock); + if (ldlm_lock_remove_from_lru(lock) == 0) { + /* other thread is removing lock from lru */ + unlock_res_and_lock(lock); + LDLM_LOCK_PUT(lock); + spin_lock(&ns->ns_unused_lock); + continue; + + } + /* If we have chosen to canecl this lock voluntarily, we better send cancel notification to server, so that it frees appropriate state. This might lead to a race where while @@ -822,32 +841,34 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync) * won't see this flag and call l_blocking_ast */ lock->l_flags |= LDLM_FL_CBPENDING; - LDLM_LOCK_GET(lock); /* dropped by bl thread */ - ldlm_lock_remove_from_lru(lock); - /* We can't re-add to l_lru as it confuses the refcounting in * ldlm_lock_remove_from_lru() if an AST arrives after we drop - * ns_lock below. Use l_export_chain as that is unused on - * client, and lru is client-only (l_pending_chain is used by - * ldlm_chain_lock_for_replay() on client). bug 5666 */ - if (sync != LDLM_ASYNC || ldlm_bl_to_thread(ns, NULL, lock)) { - LASSERTF(list_empty(&lock->l_export_chain), - "lock %p next %p prev %p\n", - lock, &lock->l_export_chain.next, - &lock->l_export_chain.prev); - __LDLM_DEBUG(D_INFO, lock, "adding to LRU clear list"); - list_add(&lock->l_export_chain, &cblist); + * ns_lock below. We use l_tmp and can't use l_pending_chain as + * it is used both on server and client nevertheles bug 5666 + * says it is used only on server. 
--umka */ + list_add(&lock->l_tmp, &cblist); + unlock_res_and_lock(lock); + + LDLM_LOCK_GET(lock); /* to hold lock after bl thread */ + if (sync == LDLM_ASYNC && (ldlm_bl_to_thread(ns, NULL, lock) == 0)) { + lock_res_and_lock(lock); + list_del_init(&lock->l_tmp); + unlock_res_and_lock(lock); } + LDLM_LOCK_PUT(lock); + + spin_lock(&ns->ns_unused_lock); if (--count == 0) break; } - l_unlock(&ns->ns_lock); + spin_unlock(&ns->ns_unused_lock); - list_for_each_entry_safe(lock, next, &cblist, l_export_chain) { - list_del_init(&lock->l_export_chain); + list_for_each_entry_safe(lock, next, &cblist, l_tmp) { + list_del_init(&lock->l_tmp); ldlm_handle_bl_callback(ns, NULL, lock); } + RETURN(rc); } @@ -855,9 +876,9 @@ static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns, struct ldlm_res_id res_id, int flags, void *opaque) { - struct ldlm_resource *res; struct list_head *tmp, *next, list = CFS_LIST_HEAD_INIT(list); - struct ldlm_ast_work *w; + struct ldlm_resource *res; + struct ldlm_lock *lock; ENTRY; res = ldlm_resource_get(ns, NULL, res_id, 0, 0); @@ -867,9 +888,8 @@ static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns, RETURN(0); } - l_lock(&ns->ns_lock); + lock_res(res); list_for_each(tmp, &res->lr_granted) { - struct ldlm_lock *lock; lock = list_entry(tmp, struct ldlm_lock, l_res_link); if (opaque != NULL && lock->l_ast_data != opaque) { @@ -890,31 +910,27 @@ static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns, /* See CBPENDING comment in ldlm_cancel_lru */ lock->l_flags |= LDLM_FL_CBPENDING; - OBD_ALLOC(w, sizeof(*w)); - LASSERT(w); - - w->w_lock = LDLM_LOCK_GET(lock); - - list_add(&w->w_list, &list); + LASSERT(list_empty(&lock->l_bl_ast)); + list_add(&lock->l_bl_ast, &list); + LDLM_LOCK_GET(lock); } - l_unlock(&ns->ns_lock); + unlock_res(res); list_for_each_safe(tmp, next, &list) { struct lustre_handle lockh; int rc; - w = list_entry(tmp, struct ldlm_ast_work, w_list); + lock = list_entry(tmp, struct ldlm_lock, l_bl_ast); if (flags & LDLM_FL_LOCAL_ONLY) { - ldlm_lock_cancel(w->w_lock); + ldlm_lock_cancel(lock); } else { - ldlm_lock2handle(w->w_lock, &lockh); + ldlm_lock2handle(lock, &lockh); rc = ldlm_cli_cancel(&lockh); if (rc != ELDLM_OK) CERROR("ldlm_cli_cancel: %d\n", rc); } - list_del(&w->w_list); - LDLM_LOCK_PUT(w->w_lock); - OBD_FREE(w, sizeof(*w)); + list_del_init(&lock->l_bl_ast); + LDLM_LOCK_PUT(lock); } ldlm_resource_putref(res); @@ -922,6 +938,18 @@ static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns, RETURN(0); } +static inline int have_no_nsresource(struct ldlm_namespace *ns) +{ + int no_resource = 0; + + spin_lock(&ns->ns_hash_lock); + if (ns->ns_resources == 0) + no_resource = 1; + spin_unlock(&ns->ns_hash_lock); + + RETURN(no_resource); +} + /* Cancel all locks on a namespace (or a specific resource, if given) * that have 0 readers/writers. 
* @@ -941,14 +969,17 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, RETURN(ldlm_cli_cancel_unused_resource(ns, *res_id, flags, opaque)); - l_lock(&ns->ns_lock); + spin_lock(&ns->ns_hash_lock); for (i = 0; i < RES_HASH_SIZE; i++) { - struct list_head *tmp, *pos; - list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) { - int rc; + struct list_head *tmp; + tmp = ns->ns_hash[i].next; + while (tmp != &(ns->ns_hash[i])) { struct ldlm_resource *res; + int rc; + res = list_entry(tmp, struct ldlm_resource, lr_hash); ldlm_resource_getref(res); + spin_unlock(&ns->ns_hash_lock); rc = ldlm_cli_cancel_unused_resource(ns, res->lr_name, flags, opaque); @@ -956,10 +987,13 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, if (rc) CERROR("cancel_unused_res ("LPU64"): %d\n", res->lr_name.name[0], rc); - ldlm_resource_putref(res); + + spin_lock(&ns->ns_hash_lock); + tmp = tmp->next; + ldlm_resource_putref_locked(res); } } - l_unlock(&ns->ns_lock); + spin_unlock(&ns->ns_hash_lock); RETURN(ELDLM_OK); } @@ -980,7 +1014,7 @@ int ldlm_cli_join_lru(struct ldlm_namespace *ns, RETURN(count); LASSERT(res->lr_type == LDLM_EXTENT); - l_lock(&ns->ns_lock); + lock_res(res); if (!join) goto split; @@ -989,9 +1023,11 @@ int ldlm_cli_join_lru(struct ldlm_namespace *ns, !lock->l_readers && !lock->l_writers && !(lock->l_flags & LDLM_FL_LOCAL) && !(lock->l_flags & LDLM_FL_CBPENDING)) { + spin_lock(&ns->ns_unused_lock); LASSERT(ns->ns_nr_unused >= 0); list_add_tail(&lock->l_lru, &ns->ns_unused_list); ns->ns_nr_unused++; + spin_unlock(&ns->ns_unused_lock); lock->l_flags &= ~LDLM_FL_NO_LRU; LDLM_DEBUG(lock, "join lock to lru"); count++; @@ -999,16 +1035,18 @@ int ldlm_cli_join_lru(struct ldlm_namespace *ns, } goto unlock; split: + spin_lock(&ns->ns_unused_lock); list_for_each_entry_safe (lock, n, &ns->ns_unused_list, l_lru) { if (lock->l_resource == res) { - ldlm_lock_remove_from_lru(lock); + ldlm_lock_remove_from_lru_nolock(lock); lock->l_flags |= LDLM_FL_NO_LRU; LDLM_DEBUG(lock, "split lock from lru"); count++; } } + spin_unlock(&ns->ns_unused_lock); unlock: - l_unlock(&ns->ns_lock); + unlock_res(res); ldlm_resource_putref(res); RETURN(count); } @@ -1021,15 +1059,13 @@ int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter, struct list_head *tmp, *next; struct ldlm_lock *lock; int rc = LDLM_ITER_CONTINUE; - struct ldlm_namespace *ns; ENTRY; if (!res) RETURN(LDLM_ITER_CONTINUE); - ns = res->lr_namespace; - l_lock(&ns->ns_lock); + lock_res(res); list_for_each_safe(tmp, next, &res->lr_granted) { lock = list_entry(tmp, struct ldlm_lock, l_res_link); @@ -1051,7 +1087,7 @@ int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter, GOTO(out, rc = LDLM_ITER_STOP); } out: - l_unlock(&ns->ns_lock); + unlock_res(res); RETURN(rc); } @@ -1082,24 +1118,29 @@ int ldlm_namespace_foreach_res(struct ldlm_namespace *ns, ldlm_res_iterator_t iter, void *closure) { int i, rc = LDLM_ITER_CONTINUE; + struct ldlm_resource *res; + struct list_head *tmp; ENTRY; - l_lock(&ns->ns_lock); + spin_lock(&ns->ns_hash_lock); for (i = 0; i < RES_HASH_SIZE; i++) { - struct list_head *tmp, *next; - list_for_each_safe(tmp, next, &(ns->ns_hash[i])) { - struct ldlm_resource *res = - list_entry(tmp, struct ldlm_resource, lr_hash); - + tmp = ns->ns_hash[i].next; + while (tmp != &(ns->ns_hash[i])) { + res = list_entry(tmp, struct ldlm_resource, lr_hash); ldlm_resource_getref(res); + spin_unlock(&ns->ns_hash_lock); + rc = iter(res, closure); - ldlm_resource_putref(res); + + spin_lock(&ns->ns_hash_lock); + tmp = tmp->next; + 
ldlm_resource_putref_locked(res); if (rc == LDLM_ITER_STOP) GOTO(out, rc); } } out: - l_unlock(&ns->ns_lock); + spin_unlock(&ns->ns_hash_lock); RETURN(rc); } @@ -1121,9 +1162,7 @@ void ldlm_resource_iterate(struct ldlm_namespace *ns, struct ldlm_res_id *res_id return; } - l_lock(&ns->ns_lock); ldlm_resource_foreach(res, iter, data); - l_unlock(&ns->ns_lock); ldlm_resource_putref(res); EXIT; } @@ -1262,7 +1301,6 @@ int ldlm_replay_locks(struct obd_import *imp) /* ensure this doesn't fall to 0 before all have been queued */ atomic_inc(&imp->imp_replay_inflight); - l_lock(&ns->ns_lock); (void)ldlm_namespace_foreach(ns, ldlm_chain_lock_for_replay, &list); list_for_each_entry_safe(lock, next, &list, l_pending_chain) { @@ -1271,7 +1309,6 @@ int ldlm_replay_locks(struct obd_import *imp) continue; /* or try to do the rest? */ rc = replay_one_lock(imp, lock); } - l_unlock(&ns->ns_lock); atomic_dec(&imp->imp_replay_inflight); diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index 02e22fd..318201e 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -243,12 +243,12 @@ struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 client) strcpy(ns->ns_name, name); CFS_INIT_LIST_HEAD(&ns->ns_root_list); - l_lock_init(&ns->ns_lock); - cfs_waitq_init(&ns->ns_refcount_waitq); - atomic_set(&ns->ns_refcount, 0); + ns->ns_refcount = 0; ns->ns_client = client; - spin_lock_init(&ns->ns_counter_lock); - ns->ns_locks = 0; + spin_lock_init(&ns->ns_hash_lock); + atomic_set(&ns->ns_locks, 0); + ns->ns_resources = 0; + cfs_waitq_init(&ns->ns_waitq); for (bucket = ns->ns_hash + RES_HASH_SIZE - 1; bucket >= ns->ns_hash; bucket--) @@ -257,6 +257,7 @@ struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 client) CFS_INIT_LIST_HEAD(&ns->ns_unused_list); ns->ns_nr_unused = 0; ns->ns_max_unused = LDLM_DEFAULT_LRU_SIZE; + spin_lock_init(&ns->ns_unused_lock); mutex_down(&ldlm_namespace_lock); list_add(&ns->ns_list_chain, &ldlm_namespace_list); @@ -284,15 +285,33 @@ extern struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock); static void cleanup_resource(struct ldlm_resource *res, struct list_head *q, int flags) { - struct list_head *tmp, *pos; + struct list_head *tmp; int rc = 0, client = res->lr_namespace->ns_client; int local_only = (flags & LDLM_FL_LOCAL_ONLY); ENTRY; - list_for_each_safe(tmp, pos, q) { - struct ldlm_lock *lock; - lock = list_entry(tmp, struct ldlm_lock, l_res_link); - LDLM_LOCK_GET(lock); + + do { + struct ldlm_lock *lock = NULL; + + /* first, we look for non-cleaned-yet lock + * all cleaned locks are marked by CLEANED flag */ + lock_res(res); + list_for_each(tmp, q) { + lock = list_entry(tmp, struct ldlm_lock, l_res_link); + if (lock->l_flags & LDLM_FL_CLEANED) { + lock = NULL; + continue; + } + LDLM_LOCK_GET(lock); + lock->l_flags |= LDLM_FL_CLEANED; + break; + } + + if (lock == NULL) { + unlock_res(res); + break; + } /* Set CBPENDING so nothing in the cancellation path * can match this lock */ @@ -307,6 +326,7 @@ static void cleanup_resource(struct ldlm_resource *res, struct list_head *q, * will go away ... */ /* ... without sending a CANCEL message. 
*/ lock->l_flags |= LDLM_FL_LOCAL_ONLY; + unlock_res(res); LDLM_DEBUG(lock, "setting FL_LOCAL_ONLY"); if (lock->l_completion_ast) lock->l_completion_ast(lock, 0, NULL); @@ -316,6 +336,8 @@ static void cleanup_resource(struct ldlm_resource *res, struct list_head *q, if (client) { struct lustre_handle lockh; + + unlock_res(res); ldlm_lock2handle(lock, &lockh); if (!local_only) { rc = ldlm_cli_cancel(&lockh); @@ -326,19 +348,21 @@ static void cleanup_resource(struct ldlm_resource *res, struct list_head *q, if (local_only || rc != ELDLM_OK) ldlm_lock_cancel(lock); } else { + ldlm_resource_unlink_lock(lock); + unlock_res(res); LDLM_DEBUG(lock, "Freeing a lock still held by a " "client node"); - - ldlm_resource_unlink_lock(lock); ldlm_lock_destroy(lock); } LDLM_LOCK_PUT(lock); - } + } while (1); + EXIT; } int ldlm_namespace_cleanup(struct ldlm_namespace *ns, int flags) { + struct list_head *tmp; int i; if (ns == NULL) { @@ -346,27 +370,35 @@ int ldlm_namespace_cleanup(struct ldlm_namespace *ns, int flags) return ELDLM_OK; } - l_lock(&ns->ns_lock); for (i = 0; i < RES_HASH_SIZE; i++) { - struct list_head *tmp, *pos; - list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) { + spin_lock(&ns->ns_hash_lock); + tmp = ns->ns_hash[i].next; + while (tmp != &(ns->ns_hash[i])) { struct ldlm_resource *res; res = list_entry(tmp, struct ldlm_resource, lr_hash); ldlm_resource_getref(res); + spin_unlock(&ns->ns_hash_lock); cleanup_resource(res, &res->lr_granted, flags); cleanup_resource(res, &res->lr_converting, flags); cleanup_resource(res, &res->lr_waiting, flags); - if (!ldlm_resource_putref(res)) { - CERROR("Namespace %s resource refcount %d " - "after lock cleanup; forcing cleanup.\n", - ns->ns_name, - atomic_read(&res->lr_refcount)); - } + spin_lock(&ns->ns_hash_lock); + tmp = tmp->next; + + /* XXX: former stuff caused issues in case of race + * between ldlm_namespace_cleanup() and lockd() when + * client gets blocking ast when lock gets distracted by + * server. This is 1_4 branch solution, let's see how + * will it behave. */ + if (!ldlm_resource_putref_locked(res)) + CDEBUG(D_INFO, + "Namespace %s resource refcount nonzero " + "(%d) after lock cleanup; forcing cleanup.\n", + ns->ns_name, atomic_read(&res->lr_refcount)); } + spin_unlock(&ns->ns_hash_lock); } - l_unlock(&ns->ns_lock); return ELDLM_OK; } @@ -398,22 +430,21 @@ int ldlm_namespace_free(struct ldlm_namespace *ns, int force) } #endif - if (atomic_read(&ns->ns_refcount) > 0) { + if (ns->ns_refcount > 0) { struct l_wait_info lwi = LWI_INTR(NULL, NULL); int rc; CDEBUG(D_DLMTRACE, "dlm namespace %s free waiting on refcount %d\n", - ns->ns_name, atomic_read(&ns->ns_refcount)); - rc = l_wait_event(ns->ns_refcount_waitq, - atomic_read(&ns->ns_refcount) == 0, &lwi); - if (atomic_read(&ns->ns_refcount)) { + ns->ns_name, ns->ns_refcount); + rc = l_wait_event(ns->ns_waitq, + ns->ns_refcount == 0, &lwi); + if (ns->ns_refcount) LCONSOLE_ERROR("Lock manager: wait for %s namespace " "cleanup aborted with %d resources in " "use. 
(%d)\nI'm going to try to clean " "up anyway, but I might need a reboot " "of this node.\n", ns->ns_name, - atomic_read(&ns->ns_refcount), rc); - } + (int) ns->ns_refcount, rc); CDEBUG(D_DLMTRACE, "dlm namespace %s free done waiting\n", ns->ns_name); } @@ -456,20 +487,43 @@ static struct ldlm_resource *ldlm_resource_new(void) CFS_INIT_LIST_HEAD(&res->lr_granted); CFS_INIT_LIST_HEAD(&res->lr_converting); CFS_INIT_LIST_HEAD(&res->lr_waiting); - sema_init(&res->lr_lvb_sem, 1); atomic_set(&res->lr_refcount, 1); + spin_lock_init(&res->lr_lock); + + /* one who creates the resource must unlock + * the semaphore after lvb initialization */ + init_MUTEX_LOCKED(&res->lr_lvb_sem); return res; } +/* must be called with hash lock held */ +static struct ldlm_resource * +ldlm_resource_find(struct ldlm_namespace *ns, struct ldlm_res_id name, __u32 hash) +{ + struct list_head *bucket, *tmp; + struct ldlm_resource *res; + + LASSERT_SPIN_LOCKED(&ns->ns_hash_lock); + bucket = ns->ns_hash + hash; + + list_for_each(tmp, bucket) { + res = list_entry(tmp, struct ldlm_resource, lr_hash); + if (memcmp(&res->lr_name, &name, sizeof(res->lr_name)) == 0) + return res; + } + + return NULL; +} + /* Args: locked namespace * Returns: newly-allocated, referenced, unlocked resource */ static struct ldlm_resource * ldlm_resource_add(struct ldlm_namespace *ns, struct ldlm_resource *parent, - struct ldlm_res_id name, ldlm_type_t type) + struct ldlm_res_id name, __u32 hash, ldlm_type_t type) { struct list_head *bucket; - struct ldlm_resource *res; + struct ldlm_resource *res, *old_res; ENTRY; LASSERTF(type >= LDLM_MIN_TYPE && type < LDLM_MAX_TYPE, @@ -479,16 +533,31 @@ ldlm_resource_add(struct ldlm_namespace *ns, struct ldlm_resource *parent, if (!res) RETURN(NULL); - l_lock(&ns->ns_lock); res->lr_name = name; res->lr_namespace = ns; - atomic_inc(&ns->ns_refcount); - res->lr_type = type; res->lr_most_restr = LCK_NL; - bucket = ns->ns_hash + ldlm_hash_fn(parent, name); + spin_lock(&ns->ns_hash_lock); + old_res = ldlm_resource_find(ns, name, hash); + if (old_res) { + /* someone won the race and added the resource before */ + ldlm_resource_getref(old_res); + spin_unlock(&ns->ns_hash_lock); + OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res); + /* synchronize WRT resource creation */ + if (ns->ns_lvbo && ns->ns_lvbo->lvbo_init) { + down(&old_res->lr_lvb_sem); + up(&old_res->lr_lvb_sem); + } + RETURN(old_res); + } + + /* we won! 
let's add the resource */ + bucket = ns->ns_hash + hash; list_add(&res->lr_hash, bucket); + ns->ns_resources++; + ns->ns_refcount++; if (parent == NULL) { list_add(&res->lr_childof, &ns->ns_root_list); @@ -496,8 +565,19 @@ ldlm_resource_add(struct ldlm_namespace *ns, struct ldlm_resource *parent, res->lr_parent = parent; list_add(&res->lr_childof, &parent->lr_children); } - l_unlock(&ns->ns_lock); + spin_unlock(&ns->ns_hash_lock); + + if (ns->ns_lvbo && ns->ns_lvbo->lvbo_init) { + int rc; + OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CREATE_RESOURCE, 2); + rc = ns->ns_lvbo->lvbo_init(res); + if (rc) + CERROR("lvbo_init failed for resource " + LPU64": rc %d\n", name.name[0], rc); + /* we create resource with locked lr_lvb_sem */ + up(&res->lr_lvb_sem); + } RETURN(res); } @@ -509,7 +589,7 @@ struct ldlm_resource * ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent, struct ldlm_res_id name, ldlm_type_t type, int create) { - struct list_head *bucket, *tmp; + __u32 hash = ldlm_hash_fn(parent, name); struct ldlm_resource *res = NULL; ENTRY; @@ -517,47 +597,24 @@ ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent, LASSERT(ns->ns_hash != NULL); LASSERT(name.name[0] != 0); - l_lock(&ns->ns_lock); - bucket = ns->ns_hash + ldlm_hash_fn(parent, name); - - list_for_each(tmp, bucket) { - res = list_entry(tmp, struct ldlm_resource, lr_hash); - - if (memcmp(&res->lr_name, &name, sizeof(res->lr_name)) == 0) { - ldlm_resource_getref(res); - l_unlock(&ns->ns_lock); - RETURN(res); + spin_lock(&ns->ns_hash_lock); + res = ldlm_resource_find(ns, name, hash); + if (res) { + ldlm_resource_getref(res); + spin_unlock(&ns->ns_hash_lock); + /* synchronize WRT resource creation */ + if (ns->ns_lvbo && ns->ns_lvbo->lvbo_init) { + down(&res->lr_lvb_sem); + up(&res->lr_lvb_sem); } + RETURN(res); } + spin_unlock(&ns->ns_hash_lock); - if (create) { - res = ldlm_resource_add(ns, parent, name, type); - if (res == NULL) - GOTO(out, NULL); - } else { - res = NULL; - } - - if (create && ns->ns_lvbo && ns->ns_lvbo->lvbo_init) { - int rc; - - /* Although this is technically a lock inversion risk (lvb_sem - * should be taken before DLM lock), this resource was just - * created, so nobody else can take the lvb_sem yet. 
-p */ - mutex_down(&res->lr_lvb_sem); - /* Drop the dlm lock, because lvbo_init can touch the disk */ - l_unlock(&ns->ns_lock); - OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CREATE_RESOURCE, 2); - rc = ns->ns_lvbo->lvbo_init(res); - mutex_up(&res->lr_lvb_sem); - if (rc) - CERROR("lvbo_init failed for resource "LPU64"/"LPU64 - ": rc %d\n", name.name[0], name.name[1], rc); - } else { -out: - l_unlock(&ns->ns_lock); - } + if (create == 0) + RETURN(NULL); + res = ldlm_resource_add(ns, parent, name, hash, type); RETURN(res); } @@ -571,9 +628,45 @@ struct ldlm_resource *ldlm_resource_getref(struct ldlm_resource *res) return res; } +void __ldlm_resource_putref_final(struct ldlm_resource *res) +{ + struct ldlm_namespace *ns = res->lr_namespace; + + LASSERT_SPIN_LOCKED(&ns->ns_hash_lock); + + if (!list_empty(&res->lr_granted)) { + ldlm_resource_dump(D_ERROR, res); + LBUG(); + } + + if (!list_empty(&res->lr_converting)) { + ldlm_resource_dump(D_ERROR, res); + LBUG(); + } + + if (!list_empty(&res->lr_waiting)) { + ldlm_resource_dump(D_ERROR, res); + LBUG(); + } + + if (!list_empty(&res->lr_children)) { + ldlm_resource_dump(D_ERROR, res); + LBUG(); + } + + ns->ns_refcount--; + list_del_init(&res->lr_hash); + list_del_init(&res->lr_childof); + + ns->ns_resources--; + if (ns->ns_resources == 0) + wake_up(&ns->ns_waitq); +} + /* Returns 1 if the resource was freed, 0 if it remains. */ int ldlm_resource_putref(struct ldlm_resource *res) { + struct ldlm_namespace *ns = res->lr_namespace; int rc = 0; ENTRY; @@ -582,53 +675,37 @@ int ldlm_resource_putref(struct ldlm_resource *res) LASSERT(atomic_read(&res->lr_refcount) > 0); LASSERT(atomic_read(&res->lr_refcount) < LI_POISON); - if (atomic_dec_and_test(&res->lr_refcount)) { - struct ldlm_namespace *ns = res->lr_namespace; - ENTRY; - - l_lock(&ns->ns_lock); - - if (atomic_read(&res->lr_refcount) != 0) { - /* We lost the race. */ - l_unlock(&ns->ns_lock); - RETURN(rc); - } - - if (!list_empty(&res->lr_granted)) { - ldlm_resource_dump(D_ERROR, res); - LBUG(); - } + LASSERT(atomic_read(&res->lr_refcount) >= 0); + if (atomic_dec_and_lock(&res->lr_refcount, &ns->ns_hash_lock)) { + __ldlm_resource_putref_final(res); + spin_unlock(&ns->ns_hash_lock); + if (res->lr_lvb_data) + OBD_FREE(res->lr_lvb_data, res->lr_lvb_len); + OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res); + rc = 1; + } - if (!list_empty(&res->lr_converting)) { - ldlm_resource_dump(D_ERROR, res); - LBUG(); - } + RETURN(rc); +} - if (!list_empty(&res->lr_waiting)) { - ldlm_resource_dump(D_ERROR, res); - LBUG(); - } +/* Returns 1 if the resource was freed, 0 if it remains. 
*/ +int ldlm_resource_putref_locked(struct ldlm_resource *res) +{ + int rc = 0; + ENTRY; - if (!list_empty(&res->lr_children)) { - ldlm_resource_dump(D_ERROR, res); - LBUG(); - } + CDEBUG(D_INFO, "putref res: %p count: %d\n", res, + atomic_read(&res->lr_refcount) - 1); + LASSERT(atomic_read(&res->lr_refcount) > 0); + LASSERT(atomic_read(&res->lr_refcount) < LI_POISON); - list_del_init(&res->lr_hash); - list_del_init(&res->lr_childof); + LASSERT(atomic_read(&res->lr_refcount) >= 0); + if (atomic_dec_and_test(&res->lr_refcount)) { + __ldlm_resource_putref_final(res); if (res->lr_lvb_data) OBD_FREE(res->lr_lvb_data, res->lr_lvb_len); - l_unlock(&ns->ns_lock); - OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res); - - if (atomic_dec_and_test(&ns->ns_refcount)) { - CDEBUG(D_DLMTRACE, "last ref on ns %s\n", ns->ns_name); - cfs_waitq_signal(&ns->ns_refcount_waitq); - } - rc = 1; - EXIT; } RETURN(rc); @@ -637,7 +714,7 @@ int ldlm_resource_putref(struct ldlm_resource *res) void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head, struct ldlm_lock *lock) { - l_lock(&res->lr_namespace->ns_lock); + check_res_locked(res); ldlm_resource_dump(D_OTHER, res); CDEBUG(D_OTHER, "About to add this lock:\n"); @@ -645,14 +722,12 @@ void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head, if (lock->l_destroyed) { CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n"); - goto out; + return; } LASSERT(list_empty(&lock->l_res_link)); list_add_tail(&lock->l_res_link, head); - out: - l_unlock(&res->lr_namespace->ns_lock); } void ldlm_resource_insert_lock_after(struct ldlm_lock *original, @@ -660,7 +735,7 @@ void ldlm_resource_insert_lock_after(struct ldlm_lock *original, { struct ldlm_resource *res = original->l_resource; - l_lock(&res->lr_namespace->ns_lock); + check_res_locked(res); ldlm_resource_dump(D_OTHER, res); CDEBUG(D_OTHER, "About to insert this lock after %p:\n", original); @@ -674,15 +749,13 @@ void ldlm_resource_insert_lock_after(struct ldlm_lock *original, LASSERT(list_empty(&new->l_res_link)); list_add(&new->l_res_link, &original->l_res_link); - out: - l_unlock(&res->lr_namespace->ns_lock); + out:; } void ldlm_resource_unlink_lock(struct ldlm_lock *lock) { - l_lock(&lock->l_resource->lr_namespace->ns_lock); + check_res_locked(lock->l_resource); list_del_init(&lock->l_res_link); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); } EXPORT_SYMBOL(ldlm_resource_unlink_lock); @@ -711,22 +784,31 @@ void ldlm_namespace_dump(int level, struct ldlm_namespace *ns) { struct list_head *tmp; - CDEBUG(level, "--- Namespace: %s (rc: %d, client: %d)\n", ns->ns_name, - atomic_read(&ns->ns_refcount), ns->ns_client); + CDEBUG(level, "--- Namespace: %s (rc: %d, client: %d)\n", + ns->ns_name, ns->ns_refcount, ns->ns_client); - l_lock(&ns->ns_lock); - if (cfs_time_after(cfs_time_current(), ns->ns_next_dump)) { - list_for_each(tmp, &ns->ns_root_list) { - struct ldlm_resource *res; - res = list_entry(tmp, struct ldlm_resource, lr_childof); + if (cfs_time_before(cfs_time_current(), ns->ns_next_dump)) + return; - /* Once we have resources with children, this should - * really dump them recursively. 
*/ - ldlm_resource_dump(level, res); - } - ns->ns_next_dump = cfs_time_shift(10); + spin_lock(&ns->ns_hash_lock); + tmp = ns->ns_root_list.next; + while (tmp != &ns->ns_root_list) { + struct ldlm_resource *res; + res = list_entry(tmp, struct ldlm_resource, lr_childof); + + ldlm_resource_getref(res); + spin_unlock(&ns->ns_hash_lock); + + lock_res(res); + ldlm_resource_dump(level, res); + unlock_res(res); + + spin_lock(&ns->ns_hash_lock); + tmp = tmp->next; + ldlm_resource_putref_locked(res); } - l_unlock(&ns->ns_lock); + ns->ns_next_dump = cfs_time_shift(10); + spin_unlock(&ns->ns_hash_lock); } void ldlm_resource_dump(int level, struct ldlm_resource *res) diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 0bcdc7c..de1245c 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -784,8 +784,8 @@ static int ll_extent_lock_callback(struct ldlm_lock *lock, ll_pgcache_remove_extent(inode, lsm, lock, stripe); - l_lock(&lock->l_resource->lr_namespace->ns_lock); lov_stripe_lock(lsm); + lock_res_and_lock(lock); kms = ldlm_extent_shift_kms(lock, lsm->lsm_oinfo[stripe].loi_kms); @@ -793,8 +793,8 @@ static int ll_extent_lock_callback(struct ldlm_lock *lock, LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64, lsm->lsm_oinfo[stripe].loi_kms, kms); lsm->lsm_oinfo[stripe].loi_kms = kms; + unlock_res_and_lock(lock); lov_stripe_unlock(lsm); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); //ll_try_done_writing(inode); iput: iput(inode); @@ -840,16 +840,16 @@ int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data) lvb = lock->l_lvb_data; lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size; - l_lock(&lock->l_resource->lr_namespace->ns_lock); LOCK_INODE_MUTEX(inode); + lock_res_and_lock(lock); kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size); kms = ldlm_extent_shift_kms(NULL, kms); if (lsm->lsm_oinfo[stripe].loi_kms != kms) LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64, lsm->lsm_oinfo[stripe].loi_kms, kms); lsm->lsm_oinfo[stripe].loi_kms = kms; + unlock_res_and_lock(lock); UNLOCK_INODE_MUTEX(inode); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); } iput: diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 8126d16..37bcd94 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -1019,7 +1019,8 @@ struct cache_definition ll_cache_definition = { struct inode *ll_inode_from_lock(struct ldlm_lock *lock) { struct inode *inode = NULL; - l_lock(&lock->l_resource->lr_namespace->ns_lock); + /* NOTE: we depend on atomic igrab() -bzzz */ + lock_res_and_lock(lock); if (lock->l_ast_data) { struct ll_inode_info *lli = ll_i2info(lock->l_ast_data); if (lli->lli_inode_magic == LLI_INODE_MAGIC) { @@ -1033,7 +1034,7 @@ struct inode *ll_inode_from_lock(struct ldlm_lock *lock) inode = NULL; } } - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + unlock_res_and_lock(lock); return inode; } diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index 73d7747..3f40342 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -122,7 +122,7 @@ void mdc_set_lock_data(__u64 *l, void *data) lock = ldlm_handle2lock(lockh); LASSERT(lock != NULL); - l_lock(&lock->l_resource->lr_namespace->ns_lock); + lock_res_and_lock(lock); #ifdef __KERNEL__ if (lock->l_ast_data && lock->l_ast_data != data) { struct inode *new_inode = data; @@ -136,7 +136,7 @@ void mdc_set_lock_data(__u64 *l, void *data) } #endif lock->l_ast_data = data; - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + unlock_res_and_lock(lock); LDLM_LOCK_PUT(lock); EXIT; 
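The hunks above replace the old namespace-wide l_lock()/l_unlock() calls with the per-resource helpers lock_res()/unlock_res() and the lock-oriented wrappers lock_res_and_lock()/unlock_res_and_lock() exported from l_lock.c. The fragment below is a minimal user-space sketch of that idiom, using pthread spinlocks in place of kernel spinlocks; the demo_resource/demo_lock types and field values are invented for illustration, and the sketch assumes l_resource never changes, whereas the real helpers must also cope with ldlm_lock_change_resource() moving a lock between resources.

#include <pthread.h>
#include <stdio.h>

struct demo_resource {
        pthread_spinlock_t lr_lock;     /* per-resource lock, as in struct ldlm_resource */
        int                lr_nlocks;   /* toy field protected by lr_lock */
};

struct demo_lock {
        struct demo_resource *l_resource;
        unsigned int          l_flags;  /* toy field protected by the resource lock */
};

static void lock_res(struct demo_resource *res)
{
        pthread_spin_lock(&res->lr_lock);
}

static void unlock_res(struct demo_resource *res)
{
        pthread_spin_unlock(&res->lr_lock);
}

/* The real helper must also keep the lock from changing resource while the
 * spinlock is held; this sketch assumes l_resource is stable. */
static void lock_res_and_lock(struct demo_lock *lck)
{
        lock_res(lck->l_resource);
}

static void unlock_res_and_lock(struct demo_lock *lck)
{
        unlock_res(lck->l_resource);
}

int main(void)
{
        struct demo_resource res;
        struct demo_lock lck = { .l_resource = &res, .l_flags = 0 };

        pthread_spin_init(&res.lr_lock, PTHREAD_PROCESS_PRIVATE);
        res.lr_nlocks = 0;

        /* Take only the resource this lock hangs off, mutate lock and
         * resource state, and drop the spinlock again before doing anything
         * that can block (RPCs, allocation, lvb I/O). */
        lock_res_and_lock(&lck);
        lck.l_flags |= 0x1;             /* stands in for a flag such as LDLM_FL_CBPENDING */
        res.lr_nlocks++;
        unlock_res_and_lock(&lck);

        printf("flags=0x%x, nlocks=%d\n", lck.l_flags, res.lr_nlocks);
        pthread_spin_destroy(&res.lr_lock);
        return 0;
}

The point of the conversion is visible in the hunks: contention moves from one namespace-wide lock to the spinlock of the single resource being touched, and every path that may block now drops the spinlock first.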
diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 9eb7dc7..452aba5 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -174,7 +174,7 @@ struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid, struct mds_obd *mds = &obd->u.mds; struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de; struct ldlm_res_id res_id = { .name = {0} }; - int flags = 0, rc; + int flags = LDLM_FL_ATOMIC_CB, rc; ldlm_policy_data_t policy = { .l_inodebits = { lockpart} }; ENTRY; @@ -2244,7 +2244,7 @@ static void fixup_handle_for_resent_req(struct ptlrpc_request *req, int offset, if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)) return; - l_lock(&obd->obd_namespace->ns_lock); + spin_lock(&obd->obd_namespace->ns_hash_lock); list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) { struct ldlm_lock *lock; lock = list_entry(iter, struct ldlm_lock, l_export_chain); @@ -2257,11 +2257,11 @@ static void fixup_handle_for_resent_req(struct ptlrpc_request *req, int offset, lockh->cookie); if (old_lock) *old_lock = LDLM_LOCK_GET(lock); - l_unlock(&obd->obd_namespace->ns_lock); + spin_unlock(&obd->obd_namespace->ns_hash_lock); return; } } - l_unlock(&obd->obd_namespace->ns_lock); + spin_unlock(&obd->obd_namespace->ns_hash_lock); /* If the xid matches, then we know this is a resent request, * and allow it. (It's probably an OPEN, for which we don't @@ -2451,7 +2451,7 @@ static int mds_intent_policy(struct ldlm_namespace *ns, } /* Fixup the lock to be given to the client */ - l_lock(&new_lock->l_resource->lr_namespace->ns_lock); + lock_res_and_lock(new_lock); new_lock->l_readers = 0; new_lock->l_writers = 0; @@ -2467,8 +2467,8 @@ static int mds_intent_policy(struct ldlm_namespace *ns, new_lock->l_flags &= ~LDLM_FL_LOCAL; + unlock_res_and_lock(new_lock); LDLM_LOCK_PUT(new_lock); - l_unlock(&new_lock->l_resource->lr_namespace->ns_lock); RETURN(ELDLM_LOCK_REPLACED); } diff --git a/lustre/mds/mds_open.c b/lustre/mds/mds_open.c index 0942b54..461a22a 100644 --- a/lustre/mds/mds_open.c +++ b/lustre/mds/mds_open.c @@ -837,7 +837,7 @@ int mds_lock_new_child(struct obd_device *obd, struct inode *inode, { struct ldlm_res_id child_res_id = { .name = { inode->i_ino, 0, 1, 0 } }; struct lustre_handle lockh; - int lock_flags = 0; + int lock_flags = LDLM_FL_ATOMIC_CB; int rc; if (child_lockh == NULL) diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index 4347ba7..146d13e 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -1039,7 +1039,7 @@ int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id, CDEBUG(D_DLMTRACE, "lock order: "LPU64"/"LPU64"\n", res_id[0]->name[0], res_id[1]->name[0]); - flags = LDLM_FL_LOCAL_ONLY; + flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB; rc = ldlm_cli_enqueue_local(obd->obd_namespace, *res_id[0], LDLM_IBITS, policies[0], lock_modes[0], &flags, ldlm_blocking_ast, @@ -1054,7 +1054,7 @@ int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id, memcpy(handles[1], handles[0], sizeof(*(handles[1]))); ldlm_lock_addref(handles[1], lock_modes[1]); } else if (res_id[1]->name[0] != 0) { - flags = LDLM_FL_LOCAL_ONLY; + flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB; rc = ldlm_cli_enqueue_local(obd->obd_namespace, *res_id[1], LDLM_IBITS, policies[1], lock_modes[1], &flags, @@ -1149,7 +1149,7 @@ int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id, /* XXX we could send ASTs on all these locks first before blocking? 
*/ for (i = 0; i < 4; i++) { - flags = 0; + flags = LDLM_FL_ATOMIC_CB; if (res_id[i]->name[0] == 0) break; if (i && res_eq(res_id[i], res_id[i-1])) { @@ -1241,7 +1241,7 @@ static int mds_verify_child(struct obd_device *obd, *dchildp = dchild = vchild; if (dchild->d_inode) { - int flags = 0; + int flags = LDLM_FL_ATOMIC_CB; child_res_id->name[0] = dchild->d_inode->i_ino; child_res_id->name[1] = dchild->d_inode->i_generation; diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c index bd36253..9808155 100644 --- a/lustre/obdclass/genops.c +++ b/lustre/obdclass/genops.c @@ -525,6 +525,7 @@ struct obd_export *class_new_export(struct obd_device *obd, CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies); /* XXX this should be in LDLM init */ CFS_INIT_LIST_HEAD(&export->exp_ldlm_data.led_held_locks); + spin_lock_init(&export->exp_ldlm_data.led_lock); CFS_INIT_LIST_HEAD(&export->exp_handle.h_link); class_handle_hash(&export->exp_handle, export_handle_addref); diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index ad30426..e9f84fb 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -1165,25 +1165,25 @@ static int filter_intent_policy(struct ldlm_namespace *ns, lock->l_req_mode = LCK_PR; LASSERT(ns == res->lr_namespace); - l_lock(&ns->ns_lock); - - res->lr_tmp = &rpc_list; - rc = policy(lock, &tmpflags, 0, &err); - res->lr_tmp = NULL; + lock_res(res); + rc = policy(lock, &tmpflags, 0, &err, &rpc_list); + check_res_locked(res); /* FIXME: we should change the policy function slightly, to not make * this list at all, since we just turn around and free it */ while (!list_empty(&rpc_list)) { - struct ldlm_ast_work *w = - list_entry(rpc_list.next, struct ldlm_ast_work, w_list); - list_del(&w->w_list); - LDLM_LOCK_PUT(w->w_lock); - OBD_FREE(w, sizeof(*w)); + struct ldlm_lock *wlock = + list_entry(rpc_list.next, struct ldlm_lock, l_cp_ast); + LASSERT((lock->l_flags & LDLM_FL_AST_SENT) == 0); + LASSERT(lock->l_flags & LDLM_FL_CP_REQD); + lock->l_flags &= ~LDLM_FL_CP_REQD; + list_del_init(&wlock->l_cp_ast); + LDLM_LOCK_PUT(wlock); } /* The lock met with no resistance; we're finished. */ if (rc == LDLM_ITER_CONTINUE) { - l_unlock(&ns->ns_lock); + unlock_res(res); /* * do not grant locks to the liblustre clients: they cannot * handle ASTs robustly. @@ -1199,11 +1199,9 @@ static int filter_intent_policy(struct ldlm_namespace *ns, * policy nicely created a list of all PW locks for us. We will choose * the highest of those which are larger than the size in the LVB, if * any, and perform a glimpse callback. */ - down(&res->lr_lvb_sem); res_lvb = res->lr_lvb_data; LASSERT(res_lvb != NULL); *reply_lvb = *res_lvb; - up(&res->lr_lvb_sem); list_for_each(tmp, &res->lr_granted) { struct ldlm_lock *tmplock = @@ -1242,7 +1240,7 @@ static int filter_intent_policy(struct ldlm_namespace *ns, LDLM_LOCK_PUT(l); l = LDLM_LOCK_GET(tmplock); } - l_unlock(&ns->ns_lock); + unlock_res(res); /* There were no PW locks beyond the size in the LVB; finished. 
*/ if (l == NULL) { @@ -1287,9 +1285,9 @@ static int filter_intent_policy(struct ldlm_namespace *ns, if (rc != 0 && ns->ns_lvbo && ns->ns_lvbo->lvbo_update) ns->ns_lvbo->lvbo_update(res, NULL, 0, 1); - down(&res->lr_lvb_sem); + lock_res(res); *reply_lvb = *res_lvb; - up(&res->lr_lvb_sem); + unlock_res(res); out: LDLM_LOCK_PUT(l); diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 50ae0bb..5d86f26 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -2726,7 +2726,7 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data, CERROR("lockh %p, data %p - client evicted?\n", lockh, data); return; } - l_lock(&lock->l_resource->lr_namespace->ns_lock); + lock_res_and_lock(lock); #ifdef __KERNEL__ #ifdef __LINUX__ /* Liang XXX: Darwin and Winnt checking should be added */ @@ -2746,7 +2746,7 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data, #endif lock->l_ast_data = data; lock->l_flags |= (flags & LDLM_FL_NO_LRU); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + unlock_res_and_lock(lock); LDLM_LOCK_PUT(lock); } diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index 221a7ae..37a0d50 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -515,20 +515,20 @@ static int ost_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, } /* XXX layering violation! -phil */ - l_lock(&lock->l_resource->lr_namespace->ns_lock); + lock_res_and_lock(lock); /* Get this: if mds_blocking_ast is racing with mds_intent_policy, * such that mds_blocking_ast is called just before l_i_p takes the * ns_lock, then by the time we get the lock, we might not be the * correct blocking function anymore. So check, and return early, if * so. */ if (lock->l_blocking_ast != ost_blocking_ast) { - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + unlock_res_and_lock(lock); RETURN(0); } lock->l_flags |= LDLM_FL_CBPENDING; do_ast = (!lock->l_readers && !lock->l_writers); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + unlock_res_and_lock(lock); if (do_ast) { struct lustre_handle lockh;
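A pattern that recurs throughout this change set is the two-phase list walk: take the list spinlock (ns_hash_lock, ns_unused_lock or lr_lock), pin the current entry with a reference, drop the spinlock before calling anything that can sleep or re-enter the DLM (cancel RPCs, blocking ASTs, lvbo methods), then re-take the spinlock, advance, and release the reference. ldlm_cancel_lru(), ldlm_namespace_foreach_res(), ldlm_namespace_cleanup() and cleanup_resource() above all follow this shape. The stand-alone sketch below illustrates the idea under that assumption; struct node, node_get()/node_put() and visit() are invented names for the example, not the Lustre structures.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

struct node {
        struct node *next;
        int          refs;              /* protected by list_lock */
        int          id;
};

static pthread_mutex_t list_lock = PTHREAD_MUTEX_INITIALIZER;
static struct node *head;

static void node_get(struct node *n)
{
        n->refs++;
}

static void node_put(struct node *n)
{
        if (--n->refs == 0)
                free(n);
}

/* Stands in for work that may sleep or take other locks (a cancel RPC,
 * a blocking AST, an lvbo method), so it must run with list_lock dropped. */
static void visit(struct node *n)
{
        printf("visiting node %d\n", n->id);
}

static void walk_all(void)
{
        struct node *n, *next;

        pthread_mutex_lock(&list_lock);
        n = head;
        while (n != NULL) {
                node_get(n);                      /* pin across the unlock */
                pthread_mutex_unlock(&list_lock);

                visit(n);                         /* blocking work, lock dropped */

                pthread_mutex_lock(&list_lock);
                next = n->next;                   /* advance under the lock */
                node_put(n);
                n = next;
        }
        pthread_mutex_unlock(&list_lock);
}

int main(void)
{
        int i;

        for (i = 3; i > 0; i--) {
                struct node *n = calloc(1, sizeof(*n));
                n->id = i;
                n->refs = 1;                      /* the list's own reference */
                n->next = head;
                head = n;
        }

        walk_all();

        while (head != NULL) {                    /* teardown */
                struct node *n = head;
                head = n->next;
                node_put(n);
        }
        return 0;
}

The reference taken before the unlock is what lets the walker touch the entry safely after other threads may have raced with it, which is exactly the role LDLM_LOCK_GET()/LDLM_LOCK_PUT() and ldlm_resource_getref()/ldlm_resource_putref_locked() play in the hunks above.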