unsigned int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT;
module_param(ldlm_enqueue_min, uint, 0644);
MODULE_PARM_DESC(ldlm_enqueue_min, "lock enqueue timeout minimum");
+EXPORT_SYMBOL(ldlm_enqueue_min);
/* Whether cached locks will be canceled on the client side before replay */
unsigned int ldlm_cancel_unused_locks_before_replay = 1;
-static void interrupted_completion_wait(void *data)
-{
-}
-
struct lock_wait_data {
struct ldlm_lock *lwd_lock;
__u32 lwd_conn_cnt;
return sizeof(struct ldlm_request) + avail;
}
-int ldlm_expired_completion_wait(void *data)
+static void ldlm_expired_completion_wait(struct lock_wait_data *lwd)
{
- struct lock_wait_data *lwd = data;
struct ldlm_lock *lock = lwd->lwd_lock;
struct obd_import *imp;
struct obd_device *obd;
LDLM_ERROR(lock,
"lock timed out (enqueued at %lld, %llds ago); not entering recovery in server code, just going back to sleep",
- (s64)lock->l_activity,
- (s64)(ktime_get_real_seconds() -
- lock->l_activity));
+ lock->l_activity,
+ ktime_get_real_seconds() - lock->l_activity);
if (ktime_get_seconds() > next_dump) {
last_dump = next_dump;
next_dump = ktime_get_seconds() + 300;
if (last_dump == 0)
libcfs_debug_dumplog();
}
- RETURN(0);
+ RETURN_EXIT;
}
obd = lock->l_conn_export->exp_obd;
ptlrpc_fail_import(imp, lwd->lwd_conn_cnt);
LDLM_ERROR(lock,
"lock timed out (enqueued at %lld, %llds ago), entering recovery for %s@%s",
- (s64)lock->l_activity,
- (s64)(ktime_get_real_seconds() - lock->l_activity),
+ lock->l_activity,
+ ktime_get_real_seconds() - lock->l_activity,
obd2cli_tgt(obd), imp->imp_connection->c_remote_uuid.uuid);
- RETURN(0);
+ EXIT;
}
int is_granted_or_cancelled_nolock(struct ldlm_lock *lock)
* We use the same basis for both server side and client side functions
* from a single node.
*/
-static time64_t ldlm_cp_timeout(struct ldlm_lock *lock)
+static timeout_t ldlm_cp_timeout(struct ldlm_lock *lock)
{
- time64_t timeout;
+ timeout_t timeout;
if (AT_OFF)
return obd_timeout;
* doesn't respond reasonably, and then give us the lock.
*/
timeout = at_get(ldlm_lock_to_ns_at(lock));
- return max(3 * timeout, (time64_t) ldlm_enqueue_min);
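+ /* Triple the adaptive estimate so a loaded server still has time to
+ * respond, but never drop below the ldlm_enqueue_min floor.
+ */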
+ return max(3 * timeout, (timeout_t)ldlm_enqueue_min);
}
/**
*/
static int ldlm_completion_tail(struct ldlm_lock *lock, void *data)
{
- time64_t delay;
int result = 0;
if (ldlm_is_destroyed(lock) || ldlm_is_failed(lock)) {
LDLM_DEBUG(lock, "client-side enqueue: granted");
} else {
/* Count only CP RPCs toward AT, not immediately granted locks */
- delay = ktime_get_real_seconds() - lock->l_activity;
- LDLM_DEBUG(lock, "client-side enqueue: granted after %llds",
- (s64)delay);
+ timeout_t delay = 0;
+
+ /* Discard negative timeouts. We should also limit the
+ * maximum value of the timeout.
+ */
+ if (ktime_get_real_seconds() > lock->l_activity)
+ delay = ktime_get_real_seconds() - lock->l_activity;
+ LDLM_DEBUG(lock, "client-side enqueue: granted after %ds",
+ delay);
/* Update our time estimate */
at_measured(ldlm_lock_to_ns_at(lock), delay);
}
struct lock_wait_data lwd;
struct obd_device *obd;
struct obd_import *imp = NULL;
- struct l_wait_info lwi;
- time64_t timeout;
+ timeout_t timeout;
int rc = 0;
ENTRY;
lwd.lwd_lock = lock;
lock->l_activity = ktime_get_real_seconds();
- if (ldlm_is_no_timeout(lock)) {
- LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
- lwi = LWI_INTR(interrupted_completion_wait, &lwd);
- } else {
- lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
- ldlm_expired_completion_wait,
- interrupted_completion_wait, &lwd);
- }
-
if (imp != NULL) {
spin_lock(&imp->imp_lock);
lwd.lwd_conn_cnt = imp->imp_conn_cnt;
rc = -EINTR;
} else {
/* Go to sleep until the lock is granted or cancelled. */
- rc = l_wait_event(lock->l_waitq,
- is_granted_or_cancelled(lock), &lwi);
+ if (ldlm_is_no_timeout(lock)) {
+ LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
+ rc = l_wait_event_abortable(
+ lock->l_waitq,
+ is_granted_or_cancelled(lock));
+ } else {
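+ /* Wait idle (not interruptible) for the enqueue
+ * timeout; only if it expires, report the expiry
+ * and fall back to a sleep that only a fatal
+ * signal can abort.
+ */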
+ if (wait_event_idle_timeout(
+ lock->l_waitq,
+ is_granted_or_cancelled(lock),
+ cfs_time_seconds(timeout)) == 0) {
+ ldlm_expired_completion_wait(&lwd);
+ rc = l_wait_event_abortable(
+ lock->l_waitq,
+ is_granted_or_cancelled(lock));
+ }
+ }
}
if (rc) {
new_slv = lustre_msg_get_slv(req->rq_repmsg);
obd = req->rq_import->imp_obd;
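+ /* Nothing to do if the server reported the same SLV and limit as
+ * before; the read lock pairs with the writer that updates these
+ * fields below.
+ */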
+ read_lock(&obd->obd_pool_lock);
+ if (obd->obd_pool_slv == new_slv &&
+ obd->obd_pool_limit == new_limit) {
+ read_unlock(&obd->obd_pool_lock);
+ RETURN(0);
+ }
+ read_unlock(&obd->obd_pool_lock);
+
/*
* Set new SLV and limit in OBD fields to make them accessible
* to the pool thread. We do not access obd_namespace and pool
ENTRY;
atomic_dec(&req->rq_import->imp_replay_inflight);
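+ /* a replay slot is free again: wake the rate-limited replay thread */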
+ wake_up(&req->rq_import->imp_replay_waitq);
+
if (rc != ELDLM_OK)
GOTO(out, rc);
LDLM_DEBUG(lock, "replaying lock:");
- atomic_inc(&req->rq_import->imp_replay_inflight);
+ atomic_inc(&imp->imp_replay_inflight);
aa = ptlrpc_req_async_args(aa, req);
aa->lock_handle = body->lock_handle[0];
req->rq_interpret_reply = replay_lock_interpret;
canceled, ldlm_ns_name(ns));
}
-int ldlm_replay_locks(struct obd_import *imp)
+static int lock_can_replay(struct obd_import *imp)
+{
+ struct client_obd *cli = &imp->imp_obd->u.cli;
+
+ CDEBUG(D_HA, "check lock replay limit, inflights = %u(%u)\n",
+ atomic_read(&imp->imp_replay_inflight) - 1,
+ cli->cl_max_rpcs_in_flight);
+
+ /* +1 due to the ldlm_replay_locks() increment */
+ return atomic_read(&imp->imp_replay_inflight) <
+ 1 + min_t(u32, cli->cl_max_rpcs_in_flight, 8);
+}
+
+static int __ldlm_replay_locks(struct obd_import *imp, bool rate_limit)
{
struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
LIST_HEAD(list);
ENTRY;
- LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
+ LASSERT(atomic_read(&imp->imp_replay_inflight) == 1);
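+ /* the baseline reference was taken by the caller, ldlm_replay_locks() */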
/* don't replay locks if import failed recovery */
if (imp->imp_vbr_failed)
RETURN(0);
- /* ensure this doesn't fall to 0 before all have been queued */
- atomic_inc(&imp->imp_replay_inflight);
-
if (ldlm_cancel_unused_locks_before_replay)
ldlm_cancel_unused_locks_for_replay(ns);
}
rc = replay_one_lock(imp, lock);
LDLM_LOCK_RELEASE(lock);
+
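+ /* throttle: sleep until the number of in-flight
+ * replays drops below the per-import limit
+ */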
+ if (rate_limit)
+ wait_event_idle_exclusive(imp->imp_replay_waitq,
+ lock_can_replay(imp));
}
+ RETURN(rc);
+}
+
+/**
+ * Lock replay uses rate control and can sleep while waiting, so it
+ * must run in a thread separate from ptlrpcd itself.
+ */
+static int ldlm_lock_replay_thread(void *data)
+{
+ struct obd_import *imp = data;
+
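+ /* detach from the spawning process's fs_struct, as kernel
+ * service threads conventionally do
+ */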
+ unshare_fs_struct();
+
+ CDEBUG(D_HA, "lock replay thread %s to %s@%s\n",
+ imp->imp_obd->obd_name, obd2cli_tgt(imp->imp_obd),
+ imp->imp_connection->c_remote_uuid.uuid);
+
+ __ldlm_replay_locks(imp, true);
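+ /* drop the baseline increment taken in ldlm_replay_locks() */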
atomic_dec(&imp->imp_replay_inflight);
+ ptlrpc_import_recovery_state_machine(imp);
+ class_import_put(imp);
- RETURN(rc);
+ return 0;
+}
+
+int ldlm_replay_locks(struct obd_import *imp)
+{
+ struct task_struct *task;
+ int rc = 0;
+
+ class_import_get(imp);
+ /* ensure this doesn't fall to 0 before all have been queued */
+ atomic_inc(&imp->imp_replay_inflight);
+
+ task = kthread_run(ldlm_lock_replay_thread, imp, "ldlm_lock_replay");
+ if (IS_ERR(task)) {
+ rc = PTR_ERR(task);
+ CDEBUG(D_HA, "can't start lock replay thread: rc = %d\n", rc);
+
+ /* run lock replay without rate control */
+ rc = __ldlm_replay_locks(imp, false);
+ atomic_dec(&imp->imp_replay_inflight);
+ class_import_put(imp);
+ }
+
+ return rc;
}