X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fldlm%2Fldlm_lockd.c;h=651eebe40e881b48c4f17abac5e631b9ff6e42fa;hb=eda984e7cb4e6a97310ed0f5e81f398dc48b56bf;hp=7f70349c4dcf662677d82a5b5e4d4ee8fe3f01e9;hpb=4de90170e2573321e7691364d1d527aedfd25ff9;p=fs%2Flustre-release.git

diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c
index 7f70349..651eebe 100644
--- a/lustre/ldlm/ldlm_lockd.c
+++ b/lustre/ldlm/ldlm_lockd.c
@@ -15,11 +15,7 @@
  *
  * You should have received a copy of the GNU General Public License
  * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
  *
  * GPL HEADER END
  */
@@ -27,7 +23,7 @@
  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2010, 2014, Intel Corporation.
+ * Copyright (c) 2010, 2016, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -41,33 +37,34 @@
 
 #define DEBUG_SUBSYSTEM S_LDLM
 
+#include <linux/kthread.h>
+#include <linux/list.h>
 #include <libcfs/libcfs.h>
+#include <lustre_errno.h>
 #include <lustre_dlm.h>
 #include <obd_class.h>
-#include <libcfs/list.h>
 #include "ldlm_internal.h"
 
 static int ldlm_num_threads;
-CFS_MODULE_PARM(ldlm_num_threads, "i", int, 0444,
-		"number of DLM service threads to start");
+module_param(ldlm_num_threads, int, 0444);
+MODULE_PARM_DESC(ldlm_num_threads, "number of DLM service threads to start");
 
 static char *ldlm_cpts;
-CFS_MODULE_PARM(ldlm_cpts, "s", charp, 0444,
-		"CPU partitions ldlm threads should run on");
+module_param(ldlm_cpts, charp, 0444);
+MODULE_PARM_DESC(ldlm_cpts, "CPU partitions ldlm threads should run on");
 
 static struct mutex	ldlm_ref_mutex;
 static int ldlm_refcount;
 
-struct ldlm_cb_async_args {
-	struct ldlm_cb_set_arg *ca_set_arg;
-	struct ldlm_lock       *ca_lock;
-};
+struct kobject *ldlm_kobj;
+struct kset *ldlm_ns_kset;
+struct kset *ldlm_svc_kset;
 
 /* LDLM state */
 
 static struct ldlm_state *ldlm_state;
 
-inline cfs_time_t round_timeout(cfs_time_t timeout)
+static inline cfs_time_t round_timeout(cfs_time_t timeout)
 {
	return cfs_time_seconds((int)cfs_duration_sec(cfs_time_sub(timeout, 0)) + 1);
 }
@@ -110,15 +107,15 @@ struct ldlm_bl_pool {
 };
 
 struct ldlm_bl_work_item {
-        struct list_head        blwi_entry;
-        struct ldlm_namespace   *blwi_ns;
-        struct ldlm_lock_desc   blwi_ld;
-        struct ldlm_lock        *blwi_lock;
-        struct list_head        blwi_head;
-        int                     blwi_count;
-        struct completion       blwi_comp;
-        ldlm_cancel_flags_t     blwi_flags;
-        int                     blwi_mem_pressure;
+	struct list_head	blwi_entry;
+	struct ldlm_namespace	*blwi_ns;
+	struct ldlm_lock_desc	blwi_ld;
+	struct ldlm_lock	*blwi_lock;
+	struct list_head	blwi_head;
+	int			blwi_count;
+	struct completion	blwi_comp;
+	enum ldlm_cancel_flags	blwi_flags;
+	int			blwi_mem_pressure;
 };
 
 #ifdef HAVE_SERVER_SUPPORT
@@ -183,15 +180,10 @@ static int expired_lock_main(void *arg)
 
 		spin_lock_bh(&waiting_locks_spinlock);
 		if (expired_lock_thread.elt_dump) {
-			struct libcfs_debug_msg_data msgdata = {
-				.msg_file = __FILE__,
-				.msg_fn = "waiting_locks_callback",
-				.msg_line = expired_lock_thread.elt_dump };
 			spin_unlock_bh(&waiting_locks_spinlock);
 
 			/* from waiting_locks_callback, but not in timer */
 			libcfs_debug_dumplog();
-			libcfs_run_lbug_upcall(&msgdata);
 
 			spin_lock_bh(&waiting_locks_spinlock);
 			expired_lock_thread.elt_dump = 0;
@@ -205,7 +197,7 @@ static int expired_lock_main(void *arg)
 			lock = list_entry(expired->next, struct ldlm_lock,
 					  l_pending_chain);
-			if ((void *)lock < LP_POISON + PAGE_CACHE_SIZE &&
+			if ((void *)lock < LP_POISON + PAGE_SIZE &&
 			    (void *)lock >= LP_POISON) {
 				spin_unlock_bh(&waiting_locks_spinlock);
 				CERROR("free lock on elt list %p\n", lock);
@@ -213,7 +205,7 @@
 			}
 			list_del_init(&lock->l_pending_chain);
 			if ((void *)lock->l_export <
-			     LP_POISON + PAGE_CACHE_SIZE &&
+			     LP_POISON + PAGE_SIZE &&
 			    (void *)lock->l_export >= LP_POISON) {
 				CERROR("lock with free export on elt list %p\n",
 				       lock->l_export);
@@ -334,9 +326,9 @@ static void waiting_locks_callback(unsigned long unused)
 			continue;
 		}
 		ldlm_lock_to_ns(lock)->ns_timeouts++;
-		LDLM_ERROR(lock, "lock callback timer expired after %lds: "
+		LDLM_ERROR(lock, "lock callback timer expired after %llds: "
			   "evicting client at %s ",
-			   cfs_time_current_sec() - lock->l_last_activity,
+			   ktime_get_real_seconds() - lock->l_last_activity,
			   libcfs_nid2str(
				   lock->l_export->exp_connection->c_peer.nid));
@@ -365,7 +357,7 @@ static void waiting_locks_callback(unsigned long unused)
 		lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
 				  l_pending_chain);
 		timeout_rounded = (cfs_time_t)round_timeout(lock->l_callback_timeout);
-		cfs_timer_arm(&waiting_locks_timer, timeout_rounded);
+		mod_timer(&waiting_locks_timer, timeout_rounded);
 	}
 	spin_unlock_bh(&waiting_locks_spinlock);
 }
@@ -400,10 +392,9 @@ static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, int seconds)
 
 	timeout_rounded = round_timeout(lock->l_callback_timeout);
 
-	if (cfs_time_before(timeout_rounded,
-			    cfs_timer_deadline(&waiting_locks_timer)) ||
-	    !cfs_timer_is_armed(&waiting_locks_timer)) {
-		cfs_timer_arm(&waiting_locks_timer, timeout_rounded);
+	if (cfs_time_before(timeout_rounded, waiting_locks_timer.expires) ||
+	    !timer_pending(&waiting_locks_timer)) {
+		mod_timer(&waiting_locks_timer, timeout_rounded);
 	}
 	/* if the new lock has a shorter timeout than something earlier on
	   the list, we'll wait the longer amount of time; no big deal. */
@@ -412,6 +403,28 @@ static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, int seconds)
 	return 1;
 }
 
+static void ldlm_add_blocked_lock(struct ldlm_lock *lock)
+{
+	spin_lock_bh(&lock->l_export->exp_bl_list_lock);
+	if (list_empty(&lock->l_exp_list)) {
+		if (lock->l_granted_mode != lock->l_req_mode)
+			list_add_tail(&lock->l_exp_list,
+				      &lock->l_export->exp_bl_list);
+		else
+			list_add(&lock->l_exp_list,
+				 &lock->l_export->exp_bl_list);
+	}
+	spin_unlock_bh(&lock->l_export->exp_bl_list_lock);
+
+	/* A blocked lock is added. Adjust the position in
+	 * the stale list if the export is in the list.
+	 * If export is stale and not in the list - it is being
+	 * processed and will be placed on the right position
+	 * on obd_stale_export_put(). */
+	if (!list_empty(&lock->l_export->exp_stale_list))
+		obd_stale_export_adjust(lock->l_export);
+}
+
 static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
 {
 	int ret;
@@ -419,13 +432,23 @@ static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
 	/* NB: must be called with hold of lock_res_and_lock() */
 	LASSERT(ldlm_is_res_locked(lock));
-	ldlm_set_waited(lock);
-	LASSERT(!ldlm_is_cancel_on_block(lock));
+	/* Do not put cross-MDT lock in the waiting list, since we
+	 * will not evict it due to timeout for now */
+	if (lock->l_export != NULL &&
+	    (exp_connect_flags(lock->l_export) & OBD_CONNECT_MDS_MDS))
+		return 0;
+
 	spin_lock_bh(&waiting_locks_spinlock);
+	if (ldlm_is_cancel(lock)) {
+		spin_unlock_bh(&waiting_locks_spinlock);
+		return 0;
+	}
+
 	if (ldlm_is_destroyed(lock)) {
 		static cfs_time_t next;
+
 		spin_unlock_bh(&waiting_locks_spinlock);
 		LDLM_ERROR(lock, "not waiting on destroyed lock (bug 5653)");
 		if (cfs_time_after(cfs_time_current(), next)) {
@@ -435,7 +458,8 @@ static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
 		return 0;
 	}
 
-	lock->l_last_activity = cfs_time_current_sec();
+	ldlm_set_waited(lock);
+	lock->l_last_activity = ktime_get_real_seconds();
 	ret = __ldlm_add_waiting_lock(lock, timeout);
 	if (ret) {
 		/* grab ref on the lock if it has been added to the
@@ -444,13 +468,8 @@ static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
 	}
 	spin_unlock_bh(&waiting_locks_spinlock);
 
-	if (ret) {
-		spin_lock_bh(&lock->l_export->exp_bl_list_lock);
-		if (list_empty(&lock->l_exp_list))
-			list_add(&lock->l_exp_list,
-				 &lock->l_export->exp_bl_list);
-		spin_unlock_bh(&lock->l_export->exp_bl_list_lock);
-	}
+	if (ret)
+		ldlm_add_blocked_lock(lock);
 
 	LDLM_DEBUG(lock, "%sadding to wait list(timeout: %d, AT: %s)",
		   ret == 0 ? "not re-" : "", timeout,
@@ -479,13 +498,13 @@ static int __ldlm_del_waiting_lock(struct ldlm_lock *lock)
 		/* Removing the head of the list, adjust timer. */
 		if (list_next == &waiting_locks_list) {
 			/* No more, just cancel. */
-			cfs_timer_disarm(&waiting_locks_timer);
+			del_timer(&waiting_locks_timer);
 		} else {
 			struct ldlm_lock *next;
 			next = list_entry(list_next, struct ldlm_lock,
 					  l_pending_chain);
-			cfs_timer_arm(&waiting_locks_timer,
-				      round_timeout(next->l_callback_timeout));
+			mod_timer(&waiting_locks_timer,
+				  round_timeout(next->l_callback_timeout));
 		}
 	}
 	list_del_init(&lock->l_pending_chain);
@@ -505,6 +524,7 @@ int ldlm_del_waiting_lock(struct ldlm_lock *lock)
 
 	spin_lock_bh(&waiting_locks_spinlock);
 	ret = __ldlm_del_waiting_lock(lock);
+	ldlm_clear_waited(lock);
 	spin_unlock_bh(&waiting_locks_spinlock);
 
 	/* remove the lock out of export blocking list */
@@ -521,7 +541,6 @@ int ldlm_del_waiting_lock(struct ldlm_lock *lock)
 	LDLM_DEBUG(lock, "%s", ret == 0 ? "wasn't waiting" : "removed");
 	return ret;
 }
-EXPORT_SYMBOL(ldlm_del_waiting_lock);
 
 /**
  * Prolong the contended lock waiting time.
@@ -536,6 +555,12 @@ int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout)
 		return 0;
 	}
 
+	if (exp_connect_flags(lock->l_export) & OBD_CONNECT_MDS_MDS) {
+		/* We don't have a "waiting locks list" on OSP. */
+		LDLM_DEBUG(lock, "MDS-MDS lock: no-op");
+		return 0;
+	}
+
 	spin_lock_bh(&waiting_locks_spinlock);
 
 	if (list_empty(&lock->l_pending_chain)) {
@@ -624,31 +649,50 @@ static void ldlm_failed_ast(struct ldlm_lock *lock, int rc,
 
 /**
  * Perform lock cleanup if AST reply came with error.
 */
 static int ldlm_handle_ast_error(struct ldlm_lock *lock,
-                                 struct ptlrpc_request *req, int rc,
-                                 const char *ast_type)
+				 struct ptlrpc_request *req, int rc,
+				 const char *ast_type)
 {
-	lnet_process_id_t peer = req->rq_import->imp_connection->c_peer;
+	struct lnet_process_id peer = req->rq_import->imp_connection->c_peer;
 
 	if (!req->rq_replied || (rc && rc != -EINVAL)) {
 		if (lock->l_export && lock->l_export->exp_libclient) {
-			LDLM_DEBUG(lock, "%s AST to liblustre client (nid %s)"
-				   " timeout, just cancelling lock", ast_type,
+			LDLM_DEBUG(lock,
+				   "%s AST (req@%p x%llu) to liblustre client (nid %s) timeout, just cancelling lock",
+				   ast_type, req, req->rq_xid,
 				   libcfs_nid2str(peer.nid));
 			ldlm_lock_cancel(lock);
 			rc = -ERESTART;
 		} else if (ldlm_is_cancel(lock)) {
-			LDLM_DEBUG(lock, "%s AST timeout from nid %s, but "
-				   "cancel was received (AST reply lost?)",
-				   ast_type, libcfs_nid2str(peer.nid));
+			LDLM_DEBUG(lock,
				   "%s AST (req@%p x%llu) timeout from nid %s, but cancel was received (AST reply lost?)",
+				   ast_type, req, req->rq_xid,
+				   libcfs_nid2str(peer.nid));
 			ldlm_lock_cancel(lock);
 			rc = -ERESTART;
+		} else if (rc == -ENODEV || rc == -ESHUTDOWN ||
+			   (rc == -EIO &&
+			    req->rq_import->imp_state == LUSTRE_IMP_CLOSED)) {
+			/* Upon umount process the AST fails because it cannot
+			 * be sent. This shouldn't lead to the client eviction.
+			 * -ENODEV error is returned by ptl_send_rpc() for
+			 *  new request in such import.
+			 * -ESHUTDOWN is returned by ptlrpc_import_delay_req()
+			 *  if imp_invalid is set or obd_no_recov.
+			 * Meanwhile there is also check for LUSTRE_IMP_CLOSED
+			 * in ptlrpc_import_delay_req() as well with -EIO code.
+			 * In all such cases errors are ignored.
+			 */
+			LDLM_DEBUG(lock, "%s AST can't be sent due to a server"
+				   " %s failure or umount process: rc = %d\n",
+				   ast_type,
+				   req->rq_import->imp_obd->obd_name, rc);
 		} else {
-			LDLM_ERROR(lock, "client (nid %s) %s %s AST "
-				   "(req status %d rc %d), evict it",
+			LDLM_ERROR(lock,
+				   "client (nid %s) %s %s AST (req@%p x%llu status %d rc %d), evict it",
 				   libcfs_nid2str(peer.nid),
 				   req->rq_replied ? "returned error from" :
 				   "failed to reply to",
-				   ast_type,
+				   ast_type, req, req->rq_xid,
 				   (req->rq_repmsg != NULL) ?
 				   lustre_msg_get_status(req->rq_repmsg) : 0,
 				   rc);
@@ -660,12 +704,12 @@ static int ldlm_handle_ast_error(struct ldlm_lock *lock,
 	if (rc == -EINVAL) {
 		struct ldlm_resource *res = lock->l_resource;
 
-		LDLM_DEBUG(lock, "client (nid %s) returned %d"
-			   " from %s AST - normal race",
+		LDLM_DEBUG(lock,
+			   "client (nid %s) returned %d from %s AST (req@%p x%llu) - normal race",
			   libcfs_nid2str(peer.nid),
			   req->rq_repmsg ?
			   lustre_msg_get_status(req->rq_repmsg) : -1,
-			   ast_type);
+			   ast_type, req, req->rq_xid);
 		if (res) {
 			/* update lvbo to return proper attributes.
			 * see bug 23174 */
@@ -700,7 +744,9 @@ static int ldlm_cb_interpret(const struct lu_env *env,
	 * - Glimpse callback of remote lock might return
	 *   -ELDLM_NO_LOCK_DATA when inode is cleared. LU-274
	 */
-	if (rc == -ELDLM_NO_LOCK_DATA) {
+	if (unlikely(arg->gl_interpret_reply)) {
+		rc = arg->gl_interpret_reply(env, req, data, rc);
+	} else if (rc == -ELDLM_NO_LOCK_DATA) {
 		LDLM_DEBUG(lock, "lost race - client has a lock but no "
			   "inode");
 		ldlm_res_lvbo_update(lock->l_resource, NULL, 1);
@@ -817,6 +863,11 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
 		/* Don't need to do anything here. */
 		RETURN(0);
 
+	if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_SRV_BL_AST)) {
+		LDLM_DEBUG(lock, "dropping BL AST");
+		RETURN(0);
+	}
+
 	LASSERT(lock);
 	LASSERT(data != NULL);
 	if (lock->l_export->exp_obd->obd_recovering != 0)
@@ -838,20 +889,22 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
 	req->rq_interpret_reply = ldlm_cb_interpret;
 
 	lock_res_and_lock(lock);
-	if (lock->l_granted_mode != lock->l_req_mode) {
-		/* this blocking AST will be communicated as part of the
-		 * completion AST instead */
+	if (ldlm_is_destroyed(lock)) {
+		/* What's the point? */
 		unlock_res_and_lock(lock);
-		ptlrpc_req_finished(req);
-		LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
 		RETURN(0);
 	}
 
-	if (ldlm_is_destroyed(lock)) {
-		/* What's the point? */
+	if (lock->l_granted_mode != lock->l_req_mode) {
+		/* this blocking AST will be communicated as part of the
+		 * completion AST instead */
+		ldlm_add_blocked_lock(lock);
+		ldlm_set_waited(lock);
 		unlock_res_and_lock(lock);
+		ptlrpc_req_finished(req);
+		LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
 		RETURN(0);
 	}
@@ -866,6 +919,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
 	LDLM_DEBUG(lock, "server preparing blocking AST");
 
 	ptlrpc_request_set_replen(req);
+	ldlm_set_cbpending(lock);
 	if (instant_cancel) {
 		unlock_res_and_lock(lock);
 		ldlm_lock_cancel(lock);
@@ -886,7 +940,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
 	if (AT_OFF)
 		req->rq_timeout = ldlm_get_rq_timeout();
 
-	lock->l_last_activity = cfs_time_current_sec();
+	lock->l_last_activity = ktime_get_real_seconds();
 
 	if (lock->l_export && lock->l_export->exp_nid_stats &&
	    lock->l_export->exp_nid_stats->nid_ldlm_stats)
@@ -897,7 +951,6 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
 
 	RETURN(rc);
 }
-EXPORT_SYMBOL(ldlm_server_blocking_ast);
 
 /**
  * ->l_completion_ast callback for a remote lock in server namespace.
@@ -977,7 +1030,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
 		}
 	}
 
-	lock->l_last_activity = cfs_time_current_sec();
+	lock->l_last_activity = ktime_get_real_seconds();
 
 	LDLM_DEBUG(lock, "server preparing completion AST");
 
@@ -1029,7 +1082,6 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
 
 	RETURN(lvb_len < 0 ? lvb_len : rc);
 }
-EXPORT_SYMBOL(ldlm_server_completion_ast);
 
 /**
  * Server side ->l_glimpse_ast handler for client locks.
@@ -1088,7 +1140,7 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
 	if (AT_OFF)
 		req->rq_timeout = ldlm_get_rq_timeout();
 
-	lock->l_last_activity = cfs_time_current_sec();
+	lock->l_last_activity = ktime_get_real_seconds();
 
 	req->rq_interpret_reply = ldlm_cb_interpret;
@@ -1101,7 +1153,6 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
 
 	RETURN(rc);
 }
-EXPORT_SYMBOL(ldlm_server_glimpse_ast);
 
 int ldlm_glimpse_locks(struct ldlm_resource *res,
		       struct list_head *gl_work_list)
@@ -1173,25 +1224,25 @@ static void ldlm_svc_get_eopc(const struct ldlm_request *dlm_req,
  * service threads to carry out client lock enqueueing requests.
 */
 int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
-                         struct ptlrpc_request *req,
-                         const struct ldlm_request *dlm_req,
-                         const struct ldlm_callback_suite *cbs)
+			 struct ptlrpc_request *req,
+			 const struct ldlm_request *dlm_req,
+			 const struct ldlm_callback_suite *cbs)
 {
-        struct ldlm_reply *dlm_rep;
+	struct ldlm_reply *dlm_rep;
 	__u64 flags;
-	ldlm_error_t err = ELDLM_OK;
-        struct ldlm_lock *lock = NULL;
-        void *cookie = NULL;
-        int rc = 0;
+	enum ldlm_error err = ELDLM_OK;
+	struct ldlm_lock *lock = NULL;
+	void *cookie = NULL;
+	int rc = 0;
 	struct ldlm_resource *res = NULL;
-        ENTRY;
+	ENTRY;
 
-        LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
+	LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
 
 	ldlm_request_cancel(req, dlm_req, LDLM_ENQUEUE_CANCEL_OFF, LATF_SKIP);
 	flags = ldlm_flags_from_wire(dlm_req->lock_flags);
 
-        LASSERT(req->rq_export);
+	LASSERT(req->rq_export);
 
 	if (ptlrpc_req2svc(req)->srv_stats != NULL)
 		ldlm_svc_get_eopc(dlm_req, ptlrpc_req2svc(req)->srv_stats);
@@ -1231,23 +1282,6 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
 		GOTO(out, rc = -EPROTO);
 	}
 
-#if 0
-        /* FIXME this makes it impossible to use LDLM_PLAIN locks -- check
-           against server's _CONNECT_SUPPORTED flags? (I don't want to use
-           ibits for mgc/mgs) */
-
-        /* INODEBITS_INTEROP: Perform conversion from plain lock to
-         * inodebits lock if client does not support them. */
-        if (!(exp_connect_flags(req->rq_export) & OBD_CONNECT_IBITS) &&
-            (dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN)) {
-                dlm_req->lock_desc.l_resource.lr_type = LDLM_IBITS;
-                dlm_req->lock_desc.l_policy_data.l_inodebits.bits =
-                        MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
-                if (dlm_req->lock_desc.l_req_mode == LCK_PR)
-                        dlm_req->lock_desc.l_req_mode = LCK_CR;
-        }
-#endif
-
 	if (unlikely((flags & LDLM_FL_REPLAY) ||
		     (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))) {
 		/* Find an existing lock in the per-export lock hash */
@@ -1257,12 +1291,19 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
 		lock = cfs_hash_lookup(req->rq_export->exp_lock_hash,
				       (void *)&dlm_req->lock_handle[0]);
 		if (lock != NULL) {
-			DEBUG_REQ(D_DLMTRACE, req, "found existing lock cookie "
-				  LPX64, lock->l_handle.h_cookie);
+			DEBUG_REQ(D_DLMTRACE, req, "found existing lock cookie %#llx",
+				  lock->l_handle.h_cookie);
 			flags |= LDLM_FL_RESENT;
 			GOTO(existing_lock, rc = 0);
 		}
-	}
+	} else {
+		if (ldlm_reclaim_full()) {
+			DEBUG_REQ(D_DLMTRACE, req, "Too many granted locks, "
+				  "reject current enqueue request and let the "
+				  "client retry later.\n");
+			GOTO(out, rc = -EINPROGRESS);
+		}
+	}
 
 	/* The lock's callback data might be set in the policy function */
 	lock = ldlm_lock_create(ns, &dlm_req->lock_desc.l_resource.lr_name,
@@ -1314,6 +1355,14 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
	 * without them.
	 */
 	lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
					      LDLM_FL_INHERIT_MASK);
+
+	ldlm_convert_policy_to_local(req->rq_export,
+				     dlm_req->lock_desc.l_resource.lr_type,
+				     &dlm_req->lock_desc.l_policy_data,
+				     &lock->l_policy_data);
+	if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
+		lock->l_req_extent = lock->l_policy_data.l_extent;
+
 existing_lock:
 
 	if (flags & LDLM_FL_HAS_INTENT) {
@@ -1335,14 +1384,6 @@ existing_lock:
 		GOTO(out, rc);
 	}
 
-	if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN)
-		ldlm_convert_policy_to_local(req->rq_export,
-					  dlm_req->lock_desc.l_resource.lr_type,
-					  &dlm_req->lock_desc.l_policy_data,
-					  &lock->l_policy_data);
-	if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
-		lock->l_req_extent = lock->l_policy_data.l_extent;
-
 	err = ldlm_lock_enqueue(ns, &lock, cookie, &flags);
 	if (err) {
 		if ((int)err < 0)
@@ -1355,6 +1396,9 @@ existing_lock:
 	ldlm_lock2desc(lock, &dlm_rep->lock_desc);
 	ldlm_lock2handle(lock, &dlm_rep->lock_handle);
 
+	if (lock && lock->l_resource->lr_type == LDLM_EXTENT)
+		OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_BL_EVICT, 6);
+
 	/* We never send a blocking AST until the lock is granted, but
	 * we can tell it right now */
 	lock_res_and_lock(lock);
@@ -1397,20 +1441,20 @@ existing_lock:
 		if (unlikely(!ldlm_is_cancel_on_block(lock) ||
			     !(dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK))){
 			CERROR("Granting sync lock to libclient. "
-			       "req fl %d, rep fl %d, lock fl "LPX64"\n",
+			       "req fl %d, rep fl %d, lock fl %#llx\n",
			       dlm_req->lock_flags, dlm_rep->lock_flags,
			       lock->l_flags);
 			LDLM_ERROR(lock, "sync lock");
-                        if (dlm_req->lock_flags & LDLM_FL_HAS_INTENT) {
-                                struct ldlm_intent *it;
-
-                                it = req_capsule_client_get(&req->rq_pill,
-                                                            &RMF_LDLM_INTENT);
-                                if (it != NULL) {
-                                        CERROR("This is intent %s ("LPU64")\n",
-                                               ldlm_it2str(it->opc), it->opc);
-                                }
-                        }
+			if (dlm_req->lock_flags & LDLM_FL_HAS_INTENT) {
+				struct ldlm_intent *it;
+
+				it = req_capsule_client_get(&req->rq_pill,
+							    &RMF_LDLM_INTENT);
+				if (it != NULL) {
+					CERROR("This is intent %s (%llu)\n",
+					       ldlm_it2str(it->opc), it->opc);
+				}
+			}
 		}
 	}
 
@@ -1471,12 +1515,17 @@ existing_lock:
 		}
 	}
 
-	if (rc != 0) {
-		lock_res_and_lock(lock);
-		ldlm_resource_unlink_lock(lock);
-		ldlm_lock_destroy_nolock(lock);
-		unlock_res_and_lock(lock);
-	}
+	if (rc != 0 && !(flags & LDLM_FL_RESENT)) {
+		if (lock->l_export) {
+			ldlm_lock_cancel(lock);
+		} else {
+			lock_res_and_lock(lock);
+			ldlm_resource_unlink_lock(lock);
+			ldlm_lock_destroy_nolock(lock);
+			unlock_res_and_lock(lock);
+
+		}
+	}
 
 	if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
 		ldlm_reprocess_all(lock->l_resource);
@@ -1489,7 +1538,6 @@ existing_lock:
 
 	return rc;
 }
-EXPORT_SYMBOL(ldlm_handle_enqueue0);
 
 /**
  * Old-style LDLM main entry point for server code enqueue.
@@ -1516,7 +1564,6 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
 	}
 	return rc;
 }
-EXPORT_SYMBOL(ldlm_handle_enqueue);
 
 /**
  * Main LDLM entry point for server code to process lock conversion requests.
@@ -1570,7 +1617,6 @@ int ldlm_handle_convert0(struct ptlrpc_request *req,
 
 	RETURN(0);
 }
-EXPORT_SYMBOL(ldlm_handle_convert0);
 
 /**
  * Old-style main LDLM entry point for server code to process lock conversion
@@ -1590,7 +1636,6 @@ int ldlm_handle_convert(struct ptlrpc_request *req)
 	}
 	return rc;
 }
-EXPORT_SYMBOL(ldlm_handle_convert);
 
 /**
  * Cancel all the locks whose handles are packed into ldlm_request
@@ -1626,7 +1671,7 @@ int ldlm_request_cancel(struct ptlrpc_request *req,
 		lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
 		if (!lock) {
 			LDLM_DEBUG_NOLOCK("server-side cancel handler stale "
-					  "lock (cookie "LPU64")",
+					  "lock (cookie %llu)",
					  dlm_req->lock_handle[i].cookie);
 			continue;
 		}
@@ -1652,10 +1697,10 @@ int ldlm_request_cancel(struct ptlrpc_request *req,
 		}
 
 		if ((flags & LATF_STATS) && ldlm_is_ast_sent(lock)) {
-			long delay = cfs_time_sub(cfs_time_current_sec(),
-						  lock->l_last_activity);
-			LDLM_DEBUG(lock, "server cancels blocked lock after "
-				   CFS_DURATION_T"s", delay);
+			time64_t delay = ktime_get_real_seconds() -
+					 lock->l_last_activity;
+			LDLM_DEBUG(lock, "server cancels blocked lock after %llds",
+				   (s64)delay);
 			at_measured(&lock->l_export->exp_bl_lock_at, delay);
 		}
 		ldlm_lock_cancel(lock);
@@ -1702,7 +1747,6 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
 
 	RETURN(ptlrpc_reply(req));
 }
-EXPORT_SYMBOL(ldlm_handle_cancel);
 #endif /* HAVE_SERVER_SUPPORT */
 
 /**
@@ -1764,8 +1808,8 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
 	if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
 		int to = cfs_time_seconds(1);
 		while (to > 0) {
-			schedule_timeout_and_set_state(
-				TASK_INTERRUPTIBLE, to);
+			set_current_state(TASK_INTERRUPTIBLE);
+			schedule_timeout(to);
 			if (lock->l_granted_mode == lock->l_req_mode ||
			    ldlm_is_destroyed(lock))
				break;
@@ -1788,22 +1832,6 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
				   lock->l_lvb_len, lvb_len);
 			GOTO(out, rc = -EINVAL);
 		}
-	} else if (ldlm_has_layout(lock)) { /* for layout lock, lvb has
-					     * variable length */
-		void *lvb_data;
-
-		OBD_ALLOC_LARGE(lvb_data, lvb_len);
-		if (lvb_data == NULL) {
-			LDLM_ERROR(lock, "No memory: %d.\n", lvb_len);
-			GOTO(out, rc = -ENOMEM);
-		}
-
-		lock_res_and_lock(lock);
-		LASSERT(lock->l_lvb_data == NULL);
-		lock->l_lvb_type = LVB_T_LAYOUT;
-		lock->l_lvb_data = lvb_data;
-		lock->l_lvb_len = lvb_len;
-		unlock_res_and_lock(lock);
 	}
 }
 
@@ -1949,7 +1977,7 @@ static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
 }
 
 static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi,
-			       ldlm_cancel_flags_t cancel_flags)
+			       enum ldlm_cancel_flags cancel_flags)
 {
 	struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
 	ENTRY;
@@ -1980,7 +2008,7 @@ static inline void init_blwi(struct ldlm_bl_work_item *blwi,
			     struct ldlm_lock_desc *ld,
			     struct list_head *cancels, int count,
			     struct ldlm_lock *lock,
-			     ldlm_cancel_flags_t cancel_flags)
+			     enum ldlm_cancel_flags cancel_flags)
 {
 	init_completion(&blwi->blwi_comp);
 	INIT_LIST_HEAD(&blwi->blwi_head);
@@ -2014,7 +2042,7 @@ static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
			     struct ldlm_lock_desc *ld,
			     struct ldlm_lock *lock,
			     struct list_head *cancels, int count,
-			     ldlm_cancel_flags_t cancel_flags)
+			     enum ldlm_cancel_flags cancel_flags)
 {
 	ENTRY;
 
@@ -2051,11 +2079,17 @@ int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
 
 int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
			   struct list_head *cancels, int count,
-			   ldlm_cancel_flags_t cancel_flags)
+			   enum ldlm_cancel_flags cancel_flags)
 {
	return ldlm_bl_to_thread(ns, ld, NULL, cancels, count, cancel_flags);
 }
 
+int ldlm_bl_thread_wakeup(void)
+{
+	wake_up(&ldlm_state->ldlm_bl_pool->blp_waitq);
+	return 0;
+}
+
 /* Setinfo coming from Server (eg MDT) to Client (eg MDC)! */
 static int ldlm_handle_setinfo(struct ptlrpc_request *req)
 {
@@ -2101,11 +2135,11 @@ static int ldlm_handle_setinfo(struct ptlrpc_request *req)
 }
 
 static inline void ldlm_callback_errmsg(struct ptlrpc_request *req,
-                                        const char *msg, int rc,
-                                        struct lustre_handle *handle)
+					const char *msg, int rc,
+					const struct lustre_handle *handle)
 {
 	DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req,
-		  "%s: [nid %s] [rc %d] [lock "LPX64"]",
+		  "%s: [nid %s] [rc %d] [lock %#llx]",
		  msg, libcfs_id2str(req->rq_peer), rc,
		  handle ? handle->cookie : 0);
 	if (req->rq_no_reply)
@@ -2114,23 +2148,6 @@ static inline void ldlm_callback_errmsg(struct ptlrpc_request *req,
 		CWARN("Send reply failed, maybe cause bug 21636.\n");
 }
 
-static int ldlm_handle_qc_callback(struct ptlrpc_request *req)
-{
-	struct obd_quotactl *oqctl;
-	struct client_obd *cli = &req->rq_export->exp_obd->u.cli;
-
-	oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
-	if (oqctl == NULL) {
-		CERROR("Can't unpack obd_quotactl\n");
-		RETURN(-EPROTO);
-	}
-
-	oqctl->qc_stat = ptlrpc_status_ntoh(oqctl->qc_stat);
-
-	cli->cl_qchk_stat = oqctl->qc_stat;
-	return 0;
-}
-
 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
 static int ldlm_callback_handler(struct ptlrpc_request *req)
 {
@@ -2210,13 +2227,6 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
 		rc = llog_origin_handle_close(req);
 		ldlm_callback_reply(req, rc);
 		RETURN(0);
-	case OBD_QC_CALLBACK:
-		req_capsule_set(&req->rq_pill, &RQF_QC_CALLBACK);
-		if (OBD_FAIL_CHECK(OBD_FAIL_OBD_QC_CALLBACK_NET))
-			RETURN(0);
-		rc = ldlm_handle_qc_callback(req);
-		ldlm_callback_reply(req, rc);
-		RETURN(0);
 	default:
 		CERROR("unknown opcode %u\n",
		       lustre_msg_get_opc(req->rq_reqmsg));
@@ -2248,7 +2258,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
 
 	lock = ldlm_handle2lock_long(&dlm_req->lock_handle[0], 0);
 	if (!lock) {
-		CDEBUG(D_DLMTRACE, "callback on lock "LPX64" - lock "
+		CDEBUG(D_DLMTRACE, "callback on lock %#llx - lock "
		       "disappeared\n", dlm_req->lock_handle[0].cookie);
 		rc = ldlm_callback_reply(req, -EINVAL);
 		ldlm_callback_errmsg(req, "Operate with invalid parameter", rc,
@@ -2264,28 +2274,27 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
 	lock_res_and_lock(lock);
 	lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
					      LDLM_FL_AST_MASK);
-        if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
-                /* If somebody cancels lock and cache is already dropped,
-                 * or lock is failed before cp_ast received on client,
-                 * we can tell the server we have no lock. Otherwise, we
-                 * should send cancel after dropping the cache. */
+	if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
+		/* If somebody cancels lock and cache is already dropped,
+		 * or lock is failed before cp_ast received on client,
+		 * we can tell the server we have no lock. Otherwise, we
+		 * should send cancel after dropping the cache.
+		 */
 		if ((ldlm_is_canceling(lock) && ldlm_is_bl_done(lock)) ||
-                    ldlm_is_failed(lock)) {
-			LDLM_DEBUG(lock, "callback on lock "
-				   LPX64" - lock disappeared\n",
-				   dlm_req->lock_handle[0].cookie);
-			unlock_res_and_lock(lock);
-			LDLM_LOCK_RELEASE(lock);
-			rc = ldlm_callback_reply(req, -EINVAL);
-			ldlm_callback_errmsg(req, "Operate on stale lock", rc,
-					     &dlm_req->lock_handle[0]);
-			RETURN(0);
-		}
+		    ldlm_is_failed(lock)) {
+			LDLM_DEBUG(lock, "callback on lock %llx - lock disappeared",
+				   dlm_req->lock_handle[0].cookie);
+			unlock_res_and_lock(lock);
+			LDLM_LOCK_RELEASE(lock);
+			rc = ldlm_callback_reply(req, -EINVAL);
+			ldlm_callback_errmsg(req, "Operate on stale lock", rc,
+					     &dlm_req->lock_handle[0]);
+			RETURN(0);
+		}
 		/* BL_AST locks are not needed in LRU.
		 * Let ldlm_cancel_lru() be fast. */
-                ldlm_lock_remove_from_lru(lock);
+		ldlm_lock_remove_from_lru(lock);
 		ldlm_set_bl_ast(lock);
-        }
+	}
 	unlock_res_and_lock(lock);
 
 	/* We want the ost thread to get this reply so that it can respond
@@ -2350,7 +2359,7 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req)
 		struct ldlm_request *dlm_req;
 
 		CERROR("%s from %s arrived at %lu with bad export cookie "
-		       LPU64"\n",
+		       "%llu\n",
		       ll_opcode2str(lustre_msg_get_opc(req->rq_reqmsg)),
		       libcfs_nid2str(req->rq_peer.nid),
		       req->rq_arrival_time.tv_sec,
@@ -2375,7 +2384,8 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req)
 		req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
 		CDEBUG(D_INODE, "cancel\n");
 		if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_NET) ||
-		    CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_CANCEL_RESEND))
+		    CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_CANCEL_RESEND) ||
+		    CFS_FAIL_CHECK(OBD_FAIL_LDLM_BL_EVICT))
 			RETURN(0);
 		rc = ldlm_handle_cancel(req);
 		if (rc)
@@ -2409,7 +2419,7 @@ static int ldlm_cancel_hpreq_lock_match(struct ptlrpc_request *req,
 		if (lustre_handle_equal(&dlm_req->lock_handle[i],
					&lockh)) {
 			DEBUG_REQ(D_RPCTRACE, req,
-				  "Prio raised by lock "LPX64".", lockh.cookie);
+				  "Prio raised by lock %#llx.", lockh.cookie);
 			rc = 1;
 			break;
 		}
@@ -2476,7 +2486,7 @@ static int ldlm_hpreq_handler(struct ptlrpc_request *req)
 	RETURN(0);
 }
 
-static int ldlm_revoke_lock_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
+static int ldlm_revoke_lock_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
			       struct hlist_node *hnode, void *data)
 
 {
@@ -2526,25 +2536,40 @@ static int ldlm_revoke_lock_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
 void ldlm_revoke_export_locks(struct obd_export *exp)
 {
 	struct list_head  rpc_list;
-        ENTRY;
+	ENTRY;
 
 	INIT_LIST_HEAD(&rpc_list);
-        cfs_hash_for_each_empty(exp->exp_lock_hash,
-                                ldlm_revoke_lock_cb, &rpc_list);
-        ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list,
-                          LDLM_WORK_REVOKE_AST);
+	cfs_hash_for_each_nolock(exp->exp_lock_hash,
+				 ldlm_revoke_lock_cb, &rpc_list, 0);
+	ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list,
+			  LDLM_WORK_REVOKE_AST);
 
-        EXIT;
+	EXIT;
 }
 EXPORT_SYMBOL(ldlm_revoke_export_locks);
 
 #endif /* HAVE_SERVER_SUPPORT */
 
-static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
+static int ldlm_bl_get_work(struct ldlm_bl_pool *blp,
+			    struct ldlm_bl_work_item **p_blwi,
+			    struct obd_export **p_exp)
 {
 	struct ldlm_bl_work_item *blwi = NULL;
 	static unsigned int num_bl = 0;
+	static unsigned int num_stale;
+	int num_th = atomic_read(&blp->blp_num_threads);
+
+	*p_exp = obd_stale_export_get();
 
 	spin_lock(&blp->blp_lock);
+	if (*p_exp != NULL) {
+		if (num_th == 1 || ++num_stale < num_th) {
+			spin_unlock(&blp->blp_lock);
+			return 1;
+		} else {
+			num_stale = 0;
+		}
+	}
+
 	/* process a request from the blp_list at least every blp_num_threads */
 	if (!list_empty(&blp->blp_list) &&
	    (list_empty(&blp->blp_prio_list) || num_bl == 0))
@@ -2557,18 +2582,23 @@ static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
				  blwi_entry);
 
 	if (blwi) {
-		if (++num_bl >= atomic_read(&blp->blp_num_threads))
+		if (++num_bl >= num_th)
 			num_bl = 0;
 		list_del(&blwi->blwi_entry);
 	}
 	spin_unlock(&blp->blp_lock);
 
+	*p_blwi = blwi;
+
+	if (*p_exp != NULL && *p_blwi != NULL) {
+		obd_stale_export_put(*p_exp);
+		*p_exp = NULL;
+	}
 
-	return blwi;
+	return (*p_blwi != NULL || *p_exp != NULL) ? 1 : 0;
 }
 
 /* This only contains temporary data until the thread starts */
 struct ldlm_bl_thread_data {
-	char			bltd_name[CFS_CURPROC_COMM_MAX];
 	struct ldlm_bl_pool	*bltd_blp;
 	struct completion	bltd_comp;
 	int			bltd_num;
@@ -2576,19 +2606,32 @@ struct ldlm_bl_thread_data {
 
 static int ldlm_bl_thread_main(void *arg);
 
-static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
+static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp, bool check_busy)
 {
 	struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
 	struct task_struct *task;
 
 	init_completion(&bltd.bltd_comp);
-	bltd.bltd_num = atomic_read(&blp->blp_num_threads);
-	snprintf(bltd.bltd_name, sizeof(bltd.bltd_name) - 1,
-		 "ldlm_bl_%02d", bltd.bltd_num);
-	task = kthread_run(ldlm_bl_thread_main, &bltd, bltd.bltd_name);
+
+	bltd.bltd_num = atomic_inc_return(&blp->blp_num_threads);
+	if (bltd.bltd_num >= blp->blp_max_threads) {
+		atomic_dec(&blp->blp_num_threads);
+		return 0;
+	}
+
+	LASSERTF(bltd.bltd_num > 0, "thread num:%d\n", bltd.bltd_num);
+	if (check_busy &&
+	    atomic_read(&blp->blp_busy_threads) < (bltd.bltd_num - 1)) {
+		atomic_dec(&blp->blp_num_threads);
+		return 0;
+	}
+
+	task = kthread_run(ldlm_bl_thread_main, &bltd, "ldlm_bl_%02d",
+			   bltd.bltd_num);
 	if (IS_ERR(task)) {
 		CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %ld\n",
-		       atomic_read(&blp->blp_num_threads), PTR_ERR(task));
+		       bltd.bltd_num, PTR_ERR(task));
+		atomic_dec(&blp->blp_num_threads);
 		return PTR_ERR(task);
 	}
 	wait_for_completion(&bltd.bltd_comp);
@@ -2596,6 +2639,88 @@ static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
 	return 0;
 }
 
+/* Not fatal if racy and have a few too many threads */
+static int ldlm_bl_thread_need_create(struct ldlm_bl_pool *blp,
+				      struct ldlm_bl_work_item *blwi)
+{
+	if (atomic_read(&blp->blp_num_threads) >= blp->blp_max_threads)
+		return 0;
+
+	if (atomic_read(&blp->blp_busy_threads) <
+	    atomic_read(&blp->blp_num_threads))
+		return 0;
+
+	if (blwi != NULL && (blwi->blwi_ns == NULL ||
+			     blwi->blwi_mem_pressure))
+		return 0;
+
+	return 1;
+}
+
+static int ldlm_bl_thread_blwi(struct ldlm_bl_pool *blp,
+			       struct ldlm_bl_work_item *blwi)
+{
+	ENTRY;
+
+	if (blwi->blwi_ns == NULL)
+		/* added by ldlm_cleanup() */
+		RETURN(LDLM_ITER_STOP);
+
+	if (blwi->blwi_mem_pressure)
+		memory_pressure_set();
+
+	OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL2, 4);
+
+	if (blwi->blwi_count) {
+		int count;
+		/* The special case when we cancel locks in lru
+		 * asynchronously, we pass the list of locks here.
+		 * Thus locks are marked LDLM_FL_CANCELING, but NOT
+		 * canceled locally yet. */
+		count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
+						   blwi->blwi_count,
+						   LCF_BL_AST);
+		ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
+				     blwi->blwi_flags);
+	} else {
+		ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
+					blwi->blwi_lock);
+	}
+	if (blwi->blwi_mem_pressure)
+		memory_pressure_clr();
+
+	if (blwi->blwi_flags & LCF_ASYNC)
+		OBD_FREE(blwi, sizeof(*blwi));
+	else
+		complete(&blwi->blwi_comp);
+
+	RETURN(0);
+}
+
+/**
+ * Cancel stale locks on export. Cancel blocked locks first.
+ * If the given export has blocked locks, the next in the list may have
+ * them too, thus cancel not blocked locks only if the current export has
+ * no blocked locks.
+ **/
+static int ldlm_bl_thread_exports(struct ldlm_bl_pool *blp,
+				  struct obd_export *exp)
+{
+	int num;
+	ENTRY;
+
+	OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_BL_EVICT, 4);
+
+	num = ldlm_export_cancel_blocked_locks(exp);
+	if (num == 0)
+		ldlm_export_cancel_locks(exp);
+
+	obd_stale_export_put(exp);
+
+	RETURN(0);
+}
+
+
 /**
  * Main blocking requests processing thread.
  *
@@ -2611,70 +2736,39 @@ static int ldlm_bl_thread_main(void *arg)
 
 	blp = bltd->bltd_blp;
 
-	atomic_inc(&blp->blp_num_threads);
-	atomic_inc(&blp->blp_busy_threads);
-
 	complete(&bltd->bltd_comp);
 	/* cannot use bltd after this, it is only on caller's stack */
 
 	while (1) {
 		struct l_wait_info lwi = { 0 };
 		struct ldlm_bl_work_item *blwi = NULL;
-		int busy;
+		struct obd_export *exp = NULL;
+		int rc;
 
-		blwi = ldlm_bl_get_work(blp);
+		rc = ldlm_bl_get_work(blp, &blwi, &exp);
 
-		if (blwi == NULL) {
-			atomic_dec(&blp->blp_busy_threads);
+		if (rc == 0)
 			l_wait_event_exclusive(blp->blp_waitq,
-					       (blwi = ldlm_bl_get_work(blp)) != NULL,
-					       &lwi);
-			busy = atomic_inc_return(&blp->blp_busy_threads);
-		} else {
-			busy = atomic_read(&blp->blp_busy_threads);
-		}
-
-		if (blwi->blwi_ns == NULL)
-			/* added by ldlm_cleanup() */
-			break;
+					       ldlm_bl_get_work(blp, &blwi,
								&exp),
+					       &lwi);
+		atomic_inc(&blp->blp_busy_threads);
 
-		/* Not fatal if racy and have a few too many threads */
-		if (unlikely(busy < blp->blp_max_threads &&
-			     busy >= atomic_read(&blp->blp_num_threads) &&
-			     !blwi->blwi_mem_pressure))
+		if (ldlm_bl_thread_need_create(blp, blwi))
 			/* discard the return value, we tried */
-			ldlm_bl_thread_start(blp);
-
-		if (blwi->blwi_mem_pressure)
-			memory_pressure_set();
-
-		OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL2, 4);
-
-		if (blwi->blwi_count) {
-			int count;
-			/* The special case when we cancel locks in LRU
-			 * asynchronously, we pass the list of locks here.
-			 * Thus locks are marked LDLM_FL_CANCELING, but NOT
-			 * canceled locally yet. */
-			count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
-							   blwi->blwi_count,
-							   LCF_BL_AST);
-			ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
-					     blwi->blwi_flags);
-		} else {
-			ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
-						blwi->blwi_lock);
-		}
-		if (blwi->blwi_mem_pressure)
-			memory_pressure_clr();
+			ldlm_bl_thread_start(blp, true);
 
-		if (blwi->blwi_flags & LCF_ASYNC)
-			OBD_FREE(blwi, sizeof(*blwi));
-		else
-			complete(&blwi->blwi_comp);
+		if (exp)
+			rc = ldlm_bl_thread_exports(blp, exp);
+		else if (blwi)
+			rc = ldlm_bl_thread_blwi(blp, blwi);
+
+		atomic_dec(&blp->blp_busy_threads);
+
+		if (rc == LDLM_ITER_STOP)
+			break;
 	}
 
-	atomic_dec(&blp->blp_busy_threads);
 	atomic_dec(&blp->blp_num_threads);
 	complete(&blp->blp_comp);
 	RETURN(0);
@@ -2698,7 +2792,6 @@ int ldlm_get_ref(void)
 
 	RETURN(rc);
 }
-EXPORT_SYMBOL(ldlm_get_ref);
 
 void ldlm_put_ref(void)
 {
@@ -2717,13 +2810,12 @@ void ldlm_put_ref(void)
 
 	EXIT;
 }
-EXPORT_SYMBOL(ldlm_put_ref);
 
 /*
  * Export handle<->lock hash operations.
  */
 static unsigned
-ldlm_export_lock_hash(cfs_hash_t *hs, const void *key, unsigned mask)
+ldlm_export_lock_hash(struct cfs_hash *hs, const void *key, unsigned mask)
 {
	return cfs_hash_u64_hash(((struct lustre_handle *)key)->cookie, mask);
 }
@@ -2759,7 +2851,7 @@ ldlm_export_lock_object(struct hlist_node *hnode)
 }
 
 static void
-ldlm_export_lock_get(cfs_hash_t *hs, struct hlist_node *hnode)
+ldlm_export_lock_get(struct cfs_hash *hs, struct hlist_node *hnode)
 {
 	struct ldlm_lock *lock;
 
@@ -2768,7 +2860,7 @@ ldlm_export_lock_get(struct cfs_hash *hs, struct hlist_node *hnode)
 }
 
 static void
-ldlm_export_lock_put(cfs_hash_t *hs, struct hlist_node *hnode)
+ldlm_export_lock_put(struct cfs_hash *hs, struct hlist_node *hnode)
 {
 	struct ldlm_lock *lock;
 
@@ -2776,7 +2868,7 @@ ldlm_export_lock_put(struct cfs_hash *hs, struct hlist_node *hnode)
 	LDLM_LOCK_RELEASE(lock);
 }
 
-static cfs_hash_ops_t ldlm_export_lock_ops = {
+static struct cfs_hash_ops ldlm_export_lock_ops = {
 	.hs_hash        = ldlm_export_lock_hash,
 	.hs_key         = ldlm_export_lock_key,
 	.hs_keycmp      = ldlm_export_lock_keycmp,
@@ -2827,6 +2919,40 @@ void ldlm_destroy_export(struct obd_export *exp)
 }
 EXPORT_SYMBOL(ldlm_destroy_export);
 
+static ssize_t cancel_unused_locks_before_replay_show(struct kobject *kobj,
+						      struct attribute *attr,
+						      char *buf)
+{
+	return sprintf(buf, "%d\n", ldlm_cancel_unused_locks_before_replay);
+}
+
+static ssize_t cancel_unused_locks_before_replay_store(struct kobject *kobj,
+						       struct attribute *attr,
+						       const char *buffer,
+						       size_t count)
+{
+	int rc;
+	unsigned long val;
+
+	rc = kstrtoul(buffer, 10, &val);
+	if (rc)
+		return rc;
+
+	ldlm_cancel_unused_locks_before_replay = val;
+
+	return count;
+}
+LUSTRE_RW_ATTR(cancel_unused_locks_before_replay);
+
+static struct attribute *ldlm_attrs[] = {
+	&lustre_attr_cancel_unused_locks_before_replay.attr,
+	NULL,
+};
+
+static struct attribute_group ldlm_attr_group = {
+	.attrs = ldlm_attrs,
+};
+
 static int ldlm_setup(void)
 {
 	static struct ptlrpc_service_conf	conf;
@@ -2846,9 +2972,25 @@ static int ldlm_setup(void)
 	if (ldlm_state == NULL)
 		RETURN(-ENOMEM);
 
+	ldlm_kobj = kobject_create_and_add("ldlm", lustre_kobj);
+	if (!ldlm_kobj)
+		GOTO(out, -ENOMEM);
+
+	rc = sysfs_create_group(ldlm_kobj, &ldlm_attr_group);
+	if (rc)
+		GOTO(out, rc);
+
+	ldlm_ns_kset = kset_create_and_add("namespaces", NULL, ldlm_kobj);
+	if (!ldlm_ns_kset)
+		GOTO(out, -ENOMEM);
+
+	ldlm_svc_kset = kset_create_and_add("services", NULL, ldlm_kobj);
+	if (!ldlm_svc_kset)
+		GOTO(out, -ENOMEM);
+
 #ifdef CONFIG_PROC_FS
-        rc = ldlm_proc_setup();
-        if (rc != 0)
+	rc = ldlm_proc_setup();
+	if (rc != 0)
 		GOTO(out, rc);
 #endif /* CONFIG_PROC_FS */
@@ -2956,7 +3098,7 @@ static int ldlm_setup(void)
 	}
 
 	for (i = 0; i < blp->blp_min_threads; i++) {
-		rc = ldlm_bl_thread_start(blp);
+		rc = ldlm_bl_thread_start(blp, false);
 		if (rc < 0)
 			GOTO(out, rc);
 	}
@@ -2968,7 +3110,7 @@ static int ldlm_setup(void)
 
 	INIT_LIST_HEAD(&waiting_locks_list);
 	spin_lock_init(&waiting_locks_spinlock);
-	cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, NULL);
+	setup_timer(&waiting_locks_timer, waiting_locks_callback, 0);
 
 	task = kthread_run(expired_lock_main, NULL, "ldlm_elt");
 	if (IS_ERR(task)) {
@@ -2986,6 +3128,12 @@ static int ldlm_setup(void)
 		CERROR("Failed to initialize LDLM pools: %d\n", rc);
 		GOTO(out, rc);
 	}
+
+	rc = ldlm_reclaim_setup();
+	if (rc) {
+		CERROR("Failed to setup reclaim thread: rc = %d\n", rc);
+		GOTO(out, rc);
+	}
 	RETURN(0);
 
 out:
@@ -3005,7 +3153,8 @@ static int ldlm_cleanup(void)
 		RETURN(-EBUSY);
 	}
 
-	ldlm_pools_fini();
+	ldlm_reclaim_cleanup();
+	ldlm_pools_fini();
 
 	if (ldlm_state->ldlm_bl_pool != NULL) {
 		struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
@@ -3033,6 +3182,13 @@ static int ldlm_cleanup(void)
 	ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
 #endif
 
+	if (ldlm_ns_kset)
+		kset_unregister(ldlm_ns_kset);
+	if (ldlm_svc_kset)
+		kset_unregister(ldlm_svc_kset);
+	if (ldlm_kobj)
+		kobject_put(ldlm_kobj);
+
 	ldlm_proc_cleanup();
 
 #ifdef HAVE_SERVER_SUPPORT
@@ -3069,23 +3225,34 @@ int ldlm_init(void)
 	ldlm_lock_slab = kmem_cache_create("ldlm_locks",
			      sizeof(struct ldlm_lock), 0,
			      SLAB_HWCACHE_ALIGN | SLAB_DESTROY_BY_RCU, NULL);
-	if (ldlm_lock_slab == NULL) {
-		kmem_cache_destroy(ldlm_resource_slab);
-		return -ENOMEM;
-	}
+	if (ldlm_lock_slab == NULL)
+		goto out_resource;
 
 	ldlm_interval_slab = kmem_cache_create("interval_node",
					sizeof(struct ldlm_interval),
					0, SLAB_HWCACHE_ALIGN, NULL);
-	if (ldlm_interval_slab == NULL) {
-		kmem_cache_destroy(ldlm_resource_slab);
-		kmem_cache_destroy(ldlm_lock_slab);
-		return -ENOMEM;
-	}
+	if (ldlm_interval_slab == NULL)
+		goto out_lock;
+
+	ldlm_interval_tree_slab = kmem_cache_create("interval_tree",
			sizeof(struct ldlm_interval_tree) * LCK_MODE_NUM,
			0, SLAB_HWCACHE_ALIGN, NULL);
+	if (ldlm_interval_tree_slab == NULL)
+		goto out_interval;
+
 #if LUSTRE_TRACKS_LOCK_EXP_REFS
-        class_export_dump_hook = ldlm_dump_export_locks;
+	class_export_dump_hook = ldlm_dump_export_locks;
 #endif
-        return 0;
+	return 0;
+
+out_interval:
+	kmem_cache_destroy(ldlm_interval_slab);
+out_lock:
+	kmem_cache_destroy(ldlm_lock_slab);
+out_resource:
+	kmem_cache_destroy(ldlm_resource_slab);
+
+	return -ENOMEM;
 }
 
 void ldlm_exit(void)
@@ -3099,4 +3266,5 @@ void ldlm_exit(void)
 	synchronize_rcu();
 	kmem_cache_destroy(ldlm_lock_slab);
 	kmem_cache_destroy(ldlm_interval_slab);
+	kmem_cache_destroy(ldlm_interval_tree_slab);
 }
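
[Reviewer note - not part of the patch] For anyone reviewing the timer changes above: the patch replaces the libcfs wrappers one-for-one with the kernel's own timer API - cfs_timer_init() becomes setup_timer(), cfs_timer_arm() becomes mod_timer(), cfs_timer_disarm() becomes del_timer(), and cfs_timer_is_armed() becomes timer_pending(). A minimal standalone sketch of that pattern follows, assuming a pre-4.15 kernel where timer callbacks still take an unsigned long argument (as waiting_locks_callback() does here); the module and identifier names are made up for illustration:

	#include <linux/module.h>
	#include <linux/timer.h>
	#include <linux/jiffies.h>

	static struct timer_list demo_timer;

	/* Runs in softirq context, like waiting_locks_callback(). */
	static void demo_callback(unsigned long unused)
	{
		pr_info("demo timer fired\n");
	}

	static int __init demo_init(void)
	{
		/* cf. setup_timer() replacing cfs_timer_init() in ldlm_setup() */
		setup_timer(&demo_timer, demo_callback, 0);
		/* cf. mod_timer() replacing cfs_timer_arm(): arms the timer,
		 * or re-arms it with a new expiry if it is already pending */
		mod_timer(&demo_timer, jiffies + HZ);
		return 0;
	}

	static void __exit demo_exit(void)
	{
		/* cf. del_timer() replacing cfs_timer_disarm() */
		del_timer_sync(&demo_timer);
	}

	module_init(demo_init);
	module_exit(demo_exit);
	MODULE_LICENSE("GPL");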