X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fldlm%2Fldlm_lockd.c;h=c2a420d6269c0b099c433bf8dd959938796236a0;hb=a42a91c783903ea15ad902032166c7e312dad7ee;hp=bfb4af143f3d5349eeca3323c92b3be29e08f164;hpb=30be03b4dd593894687773d2a460d441d85f88a2;p=fs%2Flustre-release.git

diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c
index bfb4af1..c2a420d 100644
--- a/lustre/ldlm/ldlm_lockd.c
+++ b/lustre/ldlm/ldlm_lockd.c
@@ -15,11 +15,7 @@
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
@@ -27,7 +23,7 @@
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
- * Copyright (c) 2010, 2013, Intel Corporation.
+ * Copyright (c) 2010, 2015, Intel Corporation.
 */
 /*
 * This file is part of Lustre, http://www.lustre.org/
@@ -41,22 +37,21 @@
 
 #define DEBUG_SUBSYSTEM S_LDLM
 
+#include
+#include
 #include
 #include
 #include
-#include
 #include "ldlm_internal.h"
 
 static int ldlm_num_threads;
-CFS_MODULE_PARM(ldlm_num_threads, "i", int, 0444,
-                "number of DLM service threads to start");
+module_param(ldlm_num_threads, int, 0444);
+MODULE_PARM_DESC(ldlm_num_threads, "number of DLM service threads to start");
 
 static char *ldlm_cpts;
-CFS_MODULE_PARM(ldlm_cpts, "s", charp, 0444,
-                "CPU partitions ldlm threads should run on");
+module_param(ldlm_cpts, charp, 0444);
+MODULE_PARM_DESC(ldlm_cpts, "CPU partitions ldlm threads should run on");
 
-extern struct kmem_cache *ldlm_resource_slab;
-extern struct kmem_cache *ldlm_lock_slab;
 static struct mutex     ldlm_ref_mutex;
 static int ldlm_refcount;
@@ -69,7 +64,7 @@ struct ldlm_cb_async_args {
 
 static struct ldlm_state *ldlm_state;
 
-inline cfs_time_t round_timeout(cfs_time_t timeout)
+static inline cfs_time_t round_timeout(cfs_time_t timeout)
 {
         return cfs_time_seconds((int)cfs_duration_sec(cfs_time_sub(timeout, 0)) + 1);
 }
@@ -112,15 +107,15 @@ struct ldlm_bl_pool {
 };
 
 struct ldlm_bl_work_item {
-        struct list_head        blwi_entry;
-        struct ldlm_namespace  *blwi_ns;
-        struct ldlm_lock_desc   blwi_ld;
-        struct ldlm_lock       *blwi_lock;
-        struct list_head        blwi_head;
-        int                     blwi_count;
-        struct completion       blwi_comp;
-        ldlm_cancel_flags_t     blwi_flags;
-        int                     blwi_mem_pressure;
+        struct list_head        blwi_entry;
+        struct ldlm_namespace  *blwi_ns;
+        struct ldlm_lock_desc   blwi_ld;
+        struct ldlm_lock       *blwi_lock;
+        struct list_head        blwi_head;
+        int                     blwi_count;
+        struct completion       blwi_comp;
+        enum ldlm_cancel_flags  blwi_flags;
+        int                     blwi_mem_pressure;
 };
 
 #ifdef HAVE_SERVER_SUPPORT
@@ -207,7 +202,7 @@ static int expired_lock_main(void *arg)
                         lock = list_entry(expired->next, struct ldlm_lock,
                                           l_pending_chain);
-                        if ((void *)lock < LP_POISON + PAGE_CACHE_SIZE &&
+                        if ((void *)lock < LP_POISON + PAGE_SIZE &&
                             (void *)lock >= LP_POISON) {
                                 spin_unlock_bh(&waiting_locks_spinlock);
                                 CERROR("free lock on elt list %p\n", lock);
                         }
@@ -215,7 +210,7 @@ static int expired_lock_main(void *arg)
                         list_del_init(&lock->l_pending_chain);
                         if ((void *)lock->l_export <
-                             LP_POISON + PAGE_CACHE_SIZE &&
+                             LP_POISON + PAGE_SIZE &&
                             (void *)lock->l_export >= LP_POISON) {
                                 CERROR("lock with free export on elt list %p\n",
                                        lock->l_export);
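
The hunks above move the two tunables from the libcfs CFS_MODULE_PARM() wrapper to the stock kernel pair module_param()/MODULE_PARM_DESC(). A minimal out-of-tree module showing the same idiom; the module and parameter names here are illustrative, not part of this patch:

#include <linux/module.h>
#include <linux/moduleparam.h>

static int demo_threads;
module_param(demo_threads, int, 0444);          /* read-only via sysfs */
MODULE_PARM_DESC(demo_threads, "number of demo threads to start");

static int __init demo_init(void)
{
        pr_info("demo_threads=%d\n", demo_threads);
        return 0;
}

static void __exit demo_exit(void)
{
}

module_init(demo_init);
module_exit(demo_exit);
MODULE_LICENSE("GPL");
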
@@ -237,6 +232,10 @@ static int expired_lock_main(void *arg)
                         export = class_export_lock_get(lock->l_export, lock);
                         spin_unlock_bh(&waiting_locks_spinlock);
 
+                        spin_lock_bh(&export->exp_bl_list_lock);
+                        list_del_init(&lock->l_exp_list);
+                        spin_unlock_bh(&export->exp_bl_list_lock);
+
                         do_dump++;
                         class_fail_export(export);
                         class_export_lock_put(export, lock);
@@ -320,7 +319,7 @@ static void waiting_locks_callback(unsigned long unused)
                         spin_unlock_bh(&waiting_locks_spinlock);
                         LDLM_DEBUG(lock, "prolong the busy lock");
                         ldlm_refresh_waiting_lock(lock,
-                                                  ldlm_get_enq_timeout(lock));
+                                                  ldlm_bl_timeout(lock) >> 1);
                         spin_lock_bh(&waiting_locks_spinlock);
 
                         if (!cont) {
@@ -410,44 +409,73 @@ static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, int seconds)
         return 1;
 }
 
+static void ldlm_add_blocked_lock(struct ldlm_lock *lock)
+{
+        spin_lock_bh(&lock->l_export->exp_bl_list_lock);
+        if (list_empty(&lock->l_exp_list)) {
+                if (lock->l_granted_mode != lock->l_req_mode)
+                        list_add_tail(&lock->l_exp_list,
+                                      &lock->l_export->exp_bl_list);
+                else
+                        list_add(&lock->l_exp_list,
+                                 &lock->l_export->exp_bl_list);
+        }
+        spin_unlock_bh(&lock->l_export->exp_bl_list_lock);
+
+        /* A blocked lock is added. Adjust the position in
+         * the stale list if the export is in the list.
+         * If export is stale and not in the list - it is being
+         * processed and will be placed on the right position
+         * on obd_stale_export_put(). */
+        if (!list_empty(&lock->l_export->exp_stale_list))
+                obd_stale_export_adjust(lock->l_export);
+}
+
 static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
 {
         int ret;
-        int timeout = ldlm_get_enq_timeout(lock);
+        int timeout = ldlm_bl_timeout(lock);
 
         /* NB: must be called with hold of lock_res_and_lock() */
         LASSERT(ldlm_is_res_locked(lock));
-        ldlm_set_waited(lock);
         LASSERT(!ldlm_is_cancel_on_block(lock));
 
+        /* Do not put cross-MDT lock in the waiting list, since we
+         * will not evict it due to timeout for now */
+        if (lock->l_export != NULL &&
+            (exp_connect_flags(lock->l_export) & OBD_CONNECT_MDS_MDS))
+                return 0;
+
         spin_lock_bh(&waiting_locks_spinlock);
+        if (ldlm_is_cancel(lock)) {
+                spin_unlock_bh(&waiting_locks_spinlock);
+                return 0;
+        }
+
         if (ldlm_is_destroyed(lock)) {
                 static cfs_time_t next;
-                spin_unlock_bh(&waiting_locks_spinlock);
-                LDLM_ERROR(lock, "not waiting on destroyed lock (bug 5653)");
-                if (cfs_time_after(cfs_time_current(), next)) {
-                        next = cfs_time_shift(14400);
-                        libcfs_debug_dumpstack(NULL);
-                }
-                return 0;
-        }
 
-        ret = __ldlm_add_waiting_lock(lock, timeout);
-        if (ret) {
-                /* grab ref on the lock if it has been added to the
-                 * waiting list */
-                LDLM_LOCK_GET(lock);
-        }
-        spin_unlock_bh(&waiting_locks_spinlock);
+                spin_unlock_bh(&waiting_locks_spinlock);
+                LDLM_ERROR(lock, "not waiting on destroyed lock (bug 5653)");
+                if (cfs_time_after(cfs_time_current(), next)) {
+                        next = cfs_time_shift(14400);
+                        libcfs_debug_dumpstack(NULL);
+                }
+                return 0;
+        }
 
+        ldlm_set_waited(lock);
+        lock->l_last_activity = cfs_time_current_sec();
+        ret = __ldlm_add_waiting_lock(lock, timeout);
         if (ret) {
-                spin_lock_bh(&lock->l_export->exp_bl_list_lock);
-                if (list_empty(&lock->l_exp_list))
-                        list_add(&lock->l_exp_list,
-                                 &lock->l_export->exp_bl_list);
-                spin_unlock_bh(&lock->l_export->exp_bl_list_lock);
+                /* grab ref on the lock if it has been added to the
+                 * waiting list */
+                LDLM_LOCK_GET(lock);
         }
+        spin_unlock_bh(&waiting_locks_spinlock);
+
+        if (ret)
+                ldlm_add_blocked_lock(lock);
 
         LDLM_DEBUG(lock, "%sadding to wait list(timeout: %d, AT: %s)",
                    ret == 0 ? "not re-" : "", timeout,
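
ldlm_add_blocked_lock() above keeps each export's exp_bl_list ordered: locks whose request is not yet granted are appended at the tail, already-granted locks are pushed at the head, so the cancel path can treat one end of the list as the more urgent. A standalone userspace model of that two-class insertion, with a hand-rolled circular list standing in for the kernel's list_head (all names illustrative):

#include <stdio.h>

struct entry {
        int granted;                    /* 1 if granted_mode == req_mode */
        struct entry *prev, *next;
};

struct dlist {
        struct entry head;              /* circular sentinel */
};

static void list_init(struct dlist *l)
{
        l->head.prev = l->head.next = &l->head;
}

static void insert_between(struct entry *e, struct entry *p, struct entry *n)
{
        e->prev = p; e->next = n; p->next = e; n->prev = e;
}

static void add_blocked(struct dlist *l, struct entry *e)
{
        if (!e->granted)                                /* not granted: tail */
                insert_between(e, l->head.prev, &l->head);
        else                                            /* granted: head */
                insert_between(e, &l->head, l->head.next);
}

int main(void)
{
        struct dlist l;
        struct entry a = { .granted = 1 }, b = { .granted = 0 },
                     c = { .granted = 1 };
        struct entry *it;

        list_init(&l);
        add_blocked(&l, &a);
        add_blocked(&l, &b);
        add_blocked(&l, &c);
        for (it = l.head.next; it != &l.head; it = it->next)
                printf("granted=%d\n", it->granted);    /* prints 1 1 0 */
        return 0;
}
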
"not re-" : "", timeout, @@ -502,6 +530,7 @@ int ldlm_del_waiting_lock(struct ldlm_lock *lock) spin_lock_bh(&waiting_locks_spinlock); ret = __ldlm_del_waiting_lock(lock); + ldlm_clear_waited(lock); spin_unlock_bh(&waiting_locks_spinlock); /* remove the lock out of export blocking list */ @@ -518,7 +547,6 @@ int ldlm_del_waiting_lock(struct ldlm_lock *lock) LDLM_DEBUG(lock, "%s", ret == 0 ? "wasn't waiting" : "removed"); return ret; } -EXPORT_SYMBOL(ldlm_del_waiting_lock); /** * Prolong the contended lock waiting time. @@ -533,6 +561,12 @@ int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout) return 0; } + if (exp_connect_flags(lock->l_export) & OBD_CONNECT_MDS_MDS) { + /* We don't have a "waiting locks list" on OSP. */ + LDLM_DEBUG(lock, "MDS-MDS lock: no-op"); + return 0; + } + spin_lock_bh(&waiting_locks_spinlock); if (list_empty(&lock->l_pending_chain)) { @@ -569,6 +603,31 @@ int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout) #ifdef HAVE_SERVER_SUPPORT /** + * Calculate the per-export Blocking timeout (covering BL AST, data flush, + * lock cancel, and their replies). Used for lock callback timeout and AST + * re-send period. + * + * \param[in] lock lock which is getting the blocking callback + * + * \retval timeout in seconds to wait for the client reply + */ +unsigned int ldlm_bl_timeout(struct ldlm_lock *lock) +{ + unsigned int timeout; + + if (AT_OFF) + return obd_timeout / 2; + + /* Since these are non-updating timeouts, we should be conservative. + * Take more than usually, 150% + * It would be nice to have some kind of "early reply" mechanism for + * lock callbacks too... */ + timeout = at_get(&lock->l_export->exp_bl_lock_at); + return max(timeout + (timeout >> 1), ldlm_enqueue_min); +} +EXPORT_SYMBOL(ldlm_bl_timeout); + +/** * Perform lock cleanup if AST sending failed. */ static void ldlm_failed_ast(struct ldlm_lock *lock, int rc, @@ -596,60 +655,62 @@ static void ldlm_failed_ast(struct ldlm_lock *lock, int rc, * Perform lock cleanup if AST reply came with error. */ static int ldlm_handle_ast_error(struct ldlm_lock *lock, - struct ptlrpc_request *req, int rc, - const char *ast_type) -{ - lnet_process_id_t peer = req->rq_import->imp_connection->c_peer; - - if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) { - LASSERT(lock->l_export); - if (lock->l_export->exp_libclient) { - LDLM_DEBUG(lock, "%s AST to liblustre client (nid %s)" - " timeout, just cancelling lock", ast_type, - libcfs_nid2str(peer.nid)); - ldlm_lock_cancel(lock); - rc = -ERESTART; - } else if (ldlm_is_cancel(lock)) { - LDLM_DEBUG(lock, "%s AST timeout from nid %s, but " - "cancel was received (AST reply lost?)", - ast_type, libcfs_nid2str(peer.nid)); - ldlm_lock_cancel(lock); - rc = -ERESTART; - } else { - ldlm_del_waiting_lock(lock); - ldlm_failed_ast(lock, rc, ast_type); - } - } else if (rc) { - if (rc == -EINVAL) { - struct ldlm_resource *res = lock->l_resource; - LDLM_DEBUG(lock, "client (nid %s) returned %d" - " from %s AST - normal race", - libcfs_nid2str(peer.nid), - req->rq_repmsg ? - lustre_msg_get_status(req->rq_repmsg) : -1, - ast_type); - if (res) { - /* update lvbo to return proper attributes. 
@@ -596,60 +655,62 @@ static void ldlm_failed_ast(struct ldlm_lock *lock, int rc,
 * Perform lock cleanup if AST reply came with error.
 */
 static int ldlm_handle_ast_error(struct ldlm_lock *lock,
-                                 struct ptlrpc_request *req, int rc,
-                                 const char *ast_type)
-{
-        lnet_process_id_t peer = req->rq_import->imp_connection->c_peer;
-
-        if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
-                LASSERT(lock->l_export);
-                if (lock->l_export->exp_libclient) {
-                        LDLM_DEBUG(lock, "%s AST to liblustre client (nid %s)"
-                                   " timeout, just cancelling lock", ast_type,
-                                   libcfs_nid2str(peer.nid));
-                        ldlm_lock_cancel(lock);
-                        rc = -ERESTART;
-                } else if (ldlm_is_cancel(lock)) {
-                        LDLM_DEBUG(lock, "%s AST timeout from nid %s, but "
-                                   "cancel was received (AST reply lost?)",
-                                   ast_type, libcfs_nid2str(peer.nid));
-                        ldlm_lock_cancel(lock);
-                        rc = -ERESTART;
-                } else {
-                        ldlm_del_waiting_lock(lock);
-                        ldlm_failed_ast(lock, rc, ast_type);
-                }
-        } else if (rc) {
-                if (rc == -EINVAL) {
-                        struct ldlm_resource *res = lock->l_resource;
-                        LDLM_DEBUG(lock, "client (nid %s) returned %d"
-                               " from %s AST - normal race",
-                               libcfs_nid2str(peer.nid),
-                               req->rq_repmsg ?
-                               lustre_msg_get_status(req->rq_repmsg) : -1,
-                               ast_type);
-                        if (res) {
-                                /* update lvbo to return proper attributes.
-                                 * see bug 23174 */
-                                ldlm_resource_getref(res);
-                                ldlm_res_lvbo_update(res, NULL, 1);
-                                ldlm_resource_putref(res);
-                        }
+                                 struct ptlrpc_request *req, int rc,
+                                 const char *ast_type)
+{
+        lnet_process_id_t peer = req->rq_import->imp_connection->c_peer;
 
-                } else {
-                        LDLM_ERROR(lock, "client (nid %s) returned %d: rc=%d "
-                                   "from %s AST", libcfs_nid2str(peer.nid),
+        if (!req->rq_replied || (rc && rc != -EINVAL)) {
+                if (lock->l_export && lock->l_export->exp_libclient) {
+                        LDLM_DEBUG(lock,
+                                   "%s AST (req@%p x%llu) to liblustre client (nid %s) timeout, just cancelling lock",
+                                   ast_type, req, req->rq_xid,
+                                   libcfs_nid2str(peer.nid));
+                        ldlm_lock_cancel(lock);
+                        rc = -ERESTART;
+                } else if (ldlm_is_cancel(lock)) {
+                        LDLM_DEBUG(lock,
+                                   "%s AST (req@%p x%llu) timeout from nid %s, but cancel was received (AST reply lost?)",
+                                   ast_type, req, req->rq_xid,
+                                   libcfs_nid2str(peer.nid));
+                        ldlm_lock_cancel(lock);
+                        rc = -ERESTART;
+                } else {
+                        LDLM_ERROR(lock,
+                                   "client (nid %s) %s %s AST (req@%p x%llu status %d rc %d), evict it",
+                                   libcfs_nid2str(peer.nid),
+                                   req->rq_replied ? "returned error from" :
+                                   "failed to reply to",
+                                   ast_type, req, req->rq_xid,
                                    (req->rq_repmsg != NULL) ?
                                    lustre_msg_get_status(req->rq_repmsg) : 0,
-                                   rc, ast_type);
-                }
-                ldlm_lock_cancel(lock);
-                /* Server-side AST functions are called from ldlm_reprocess_all,
-                 * which needs to be told to please restart its reprocessing. */
-                rc = -ERESTART;
-        }
+                                   rc);
+                        ldlm_failed_ast(lock, rc, ast_type);
+                }
+                return rc;
+        }
 
-        return rc;
+        if (rc == -EINVAL) {
+                struct ldlm_resource *res = lock->l_resource;
+
+                LDLM_DEBUG(lock,
+                           "client (nid %s) returned %d from %s AST (req@%p x%llu) - normal race",
+                           libcfs_nid2str(peer.nid),
+                           req->rq_repmsg ?
+                           lustre_msg_get_status(req->rq_repmsg) : -1,
+                           ast_type, req, req->rq_xid);
+                if (res) {
+                        /* update lvbo to return proper attributes.
+                         * see bug 23174 */
+                        ldlm_resource_getref(res);
+                        ldlm_res_lvbo_update(res, NULL, 1);
+                        ldlm_resource_putref(res);
+                }
+                ldlm_lock_cancel(lock);
+                rc = -ERESTART;
+        }
+
+        return rc;
 }
 
 static int ldlm_cb_interpret(const struct lu_env *env,
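
The rewritten ldlm_handle_ast_error() above folds the old -ETIMEDOUT/-EINTR/-ENOTCONN branch and the generic error branch into one early exit: a missing reply or any error other than -EINVAL leads to cancel-or-evict, while -EINVAL stays a benign race that only refreshes the LVB. A standalone decision-table model of that triage; the enum and names are made up for illustration:

#include <stdio.h>
#include <errno.h>

enum ast_action {
        AST_EVICT,                      /* eviction via failed-AST path */
        AST_CANCEL_RESTART,             /* cancel lock, restart reprocess */
        AST_LVB_UPDATE_RESTART,         /* benign race: refresh LVB, cancel */
        AST_OK
};

static enum ast_action triage(int replied, int rc, int cancelling)
{
        if (!replied || (rc && rc != -EINVAL))
                /* lost reply or hard error: cancel if the lock is already
                 * being cancelled, otherwise evict the client */
                return cancelling ? AST_CANCEL_RESTART : AST_EVICT;
        if (rc == -EINVAL)
                return AST_LVB_UPDATE_RESTART;
        return AST_OK;
}

int main(void)
{
        printf("%d %d %d %d\n",
               triage(0, 0, 0),         /* no reply at all */
               triage(1, -ETIMEDOUT, 1),/* error, but already cancelling */
               triage(1, -EINVAL, 0),   /* normal race */
               triage(1, 0, 0));        /* clean reply */
        return 0;
}
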
@@ -710,7 +771,7 @@ static void ldlm_update_resend(struct ptlrpc_request *req, void *data)
         struct ldlm_cb_async_args *ca   = data;
         struct ldlm_lock          *lock = ca->ca_lock;
 
-        ldlm_refresh_waiting_lock(lock, ldlm_get_enq_timeout(lock));
+        ldlm_refresh_waiting_lock(lock, ldlm_bl_timeout(lock));
 }
 
 static inline int ldlm_ast_fini(struct ptlrpc_request *req,
@@ -789,6 +850,11 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                 /* Don't need to do anything here. */
                 RETURN(0);
 
+        if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_SRV_BL_AST)) {
+                LDLM_DEBUG(lock, "dropping BL AST");
+                RETURN(0);
+        }
+
         LASSERT(lock);
         LASSERT(data != NULL);
         if (lock->l_export->exp_obd->obd_recovering != 0)
@@ -810,20 +876,22 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         req->rq_interpret_reply = ldlm_cb_interpret;
 
         lock_res_and_lock(lock);
-        if (lock->l_granted_mode != lock->l_req_mode) {
-                /* this blocking AST will be communicated as part of the
-                 * completion AST instead */
+        if (ldlm_is_destroyed(lock)) {
+                /* What's the point? */
                 unlock_res_and_lock(lock);
-                ptlrpc_req_finished(req);
-                LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
                 RETURN(0);
         }
 
-        if (ldlm_is_destroyed(lock)) {
-                /* What's the point? */
+        if (lock->l_granted_mode != lock->l_req_mode) {
+                /* this blocking AST will be communicated as part of the
+                 * completion AST instead */
+                ldlm_add_blocked_lock(lock);
+                ldlm_set_waited(lock);
                 unlock_res_and_lock(lock);
+                ptlrpc_req_finished(req);
+                LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
                 RETURN(0);
         }
@@ -838,6 +906,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         LDLM_DEBUG(lock, "server preparing blocking AST");
 
         ptlrpc_request_set_replen(req);
+        ldlm_set_cbpending(lock);
         if (instant_cancel) {
                 unlock_res_and_lock(lock);
                 ldlm_lock_cancel(lock);
@@ -849,7 +918,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                 unlock_res_and_lock(lock);
 
                 /* Do not resend after lock callback timeout */
-                req->rq_delay_limit = ldlm_get_enq_timeout(lock);
+                req->rq_delay_limit = ldlm_bl_timeout(lock);
                 req->rq_resend_cb = ldlm_update_resend;
         }
@@ -869,7 +938,6 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
 
         RETURN(rc);
 }
-EXPORT_SYMBOL(ldlm_server_blocking_ast);
 
 /**
 * ->l_completion_ast callback for a remote lock in server namespace.
@@ -884,7 +952,6 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
         struct ldlm_request        *body;
         struct ptlrpc_request      *req;
         struct ldlm_cb_async_args  *ca;
-        long                        total_enqueue_wait;
         int                         instant_cancel = 0;
         int                         rc = 0;
         int                         lvb_len;
@@ -893,10 +960,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
         LASSERT(lock != NULL);
         LASSERT(data != NULL);
 
-        total_enqueue_wait = cfs_time_sub(cfs_time_current_sec(),
-                                          lock->l_last_activity);
-
-        if (OBD_FAIL_PRECHECK(OBD_FAIL_OST_LDLM_REPLY_NET)) {
+        if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_SRV_CP_AST)) {
                 LDLM_DEBUG(lock, "dropping CP AST");
                 RETURN(0);
         }
@@ -953,25 +1017,9 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
                 }
         }
 
-        LDLM_DEBUG(lock, "server preparing completion AST (after %lds wait)",
-                   total_enqueue_wait);
-
-        lock->l_last_activity = cfs_time_current_sec();
-
-        /* Server-side enqueue wait time estimate, used in
-            __ldlm_add_waiting_lock to set future enqueue timers */
-        if (total_enqueue_wait < ldlm_get_enq_timeout(lock))
-                at_measured(ldlm_lock_to_ns_at(lock),
-                            total_enqueue_wait);
-        else
-                /* bz18618. Don't add lock enqueue time we spend waiting for a
-                   previous callback to fail. Locks waiting legitimately will
-                   get extended by ldlm_refresh_waiting_lock regardless of the
-                   estimate, so it's okay to underestimate here. */
-                LDLM_DEBUG(lock, "lock completed after %lus; estimate was %ds. "
-                           "It is likely that a previous callback timed out.",
-                           total_enqueue_wait,
-                           at_get(ldlm_lock_to_ns_at(lock)));
+        LDLM_DEBUG(lock, "server preparing completion AST");
 
         ptlrpc_request_set_replen(req);
@@ -1006,7 +1054,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
                         /* start the lock-timeout clock */
                         ldlm_add_waiting_lock(lock);
                         /* Do not resend after lock callback timeout */
-                        req->rq_delay_limit = ldlm_get_enq_timeout(lock);
+                        req->rq_delay_limit = ldlm_bl_timeout(lock);
                         req->rq_resend_cb = ldlm_update_resend;
                 }
         }
@@ -1021,7 +1069,6 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
 
         RETURN(lvb_len < 0 ? lvb_len : rc);
 }
-EXPORT_SYMBOL(ldlm_server_completion_ast);
 
 /**
 * Server side ->l_glimpse_ast handler for client locks.
@@ -1093,7 +1140,6 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
 
         RETURN(rc);
 }
-EXPORT_SYMBOL(ldlm_server_glimpse_ast);
 
 int ldlm_glimpse_locks(struct ldlm_resource *res,
                        struct list_head *gl_work_list)
@@ -1165,25 +1211,25 @@ static void ldlm_svc_get_eopc(const struct ldlm_request *dlm_req,
 * service threads to carry out client lock enqueueing requests.
 */
 int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
-                         struct ptlrpc_request *req,
-                         const struct ldlm_request *dlm_req,
-                         const struct ldlm_callback_suite *cbs)
+                         struct ptlrpc_request *req,
+                         const struct ldlm_request *dlm_req,
+                         const struct ldlm_callback_suite *cbs)
 {
-        struct ldlm_reply *dlm_rep;
+        struct ldlm_reply *dlm_rep;
         __u64 flags;
-        ldlm_error_t err = ELDLM_OK;
-        struct ldlm_lock *lock = NULL;
-        void *cookie = NULL;
-        int rc = 0;
+        enum ldlm_error err = ELDLM_OK;
+        struct ldlm_lock *lock = NULL;
+        void *cookie = NULL;
+        int rc = 0;
         struct ldlm_resource *res = NULL;
-        ENTRY;
+        ENTRY;
 
-        LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
+        LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
 
-        ldlm_request_cancel(req, dlm_req, LDLM_ENQUEUE_CANCEL_OFF);
+        ldlm_request_cancel(req, dlm_req, LDLM_ENQUEUE_CANCEL_OFF, LATF_SKIP);
 
         flags = ldlm_flags_from_wire(dlm_req->lock_flags);
 
-        LASSERT(req->rq_export);
+        LASSERT(req->rq_export);
 
         if (ptlrpc_req2svc(req)->srv_stats != NULL)
                 ldlm_svc_get_eopc(dlm_req, ptlrpc_req2svc(req)->srv_stats);
@@ -1223,23 +1269,6 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
                 GOTO(out, rc = -EPROTO);
         }
 
-#if 0
-        /* FIXME this makes it impossible to use LDLM_PLAIN locks -- check
-           against server's _CONNECT_SUPPORTED flags? (I don't want to use
-           ibits for mgc/mgs) */
-
-        /* INODEBITS_INTEROP: Perform conversion from plain lock to
-         * inodebits lock if client does not support them. */
-        if (!(exp_connect_flags(req->rq_export) & OBD_CONNECT_IBITS) &&
-            (dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN)) {
-                dlm_req->lock_desc.l_resource.lr_type = LDLM_IBITS;
-                dlm_req->lock_desc.l_policy_data.l_inodebits.bits =
-                        MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
-                if (dlm_req->lock_desc.l_req_mode == LCK_PR)
-                        dlm_req->lock_desc.l_req_mode = LCK_CR;
-        }
-#endif
-
         if (unlikely((flags & LDLM_FL_REPLAY) ||
                      (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))) {
                 /* Find an existing lock in the per-export lock hash */
@@ -1249,12 +1278,19 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
                 lock = cfs_hash_lookup(req->rq_export->exp_lock_hash,
                                        (void *)&dlm_req->lock_handle[0]);
                 if (lock != NULL) {
-                        DEBUG_REQ(D_DLMTRACE, req, "found existing lock cookie "
-                                  LPX64, lock->l_handle.h_cookie);
+                        DEBUG_REQ(D_DLMTRACE, req, "found existing lock cookie %#llx",
+                                  lock->l_handle.h_cookie);
                         flags |= LDLM_FL_RESENT;
                         GOTO(existing_lock, rc = 0);
-                }
+                }
+        } else {
+                if (ldlm_reclaim_full()) {
+                        DEBUG_REQ(D_DLMTRACE, req, "Too many granted locks, "
+                                  "reject current enqueue request and let the "
+                                  "client retry later.\n");
+                        GOTO(out, rc = -EINPROGRESS);
+                }
+        }
 
         /* The lock's callback data might be set in the policy function */
         lock = ldlm_lock_create(ns, &dlm_req->lock_desc.l_resource.lr_name,
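
The new else-branch above is admission control: when ldlm_reclaim_full() reports that the server already holds too many granted locks, a brand-new enqueue is turned away with -EINPROGRESS so the client retries later, while resent and replayed requests are still honoured. A standalone model of that check, with a plain counter standing in for the real watermark logic:

#include <stdio.h>
#include <errno.h>

static unsigned long granted_locks;
static unsigned long reclaim_watermark = 1000;

static int reclaim_full(void)
{
        return granted_locks >= reclaim_watermark;
}

static int handle_enqueue(int resent)
{
        /* resent/replayed locks must still be matched against the
         * existing lock, so only brand-new requests are throttled */
        if (!resent && reclaim_full())
                return -EINPROGRESS;
        granted_locks++;
        return 0;
}

int main(void)
{
        granted_locks = 1000;
        printf("new: %d, resent: %d\n",
               handle_enqueue(0),       /* rejected: -EINPROGRESS */
               handle_enqueue(1));      /* still accepted: 0 */
        return 0;
}
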
" - "req fl %d, rep fl %d, lock fl "LPX64"\n", + "req fl %d, rep fl %d, lock fl %#llx\n", dlm_req->lock_flags, dlm_rep->lock_flags, lock->l_flags); LDLM_ERROR(lock, "sync lock"); - if (dlm_req->lock_flags & LDLM_FL_HAS_INTENT) { - struct ldlm_intent *it; - - it = req_capsule_client_get(&req->rq_pill, - &RMF_LDLM_INTENT); - if (it != NULL) { - CERROR("This is intent %s ("LPU64")\n", - ldlm_it2str(it->opc), it->opc); - } - } + if (dlm_req->lock_flags & LDLM_FL_HAS_INTENT) { + struct ldlm_intent *it; + + it = req_capsule_client_get(&req->rq_pill, + &RMF_LDLM_INTENT); + if (it != NULL) { + CERROR("This is intent %s (%llu)\n", + ldlm_it2str(it->opc), it->opc); + } + } } } @@ -1464,12 +1502,17 @@ existing_lock: } } - if (rc != 0) { - lock_res_and_lock(lock); - ldlm_resource_unlink_lock(lock); - ldlm_lock_destroy_nolock(lock); - unlock_res_and_lock(lock); - } + if (rc != 0 && !(flags & LDLM_FL_RESENT)) { + if (lock->l_export) { + ldlm_lock_cancel(lock); + } else { + lock_res_and_lock(lock); + ldlm_resource_unlink_lock(lock); + ldlm_lock_destroy_nolock(lock); + unlock_res_and_lock(lock); + + } + } if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK) ldlm_reprocess_all(lock->l_resource); @@ -1482,7 +1525,6 @@ existing_lock: return rc; } -EXPORT_SYMBOL(ldlm_handle_enqueue0); /** * Old-style LDLM main entry point for server code enqueue. @@ -1509,7 +1551,6 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req, } return rc; } -EXPORT_SYMBOL(ldlm_handle_enqueue); /** * Main LDLM entry point for server code to process lock conversion requests. @@ -1542,7 +1583,6 @@ int ldlm_handle_convert0(struct ptlrpc_request *req, LDLM_DEBUG(lock, "server-side convert handler START"); - lock->l_last_activity = cfs_time_current_sec(); res = ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode, &dlm_rep->lock_flags); if (res) { @@ -1564,7 +1604,6 @@ int ldlm_handle_convert0(struct ptlrpc_request *req, RETURN(0); } -EXPORT_SYMBOL(ldlm_handle_convert0); /** * Old-style main LDLM entry point for server code to process lock conversion @@ -1584,7 +1623,6 @@ int ldlm_handle_convert(struct ptlrpc_request *req) } return rc; } -EXPORT_SYMBOL(ldlm_handle_convert); /** * Cancel all the locks whose handles are packed into ldlm_request @@ -1593,7 +1631,8 @@ EXPORT_SYMBOL(ldlm_handle_convert); * requests. 
@@ -1619,7 +1658,7 @@ int ldlm_request_cancel(struct ptlrpc_request *req,
                 lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
                 if (!lock) {
                         LDLM_DEBUG_NOLOCK("server-side cancel handler stale "
-                                          "lock (cookie "LPU64")",
+                                          "lock (cookie %llu)",
                                           dlm_req->lock_handle[i].cookie);
                         continue;
                 }
@@ -1643,6 +1682,14 @@ int ldlm_request_cancel(struct ptlrpc_request *req,
                         }
                         pres = res;
                 }
+
+                if ((flags & LATF_STATS) && ldlm_is_ast_sent(lock)) {
+                        long delay = cfs_time_sub(cfs_time_current_sec(),
+                                                  lock->l_last_activity);
+                        LDLM_DEBUG(lock, "server cancels blocked lock after "
+                                   CFS_DURATION_T"s", delay);
+                        at_measured(&lock->l_export->exp_bl_lock_at, delay);
+                }
                 ldlm_lock_cancel(lock);
                 LDLM_LOCK_PUT(lock);
         }
@@ -1682,12 +1729,11 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
         if (rc)
                 RETURN(rc);
 
-        if (!ldlm_request_cancel(req, dlm_req, 0))
+        if (!ldlm_request_cancel(req, dlm_req, 0, LATF_STATS))
                 req->rq_status = LUSTRE_ESTALE;
 
         RETURN(ptlrpc_reply(req));
 }
-EXPORT_SYMBOL(ldlm_handle_cancel);
 #endif /* HAVE_SERVER_SUPPORT */
 
 /**
@@ -1749,8 +1795,8 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
                         int to = cfs_time_seconds(1);
                         while (to > 0) {
-                                schedule_timeout_and_set_state(
-                                        TASK_INTERRUPTIBLE, to);
+                                set_current_state(TASK_INTERRUPTIBLE);
+                                schedule_timeout(to);
                                 if (lock->l_granted_mode == lock->l_req_mode ||
                                     ldlm_is_destroyed(lock))
                                         break;
@@ -1773,22 +1819,6 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                                            lock->l_lvb_len, lvb_len);
                                 GOTO(out, rc = -EINVAL);
                         }
-                } else if (ldlm_has_layout(lock)) { /* for layout lock, lvb has
-                                                     * variable length */
-                        void *lvb_data;
-
-                        OBD_ALLOC_LARGE(lvb_data, lvb_len);
-                        if (lvb_data == NULL) {
-                                LDLM_ERROR(lock, "No memory: %d.\n", lvb_len);
-                                GOTO(out, rc = -ENOMEM);
-                        }
-
-                        lock_res_and_lock(lock);
-                        LASSERT(lock->l_lvb_data == NULL);
-                        lock->l_lvb_type = LVB_T_LAYOUT;
-                        lock->l_lvb_data = lvb_data;
-                        lock->l_lvb_len = lvb_len;
-                        unlock_res_and_lock(lock);
                 }
         }
@@ -1934,7 +1964,7 @@ static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
 }
 
 static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi,
-                               ldlm_cancel_flags_t cancel_flags)
+                               enum ldlm_cancel_flags cancel_flags)
 {
         struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
         ENTRY;
@@ -1965,7 +1995,7 @@ static inline void init_blwi(struct ldlm_bl_work_item *blwi,
                              struct ldlm_lock_desc *ld,
                              struct list_head *cancels, int count,
                              struct ldlm_lock *lock,
-                             ldlm_cancel_flags_t cancel_flags)
+                             enum ldlm_cancel_flags cancel_flags)
 {
         init_completion(&blwi->blwi_comp);
         INIT_LIST_HEAD(&blwi->blwi_head);
@@ -1999,7 +2029,7 @@ static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
                              struct ldlm_lock_desc *ld,
                              struct ldlm_lock *lock,
                              struct list_head *cancels, int count,
-                             ldlm_cancel_flags_t cancel_flags)
+                             enum ldlm_cancel_flags cancel_flags)
 {
         ENTRY;
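
The LATF_STATS block above is what feeds the estimator that ldlm_bl_timeout() reads: when a client finally cancels a lock that had a blocking AST outstanding, the observed round-trip goes into the per-export exp_bl_lock_at via at_measured(). Lustre's adaptive timeout keeps a windowed maximum; the standalone model below simplifies that to a plain running maximum and then applies the 150% rule:

#include <stdio.h>

struct adaptive_timeout_model {
        unsigned int at_current;        /* current estimate, seconds */
};

static void at_measured_model(struct adaptive_timeout_model *at,
                              unsigned int sample)
{
        if (sample > at->at_current)
                at->at_current = sample;
}

int main(void)
{
        struct adaptive_timeout_model bl_at = { .at_current = 1 };
        unsigned int samples[] = { 2, 7, 4 };   /* cancel delays, seconds */
        unsigned int i, est;

        for (i = 0; i < 3; i++)
                at_measured_model(&bl_at, samples[i]);

        est = bl_at.at_current;
        printf("estimate %us, BL timeout %us\n", est, est + (est >> 1));
        return 0;                               /* estimate 7s, timeout 10s */
}
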
@@ -2036,11 +2066,17 @@ int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
 int ldlm_bl_to_thread_list(struct ldlm_namespace *ns,
                            struct ldlm_lock_desc *ld,
                            struct list_head *cancels, int count,
-                           ldlm_cancel_flags_t cancel_flags)
+                           enum ldlm_cancel_flags cancel_flags)
 {
         return ldlm_bl_to_thread(ns, ld, NULL, cancels, count, cancel_flags);
 }
 
+int ldlm_bl_thread_wakeup(void)
+{
+        wake_up(&ldlm_state->ldlm_bl_pool->blp_waitq);
+        return 0;
+}
+
 /* Setinfo coming from Server (eg MDT) to Client (eg MDC)! */
 static int ldlm_handle_setinfo(struct ptlrpc_request *req)
 {
@@ -2086,11 +2122,11 @@ static int ldlm_handle_setinfo(struct ptlrpc_request *req)
 }
 
 static inline void ldlm_callback_errmsg(struct ptlrpc_request *req,
-                                        const char *msg, int rc,
-                                        struct lustre_handle *handle)
+                                        const char *msg, int rc,
+                                        const struct lustre_handle *handle)
 {
         DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req,
-                  "%s: [nid %s] [rc %d] [lock "LPX64"]",
+                  "%s: [nid %s] [rc %d] [lock %#llx]",
                   msg, libcfs_id2str(req->rq_peer), rc,
                   handle ? handle->cookie : 0);
         if (req->rq_no_reply)
@@ -2099,23 +2135,6 @@ static inline void ldlm_callback_errmsg(struct ptlrpc_request *req,
                 CWARN("Send reply failed, maybe cause bug 21636.\n");
 }
 
-static int ldlm_handle_qc_callback(struct ptlrpc_request *req)
-{
-        struct obd_quotactl *oqctl;
-        struct client_obd *cli = &req->rq_export->exp_obd->u.cli;
-
-        oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
-        if (oqctl == NULL) {
-                CERROR("Can't unpack obd_quotactl\n");
-                RETURN(-EPROTO);
-        }
-
-        oqctl->qc_stat = ptlrpc_status_ntoh(oqctl->qc_stat);
-
-        cli->cl_qchk_stat = oqctl->qc_stat;
-        return 0;
-}
-
 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
 static int ldlm_callback_handler(struct ptlrpc_request *req)
 {
@@ -2148,8 +2167,11 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
         case LDLM_BL_CALLBACK:
-                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET))
+                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET)) {
+                        if (cfs_fail_err)
+                                ldlm_callback_reply(req, -(int)cfs_fail_err);
                         RETURN(0);
+                }
                 break;
         case LDLM_CP_CALLBACK:
                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CP_CALLBACK_NET))
@@ -2192,13 +2214,6 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 rc = llog_origin_handle_close(req);
                 ldlm_callback_reply(req, rc);
                 RETURN(0);
-        case OBD_QC_CALLBACK:
-                req_capsule_set(&req->rq_pill, &RQF_QC_CALLBACK);
-                if (OBD_FAIL_CHECK(OBD_FAIL_OBD_QC_CALLBACK_NET))
-                        RETURN(0);
-                rc = ldlm_handle_qc_callback(req);
-                ldlm_callback_reply(req, rc);
-                RETURN(0);
         default:
                 CERROR("unknown opcode %u\n",
                        lustre_msg_get_opc(req->rq_reqmsg));
@@ -2230,7 +2245,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
 
         lock = ldlm_handle2lock_long(&dlm_req->lock_handle[0], 0);
         if (!lock) {
-                CDEBUG(D_DLMTRACE, "callback on lock "LPX64" - lock "
+                CDEBUG(D_DLMTRACE, "callback on lock %#llx - lock "
                        "disappeared\n", dlm_req->lock_handle[0].cookie);
                 rc = ldlm_callback_reply(req, -EINVAL);
                 ldlm_callback_errmsg(req, "Operate with invalid parameter", rc,
                                      &dlm_req->lock_handle[0]);
@@ -2246,28 +2261,27 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
         lock_res_and_lock(lock);
         lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
                                               LDLM_FL_AST_MASK);
-        if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
-                /* If somebody cancels lock and cache is already dropped,
-                 * or lock is failed before cp_ast received on client,
-                 * we can tell the server we have no lock. Otherwise, we
-                 * should send cancel after dropping the cache. */
+        if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
+                /* If somebody cancels lock and cache is already dropped,
+                 * or lock is failed before cp_ast received on client,
+                 * we can tell the server we have no lock. Otherwise, we
+                 * should send cancel after dropping the cache. */
                 if ((ldlm_is_canceling(lock) && ldlm_is_bl_done(lock)) ||
-                    ldlm_is_failed(lock)) {
-                        LDLM_DEBUG(lock, "callback on lock "
-                                   LPX64" - lock disappeared\n",
-                                   dlm_req->lock_handle[0].cookie);
-                        unlock_res_and_lock(lock);
-                        LDLM_LOCK_RELEASE(lock);
-                        rc = ldlm_callback_reply(req, -EINVAL);
-                        ldlm_callback_errmsg(req, "Operate on stale lock", rc,
-                                             &dlm_req->lock_handle[0]);
-                        RETURN(0);
-                }
+                    ldlm_is_failed(lock)) {
+                        LDLM_DEBUG(lock, "callback on lock %llx - lock disappeared",
+                                   dlm_req->lock_handle[0].cookie);
+                        unlock_res_and_lock(lock);
+                        LDLM_LOCK_RELEASE(lock);
+                        rc = ldlm_callback_reply(req, -EINVAL);
+                        ldlm_callback_errmsg(req, "Operate on stale lock", rc,
+                                             &dlm_req->lock_handle[0]);
+                        RETURN(0);
+                }
                 /* BL_AST locks are not needed in LRU.
                  * Let ldlm_cancel_lru() be fast. */
-                ldlm_lock_remove_from_lru(lock);
+                ldlm_lock_remove_from_lru(lock);
                 ldlm_set_bl_ast(lock);
-        }
+        }
         unlock_res_and_lock(lock);
 
         /* We want the ost thread to get this reply so that it can respond
@@ -2332,7 +2346,7 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req)
                 struct ldlm_request *dlm_req;
 
                 CERROR("%s from %s arrived at %lu with bad export cookie "
-                       LPU64"\n",
+                       "%llu\n",
                        ll_opcode2str(lustre_msg_get_opc(req->rq_reqmsg)),
                        libcfs_nid2str(req->rq_peer.nid),
                        req->rq_arrival_time.tv_sec,
@@ -2357,7 +2371,8 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req)
                 req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
                 CDEBUG(D_INODE, "cancel\n");
                 if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_NET) ||
-                    CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_CANCEL_RESEND))
+                    CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_CANCEL_RESEND) ||
+                    CFS_FAIL_CHECK(OBD_FAIL_LDLM_BL_EVICT))
                         RETURN(0);
                 rc = ldlm_handle_cancel(req);
                 if (rc)
@@ -2391,7 +2406,7 @@ static int ldlm_cancel_hpreq_lock_match(struct ptlrpc_request *req,
                 if (lustre_handle_equal(&dlm_req->lock_handle[i],
                                         &lockh)) {
                         DEBUG_REQ(D_RPCTRACE, req,
-                                  "Prio raised by lock "LPX64".", lockh.cookie);
+                                  "Prio raised by lock %#llx.", lockh.cookie);
 
                         rc = 1;
                         break;
@@ -2458,8 +2473,8 @@ static int ldlm_hpreq_handler(struct ptlrpc_request *req)
         RETURN(0);
 }
 
-int ldlm_revoke_lock_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
-                        struct hlist_node *hnode, void *data)
+static int ldlm_revoke_lock_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
+                               struct hlist_node *hnode, void *data)
 
 {
         struct list_head *rpc_list = data;
@@ -2508,25 +2523,40 @@ int ldlm_revoke_lock_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
 void ldlm_revoke_export_locks(struct obd_export *exp)
 {
         struct list_head rpc_list;
-        ENTRY;
+        ENTRY;
 
         INIT_LIST_HEAD(&rpc_list);
-        cfs_hash_for_each_empty(exp->exp_lock_hash,
-                                ldlm_revoke_lock_cb, &rpc_list);
-        ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list,
-                          LDLM_WORK_REVOKE_AST);
+        cfs_hash_for_each_nolock(exp->exp_lock_hash,
+                                 ldlm_revoke_lock_cb, &rpc_list, 0);
+        ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list,
+                          LDLM_WORK_REVOKE_AST);
 
-        EXIT;
+        EXIT;
 }
 EXPORT_SYMBOL(ldlm_revoke_export_locks);
 #endif /* HAVE_SERVER_SUPPORT */
 
-static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
+static int ldlm_bl_get_work(struct ldlm_bl_pool *blp,
+                            struct ldlm_bl_work_item **p_blwi,
+                            struct obd_export **p_exp)
 {
         struct ldlm_bl_work_item *blwi = NULL;
         static unsigned int num_bl = 0;
+        static unsigned int num_stale;
+        int num_th = atomic_read(&blp->blp_num_threads);
+
+        *p_exp = obd_stale_export_get();
 
         spin_lock(&blp->blp_lock);
+        if (*p_exp != NULL) {
+                if (num_th == 1 || ++num_stale < num_th) {
+                        spin_unlock(&blp->blp_lock);
+                        return 1;
+                } else {
+                        num_stale = 0;
+                }
+        }
+
         /* process a request from the blp_list at least every blp_num_threads */
         if (!list_empty(&blp->blp_list) &&
             (list_empty(&blp->blp_prio_list) || num_bl == 0))
                 blwi = list_entry(blp->blp_list.next,
                                   struct ldlm_bl_work_item,
@@ -2539,18 +2569,23 @@ static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
                                   blwi_entry);
 
         if (blwi) {
-                if (++num_bl >= atomic_read(&blp->blp_num_threads))
+                if (++num_bl >= num_th)
                         num_bl = 0;
                 list_del(&blwi->blwi_entry);
         }
         spin_unlock(&blp->blp_lock);
+        *p_blwi = blwi;
+
+        if (*p_exp != NULL && *p_blwi != NULL) {
+                obd_stale_export_put(*p_exp);
+                *p_exp = NULL;
+        }
 
-        return blwi;
+        return (*p_blwi != NULL || *p_exp != NULL) ? 1 : 0;
 }
 
 /* This only contains temporary data until the thread starts */
 struct ldlm_bl_thread_data {
-        char                    bltd_name[CFS_CURPROC_COMM_MAX];
         struct ldlm_bl_pool     *bltd_blp;
         struct completion       bltd_comp;
         int                     bltd_num;
@@ -2558,19 +2593,32 @@ struct ldlm_bl_thread_data {
 
 static int ldlm_bl_thread_main(void *arg);
 
-static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
+static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp, bool check_busy)
 {
         struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
         struct task_struct *task;
 
         init_completion(&bltd.bltd_comp);
-        bltd.bltd_num = atomic_read(&blp->blp_num_threads);
-        snprintf(bltd.bltd_name, sizeof(bltd.bltd_name) - 1,
-                 "ldlm_bl_%02d", bltd.bltd_num);
-        task = kthread_run(ldlm_bl_thread_main, &bltd, bltd.bltd_name);
+
+        bltd.bltd_num = atomic_inc_return(&blp->blp_num_threads);
+        if (bltd.bltd_num >= blp->blp_max_threads) {
+                atomic_dec(&blp->blp_num_threads);
+                return 0;
+        }
+
+        LASSERTF(bltd.bltd_num > 0, "thread num:%d\n", bltd.bltd_num);
+        if (check_busy &&
+            atomic_read(&blp->blp_busy_threads) < (bltd.bltd_num - 1)) {
+                atomic_dec(&blp->blp_num_threads);
+                return 0;
+        }
+
+        task = kthread_run(ldlm_bl_thread_main, &bltd, "ldlm_bl_%02d",
+                           bltd.bltd_num);
         if (IS_ERR(task)) {
                 CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %ld\n",
-                       atomic_read(&blp->blp_num_threads), PTR_ERR(task));
+                       bltd.bltd_num, PTR_ERR(task));
+                atomic_dec(&blp->blp_num_threads);
                 return PTR_ERR(task);
         }
         wait_for_completion(&bltd.bltd_comp);
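
ldlm_bl_get_work() above now prefers stale-export cleanup work but caps consecutive stale items at num_threads - 1, so at least one thread always gets back to the regular and priority blwi queues. A standalone model of just that selection rule:

#include <stdio.h>

static int num_threads = 4;
static int num_stale;

/* returns 1 for "stale export work", 0 for "regular blwi work" */
static int pick_work(int stale_available)
{
        if (stale_available) {
                if (num_threads == 1 || ++num_stale < num_threads)
                        return 1;
                num_stale = 0;          /* force one pass over the queues */
        }
        return 0;
}

int main(void)
{
        int i;

        for (i = 0; i < 8; i++)         /* stale work always available */
                printf("%d", pick_work(1));
        printf("\n");                   /* prints 11101110 */
        return 0;
}
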
@@ -2578,6 +2626,88 @@ static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
         return 0;
 }
 
+/* Not fatal if racy and have a few too many threads */
+static int ldlm_bl_thread_need_create(struct ldlm_bl_pool *blp,
+                                      struct ldlm_bl_work_item *blwi)
+{
+        if (atomic_read(&blp->blp_num_threads) >= blp->blp_max_threads)
+                return 0;
+
+        if (atomic_read(&blp->blp_busy_threads) <
+            atomic_read(&blp->blp_num_threads))
+                return 0;
+
+        if (blwi != NULL && (blwi->blwi_ns == NULL ||
+                             blwi->blwi_mem_pressure))
+                return 0;
+
+        return 1;
+}
+
+static int ldlm_bl_thread_blwi(struct ldlm_bl_pool *blp,
+                               struct ldlm_bl_work_item *blwi)
+{
+        ENTRY;
+
+        if (blwi->blwi_ns == NULL)
+                /* added by ldlm_cleanup() */
+                RETURN(LDLM_ITER_STOP);
+
+        if (blwi->blwi_mem_pressure)
+                memory_pressure_set();
+
+        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL2, 4);
+
+        if (blwi->blwi_count) {
+                int count;
+                /* The special case when we cancel locks in lru
+                 * asynchronously, we pass the list of locks here.
+                 * Thus locks are marked LDLM_FL_CANCELING, but NOT
+                 * canceled locally yet. */
+                count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
+                                                   blwi->blwi_count,
+                                                   LCF_BL_AST);
+                ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
+                                     blwi->blwi_flags);
+        } else {
+                ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
+                                        blwi->blwi_lock);
+        }
+        if (blwi->blwi_mem_pressure)
+                memory_pressure_clr();
+
+        if (blwi->blwi_flags & LCF_ASYNC)
+                OBD_FREE(blwi, sizeof(*blwi));
+        else
+                complete(&blwi->blwi_comp);
+
+        RETURN(0);
+}
+
+/**
+ * Cancel stale locks on export. Cancel blocked locks first.
+ * If the given export has blocked locks, the next in the list may have
+ * them too, thus cancel not blocked locks only if the current export has
+ * no blocked locks.
+ **/
+static int ldlm_bl_thread_exports(struct ldlm_bl_pool *blp,
+                                  struct obd_export *exp)
+{
+        int num;
+        ENTRY;
+
+        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_BL_EVICT, 4);
+
+        num = ldlm_export_cancel_blocked_locks(exp);
+        if (num == 0)
+                ldlm_export_cancel_locks(exp);
+
+        obd_stale_export_put(exp);
+
+        RETURN(0);
+}
+
+
 /**
 * Main blocking requests processing thread.
 *
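
ldlm_bl_thread_need_create() above, together with the recheck in ldlm_bl_thread_start(), grows the blocking-callback thread pool on demand: spawn a helper only when every live thread is busy and the pool is still below blp_max_threads. A standalone model of that decision, with plain ints in place of the atomics:

#include <stdio.h>

static int blp_num_threads;
static int blp_busy_threads;
static int blp_max_threads = 8;

static int need_create(void)
{
        if (blp_num_threads >= blp_max_threads)
                return 0;               /* at the cap */
        if (blp_busy_threads < blp_num_threads)
                return 0;               /* someone is idle, no need */
        return 1;
}

int main(void)
{
        blp_num_threads = 2;
        blp_busy_threads = 2;
        printf("all busy: %d\n", need_create());        /* 1: spawn */
        blp_busy_threads = 1;
        printf("one idle: %d\n", need_create());        /* 0: don't */
        return 0;
}
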
@@ -2593,70 +2723,39 @@ static int ldlm_bl_thread_main(void *arg)
 
                 blp = bltd->bltd_blp;
 
-                atomic_inc(&blp->blp_num_threads);
-                atomic_inc(&blp->blp_busy_threads);
-
                 complete(&bltd->bltd_comp);
                 /* cannot use bltd after this, it is only on caller's stack */
 
         while (1) {
                 struct l_wait_info lwi = { 0 };
                 struct ldlm_bl_work_item *blwi = NULL;
-                int busy;
+                struct obd_export *exp = NULL;
+                int rc;
 
-                blwi = ldlm_bl_get_work(blp);
+                rc = ldlm_bl_get_work(blp, &blwi, &exp);
 
-                if (blwi == NULL) {
-                        atomic_dec(&blp->blp_busy_threads);
+                if (rc == 0)
                         l_wait_event_exclusive(blp->blp_waitq,
-                                         (blwi = ldlm_bl_get_work(blp)) != NULL,
-                                         &lwi);
-                        busy = atomic_inc_return(&blp->blp_busy_threads);
-                } else {
-                        busy = atomic_read(&blp->blp_busy_threads);
-                }
+                                               ldlm_bl_get_work(blp, &blwi,
+                                                                &exp),
+                                               &lwi);
+                atomic_inc(&blp->blp_busy_threads);
 
-                if (blwi->blwi_ns == NULL)
-                        /* added by ldlm_cleanup() */
-                        break;
-
-                /* Not fatal if racy and have a few too many threads */
-                if (unlikely(busy < blp->blp_max_threads &&
-                             busy >= atomic_read(&blp->blp_num_threads) &&
-                             !blwi->blwi_mem_pressure))
+                if (ldlm_bl_thread_need_create(blp, blwi))
                         /* discard the return value, we tried */
-                        ldlm_bl_thread_start(blp);
-
-                if (blwi->blwi_mem_pressure)
-                        memory_pressure_set();
-
-                OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL2, 4);
-
-                if (blwi->blwi_count) {
-                        int count;
-                        /* The special case when we cancel locks in LRU
-                         * asynchronously, we pass the list of locks here.
-                         * Thus locks are marked LDLM_FL_CANCELING, but NOT
-                         * canceled locally yet. */
-                        count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
-                                                           blwi->blwi_count,
-                                                           LCF_BL_AST);
-                        ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
-                                             blwi->blwi_flags);
-                } else {
-                        ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
-                                                blwi->blwi_lock);
-                }
-                if (blwi->blwi_mem_pressure)
-                        memory_pressure_clr();
+                        ldlm_bl_thread_start(blp, true);
 
-                if (blwi->blwi_flags & LCF_ASYNC)
-                        OBD_FREE(blwi, sizeof(*blwi));
-                else
-                        complete(&blwi->blwi_comp);
+                if (exp)
+                        rc = ldlm_bl_thread_exports(blp, exp);
+                else if (blwi)
+                        rc = ldlm_bl_thread_blwi(blp, blwi);
+
+                atomic_dec(&blp->blp_busy_threads);
+
+                if (rc == LDLM_ITER_STOP)
+                        break;
         }
 
-        atomic_dec(&blp->blp_busy_threads);
         atomic_dec(&blp->blp_num_threads);
         complete(&blp->blp_comp);
         RETURN(0);
@@ -2680,7 +2779,6 @@ int ldlm_get_ref(void)
 
         RETURN(rc);
 }
-EXPORT_SYMBOL(ldlm_get_ref);
 
 void ldlm_put_ref(void)
 {
@@ -2699,13 +2797,12 @@ void ldlm_put_ref(void)
 
         EXIT;
 }
-EXPORT_SYMBOL(ldlm_put_ref);
 
 /*
 * Export handle<->lock hash operations.
 */
 static unsigned
-ldlm_export_lock_hash(cfs_hash_t *hs, const void *key, unsigned mask)
+ldlm_export_lock_hash(struct cfs_hash *hs, const void *key, unsigned mask)
 {
         return cfs_hash_u64_hash(((struct lustre_handle *)key)->cookie, mask);
 }
@@ -2741,7 +2838,7 @@ ldlm_export_lock_object(struct hlist_node *hnode)
 }
 
 static void
-ldlm_export_lock_get(cfs_hash_t *hs, struct hlist_node *hnode)
+ldlm_export_lock_get(struct cfs_hash *hs, struct hlist_node *hnode)
 {
         struct ldlm_lock *lock;
 
@@ -2750,7 +2847,7 @@ ldlm_export_lock_get(cfs_hash_t *hs, struct hlist_node *hnode)
 }
 
 static void
-ldlm_export_lock_put(cfs_hash_t *hs, struct hlist_node *hnode)
+ldlm_export_lock_put(struct cfs_hash *hs, struct hlist_node *hnode)
 {
         struct ldlm_lock *lock;
 
@@ -2758,7 +2855,7 @@ ldlm_export_lock_put(cfs_hash_t *hs, struct hlist_node *hnode)
         LDLM_LOCK_RELEASE(lock);
 }
 
-static cfs_hash_ops_t ldlm_export_lock_ops = {
+static struct cfs_hash_ops ldlm_export_lock_ops = {
         .hs_hash        = ldlm_export_lock_hash,
         .hs_key         = ldlm_export_lock_key,
         .hs_keycmp      = ldlm_export_lock_keycmp,
@@ -2828,11 +2925,11 @@ static int ldlm_setup(void)
         if (ldlm_state == NULL)
                 RETURN(-ENOMEM);
 
-#ifdef LPROCFS
+#ifdef CONFIG_PROC_FS
         rc = ldlm_proc_setup();
         if (rc != 0)
                 GOTO(out, rc);
-#endif /* LPROCFS */
+#endif /* CONFIG_PROC_FS */
 
         memset(&conf, 0, sizeof(conf));
         conf = (typeof(conf)) {
@@ -2938,7 +3035,7 @@ static int ldlm_setup(void)
         }
 
         for (i = 0; i < blp->blp_min_threads; i++) {
-                rc = ldlm_bl_thread_start(blp);
+                rc = ldlm_bl_thread_start(blp, false);
                 if (rc < 0)
                         GOTO(out, rc);
         }
@@ -2950,7 +3047,7 @@ static int ldlm_setup(void)
 
         INIT_LIST_HEAD(&waiting_locks_list);
         spin_lock_init(&waiting_locks_spinlock);
-        cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, 0);
+        cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, NULL);
 
         task = kthread_run(expired_lock_main, NULL, "ldlm_elt");
         if (IS_ERR(task)) {
@@ -2968,6 +3065,12 @@ static int ldlm_setup(void)
                 CERROR("Failed to initialize LDLM pools: %d\n", rc);
                 GOTO(out, rc);
         }
+
+        rc = ldlm_reclaim_setup();
+        if (rc) {
+                CERROR("Failed to setup reclaim thread: rc = %d\n", rc);
+                GOTO(out, rc);
+        }
         RETURN(0);
 
  out:
@@ -2987,7 +3090,8 @@ static int ldlm_cleanup(void)
                 RETURN(-EBUSY);
         }
 
-        ldlm_pools_fini();
+        ldlm_reclaim_cleanup();
+        ldlm_pools_fini();
 
         if (ldlm_state->ldlm_bl_pool != NULL) {
                 struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
@@ -3051,23 +3155,34 @@ int ldlm_init(void)
         ldlm_lock_slab = kmem_cache_create("ldlm_locks",
                               sizeof(struct ldlm_lock), 0,
                               SLAB_HWCACHE_ALIGN | SLAB_DESTROY_BY_RCU,
                               NULL);
-        if (ldlm_lock_slab == NULL) {
-                kmem_cache_destroy(ldlm_resource_slab);
-                return -ENOMEM;
-        }
+        if (ldlm_lock_slab == NULL)
+                goto out_resource;
 
         ldlm_interval_slab = kmem_cache_create("interval_node",
                                         sizeof(struct ldlm_interval),
                                         0, SLAB_HWCACHE_ALIGN, NULL);
-        if (ldlm_interval_slab == NULL) {
-                kmem_cache_destroy(ldlm_resource_slab);
-                kmem_cache_destroy(ldlm_lock_slab);
-                return -ENOMEM;
-        }
+        if (ldlm_interval_slab == NULL)
+                goto out_lock;
+
+        ldlm_interval_tree_slab = kmem_cache_create("interval_tree",
+                        sizeof(struct ldlm_interval_tree) * LCK_MODE_NUM,
+                        0, SLAB_HWCACHE_ALIGN, NULL);
+        if (ldlm_interval_tree_slab == NULL)
+                goto out_interval;
+
 #if LUSTRE_TRACKS_LOCK_EXP_REFS
-        class_export_dump_hook = ldlm_dump_export_locks;
+        class_export_dump_hook = ldlm_dump_export_locks;
 #endif
-        return 0;
+        return 0;
+
+out_interval:
+        kmem_cache_destroy(ldlm_interval_slab);
+out_lock:
+        kmem_cache_destroy(ldlm_lock_slab);
+out_resource:
+        kmem_cache_destroy(ldlm_resource_slab);
+
+        return -ENOMEM;
 }
 
 void ldlm_exit(void)
@@ -3081,4 +3196,5 @@ void ldlm_exit(void)
         synchronize_rcu();
         kmem_cache_destroy(ldlm_lock_slab);
         kmem_cache_destroy(ldlm_interval_slab);
+        kmem_cache_destroy(ldlm_interval_tree_slab);
 }
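
The reworked ldlm_init() above replaces the duplicated per-branch kmem_cache_destroy() calls with the conventional goto-unwind ladder, so each new slab only adds one label. The same shape in a standalone sketch, with malloc()/free() standing in for the slab calls:

#include <stdio.h>
#include <stdlib.h>

static void *res_slab, *lock_slab, *interval_slab, *tree_slab;

static int demo_init(void)
{
        res_slab = malloc(32);
        if (res_slab == NULL)
                return -1;
        lock_slab = malloc(32);
        if (lock_slab == NULL)
                goto out_resource;
        interval_slab = malloc(32);
        if (interval_slab == NULL)
                goto out_lock;
        tree_slab = malloc(32);
        if (tree_slab == NULL)
                goto out_interval;
        return 0;

out_interval:                   /* undo in reverse order of setup */
        free(interval_slab);
out_lock:
        free(lock_slab);
out_resource:
        free(res_slab);
        return -1;
}

int main(void)
{
        printf("init: %d\n", demo_init());
        return 0;
}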