X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fldlm%2Fldlm_lockd.c;h=b29b7214d1713ebbcdedc55e627d70b54808c8fc;hb=c156613b29be6fcee13d0df7008f0cd7847a5263;hp=760feb588a4e37a9fb58c8a0f4541f115c388953;hpb=bee9c1897677473f12c0b807edd3e8fec452bc32;p=fs%2Flustre-release.git

diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c
index 760feb5..b29b721 100644
--- a/lustre/ldlm/ldlm_lockd.c
+++ b/lustre/ldlm/ldlm_lockd.c
@@ -15,11 +15,7 @@
  *
  * You should have received a copy of the GNU General Public License
  * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
  *
  * GPL HEADER END
  */
@@ -27,7 +23,7 @@
  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2010, 2014, Intel Corporation.
+ * Copyright (c) 2010, 2016, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -42,33 +38,33 @@
 
 #define DEBUG_SUBSYSTEM S_LDLM
 
 #include
+#include
 #include
+#include
 #include
 #include
-#include
 #include "ldlm_internal.h"
 
 static int ldlm_num_threads;
-CFS_MODULE_PARM(ldlm_num_threads, "i", int, 0444,
-		"number of DLM service threads to start");
+module_param(ldlm_num_threads, int, 0444);
+MODULE_PARM_DESC(ldlm_num_threads, "number of DLM service threads to start");
 
 static char *ldlm_cpts;
-CFS_MODULE_PARM(ldlm_cpts, "s", charp, 0444,
-		"CPU partitions ldlm threads should run on");
+module_param(ldlm_cpts, charp, 0444);
+MODULE_PARM_DESC(ldlm_cpts, "CPU partitions ldlm threads should run on");
 
 static struct mutex ldlm_ref_mutex;
 static int ldlm_refcount;
 
-struct ldlm_cb_async_args {
-	struct ldlm_cb_set_arg *ca_set_arg;
-	struct ldlm_lock       *ca_lock;
-};
+struct kobject *ldlm_kobj;
+struct kset *ldlm_ns_kset;
+struct kset *ldlm_svc_kset;
 
 /* LDLM state */
 
 static struct ldlm_state *ldlm_state;
 
-inline cfs_time_t round_timeout(cfs_time_t timeout)
+static inline cfs_time_t round_timeout(cfs_time_t timeout)
 {
 	return cfs_time_seconds((int)cfs_duration_sec(cfs_time_sub(timeout, 0)) + 1);
 }
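
The hunk above replaces Lustre's CFS_MODULE_PARM() wrapper with the kernel's
native module_param()/MODULE_PARM_DESC() pair. A minimal standalone sketch of
that interface; the module and parameter names here are illustrative, not part
of this patch:

#include <linux/module.h>
#include <linux/moduleparam.h>

/* 0444: world-readable under /sys/module/<name>/parameters/, settable
 * only at load time, e.g. "insmod demo.ko demo_threads=8". */
static int demo_threads;
module_param(demo_threads, int, 0444);
MODULE_PARM_DESC(demo_threads, "number of worker threads to start");

static char *demo_cpts;
module_param(demo_cpts, charp, 0444);
MODULE_PARM_DESC(demo_cpts, "CPU partitions the threads should run on");

static int __init demo_init(void)
{
	pr_info("demo: threads=%d cpts=%s\n",
		demo_threads, demo_cpts ? demo_cpts : "(unset)");
	return 0;
}
module_init(demo_init);

static void __exit demo_exit(void)
{
}
module_exit(demo_exit);

MODULE_LICENSE("GPL");
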
@@ -184,15 +180,10 @@ static int expired_lock_main(void *arg)
 
 		spin_lock_bh(&waiting_locks_spinlock);
 		if (expired_lock_thread.elt_dump) {
-			struct libcfs_debug_msg_data msgdata = {
-				.msg_file = __FILE__,
-				.msg_fn = "waiting_locks_callback",
-				.msg_line = expired_lock_thread.elt_dump };
 			spin_unlock_bh(&waiting_locks_spinlock);
 
 			/* from waiting_locks_callback, but not in timer */
 			libcfs_debug_dumplog();
-			libcfs_run_lbug_upcall(&msgdata);
 
 			spin_lock_bh(&waiting_locks_spinlock);
 			expired_lock_thread.elt_dump = 0;
@@ -206,7 +197,7 @@ static int expired_lock_main(void *arg)
 			lock = list_entry(expired->next, struct ldlm_lock,
 					  l_pending_chain);
 
-			if ((void *)lock < LP_POISON + PAGE_CACHE_SIZE &&
+			if ((void *)lock < LP_POISON + PAGE_SIZE &&
 			    (void *)lock >= LP_POISON) {
 				spin_unlock_bh(&waiting_locks_spinlock);
 				CERROR("free lock on elt list %p\n", lock);
@@ -214,7 +205,7 @@ static int expired_lock_main(void *arg)
 			}
 			list_del_init(&lock->l_pending_chain);
 			if ((void *)lock->l_export <
-			     LP_POISON + PAGE_CACHE_SIZE &&
+			     LP_POISON + PAGE_SIZE &&
 			    (void *)lock->l_export >= LP_POISON) {
 				CERROR("lock with free export on elt list %p\n",
 				       lock->l_export);
@@ -335,9 +326,9 @@ static void waiting_locks_callback(unsigned long unused)
 			continue;
 		}
 		ldlm_lock_to_ns(lock)->ns_timeouts++;
-		LDLM_ERROR(lock, "lock callback timer expired after %lds: "
+		LDLM_ERROR(lock, "lock callback timer expired after %llds: "
 			   "evicting client at %s ",
-			   cfs_time_current_sec() - lock->l_last_activity,
+			   ktime_get_real_seconds() - lock->l_last_activity,
 			   libcfs_nid2str(
 				   lock->l_export->exp_connection->c_peer.nid));
@@ -366,7 +357,7 @@ static void waiting_locks_callback(unsigned long unused)
 		lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
 				  l_pending_chain);
 		timeout_rounded = (cfs_time_t)round_timeout(lock->l_callback_timeout);
-		cfs_timer_arm(&waiting_locks_timer, timeout_rounded);
+		mod_timer(&waiting_locks_timer, timeout_rounded);
 	}
 	spin_unlock_bh(&waiting_locks_spinlock);
 }
@@ -401,10 +392,9 @@ static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, int seconds)
 
 	timeout_rounded = round_timeout(lock->l_callback_timeout);
 
-	if (cfs_time_before(timeout_rounded,
-			    cfs_timer_deadline(&waiting_locks_timer)) ||
-	    !cfs_timer_is_armed(&waiting_locks_timer)) {
-		cfs_timer_arm(&waiting_locks_timer, timeout_rounded);
+	if (cfs_time_before(timeout_rounded, waiting_locks_timer.expires) ||
+	    !timer_pending(&waiting_locks_timer)) {
+		mod_timer(&waiting_locks_timer, timeout_rounded);
 	}
 	/* if the new lock has a shorter timeout than something earlier on
 	   the list, we'll wait the longer amount of time; no big deal. */
@@ -469,7 +459,7 @@ static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
 	}
 
 	ldlm_set_waited(lock);
-	lock->l_last_activity = cfs_time_current_sec();
+	lock->l_last_activity = ktime_get_real_seconds();
 	ret = __ldlm_add_waiting_lock(lock, timeout);
 	if (ret) {
 		/* grab ref on the lock if it has been added to the
@@ -508,13 +498,13 @@ static int __ldlm_del_waiting_lock(struct ldlm_lock *lock)
 		/* Removing the head of the list, adjust timer. */
 		if (list_next == &waiting_locks_list) {
 			/* No more, just cancel. */
-			cfs_timer_disarm(&waiting_locks_timer);
+			del_timer(&waiting_locks_timer);
 		} else {
 			struct ldlm_lock *next;
 			next = list_entry(list_next, struct ldlm_lock,
 					  l_pending_chain);
-			cfs_timer_arm(&waiting_locks_timer,
-				      round_timeout(next->l_callback_timeout));
+			mod_timer(&waiting_locks_timer,
+				  round_timeout(next->l_callback_timeout));
 		}
 	}
 	list_del_init(&lock->l_pending_chain);
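
The timer hunks above drop the cfs_timer_* wrappers in favor of the stock
timer_list API, in the pre-4.15 form this patch targets (the callback takes an
unsigned long). A sketch of the same arm/re-arm/cancel flow; names and the
five-second deadline are illustrative:

#include <linux/timer.h>
#include <linux/jiffies.h>

static void demo_timeout_cb(unsigned long data)
{
	/* Runs in softirq context, like waiting_locks_callback(). */
}

static struct timer_list demo_timer;

static void demo_arm(unsigned long deadline)
{
	/* Pull the timer in only if the new deadline is sooner, or if it
	 * is not armed at all; mod_timer() on an inactive timer behaves
	 * like add_timer(), mirroring __ldlm_add_waiting_lock() above. */
	if (time_before(deadline, demo_timer.expires) ||
	    !timer_pending(&demo_timer))
		mod_timer(&demo_timer, deadline);
}

static void demo_example(void)
{
	setup_timer(&demo_timer, demo_timeout_cb, 0);
	demo_arm(jiffies + 5 * HZ);

	/* del_timer() deactivates without waiting for a running
	 * callback to finish; del_timer_sync() would wait. */
	del_timer(&demo_timer);
}
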
@@ -659,31 +649,50 @@ static void ldlm_failed_ast(struct ldlm_lock *lock, int rc,
  * Perform lock cleanup if AST reply came with error.
  */
 static int ldlm_handle_ast_error(struct ldlm_lock *lock,
-				 struct ptlrpc_request *req, int rc,
-				 const char *ast_type)
+				 struct ptlrpc_request *req, int rc,
+				 const char *ast_type)
 {
-	lnet_process_id_t peer = req->rq_import->imp_connection->c_peer;
+	struct lnet_process_id peer = req->rq_import->imp_connection->c_peer;
 
 	if (!req->rq_replied || (rc && rc != -EINVAL)) {
 		if (lock->l_export && lock->l_export->exp_libclient) {
-			LDLM_DEBUG(lock, "%s AST to liblustre client (nid %s)"
-				   " timeout, just cancelling lock", ast_type,
+			LDLM_DEBUG(lock,
+				   "%s AST (req@%p x%llu) to liblustre client (nid %s) timeout, just cancelling lock",
+				   ast_type, req, req->rq_xid,
 				   libcfs_nid2str(peer.nid));
 			ldlm_lock_cancel(lock);
 			rc = -ERESTART;
 		} else if (ldlm_is_cancel(lock)) {
-			LDLM_DEBUG(lock, "%s AST timeout from nid %s, but "
-				   "cancel was received (AST reply lost?)",
-				   ast_type, libcfs_nid2str(peer.nid));
+			LDLM_DEBUG(lock,
+				   "%s AST (req@%p x%llu) timeout from nid %s, but cancel was received (AST reply lost?)",
+				   ast_type, req, req->rq_xid,
+				   libcfs_nid2str(peer.nid));
 			ldlm_lock_cancel(lock);
 			rc = -ERESTART;
+		} else if (rc == -ENODEV || rc == -ESHUTDOWN ||
+			   (rc == -EIO &&
+			    req->rq_import->imp_state == LUSTRE_IMP_CLOSED)) {
+			/* Upon umount process the AST fails because cannot be
+			 * sent. This shouldn't lead to the client eviction.
+			 * -ENODEV error is returned by ptl_send_rpc() for
+			 *  new request in such import.
+			 * -SHUTDOWN is returned by ptlrpc_import_delay_req()
+			 *  if imp_invalid is set or obd_no_recov.
+			 * Meanwhile there is also check for LUSTRE_IMP_CLOSED
+			 * in ptlrpc_import_delay_req() as well with -EIO code.
+			 * In all such cases errors are ignored.
+			 */
+			LDLM_DEBUG(lock, "%s AST can't be sent due to a server"
+				   " %s failure or umount process: rc = %d\n",
+				   ast_type,
+				   req->rq_import->imp_obd->obd_name, rc);
 		} else {
-			LDLM_ERROR(lock, "client (nid %s) %s %s AST "
-				   "(req status %d rc %d), evict it",
+			LDLM_ERROR(lock,
+				   "client (nid %s) %s %s AST (req@%p x%llu status %d rc %d), evict it",
 				   libcfs_nid2str(peer.nid),
 				   req->rq_replied ? "returned error from" :
 				   "failed to reply to",
-				   ast_type,
+				   ast_type, req, req->rq_xid,
 				   (req->rq_repmsg != NULL) ?
 				   lustre_msg_get_status(req->rq_repmsg) : 0,
 				   rc);
@@ -695,12 +704,12 @@ static int ldlm_handle_ast_error(struct ldlm_lock *lock,
 	if (rc == -EINVAL) {
 		struct ldlm_resource *res = lock->l_resource;
 
-		LDLM_DEBUG(lock, "client (nid %s) returned %d"
-			   " from %s AST - normal race",
+		LDLM_DEBUG(lock,
+			   "client (nid %s) returned %d from %s AST (req@%p x%llu) - normal race",
 			   libcfs_nid2str(peer.nid),
 			   req->rq_repmsg ?
 			   lustre_msg_get_status(req->rq_repmsg) : -1,
-			   ast_type);
+			   ast_type, req, req->rq_xid);
 		if (res) {
 			/* update lvbo to return proper attributes.
 			 * see bug 23174 */
@@ -735,7 +744,9 @@ static int ldlm_cb_interpret(const struct lu_env *env,
 		 * - Glimpse callback of remote lock might return
 		 *   -ELDLM_NO_LOCK_DATA when inode is cleared. LU-274
 		 */
-		if (rc == -ELDLM_NO_LOCK_DATA) {
+		if (unlikely(arg->gl_interpret_reply)) {
+			rc = arg->gl_interpret_reply(env, req, data, rc);
+		} else if (rc == -ELDLM_NO_LOCK_DATA) {
 			LDLM_DEBUG(lock, "lost race - client has a lock but no "
 				   "inode");
 			ldlm_res_lvbo_update(lock->l_resource, NULL, 1);
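
ldlm_cb_interpret now gives glimpse users a hook: when gl_interpret_reply is
set in the shared argument block, reply handling is delegated to it instead of
the default -ELDLM_NO_LOCK_DATA handling. Reduced to plain C, the dispatch
shape looks like this (demo types, not the actual ptlrpc structures):

/* Generic "interpret reply" dispatch: an optional override wins;
 * otherwise a known benign error is absorbed by the default path. */
enum { DEMO_ERR_NO_DATA = 1 };	/* stand-in for ELDLM_NO_LOCK_DATA */

struct demo_cb_args {
	int (*interpret_reply)(void *data, int rc);	/* optional hook */
	void *data;
};

static int demo_interpret(struct demo_cb_args *arg, int rc)
{
	if (arg->interpret_reply != NULL)
		return arg->interpret_reply(arg->data, rc);

	if (rc == -DEMO_ERR_NO_DATA)
		return 0;	/* lost race: tolerated by the default path */

	return rc;
}
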
@@ -852,6 +863,11 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
 		/* Don't need to do anything here.
 		 */
 		RETURN(0);
 
+	if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_SRV_BL_AST)) {
+		LDLM_DEBUG(lock, "dropping BL AST");
+		RETURN(0);
+	}
+
 	LASSERT(lock);
 	LASSERT(data != NULL);
 	if (lock->l_export->exp_obd->obd_recovering != 0)
@@ -903,6 +919,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
 	LDLM_DEBUG(lock, "server preparing blocking AST");
 
 	ptlrpc_request_set_replen(req);
+	ldlm_set_cbpending(lock);
 	if (instant_cancel) {
 		unlock_res_and_lock(lock);
 		ldlm_lock_cancel(lock);
@@ -923,7 +940,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
 	if (AT_OFF)
 		req->rq_timeout = ldlm_get_rq_timeout();
 
-	lock->l_last_activity = cfs_time_current_sec();
+	lock->l_last_activity = ktime_get_real_seconds();
 
 	if (lock->l_export && lock->l_export->exp_nid_stats &&
 	    lock->l_export->exp_nid_stats->nid_ldlm_stats)
@@ -1013,7 +1030,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
 		}
 	}
 
-	lock->l_last_activity = cfs_time_current_sec();
+	lock->l_last_activity = ktime_get_real_seconds();
 
 	LDLM_DEBUG(lock, "server preparing completion AST");
@@ -1123,7 +1140,7 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
 	if (AT_OFF)
 		req->rq_timeout = ldlm_get_rq_timeout();
 
-	lock->l_last_activity = cfs_time_current_sec();
+	lock->l_last_activity = ktime_get_real_seconds();
 
 	req->rq_interpret_reply = ldlm_cb_interpret;
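
The l_last_activity conversions in these hunks move from cfs_time_current_sec()
to ktime_get_real_seconds(), which returns a 64-bit time64_t even on 32-bit
kernels (y2038-safe). The pattern, with illustrative names:

#include <linux/ktime.h>	/* ktime_get_real_seconds(), time64_t */
#include <linux/printk.h>

static time64_t demo_last_activity;

static void demo_touch(void)
{
	/* wall-clock seconds, 64-bit on every architecture */
	demo_last_activity = ktime_get_real_seconds();
}

static void demo_report_idle(void)
{
	time64_t idle = ktime_get_real_seconds() - demo_last_activity;

	/* time64_t needs the "ll" length modifier, hence the (s64)
	 * cast and "%lld", matching the format-string changes above. */
	pr_info("idle for %llds\n", (s64)idle);
}
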
@@ -1265,23 +1282,6 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
 		GOTO(out, rc = -EPROTO);
 	}
 
-#if 0
-	/* FIXME this makes it impossible to use LDLM_PLAIN locks -- check
-	   against server's _CONNECT_SUPPORTED flags? (I don't want to use
-	   ibits for mgc/mgs) */
-
-	/* INODEBITS_INTEROP: Perform conversion from plain lock to
-	 * inodebits lock if client does not support them. */
-	if (!(exp_connect_flags(req->rq_export) & OBD_CONNECT_IBITS) &&
-	    (dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN)) {
-		dlm_req->lock_desc.l_resource.lr_type = LDLM_IBITS;
-		dlm_req->lock_desc.l_policy_data.l_inodebits.bits =
-			MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
-		if (dlm_req->lock_desc.l_req_mode == LCK_PR)
-			dlm_req->lock_desc.l_req_mode = LCK_CR;
-	}
-#endif
-
 	if (unlikely((flags & LDLM_FL_REPLAY) ||
 		     (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))) {
 		/* Find an existing lock in the per-export lock hash */
@@ -1291,8 +1291,8 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
 		lock = cfs_hash_lookup(req->rq_export->exp_lock_hash,
 				       (void *)&dlm_req->lock_handle[0]);
 		if (lock != NULL) {
-			DEBUG_REQ(D_DLMTRACE, req, "found existing lock cookie "
-				  LPX64, lock->l_handle.h_cookie);
+			DEBUG_REQ(D_DLMTRACE, req, "found existing lock cookie %#llx",
+				  lock->l_handle.h_cookie);
 			flags |= LDLM_FL_RESENT;
 			GOTO(existing_lock, rc = 0);
 		}
@@ -1355,6 +1355,14 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
 	 * without them.
 	 */
 	lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
 					      LDLM_FL_INHERIT_MASK);
+
+	ldlm_convert_policy_to_local(req->rq_export,
+				     dlm_req->lock_desc.l_resource.lr_type,
+				     &dlm_req->lock_desc.l_policy_data,
+				     &lock->l_policy_data);
+	if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
+		lock->l_req_extent = lock->l_policy_data.l_extent;
+
 existing_lock:
 
 	if (flags & LDLM_FL_HAS_INTENT) {
@@ -1376,14 +1384,6 @@ existing_lock:
 		GOTO(out, rc);
 	}
 
-	if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN)
-		ldlm_convert_policy_to_local(req->rq_export,
-					  dlm_req->lock_desc.l_resource.lr_type,
-					  &dlm_req->lock_desc.l_policy_data,
-					  &lock->l_policy_data);
-	if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
-		lock->l_req_extent = lock->l_policy_data.l_extent;
-
 	err = ldlm_lock_enqueue(ns, &lock, cookie, &flags);
 	if (err) {
 		if ((int)err < 0)
@@ -1441,20 +1441,20 @@ existing_lock:
 		    if (unlikely(!ldlm_is_cancel_on_block(lock) ||
 				 !(dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK))){
 			CERROR("Granting sync lock to libclient. "
-			       "req fl %d, rep fl %d, lock fl "LPX64"\n",
+			       "req fl %d, rep fl %d, lock fl %#llx\n",
 			       dlm_req->lock_flags, dlm_rep->lock_flags,
 			       lock->l_flags);
 			LDLM_ERROR(lock, "sync lock");
-			if (dlm_req->lock_flags & LDLM_FL_HAS_INTENT) {
-				struct ldlm_intent *it;
-
-				it = req_capsule_client_get(&req->rq_pill,
-							    &RMF_LDLM_INTENT);
-				if (it != NULL) {
-					CERROR("This is intent %s ("LPU64")\n",
-					       ldlm_it2str(it->opc), it->opc);
-				}
-			}
+			if (dlm_req->lock_flags & LDLM_FL_HAS_INTENT) {
+				struct ldlm_intent *it;
+
+				it = req_capsule_client_get(&req->rq_pill,
+							    &RMF_LDLM_INTENT);
+				if (it != NULL) {
+					CERROR("This is intent %s (%llu)\n",
+					       ldlm_it2str(it->opc), it->opc);
+				}
+			}
 		}
 	}
@@ -1515,7 +1515,7 @@ existing_lock:
 		}
 	}
 
-	if (rc != 0) {
+	if (rc != 0 && !(flags & LDLM_FL_RESENT)) {
 		if (lock->l_export) {
 			ldlm_lock_cancel(lock);
 		} else {
@@ -1671,7 +1671,7 @@ int ldlm_request_cancel(struct ptlrpc_request *req,
 		lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
 		if (!lock) {
 			LDLM_DEBUG_NOLOCK("server-side cancel handler stale "
-					  "lock (cookie "LPU64")",
+					  "lock (cookie %llu)",
 					  dlm_req->lock_handle[i].cookie);
 			continue;
 		}
@@ -1697,10 +1697,10 @@ int ldlm_request_cancel(struct ptlrpc_request *req,
 		}
 
 		if ((flags & LATF_STATS) && ldlm_is_ast_sent(lock)) {
-			long delay = cfs_time_sub(cfs_time_current_sec(),
-						  lock->l_last_activity);
-			LDLM_DEBUG(lock, "server cancels blocked lock after "
-				   CFS_DURATION_T"s", delay);
+			time64_t delay = ktime_get_real_seconds() -
+					 lock->l_last_activity;
+			LDLM_DEBUG(lock, "server cancels blocked lock after %llds",
+				   (s64)delay);
 			at_measured(&lock->l_export->exp_bl_lock_at, delay);
 		}
 		ldlm_lock_cancel(lock);
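
Several hunks in this range retire the old LPX64/LPU64 format macros in favor
of plain %#llx and %llu. That works because in the kernel u64/__u64 is always
unsigned long long, so no per-architecture macro is needed. A small sketch
with illustrative values:

#include <linux/types.h>
#include <linux/printk.h>

static void demo_print(u64 cookie, u64 opc)
{
	/* "%#llx" prints 0x-prefixed hex, "%llu" plain decimal; u64 is
	 * unsigned long long in the kernel, so no casts are required. */
	pr_debug("found existing lock cookie %#llx\n", cookie);
	pr_debug("this is intent opc %llu\n", opc);
}
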
@@ -2135,11 +2135,11 @@ static int ldlm_handle_setinfo(struct ptlrpc_request *req)
 }
 
 static inline void ldlm_callback_errmsg(struct ptlrpc_request *req,
-					const char *msg, int rc,
-					struct lustre_handle *handle)
+					const char *msg, int rc,
+					const struct lustre_handle *handle)
 {
 	DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req,
-		  "%s: [nid %s] [rc %d] [lock "LPX64"]",
+		  "%s: [nid %s] [rc %d] [lock %#llx]",
 		  msg, libcfs_id2str(req->rq_peer), rc,
 		  handle ? handle->cookie : 0);
 	if (req->rq_no_reply)
@@ -2258,7 +2258,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
 
 	lock = ldlm_handle2lock_long(&dlm_req->lock_handle[0], 0);
 	if (!lock) {
-		CDEBUG(D_DLMTRACE, "callback on lock "LPX64" - lock "
+		CDEBUG(D_DLMTRACE, "callback on lock %#llx - lock "
 		       "disappeared\n", dlm_req->lock_handle[0].cookie);
 		rc = ldlm_callback_reply(req, -EINVAL);
 		ldlm_callback_errmsg(req, "Operate with invalid parameter", rc,
@@ -2274,28 +2274,27 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
 	lock_res_and_lock(lock);
 	lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
 					      LDLM_FL_AST_MASK);
-	if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
-		/* If somebody cancels lock and cache is already dropped,
-		 * or lock is failed before cp_ast received on client,
-		 * we can tell the server we have no lock. Otherwise, we
-		 * should send cancel after dropping the cache. */
+	if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
+		/* If somebody cancels lock and cache is already dropped,
+		 * or lock is failed before cp_ast received on client,
+		 * we can tell the server we have no lock. Otherwise, we
+		 * should send cancel after dropping the cache. */
 		if ((ldlm_is_canceling(lock) && ldlm_is_bl_done(lock)) ||
-		    ldlm_is_failed(lock)) {
-			LDLM_DEBUG(lock, "callback on lock "
-				   LPX64" - lock disappeared\n",
-				   dlm_req->lock_handle[0].cookie);
-			unlock_res_and_lock(lock);
-			LDLM_LOCK_RELEASE(lock);
-			rc = ldlm_callback_reply(req, -EINVAL);
-			ldlm_callback_errmsg(req, "Operate on stale lock", rc,
-					     &dlm_req->lock_handle[0]);
-			RETURN(0);
-		}
+		    ldlm_is_failed(lock)) {
+			LDLM_DEBUG(lock, "callback on lock %llx - lock disappeared",
+				   dlm_req->lock_handle[0].cookie);
+			unlock_res_and_lock(lock);
+			LDLM_LOCK_RELEASE(lock);
+			rc = ldlm_callback_reply(req, -EINVAL);
+			ldlm_callback_errmsg(req, "Operate on stale lock", rc,
+					     &dlm_req->lock_handle[0]);
+			RETURN(0);
+		}
 		/* BL_AST locks are not needed in LRU.
 		 * Let ldlm_cancel_lru() be fast.
 		 */
-		ldlm_lock_remove_from_lru(lock);
+		ldlm_lock_remove_from_lru(lock);
 		ldlm_set_bl_ast(lock);
-	}
+	}
 	unlock_res_and_lock(lock);
 
 	/* We want the ost thread to get this reply so that it can respond
@@ -2360,7 +2359,7 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req)
 		struct ldlm_request *dlm_req;
 
 		CERROR("%s from %s arrived at %lu with bad export cookie "
-		       LPU64"\n",
+		       "%llu\n",
 		       ll_opcode2str(lustre_msg_get_opc(req->rq_reqmsg)),
 		       libcfs_nid2str(req->rq_peer.nid),
 		       req->rq_arrival_time.tv_sec,
@@ -2420,7 +2419,7 @@ static int ldlm_cancel_hpreq_lock_match(struct ptlrpc_request *req,
 
 		if (lustre_handle_equal(&dlm_req->lock_handle[i], &lockh)) {
 			DEBUG_REQ(D_RPCTRACE, req,
-				  "Prio raised by lock "LPX64".", lockh.cookie);
+				  "Prio raised by lock %#llx.", lockh.cookie);
 			rc = 1;
 
 			break;
@@ -2537,15 +2536,15 @@ static int ldlm_revoke_lock_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
 void ldlm_revoke_export_locks(struct obd_export *exp)
 {
 	struct list_head rpc_list;
-	ENTRY;
+	ENTRY;
 
 	INIT_LIST_HEAD(&rpc_list);
-	cfs_hash_for_each_empty(exp->exp_lock_hash,
-				ldlm_revoke_lock_cb, &rpc_list);
-	ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list,
-			  LDLM_WORK_REVOKE_AST);
+	cfs_hash_for_each_nolock(exp->exp_lock_hash,
+				 ldlm_revoke_lock_cb, &rpc_list, 0);
+	ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list,
+			  LDLM_WORK_REVOKE_AST);
 
-	EXIT;
+	EXIT;
 }
 EXPORT_SYMBOL(ldlm_revoke_export_locks);
 #endif /* HAVE_SERVER_SUPPORT */
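
ldlm_revoke_export_locks() above is a collect-then-process loop: matching
locks are gathered onto a private rpc_list while walking the export's lock
hash, and the potentially blocking AST work runs afterwards. The same shape
with plain kernel lists (demo types, not the cfs_hash API):

#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/slab.h>

struct demo_lock {
	struct list_head dl_chain;
	bool		 dl_revoke;
};

static LIST_HEAD(demo_granted);
static DEFINE_SPINLOCK(demo_guard);

static void demo_revoke_all(void)
{
	struct demo_lock *dl, *next;
	LIST_HEAD(batch);

	/* Phase 1: under the lock, only move matches to a private list. */
	spin_lock(&demo_guard);
	list_for_each_entry_safe(dl, next, &demo_granted, dl_chain)
		if (dl->dl_revoke)
			list_move_tail(&dl->dl_chain, &batch);
	spin_unlock(&demo_guard);

	/* Phase 2: heavier work with the lock dropped. */
	list_for_each_entry_safe(dl, next, &batch, dl_chain) {
		list_del_init(&dl->dl_chain);
		kfree(dl);	/* stand-in for issuing the AST */
	}
}
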
@@ -2600,7 +2599,6 @@ static int ldlm_bl_get_work(struct ldlm_bl_pool *blp,
 
 /* This only contains temporary data until the thread starts */
 struct ldlm_bl_thread_data {
-	char			bltd_name[CFS_CURPROC_COMM_MAX];
 	struct ldlm_bl_pool	*bltd_blp;
 	struct completion	bltd_comp;
 	int			bltd_num;
@@ -2608,19 +2606,32 @@ struct ldlm_bl_thread_data {
 
 static int ldlm_bl_thread_main(void *arg);
 
-static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
+static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp, bool check_busy)
 {
 	struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
 	struct task_struct *task;
 
 	init_completion(&bltd.bltd_comp);
-	bltd.bltd_num = atomic_read(&blp->blp_num_threads);
-	snprintf(bltd.bltd_name, sizeof(bltd.bltd_name) - 1,
-		"ldlm_bl_%02d", bltd.bltd_num);
-	task = kthread_run(ldlm_bl_thread_main, &bltd, bltd.bltd_name);
+
+	bltd.bltd_num = atomic_inc_return(&blp->blp_num_threads);
+	if (bltd.bltd_num >= blp->blp_max_threads) {
+		atomic_dec(&blp->blp_num_threads);
+		return 0;
+	}
+
+	LASSERTF(bltd.bltd_num > 0, "thread num:%d\n", bltd.bltd_num);
+	if (check_busy &&
+	    atomic_read(&blp->blp_busy_threads) < (bltd.bltd_num - 1)) {
+		atomic_dec(&blp->blp_num_threads);
+		return 0;
+	}
+
+	task = kthread_run(ldlm_bl_thread_main, &bltd, "ldlm_bl_%02d",
+			   bltd.bltd_num);
 	if (IS_ERR(task)) {
 		CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %ld\n",
-		       atomic_read(&blp->blp_num_threads), PTR_ERR(task));
+		       bltd.bltd_num, PTR_ERR(task));
+		atomic_dec(&blp->blp_num_threads);
 		return PTR_ERR(task);
 	}
 	wait_for_completion(&bltd.bltd_comp);
@@ -2632,12 +2643,11 @@ static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
 static int ldlm_bl_thread_need_create(struct ldlm_bl_pool *blp,
 				      struct ldlm_bl_work_item *blwi)
 {
-	int busy = atomic_read(&blp->blp_busy_threads);
-
-	if (busy >= blp->blp_max_threads)
+	if (atomic_read(&blp->blp_num_threads) >= blp->blp_max_threads)
 		return 0;
 
-	if (busy < atomic_read(&blp->blp_num_threads))
+	if (atomic_read(&blp->blp_busy_threads) <
+	    atomic_read(&blp->blp_num_threads))
 		return 0;
 
 	if (blwi != NULL && (blwi->blwi_ns == NULL ||
@@ -2726,9 +2736,6 @@ static int ldlm_bl_thread_main(void *arg)
 
 	blp = bltd->bltd_blp;
 
-	atomic_inc(&blp->blp_num_threads);
-	atomic_inc(&blp->blp_busy_threads);
-
 	complete(&bltd->bltd_comp);
 	/* cannot use bltd after this, it is only on caller's stack */
@@ -2740,29 +2747,33 @@ static int ldlm_bl_thread_main(void *arg)
 
 		rc = ldlm_bl_get_work(blp, &blwi, &exp);
 
-		if (rc == 0) {
-			atomic_dec(&blp->blp_busy_threads);
+		if (rc == 0)
 			l_wait_event_exclusive(blp->blp_waitq,
 					       ldlm_bl_get_work(blp, &blwi,
 								&exp),
 					       &lwi);
-			atomic_inc(&blp->blp_busy_threads);
-		}
+		atomic_inc(&blp->blp_busy_threads);
 
 		if (ldlm_bl_thread_need_create(blp, blwi))
 			/* discard the return value, we tried */
-			ldlm_bl_thread_start(blp);
+			ldlm_bl_thread_start(blp, true);
 
 		if (exp)
 			rc = ldlm_bl_thread_exports(blp, exp);
 		else if (blwi)
 			rc = ldlm_bl_thread_blwi(blp, blwi);
 
+		atomic_dec(&blp->blp_busy_threads);
+
 		if (rc == LDLM_ITER_STOP)
 			break;
+
+		/* If there are many namespaces, we will not sleep waiting for
+		 * work, and must do a cond_resched to avoid holding the CPU
+		 * for too long */
+		cond_resched();
 	}
 
-	atomic_dec(&blp->blp_busy_threads);
 	atomic_dec(&blp->blp_num_threads);
 	complete(&blp->blp_comp);
 	RETURN(0);
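
ldlm_bl_thread_start() now reserves its pool slot with atomic_inc_return()
before spawning, rolls the counter back on failure or when the pool is already
big or idle enough, and lets kthread_run() format the thread name directly
instead of pre-formatting it into bltd_name. A reduced sketch of that pattern
(demo pool with a fixed limit; names are illustrative):

#include <linux/kthread.h>
#include <linux/atomic.h>
#include <linux/sched.h>
#include <linux/err.h>

#define DEMO_MAX_THREADS 32

static atomic_t demo_nthreads = ATOMIC_INIT(0);

static int demo_worker(void *arg)
{
	while (!kthread_should_stop()) {
		/* ... pull and handle one unit of work ... */

		/* As in ldlm_bl_thread_main(): if the loop can stay busy
		 * without sleeping, yield the CPU once per iteration. */
		cond_resched();
	}
	atomic_dec(&demo_nthreads);
	return 0;
}

static int demo_worker_start(void)
{
	int num = atomic_inc_return(&demo_nthreads);
	struct task_struct *task;

	if (num >= DEMO_MAX_THREADS) {	/* no slot available: back off */
		atomic_dec(&demo_nthreads);
		return 0;
	}

	/* kthread_run() accepts a printf-style name format itself. */
	task = kthread_run(demo_worker, NULL, "demo_wk_%02d", num);
	if (IS_ERR(task)) {
		atomic_dec(&demo_nthreads);
		return PTR_ERR(task);
	}
	return 0;
}
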
@@ -2913,6 +2924,40 @@ void ldlm_destroy_export(struct obd_export *exp)
 }
 EXPORT_SYMBOL(ldlm_destroy_export);
 
+static ssize_t cancel_unused_locks_before_replay_show(struct kobject *kobj,
+						      struct attribute *attr,
+						      char *buf)
+{
+	return sprintf(buf, "%d\n", ldlm_cancel_unused_locks_before_replay);
+}
+
+static ssize_t cancel_unused_locks_before_replay_store(struct kobject *kobj,
+						       struct attribute *attr,
+						       const char *buffer,
+						       size_t count)
+{
+	int rc;
+	unsigned long val;
+
+	rc = kstrtoul(buffer, 10, &val);
+	if (rc)
+		return rc;
+
+	ldlm_cancel_unused_locks_before_replay = val;
+
+	return count;
+}
+LUSTRE_RW_ATTR(cancel_unused_locks_before_replay);
+
+static struct attribute *ldlm_attrs[] = {
+	&lustre_attr_cancel_unused_locks_before_replay.attr,
+	NULL,
+};
+
+static struct attribute_group ldlm_attr_group = {
+	.attrs = ldlm_attrs,
+};
+
 static int ldlm_setup(void)
 {
 	static struct ptlrpc_service_conf	conf;
@@ -2932,9 +2977,25 @@ static int ldlm_setup(void)
 	if (ldlm_state == NULL)
 		RETURN(-ENOMEM);
 
+	ldlm_kobj = kobject_create_and_add("ldlm", lustre_kobj);
+	if (!ldlm_kobj)
+		GOTO(out, -ENOMEM);
+
+	rc = sysfs_create_group(ldlm_kobj, &ldlm_attr_group);
+	if (rc)
+		GOTO(out, rc);
+
+	ldlm_ns_kset = kset_create_and_add("namespaces", NULL, ldlm_kobj);
+	if (!ldlm_ns_kset)
+		GOTO(out, -ENOMEM);
+
+	ldlm_svc_kset = kset_create_and_add("services", NULL, ldlm_kobj);
+	if (!ldlm_svc_kset)
+		GOTO(out, -ENOMEM);
+
 #ifdef CONFIG_PROC_FS
-	rc = ldlm_proc_setup();
-	if (rc != 0)
+	rc = ldlm_proc_setup();
+	if (rc != 0)
 		GOTO(out, rc);
 #endif /* CONFIG_PROC_FS */
@@ -3042,7 +3103,7 @@ static int ldlm_setup(void)
 	}
 
 	for (i = 0; i < blp->blp_min_threads; i++) {
-		rc = ldlm_bl_thread_start(blp);
+		rc = ldlm_bl_thread_start(blp, false);
 		if (rc < 0)
 			GOTO(out, rc);
 	}
@@ -3054,7 +3115,7 @@ static int ldlm_setup(void)
 
 	INIT_LIST_HEAD(&waiting_locks_list);
 	spin_lock_init(&waiting_locks_spinlock);
-	cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, NULL);
+	setup_timer(&waiting_locks_timer, waiting_locks_callback, 0);
 
 	task = kthread_run(expired_lock_main, NULL, "ldlm_elt");
 	if (IS_ERR(task)) {
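
The new cancel_unused_locks_before_replay tunable uses Lustre's
LUSTRE_RW_ATTR() helper, but the underlying pattern is the standard sysfs one:
a show() that formats the value into the page buffer and a store() that parses
user input with kstrtoul(). A standalone equivalent using the generic
kobj_attribute (demo names):

#include <linux/kobject.h>
#include <linux/sysfs.h>
#include <linux/kernel.h>	/* kstrtoul(), sprintf() */

static int demo_tunable = 1;

static ssize_t demo_tunable_show(struct kobject *kobj,
				 struct kobj_attribute *attr, char *buf)
{
	return sprintf(buf, "%d\n", demo_tunable);	/* buf is one page */
}

static ssize_t demo_tunable_store(struct kobject *kobj,
				  struct kobj_attribute *attr,
				  const char *buf, size_t count)
{
	unsigned long val;
	int rc = kstrtoul(buf, 10, &val);

	if (rc)
		return rc;		/* e.g. -EINVAL on non-numeric input */
	demo_tunable = val;
	return count;			/* consumed the whole write */
}

static struct kobj_attribute demo_tunable_attr =
	__ATTR(demo_tunable, 0644, demo_tunable_show, demo_tunable_store);

/* Registration would pair kobject_create_and_add() with
 * sysfs_create_file(kobj, &demo_tunable_attr.attr), mirroring the
 * ldlm_setup()/ldlm_cleanup() hunks around this note. */
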
@@ -3126,6 +3187,13 @@ static int ldlm_cleanup(void)
 		ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
 #endif
 
+	if (ldlm_ns_kset)
+		kset_unregister(ldlm_ns_kset);
+	if (ldlm_svc_kset)
+		kset_unregister(ldlm_svc_kset);
+	if (ldlm_kobj)
+		kobject_put(ldlm_kobj);
+
 	ldlm_proc_cleanup();
 
 #ifdef HAVE_SERVER_SUPPORT