X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fldlm%2Fldlm_request.c;h=ff5ae53aa8fcfe746e38cede78f75373626607d1;hb=6052cc88eb1232ac3b0193f0d47881887a2dcfdc;hp=143d0a9c376b07c8d9dd1cfbce72ca892e97f13a;hpb=7fff052c930da4822c3b2a13d130da7473a20a58;p=fs%2Flustre-release.git

diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c
index 143d0a9..ff5ae53 100644
--- a/lustre/ldlm/ldlm_request.c
+++ b/lustre/ldlm/ldlm_request.c
@@ -67,14 +67,11 @@ unsigned int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT;
 module_param(ldlm_enqueue_min, uint, 0644);
 MODULE_PARM_DESC(ldlm_enqueue_min, "lock enqueue timeout minimum");
+EXPORT_SYMBOL(ldlm_enqueue_min);
 
 /* in client side, whether the cached locks will be canceled before replay */
 unsigned int ldlm_cancel_unused_locks_before_replay = 1;
 
-static void interrupted_completion_wait(void *data)
-{
-}
-
 struct lock_wait_data {
 	struct ldlm_lock *lwd_lock;
 	__u32 lwd_conn_cnt;
 };
@@ -111,9 +108,8 @@ int ldlm_request_bufsize(int count, int type)
 	return sizeof(struct ldlm_request) + avail;
 }
 
-int ldlm_expired_completion_wait(void *data)
+void ldlm_expired_completion_wait(struct lock_wait_data *lwd)
 {
-	struct lock_wait_data *lwd = data;
 	struct ldlm_lock *lock = lwd->lwd_lock;
 	struct obd_import *imp;
 	struct obd_device *obd;
@@ -124,9 +120,8 @@ int ldlm_expired_completion_wait(void *data)
 		LDLM_ERROR(lock,
 			   "lock timed out (enqueued at %lld, %llds ago); not entering recovery in server code, just going back to sleep",
-			   (s64)lock->l_activity,
-			   (s64)(ktime_get_real_seconds() -
-				 lock->l_activity));
+			   lock->l_activity,
+			   ktime_get_real_seconds() - lock->l_activity);
 		if (ktime_get_seconds() > next_dump) {
 			last_dump = next_dump;
 			next_dump = ktime_get_seconds() + 300;
@@ -135,7 +130,7 @@ int ldlm_expired_completion_wait(void *data)
 			if (last_dump == 0)
 				libcfs_debug_dumplog();
 		}
-		RETURN(0);
+		RETURN_EXIT;
 	}
 
 	obd = lock->l_conn_export->exp_obd;
@@ -143,11 +138,11 @@ int ldlm_expired_completion_wait(void *data)
 	ptlrpc_fail_import(imp, lwd->lwd_conn_cnt);
 	LDLM_ERROR(lock,
 		   "lock timed out (enqueued at %lld, %llds ago), entering recovery for %s@%s",
-		   (s64)lock->l_activity,
-		   (s64)(ktime_get_real_seconds() - lock->l_activity),
+		   lock->l_activity,
+		   ktime_get_real_seconds() - lock->l_activity,
 		   obd2cli_tgt(obd), imp->imp_connection->c_remote_uuid.uuid);
 
-	RETURN(0);
+	EXIT;
 }
 
 int is_granted_or_cancelled_nolock(struct ldlm_lock *lock)
@@ -176,9 +171,9 @@ EXPORT_SYMBOL(is_granted_or_cancelled_nolock);
  * We use the same basis for both server side and client side functions
  * from a single node.
  */
-static time64_t ldlm_cp_timeout(struct ldlm_lock *lock)
+static timeout_t ldlm_cp_timeout(struct ldlm_lock *lock)
 {
-	time64_t timeout;
+	timeout_t timeout;
 
 	if (AT_OFF)
 		return obd_timeout;
@@ -189,7 +184,7 @@ static time64_t ldlm_cp_timeout(struct ldlm_lock *lock)
 	 * doesn't respond reasonably, and then give us the lock.
 	 */
 	timeout = at_get(ldlm_lock_to_ns_at(lock));
-	return max(3 * timeout, (time64_t) ldlm_enqueue_min);
+	return max(3 * timeout, (timeout_t)ldlm_enqueue_min);
 }
 
 /**
@@ -198,7 +193,6 @@ static time64_t ldlm_cp_timeout(struct ldlm_lock *lock)
  */
 static int ldlm_completion_tail(struct ldlm_lock *lock, void *data)
 {
-	time64_t delay;
 	int result = 0;
 
 	if (ldlm_is_destroyed(lock) || ldlm_is_failed(lock)) {
@@ -208,10 +202,16 @@ static int ldlm_completion_tail(struct ldlm_lock *lock, void *data)
 		LDLM_DEBUG(lock, "client-side enqueue: granted");
 	} else {
 		/* Take into AT only CP RPC, not immediately granted locks */
-		delay = ktime_get_real_seconds() - lock->l_activity;
-		LDLM_DEBUG(lock, "client-side enqueue: granted after %llds",
-			   (s64)delay);
+		timeout_t delay = 0;
+		/* Discard negative timeouts. We should also limit the
+		 * maximum value of the timeout
+		 */
+		if (ktime_get_real_seconds() > lock->l_activity)
+			delay = ktime_get_real_seconds() - lock->l_activity;
+
+		LDLM_DEBUG(lock, "client-side enqueue: granted after %ds",
+			   delay);
 
 		/* Update our time estimate */
 		at_measured(ldlm_lock_to_ns_at(lock), delay);
 	}
@@ -269,8 +269,7 @@ int ldlm_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
 	struct lock_wait_data lwd;
 	struct obd_device *obd;
 	struct obd_import *imp = NULL;
-	struct l_wait_info lwi;
-	time64_t timeout;
+	timeout_t timeout;
 	int rc = 0;
 
 	ENTRY;
@@ -300,15 +299,6 @@ noreproc:
 	lwd.lwd_lock = lock;
 	lock->l_activity = ktime_get_real_seconds();
 
-	if (ldlm_is_no_timeout(lock)) {
-		LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
-		lwi = LWI_INTR(interrupted_completion_wait, &lwd);
-	} else {
-		lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
-				       ldlm_expired_completion_wait,
-				       interrupted_completion_wait, &lwd);
-	}
-
 	if (imp != NULL) {
 		spin_lock(&imp->imp_lock);
 		lwd.lwd_conn_cnt = imp->imp_conn_cnt;
@@ -322,8 +312,22 @@ noreproc:
 		rc = -EINTR;
 	} else {
 		/* Go to sleep until the lock is granted or cancelled. */
-		rc = l_wait_event(lock->l_waitq,
-				  is_granted_or_cancelled(lock), &lwi);
+		if (ldlm_is_no_timeout(lock)) {
+			LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
+			rc = l_wait_event_abortable(
+				lock->l_waitq,
+				is_granted_or_cancelled(lock));
+		} else {
+			if (wait_event_idle_timeout(
+				    lock->l_waitq,
+				    is_granted_or_cancelled(lock),
+				    cfs_time_seconds(timeout)) == 0) {
+				ldlm_expired_completion_wait(&lwd);
+				rc = l_wait_event_abortable(
+					lock->l_waitq,
+					is_granted_or_cancelled(lock));
+			}
+		}
 	}
 
 	if (rc) {
@@ -849,8 +853,7 @@ int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
 	struct req_capsule *pill = &req->rq_pill;
 	struct ldlm_request *dlm = NULL;
 	LIST_HEAD(head);
-	enum ldlm_lru_flags lru_flags;
-	int avail, to_free, pack = 0;
+	int avail, to_free = 0, pack = 0;
 	int rc;
 
 	ENTRY;
@@ -862,10 +865,10 @@ int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
 		req_capsule_filled_sizes(pill, RCL_CLIENT);
 		avail = ldlm_capsule_handles_avail(pill, RCL_CLIENT, canceloff);
 
-		lru_flags = LDLM_LRU_FLAG_NO_WAIT | (ns_connect_lru_resize(ns) ?
-			LDLM_LRU_FLAG_LRUR : LDLM_LRU_FLAG_AGED);
-		to_free = !ns_connect_lru_resize(ns) &&
-			  opc == LDLM_ENQUEUE ? 1 : 0;
+		/* If we have reached the limit, free +1 slot for the new one */
+		if (!ns_connect_lru_resize(ns) && opc == LDLM_ENQUEUE &&
+		    ns->ns_nr_unused >= ns->ns_max_unused)
+			to_free = 1;
 
 		/*
 		 * Cancel LRU locks here _only_ if the server supports
@@ -875,7 +878,7 @@ int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
 		if (avail > count)
 			count += ldlm_cancel_lru_local(ns, cancels, to_free,
						       avail - count, 0,
-						       lru_flags);
+						       LDLM_LRU_FLAG_NO_WAIT);
 		if (avail > count)
 			pack = count;
 		else
@@ -1445,6 +1448,14 @@ int ldlm_cli_update_pool(struct ptlrpc_request *req)
 	new_slv = lustre_msg_get_slv(req->rq_repmsg);
 	obd = req->rq_import->imp_obd;
 
+	read_lock(&obd->obd_pool_lock);
+	if (obd->obd_pool_slv == new_slv &&
+	    obd->obd_pool_limit == new_limit) {
+		read_unlock(&obd->obd_pool_lock);
+		RETURN(0);
+	}
+	read_unlock(&obd->obd_pool_lock);
+
 	/*
 	 * Set new SLV and limit in OBD fields to make them accessible
 	 * to the pool thread. We do not access obd_namespace and pool
@@ -1490,7 +1501,6 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
 		    enum ldlm_cancel_flags cancel_flags)
 {
 	struct obd_export *exp;
-	enum ldlm_lru_flags lru_flags;
 	int avail, count = 1;
 	__u64 rc = 0;
 	struct ldlm_namespace *ns;
@@ -1548,10 +1558,8 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
 		LASSERT(avail > 0);
 
 		ns = ldlm_lock_to_ns(lock);
-		lru_flags = ns_connect_lru_resize(ns) ?
-			LDLM_LRU_FLAG_LRUR : LDLM_LRU_FLAG_AGED;
 		count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
-					       LCF_BL_AST, lru_flags);
+					       LCF_BL_AST, 0);
 	}
 	ldlm_cli_cancel_list(&cancels, count, NULL, cancel_flags);
 	RETURN(0);
@@ -1615,12 +1623,12 @@ int ldlm_cli_cancel_list_local(struct list_head *cancels, int count,
  */
 static enum ldlm_policy_res
 ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns, struct ldlm_lock *lock,
-			   int unused, int added, int count)
+			   int added, int min)
 {
 	enum ldlm_policy_res result = LDLM_POLICY_CANCEL_LOCK;
 
 	/*
-	 * don't check added & count since we want to process all locks
+	 * don't check @added & @min since we want to process all locks
 	 * from unused list.
 	 * It's fine to not take lock to access lock->l_resource since
 	 * the lock has already been granted so it won't change.
@@ -1641,8 +1649,8 @@ ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns, struct ldlm_lock *lock,
 
 /**
 * Callback function for LRU-resize policy. Decides whether to keep
- * \a lock in LRU for current \a LRU size \a unused, added in current
- * scan \a added and number of locks to be preferably canceled \a count.
+ * \a lock in LRU for \a added in current scan and \a min number of locks
+ * to be preferably canceled.
 *
 * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU in stop scanning
 *
@@ -1650,34 +1658,28 @@ ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns, struct ldlm_lock *lock,
 */
 static enum ldlm_policy_res ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
						    struct ldlm_lock *lock,
-						    int unused, int added,
-						    int count)
+						    int added, int min)
 {
 	ktime_t cur = ktime_get();
 	struct ldlm_pool *pl = &ns->ns_pool;
 	u64 slv, lvf, lv;
 	s64 la;
 
-	/*
-	 * Stop LRU processing when we reach past @count or have checked all
-	 * locks in LRU.
-	 */
-	if (count && added >= count)
-		return LDLM_POLICY_KEEP_LOCK;
+	if (added < min)
+		return LDLM_POLICY_CANCEL_LOCK;
 
 	/*
 	 * Despite of the LV, It doesn't make sense to keep the lock which
 	 * is unused for ns_max_age time.
 	 */
-	if (ktime_after(ktime_get(),
-			ktime_add(lock->l_last_used, ns->ns_max_age)))
+	if (ktime_after(cur, ktime_add(lock->l_last_used, ns->ns_max_age)))
 		return LDLM_POLICY_CANCEL_LOCK;
 
 	slv = ldlm_pool_get_slv(pl);
 	lvf = ldlm_pool_get_lvf(pl);
 	la = div_u64(ktime_to_ns(ktime_sub(cur, lock->l_last_used)),
		     NSEC_PER_SEC);
-	lv = lvf * la * unused;
+	lv = lvf * la * ns->ns_nr_unused;
 
 	/* Inform pool about current CLV to see it via debugfs. */
 	ldlm_pool_set_clv(pl, lv);
@@ -1695,44 +1697,21 @@ static enum ldlm_policy_res ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
 static enum ldlm_policy_res
 ldlm_cancel_lrur_no_wait_policy(struct ldlm_namespace *ns,
				struct ldlm_lock *lock,
-				int unused, int added,
-				int count)
+				int added, int min)
 {
 	enum ldlm_policy_res result;
 
-	result = ldlm_cancel_lrur_policy(ns, lock, unused, added, count);
+	result = ldlm_cancel_lrur_policy(ns, lock, added, min);
 	if (result == LDLM_POLICY_KEEP_LOCK)
 		return result;
 
-	return ldlm_cancel_no_wait_policy(ns, lock, unused, added, count);
+	return ldlm_cancel_no_wait_policy(ns, lock, added, min);
 }
 
 /**
- * Callback function for debugfs used policy. Makes decision whether to keep
- * \a lock in LRU for current \a LRU size \a unused, added in current scan \a
- * added and number of locks to be preferably canceled \a count.
- *
- * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU in stop scanning
- *
- * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
- */
-static enum ldlm_policy_res ldlm_cancel_passed_policy(struct ldlm_namespace *ns,
-						      struct ldlm_lock *lock,
-						      int unused, int added,
-						      int count)
-{
-	/*
-	 * Stop LRU processing when we reach past @count or have checked all
-	 * locks in LRU.
-	 */
-	return (added >= count) ?
-		LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
-}
-
-/**
- * Callback function for aged policy. Makes decision whether to keep \a lock in
- * LRU for current LRU size \a unused, added in current scan \a added and
- * number of locks to be preferably canceled \a count.
+ * Callback function for aged policy. Decides whether to keep
+ * \a lock in LRU for \a added in current scan and \a min number of locks
+ * to be preferably canceled.
 *
 * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU in stop scanning
 *
@@ -1740,10 +1719,9 @@ static enum ldlm_policy_res ldlm_cancel_passed_policy(struct ldlm_namespace *ns,
 */
 static enum ldlm_policy_res ldlm_cancel_aged_policy(struct ldlm_namespace *ns,
						    struct ldlm_lock *lock,
-						    int unused, int added,
-						    int count)
+						    int added, int min)
 {
-	if ((added >= count) &&
+	if ((added >= min) &&
 	    ktime_before(ktime_get(),
			 ktime_add(lock->l_last_used, ns->ns_max_age)))
 		return LDLM_POLICY_KEEP_LOCK;
@@ -1754,78 +1732,43 @@ static enum ldlm_policy_res ldlm_cancel_aged_policy(struct ldlm_namespace *ns,
 static enum ldlm_policy_res
 ldlm_cancel_aged_no_wait_policy(struct ldlm_namespace *ns,
				struct ldlm_lock *lock,
-				int unused, int added, int count)
+				int added, int min)
 {
 	enum ldlm_policy_res result;
 
-	result = ldlm_cancel_aged_policy(ns, lock, unused, added, count);
+	result = ldlm_cancel_aged_policy(ns, lock, added, min);
 	if (result == LDLM_POLICY_KEEP_LOCK)
 		return result;
 
-	return ldlm_cancel_no_wait_policy(ns, lock, unused, added, count);
-}
-
-/**
- * Callback function for default policy. Makes decision whether to keep \a lock
- * in LRU for current LRU size \a unused, added in current scan \a added and
- * number of locks to be preferably canceled \a count.
- *
- * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU in stop scanning
- *
- * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
- */
-static
-enum ldlm_policy_res ldlm_cancel_default_policy(struct ldlm_namespace *ns,
-						struct ldlm_lock *lock,
-						int unused, int added,
-						int count)
-{
-	/*
-	 * Stop LRU processing when we reach past count or have checked all
-	 * locks in LRU.
-	 */
-	return (added >= count) ?
-		LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
+	return ldlm_cancel_no_wait_policy(ns, lock, added, min);
 }
 
 typedef enum ldlm_policy_res
 (*ldlm_cancel_lru_policy_t)(struct ldlm_namespace *ns, struct ldlm_lock *lock,
-			    int unused, int added, int count);
+			    int added, int min);
 
 static ldlm_cancel_lru_policy_t
 ldlm_cancel_lru_policy(struct ldlm_namespace *ns, enum ldlm_lru_flags lru_flags)
 {
 	if (ns_connect_lru_resize(ns)) {
-		if (lru_flags & LDLM_LRU_FLAG_SHRINK)
-			/* We kill passed number of old locks. */
-			return ldlm_cancel_passed_policy;
-		if (lru_flags & LDLM_LRU_FLAG_LRUR) {
-			if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
-				return ldlm_cancel_lrur_no_wait_policy;
-			else
-				return ldlm_cancel_lrur_policy;
-		}
-		if (lru_flags & LDLM_LRU_FLAG_PASSED)
-			return ldlm_cancel_passed_policy;
+		if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
+			return ldlm_cancel_lrur_no_wait_policy;
+		else
+			return ldlm_cancel_lrur_policy;
 	} else {
-		if (lru_flags & LDLM_LRU_FLAG_AGED) {
-			if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
-				return ldlm_cancel_aged_no_wait_policy;
-			else
-				return ldlm_cancel_aged_policy;
-		}
+		if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
+			return ldlm_cancel_aged_no_wait_policy;
+		else
+			return ldlm_cancel_aged_policy;
 	}
-	if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
-		return ldlm_cancel_no_wait_policy;
-
-	return ldlm_cancel_default_policy;
 }
 
 /**
- * - Free space in LRU for \a count new locks,
+ * - Free space in LRU for \a min new locks,
 *   redundant unused locks are canceled locally;
 * - also cancel locally unused aged locks;
 * - do not cancel more than \a max locks;
+ * - if some locks are cancelled, try to cancel at least \a batch locks
 * - GET the found locks and add them into the \a cancels list.
 *
 * A client lock can be added to the l_bl_ast list only when it is
@@ -1836,40 +1779,49 @@ ldlm_cancel_lru_policy(struct ldlm_namespace *ns, enum ldlm_lru_flags lru_flags)
 * attempt to cancel a lock rely on this flag, l_bl_ast list is accessed
 * later without any special locking.
 *
- * Calling policies for enabled LRU resize:
- * ----------------------------------------
- * flags & LDLM_LRU_FLAG_LRUR   - use LRU resize policy (SLV from server) to
- *                                cancel not more than \a count locks;
- *
- * flags & LDLM_LRU_FLAG_PASSED - cancel \a count number of old locks (located
- *                                at the beginning of LRU list);
- *
- * flags & LDLM_LRU_FLAG_SHRINK - cancel not more than \a count locks according
- *                                to memory pressre policy function;
+ * Locks are cancelled according to the LRU resize policy (SLV from server)
+ * if LRU resize is enabled; otherwise, the "aged policy" is used;
 *
- * flags & LDLM_LRU_FLAG_AGED   - cancel \a count locks according to "aged policy"
+ * LRU flags:
+ * ----------------------------------------
 *
- * flags & LDLM_LRU_FLAG_NO_WAIT - cancel as many unused locks as possible
- *                                 (typically before replaying locks) w/o
- *                                 sending any RPCs or waiting for any
- *                                 outstanding RPC to complete.
+ * flags & LDLM_LRU_FLAG_NO_WAIT - cancel locks w/o sending any RPCs or waiting
+ *                                 for any outstanding RPC to complete.
 *
 * flags & LDLM_CANCEL_CLEANUP - when cancelling read locks, do not check for
- *                               other read locks covering the same pages, just
- *                               discard those pages.
+ *                                other read locks covering the same pages, just
+ *                                discard those pages.
 */
 static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
-				 struct list_head *cancels, int count, int max,
+				 struct list_head *cancels,
+				 int min, int max, int batch,
 				 enum ldlm_lru_flags lru_flags)
 {
 	ldlm_cancel_lru_policy_t pf;
 	int added = 0;
 	int no_wait = lru_flags & LDLM_LRU_FLAG_NO_WAIT;
-
 	ENTRY;
+	/*
+	 * Let only 1 thread to proceed. However, not for those which have the
+	 * @max limit given (ELC), as LRU may be left not cleaned up in full.
+	 */
+	if (max == 0) {
+		if (test_and_set_bit(LDLM_LRU_CANCEL, &ns->ns_flags))
+			RETURN(0);
+	} else if (test_bit(LDLM_LRU_CANCEL, &ns->ns_flags))
+		RETURN(0);
+
+	LASSERT(ergo(max, min <= max));
+	/* No sense to give @batch for ELC */
+	LASSERT(ergo(max, batch == 0));
+
 	if (!ns_connect_lru_resize(ns))
-		count += ns->ns_nr_unused - ns->ns_max_unused;
+		min = max_t(int, min, ns->ns_nr_unused - ns->ns_max_unused);
+
+	/* If at least 1 lock is to be cancelled, cancel at least @batch locks */
+	if (min && min < batch)
+		min = batch;
 
 	pf = ldlm_cancel_lru_policy(ns, lru_flags);
 	LASSERT(pf != NULL);
@@ -1926,7 +1878,7 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
		 * their weight. Big extent locks will stay in
		 * the cache.
		 */
-		result = pf(ns, lock, ns->ns_nr_unused, added, count);
+		result = pf(ns, lock, added, min);
		if (result == LDLM_POLICY_KEEP_LOCK) {
			lu_ref_del(&lock->l_reference, __func__, current);
			LDLM_LOCK_RELEASE(lock);
@@ -2003,18 +1955,25 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
		unlock_res_and_lock(lock);
		lu_ref_del(&lock->l_reference, __FUNCTION__, current);
		added++;
+		/* Once a lock added, batch the requested amount */
+		if (min == 0)
+			min = batch;
 	}
+
+	if (max == 0)
+		clear_bit(LDLM_LRU_CANCEL, &ns->ns_flags);
+
 	RETURN(added);
 }
 
 int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
-			  int count, int max,
+			  int min, int max,
			  enum ldlm_cancel_flags cancel_flags,
			  enum ldlm_lru_flags lru_flags)
 {
	int added;
 
-	added = ldlm_prepare_lru_list(ns, cancels, count, max, lru_flags);
+	added = ldlm_prepare_lru_list(ns, cancels, min, max, 0, lru_flags);
	if (added <= 0)
		return added;
@@ -2022,14 +1981,14 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
 }
 
 /**
- * Cancel at least \a nr locks from given namespace LRU.
+ * Cancel at least \a min locks from given namespace LRU.
 *
 * When called with LCF_ASYNC the blocking callback will be handled
 * in a thread and this function will return after the thread has been
 * asked to call the callback. When called with LCF_ASYNC the blocking
 * callback will be performed in this function.
 */
-int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr,
+int ldlm_cancel_lru(struct ldlm_namespace *ns, int min,
		    enum ldlm_cancel_flags cancel_flags,
		    enum ldlm_lru_flags lru_flags)
 {
@@ -2042,7 +2001,8 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr,
	 * Just prepare the list of locks, do not actually cancel them yet.
	 * Locks are cancelled later in a separate thread.
	 */
-	count = ldlm_prepare_lru_list(ns, &cancels, nr, 0, lru_flags);
+	count = ldlm_prepare_lru_list(ns, &cancels, min, 0,
+				      ns->ns_cancel_batch, lru_flags);
	rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count, cancel_flags);
	if (rc == 0)
		RETURN(count);
@@ -2400,6 +2360,8 @@ static int replay_lock_interpret(const struct lu_env *env,
	ENTRY;
	atomic_dec(&req->rq_import->imp_replay_inflight);
+	wake_up(&req->rq_import->imp_replay_waitq);
+
	if (rc != ELDLM_OK)
		GOTO(out, rc);
@@ -2515,7 +2477,7 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
	LDLM_DEBUG(lock, "replaying lock:");
 
-	atomic_inc(&req->rq_import->imp_replay_inflight);
+	atomic_inc(&imp->imp_replay_inflight);
 
	aa = ptlrpc_req_async_args(aa, req);
	aa->lock_handle = body->lock_handle[0];
	req->rq_interpret_reply = replay_lock_interpret;
@@ -2555,7 +2517,20 @@ static void ldlm_cancel_unused_locks_for_replay(struct ldlm_namespace *ns)
	       canceled, ldlm_ns_name(ns));
 }
 
-int ldlm_replay_locks(struct obd_import *imp)
+static int lock_can_replay(struct obd_import *imp)
+{
+	struct client_obd *cli = &imp->imp_obd->u.cli;
+
+	CDEBUG(D_HA, "check lock replay limit, inflights = %u(%u)\n",
+	       atomic_read(&imp->imp_replay_inflight) - 1,
+	       cli->cl_max_rpcs_in_flight);
+
+	/* +1 due to ldlm_lock_replay() increment */
+	return atomic_read(&imp->imp_replay_inflight) <
+	       1 + min_t(u32, cli->cl_max_rpcs_in_flight, 8);
+}
+
+int __ldlm_replay_locks(struct obd_import *imp, bool rate_limit)
 {
	struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
	LIST_HEAD(list);
@@ -2564,15 +2539,12 @@ int ldlm_replay_locks(struct obd_import *imp)
	ENTRY;
 
-	LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
+	LASSERT(atomic_read(&imp->imp_replay_inflight) == 1);
 
	/* don't replay locks if import failed recovery */
	if (imp->imp_vbr_failed)
		RETURN(0);
 
-	/* ensure this doesn't fall to 0 before all have been queued */
-	atomic_inc(&imp->imp_replay_inflight);
-
	if (ldlm_cancel_unused_locks_before_replay)
		ldlm_cancel_unused_locks_for_replay(ns);
@@ -2586,9 +2558,54 @@ int ldlm_replay_locks(struct obd_import *imp)
		}
		rc = replay_one_lock(imp, lock);
		LDLM_LOCK_RELEASE(lock);
+
+		if (rate_limit)
+			wait_event_idle_exclusive(imp->imp_replay_waitq,
+						  lock_can_replay(imp));
	}
+	RETURN(rc);
+}
+
+/**
+ * Lock replay uses rate control and can sleep waiting so
+ * must be in separate thread from ptlrpcd itself
+ */
+static int ldlm_lock_replay_thread(void *data)
+{
+	struct obd_import *imp = data;
+
+	CDEBUG(D_HA, "lock replay thread %s to %s@%s\n",
+	       imp->imp_obd->obd_name, obd2cli_tgt(imp->imp_obd),
+	       imp->imp_connection->c_remote_uuid.uuid);
+
+	__ldlm_replay_locks(imp, true);
	atomic_dec(&imp->imp_replay_inflight);
+	ptlrpc_import_recovery_state_machine(imp);
+	class_import_put(imp);
 
-	RETURN(rc);
+	return 0;
+}
+
+int ldlm_replay_locks(struct obd_import *imp)
+{
+	struct task_struct *task;
+	int rc = 0;
+
+	class_import_get(imp);
+	/* ensure this doesn't fall to 0 before all have been queued */
+	atomic_inc(&imp->imp_replay_inflight);
+
+	task = kthread_run(ldlm_lock_replay_thread, imp, "ldlm_lock_replay");
+	if (IS_ERR(task)) {
+		rc = PTR_ERR(task);
+		CDEBUG(D_HA, "can't start lock replay thread: rc = %d\n", rc);
+
+		/* run lock replay without rate control */
+		rc = __ldlm_replay_locks(imp, false);
+		atomic_dec(&imp->imp_replay_inflight);
+		class_import_put(imp);
+	}
+
+	return rc;
 }
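
A note on the replay throttling introduced at the end of this patch: ldlm_replay_locks() now spawns ldlm_lock_replay_thread(), which runs __ldlm_replay_locks(imp, true); after each replay_one_lock() the thread sleeps on imp_replay_waitq until lock_can_replay() sees fewer than min(cl_max_rpcs_in_flight, 8) replays in flight (plus the thread's own +1), and replay_lock_interpret() wakes the queue as replies come back. The sketch below is only an illustrative userspace model of that gating, not Lustre code: a mutex/condvar pair stands in for imp_replay_waitq/wake_up(), detached threads stand in for replay RPCs, and the names MAX_RPCS_IN_FLIGHT, replay_one() and replay_interpret() are assumptions made for the example.

/*
 * Illustrative model only (not Lustre code): a userspace analogue of the
 * rate-limited replay loop added by this patch.
 * Build with: gcc -pthread replay_model.c
 */
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

#define MAX_RPCS_IN_FLIGHT 8	/* stands in for min_t(u32, cl_max_rpcs_in_flight, 8) */
#define NR_LOCKS 32		/* pretend the LRU holds 32 locks to replay */

static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t waitq = PTHREAD_COND_INITIALIZER;	/* ~ imp_replay_waitq */
static int inflight;					/* ~ imp_replay_inflight */

static void *replay_interpret(void *arg)	/* ~ replay_lock_interpret() */
{
	(void)arg;
	usleep(10000);			/* pretend the server took ~10ms to reply */
	pthread_mutex_lock(&mtx);
	inflight--;			/* atomic_dec(&imp->imp_replay_inflight) */
	pthread_cond_signal(&waitq);	/* wake_up(&imp->imp_replay_waitq) */
	pthread_mutex_unlock(&mtx);
	return NULL;
}

static void replay_one(int id)		/* ~ replay_one_lock(): send one replay RPC */
{
	pthread_t t;

	pthread_mutex_lock(&mtx);
	inflight++;			/* atomic_inc(&imp->imp_replay_inflight) */
	pthread_mutex_unlock(&mtx);
	printf("replaying lock %d\n", id);
	pthread_create(&t, NULL, replay_interpret, NULL);
	pthread_detach(t);
}

int main(void)
{
	for (int i = 0; i < NR_LOCKS; i++) {
		replay_one(i);

		/* ~ wait_event_idle_exclusive(imp->imp_replay_waitq,
		 *			       lock_can_replay(imp)); */
		pthread_mutex_lock(&mtx);
		while (inflight >= MAX_RPCS_IN_FLIGHT)
			pthread_cond_wait(&waitq, &mtx);
		pthread_mutex_unlock(&mtx);
	}

	/* drain the last in-flight replies before exiting */
	pthread_mutex_lock(&mtx);
	while (inflight > 0)
		pthread_cond_wait(&waitq, &mtx);
	pthread_mutex_unlock(&mtx);
	return 0;
}

The point the model preserves is that throttling is enforced by the submitter: completions only decrement the in-flight count and wake the waiter, and the submitter re-checks the limit before sending the next replay, which is also why the real code runs in its own kthread instead of inside ptlrpcd.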