*/
/*
* This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
*/
/**
* This file contains Asynchronous System Trap (AST) handlers and related
unsigned int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT;
module_param(ldlm_enqueue_min, uint, 0644);
MODULE_PARM_DESC(ldlm_enqueue_min, "lock enqueue timeout minimum");
+EXPORT_SYMBOL(ldlm_enqueue_min);
/* Whether, on the client side, cached locks are canceled before replay */
unsigned int ldlm_cancel_unused_locks_before_replay = 1;
-static void interrupted_completion_wait(void *data)
-{
-}
-
struct lock_wait_data {
struct ldlm_lock *lwd_lock;
__u32 lwd_conn_cnt;
return sizeof(struct ldlm_request) + avail;
}
-int ldlm_expired_completion_wait(void *data)
+void ldlm_expired_completion_wait(struct lock_wait_data *lwd)
{
- struct lock_wait_data *lwd = data;
struct ldlm_lock *lock = lwd->lwd_lock;
struct obd_import *imp;
struct obd_device *obd;
LDLM_ERROR(lock,
"lock timed out (enqueued at %lld, %llds ago); not entering recovery in server code, just going back to sleep",
- (s64)lock->l_activity,
- (s64)(ktime_get_real_seconds() -
- lock->l_activity));
+ lock->l_activity,
+ ktime_get_real_seconds() - lock->l_activity);
if (ktime_get_seconds() > next_dump) {
last_dump = next_dump;
next_dump = ktime_get_seconds() + 300;
if (last_dump == 0)
libcfs_debug_dumplog();
}
- RETURN(0);
+ RETURN_EXIT;
}
obd = lock->l_conn_export->exp_obd;
ptlrpc_fail_import(imp, lwd->lwd_conn_cnt);
LDLM_ERROR(lock,
"lock timed out (enqueued at %lld, %llds ago), entering recovery for %s@%s",
- (s64)lock->l_activity,
- (s64)(ktime_get_real_seconds() - lock->l_activity),
+ lock->l_activity,
+ ktime_get_real_seconds() - lock->l_activity,
obd2cli_tgt(obd), imp->imp_connection->c_remote_uuid.uuid);
- RETURN(0);
+ EXIT;
}
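+/**
+ * Check whether waiting can stop for \a lock: it is granted with no
+ * completion AST pending, or it has failed or been cancelled. The caller
+ * must already hold the resource lock (asserted via check_res_locked()).
+ */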
+int is_granted_or_cancelled_nolock(struct ldlm_lock *lock)
+{
+ int ret = 0;
+
+ check_res_locked(lock->l_resource);
+ if (ldlm_is_granted(lock) && !ldlm_is_cp_reqd(lock))
+ ret = 1;
+ else if (ldlm_is_failed(lock) || ldlm_is_cancel(lock))
+ ret = 1;
+ return ret;
+}
+EXPORT_SYMBOL(is_granted_or_cancelled_nolock);
+
/**
* Calculate the Completion timeout (covering enqueue, BL AST, data flush,
* lock cancel, and their replies). Used for lock completion timeout on the
* We use the same basis for both server-side and client-side functions
* from a single node.
*/
-static time64_t ldlm_cp_timeout(struct ldlm_lock *lock)
+static timeout_t ldlm_cp_timeout(struct ldlm_lock *lock)
{
- time64_t timeout;
+ timeout_t timeout;
if (AT_OFF)
return obd_timeout;
* doesn't respond reasonably, and then give us the lock.
*/
timeout = at_get(ldlm_lock_to_ns_at(lock));
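+ /* Example with hypothetical values: an AT estimate of 15s and the
+ * default ldlm_enqueue_min of 100 give max(3 * 15, 100) = 100s.
+ */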
- return max(3 * timeout, (time64_t) ldlm_enqueue_min);
+ return max(3 * timeout, (timeout_t)ldlm_enqueue_min);
}
/**
*/
static int ldlm_completion_tail(struct ldlm_lock *lock, void *data)
{
- time64_t delay;
int result = 0;
if (ldlm_is_destroyed(lock) || ldlm_is_failed(lock)) {
LDLM_DEBUG(lock, "client-side enqueue: granted");
} else {
/* Feed only CP RPCs into AT, not immediately granted locks */
- delay = ktime_get_real_seconds() - lock->l_activity;
- LDLM_DEBUG(lock, "client-side enqueue: granted after %llds",
- (s64)delay);
+ timeout_t delay = 0;
+
+ /* Discard negative timeouts. We should also limit the
+ * maximum value of the timeout
+ */
+ if (ktime_get_real_seconds() > lock->l_activity)
+ delay = ktime_get_real_seconds() - lock->l_activity;
+ LDLM_DEBUG(lock, "client-side enqueue: granted after %ds",
+ delay);
/* Update our time estimate */
at_measured(ldlm_lock_to_ns_at(lock), delay);
}
RETURN(ldlm_completion_tail(lock, data));
}
- LDLM_DEBUG(lock,
- "client-side enqueue returned a blocked lock, going forward");
- ldlm_reprocess_all(lock->l_resource);
+ LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, going forward");
+ ldlm_reprocess_all(lock->l_resource, 0);
RETURN(0);
}
EXPORT_SYMBOL(ldlm_completion_ast_async);
struct lock_wait_data lwd;
struct obd_device *obd;
struct obd_import *imp = NULL;
- struct l_wait_info lwi;
- time64_t timeout;
+ timeout_t timeout;
int rc = 0;
ENTRY;
lwd.lwd_lock = lock;
lock->l_activity = ktime_get_real_seconds();
- if (ldlm_is_no_timeout(lock)) {
- LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
- lwi = LWI_INTR(interrupted_completion_wait, &lwd);
- } else {
- lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
- ldlm_expired_completion_wait,
- interrupted_completion_wait, &lwd);
- }
-
if (imp != NULL) {
spin_lock(&imp->imp_lock);
lwd.lwd_conn_cnt = imp->imp_conn_cnt;
rc = -EINTR;
} else {
/* Go to sleep until the lock is granted or cancelled. */
- rc = l_wait_event(lock->l_waitq,
- is_granted_or_cancelled(lock), &lwi);
+ if (ldlm_is_no_timeout(lock)) {
+ LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
+ rc = l_wait_event_abortable(
+ lock->l_waitq,
+ is_granted_or_cancelled(lock));
+ } else {
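+ /* Wait up to the timeout without being interruptible; if
+ * it expires, report the expiry and keep waiting, this
+ * time abortable.
+ */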
+ if (wait_event_idle_timeout(
+ lock->l_waitq,
+ is_granted_or_cancelled(lock),
+ cfs_time_seconds(timeout)) == 0) {
+ ldlm_expired_completion_wait(&lwd);
+ rc = l_wait_event_abortable(
+ lock->l_waitq,
+ is_granted_or_cancelled(lock));
+ }
+ }
}
if (rc) {
if (IS_ERR(lock))
GOTO(out_nolock, err = PTR_ERR(lock));
- err = ldlm_lvbo_init(env, lock->l_resource);
+ err = ldlm_lvbo_init(lock->l_resource);
if (err < 0) {
LDLM_ERROR(lock, "delayed lvb init failed (rc %d)", err);
ldlm_lock_destroy_nolock(lock);
* bl_ast and -EINVAL reply is sent to server anyways.
* b=17645
*/
- lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_FAILED |
+ lock->l_flags |= LDLM_FL_FAILED |
LDLM_FL_ATOMIC_CB | LDLM_FL_CBPENDING;
+ if (!(ldlm_is_bl_ast(lock) &&
+ lock->l_remote_handle.cookie != 0))
+ lock->l_flags |= LDLM_FL_LOCAL_ONLY;
need_cancel = 1;
}
unlock_res_and_lock(lock);
}
}
+static bool ldlm_request_slot_needed(struct ldlm_enqueue_info *einfo)
+{
+ /* Exclude EXTENT locks and DOM-only IBITS locks because they
+ * are asynchronous and do not wait on the server being blocked.
+ */
+ return einfo->ei_type == LDLM_FLOCK ||
+ (einfo->ei_type == LDLM_IBITS &&
+ einfo->ei_inodebits != MDS_INODELOCK_DOM);
+}
+
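+/*
+ * Note: the request slot taken via obd_get_request_slot() in
+ * ldlm_cli_enqueue() is released in ldlm_cli_enqueue_fini() below,
+ * as is the modify RPC slot.
+ */
+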
/**
* Finishing portion of client lock enqueue code.
*
* Called after receiving reply from server.
*/
int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
- enum ldlm_type type, __u8 with_policy,
- enum ldlm_mode mode, __u64 *flags, void *lvb,
+ struct ldlm_enqueue_info *einfo,
+ __u8 with_policy, __u64 *ldlm_flags, void *lvb,
__u32 lvb_len, const struct lustre_handle *lockh,
int rc)
{
struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
const struct lu_env *env = NULL;
- int is_replay = *flags & LDLM_FL_REPLAY;
+ int is_replay = *ldlm_flags & LDLM_FL_REPLAY;
struct ldlm_lock *lock;
struct ldlm_reply *reply;
int cleanup_phase = 1;
ENTRY;
+ if (ldlm_request_slot_needed(einfo))
+ obd_put_request_slot(&req->rq_import->imp_obd->u.cli);
+
+ ptlrpc_put_mod_rpc_slot(req);
+
if (req && req->rq_svc_thread)
env = req->rq_svc_thread->t_env;
lock = ldlm_handle2lock(lockh);
/* ldlm_cli_enqueue is holding a reference on this lock. */
if (!lock) {
- LASSERT(type == LDLM_FLOCK);
+ LASSERT(einfo->ei_type == LDLM_FLOCK);
RETURN(-ENOLCK);
}
lock->l_remote_handle = reply->lock_handle;
}
- *flags = ldlm_flags_from_wire(reply->lock_flags);
+ *ldlm_flags = ldlm_flags_from_wire(reply->lock_flags);
lock->l_flags |= ldlm_flags_from_wire(reply->lock_flags &
LDLM_FL_INHERIT_MASK);
unlock_res_and_lock(lock);
CDEBUG(D_INFO, "local: %p, remote cookie: %#llx, flags: %#llx\n",
- lock, reply->lock_handle.cookie, *flags);
+ lock, reply->lock_handle.cookie, *ldlm_flags);
/*
* If enqueue returned a blocked lock but the completion handler has
* already run, then it fixed up the resource and we don't need to do it
* again.
*/
- if ((*flags) & LDLM_FL_LOCK_CHANGED) {
+ if ((*ldlm_flags) & LDLM_FL_LOCK_CHANGED) {
int newmode = reply->lock_desc.l_req_mode;
LASSERT(!is_replay);
&lock->l_policy_data);
}
- if (type != LDLM_PLAIN)
+ if (einfo->ei_type != LDLM_PLAIN)
LDLM_DEBUG(lock,
"client-side enqueue, new policy data");
}
- if ((*flags) & LDLM_FL_AST_SENT) {
+ if ((*ldlm_flags) & LDLM_FL_AST_SENT) {
lock_res_and_lock(lock);
+ ldlm_bl_desc2lock(&reply->lock_desc, lock);
lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
unlock_res_and_lock(lock);
LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
}
if (!is_replay) {
- rc = ldlm_lock_enqueue(env, ns, &lock, NULL, flags);
+ rc = ldlm_lock_enqueue(env, ns, &lock, NULL, ldlm_flags);
if (lock->l_completion_ast != NULL) {
- int err = lock->l_completion_ast(lock, *flags, NULL);
+ int err = lock->l_completion_ast(lock, *ldlm_flags,
+ NULL);
if (!rc)
rc = err;
EXIT;
cleanup:
if (cleanup_phase == 1 && rc)
- failed_lock_cleanup(ns, lock, mode);
+ failed_lock_cleanup(ns, lock, einfo->ei_mode);
/* Put lock 2 times, the second reference is held by ldlm_cli_enqueue */
LDLM_LOCK_PUT(lock);
LDLM_LOCK_RELEASE(lock);
struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
struct req_capsule *pill = &req->rq_pill;
struct ldlm_request *dlm = NULL;
- struct list_head head = LIST_HEAD_INIT(head);
- enum ldlm_lru_flags lru_flags;
- int avail, to_free, pack = 0;
+ LIST_HEAD(head);
+ int avail, to_free = 0, pack = 0;
int rc;
ENTRY;
req_capsule_filled_sizes(pill, RCL_CLIENT);
avail = ldlm_capsule_handles_avail(pill, RCL_CLIENT, canceloff);
- lru_flags = LDLM_LRU_FLAG_NO_WAIT | (ns_connect_lru_resize(ns) ?
- LDLM_LRU_FLAG_LRUR : LDLM_LRU_FLAG_AGED);
- to_free = !ns_connect_lru_resize(ns) &&
- opc == LDLM_ENQUEUE ? 1 : 0;
+ /* If we have reached the limit, free +1 slot for the new one */
+ if (!ns_connect_lru_resize(ns) && opc == LDLM_ENQUEUE &&
+ ns->ns_nr_unused >= ns->ns_max_unused)
+ to_free = 1;
/*
* Cancel LRU locks here _only_ if the server supports
if (avail > count)
count += ldlm_cancel_lru_local(ns, cancels, to_free,
avail - count, 0,
- lru_flags);
+ LDLM_LRU_FLAG_NO_WAIT);
if (avail > count)
pack = count;
else
lock->l_conn_export = exp;
lock->l_export = NULL;
lock->l_blocking_ast = einfo->ei_cb_bl;
- lock->l_flags |= (*flags & (LDLM_FL_NO_LRU | LDLM_FL_EXCL));
+ lock->l_flags |= (*flags & (LDLM_FL_NO_LRU | LDLM_FL_EXCL |
+ LDLM_FL_ATOMIC_CB));
lock->l_activity = ktime_get_real_seconds();
/* lock not sent to server yet */
}
if (*flags & LDLM_FL_NDELAY) {
- DEBUG_REQ(D_DLMTRACE, req, "enque lock with no delay\n");
+ DEBUG_REQ(D_DLMTRACE, req, "enqueue lock with no delay");
req->rq_no_resend = req->rq_no_delay = 1;
/*
* probably set a shorter timeout value and handle ETIMEDOUT
/* extended LDLM opcodes in client stats */
if (exp->exp_obd->obd_svc_stats != NULL) {
- bool glimpse = *flags & LDLM_FL_HAS_INTENT;
-
- /* OST glimpse has no intent buffer */
- if (req_capsule_has_field(&req->rq_pill, &RMF_LDLM_INTENT,
- RCL_CLIENT)) {
- struct ldlm_intent *it;
-
- it = req_capsule_client_get(&req->rq_pill,
- &RMF_LDLM_INTENT);
- glimpse = (it && (it->opc == IT_GLIMPSE));
- }
-
- if (!glimpse)
- ldlm_svc_get_eopc(body, exp->exp_obd->obd_svc_stats);
- else
+ /* a glimpse is an intent request without an intent buffer */
+ if (*flags & LDLM_FL_HAS_INTENT &&
+ !req_capsule_has_field(&req->rq_pill, &RMF_LDLM_INTENT,
+ RCL_CLIENT))
lprocfs_counter_incr(exp->exp_obd->obd_svc_stats,
PTLRPC_LAST_CNTR +
LDLM_GLIMPSE_ENQUEUE);
+ else
+ ldlm_svc_get_eopc(body, exp->exp_obd->obd_svc_stats);
+ }
+
+ /* It is important to obtain the modify RPC slot first (if applicable),
+ * so that threads waiting for a modify RPC slot do not pollute our
+ * RPCs-in-flight counter.
+ */
+
+ if (einfo->ei_enq_slot)
+ ptlrpc_get_mod_rpc_slot(req);
+
+ if (ldlm_request_slot_needed(einfo)) {
+ rc = obd_get_request_slot(&req->rq_import->imp_obd->u.cli);
+ if (rc) {
+ if (einfo->ei_enq_slot)
+ ptlrpc_put_mod_rpc_slot(req);
+ failed_lock_cleanup(ns, lock, einfo->ei_mode);
+ LDLM_LOCK_RELEASE(lock);
+ if (!req_passed_in)
+ ptlrpc_req_finished(req);
+ GOTO(out, rc);
+ }
}
if (async) {
rc = ptlrpc_queue_wait(req);
- err = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, policy ? 1 : 0,
- einfo->ei_mode, flags, lvb, lvb_len,
- lockh, rc);
+ err = ldlm_cli_enqueue_fini(exp, req, einfo, policy ? 1 : 0, flags,
+ lvb, lvb_len, lockh, rc);
/*
* If ldlm_cli_enqueue_fini did not find the lock, we need to free
else
rc = err;
+out:
if (!req_passed_in && req != NULL) {
ptlrpc_req_finished(req);
if (reqp)
EXPORT_SYMBOL(ldlm_cli_enqueue);
/**
- * Client-side lock convert reply handling.
- *
- * Finish client lock converting, checks for concurrent converts
- * and clear 'converting' flag so lock can be placed back into LRU.
- */
-static int lock_convert_interpret(const struct lu_env *env,
- struct ptlrpc_request *req,
- void *args, int rc)
-{
- struct ldlm_async_args *aa = args;
- struct ldlm_lock *lock;
- struct ldlm_reply *reply;
-
- ENTRY;
-
- lock = ldlm_handle2lock(&aa->lock_handle);
- if (!lock) {
- LDLM_DEBUG_NOLOCK("convert ACK for unknown local cookie %#llx",
- aa->lock_handle.cookie);
- RETURN(-ESTALE);
- }
-
- LDLM_DEBUG(lock, "CONVERTED lock:");
-
- if (rc != ELDLM_OK)
- GOTO(out, rc);
-
- reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
- if (reply == NULL)
- GOTO(out, rc = -EPROTO);
-
- if (reply->lock_handle.cookie != aa->lock_handle.cookie) {
- LDLM_ERROR(lock,
- "convert ACK with wrong lock cookie %#llx but cookie %#llx from server %s id %s\n",
- aa->lock_handle.cookie, reply->lock_handle.cookie,
- req->rq_export->exp_client_uuid.uuid,
- libcfs_id2str(req->rq_peer));
- GOTO(out, rc = ELDLM_NO_LOCK_DATA);
- }
-
- lock_res_and_lock(lock);
- /*
- * Lock convert is sent for any new bits to drop, the converting flag
- * is dropped when ibits on server are the same as on client. Meanwhile
- * that can be so that more later convert will be replied first with
- * and clear converting flag, so in case of such race just exit here.
- * if lock has no converting bits then
- */
- if (!ldlm_is_converting(lock)) {
- LDLM_DEBUG(lock,
- "convert ACK for lock without converting flag, reply ibits %#llx",
- reply->lock_desc.l_policy_data.l_inodebits.bits);
- } else if (reply->lock_desc.l_policy_data.l_inodebits.bits !=
- lock->l_policy_data.l_inodebits.bits) {
- /*
- * Compare server returned lock ibits and local lock ibits
- * if they are the same we consider convertion is done,
- * otherwise we have more converts inflight and keep
- * converting flag.
- */
- LDLM_DEBUG(lock, "convert ACK with ibits %#llx\n",
- reply->lock_desc.l_policy_data.l_inodebits.bits);
- } else {
- ldlm_clear_converting(lock);
-
- /*
- * Concurrent BL AST may arrive and cause another convert
- * or cancel so just do nothing here if bl_ast is set,
- * finish with convert otherwise.
- */
- if (!ldlm_is_bl_ast(lock)) {
- struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
-
- /*
- * Drop cancel_bits since there are no more converts
- * and put lock into LRU if it is still not used and
- * is not there yet.
- */
- lock->l_policy_data.l_inodebits.cancel_bits = 0;
- if (!lock->l_readers && !lock->l_writers &&
- !ldlm_is_canceling(lock)) {
- spin_lock(&ns->ns_lock);
- /* there is check for list_empty() inside */
- ldlm_lock_remove_from_lru_nolock(lock);
- ldlm_lock_add_to_lru_nolock(lock);
- spin_unlock(&ns->ns_lock);
- }
- }
- }
- unlock_res_and_lock(lock);
-out:
- if (rc) {
- int flag;
-
- lock_res_and_lock(lock);
- if (ldlm_is_converting(lock)) {
- ldlm_clear_converting(lock);
- ldlm_set_cbpending(lock);
- ldlm_set_bl_ast(lock);
- lock->l_policy_data.l_inodebits.cancel_bits = 0;
- }
- unlock_res_and_lock(lock);
-
- /*
- * fallback to normal lock cancel. If rc means there is no
- * valid lock on server, do only local cancel
- */
- if (rc == ELDLM_NO_LOCK_DATA)
- flag = LCF_LOCAL;
- else
- flag = LCF_ASYNC;
-
- rc = ldlm_cli_cancel(&aa->lock_handle, flag);
- if (rc < 0)
- LDLM_DEBUG(lock, "failed to cancel lock: rc = %d\n",
- rc);
- }
- LDLM_LOCK_PUT(lock);
- RETURN(rc);
-}
-
-/**
* Client-side IBITS lock convert.
*
* Inform server that lock has been converted instead of canceling.
* is made asynchronous.
*
*/
-int ldlm_cli_convert(struct ldlm_lock *lock, __u32 *flags)
+int ldlm_cli_convert_req(struct ldlm_lock *lock, __u32 *flags, __u64 new_bits)
{
struct ldlm_request *body;
struct ptlrpc_request *req;
- struct ldlm_async_args *aa;
struct obd_export *exp = lock->l_conn_export;
ENTRY;
- if (exp == NULL) {
- LDLM_ERROR(lock, "convert must not be called on local locks.");
- RETURN(-EINVAL);
- }
+ LASSERT(exp != NULL);
/*
* this is better to check earlier and it is done so already,
body->lock_desc.l_req_mode = lock->l_req_mode;
body->lock_desc.l_granted_mode = lock->l_granted_mode;
- body->lock_desc.l_policy_data.l_inodebits.bits =
- lock->l_policy_data.l_inodebits.bits;
+ body->lock_desc.l_policy_data.l_inodebits.bits = new_bits;
body->lock_desc.l_policy_data.l_inodebits.cancel_bits = 0;
body->lock_flags = ldlm_flags_to_wire(*flags);
lprocfs_counter_incr(exp->exp_obd->obd_svc_stats,
LDLM_CONVERT - LDLM_FIRST_OPC);
- aa = ptlrpc_req_async_args(req);
- ldlm_lock2handle(lock, &aa->lock_handle);
- req->rq_interpret_reply = lock_convert_interpret;
-
ptlrpcd_add_req(req);
RETURN(0);
}
bool local_only;
LDLM_DEBUG(lock, "client-side cancel");
+ OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL_LOCAL,
+ cfs_fail_val);
+
/* Set this flag to prevent others from getting new references*/
lock_res_and_lock(lock);
ldlm_set_cbpending(lock);
}
LDLM_DEBUG(lock, "server-side local cancel");
ldlm_lock_cancel(lock);
- ldlm_reprocess_all(lock->l_resource);
+ ldlm_reprocess_all(lock->l_resource,
+ lock->l_policy_data.l_inodebits.bits);
}
RETURN(rc);
*/
int ldlm_cli_update_pool(struct ptlrpc_request *req)
{
+ struct ldlm_namespace *ns;
struct obd_device *obd;
- __u64 new_slv;
+ __u64 new_slv, ratio;
__u32 new_limit;
ENTRY;
if (lustre_msg_get_slv(req->rq_repmsg) == 0 ||
lustre_msg_get_limit(req->rq_repmsg) == 0) {
DEBUG_REQ(D_HA, req,
- "Zero SLV or Limit found (SLV: %llu, Limit: %u)",
+ "Zero SLV or limit found (SLV=%llu, limit=%u)",
lustre_msg_get_slv(req->rq_repmsg),
lustre_msg_get_limit(req->rq_repmsg));
RETURN(0);
new_slv = lustre_msg_get_slv(req->rq_repmsg);
obd = req->rq_import->imp_obd;
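+ /* Bail out early if neither the SLV nor the limit changed; this
+ * avoids taking the write lock below.
+ */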
+ read_lock(&obd->obd_pool_lock);
+ if (obd->obd_pool_slv == new_slv &&
+ obd->obd_pool_limit == new_limit) {
+ read_unlock(&obd->obd_pool_lock);
+ RETURN(0);
+ }
+ read_unlock(&obd->obd_pool_lock);
+
/*
- * Set new SLV and limit in OBD fields to make them accessible
- * to the pool thread. We do not access obd_namespace and pool
- * directly here as there is no reliable way to make sure that
- * they are still alive at cleanup time. Evil races are possible
- * which may cause Oops at that time.
+ * The OBD device holds the new pool attributes until the pool
+ * handles them.
*/
write_lock(&obd->obd_pool_lock);
obd->obd_pool_slv = new_slv;
obd->obd_pool_limit = new_limit;
write_unlock(&obd->obd_pool_lock);
+ /*
+ * Check whether an urgent pool recalc is needed; treat a drop of
+ * ns_recalc_pct (nominally 10%) in the SLV as urgent. This applies
+ * to the LRU-resize-enabled case only.
+ */
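+ /* Example with nominal numbers: old SLV 100, new SLV 85 gives
+ * ratio = 85, and 100 - 85 = 15 >= ns_recalc_pct, so a recalc is
+ * handed off to the blocking thread via ldlm_bl_to_thread_ns().
+ */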
+ ns = obd->obd_namespace;
+ if (!ns_connect_lru_resize(ns) ||
+ ldlm_pool_get_slv(&ns->ns_pool) < new_slv)
+ RETURN(0);
+
+ ratio = 100 * new_slv / ldlm_pool_get_slv(&ns->ns_pool);
+ if (100 - ratio >= ns->ns_recalc_pct &&
+ !ns->ns_stopping && !ns->ns_rpc_recalc) {
+ bool recalc = false;
+
+ spin_lock(&ns->ns_lock);
+ if (!ns->ns_stopping && !ns->ns_rpc_recalc) {
+ ldlm_namespace_get(ns);
+ recalc = true;
+ ns->ns_rpc_recalc = 1;
+ }
+ spin_unlock(&ns->ns_lock);
+ if (recalc)
+ ldlm_bl_to_thread_ns(ns);
+ }
+
RETURN(0);
}
+int ldlm_cli_convert(struct ldlm_lock *lock,
+ enum ldlm_cancel_flags cancel_flags)
+{
+ int rc = -EINVAL;
+
+ LASSERT(!lock->l_readers && !lock->l_writers);
+ LDLM_DEBUG(lock, "client lock convert START");
+
+ if (lock->l_resource->lr_type == LDLM_IBITS) {
+ lock_res_and_lock(lock);
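+ /* ldlm_cli_inodebits_convert() may ask to be retried with
+ * -EAGAIN (presumably when the bits change underneath); loop
+ * until it settles.
+ */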
+ do {
+ rc = ldlm_cli_inodebits_convert(lock, cancel_flags);
+ } while (rc == -EAGAIN);
+ unlock_res_and_lock(lock);
+ }
+
+ LDLM_DEBUG(lock, "client lock convert END");
+ RETURN(rc);
+}
+EXPORT_SYMBOL(ldlm_cli_convert);
+
/**
* Client side lock cancel.
*
enum ldlm_cancel_flags cancel_flags)
{
struct obd_export *exp;
- enum ldlm_lru_flags lru_flags;
int avail, count = 1;
__u64 rc = 0;
struct ldlm_namespace *ns;
struct ldlm_lock *lock;
- struct list_head cancels = LIST_HEAD_INIT(cancels);
+ LIST_HEAD(cancels);
ENTRY;
RETURN(0);
}
- /* Convert lock bits instead of cancel for IBITS locks */
- if (cancel_flags & LCF_CONVERT) {
- LASSERT(lock->l_resource->lr_type == LDLM_IBITS);
- LASSERT(lock->l_policy_data.l_inodebits.cancel_bits != 0);
-
- rc = ldlm_cli_dropbits(lock,
- lock->l_policy_data.l_inodebits.cancel_bits);
- if (rc == 0) {
- LDLM_LOCK_RELEASE(lock);
- RETURN(0);
- }
- }
-
lock_res_and_lock(lock);
+ LASSERT(!ldlm_is_converting(lock));
+
/* Lock is being canceled and the caller doesn't want to wait */
if (ldlm_is_canceling(lock)) {
if (cancel_flags & LCF_ASYNC) {
unlock_res_and_lock(lock);
} else {
- struct l_wait_info lwi = { 0 };
-
unlock_res_and_lock(lock);
- l_wait_event(lock->l_waitq, is_bl_done(lock), &lwi);
+ wait_event_idle(lock->l_waitq, is_bl_done(lock));
}
LDLM_LOCK_RELEASE(lock);
RETURN(0);
}
- /*
- * Lock is being converted, cancel it immediately.
- * When convert will end, it releases lock and it will be gone.
- */
- if (ldlm_is_converting(lock)) {
- /* set back flags removed by convert */
- ldlm_set_cbpending(lock);
- ldlm_set_bl_ast(lock);
- }
-
ldlm_set_canceling(lock);
unlock_res_and_lock(lock);
LASSERT(avail > 0);
ns = ldlm_lock_to_ns(lock);
- lru_flags = ns_connect_lru_resize(ns) ?
- LDLM_LRU_FLAG_LRUR : LDLM_LRU_FLAG_AGED;
count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
- LCF_BL_AST, lru_flags);
+ LCF_BL_AST, 0);
}
ldlm_cli_cancel_list(&cancels, count, NULL, cancel_flags);
RETURN(0);
int ldlm_cli_cancel_list_local(struct list_head *cancels, int count,
enum ldlm_cancel_flags cancel_flags)
{
- struct list_head head = LIST_HEAD_INIT(head);
+ LIST_HEAD(head);
struct ldlm_lock *lock, *next;
int left = 0, bl_ast = 0;
__u64 rc;
*/
if (!(cancel_flags & LCF_BL_AST) && (rc == LDLM_FL_BL_AST)) {
LDLM_DEBUG(lock, "Cancel lock separately");
- list_del_init(&lock->l_bl_ast);
- list_add(&lock->l_bl_ast, &head);
+ list_move(&lock->l_bl_ast, &head);
bl_ast++;
continue;
}
*/
static enum ldlm_policy_res
ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns, struct ldlm_lock *lock,
- int unused, int added, int count)
+ int added, int min)
{
enum ldlm_policy_res result = LDLM_POLICY_CANCEL_LOCK;
/*
- * don't check added & count since we want to process all locks
+ * don't check @added & @min since we want to process all locks
* from unused list.
* It's fine to not take lock to access lock->l_resource since
* the lock has already been granted so it won't change.
case LDLM_IBITS:
if (ns->ns_cancel != NULL && ns->ns_cancel(lock) != 0)
break;
+ /* fallthrough */
default:
result = LDLM_POLICY_SKIP_LOCK;
break;
/**
* Callback function for LRU-resize policy. Decides whether to keep
- * \a lock in LRU for current \a LRU size \a unused, added in current
- * scan \a added and number of locks to be preferably canceled \a count.
+ * \a lock in LRU given \a added locks already chosen in the current scan
+ * and \a min locks that should preferably be canceled.
*
* \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU and stop scanning
*
*/
static enum ldlm_policy_res ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
struct ldlm_lock *lock,
- int unused, int added,
- int count)
+ int added, int min)
{
ktime_t cur = ktime_get();
struct ldlm_pool *pl = &ns->ns_pool;
u64 slv, lvf, lv;
s64 la;
- /*
- * Stop LRU processing when we reach past @count or have checked all
- * locks in LRU.
- */
- if (count && added >= count)
- return LDLM_POLICY_KEEP_LOCK;
+ if (added < min)
+ return LDLM_POLICY_CANCEL_LOCK;
/*
* Regardless of the LV, it doesn't make sense to keep a lock that
* has been unused for ns_max_age.
*/
- if (ktime_after(ktime_get(),
- ktime_add(lock->l_last_used, ns->ns_max_age)))
+ if (ktime_after(cur, ktime_add(lock->l_last_used, ns->ns_max_age)))
return LDLM_POLICY_CANCEL_LOCK;
slv = ldlm_pool_get_slv(pl);
lvf = ldlm_pool_get_lvf(pl);
la = div_u64(ktime_to_ns(ktime_sub(cur, lock->l_last_used)),
NSEC_PER_SEC);
- lv = lvf * la * unused;
+ lv = lvf * la * ns->ns_nr_unused >> 8;
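+ /* The lock volume is the volume factor times idle age times LRU
+ * size, scaled down by 256; it is compared against the
+ * server-supplied SLV to decide whether to cancel.
+ */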
/* Inform pool about current CLV to see it via debugfs. */
ldlm_pool_set_clv(pl, lv);
static enum ldlm_policy_res
ldlm_cancel_lrur_no_wait_policy(struct ldlm_namespace *ns,
struct ldlm_lock *lock,
- int unused, int added,
- int count)
+ int added, int min)
{
enum ldlm_policy_res result;
- result = ldlm_cancel_lrur_policy(ns, lock, unused, added, count);
+ result = ldlm_cancel_lrur_policy(ns, lock, added, min);
if (result == LDLM_POLICY_KEEP_LOCK)
return result;
- return ldlm_cancel_no_wait_policy(ns, lock, unused, added, count);
+ return ldlm_cancel_no_wait_policy(ns, lock, added, min);
}
/**
- * Callback function for debugfs used policy. Makes decision whether to keep
- * \a lock in LRU for current \a LRU size \a unused, added in current scan \a
- * added and number of locks to be preferably canceled \a count.
- *
- * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU in stop scanning
- *
- * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
- */
-static enum ldlm_policy_res ldlm_cancel_passed_policy(struct ldlm_namespace *ns,
- struct ldlm_lock *lock,
- int unused, int added,
- int count)
-{
- /*
- * Stop LRU processing when we reach past @count or have checked all
- * locks in LRU.
- */
- return (added >= count) ?
- LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
-}
-
-/**
- * Callback function for aged policy. Makes decision whether to keep \a lock in
- * LRU for current LRU size \a unused, added in current scan \a added and
- * number of locks to be preferably canceled \a count.
+ * Callback function for aged policy. Decides whether to keep
+ * \a lock in LRU given \a added locks already chosen in the current scan
+ * and \a min locks that should preferably be canceled.
*
* \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU and stop scanning
*
*/
static enum ldlm_policy_res ldlm_cancel_aged_policy(struct ldlm_namespace *ns,
struct ldlm_lock *lock,
- int unused, int added,
- int count)
+ int added, int min)
{
- if ((added >= count) &&
+ if ((added >= min) &&
ktime_before(ktime_get(),
ktime_add(lock->l_last_used, ns->ns_max_age)))
return LDLM_POLICY_KEEP_LOCK;
static enum ldlm_policy_res
ldlm_cancel_aged_no_wait_policy(struct ldlm_namespace *ns,
struct ldlm_lock *lock,
- int unused, int added, int count)
+ int added, int min)
{
enum ldlm_policy_res result;
- result = ldlm_cancel_aged_policy(ns, lock, unused, added, count);
+ result = ldlm_cancel_aged_policy(ns, lock, added, min);
if (result == LDLM_POLICY_KEEP_LOCK)
return result;
- return ldlm_cancel_no_wait_policy(ns, lock, unused, added, count);
-}
-
-/**
- * Callback function for default policy. Makes decision whether to keep \a lock
- * in LRU for current LRU size \a unused, added in current scan \a added and
- * number of locks to be preferably canceled \a count.
- *
- * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU in stop scanning
- *
- * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
- */
-static
-enum ldlm_policy_res ldlm_cancel_default_policy(struct ldlm_namespace *ns,
- struct ldlm_lock *lock,
- int unused, int added,
- int count)
-{
- /*
- * Stop LRU processing when we reach past count or have checked all
- * locks in LRU.
- */
- return (added >= count) ?
- LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
+ return ldlm_cancel_no_wait_policy(ns, lock, added, min);
}
typedef enum ldlm_policy_res
(*ldlm_cancel_lru_policy_t)(struct ldlm_namespace *ns, struct ldlm_lock *lock,
- int unused, int added, int count);
+ int added, int min);
static ldlm_cancel_lru_policy_t
ldlm_cancel_lru_policy(struct ldlm_namespace *ns, enum ldlm_lru_flags lru_flags)
{
if (ns_connect_lru_resize(ns)) {
- if (lru_flags & LDLM_LRU_FLAG_SHRINK)
- /* We kill passed number of old locks. */
- return ldlm_cancel_passed_policy;
- if (lru_flags & LDLM_LRU_FLAG_LRUR) {
- if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
- return ldlm_cancel_lrur_no_wait_policy;
- else
- return ldlm_cancel_lrur_policy;
- }
- if (lru_flags & LDLM_LRU_FLAG_PASSED)
- return ldlm_cancel_passed_policy;
+ if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
+ return ldlm_cancel_lrur_no_wait_policy;
+ else
+ return ldlm_cancel_lrur_policy;
} else {
- if (lru_flags & LDLM_LRU_FLAG_AGED) {
- if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
- return ldlm_cancel_aged_no_wait_policy;
- else
- return ldlm_cancel_aged_policy;
- }
+ if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
+ return ldlm_cancel_aged_no_wait_policy;
+ else
+ return ldlm_cancel_aged_policy;
}
- if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
- return ldlm_cancel_no_wait_policy;
-
- return ldlm_cancel_default_policy;
}
/**
- * - Free space in LRU for \a count new locks,
+ * - Free space in LRU for \a min new locks,
* redundant unused locks are canceled locally;
* - also cancel locally unused aged locks;
* - do not cancel more than \a max locks;
+ * - if some locks are cancelled, try to cancel at least \a batch locks;
* - GET the found locks and add them into the \a cancels list.
*
* A client lock can be added to the l_bl_ast list only when it is
* attempt to cancel a lock rely on this flag, l_bl_ast list is accessed
* later without any special locking.
*
- * Calling policies for enabled LRU resize:
- * ----------------------------------------
- * flags & LDLM_LRU_FLAG_LRUR - use LRU resize policy (SLV from server) to
- * cancel not more than \a count locks;
- *
- * flags & LDLM_LRU_FLAG_PASSED - cancel \a count number of old locks (located
- * at the beginning of LRU list);
- *
- * flags & LDLM_LRU_FLAG_SHRINK - cancel not more than \a count locks according
- * to memory pressre policy function;
+ * Locks are cancelled according to the LRU resize policy (SLV from server)
+ * if LRU resize is enabled; otherwise, the "aged policy" is used.
*
- * flags & LDLM_LRU_FLAG_AGED - cancel \a count locks according to "aged policy"
+ * LRU flags:
+ * ----------------------------------------
*
- * flags & LDLM_LRU_FLAG_NO_WAIT - cancel as many unused locks as possible
- * (typically before replaying locks) w/o
- * sending any RPCs or waiting for any
- * outstanding RPC to complete.
+ * flags & LDLM_LRU_FLAG_NO_WAIT - cancel locks w/o sending any RPCs or waiting
+ * for any outstanding RPC to complete.
*
* flags & LDLM_CANCEL_CLEANUP - when cancelling read locks, do not check for
- * other read locks covering the same pages, just
- * discard those pages.
+ * other read locks covering the same pages, just
+ * discard those pages.
*/
static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
- struct list_head *cancels, int count, int max,
+ struct list_head *cancels,
+ int min, int max, int batch,
enum ldlm_lru_flags lru_flags)
{
ldlm_cancel_lru_policy_t pf;
int added = 0;
int no_wait = lru_flags & LDLM_LRU_FLAG_NO_WAIT;
-
ENTRY;
+ /*
+ * Let only one thread proceed. However, do not apply this to callers
+ * given a @max limit (ELC), as the LRU might then be left not fully
+ * cleaned up.
+ */
+ if (max == 0) {
+ if (test_and_set_bit(LDLM_LRU_CANCEL, &ns->ns_flags))
+ RETURN(0);
+ } else if (test_bit(LDLM_LRU_CANCEL, &ns->ns_flags))
+ RETURN(0);
+
+ LASSERT(ergo(max, min <= max));
+ /* There is no sense in giving @batch for ELC */
+ LASSERT(ergo(max, batch == 0));
+
if (!ns_connect_lru_resize(ns))
- count += ns->ns_nr_unused - ns->ns_max_unused;
+ min = max_t(int, min, ns->ns_nr_unused - ns->ns_max_unused);
+
+ /* If at least 1 lock is to be cancelled, cancel at least @batch locks */
+ if (min && min < batch)
+ min = batch;
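+ /* e.g. with hypothetical min = 1 and batch = 32 the target
+ * becomes 32, so cancels are batched rather than trickled.
+ */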
pf = ldlm_cancel_lru_policy(ns, lru_flags);
LASSERT(pf != NULL);
/* No locks which got blocking requests. */
LASSERT(!ldlm_is_bl_ast(lock));
- if (!ldlm_is_canceling(lock) &&
- !ldlm_is_converting(lock))
+ if (!ldlm_is_canceling(lock))
break;
/*
* their weight. Big extent locks will stay in
* the cache.
*/
- result = pf(ns, lock, ns->ns_nr_unused, added, count);
+ result = pf(ns, lock, added, min);
if (result == LDLM_POLICY_KEEP_LOCK) {
lu_ref_del(&lock->l_reference, __func__, current);
LDLM_LOCK_RELEASE(lock);
lock_res_and_lock(lock);
/* Check flags again under the lock. */
- if (ldlm_is_canceling(lock) || ldlm_is_converting(lock) ||
+ if (ldlm_is_canceling(lock) ||
ldlm_lock_remove_from_lru_check(lock, last_use) == 0) {
/*
* Another thread is removing lock from LRU, or
unlock_res_and_lock(lock);
lu_ref_del(&lock->l_reference, __FUNCTION__, current);
added++;
+ /* Once one lock has been added, extend the target to @batch locks */
+ if (min == 0)
+ min = batch;
}
+
+ if (max == 0)
+ clear_bit(LDLM_LRU_CANCEL, &ns->ns_flags);
+
RETURN(added);
}
int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
- int count, int max,
+ int min, int max,
enum ldlm_cancel_flags cancel_flags,
enum ldlm_lru_flags lru_flags)
{
int added;
- added = ldlm_prepare_lru_list(ns, cancels, count, max, lru_flags);
+ added = ldlm_prepare_lru_list(ns, cancels, min, max, 0, lru_flags);
if (added <= 0)
return added;
}
/**
- * Cancel at least \a nr locks from given namespace LRU.
+ * Cancel at least \a min locks from given namespace LRU.
*
* When called with LCF_ASYNC the blocking callback will be handled
* in a thread and this function will return after the thread has been
* asked to call the callback. When called without LCF_ASYNC the blocking
* callback will be performed in this function.
*/
-int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr,
+int ldlm_cancel_lru(struct ldlm_namespace *ns, int min,
enum ldlm_cancel_flags cancel_flags,
enum ldlm_lru_flags lru_flags)
{
- struct list_head cancels = LIST_HEAD_INIT(cancels);
+ LIST_HEAD(cancels);
int count, rc;
ENTRY;
* Just prepare the list of locks, do not actually cancel them yet.
* Locks are cancelled later in a separate thread.
*/
- count = ldlm_prepare_lru_list(ns, &cancels, nr, 0, lru_flags);
+ count = ldlm_prepare_lru_list(ns, &cancels, min, 0,
+ ns->ns_cancel_batch, lru_flags);
rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count, cancel_flags);
if (rc == 0)
RETURN(count);
continue;
/*
- * If somebody is already doing CANCEL, or blocking AST came,
- * or lock is being converted then skip this lock.
+ * If somebody is already doing CANCEL or a blocking AST came,
+ * then skip this lock.
*/
- if (ldlm_is_bl_ast(lock) || ldlm_is_canceling(lock) ||
- ldlm_is_converting(lock))
+ if (ldlm_is_bl_ast(lock) || ldlm_is_canceling(lock))
continue;
if (lockmode_compat(lock->l_granted_mode, mode))
/*
* If policy is given and this is IBITS lock, add to list only
* those locks that match by policy.
- * Skip locks with DoM bit always to don't flush data.
*/
- if (policy && (lock->l_resource->lr_type == LDLM_IBITS) &&
- (!(lock->l_policy_data.l_inodebits.bits &
- policy->l_inodebits.bits) || ldlm_has_dom(lock)))
- continue;
+ if (policy && (lock->l_resource->lr_type == LDLM_IBITS)) {
+ if (!(lock->l_policy_data.l_inodebits.bits &
+ policy->l_inodebits.bits))
+ continue;
+ /* Skip locks with the DoM bit if it is not set in the policy,
+ * so that data is not flushed via side bits. Lock convert will
+ * drop those bits separately.
+ */
+ if (ldlm_has_dom(lock) &&
+ !(policy->l_inodebits.bits & MDS_INODELOCK_DOM))
+ continue;
+ }
/* See CBPENDING comment in ldlm_cancel_lru */
lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING |
lock_flags;
-
LASSERT(list_empty(&lock->l_bl_ast));
list_add(&lock->l_bl_ast, cancels);
LDLM_LOCK_GET(lock);
enum ldlm_cancel_flags flags, void *opaque)
{
struct ldlm_resource *res;
- struct list_head cancels = LIST_HEAD_INIT(cancels);
+ LIST_HEAD(cancels);
int count;
int rc;
ENTRY;
atomic_dec(&req->rq_import->imp_replay_inflight);
+ wake_up(&req->rq_import->imp_replay_waitq);
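+ /* Wake up one rate-limited replayer waiting in
+ * __ldlm_replay_locks() on lock_can_replay().
+ */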
+
if (rc != ELDLM_OK)
GOTO(out, rc);
/* We're part of recovery, so don't wait for it. */
req->rq_send_state = LUSTRE_IMP_REPLAY_LOCKS;
+ /* If the state changed while we were preparing, don't wait */
+ req->rq_no_delay = 1;
body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
ldlm_lock2desc(lock, &body->lock_desc);
LDLM_DEBUG(lock, "replaying lock:");
- atomic_inc(&req->rq_import->imp_replay_inflight);
- CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
- aa = ptlrpc_req_async_args(req);
+ atomic_inc(&imp->imp_replay_inflight);
+ aa = ptlrpc_req_async_args(aa, req);
aa->lock_handle = body->lock_handle[0];
req->rq_interpret_reply = replay_lock_interpret;
ptlrpcd_add_req(req);
static void ldlm_cancel_unused_locks_for_replay(struct ldlm_namespace *ns)
{
int canceled;
- struct list_head cancels = LIST_HEAD_INIT(cancels);
+ LIST_HEAD(cancels);
CDEBUG(D_DLMTRACE,
"Dropping as many unused locks as possible before replay for namespace %s (%d)\n",
ldlm_ns_name(ns), ns->ns_nr_unused);
+ OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_REPLAY_PAUSE, cfs_fail_val);
+
/*
* We don't need to care whether or not LRU resize is enabled
* because the LDLM_LRU_FLAG_NO_WAIT policy doesn't use the
canceled, ldlm_ns_name(ns));
}
-int ldlm_replay_locks(struct obd_import *imp)
+static int lock_can_replay(struct obd_import *imp)
+{
+ struct client_obd *cli = &imp->imp_obd->u.cli;
+
+ CDEBUG(D_HA, "check lock replay limit, inflights = %u(%u)\n",
+ atomic_read(&imp->imp_replay_inflight) - 1,
+ cli->cl_max_rpcs_in_flight);
+
+ /* +1 due to the ldlm_replay_locks() increment */
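+ /* Concurrent replays are capped at min(cl_max_rpcs_in_flight, 8). */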
+ return atomic_read(&imp->imp_replay_inflight) <
+ 1 + min_t(u32, cli->cl_max_rpcs_in_flight, 8);
+}
+
+int __ldlm_replay_locks(struct obd_import *imp, bool rate_limit)
{
struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
- struct list_head list = LIST_HEAD_INIT(list);
+ LIST_HEAD(list);
struct ldlm_lock *lock, *next;
int rc = 0;
ENTRY;
- LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
+ while (atomic_read(&imp->imp_replay_inflight) != 1)
+ cond_resched();
/* don't replay locks if import failed recovery */
if (imp->imp_vbr_failed)
RETURN(0);
- /* ensure this doesn't fall to 0 before all have been queued */
- atomic_inc(&imp->imp_replay_inflight);
-
if (ldlm_cancel_unused_locks_before_replay)
ldlm_cancel_unused_locks_for_replay(ns);
list_for_each_entry_safe(lock, next, &list, l_pending_chain) {
list_del_init(&lock->l_pending_chain);
- if (rc) {
+ /* If we disconnected in the middle, clean up and let
+ * reconnection happen again. LU-14027
+ */
+ if (rc || (imp->imp_state != LUSTRE_IMP_REPLAY_LOCKS)) {
LDLM_LOCK_RELEASE(lock);
- continue; /* or try to do the rest? */
+ continue;
}
rc = replay_one_lock(imp, lock);
LDLM_LOCK_RELEASE(lock);
+
+ if (rate_limit)
+ wait_event_idle_exclusive(imp->imp_replay_waitq,
+ lock_can_replay(imp));
}
+ RETURN(rc);
+}
+
+/**
+ * Lock replay uses rate control and can sleep waiting so
+ * must be in separate thread from ptlrpcd itself
+ */
+static int ldlm_lock_replay_thread(void *data)
+{
+ struct obd_import *imp = data;
+
+ CDEBUG(D_HA, "lock replay thread %s to %s@%s\n",
+ imp->imp_obd->obd_name, obd2cli_tgt(imp->imp_obd),
+ imp->imp_connection->c_remote_uuid.uuid);
+
+ __ldlm_replay_locks(imp, true);
atomic_dec(&imp->imp_replay_inflight);
+ ptlrpc_import_recovery_state_machine(imp);
+ class_import_put(imp);
- RETURN(rc);
+ return 0;
+}
+
+int ldlm_replay_locks(struct obd_import *imp)
+{
+ struct task_struct *task;
+ int rc = 0;
+
+ /* ensure this doesn't fall to 0 before all have been queued */
+ if (atomic_inc_return(&imp->imp_replay_inflight) > 1) {
+ atomic_dec(&imp->imp_replay_inflight);
+ return 0;
+ }
+ class_import_get(imp);
+
+ task = kthread_run(ldlm_lock_replay_thread, imp, "ldlm_lock_replay");
+ if (IS_ERR(task)) {
+ rc = PTR_ERR(task);
+ CDEBUG(D_HA, "can't start lock replay thread: rc = %d\n", rc);
+
+ /* run lock replay without rate control */
+ rc = __ldlm_replay_locks(imp, false);
+ atomic_dec(&imp->imp_replay_inflight);
+ class_import_put(imp);
+ }
+
+ return rc;
}