*/
/*
* This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
*
* lustre/ldlm/ldlm_lockd.c
*
}
struct ldlm_bl_pool {
- spinlock_t blp_lock;
+ spinlock_t blp_lock;
/*
* blp_prio_list is used for callbacks that should be handled
* as a priority. It is used for LDLM_FL_DISCARD_DATA requests.
* see b=13843
*/
- struct list_head blp_prio_list;
+ struct list_head blp_prio_list;
/*
* blp_list is used for all other callbacks which are likely
* to take longer to process.
*/
- struct list_head blp_list;
-
- wait_queue_head_t blp_waitq;
- struct completion blp_comp;
- atomic_t blp_num_threads;
- atomic_t blp_busy_threads;
- int blp_min_threads;
- int blp_max_threads;
+ struct list_head blp_list;
+
+ wait_queue_head_t blp_waitq;
+ struct completion blp_comp;
+ atomic_t blp_num_threads;
+ atomic_t blp_busy_threads;
+ int blp_min_threads;
+ int blp_max_threads;
+ int blp_total_locks;
+ int blp_total_blwis;
};
struct ldlm_bl_work_item {
struct obd_export *export;
struct ldlm_lock *lock;
- lock = list_entry(expired->next, struct ldlm_lock,
- l_pending_chain);
+ lock = list_first_entry(expired, struct ldlm_lock,
+ l_pending_chain);
if ((void *)lock < LP_POISON + PAGE_SIZE &&
(void *)lock >= LP_POISON) {
spin_unlock_bh(&waiting_locks_spinlock);
spin_unlock_bh(&waiting_locks_spinlock);
/* Check if we need to prolong timeout */
- if (!OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT) &&
- lock->l_callback_timestamp != 0 && /* not AST error */
+ if (!CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT) &&
+ lock->l_callback_timestamp != 0 && /* not AST err */
ldlm_lock_busy(lock)) {
LDLM_DEBUG(lock, "prolong the busy lock");
lock_res_and_lock(lock);
LDLM_ERROR(lock,
"lock callback timer expired after %llds: evicting client at %s ",
- ktime_get_real_seconds() -
+ ktime_get_seconds() -
lock->l_blast_sent,
obd_export_nid2str(export));
ldlm_lock_to_ns(lock)->ns_timeouts++;
- do_dump++;
+ if (do_dump_on_eviction(export->exp_obd))
+ do_dump++;
class_fail_export(export);
}
class_export_lock_put(export, lock);
}
spin_unlock_bh(&waiting_locks_spinlock);
- if (do_dump && obd_dump_on_eviction) {
+ if (do_dump) {
CERROR("dump the log upon eviction\n");
libcfs_debug_dumplog();
}
spin_lock_bh(&waiting_locks_spinlock);
while (!list_empty(&waiting_locks_list)) {
- lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
- l_pending_chain);
+ lock = list_first_entry(&waiting_locks_list, struct ldlm_lock,
+ l_pending_chain);
if (lock->l_callback_timestamp > ktime_get_seconds() ||
lock->l_req_mode == LCK_GROUP)
break;
time64_t now = ktime_get_seconds();
timeout_t delta = 0;
- lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
- l_pending_chain);
+ lock = list_first_entry(&waiting_locks_list, struct ldlm_lock,
+ l_pending_chain);
if (lock->l_callback_timestamp - now > 0)
delta = lock->l_callback_timestamp - now;
mod_timer(&waiting_locks_timer,
static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, timeout_t delay)
{
unsigned long timeout_jiffies = jiffies;
- time64_t now = ktime_get_seconds();
time64_t deadline;
timeout_t timeout;
+ lock->l_blast_sent = ktime_get_seconds();
if (!list_empty(&lock->l_pending_chain))
return 0;
- if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT) ||
- OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT))
+ if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT) ||
+ CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT))
delay = 1;
- deadline = now + delay;
+ deadline = lock->l_blast_sent + delay;
if (likely(deadline > lock->l_callback_timestamp))
lock->l_callback_timestamp = deadline;
- timeout = clamp_t(timeout_t, lock->l_callback_timestamp - now,
+ timeout = clamp_t(timeout_t,
+ lock->l_callback_timestamp - lock->l_blast_sent,
0, delay);
timeout_jiffies += cfs_time_seconds(timeout);
static int ldlm_add_waiting_lock(struct ldlm_lock *lock, timeout_t timeout)
{
- int ret;
+ struct obd_device *obd = NULL;
+ int at_off, ret;
/* NB: must be called with hold of lock_res_and_lock() */
LASSERT(ldlm_is_res_locked(lock));
* Do not put cross-MDT lock in the waiting list, since we
* will not evict it due to timeout for now
*/
- if (lock->l_export != NULL &&
- (exp_connect_flags(lock->l_export) & OBD_CONNECT_MDS_MDS))
- return 0;
+ if (lock->l_export != NULL) {
+ obd = lock->l_export->exp_obd;
+
+ if (exp_connect_flags(lock->l_export) & OBD_CONNECT_MDS_MDS)
+ return 0;
+ }
spin_lock_bh(&waiting_locks_spinlock);
if (ldlm_is_cancel(lock)) {
LDLM_ERROR(lock, "not waiting on destroyed lock (b=5653)");
if (ktime_get_seconds() > next) {
next = ktime_get_seconds() + 14400;
- libcfs_debug_dumpstack(NULL);
+ dump_stack();
}
return 0;
}
ldlm_set_waited(lock);
- lock->l_blast_sent = ktime_get_real_seconds();
ret = __ldlm_add_waiting_lock(lock, timeout);
if (ret) {
/*
if (ret)
ldlm_add_blocked_lock(lock);
+ at_off = obd_at_off(obd);
LDLM_DEBUG(lock, "%sadding to wait list(timeout: %d, AT: %s)",
ret == 0 ? "not re-" : "", timeout,
- AT_OFF ? "off" : "on");
+ at_off ? "off" : "on");
return ret;
}
/* Removing the head of the list, adjust timer. */
if (list_next == &waiting_locks_list) {
/* No more, just cancel. */
- del_timer(&waiting_locks_timer);
+ timer_delete(&waiting_locks_timer);
} else {
time64_t now = ktime_get_seconds();
struct ldlm_lock *next;
__ldlm_add_waiting_lock(lock, timeout);
spin_unlock_bh(&waiting_locks_spinlock);
- LDLM_DEBUG(lock, "refreshed");
+ LDLM_DEBUG(lock, "refreshed to %ds", timeout);
return 1;
}
EXPORT_SYMBOL(ldlm_refresh_waiting_lock);
timeout_t ldlm_bl_timeout(struct ldlm_lock *lock)
{
timeout_t timeout;
+ struct obd_device *obd = lock->l_export->exp_obd;
- if (AT_OFF)
+ if (obd_at_off(obd))
return obd_timeout / 2;
/*
* It would be nice to have some kind of "early reply" mechanism for
* lock callbacks too...
*/
- timeout = at_get(&lock->l_export->exp_bl_lock_at);
+ timeout = obd_at_get(obd, &lock->l_export->exp_bl_lock_at);
return max_t(timeout_t, timeout + (timeout >> 1),
- (timeout_t)ldlm_enqueue_min);
+ (timeout_t)obd_get_ldlm_enqueue_min(obd));
}
EXPORT_SYMBOL(ldlm_bl_timeout);
/**
+ * Calculate the per-export Blocking timeout by the given RPC (covering the
+ * reply to this RPC and the next RPC). The next RPC may still not be a CANCEL,
+ * but with the lock refresh mechanism this is enough.
+ *
+ * Used for lock refresh timeout when we are in the middle of the process -
+ * BL AST is sent, CANCEL is ahead - it is still 1 reply for the current RPC
+ * and at least 1 RPC (which will trigger another refresh if it will be not
+ * CANCEL) - but more accurate than ldlm_bl_timeout, as the timeout taken
+ * from the RPC (i.e. the client's view of the current AT) is taken into
+ * account.
+ *
+ * \param[in] req req which export needs the timeout calculation
+ *
+ * \retval timeout in seconds to wait for the next client's RPC
+ */
+timeout_t ldlm_bl_timeout_by_rpc(struct ptlrpc_request *req)
+{
+ struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
+ timeout_t timeout, req_timeout, at_timeout, netl;
+ struct obd_device *obd = req->rq_export->exp_obd;
+
+ if (obd_at_off(obd))
+ return obd_timeout / 2;
+
+ /* A blocked lock means somebody in the cluster is waiting, and we
+ * should not consider the worst ever case, consisting of a chain of
+ * failures on each step, however this timeout should survive a
+ * recovery of at least 1 failure; let this one be the worst one:
+ * in case a server NID is dead first re-connect is done through the
+ * same router and also times out.
+ *
+ * Either this or the next RPC times out; take the max.
+ * Considering the current RPC, take just the left time.
+ */
+ netl = obd_at_get(obd,
+ &req->rq_export->exp_imp_reverse->imp_at.iat_net_latency);
+ req_timeout = req->rq_deadline - ktime_get_real_seconds() + netl;
+ at_timeout = at_est2timeout(obd_at_get(obd, &svcpt->scp_at_estimate))
+ + netl;
+ req_timeout = max(req_timeout, at_timeout);
+
+ /* Take 1 re-connect failure and 1 re-connect success into account. */
+ timeout = at_timeout + INITIAL_CONNECT_TIMEOUT + netl + req_timeout;
+
+ /* Client's timeout is calculated as at_est2timeout(), let's be a bit
+ * more conservative than client
+ */
+ return max(timeout + (timeout >> 4),
+ (timeout_t)obd_get_ldlm_enqueue_min(obd));
+}
+EXPORT_SYMBOL(ldlm_bl_timeout_by_rpc);
+
+/**
* Perform lock cleanup if AST sending failed.
*/
static void ldlm_failed_ast(struct ldlm_lock *lock, int rc,
struct ptlrpc_request *req, int rc,
const char *ast_type)
{
- struct lnet_process_id peer = req->rq_import->imp_connection->c_peer;
+ struct lnet_processid *peer = &req->rq_import->imp_connection->c_peer;
if (!req->rq_replied || (rc && rc != -EINVAL)) {
if (ldlm_is_cancel(lock)) {
LDLM_DEBUG(lock,
"%s AST (req@%p x%llu) timeout from nid %s, but cancel was received (AST reply lost?)",
ast_type, req, req->rq_xid,
- libcfs_nid2str(peer.nid));
+ libcfs_nidstr(&peer->nid));
ldlm_lock_cancel(lock);
rc = -ERESTART;
} else if (rc == -ENODEV || rc == -ESHUTDOWN ||
* In all such cases errors are ignored.
*/
LDLM_DEBUG(lock,
- "%s AST can't be sent due to a server %s failure or umount process: rc = %d\n",
+ "%s AST can't be sent due to a server %s failure or umount process: rc = %d",
ast_type,
req->rq_import->imp_obd->obd_name, rc);
} else {
LDLM_ERROR(lock,
"client (nid %s) %s %s AST (req@%p x%llu status %d rc %d), evict it",
- libcfs_nid2str(peer.nid),
+ libcfs_nidstr(&peer->nid),
req->rq_replied ? "returned error from" :
"failed to reply to",
ast_type, req, req->rq_xid,
LDLM_DEBUG(lock,
"client (nid %s) returned %d from %s AST (req@%p x%llu) - normal race",
- libcfs_nid2str(peer.nid),
+ libcfs_nidstr(&peer->nid),
req->rq_repmsg ?
lustre_msg_get_status(req->rq_repmsg) : -1,
ast_type, req, req->rq_xid);
struct ptlrpc_request *req;
int instant_cancel = 0;
int rc = 0;
+ struct obd_device *obd;
ENTRY;
/* Don't need to do anything here. */
RETURN(0);
- if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_SRV_BL_AST)) {
+ if (CFS_FAIL_PRECHECK(OBD_FAIL_LDLM_SRV_BL_AST)) {
LDLM_DEBUG(lock, "dropping BL AST");
RETURN(0);
}
LASSERT(lock);
LASSERT(data != NULL);
- if (lock->l_export->exp_obd->obd_recovering != 0)
+
+ obd = lock->l_export->exp_obd;
+ if (obd->obd_recovering != 0)
LDLM_ERROR(lock, "BUG 6063: lock collide during recovery");
ldlm_lock_reorder_req(lock);
body->lock_handle[0] = lock->l_remote_handle;
body->lock_handle[1].cookie = lock->l_handle.h_cookie;
body->lock_desc = *desc;
- body->lock_flags |= ldlm_flags_to_wire(lock->l_flags & LDLM_FL_AST_MASK);
+ body->lock_flags |= ldlm_flags_to_wire(lock->l_flags &
+ LDLM_FL_AST_MASK);
LDLM_DEBUG(lock, "server preparing blocking AST");
req->rq_send_state = LUSTRE_IMP_FULL;
/* ptlrpc_request_alloc_pack already set timeout */
- if (AT_OFF)
+ if (obd_at_off(obd))
req->rq_timeout = ldlm_get_rq_timeout();
if (lock->l_export && lock->l_export->exp_nid_stats &&
int instant_cancel = 0;
int rc = 0;
int lvb_len;
+ struct obd_device *obd;
ENTRY;
LASSERT(lock != NULL);
LASSERT(data != NULL);
- if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_SRV_CP_AST)) {
+ if (CFS_FAIL_PRECHECK(OBD_FAIL_LDLM_SRV_CP_AST)) {
LDLM_DEBUG(lock, "dropping CP AST");
RETURN(0);
}
+ obd = lock->l_export->exp_obd;
req = ptlrpc_request_alloc(lock->l_export->exp_imp_reverse,
&RQF_LDLM_CP_CALLBACK);
if (req == NULL)
ldlm_lock2desc(lock, &body->lock_desc);
if (lvb_len > 0) {
void *lvb = req_capsule_client_get(&req->rq_pill, &RMF_DLM_LVB);
+
lvb_len = ldlm_lvbo_fill(lock, lvb, &lvb_len);
if (lvb_len < 0) {
/*
req->rq_send_state = LUSTRE_IMP_FULL;
/* ptlrpc_request_pack already set timeout */
- if (AT_OFF)
+ if (obd_at_off(obd))
req->rq_timeout = ldlm_get_rq_timeout();
/* We only send real blocking ASTs after the lock is granted */
RETURN(lvb_len < 0 ? lvb_len : rc);
}
+EXPORT_SYMBOL(ldlm_server_completion_ast);
/**
* Server side ->l_glimpse_ast handler for client locks.
struct ldlm_cb_async_args *ca;
int rc;
struct req_format *req_fmt;
+ struct obd_device *obd = lock->l_export->exp_obd;
ENTRY;
req->rq_send_state = LUSTRE_IMP_FULL;
/* ptlrpc_request_alloc_pack already set timeout */
- if (AT_OFF)
+ if (obd_at_off(obd))
req->rq_timeout = ldlm_get_rq_timeout();
req->rq_interpret_reply = ldlm_cb_interpret;
rc = ldlm_run_ast_work(ldlm_res_to_ns(res), gl_work_list,
LDLM_WORK_GL_AST);
if (rc == -ERESTART)
- ldlm_reprocess_all(res, NULL);
+ ldlm_reprocess_all(res, 0);
RETURN(rc);
}
* Main server-side entry point into LDLM for enqueue. This is called by ptlrpc
* service threads to carry out client lock enqueueing requests.
*/
-int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
- struct ptlrpc_request *req,
- const struct ldlm_request *dlm_req,
- const struct ldlm_callback_suite *cbs)
+int ldlm_handle_enqueue(struct ldlm_namespace *ns,
+ struct req_capsule *pill,
+ const struct ldlm_request *dlm_req,
+ const struct ldlm_callback_suite *cbs)
{
struct ldlm_reply *dlm_rep;
__u64 flags;
void *cookie = NULL;
int rc = 0;
struct ldlm_resource *res = NULL;
+ struct ptlrpc_request *req = pill->rc_req;
const struct lu_env *env = req->rq_svc_thread->t_env;
ENTRY;
LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
- ldlm_request_cancel(req, dlm_req, LDLM_ENQUEUE_CANCEL_OFF, LATF_SKIP);
- flags = ldlm_flags_from_wire(dlm_req->lock_flags);
+ LASSERT(req && req->rq_export);
- LASSERT(req->rq_export);
+ if (req_capsule_ptlreq(pill))
+ ldlm_request_cancel(req, dlm_req, LDLM_ENQUEUE_CANCEL_OFF,
+ LATF_SKIP);
+
+ flags = ldlm_flags_from_wire(dlm_req->lock_flags);
/* for intent enqueue the stat will be updated inside intent policy */
if (ptlrpc_req2svc(req)->srv_stats != NULL &&
!(dlm_req->lock_flags & LDLM_FL_HAS_INTENT))
ldlm_svc_get_eopc(dlm_req, ptlrpc_req2svc(req)->srv_stats);
- if (req->rq_export && req->rq_export->exp_nid_stats &&
+ if (req->rq_export->exp_nid_stats &&
req->rq_export->exp_nid_stats->nid_ldlm_stats)
lprocfs_counter_incr(req->rq_export->exp_nid_stats->nid_ldlm_stats,
LDLM_ENQUEUE - LDLM_FIRST_OPC);
* In the function below, .hs_keycmp resolves to
* ldlm_export_lock_keycmp()
*/
- /* coverity[overrun-buffer-val] */
lock = cfs_hash_lookup(req->rq_export->exp_lock_hash,
(void *)&dlm_req->lock_handle[0]);
if (lock != NULL) {
}
}
- OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2);
+ CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2);
/*
* Don't enqueue a lock onto the export if it has been disconnected
* due to eviction (b=3822) or server umount (b=24324).
dlm_req->lock_desc.l_resource.lr_type,
&dlm_req->lock_desc.l_policy_data,
&lock->l_policy_data);
- if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
+ if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) {
lock->l_req_extent = lock->l_policy_data.l_extent;
- else if (dlm_req->lock_desc.l_resource.lr_type == LDLM_IBITS)
+ } else if (dlm_req->lock_desc.l_resource.lr_type == LDLM_IBITS) {
lock->l_policy_data.l_inodebits.try_bits =
dlm_req->lock_desc.l_policy_data.l_inodebits.try_bits;
+ lock->l_policy_data.l_inodebits.li_gid =
+ dlm_req->lock_desc.l_policy_data.l_inodebits.li_gid;
+ }
existing_lock:
cookie = req;
if (!(flags & LDLM_FL_HAS_INTENT)) {
/* based on the assumption that lvb size never changes during
* resource life time otherwise it need resource->lr_lock's
- * protection */
- req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
+ * protection
+ */
+ req_capsule_set_size(pill, &RMF_DLM_LVB,
RCL_SERVER, ldlm_lvbo_size(lock));
- if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
+ if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
GOTO(out, rc = -ENOMEM);
- rc = req_capsule_server_pack(&req->rq_pill);
+ rc = req_capsule_server_pack(pill);
if (rc)
GOTO(out, rc);
}
GOTO(out, err);
}
- dlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
+ dlm_rep = req_capsule_server_get(pill, &RMF_DLM_REP);
ldlm_lock2desc(lock, &dlm_rep->lock_desc);
ldlm_lock2handle(lock, &dlm_rep->lock_handle);
- if (lock && lock->l_resource->lr_type == LDLM_EXTENT)
- OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_BL_EVICT, 6);
+ if (lock->l_resource->lr_type == LDLM_EXTENT)
+ CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_BL_EVICT, 6);
/*
* We never send a blocking AST until the lock is granted, but
* Cancel it now instead.
*/
if (unlikely(req->rq_export->exp_disconnected ||
- OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_OLD_EXPORT))) {
+ CFS_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_OLD_EXPORT))) {
LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export);
rc = -ENOTCONN;
} else if (ldlm_is_ast_sent(lock)) {
EXIT;
out:
- req->rq_status = rc ?: err; /* return either error - b=11190 */
- if (!req->rq_packed_final) {
- err = lustre_pack_reply(req, 1, NULL, NULL);
- if (rc == 0)
- rc = err;
+ if (req_capsule_ptlreq(pill)) {
+ req->rq_status = rc ?: err; /* return either error - b=11190 */
+ if (!req->rq_packed_final) {
+ int rc1 = lustre_pack_reply(req, 1, NULL, NULL);
+
+ if (rc == 0)
+ rc = rc1;
+ }
}
/*
err, rc);
if (rc == 0 &&
- req_capsule_has_field(&req->rq_pill, &RMF_DLM_LVB,
+ req_capsule_has_field(pill, &RMF_DLM_LVB,
RCL_SERVER) &&
ldlm_lvbo_size(lock) > 0) {
void *buf;
int buflen;
retry:
- buf = req_capsule_server_get(&req->rq_pill,
- &RMF_DLM_LVB);
- LASSERTF(buf != NULL, "req %p, lock %p\n", req, lock);
- buflen = req_capsule_get_size(&req->rq_pill,
- &RMF_DLM_LVB, RCL_SERVER);
+ buf = req_capsule_server_get(pill, &RMF_DLM_LVB);
+ LASSERTF(buf != NULL, "req %px, lock %px\n", req, lock);
+ buflen = req_capsule_get_size(pill, &RMF_DLM_LVB,
+ RCL_SERVER);
/*
* non-replayed lock, delayed lvb init may
* need to be occur now
rc2 = ldlm_lvbo_fill(lock, buf, &buflen);
if (rc2 >= 0) {
- req_capsule_shrink(&req->rq_pill,
- &RMF_DLM_LVB,
+ req_capsule_shrink(pill, &RMF_DLM_LVB,
rc2, RCL_SERVER);
} else if (rc2 == -ERANGE) {
rc2 = req_capsule_server_grow(
- &req->rq_pill,
- &RMF_DLM_LVB, buflen);
+ pill, &RMF_DLM_LVB,
+ buflen);
if (!rc2) {
goto retry;
} else {
* to client.
*/
req_capsule_shrink(
- &req->rq_pill,
- &RMF_DLM_LVB, 0,
+ pill, &RMF_DLM_LVB, 0,
RCL_SERVER);
}
} else {
} else if (flags & LDLM_FL_REPLAY) {
/* no LVB resend upon replay */
if (buflen > 0)
- req_capsule_shrink(&req->rq_pill,
- &RMF_DLM_LVB,
+ req_capsule_shrink(pill, &RMF_DLM_LVB,
0, RCL_SERVER);
else
rc = buflen;
ldlm_resource_unlink_lock(lock);
ldlm_lock_destroy_nolock(lock);
unlock_res_and_lock(lock);
-
}
+ ldlm_reprocess_all(lock->l_resource,
+ lock->l_policy_data.l_inodebits.bits);
}
if (!err && !ldlm_is_cbpending(lock) &&
dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
- ldlm_reprocess_all(lock->l_resource, lock);
+ ldlm_reprocess_all(lock->l_resource,
+ lock->l_policy_data.l_inodebits.bits);
LDLM_LOCK_RELEASE(lock);
}
return rc;
}
+EXPORT_SYMBOL(ldlm_handle_enqueue);
/*
* Clear the blocking lock, the race is possible between ldlm_handle_convert0()
ldlm_clear_blocking_lock(lock);
}
-/**
- * Main LDLM entry point for server code to process lock conversion requests.
- */
+/* Main LDLM entry point for server code to process lock conversion requests */
int ldlm_handle_convert0(struct ptlrpc_request *req,
const struct ldlm_request *dlm_req)
{
ldlm_clear_blocking_data(lock);
unlock_res_and_lock(lock);
- ldlm_reprocess_all(lock->l_resource, NULL);
+ /* All old bits should be reprocessed to send new BL AST if
+ * it wasn't sent earlier due to LDLM_FL_AST_SENT bit set.
+ */
+ ldlm_reprocess_all(lock->l_resource, bits);
}
dlm_rep->lock_handle = lock->l_remote_handle;
*/
if (res != pres) {
if (pres != NULL) {
- ldlm_reprocess_all(pres, NULL);
+ ldlm_reprocess_all(pres, 0);
LDLM_RESOURCE_DELREF(pres);
ldlm_resource_putref(pres);
}
lock->l_blast_sent != 0) {
timeout_t delay = 0;
- if (ktime_get_real_seconds() > lock->l_blast_sent)
- delay = ktime_get_real_seconds() -
+ if (ktime_get_seconds() > lock->l_blast_sent)
+ delay = ktime_get_seconds() -
lock->l_blast_sent;
LDLM_DEBUG(lock,
"server cancels blocked lock after %ds",
delay);
- at_measured(&lock->l_export->exp_bl_lock_at, delay);
+ obd_at_measure(lock->l_export->exp_obd,
+ &lock->l_export->exp_bl_lock_at,
+ delay);
}
ldlm_lock_cancel(lock);
LDLM_LOCK_PUT(lock);
}
if (pres != NULL) {
- ldlm_reprocess_all(pres, NULL);
+ ldlm_reprocess_all(pres, 0);
LDLM_RESOURCE_DELREF(pres);
ldlm_resource_putref(pres);
}
* This only can happen on client side.
*/
static int ldlm_handle_cp_callback(struct ptlrpc_request *req,
- struct ldlm_namespace *ns,
- struct ldlm_request *dlm_req,
- struct ldlm_lock *lock)
+ struct ldlm_namespace *ns,
+ struct ldlm_request *dlm_req,
+ struct ldlm_lock *lock)
{
LIST_HEAD(ast_list);
int lvb_len;
LDLM_DEBUG(lock, "client completion callback handler START");
- if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
+ if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
long to = cfs_time_seconds(1);
ldlm_callback_reply(req, 0);
* Let Enqueue to call osc_lock_upcall() and initialize
* l_ast_data
*/
- OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2);
+ CFS_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2);
ldlm_run_ast_work(ns, &ast_list, LDLM_WORK_CP_AST);
enum ldlm_cancel_flags cancel_flags)
{
struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
+ char *prio = "regular";
+ int count;
ENTRY;
spin_lock(&blp->blp_lock);
+ /* cannot access blwi after added to list and lock is dropped */
+ count = blwi->blwi_lock ? 1 : blwi->blwi_count;
+
+ /* if the server is waiting on a lock to be cancelled (bl_ast), this is
+ * an urgent request and should go in the priority queue so it doesn't
+ * get stuck behind non-priority work (eg, lru size management)
+ *
+ * We also prioritize discard_data, which is for eviction handling
+ */
if (blwi->blwi_lock &&
- ldlm_is_discard_data(blwi->blwi_lock)) {
- /* add LDLM_FL_DISCARD_DATA requests to the priority list */
+ (ldlm_is_discard_data(blwi->blwi_lock) ||
+ ldlm_is_bl_ast(blwi->blwi_lock))) {
list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
+ prio = "priority";
} else {
/* other blocking callbacks are added to the regular list */
list_add_tail(&blwi->blwi_entry, &blp->blp_list);
}
+ blp->blp_total_locks += count;
+ blp->blp_total_blwis++;
spin_unlock(&blp->blp_lock);
wake_up(&blp->blp_waitq);
+ /* unlocked read of blp values is intentional - OK for debug */
+ CDEBUG(D_DLMTRACE,
+ "added %d/%d locks to %s blp list, %d blwis in pool\n",
+ count, blp->blp_total_locks, prio, blp->blp_total_blwis);
+
/*
* can not check blwi->blwi_flags as blwi could be already freed in
* LCF_ASYNC mode
{
DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req,
"%s, NID=%s lock=%#llx: rc = %d",
- msg, libcfs_id2str(req->rq_peer),
+ msg, libcfs_idstr(&req->rq_peer),
handle ? handle->cookie : 0, rc);
if (req->rq_no_reply)
CWARN("No reply was sent, maybe cause b=21636.\n");
switch (lustre_msg_get_opc(req->rq_reqmsg)) {
case LDLM_BL_CALLBACK:
- if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET)) {
+ if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET)) {
if (cfs_fail_err)
ldlm_callback_reply(req, -(int)cfs_fail_err);
RETURN(0);
}
break;
case LDLM_CP_CALLBACK:
- if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CP_CALLBACK_NET))
+ if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_CP_CALLBACK_NET))
RETURN(0);
break;
case LDLM_GL_CALLBACK:
- if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_GL_CALLBACK_NET))
+ if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_GL_CALLBACK_NET))
RETURN(0);
break;
case LDLM_SET_INFO:
* Force a known safe race, send a cancel to the server for a lock
* which the server has already started a blocking callback on.
*/
- if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE) &&
+ if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE) &&
lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
rc = ldlm_cli_cancel(&dlm_req->lock_handle[0], 0);
if (rc < 0)
if (ldlm_is_fail_loc(lock) &&
lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK)
- OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
+ CFS_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
/* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
lock_res_and_lock(lock);
switch (lustre_msg_get_opc(req->rq_reqmsg)) {
case LDLM_BL_CALLBACK:
- CDEBUG(D_INODE, "blocking ast\n");
+ LDLM_DEBUG(lock, "blocking ast");
req_capsule_extend(&req->rq_pill, &RQF_LDLM_BL_CALLBACK);
if (!ldlm_is_cancel_on_block(lock)) {
rc = ldlm_callback_reply(req, 0);
ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
break;
case LDLM_CP_CALLBACK:
- CDEBUG(D_INODE, "completion ast\n");
+ LDLM_DEBUG(lock, "completion ast");
req_capsule_extend(&req->rq_pill, &RQF_LDLM_CP_CALLBACK);
rc = ldlm_handle_cp_callback(req, ns, dlm_req, lock);
- if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE))
+ if (!CFS_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE))
ldlm_callback_reply(req, rc);
break;
case LDLM_GL_CALLBACK:
- CDEBUG(D_INODE, "glimpse ast\n");
+ LDLM_DEBUG(lock, "glimpse ast");
req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
ldlm_handle_gl_callback(req, ns, dlm_req, lock);
break;
CERROR("%s from %s arrived at %llu with bad export cookie %llu\n",
ll_opcode2str(lustre_msg_get_opc(req->rq_reqmsg)),
- libcfs_nid2str(req->rq_peer.nid),
+ libcfs_nidstr(&req->rq_peer.nid),
(unsigned long long)req->rq_arrival_time.tv_sec,
lustre_msg_get_handle(req->rq_reqmsg)->cookie);
* In the function below, .hs_keycmp resolves to
* ldlm_export_lock_keycmp()
*/
- /* coverity[overrun-buffer-val] */
cfs_hash_del(lock->l_export->exp_lock_hash,
&lock->l_remote_handle, &lock->l_exp_hash);
}
void ldlm_revoke_export_locks(struct obd_export *exp)
{
+ int rc;
LIST_HEAD(rpc_list);
ENTRY;
cfs_hash_for_each_nolock(exp->exp_lock_hash,
ldlm_revoke_lock_cb, &rpc_list, 0);
- ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list,
+ rc = ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list,
LDLM_WORK_REVOKE_AST);
+ if (rc == -ERESTART)
+ ldlm_reprocess_recovery_done(exp->exp_obd->obd_namespace);
+
EXIT;
}
EXPORT_SYMBOL(ldlm_revoke_export_locks);
/* process a request from the blp_list at least every blp_num_threads */
if (!list_empty(&blp->blp_list) &&
(list_empty(&blp->blp_prio_list) || num_bl == 0))
- blwi = list_entry(blp->blp_list.next,
- struct ldlm_bl_work_item, blwi_entry);
+ blwi = list_first_entry(&blp->blp_list,
+ struct ldlm_bl_work_item, blwi_entry);
else
if (!list_empty(&blp->blp_prio_list))
- blwi = list_entry(blp->blp_prio_list.next,
- struct ldlm_bl_work_item,
- blwi_entry);
+ blwi = list_first_entry(&blp->blp_prio_list,
+ struct ldlm_bl_work_item,
+ blwi_entry);
if (blwi) {
if (++num_bl >= num_th)
num_bl = 0;
list_del(&blwi->blwi_entry);
+ blp->blp_total_locks -= blwi->blwi_lock ? 1 : blwi->blwi_count;
+ blp->blp_total_blwis--;
}
spin_unlock(&blp->blp_lock);
*p_blwi = blwi;
+ /* intentional unlocked read of blp values - OK for debug */
+ if (blwi) {
+ CDEBUG(D_DLMTRACE,
+ "Got %d locks of %d total in blp. (%d blwis in pool)\n",
+ blwi->blwi_lock ? 1 : blwi->blwi_count,
+ blp->blp_total_locks, blp->blp_total_blwis);
+ } else {
+ CDEBUG(D_DLMTRACE,
+ "No blwi found in queue (no bl locks in queue)\n");
+ }
+
if (*p_exp != NULL && *p_blwi != NULL) {
obd_stale_export_put(*p_exp);
*p_exp = NULL;
if (blwi->blwi_mem_pressure)
mpflags = memalloc_noreclaim_save();
- OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL2, 4);
+ CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL2, 4);
if (blwi->blwi_count) {
int count;
ENTRY;
- OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_BL_EVICT, 4);
+ CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_BL_EVICT, 4);
num = ldlm_export_cancel_blocked_locks(exp);
if (num == 0)
EXIT;
}
-/*
- * Export handle<->lock hash operations.
- */
+/* Export handle<->lock hash operations. */
static unsigned
-ldlm_export_lock_hash(struct cfs_hash *hs, const void *key, unsigned int mask)
+ldlm_export_lock_hash(struct cfs_hash *hs, const void *key,
+ const unsigned int bits)
{
- return cfs_hash_u64_hash(((struct lustre_handle *)key)->cookie, mask);
+ return cfs_hash_64(((struct lustre_handle *)key)->cookie, bits);
}
static void *
.so_req_handler = ldlm_callback_handler,
},
};
- ldlm_state->ldlm_cb_service = \
- ptlrpc_register_service(&conf, ldlm_svc_kset,
- ldlm_svc_debugfs_dir);
+ ldlm_state->ldlm_cb_service = ptlrpc_register_service(&conf,
+ ldlm_svc_kset,
+ ldlm_svc_debugfs_dir);
if (IS_ERR(ldlm_state->ldlm_cb_service)) {
CERROR("failed to start service\n");
rc = PTR_ERR(ldlm_state->ldlm_cb_service);
.tc_nthrs_max = LDLM_NTHRS_MAX,
.tc_nthrs_user = ldlm_num_threads,
.tc_cpu_bind = ldlm_cpu_bind,
- .tc_ctx_tags = LCT_MD_THREAD | \
- LCT_DT_THREAD | \
+ .tc_ctx_tags = LCT_MD_THREAD |
+ LCT_DT_THREAD |
LCT_CL_THREAD,
},
.psc_cpt = {
.so_hpreq_handler = ldlm_hpreq_handler,
},
};
- ldlm_state->ldlm_cancel_service = \
+ ldlm_state->ldlm_cancel_service =
ptlrpc_register_service(&conf, ldlm_svc_kset,
ldlm_svc_debugfs_dir);
if (IS_ERR(ldlm_state->ldlm_cancel_service)) {
init_waitqueue_head(&blp->blp_waitq);
atomic_set(&blp->blp_num_threads, 0);
atomic_set(&blp->blp_busy_threads, 0);
+ blp->blp_total_locks = 0;
+ blp->blp_total_blwis = 0;
if (ldlm_num_threads == 0) {
blp->blp_min_threads = LDLM_NTHRS_INIT;
blp->blp_max_threads = LDLM_NTHRS_MAX;
} else {
- blp->blp_min_threads = blp->blp_max_threads = \
+ blp->blp_min_threads = blp->blp_max_threads =
min_t(int, LDLM_NTHRS_MAX, max_t(int, LDLM_NTHRS_INIT,
ldlm_num_threads));
}
void ldlm_exit(void)
{
if (ldlm_refcount)
- CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
+ CERROR("ldlm_refcount is %d in %s\n", ldlm_refcount, __func__);
+ rcu_barrier();
kmem_cache_destroy(ldlm_resource_slab);
/*
* ldlm_lock_put() use RCU to call ldlm_lock_free, so need call