/*
* Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
+ *
+ * Copyright (c) 2011 Whamcloud, Inc.
+ *
*/
/*
* This file is part of Lustre, http://www.lustre.org/
static cfs_semaphore_t ldlm_ref_sem;
static int ldlm_refcount;
+struct ldlm_cb_async_args {
+ struct ldlm_cb_set_arg *ca_set_arg;
+ struct ldlm_lock *ca_lock;
+};
+
/* LDLM state */
static struct ldlm_state *ldlm_state;
/* This is called from within a timer interrupt and cannot schedule */
static void waiting_locks_callback(unsigned long unused)
{
- struct ldlm_lock *lock, *last = NULL;
+ struct ldlm_lock *lock;
repeat:
cfs_spin_lock_bh(&waiting_locks_spinlock);
libcfs_nid2str(
lock->l_export->exp_connection->c_peer.nid));
- last = lock;
-
/* no needs to take an extra ref on the lock since it was in
* the waiting_locks_list and ldlm_add_waiting_lock()
* already grabbed a ref */
static int ldlm_cb_interpret(const struct lu_env *env,
struct ptlrpc_request *req, void *data, int rc)
{
- struct ldlm_cb_set_arg *arg;
- struct ldlm_lock *lock;
+ struct ldlm_cb_async_args *ca = data;
+ struct ldlm_lock *lock = ca->ca_lock;
+ struct ldlm_cb_set_arg *arg = ca->ca_set_arg;
ENTRY;
- LASSERT(data != NULL);
-
- arg = req->rq_async_args.pointer_arg[0];
- lock = req->rq_async_args.pointer_arg[1];
LASSERT(lock != NULL);
if (rc != 0) {
rc = ldlm_handle_ast_error(lock, req, rc,
arg->type == LDLM_BL_CALLBACK
? "blocking" : "completion");
+ if (rc == -ERESTART)
+ cfs_atomic_inc(&arg->restart);
}
-
LDLM_LOCK_RELEASE(lock);
- if (rc == -ERESTART)
- cfs_atomic_set(&arg->restart, 1);
-
+ if (cfs_atomic_dec_return(&arg->rpcs) < arg->threshold)
+ cfs_waitq_signal(&arg->waitq);
RETURN(0);
}
-static inline int ldlm_bl_and_cp_ast_fini(struct ptlrpc_request *req,
+static inline int ldlm_bl_and_cp_ast_tail(struct ptlrpc_request *req,
struct ldlm_cb_set_arg *arg,
struct ldlm_lock *lock,
int instant_cancel)
rc = ptl_send_rpc(req, 1);
ptlrpc_req_finished(req);
if (rc == 0)
- /* If we cancelled the lock, we need to restart
- * ldlm_reprocess_queue */
- cfs_atomic_set(&arg->restart, 1);
+ cfs_atomic_inc(&arg->restart);
} else {
LDLM_LOCK_GET(lock);
- ptlrpc_set_add_req(arg->set, req);
+ ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+ cfs_atomic_inc(&arg->rpcs);
}
RETURN(rc);
struct ldlm_lock_desc *desc,
void *data, int flag)
{
+ struct ldlm_cb_async_args *ca;
struct ldlm_cb_set_arg *arg = data;
struct ldlm_request *body;
struct ptlrpc_request *req;
if (req == NULL)
RETURN(-ENOMEM);
- req->rq_async_args.pointer_arg[0] = arg;
- req->rq_async_args.pointer_arg[1] = lock;
+ CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
+ ca = ptlrpc_req_async_args(req);
+ ca->ca_set_arg = arg;
+ ca->ca_lock = lock;
+
req->rq_interpret_reply = ldlm_cb_interpret;
req->rq_no_resend = 1;
lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
LDLM_BL_CALLBACK - LDLM_FIRST_OPC);
- rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel);
+ rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel);
RETURN(rc);
}
struct ldlm_cb_set_arg *arg = data;
struct ldlm_request *body;
struct ptlrpc_request *req;
+ struct ldlm_cb_async_args *ca;
long total_enqueue_wait;
int instant_cancel = 0;
int rc = 0;
RETURN(rc);
}
- req->rq_async_args.pointer_arg[0] = arg;
- req->rq_async_args.pointer_arg[1] = lock;
+ CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
+ ca = ptlrpc_req_async_args(req);
+ ca->ca_set_arg = arg;
+ ca->ca_lock = lock;
+
req->rq_interpret_reply = ldlm_cb_interpret;
req->rq_no_resend = 1;
body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
LDLM_CP_CALLBACK - LDLM_FIRST_OPC);
- rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel);
+ rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel);
RETURN(rc);
}
LDLM_GL_CALLBACK - LDLM_FIRST_OPC);
rc = ptlrpc_queue_wait(req);
- if (rc == -ELDLM_NO_LOCK_DATA)
+ /* Update the LVB from disk if the AST failed (this is a legal race)
+ *
+ * - Glimpse callback of local lock just return -ELDLM_NO_LOCK_DATA.
+ * - Glimpse callback of remote lock might return -ELDLM_NO_LOCK_DATA
+ * when inode is cleared. LU-274
+ */
+ if (rc == -ELDLM_NO_LOCK_DATA) {
LDLM_DEBUG(lock, "lost race - client has a lock but no inode");
- else if (rc != 0)
+ ldlm_res_lvbo_update(res, NULL, 1);
+ } else if (rc != 0) {
rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
- else
+ } else {
rc = ldlm_res_lvbo_update(res, req, 1);
+ }
ptlrpc_req_finished(req);
if (rc == -ERESTART)
}
if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN)
- lock->l_policy_data = dlm_req->lock_desc.l_policy_data;
+ ldlm_convert_policy_to_local(
+ dlm_req->lock_desc.l_resource.lr_type,
+ &dlm_req->lock_desc.l_policy_data,
+ &lock->l_policy_data);
if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
lock->l_req_extent = lock->l_policy_data.l_extent;
}
if (lock->l_resource->lr_type != LDLM_PLAIN) {
- lock->l_policy_data = dlm_req->lock_desc.l_policy_data;
+ ldlm_convert_policy_to_local(
+ dlm_req->lock_desc.l_resource.lr_type,
+ &dlm_req->lock_desc.l_policy_data,
+ &lock->l_policy_data);
LDLM_DEBUG(lock, "completion AST, new policy data");
}
* l_ast_data */
OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2);
- ldlm_run_ast_work(&ast_list, LDLM_WORK_CP_AST);
+ ldlm_run_ast_work(ns, &ast_list, LDLM_WORK_CP_AST);
LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
lock);
CFS_INIT_LIST_HEAD(&rpc_list);
cfs_hash_for_each_empty(exp->exp_lock_hash,
ldlm_revoke_lock_cb, &rpc_list);
- ldlm_run_ast_work(&rpc_list, LDLM_WORK_REVOKE_AST);
+ ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list,
+ LDLM_WORK_REVOKE_AST);
EXIT;
}
int rc;
cfs_init_completion(&bltd.bltd_comp);
- rc = cfs_kernel_thread(ldlm_bl_thread_main, &bltd, 0);
+ rc = cfs_create_thread(ldlm_bl_thread_main, &bltd, 0);
if (rc < 0) {
CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %d\n",
cfs_atomic_read(&blp->blp_num_threads), rc);
while (1) {
struct l_wait_info lwi = { 0 };
struct ldlm_bl_work_item *blwi = NULL;
+ int busy;
blwi = ldlm_bl_get_work(blp);
if (blwi == NULL) {
- int busy;
-
cfs_atomic_dec(&blp->blp_busy_threads);
l_wait_event_exclusive(blp->blp_waitq,
(blwi = ldlm_bl_get_work(blp)) != NULL,
&lwi);
busy = cfs_atomic_inc_return(&blp->blp_busy_threads);
-
- if (blwi->blwi_ns == NULL)
- /* added by ldlm_cleanup() */
- break;
-
- /* Not fatal if racy and have a few too many threads */
- if (unlikely(busy < blp->blp_max_threads &&
- busy >= cfs_atomic_read(&blp->blp_num_threads) &&
- !blwi->blwi_mem_pressure))
- /* discard the return value, we tried */
- ldlm_bl_thread_start(blp);
} else {
- if (blwi->blwi_ns == NULL)
- /* added by ldlm_cleanup() */
- break;
+ busy = cfs_atomic_read(&blp->blp_busy_threads);
}
+
+ if (blwi->blwi_ns == NULL)
+ /* added by ldlm_cleanup() */
+ break;
+
+ /* Not fatal if racy and have a few too many threads */
+ if (unlikely(busy < blp->blp_max_threads &&
+ busy >= cfs_atomic_read(&blp->blp_num_threads) &&
+ !blwi->blwi_mem_pressure))
+ /* discard the return value, we tried */
+ ldlm_bl_thread_start(blp);
+
if (blwi->blwi_mem_pressure)
cfs_memory_pressure_set();
* Export handle<->lock hash operations.
*/
static unsigned
-ldlm_export_lock_hash(cfs_hash_t *hs, void *key, unsigned mask)
+ldlm_export_lock_hash(cfs_hash_t *hs, const void *key, unsigned mask)
{
return cfs_hash_u64_hash(((struct lustre_handle *)key)->cookie, mask);
}
}
static int
-ldlm_export_lock_keycmp(void *key, cfs_hlist_node_t *hnode)
+ldlm_export_lock_keycmp(const void *key, cfs_hlist_node_t *hnode)
{
return lustre_handle_equal(ldlm_export_lock_key(hnode), key);
}
cfs_spin_lock_init(&waiting_locks_spinlock);
cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, 0);
- rc = cfs_kernel_thread(expired_lock_main, NULL, CLONE_VM | CLONE_FILES);
+ rc = cfs_create_thread(expired_lock_main, NULL, CFS_DAEMON_FLAGS);
if (rc < 0) {
CERROR("Cannot start ldlm expired-lock thread: %d\n", rc);
GOTO(out_thread, rc);
RETURN(0);
}
-int __init ldlm_init(void)
+int ldlm_init(void)
{
cfs_init_mutex(&ldlm_ref_sem);
cfs_init_mutex(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER));
return 0;
}
-void __exit ldlm_exit(void)
+void ldlm_exit(void)
{
int rc;
if (ldlm_refcount)