X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fldlm%2Fldlm_lock.c;h=4c124d44a47bbfa6c1134a862c0ecd7ccaaaf97d;hp=50d4504a6141e07470dbe82b143348d8439bda7b;hb=f0d608786a27dfb8dddf06d6b086b491749557f1;hpb=cefa8cda2ba2d288ccaa4ec077a6c627592503ea diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 50d4504..4c124d4 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -26,8 +26,11 @@ * GPL HEADER END */ /* - * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * + * Copyright (c) 2011 Whamcloud, Inc. + * */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -43,9 +46,7 @@ #ifdef __KERNEL__ # include -# ifndef HAVE_VFS_INTENT_PATCHES # include -# endif #else # include #endif @@ -73,6 +74,48 @@ char *ldlm_typename[] = { [LDLM_IBITS] "IBT", }; +static ldlm_policy_wire_to_local_t ldlm_policy_wire_to_local[] = { + [LDLM_PLAIN - LDLM_MIN_TYPE] ldlm_plain_policy_wire_to_local, + [LDLM_EXTENT - LDLM_MIN_TYPE] ldlm_extent_policy_wire_to_local, + [LDLM_FLOCK - LDLM_MIN_TYPE] ldlm_flock_policy_wire_to_local, + [LDLM_IBITS - LDLM_MIN_TYPE] ldlm_ibits_policy_wire_to_local, +}; + +static ldlm_policy_local_to_wire_t ldlm_policy_local_to_wire[] = { + [LDLM_PLAIN - LDLM_MIN_TYPE] ldlm_plain_policy_local_to_wire, + [LDLM_EXTENT - LDLM_MIN_TYPE] ldlm_extent_policy_local_to_wire, + [LDLM_FLOCK - LDLM_MIN_TYPE] ldlm_flock_policy_local_to_wire, + [LDLM_IBITS - LDLM_MIN_TYPE] ldlm_ibits_policy_local_to_wire, +}; + +/** + * Converts lock policy from local format to on the wire lock_desc format + */ +void ldlm_convert_policy_to_wire(ldlm_type_t type, + const ldlm_policy_data_t *lpolicy, + ldlm_wire_policy_data_t *wpolicy) +{ + ldlm_policy_local_to_wire_t convert; + + convert = ldlm_policy_local_to_wire[type - LDLM_MIN_TYPE]; + + convert(lpolicy, wpolicy); +} + +/** + * Converts lock policy from on the wire lock_desc format to local format + */ +void ldlm_convert_policy_to_local(ldlm_type_t type, + const ldlm_wire_policy_data_t *wpolicy, + ldlm_policy_data_t *lpolicy) +{ + ldlm_policy_wire_to_local_t convert; + + convert = ldlm_policy_wire_to_local[type - LDLM_MIN_TYPE]; + + convert(wpolicy, lpolicy); +} + char *ldlm_it2str(int it) { switch (it) { @@ -92,6 +135,8 @@ char *ldlm_it2str(int it) return "unlink"; case IT_GETXATTR: return "getxattr"; + case IT_LAYOUT: + return "layout"; default: CERROR("Unknown intent %d\n", it); return "UNKNOWN"; @@ -132,7 +177,7 @@ void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg) */ struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock) { - atomic_inc(&lock->l_refc); + cfs_atomic_inc(&lock->l_refc); return lock; } @@ -147,8 +192,8 @@ void ldlm_lock_put(struct ldlm_lock *lock) ENTRY; LASSERT(lock->l_resource != LP_POISON); - LASSERT(atomic_read(&lock->l_refc) > 0); - if (atomic_dec_and_test(&lock->l_refc)) { + LASSERT(cfs_atomic_read(&lock->l_refc) > 0); + if (cfs_atomic_dec_and_test(&lock->l_refc)) { struct ldlm_resource *res; LDLM_DEBUG(lock, @@ -156,15 +201,16 @@ void ldlm_lock_put(struct ldlm_lock *lock) res = lock->l_resource; LASSERT(lock->l_destroyed); - LASSERT(list_empty(&lock->l_res_link)); - LASSERT(list_empty(&lock->l_pending_chain)); + LASSERT(cfs_list_empty(&lock->l_res_link)); + LASSERT(cfs_list_empty(&lock->l_pending_chain)); - atomic_dec(&res->lr_namespace->ns_locks); + lprocfs_counter_decr(ldlm_res_to_ns(res)->ns_stats, + LDLM_NSS_LOCKS); lu_ref_del(&res->lr_reference, "lock", lock); ldlm_resource_putref(res); lock->l_resource = NULL; if (lock->l_export) { - class_export_put(lock->l_export); + class_export_lock_put(lock->l_export, lock); lock->l_export = NULL; } @@ -183,12 +229,15 @@ void ldlm_lock_put(struct ldlm_lock *lock) int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock) { int rc = 0; - if (!list_empty(&lock->l_lru)) { - struct ldlm_namespace *ns = lock->l_resource->lr_namespace; + if (!cfs_list_empty(&lock->l_lru)) { + struct ldlm_namespace *ns = ldlm_lock_to_ns(lock); + LASSERT(lock->l_resource->lr_type != LDLM_FLOCK); - list_del_init(&lock->l_lru); + cfs_list_del_init(&lock->l_lru); + if (lock->l_flags & LDLM_FL_SKIPPED) + lock->l_flags &= ~LDLM_FL_SKIPPED; + LASSERT(ns->ns_nr_unused > 0); ns->ns_nr_unused--; - LASSERT(ns->ns_nr_unused >= 0); rc = 1; } return rc; @@ -196,47 +245,62 @@ int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock) int ldlm_lock_remove_from_lru(struct ldlm_lock *lock) { - struct ldlm_namespace *ns = lock->l_resource->lr_namespace; + struct ldlm_namespace *ns = ldlm_lock_to_ns(lock); int rc; + ENTRY; - spin_lock(&ns->ns_unused_lock); + if (lock->l_ns_srv) { + LASSERT(cfs_list_empty(&lock->l_lru)); + RETURN(0); + } + + cfs_spin_lock(&ns->ns_lock); rc = ldlm_lock_remove_from_lru_nolock(lock); - spin_unlock(&ns->ns_unused_lock); + cfs_spin_unlock(&ns->ns_lock); EXIT; return rc; } void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock) { - struct ldlm_namespace *ns = lock->l_resource->lr_namespace; + struct ldlm_namespace *ns = ldlm_lock_to_ns(lock); + lock->l_last_used = cfs_time_current(); - LASSERT(list_empty(&lock->l_lru)); + LASSERT(cfs_list_empty(&lock->l_lru)); LASSERT(lock->l_resource->lr_type != LDLM_FLOCK); - list_add_tail(&lock->l_lru, &ns->ns_unused_list); + cfs_list_add_tail(&lock->l_lru, &ns->ns_unused_list); LASSERT(ns->ns_nr_unused >= 0); ns->ns_nr_unused++; } void ldlm_lock_add_to_lru(struct ldlm_lock *lock) { - struct ldlm_namespace *ns = lock->l_resource->lr_namespace; + struct ldlm_namespace *ns = ldlm_lock_to_ns(lock); + ENTRY; - spin_lock(&ns->ns_unused_lock); + cfs_spin_lock(&ns->ns_lock); ldlm_lock_add_to_lru_nolock(lock); - spin_unlock(&ns->ns_unused_lock); + cfs_spin_unlock(&ns->ns_lock); EXIT; } void ldlm_lock_touch_in_lru(struct ldlm_lock *lock) { - struct ldlm_namespace *ns = lock->l_resource->lr_namespace; + struct ldlm_namespace *ns = ldlm_lock_to_ns(lock); + ENTRY; - spin_lock(&ns->ns_unused_lock); - if (!list_empty(&lock->l_lru)) { + if (lock->l_ns_srv) { + LASSERT(cfs_list_empty(&lock->l_lru)); + EXIT; + return; + } + + cfs_spin_lock(&ns->ns_lock); + if (!cfs_list_empty(&lock->l_lru)) { ldlm_lock_remove_from_lru_nolock(lock); ldlm_lock_add_to_lru_nolock(lock); } - spin_unlock(&ns->ns_unused_lock); + cfs_spin_unlock(&ns->ns_lock); EXIT; } @@ -255,23 +319,23 @@ int ldlm_lock_destroy_internal(struct ldlm_lock *lock) LBUG(); } - if (!list_empty(&lock->l_res_link)) { + if (!cfs_list_empty(&lock->l_res_link)) { LDLM_ERROR(lock, "lock still on resource"); ldlm_lock_dump(D_ERROR, lock, 0); LBUG(); } if (lock->l_destroyed) { - LASSERT(list_empty(&lock->l_lru)); + LASSERT(cfs_list_empty(&lock->l_lru)); EXIT; return 0; } lock->l_destroyed = 1; if (lock->l_export && lock->l_export->exp_lock_hash && - !hlist_unhashed(&lock->l_exp_hash)) - lustre_hash_del(lock->l_export->exp_lock_hash, - &lock->l_remote_handle, &lock->l_exp_hash); + !cfs_hlist_unhashed(&lock->l_exp_hash)) + cfs_hash_del(lock->l_export->exp_lock_hash, + &lock->l_remote_handle, &lock->l_exp_hash); ldlm_lock_remove_from_lru(lock); class_handle_unhash(&lock->l_handle); @@ -327,8 +391,7 @@ static void lock_handle_addref(void *lock) /* * usage: pass in a resource on which you have done ldlm_resource_get - * pass in a parent lock on which you have done a ldlm_lock_get - * after return, ldlm_*_put the resource and parent + * new lock will take over the refcount. * returns: lock with refcount 2 - one for current caller and one for remote */ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource) @@ -339,15 +402,15 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource) if (resource == NULL) LBUG(); - OBD_SLAB_ALLOC(lock, ldlm_lock_slab, CFS_ALLOC_IO, sizeof(*lock)); + OBD_SLAB_ALLOC_PTR_GFP(lock, ldlm_lock_slab, CFS_ALLOC_IO); if (lock == NULL) RETURN(NULL); - spin_lock_init(&lock->l_lock); - lock->l_resource = ldlm_resource_getref(resource); + cfs_spin_lock_init(&lock->l_lock); + lock->l_resource = resource; lu_ref_add(&resource->lr_reference, "lock", lock); - atomic_set(&lock->l_refc, 2); + cfs_atomic_set(&lock->l_refc, 2); CFS_INIT_LIST_HEAD(&lock->l_res_link); CFS_INIT_LIST_HEAD(&lock->l_lru); CFS_INIT_LIST_HEAD(&lock->l_pending_chain); @@ -360,15 +423,21 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource) CFS_INIT_LIST_HEAD(&lock->l_sl_policy); CFS_INIT_HLIST_NODE(&lock->l_exp_hash); - atomic_inc(&resource->lr_namespace->ns_locks); + lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats, + LDLM_NSS_LOCKS); CFS_INIT_LIST_HEAD(&lock->l_handle.h_link); class_handle_hash(&lock->l_handle, lock_handle_addref); - CFS_INIT_LIST_HEAD(&lock->l_extents_list); - spin_lock_init(&lock->l_extents_list_lock); - CFS_INIT_LIST_HEAD(&lock->l_cache_locks_list); lu_ref_init(&lock->l_reference); lu_ref_add(&lock->l_reference, "hash", lock); + lock->l_callback_timeout = 0; + +#if LUSTRE_TRACKS_LOCK_EXP_REFS + CFS_INIT_LIST_HEAD(&lock->l_exp_refs_link); + lock->l_exp_refs_nr = 0; + lock->l_exp_refs_target = NULL; +#endif + CFS_INIT_LIST_HEAD(&lock->l_exp_list); RETURN(lock); } @@ -394,22 +463,23 @@ int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock, LASSERT(new_resid->name[0] != 0); /* This function assumes that the lock isn't on any lists */ - LASSERT(list_empty(&lock->l_res_link)); + LASSERT(cfs_list_empty(&lock->l_res_link)); type = oldres->lr_type; unlock_res_and_lock(lock); newres = ldlm_resource_get(ns, NULL, new_resid, type, 1); - lu_ref_add(&newres->lr_reference, "lock", lock); if (newres == NULL) RETURN(-ENOMEM); + + lu_ref_add(&newres->lr_reference, "lock", lock); /* * To flip the lock from the old to the new resource, lock, oldres and * newres have to be locked. Resource spin-locks are nested within * lock->l_lock, and are taken in the memory address order to avoid * dead-locks. */ - spin_lock(&lock->l_lock); + cfs_spin_lock(&lock->l_lock); oldres = lock->l_resource; if (oldres < newres) { lock_res(oldres); @@ -447,8 +517,7 @@ void ldlm_lock2handle(const struct ldlm_lock *lock, struct lustre_handle *lockh) struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle, int flags) { - struct ldlm_namespace *ns; - struct ldlm_lock *lock, *retval = NULL; + struct ldlm_lock *lock; ENTRY; LASSERT(handle); @@ -457,36 +526,36 @@ struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle, if (lock == NULL) RETURN(NULL); - LASSERT(lock->l_resource != NULL); - ns = lock->l_resource->lr_namespace; - LASSERT(ns != NULL); + /* It's unlikely but possible that someone marked the lock as + * destroyed after we did handle2object on it */ + if (flags == 0 && !lock->l_destroyed) { + lu_ref_add(&lock->l_reference, "handle", cfs_current()); + RETURN(lock); + } - lu_ref_add_atomic(&lock->l_reference, "handle", cfs_current()); lock_res_and_lock(lock); - /* It's unlikely but possible that someone marked the lock as - * destroyed after we did handle2object on it */ - if (lock->l_destroyed) { + LASSERT(lock->l_resource != NULL); + + lu_ref_add_atomic(&lock->l_reference, "handle", cfs_current()); + if (unlikely(lock->l_destroyed)) { unlock_res_and_lock(lock); CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock); LDLM_LOCK_PUT(lock); - GOTO(out, retval); + RETURN(NULL); } if (flags && (lock->l_flags & flags)) { unlock_res_and_lock(lock); LDLM_LOCK_PUT(lock); - GOTO(out, retval); + RETURN(NULL); } if (flags) lock->l_flags |= flags; unlock_res_and_lock(lock); - retval = lock; - EXIT; - out: - return retval; + RETURN(lock); } void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc) @@ -497,17 +566,18 @@ void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc) */ if ((lock->l_resource->lr_type == LDLM_IBITS) && (exp && !(exp->exp_connect_flags & OBD_CONNECT_IBITS))) { - struct ldlm_resource res = *lock->l_resource; - /* Make sure all the right bits are set in this lock we are going to pass to client */ LASSERTF(lock->l_policy_data.l_inodebits.bits == - (MDS_INODELOCK_LOOKUP|MDS_INODELOCK_UPDATE), + (MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE | + MDS_INODELOCK_LAYOUT), "Inappropriate inode lock bits during " "conversion " LPU64 "\n", lock->l_policy_data.l_inodebits.bits); - res.lr_type = LDLM_PLAIN; - ldlm_res2desc(&res, &desc->l_resource); + + ldlm_res2desc(lock->l_resource, &desc->l_resource); + desc->l_resource.lr_type = LDLM_PLAIN; + /* Convert "new" lock mode to something old client can understand */ if ((lock->l_req_mode == LCK_CR) || @@ -531,12 +601,14 @@ void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc) ldlm_res2desc(lock->l_resource, &desc->l_resource); desc->l_req_mode = lock->l_req_mode; desc->l_granted_mode = lock->l_granted_mode; - desc->l_policy_data = lock->l_policy_data; + ldlm_convert_policy_to_wire(lock->l_resource->lr_type, + &lock->l_policy_data, + &desc->l_policy_data); } } void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new, - struct list_head *work_list) + cfs_list_t *work_list) { if ((lock->l_flags & LDLM_FL_AST_SENT) == 0) { LDLM_DEBUG(lock, "lock incompatible; sending blocking AST."); @@ -545,28 +617,28 @@ void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new, * discard dirty data, rather than writing back. */ if (new->l_flags & LDLM_AST_DISCARD_DATA) lock->l_flags |= LDLM_FL_DISCARD_DATA; - LASSERT(list_empty(&lock->l_bl_ast)); - list_add(&lock->l_bl_ast, work_list); + LASSERT(cfs_list_empty(&lock->l_bl_ast)); + cfs_list_add(&lock->l_bl_ast, work_list); LDLM_LOCK_GET(lock); LASSERT(lock->l_blocking_lock == NULL); lock->l_blocking_lock = LDLM_LOCK_GET(new); } } -void ldlm_add_cp_work_item(struct ldlm_lock *lock, struct list_head *work_list) +void ldlm_add_cp_work_item(struct ldlm_lock *lock, cfs_list_t *work_list) { if ((lock->l_flags & LDLM_FL_CP_REQD) == 0) { lock->l_flags |= LDLM_FL_CP_REQD; LDLM_DEBUG(lock, "lock granted; sending completion AST."); - LASSERT(list_empty(&lock->l_cp_ast)); - list_add(&lock->l_cp_ast, work_list); + LASSERT(cfs_list_empty(&lock->l_cp_ast)); + cfs_list_add(&lock->l_cp_ast, work_list); LDLM_LOCK_GET(lock); } } /* must be called with lr_lock held */ void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new, - struct list_head *work_list) + cfs_list_t *work_list) { ENTRY; check_res_locked(lock->l_resource); @@ -668,7 +740,7 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) lock_res_and_lock(lock); - ns = lock->l_resource->lr_namespace; + ns = ldlm_lock_to_ns(lock); ldlm_lock_decref_internal_nolock(lock, mode); @@ -684,7 +756,7 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) (lock->l_flags & LDLM_FL_CBPENDING)) { /* If we received a blocked AST and this was the last reference, * run the callback. */ - if (ns_is_server(ns) && lock->l_export) + if (lock->l_ns_srv && lock->l_export) CERROR("FL_CBPENDING set on non-local lock--just a " "warning\n"); @@ -693,16 +765,28 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) LDLM_LOCK_GET(lock); /* dropped by bl thread */ ldlm_lock_remove_from_lru(lock); unlock_res_and_lock(lock); + + if (lock->l_flags & LDLM_FL_FAIL_LOC) + OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE); + if ((lock->l_flags & LDLM_FL_ATOMIC_CB) || ldlm_bl_to_thread_lock(ns, NULL, lock) != 0) ldlm_handle_bl_callback(ns, NULL, lock); } else if (ns_is_client(ns) && !lock->l_readers && !lock->l_writers && + !(lock->l_flags & LDLM_FL_NO_LRU) && !(lock->l_flags & LDLM_FL_BL_AST)) { + + LDLM_DEBUG(lock, "add lock into lru list"); + /* If this is a client-side namespace and this was the last * reference, put it on the LRU. */ ldlm_lock_add_to_lru(lock); unlock_res_and_lock(lock); + + if (lock->l_flags & LDLM_FL_FAIL_LOC) + OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE); + /* Call ldlm_cancel_lru() only if EARLY_CANCEL and LRU RESIZE * are not supported by the server, otherwise, it is done on * enqueue. */ @@ -710,6 +794,7 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) !ns_connect_lru_resize(ns)) ldlm_cancel_lru(ns, 0, LDLM_ASYNC, 0); } else { + LDLM_DEBUG(lock, "do not add lock into lru list"); unlock_res_and_lock(lock); } @@ -742,9 +827,9 @@ void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode) } struct sl_insert_point { - struct list_head *res_link; - struct list_head *mode_link; - struct list_head *policy_link; + cfs_list_t *res_link; + cfs_list_t *mode_link; + cfs_list_t *policy_link; }; /* @@ -761,19 +846,19 @@ struct sl_insert_point { * NOTE: called by * - ldlm_grant_lock_with_skiplist */ -static void search_granted_lock(struct list_head *queue, +static void search_granted_lock(cfs_list_t *queue, struct ldlm_lock *req, struct sl_insert_point *prev) { - struct list_head *tmp; + cfs_list_t *tmp; struct ldlm_lock *lock, *mode_end, *policy_end; ENTRY; - list_for_each(tmp, queue) { - lock = list_entry(tmp, struct ldlm_lock, l_res_link); + cfs_list_for_each(tmp, queue) { + lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link); - mode_end = list_entry(lock->l_sl_mode.prev, struct ldlm_lock, - l_sl_mode); + mode_end = cfs_list_entry(lock->l_sl_mode.prev, + struct ldlm_lock, l_sl_mode); if (lock->l_req_mode != req->l_req_mode) { /* jump to last lock of mode group */ @@ -791,9 +876,10 @@ static void search_granted_lock(struct list_head *queue, return; } else if (lock->l_resource->lr_type == LDLM_IBITS) { for (;;) { - policy_end = list_entry(lock->l_sl_policy.prev, - struct ldlm_lock, - l_sl_policy); + policy_end = + cfs_list_entry(lock->l_sl_policy.prev, + struct ldlm_lock, + l_sl_policy); if (lock->l_policy_data.l_inodebits.bits == req->l_policy_data.l_inodebits.bits) { @@ -815,8 +901,8 @@ static void search_granted_lock(struct list_head *queue, /* go to next policy group within mode group */ tmp = policy_end->l_res_link.next; - lock = list_entry(tmp, struct ldlm_lock, - l_res_link); + lock = cfs_list_entry(tmp, struct ldlm_lock, + l_res_link); } /* loop over policy groups within the mode group */ /* insert point is last lock of the mode group, @@ -858,13 +944,13 @@ static void ldlm_granted_list_add_lock(struct ldlm_lock *lock, return; } - LASSERT(list_empty(&lock->l_res_link)); - LASSERT(list_empty(&lock->l_sl_mode)); - LASSERT(list_empty(&lock->l_sl_policy)); + LASSERT(cfs_list_empty(&lock->l_res_link)); + LASSERT(cfs_list_empty(&lock->l_sl_mode)); + LASSERT(cfs_list_empty(&lock->l_sl_policy)); - list_add(&lock->l_res_link, prev->res_link); - list_add(&lock->l_sl_mode, prev->mode_link); - list_add(&lock->l_sl_policy, prev->policy_link); + cfs_list_add(&lock->l_res_link, prev->res_link); + cfs_list_add(&lock->l_sl_mode, prev->mode_link); + cfs_list_add(&lock->l_sl_policy, prev->policy_link); EXIT; } @@ -888,7 +974,7 @@ static void ldlm_grant_lock_with_skiplist(struct ldlm_lock *lock) * * must be called with lr_lock held */ -void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list) +void ldlm_grant_lock(struct ldlm_lock *lock, cfs_list_t *work_list) { struct ldlm_resource *res = lock->l_resource; ENTRY; @@ -909,25 +995,25 @@ void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list) if (work_list && lock->l_completion_ast != NULL) ldlm_add_ast_work_item(lock, NULL, work_list); - ldlm_pool_add(&res->lr_namespace->ns_pool, lock); + ldlm_pool_add(&ldlm_res_to_ns(res)->ns_pool, lock); EXIT; } /* returns a referenced lock or NULL. See the flag descriptions below, in the * comment above ldlm_lock_match */ -static struct ldlm_lock *search_queue(struct list_head *queue, +static struct ldlm_lock *search_queue(cfs_list_t *queue, ldlm_mode_t *mode, ldlm_policy_data_t *policy, struct ldlm_lock *old_lock, int flags, int unref) { struct ldlm_lock *lock; - struct list_head *tmp; + cfs_list_t *tmp; - list_for_each(tmp, queue) { + cfs_list_for_each(tmp, queue) { ldlm_mode_t match; - lock = list_entry(tmp, struct ldlm_lock, l_res_link); + lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link); if (lock == old_lock) break; @@ -969,7 +1055,8 @@ static struct ldlm_lock *search_queue(struct list_head *queue, continue; if (!unref && - (lock->l_destroyed || (lock->l_flags & LDLM_FL_FAILED))) + (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED || + lock->l_failed)) continue; if ((flags & LDLM_FL_LOCAL_ONLY) && @@ -989,11 +1076,33 @@ static struct ldlm_lock *search_queue(struct list_head *queue, return NULL; } -void ldlm_lock_allow_match(struct ldlm_lock *lock) +void ldlm_lock_fail_match_locked(struct ldlm_lock *lock) +{ + if (!lock->l_failed) { + lock->l_failed = 1; + cfs_waitq_broadcast(&lock->l_waitq); + } +} +EXPORT_SYMBOL(ldlm_lock_fail_match_locked); + +void ldlm_lock_fail_match(struct ldlm_lock *lock) { lock_res_and_lock(lock); + ldlm_lock_fail_match_locked(lock); + unlock_res_and_lock(lock); +} +EXPORT_SYMBOL(ldlm_lock_fail_match); + +void ldlm_lock_allow_match_locked(struct ldlm_lock *lock) +{ lock->l_flags |= LDLM_FL_LVB_READY; - cfs_waitq_signal(&lock->l_waitq); + cfs_waitq_broadcast(&lock->l_waitq); +} + +void ldlm_lock_allow_match(struct ldlm_lock *lock) +{ + lock_res_and_lock(lock); + ldlm_lock_allow_match_locked(lock); unlock_res_and_lock(lock); } @@ -1035,7 +1144,7 @@ ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags, old_lock = ldlm_handle2lock(lockh); LASSERT(old_lock); - ns = old_lock->l_resource->lr_namespace; + ns = ldlm_lock_to_ns(old_lock); res_id = &old_lock->l_resource->lr_name; type = old_lock->l_resource->lr_type; mode = old_lock->l_req_mode; @@ -1096,7 +1205,16 @@ ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags, /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */ l_wait_event(lock->l_waitq, - (lock->l_flags & LDLM_FL_LVB_READY), &lwi); + lock->l_flags & LDLM_FL_LVB_READY || + lock->l_failed, + &lwi); + if (!(lock->l_flags & LDLM_FL_LVB_READY)) { + if (flags & LDLM_FL_TEST_LOCK) + LDLM_LOCK_RELEASE(lock); + else + ldlm_lock_decref_internal(lock, mode); + rc = 0; + } } } out2: @@ -1125,7 +1243,7 @@ ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags, type, mode, res_id->name[0], res_id->name[1], (type == LDLM_PLAIN || type == LDLM_IBITS) ? res_id->name[2] :policy->l_extent.start, - (type == LDLM_PLAIN || type == LDLM_IBITS) ? + (type == LDLM_PLAIN || type == LDLM_IBITS) ? res_id->name[3] : policy->l_extent.end); } if (old_lock) @@ -1134,6 +1252,41 @@ ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags, return rc ? mode : 0; } +ldlm_mode_t ldlm_revalidate_lock_handle(struct lustre_handle *lockh, + __u64 *bits) +{ + struct ldlm_lock *lock; + ldlm_mode_t mode = 0; + ENTRY; + + lock = ldlm_handle2lock(lockh); + if (lock != NULL) { + lock_res_and_lock(lock); + if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED || + lock->l_failed) + GOTO(out, mode); + + if (lock->l_flags & LDLM_FL_CBPENDING && + lock->l_readers == 0 && lock->l_writers == 0) + GOTO(out, mode); + + if (bits) + *bits = lock->l_policy_data.l_inodebits.bits; + mode = lock->l_granted_mode; + ldlm_lock_addref_internal_nolock(lock, mode); + } + + EXIT; + +out: + if (lock != NULL) { + unlock_res_and_lock(lock); + LDLM_LOCK_PUT(lock); + } + return mode; +} +EXPORT_SYMBOL(ldlm_revalidate_lock_handle); + /* Returns a referenced lock */ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns, const struct ldlm_res_id *res_id, @@ -1151,7 +1304,6 @@ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns, RETURN(NULL); lock = ldlm_lock_new(res); - ldlm_resource_putref(res); if (lock == NULL) RETURN(NULL); @@ -1159,6 +1311,7 @@ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns, lock->l_req_mode = mode; lock->l_ast_data = data; lock->l_pid = cfs_curproc_pid(); + lock->l_ns_srv = !!ns_is_server(ns); if (cbs) { lock->l_blocking_ast = cbs->lcs_blocking; lock->l_completion_ast = cbs->lcs_completion; @@ -1180,13 +1333,14 @@ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns, GOTO(out, 0); } + if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_NEW_LOCK)) + GOTO(out, 0); + RETURN(lock); out: - if (lock->l_lvb_data) - OBD_FREE(lock->l_lvb_data, lvb_len); - ldlm_interval_free(ldlm_interval_detach(lock)); - OBD_SLAB_FREE(lock, ldlm_lock_slab, sizeof(*lock)); + ldlm_lock_destroy(lock); + LDLM_LOCK_RELEASE(lock); return NULL; } @@ -1196,13 +1350,13 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns, { struct ldlm_lock *lock = *lockp; struct ldlm_resource *res = lock->l_resource; - int local = ns_is_client(res->lr_namespace); + int local = ns_is_client(ldlm_res_to_ns(res)); ldlm_processing_policy policy; ldlm_error_t rc = ELDLM_OK; struct ldlm_interval *node = NULL; ENTRY; - do_gettimeofday(&lock->l_enqueued_time); + lock->l_last_activity = cfs_time_current_sec(); /* policies are not executed on the client or during replay */ if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT && !local && ns->ns_policy) { @@ -1231,8 +1385,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns, * have to allocate the interval node early otherwise we can't regrant * this lock in the future. - jay */ if (!local && (*flags & LDLM_FL_REPLAY) && res->lr_type == LDLM_EXTENT) - OBD_SLAB_ALLOC(node, ldlm_interval_slab, CFS_ALLOC_IO, - sizeof(*node)); + OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, CFS_ALLOC_IO); lock_res_and_lock(lock); if (local && lock->l_req_mode == lock->l_granted_mode) { @@ -1304,10 +1457,10 @@ out: } /* Must be called with namespace taken: queue is waiting or converting. */ -int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue, - struct list_head *work_list) +int ldlm_reprocess_queue(struct ldlm_resource *res, cfs_list_t *queue, + cfs_list_t *work_list) { - struct list_head *tmp, *pos; + cfs_list_t *tmp, *pos; ldlm_processing_policy policy; int flags; int rc = LDLM_ITER_CONTINUE; @@ -1319,9 +1472,9 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue, policy = ldlm_processing_policy_table[res->lr_type]; LASSERT(policy); - list_for_each_safe(tmp, pos, queue) { + cfs_list_for_each_safe(tmp, pos, queue) { struct ldlm_lock *pending; - pending = list_entry(tmp, struct ldlm_lock, l_res_link); + pending = cfs_list_entry(tmp, struct ldlm_lock, l_res_link); CDEBUG(D_INFO, "Reprocessing lock %p\n", pending); @@ -1334,36 +1487,18 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue, RETURN(rc); } -/* Helper function for ldlm_run_ast_work(). - * - * Send an existing rpc set specified by @arg->set and then - * destroy it. Create new one if @do_create flag is set. */ -static void -ldlm_send_and_maybe_create_set(struct ldlm_cb_set_arg *arg, int do_create) -{ - ENTRY; - - ptlrpc_set_wait(arg->set); - if (arg->type == LDLM_BL_CALLBACK) - OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2); - ptlrpc_set_destroy(arg->set); - - if (do_create) - arg->set = ptlrpc_prep_set(); - - EXIT; -} - static int -ldlm_work_bl_ast_lock(struct list_head *tmp, struct ldlm_cb_set_arg *arg) +ldlm_work_bl_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg) { struct ldlm_lock_desc d; - struct ldlm_lock *lock = list_entry(tmp, struct ldlm_lock, l_bl_ast); + struct ldlm_lock *lock = cfs_list_entry(tmp, struct ldlm_lock, + l_bl_ast); + int rc; ENTRY; /* nobody should touch l_bl_ast */ lock_res_and_lock(lock); - list_del_init(&lock->l_bl_ast); + cfs_list_del_init(&lock->l_bl_ast); LASSERT(lock->l_flags & LDLM_FL_AST_SENT); LASSERT(lock->l_bl_ast_run == 0); @@ -1373,19 +1508,19 @@ ldlm_work_bl_ast_lock(struct list_head *tmp, struct ldlm_cb_set_arg *arg) ldlm_lock2desc(lock->l_blocking_lock, &d); - lock->l_blocking_ast(lock, &d, (void *)arg, - LDLM_CB_BLOCKING); + rc = lock->l_blocking_ast(lock, &d, (void *)arg, + LDLM_CB_BLOCKING); LDLM_LOCK_RELEASE(lock->l_blocking_lock); lock->l_blocking_lock = NULL; LDLM_LOCK_RELEASE(lock); - RETURN(1); + RETURN(rc); } static int -ldlm_work_cp_ast_lock(struct list_head *tmp, struct ldlm_cb_set_arg *arg) +ldlm_work_cp_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg) { - struct ldlm_lock *lock = list_entry(tmp, struct ldlm_lock, l_cp_ast); + struct ldlm_lock *lock = cfs_list_entry(tmp, struct ldlm_lock, l_cp_ast); ldlm_completion_callback completion_callback; int rc = 0; ENTRY; @@ -1403,7 +1538,7 @@ ldlm_work_cp_ast_lock(struct list_head *tmp, struct ldlm_cb_set_arg *arg) /* nobody should touch l_cp_ast */ lock_res_and_lock(lock); - list_del_init(&lock->l_cp_ast); + cfs_list_del_init(&lock->l_cp_ast); LASSERT(lock->l_flags & LDLM_FL_CP_REQD); /* save l_completion_ast since it can be changed by * mds_intent_policy(), see bug 14225 */ @@ -1411,85 +1546,94 @@ ldlm_work_cp_ast_lock(struct list_head *tmp, struct ldlm_cb_set_arg *arg) lock->l_flags &= ~LDLM_FL_CP_REQD; unlock_res_and_lock(lock); - if (completion_callback != NULL) { - completion_callback(lock, 0, (void *)arg); - rc = 1; - } + if (completion_callback != NULL) + rc = completion_callback(lock, 0, (void *)arg); LDLM_LOCK_RELEASE(lock); RETURN(rc); } static int -ldlm_work_revoke_ast_lock(struct list_head *tmp, struct ldlm_cb_set_arg *arg) +ldlm_work_revoke_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg) { struct ldlm_lock_desc desc; - struct ldlm_lock *lock = list_entry(tmp, struct ldlm_lock, l_rk_ast); + struct ldlm_lock *lock = cfs_list_entry(tmp, struct ldlm_lock, + l_rk_ast); + int rc; ENTRY; - list_del_init(&lock->l_rk_ast); + cfs_list_del_init(&lock->l_rk_ast); /* the desc just pretend to exclusive */ ldlm_lock2desc(lock, &desc); desc.l_req_mode = LCK_EX; desc.l_granted_mode = 0; - lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING); + rc = lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING); LDLM_LOCK_RELEASE(lock); - RETURN(1); + RETURN(rc); } -int ldlm_run_ast_work(struct list_head *rpc_list, ldlm_desc_ast_t ast_type) +int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list, + ldlm_desc_ast_t ast_type) { - struct ldlm_cb_set_arg arg; - struct list_head *tmp, *pos; - int (*work_ast_lock)(struct list_head *tmp,struct ldlm_cb_set_arg *arg); - int ast_count; + struct l_wait_info lwi = { 0 }; + struct ldlm_cb_set_arg *arg; + cfs_list_t *tmp, *pos; + int (*work_ast_lock)(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg); + unsigned int max_ast_count; + int rc; ENTRY; - arg.set = ptlrpc_prep_set(); - atomic_set(&arg.restart, 0); + if (cfs_list_empty(rpc_list)) + RETURN(0); + + OBD_ALLOC_PTR(arg); + if (arg == NULL) + RETURN(-ENOMEM); + + cfs_atomic_set(&arg->restart, 0); + cfs_atomic_set(&arg->rpcs, 0); + cfs_atomic_set(&arg->refcount, 1); + cfs_waitq_init(&arg->waitq); + switch (ast_type) { case LDLM_WORK_BL_AST: - arg.type = LDLM_BL_CALLBACK; + arg->type = LDLM_BL_CALLBACK; work_ast_lock = ldlm_work_bl_ast_lock; break; case LDLM_WORK_CP_AST: - arg.type = LDLM_CP_CALLBACK; + arg->type = LDLM_CP_CALLBACK; work_ast_lock = ldlm_work_cp_ast_lock; break; case LDLM_WORK_REVOKE_AST: - arg.type = LDLM_BL_CALLBACK; + arg->type = LDLM_BL_CALLBACK; work_ast_lock = ldlm_work_revoke_ast_lock; break; default: LBUG(); } - ast_count = 0; - list_for_each_safe(tmp, pos, rpc_list) { - ast_count += work_ast_lock(tmp, &arg); + max_ast_count = ns->ns_max_parallel_ast ? : UINT_MAX; + arg->threshold = max_ast_count; - /* Send the request set if it exceeds the PARALLEL_AST_LIMIT, - * and create a new set for requests that remained in - * @rpc_list */ - if (unlikely(ast_count == PARALLEL_AST_LIMIT)) { - ldlm_send_and_maybe_create_set(&arg, 1); - ast_count = 0; - } + cfs_list_for_each_safe(tmp, pos, rpc_list) { + (void)work_ast_lock(tmp, arg); + if (cfs_atomic_read(&arg->rpcs) < max_ast_count) + continue; + + l_wait_event(arg->waitq, + cfs_atomic_read(&arg->rpcs) < arg->threshold, + &lwi); } - if (ast_count > 0) - ldlm_send_and_maybe_create_set(&arg, 0); - else - /* In case when number of ASTs is multiply of - * PARALLEL_AST_LIMIT or @rpc_list was initially empty, - * @arg.set must be destroyed here, otherwise we get - * write memory leaking. */ - ptlrpc_set_destroy(arg.set); + arg->threshold = 1; + l_wait_event(arg->waitq, cfs_atomic_read(&arg->rpcs) == 0, &lwi); - RETURN(atomic_read(&arg.restart) ? -ERESTART : 0); + rc = cfs_atomic_read(&arg->restart) ? -ERESTART : 0; + ldlm_csa_put(arg); + RETURN(rc); } static int reprocess_one_queue(struct ldlm_resource *res, void *closure) @@ -1498,39 +1642,25 @@ static int reprocess_one_queue(struct ldlm_resource *res, void *closure) return LDLM_ITER_CONTINUE; } -void ldlm_reprocess_all_ns(struct ldlm_namespace *ns) +static int ldlm_reprocess_res(cfs_hash_t *hs, cfs_hash_bd_t *bd, + cfs_hlist_node_t *hnode, void *arg) { - struct list_head *tmp; - int i, rc; + struct ldlm_resource *res = cfs_hash_object(hs, hnode); + int rc; - if (ns == NULL) - return; + rc = reprocess_one_queue(res, arg); + return rc == LDLM_ITER_STOP; +} + +void ldlm_reprocess_all_ns(struct ldlm_namespace *ns) +{ ENTRY; - spin_lock(&ns->ns_hash_lock); - for (i = 0; i < RES_HASH_SIZE; i++) { - tmp = ns->ns_hash[i].next; - while (tmp != &(ns->ns_hash[i])) { - struct ldlm_resource *res = - list_entry(tmp, struct ldlm_resource, lr_hash); - - ldlm_resource_getref(res); - spin_unlock(&ns->ns_hash_lock); - LDLM_RESOURCE_ADDREF(res); - - rc = reprocess_one_queue(res, NULL); - - LDLM_RESOURCE_DELREF(res); - spin_lock(&ns->ns_hash_lock); - tmp = tmp->next; - ldlm_resource_putref_locked(res); - - if (rc == LDLM_ITER_STOP) - GOTO(out, rc); - } + + if (ns != NULL) { + cfs_hash_for_each_nolock(ns->ns_rs_hash, + ldlm_reprocess_res, NULL); } - out: - spin_unlock(&ns->ns_hash_lock); EXIT; } @@ -1541,7 +1671,7 @@ void ldlm_reprocess_all(struct ldlm_resource *res) ENTRY; /* Local lock trees don't get reprocessed. */ - if (ns_is_client(res->lr_namespace)) { + if (ns_is_client(ldlm_res_to_ns(res))) { EXIT; return; } @@ -1553,9 +1683,10 @@ void ldlm_reprocess_all(struct ldlm_resource *res) ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list); unlock_res(res); - rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_CP_AST); + rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list, + LDLM_WORK_CP_AST); if (rc == -ERESTART) { - LASSERT(list_empty(&rpc_list)); + LASSERT(cfs_list_empty(&rpc_list)); goto restart; } EXIT; @@ -1585,8 +1716,8 @@ void ldlm_unlink_lock_skiplist(struct ldlm_lock *req) req->l_resource->lr_type != LDLM_IBITS) return; - list_del_init(&req->l_sl_policy); - list_del_init(&req->l_sl_mode); + cfs_list_del_init(&req->l_sl_policy); + cfs_list_del_init(&req->l_sl_mode); } void ldlm_lock_cancel(struct ldlm_lock *lock) @@ -1598,7 +1729,7 @@ void ldlm_lock_cancel(struct ldlm_lock *lock) lock_res_and_lock(lock); res = lock->l_resource; - ns = res->lr_namespace; + ns = ldlm_res_to_ns(res); /* Please do not, no matter how tempting, remove this LBUG without * talking to me first. -phik */ @@ -1632,37 +1763,44 @@ void ldlm_lock_cancel(struct ldlm_lock *lock) int ldlm_lock_set_data(struct lustre_handle *lockh, void *data) { struct ldlm_lock *lock = ldlm_handle2lock(lockh); + int rc = -EINVAL; ENTRY; - if (lock == NULL) - RETURN(-EINVAL); - - lock->l_ast_data = data; - LDLM_LOCK_PUT(lock); - RETURN(0); + if (lock) { + if (lock->l_ast_data == NULL) + lock->l_ast_data = data; + if (lock->l_ast_data == data) + rc = 0; + LDLM_LOCK_PUT(lock); + } + RETURN(rc); } +EXPORT_SYMBOL(ldlm_lock_set_data); + +int ldlm_cancel_locks_for_export_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd, + cfs_hlist_node_t *hnode, void *data) -void ldlm_cancel_locks_for_export_cb(void *obj, void *data) { - struct obd_export *exp = data; - struct ldlm_lock *lock = obj; + struct obd_export *exp = data; + struct ldlm_lock *lock = cfs_hash_object(hs, hnode); struct ldlm_resource *res; res = ldlm_resource_getref(lock->l_resource); LDLM_LOCK_GET(lock); LDLM_DEBUG(lock, "export %p", exp); - ldlm_res_lvbo_update(res, NULL, 0, 1); + ldlm_res_lvbo_update(res, NULL, 1); ldlm_lock_cancel(lock); ldlm_reprocess_all(res); ldlm_resource_putref(res); LDLM_LOCK_RELEASE(lock); + return 0; } void ldlm_cancel_locks_for_export(struct obd_export *exp) { - lustre_hash_for_each_empty(exp->exp_lock_hash, - ldlm_cancel_locks_for_export_cb, exp); + cfs_hash_for_each_empty(exp->exp_lock_hash, + ldlm_cancel_locks_for_export_cb, exp); } /** @@ -1683,6 +1821,12 @@ void ldlm_lock_downgrade(struct ldlm_lock *lock, int new_mode) lock_res_and_lock(lock); ldlm_resource_unlink_lock(lock); + /* + * Remove the lock from pool as it will be added again in + * ldlm_grant_lock() called below. + */ + ldlm_pool_del(&ldlm_lock_to_ns(lock)->ns_pool, lock); + lock->l_req_mode = new_mode; ldlm_grant_lock(lock, NULL); unlock_res_and_lock(lock); @@ -1711,7 +1855,7 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode, /* I can't check the type of lock here because the bitlock of lock * is not held here, so do the allocation blindly. -jay */ - OBD_SLAB_ALLOC(node, ldlm_interval_slab, CFS_ALLOC_IO, sizeof(*node)); + OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, CFS_ALLOC_IO); if (node == NULL) /* Actually, this causes EDEADLOCK to be returned */ RETURN(NULL); @@ -1721,7 +1865,7 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode, lock_res_and_lock(lock); res = lock->l_resource; - ns = res->lr_namespace; + ns = ldlm_res_to_ns(res); old_mode = lock->l_req_mode; lock->l_req_mode = new_mode; @@ -1745,8 +1889,14 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode, } } + /* + * Remove old lock from the pool before adding the lock with new + * mode below in ->policy() + */ + ldlm_pool_del(&ns->ns_pool, lock); + /* If this is a local resource, put it on the appropriate list. */ - if (ns_is_client(res->lr_namespace)) { + if (ns_is_client(ldlm_res_to_ns(res))) { if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED)) { ldlm_resource_add_lock(res, &res->lr_converting, lock); } else { @@ -1758,7 +1908,7 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode, ldlm_grant_lock(lock, &rpc_list); granted = 1; - /* FIXME: completion handling not with ns_lock held ! */ + /* FIXME: completion handling not with lr_lock held ! */ if (lock->l_completion_ast) lock->l_completion_ast(lock, 0, NULL); } @@ -1783,7 +1933,7 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode, unlock_res_and_lock(lock); if (granted) - ldlm_run_ast_work(&rpc_list, LDLM_WORK_CP_AST); + ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_CP_AST); if (node) OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node)); RETURN(res); @@ -1802,7 +1952,7 @@ void ldlm_lock_dump(int level, struct ldlm_lock *lock, int pos) } CDEBUG(level," -- Lock dump: %p/"LPX64" (rc: %d) (pos: %d) (pid: %d)\n", - lock, lock->l_handle.h_cookie, atomic_read(&lock->l_refc), + lock, lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc), pos, lock->l_pid); if (lock->l_conn_export != NULL) obd = lock->l_conn_export->exp_obd; @@ -1824,9 +1974,9 @@ void ldlm_lock_dump(int level, struct ldlm_lock *lock, int pos) lock->l_resource->lr_name.name[1], lock->l_resource->lr_name.name[2]); CDEBUG(level, " Req mode: %s, grant mode: %s, rc: %u, read: %d, " - "write: %d flags: %#x\n", ldlm_lockname[lock->l_req_mode], + "write: %d flags: "LPX64"\n", ldlm_lockname[lock->l_req_mode], ldlm_lockname[lock->l_granted_mode], - atomic_read(&lock->l_refc), lock->l_readers, lock->l_writers, + cfs_atomic_read(&lock->l_refc), lock->l_readers, lock->l_writers, lock->l_flags); if (lock->l_resource->lr_type == LDLM_EXTENT) CDEBUG(level, " Extent: "LPU64" -> "LPU64 @@ -1861,7 +2011,7 @@ void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh) } void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level, - struct libcfs_debug_msg_data *data, const char *fmt, + struct libcfs_debug_msg_data *data, const char *fmt, ...) { va_list args; @@ -1873,16 +2023,16 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level, libcfs_debug_vmsg2(cdls, data->msg_subsys, level,data->msg_file, data->msg_fn, data->msg_line, fmt, args, " ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s " - "res: \?\? rrc=\?\? type: \?\?\? flags: %x remote: " - LPX64" expref: %d pid: %u\n", lock, - lock->l_handle.h_cookie, atomic_read(&lock->l_refc), + "res: \?\? rrc=\?\? type: \?\?\? flags: "LPX64" remote: " + LPX64" expref: %d pid: %u timeout: %lu\n", lock, + lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc), lock->l_readers, lock->l_writers, ldlm_lockname[lock->l_granted_mode], ldlm_lockname[lock->l_req_mode], lock->l_flags, lock->l_remote_handle.cookie, lock->l_export ? - atomic_read(&lock->l_export->exp_refcount) : -99, - lock->l_pid); + cfs_atomic_read(&lock->l_export->exp_refcount) : -99, + lock->l_pid, lock->l_callback_timeout); va_end(args); return; } @@ -1893,24 +2043,24 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level, data->msg_fn, data->msg_line, fmt, args, " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s " "res: "LPU64"/"LPU64" rrc: %d type: %s ["LPU64"->"LPU64 - "] (req "LPU64"->"LPU64") flags: %x remote: "LPX64 - " expref: %d pid: %u\n", - lock->l_resource->lr_namespace->ns_name, lock, - lock->l_handle.h_cookie, atomic_read(&lock->l_refc), + "] (req "LPU64"->"LPU64") flags: "LPX64" remote: "LPX64 + " expref: %d pid: %u timeout %lu\n", + ldlm_lock_to_ns_name(lock), lock, + lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc), lock->l_readers, lock->l_writers, ldlm_lockname[lock->l_granted_mode], ldlm_lockname[lock->l_req_mode], lock->l_resource->lr_name.name[0], lock->l_resource->lr_name.name[1], - atomic_read(&lock->l_resource->lr_refcount), + cfs_atomic_read(&lock->l_resource->lr_refcount), ldlm_typename[lock->l_resource->lr_type], lock->l_policy_data.l_extent.start, lock->l_policy_data.l_extent.end, lock->l_req_extent.start, lock->l_req_extent.end, lock->l_flags, lock->l_remote_handle.cookie, lock->l_export ? - atomic_read(&lock->l_export->exp_refcount) : -99, - lock->l_pid); + cfs_atomic_read(&lock->l_export->exp_refcount) : -99, + lock->l_pid, lock->l_callback_timeout); break; case LDLM_FLOCK: @@ -1918,24 +2068,24 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level, data->msg_fn, data->msg_line, fmt, args, " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s " "res: "LPU64"/"LPU64" rrc: %d type: %s pid: %d " - "["LPU64"->"LPU64"] flags: %x remote: "LPX64 - " expref: %d pid: %u\n", - lock->l_resource->lr_namespace->ns_name, lock, - lock->l_handle.h_cookie, atomic_read(&lock->l_refc), + "["LPU64"->"LPU64"] flags: "LPX64" remote: "LPX64 + " expref: %d pid: %u timeout: %lu\n", + ldlm_lock_to_ns_name(lock), lock, + lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc), lock->l_readers, lock->l_writers, ldlm_lockname[lock->l_granted_mode], ldlm_lockname[lock->l_req_mode], lock->l_resource->lr_name.name[0], lock->l_resource->lr_name.name[1], - atomic_read(&lock->l_resource->lr_refcount), + cfs_atomic_read(&lock->l_resource->lr_refcount), ldlm_typename[lock->l_resource->lr_type], lock->l_policy_data.l_flock.pid, lock->l_policy_data.l_flock.start, lock->l_policy_data.l_flock.end, lock->l_flags, lock->l_remote_handle.cookie, lock->l_export ? - atomic_read(&lock->l_export->exp_refcount) : -99, - lock->l_pid); + cfs_atomic_read(&lock->l_export->exp_refcount) : -99, + lock->l_pid, lock->l_callback_timeout); break; case LDLM_IBITS: @@ -1943,45 +2093,45 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level, data->msg_fn, data->msg_line, fmt, args, " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s " "res: "LPU64"/"LPU64" bits "LPX64" rrc: %d type: %s " - "flags: %x remote: "LPX64" expref: %d " - "pid %u\n", - lock->l_resource->lr_namespace->ns_name, + "flags: "LPX64" remote: "LPX64" expref: %d " + "pid: %u timeout: %lu\n", + ldlm_lock_to_ns_name(lock), lock, lock->l_handle.h_cookie, - atomic_read (&lock->l_refc), + cfs_atomic_read (&lock->l_refc), lock->l_readers, lock->l_writers, ldlm_lockname[lock->l_granted_mode], ldlm_lockname[lock->l_req_mode], lock->l_resource->lr_name.name[0], lock->l_resource->lr_name.name[1], lock->l_policy_data.l_inodebits.bits, - atomic_read(&lock->l_resource->lr_refcount), + cfs_atomic_read(&lock->l_resource->lr_refcount), ldlm_typename[lock->l_resource->lr_type], lock->l_flags, lock->l_remote_handle.cookie, lock->l_export ? - atomic_read(&lock->l_export->exp_refcount) : -99, - lock->l_pid); + cfs_atomic_read(&lock->l_export->exp_refcount) : -99, + lock->l_pid, lock->l_callback_timeout); break; default: libcfs_debug_vmsg2(cdls, data->msg_subsys, level,data->msg_file, data->msg_fn, data->msg_line, fmt, args, " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s " - "res: "LPU64"/"LPU64" rrc: %d type: %s flags: %x " - "remote: "LPX64" expref: %d pid: %u\n", - lock->l_resource->lr_namespace->ns_name, + "res: "LPU64"/"LPU64" rrc: %d type: %s flags: "LPX64" " + "remote: "LPX64" expref: %d pid: %u timeout %lu\n", + ldlm_lock_to_ns_name(lock), lock, lock->l_handle.h_cookie, - atomic_read (&lock->l_refc), + cfs_atomic_read (&lock->l_refc), lock->l_readers, lock->l_writers, ldlm_lockname[lock->l_granted_mode], ldlm_lockname[lock->l_req_mode], lock->l_resource->lr_name.name[0], lock->l_resource->lr_name.name[1], - atomic_read(&lock->l_resource->lr_refcount), + cfs_atomic_read(&lock->l_resource->lr_refcount), ldlm_typename[lock->l_resource->lr_type], lock->l_flags, lock->l_remote_handle.cookie, lock->l_export ? - atomic_read(&lock->l_export->exp_refcount) : -99, - lock->l_pid); + cfs_atomic_read(&lock->l_export->exp_refcount) : -99, + lock->l_pid, lock->l_callback_timeout); break; } va_end(args);