* Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011 Whamcloud, Inc.
- *
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
cfs_spin_lock_bh(&waiting_locks_spinlock);
if (expired_lock_thread.elt_dump) {
+ struct libcfs_debug_msg_data msgdata = {
+ .msg_file = __FILE__,
+ .msg_fn = "waiting_locks_callback",
+ .msg_line = expired_lock_thread.elt_dump };
cfs_spin_unlock_bh(&waiting_locks_spinlock);
/* from waiting_locks_callback, but not in timer */
libcfs_debug_dumplog();
- libcfs_run_lbug_upcall(__FILE__,
- "waiting_locks_callback",
- expired_lock_thread.elt_dump);
+ libcfs_run_lbug_upcall(&msgdata);
cfs_spin_lock_bh(&waiting_locks_spinlock);
expired_lock_thread.elt_dump = 0;
return 0;
cfs_spin_lock_bh(&lock->l_export->exp_rpc_lock);
- cfs_list_for_each_entry(req, &lock->l_export->exp_queued_rpc,
+ cfs_list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
rq_exp_list) {
if (req->rq_ops->hpreq_lock_match) {
match = req->rq_ops->hpreq_lock_match(req, lock);
}
ret = __ldlm_add_waiting_lock(lock, timeout);
- if (ret)
+ if (ret) {
/* grab ref on the lock if it has been added to the
* waiting list */
LDLM_LOCK_GET(lock);
+ }
cfs_spin_unlock_bh(&waiting_locks_spinlock);
+ if (ret) {
+ cfs_spin_lock_bh(&lock->l_export->exp_bl_list_lock);
+ if (cfs_list_empty(&lock->l_exp_list))
+ cfs_list_add(&lock->l_exp_list,
+ &lock->l_export->exp_bl_list);
+ cfs_spin_unlock_bh(&lock->l_export->exp_bl_list_lock);
+ }
+
LDLM_DEBUG(lock, "%sadding to wait list(timeout: %d, AT: %s)",
ret == 0 ? "not re-" : "", timeout,
AT_OFF ? "off" : "on");
cfs_spin_lock_bh(&waiting_locks_spinlock);
ret = __ldlm_del_waiting_lock(lock);
cfs_spin_unlock_bh(&waiting_locks_spinlock);
- if (ret)
+
+ /* remove the lock out of export blocking list */
+ cfs_spin_lock_bh(&lock->l_export->exp_bl_list_lock);
+ cfs_list_del_init(&lock->l_exp_list);
+ cfs_spin_unlock_bh(&lock->l_export->exp_bl_list_lock);
+
+ if (ret) {
/* release lock ref if it has indeed been removed
* from a list */
LDLM_LOCK_RELEASE(lock);
+ }
LDLM_DEBUG(lock, "%s", ret == 0 ? "wasn't waiting" : "removed");
return ret;
if (cfs_atomic_dec_return(&arg->rpcs) < arg->threshold)
cfs_waitq_signal(&arg->waitq);
+
+ ldlm_csa_put(arg);
RETURN(0);
}
cfs_atomic_inc(&arg->restart);
} else {
LDLM_LOCK_GET(lock);
- ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
cfs_atomic_inc(&arg->rpcs);
+ cfs_atomic_inc(&arg->refcount);
+ ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
}
RETURN(rc);
}
cfs_spin_lock_bh(&lock->l_export->exp_rpc_lock);
- cfs_list_for_each_entry(req, &lock->l_export->exp_queued_rpc,
+ cfs_list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
rq_exp_list) {
- if (!req->rq_hp && req->rq_ops->hpreq_lock_match &&
+ /* Do not process requests that were not yet added to there
+ * incoming queue or were already removed from there for
+ * processing */
+ if (!req->rq_hp && !cfs_list_empty(&req->rq_list) &&
+ req->rq_ops->hpreq_lock_match &&
req->rq_ops->hpreq_lock_match(req, lock))
ptlrpc_hpreq_reorder(req);
}
RETURN(0);
}
+static int ldlm_cancel_hpreq_lock_match(struct ptlrpc_request *req,
+ struct ldlm_lock *lock)
+{
+ struct ldlm_request *dlm_req;
+ struct lustre_handle lockh;
+ int rc = 0;
+ int i;
+ ENTRY;
+
+ dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
+ if (dlm_req == NULL)
+ RETURN(0);
+
+ ldlm_lock2handle(lock, &lockh);
+ for (i = 0; i < dlm_req->lock_count; i++) {
+ if (lustre_handle_equal(&dlm_req->lock_handle[i],
+ &lockh)) {
+ DEBUG_REQ(D_RPCTRACE, req,
+ "Prio raised by lock "LPX64".", lockh.cookie);
+
+ rc = 1;
+ break;
+ }
+ }
+
+ RETURN(rc);
+
+}
+
+static int ldlm_cancel_hpreq_check(struct ptlrpc_request *req)
+{
+ struct ldlm_request *dlm_req;
+ int rc = 0;
+ int i;
+ ENTRY;
+
+ /* no prolong in recovery */
+ if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
+ RETURN(0);
+
+ dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
+ if (dlm_req == NULL)
+ RETURN(-EFAULT);
+
+ for (i = 0; i < dlm_req->lock_count; i++) {
+ struct ldlm_lock *lock;
+
+ lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
+ if (lock == NULL)
+ continue;
+
+ rc = !!(lock->l_flags & LDLM_FL_AST_SENT);
+ if (rc)
+ LDLM_DEBUG(lock, "hpreq cancel lock");
+ LDLM_LOCK_PUT(lock);
+
+ if (rc)
+ break;
+ }
+
+ RETURN(rc);
+}
+
+static struct ptlrpc_hpreq_ops ldlm_cancel_hpreq_ops = {
+ .hpreq_lock_match = ldlm_cancel_hpreq_lock_match,
+ .hpreq_check = ldlm_cancel_hpreq_check
+};
+
+static int ldlm_hpreq_handler(struct ptlrpc_request *req)
+{
+ ENTRY;
+
+ req_capsule_init(&req->rq_pill, req, RCL_SERVER);
+
+ if (req->rq_export == NULL)
+ RETURN(0);
+
+ if (LDLM_CANCEL == lustre_msg_get_opc(req->rq_reqmsg)) {
+ req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
+ req->rq_ops = &ldlm_cancel_hpreq_ops;
+ }
+ RETURN(0);
+}
+
int ldlm_revoke_lock_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
cfs_hlist_node_t *hnode, void *data)
while (1) {
struct l_wait_info lwi = { 0 };
struct ldlm_bl_work_item *blwi = NULL;
+ int busy;
blwi = ldlm_bl_get_work(blp);
if (blwi == NULL) {
- int busy;
-
cfs_atomic_dec(&blp->blp_busy_threads);
l_wait_event_exclusive(blp->blp_waitq,
(blwi = ldlm_bl_get_work(blp)) != NULL,
&lwi);
busy = cfs_atomic_inc_return(&blp->blp_busy_threads);
-
- if (blwi->blwi_ns == NULL)
- /* added by ldlm_cleanup() */
- break;
-
- /* Not fatal if racy and have a few too many threads */
- if (unlikely(busy < blp->blp_max_threads &&
- busy >= cfs_atomic_read(&blp->blp_num_threads) &&
- !blwi->blwi_mem_pressure))
- /* discard the return value, we tried */
- ldlm_bl_thread_start(blp);
} else {
- if (blwi->blwi_ns == NULL)
- /* added by ldlm_cleanup() */
- break;
+ busy = cfs_atomic_read(&blp->blp_busy_threads);
}
+
+ if (blwi->blwi_ns == NULL)
+ /* added by ldlm_cleanup() */
+ break;
+
+ /* Not fatal if racy and have a few too many threads */
+ if (unlikely(busy < blp->blp_max_threads &&
+ busy >= cfs_atomic_read(&blp->blp_num_threads) &&
+ !blwi->blwi_mem_pressure))
+ /* discard the return value, we tried */
+ ldlm_bl_thread_start(blp);
+
if (blwi->blwi_mem_pressure)
cfs_memory_pressure_set();
ldlm_min_threads, ldlm_max_threads,
"ldlm_cn",
LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD,
- NULL);
+ ldlm_hpreq_handler);
if (!ldlm_state->ldlm_cancel_service) {
CERROR("failed to start service\n");