X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fldlm%2Fldlm_lockd.c;h=558eee3babf29c41f99bf33fd59ba6b644c52151;hp=b5b2a4baad2a199386c411610963be8d12eed99f;hb=5c3a69e52444f8ea17849df1f9de0465d1398224;hpb=eb9408d8973e34e291b7ce2e725a6e0f4e106df5

diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c
index b5b2a4b..558eee3 100644
--- a/lustre/ldlm/ldlm_lockd.c
+++ b/lustre/ldlm/ldlm_lockd.c
@@ -28,6 +28,8 @@
 /*
  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -66,6 +68,11 @@ extern cfs_mem_cache_t *ldlm_lock_slab;
 static cfs_semaphore_t  ldlm_ref_sem;
 static int ldlm_refcount;
 
+struct ldlm_cb_async_args {
+        struct ldlm_cb_set_arg *ca_set_arg;
+        struct ldlm_lock       *ca_lock;
+};
+
 /* LDLM state */
 
 static struct ldlm_state *ldlm_state;
@@ -172,13 +179,15 @@ static int expired_lock_main(void *arg)
 
                 cfs_spin_lock_bh(&waiting_locks_spinlock);
                 if (expired_lock_thread.elt_dump) {
+                        struct libcfs_debug_msg_data msgdata = {
+                                .msg_file = __FILE__,
+                                .msg_fn   = "waiting_locks_callback",
+                                .msg_line = expired_lock_thread.elt_dump };
                         cfs_spin_unlock_bh(&waiting_locks_spinlock);
 
                         /* from waiting_locks_callback, but not in timer */
                         libcfs_debug_dumplog();
-                        libcfs_run_lbug_upcall(__FILE__,
-                                               "waiting_locks_callback",
-                                               expired_lock_thread.elt_dump);
+                        libcfs_run_lbug_upcall(&msgdata);
 
                         cfs_spin_lock_bh(&waiting_locks_spinlock);
                         expired_lock_thread.elt_dump = 0;
@@ -255,8 +264,8 @@ static int ldlm_lock_busy(struct ldlm_lock *lock)
         if (lock->l_export == NULL)
                 return 0;
 
-        cfs_spin_lock(&lock->l_export->exp_lock);
-        cfs_list_for_each_entry(req, &lock->l_export->exp_queued_rpc,
+        cfs_spin_lock_bh(&lock->l_export->exp_rpc_lock);
+        cfs_list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
                                 rq_exp_list) {
                 if (req->rq_ops->hpreq_lock_match) {
                         match = req->rq_ops->hpreq_lock_match(req, lock);
@@ -264,14 +273,14 @@ static int ldlm_lock_busy(struct ldlm_lock *lock)
                                 break;
                 }
         }
-        cfs_spin_unlock(&lock->l_export->exp_lock);
+        cfs_spin_unlock_bh(&lock->l_export->exp_rpc_lock);
         RETURN(match);
 }
 
 /* This is called from within a timer interrupt and cannot schedule */
 static void waiting_locks_callback(unsigned long unused)
 {
-        struct ldlm_lock *lock, *last = NULL;
+        struct ldlm_lock *lock;
 
 repeat:
         cfs_spin_lock_bh(&waiting_locks_spinlock);
@@ -340,15 +349,13 @@ repeat:
                         LDLM_LOCK_RELEASE(lock);
                         continue;
                 }
-                lock->l_resource->lr_namespace->ns_timeouts++;
+                ldlm_lock_to_ns(lock)->ns_timeouts++;
                 LDLM_ERROR(lock, "lock callback timer expired after %lds: "
                            "evicting client at %s ",
                            cfs_time_current_sec()- lock->l_last_activity,
                            libcfs_nid2str(
                                    lock->l_export->exp_connection->c_peer.nid));
 
-                last = lock;
-
                 /* no needs to take an extra ref on the lock since it was in
                  * the waiting_locks_list and ldlm_add_waiting_lock()
                  * already grabbed a ref */
@@ -438,12 +445,21 @@ static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
         }
 
         ret = __ldlm_add_waiting_lock(lock, timeout);
-        if (ret)
+        if (ret) {
                 /* grab ref on the lock if it has been added to the
                  * waiting list */
                 LDLM_LOCK_GET(lock);
+        }
         cfs_spin_unlock_bh(&waiting_locks_spinlock);
 
+        if (ret) {
+                cfs_spin_lock_bh(&lock->l_export->exp_bl_list_lock);
+                if (cfs_list_empty(&lock->l_exp_list))
+                        cfs_list_add(&lock->l_exp_list,
+                                     &lock->l_export->exp_bl_list);
+                cfs_spin_unlock_bh(&lock->l_export->exp_bl_list_lock);
+        }
+
         LDLM_DEBUG(lock, "%sadding to wait list(timeout: %d, AT: %s)",
%d, AT: %s)", ret == 0 ? "not re-" : "", timeout, AT_OFF ? "off" : "on"); @@ -498,10 +514,17 @@ int ldlm_del_waiting_lock(struct ldlm_lock *lock) cfs_spin_lock_bh(&waiting_locks_spinlock); ret = __ldlm_del_waiting_lock(lock); cfs_spin_unlock_bh(&waiting_locks_spinlock); - if (ret) + + /* remove the lock out of export blocking list */ + cfs_spin_lock_bh(&lock->l_export->exp_bl_list_lock); + cfs_list_del_init(&lock->l_exp_list); + cfs_spin_unlock_bh(&lock->l_export->exp_bl_list_lock); + + if (ret) { /* release lock ref if it has indeed been removed * from a list */ LDLM_LOCK_RELEASE(lock); + } LDLM_DEBUG(lock, "%s", ret == 0 ? "wasn't waiting" : "removed"); return ret; @@ -606,19 +629,29 @@ static int ldlm_handle_ast_error(struct ldlm_lock *lock, ldlm_failed_ast(lock, rc, ast_type); } } else if (rc) { - if (rc == -EINVAL) + if (rc == -EINVAL) { + struct ldlm_resource *res = lock->l_resource; LDLM_DEBUG(lock, "client (nid %s) returned %d" " from %s AST - normal race", libcfs_nid2str(peer.nid), req->rq_repmsg ? lustre_msg_get_status(req->rq_repmsg) : -1, ast_type); - else + if (res) { + /* update lvbo to return proper attributes. + * see bug 23174 */ + ldlm_resource_getref(res); + ldlm_res_lvbo_update(res, NULL, 1); + ldlm_resource_putref(res); + } + + } else { LDLM_ERROR(lock, "client (nid %s) returned %d " "from %s AST", libcfs_nid2str(peer.nid), (req->rq_repmsg != NULL) ? lustre_msg_get_status(req->rq_repmsg) : 0, ast_type); + } ldlm_lock_cancel(lock); /* Server-side AST functions are called from ldlm_reprocess_all, * which needs to be told to please restart its reprocessing. */ @@ -631,35 +664,29 @@ static int ldlm_handle_ast_error(struct ldlm_lock *lock, static int ldlm_cb_interpret(const struct lu_env *env, struct ptlrpc_request *req, void *data, int rc) { - struct ldlm_cb_set_arg *arg; - struct ldlm_lock *lock; + struct ldlm_cb_async_args *ca = data; + struct ldlm_lock *lock = ca->ca_lock; + struct ldlm_cb_set_arg *arg = ca->ca_set_arg; ENTRY; - LASSERT(data != NULL); - - arg = req->rq_async_args.pointer_arg[0]; - lock = req->rq_async_args.pointer_arg[1]; LASSERT(lock != NULL); if (rc != 0) { - /* If client canceled the lock but the cancel has not - * been received yet, we need to update lvbo to have the - * proper attributes cached. */ - if (rc == -EINVAL && arg->type == LDLM_BL_CALLBACK) - ldlm_res_lvbo_update(lock->l_resource, NULL, 1); rc = ldlm_handle_ast_error(lock, req, rc, arg->type == LDLM_BL_CALLBACK ? 
"blocking" : "completion"); + if (rc == -ERESTART) + cfs_atomic_inc(&arg->restart); } - LDLM_LOCK_RELEASE(lock); - if (rc == -ERESTART) - cfs_atomic_set(&arg->restart, 1); + if (cfs_atomic_dec_return(&arg->rpcs) < arg->threshold) + cfs_waitq_signal(&arg->waitq); + ldlm_csa_put(arg); RETURN(0); } -static inline int ldlm_bl_and_cp_ast_fini(struct ptlrpc_request *req, +static inline int ldlm_bl_and_cp_ast_tail(struct ptlrpc_request *req, struct ldlm_cb_set_arg *arg, struct ldlm_lock *lock, int instant_cancel) @@ -671,12 +698,12 @@ static inline int ldlm_bl_and_cp_ast_fini(struct ptlrpc_request *req, rc = ptl_send_rpc(req, 1); ptlrpc_req_finished(req); if (rc == 0) - /* If we cancelled the lock, we need to restart - * ldlm_reprocess_queue */ - cfs_atomic_set(&arg->restart, 1); + cfs_atomic_inc(&arg->restart); } else { LDLM_LOCK_GET(lock); - ptlrpc_set_add_req(arg->set, req); + cfs_atomic_inc(&arg->rpcs); + cfs_atomic_inc(&arg->refcount); + ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); } RETURN(rc); @@ -696,14 +723,18 @@ static void ldlm_lock_reorder_req(struct ldlm_lock *lock) RETURN_EXIT; } - cfs_spin_lock(&lock->l_export->exp_lock); - cfs_list_for_each_entry(req, &lock->l_export->exp_queued_rpc, + cfs_spin_lock_bh(&lock->l_export->exp_rpc_lock); + cfs_list_for_each_entry(req, &lock->l_export->exp_hp_rpcs, rq_exp_list) { - if (!req->rq_hp && req->rq_ops->hpreq_lock_match && + /* Do not process requests that were not yet added to there + * incoming queue or were already removed from there for + * processing */ + if (!req->rq_hp && !cfs_list_empty(&req->rq_list) && + req->rq_ops->hpreq_lock_match && req->rq_ops->hpreq_lock_match(req, lock)) ptlrpc_hpreq_reorder(req); } - cfs_spin_unlock(&lock->l_export->exp_lock); + cfs_spin_unlock_bh(&lock->l_export->exp_rpc_lock); EXIT; } @@ -718,6 +749,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, void *data, int flag) { + struct ldlm_cb_async_args *ca; struct ldlm_cb_set_arg *arg = data; struct ldlm_request *body; struct ptlrpc_request *req; @@ -744,8 +776,11 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, if (req == NULL) RETURN(-ENOMEM); - req->rq_async_args.pointer_arg[0] = arg; - req->rq_async_args.pointer_arg[1] = lock; + CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args)); + ca = ptlrpc_req_async_args(req); + ca->ca_set_arg = arg; + ca->ca_lock = lock; + req->rq_interpret_reply = ldlm_cb_interpret; req->rq_no_resend = 1; @@ -796,7 +831,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats, LDLM_BL_CALLBACK - LDLM_FIRST_OPC); - rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel); + rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel); RETURN(rc); } @@ -806,6 +841,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) struct ldlm_cb_set_arg *arg = data; struct ldlm_request *body; struct ptlrpc_request *req; + struct ldlm_cb_async_args *ca; long total_enqueue_wait; int instant_cancel = 0; int rc = 0; @@ -822,11 +858,11 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) if (req == NULL) RETURN(-ENOMEM); - lock_res_and_lock(lock); - if (lock->l_resource->lr_lvb_len) + /* server namespace, doesn't need lock */ + if (lock->l_resource->lr_lvb_len) { req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT, lock->l_resource->lr_lvb_len); - unlock_res_and_lock(lock); + } rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK); if (rc) { @@ -834,8 +870,11 
@@ -834,8 +870,11 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                 RETURN(rc);
         }
 
-        req->rq_async_args.pointer_arg[0] = arg;
-        req->rq_async_args.pointer_arg[1] = lock;
+        CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
+        ca = ptlrpc_req_async_args(req);
+        ca->ca_set_arg = arg;
+        ca->ca_lock = lock;
+
         req->rq_interpret_reply = ldlm_cb_interpret;
         req->rq_no_resend = 1;
         body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
@@ -846,10 +885,10 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         if (lock->l_resource->lr_lvb_len) {
                 void *lvb = req_capsule_client_get(&req->rq_pill,
                                                    &RMF_DLM_LVB);
 
-                lock_res_and_lock(lock);
+                lock_res(lock->l_resource);
                 memcpy(lvb, lock->l_resource->lr_lvb_data,
                        lock->l_resource->lr_lvb_len);
-                unlock_res_and_lock(lock);
+                unlock_res(lock->l_resource);
         }
 
         LDLM_DEBUG(lock, "server preparing completion AST (after %lds wait)",
@@ -858,7 +897,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         /* Server-side enqueue wait time estimate, used in
             __ldlm_add_waiting_lock to set future enqueue timers */
         if (total_enqueue_wait < ldlm_get_enq_timeout(lock))
-                at_measured(&lock->l_resource->lr_namespace->ns_at_estimate,
+                at_measured(ldlm_lock_to_ns_at(lock),
                             total_enqueue_wait);
         else
                 /* bz18618. Don't add lock enqueue time we spend waiting for a
@@ -868,7 +907,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                 LDLM_DEBUG(lock, "lock completed after %lus; estimate was %ds. "
                            "It is likely that a previous callback timed out.",
                            total_enqueue_wait,
-                           at_get(&lock->l_resource->lr_namespace->ns_at_estimate));
+                           at_get(ldlm_lock_to_ns_at(lock)));
 
         ptlrpc_request_set_replen(req);
 
@@ -907,7 +946,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                 lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
                                      LDLM_CP_CALLBACK - LDLM_FIRST_OPC);
 
-        rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel);
+        rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel);
 
         RETURN(rc);
 }
@@ -933,10 +972,9 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
         body->lock_handle[0] = lock->l_remote_handle;
         ldlm_lock2desc(lock, &body->lock_desc);
 
-        lock_res_and_lock(lock);
+        /* server namespace, doesn't need lock */
         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                              lock->l_resource->lr_lvb_len);
-        unlock_res_and_lock(lock);
         res = lock->l_resource;
         ptlrpc_request_set_replen(req);
 
@@ -952,12 +990,20 @@
                              LDLM_GL_CALLBACK - LDLM_FIRST_OPC);
 
         rc = ptlrpc_queue_wait(req);
-        if (rc == -ELDLM_NO_LOCK_DATA)
+        /* Update the LVB from disk if the AST failed (this is a legal race)
+         *
+         * - Glimpse callback of local lock just returns -ELDLM_NO_LOCK_DATA.
+         * - Glimpse callback of remote lock might return -ELDLM_NO_LOCK_DATA
+         *   when inode is cleared. LU-274
+         */
+        if (rc == -ELDLM_NO_LOCK_DATA) {
                 LDLM_DEBUG(lock, "lost race - client has a lock but no inode");
-        else if (rc != 0)
+                ldlm_res_lvbo_update(res, NULL, 1);
+        } else if (rc != 0) {
                 rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
-        else
+        } else {
                 rc = ldlm_res_lvbo_update(res, req, 1);
+        }
 
         ptlrpc_req_finished(req);
         if (rc == -ERESTART)
@@ -1111,10 +1157,12 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
         LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
 
         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2);
-        /* Don't enqueue a lock onto the export if it has already
-         * been evicted. Cancel it now instead. (bug 3822) */
-        if (req->rq_export->exp_failed) {
-                LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export);
+        /* Don't enqueue a lock onto the export if it has been disconnected
+         * due to eviction (bug 3822) or server umount (bug 24324).
+         * Cancel it now instead. */
+        if (req->rq_export->exp_disconnected) {
+                LDLM_ERROR(lock, "lock on disconnected export %p",
+                           req->rq_export);
                 GOTO(out, rc = -ENOTCONN);
         }
 
@@ -1131,13 +1179,14 @@ existing_lock:
                  * local_lock_enqueue by the policy function. */
                 cookie = req;
         } else {
-                lock_res_and_lock(lock);
+                /* based on the assumption that lvb size never changes during
+                 * resource life time, otherwise it needs resource->lr_lock's
+                 * protection */
                 if (lock->l_resource->lr_lvb_len) {
                         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
                                              RCL_SERVER,
                                              lock->l_resource->lr_lvb_len);
                 }
-                unlock_res_and_lock(lock);
 
                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
                         GOTO(out, rc = -ENOMEM);
@@ -1148,7 +1197,10 @@ existing_lock:
         }
 
         if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN)
-                lock->l_policy_data = dlm_req->lock_desc.l_policy_data;
+                ldlm_convert_policy_to_local(
+                                          dlm_req->lock_desc.l_resource.lr_type,
+                                          &dlm_req->lock_desc.l_policy_data,
+                                          &lock->l_policy_data);
         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
                 lock->l_req_extent = lock->l_policy_data.l_extent;
 
@@ -1171,9 +1223,10 @@ existing_lock:
         dlm_rep->lock_flags |= dlm_req->lock_flags & LDLM_INHERIT_FLAGS;
         lock->l_flags |= dlm_req->lock_flags & LDLM_INHERIT_FLAGS;
 
-        /* Don't move a pending lock onto the export if it has already
-         * been evicted. Cancel it now instead. (bug 5683) */
-        if (unlikely(req->rq_export->exp_failed ||
+        /* Don't move a pending lock onto the export if it has already been
+         * disconnected due to eviction (bug 5683) or server umount (bug 24324).
+         * Cancel it now instead. */
+        if (unlikely(req->rq_export->exp_disconnected ||
                      OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_OLD_EXPORT))) {
                 LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export);
                 rc = -ENOTCONN;
@@ -1237,24 +1290,28 @@ existing_lock:
                 LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
                            "(err=%d, rc=%d)", err, rc);
 
-                lock_res_and_lock(lock);
                 if (rc == 0) {
                         if (lock->l_resource->lr_lvb_len > 0) {
+                                /* MDT path won't handle lr_lvb_data, so
+                                 * lock/unlock better be contained in the
+                                 * if block */
                                 void *lvb;
 
                                 lvb = req_capsule_server_get(&req->rq_pill,
                                                              &RMF_DLM_LVB);
                                 LASSERTF(lvb != NULL, "req %p, lock %p\n",
                                          req, lock);
-
+                                lock_res(lock->l_resource);
                                 memcpy(lvb, lock->l_resource->lr_lvb_data,
                                        lock->l_resource->lr_lvb_len);
+                                unlock_res(lock->l_resource);
                         }
                 } else {
+                        lock_res_and_lock(lock);
                         ldlm_resource_unlink_lock(lock);
                         ldlm_lock_destroy_nolock(lock);
+                        unlock_res_and_lock(lock);
                 }
-                unlock_res_and_lock(lock);
 
                 if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
                         ldlm_reprocess_all(lock->l_resource);
@@ -1439,10 +1496,7 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
         if (!ldlm_request_cancel(req, dlm_req, 0))
                 req->rq_status = ESTALE;
 
-        if (ptlrpc_reply(req) != 0)
-                LBUG();
-
-        RETURN(0);
+        RETURN(ptlrpc_reply(req));
 }
 
 void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
@@ -1518,7 +1572,10 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
         }
 
         if (lock->l_resource->lr_type != LDLM_PLAIN) {
-                lock->l_policy_data = dlm_req->lock_desc.l_policy_data;
+                ldlm_convert_policy_to_local(
+                                          dlm_req->lock_desc.l_resource.lr_type,
+                                          &dlm_req->lock_desc.l_policy_data,
+                                          &lock->l_policy_data);
                 LDLM_DEBUG(lock, "completion AST, new policy data");
         }
 
@@ -1564,7 +1621,11 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
 
         LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
 
-        ldlm_run_ast_work(&ast_list, LDLM_WORK_CP_AST);
+        /* Let Enqueue call osc_lock_upcall() and initialize
+         * l_ast_data */
+        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2);
+
+        ldlm_run_ast_work(ns, &ast_list, LDLM_WORK_CP_AST);
 
         LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
                           lock);
@@ -2008,9 +2069,11 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req)
         if (req->rq_export == NULL) {
                 struct ldlm_request *dlm_req;
 
-                CERROR("operation %d from %s with bad export cookie "LPU64"\n",
-                       lustre_msg_get_opc(req->rq_reqmsg),
-                       libcfs_id2str(req->rq_peer),
+                CERROR("%s from %s arrived at %lu with bad export cookie "
+                       LPU64"\n",
+                       ll_opcode2str(lustre_msg_get_opc(req->rq_reqmsg)),
+                       libcfs_nid2str(req->rq_peer.nid),
+                       req->rq_arrival_time.tv_sec,
                        lustre_msg_get_handle(req->rq_reqmsg)->cookie);
 
                 if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_CANCEL) {
@@ -2056,28 +2119,114 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req)
         RETURN(0);
 }
 
-void ldlm_revoke_lock_cb(void *obj, void *data)
+static int ldlm_cancel_hpreq_lock_match(struct ptlrpc_request *req,
+                                        struct ldlm_lock *lock)
+{
+        struct ldlm_request *dlm_req;
+        struct lustre_handle lockh;
+        int rc = 0;
+        int i;
+        ENTRY;
+
+        dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
+        if (dlm_req == NULL)
+                RETURN(0);
+
+        ldlm_lock2handle(lock, &lockh);
+        for (i = 0; i < dlm_req->lock_count; i++) {
+                if (lustre_handle_equal(&dlm_req->lock_handle[i],
+                                        &lockh)) {
+                        DEBUG_REQ(D_RPCTRACE, req,
+                                  "Prio raised by lock "LPX64".", lockh.cookie);
+
+                        rc = 1;
+                        break;
+                }
+        }
+
+        RETURN(rc);
+
+}
+
+static int ldlm_cancel_hpreq_check(struct ptlrpc_request *req)
+{
+        struct ldlm_request *dlm_req;
+        int rc = 0;
+        int i;
+        ENTRY;
+
+        /* no prolong in recovery */
+        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
+                RETURN(0);
+
+        dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
+        if (dlm_req == NULL)
+                RETURN(-EFAULT);
+
+        for (i = 0; i < dlm_req->lock_count; i++) {
+                struct ldlm_lock *lock;
+
+                lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
+                if (lock == NULL)
+                        continue;
+
+                rc = !!(lock->l_flags & LDLM_FL_AST_SENT);
+                if (rc)
+                        LDLM_DEBUG(lock, "hpreq cancel lock");
+                LDLM_LOCK_PUT(lock);
+
+                if (rc)
+                        break;
+        }
+
+        RETURN(rc);
+}
+
+static struct ptlrpc_hpreq_ops ldlm_cancel_hpreq_ops = {
+        .hpreq_lock_match = ldlm_cancel_hpreq_lock_match,
+        .hpreq_check      = ldlm_cancel_hpreq_check
+};
+
+static int ldlm_hpreq_handler(struct ptlrpc_request *req)
+{
+        ENTRY;
+
+        req_capsule_init(&req->rq_pill, req, RCL_SERVER);
+
+        if (req->rq_export == NULL)
+                RETURN(0);
+
+        if (LDLM_CANCEL == lustre_msg_get_opc(req->rq_reqmsg)) {
+                req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
+                req->rq_ops = &ldlm_cancel_hpreq_ops;
+        }
+        RETURN(0);
+}
+
+int ldlm_revoke_lock_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
+                        cfs_hlist_node_t *hnode, void *data)
+
 {
         cfs_list_t         *rpc_list = data;
-        struct ldlm_lock   *lock = obj;
+        struct ldlm_lock   *lock = cfs_hash_object(hs, hnode);
 
         lock_res_and_lock(lock);
 
         if (lock->l_req_mode != lock->l_granted_mode) {
                 unlock_res_and_lock(lock);
-                return;
+                return 0;
         }
 
         LASSERT(lock->l_resource);
         if (lock->l_resource->lr_type != LDLM_IBITS &&
             lock->l_resource->lr_type != LDLM_PLAIN) {
                 unlock_res_and_lock(lock);
-                return;
+                return 0;
         }
 
         if (lock->l_flags & LDLM_FL_AST_SENT) {
                 unlock_res_and_lock(lock);
-                return;
+                return 0;
         }
 
         LASSERT(lock->l_blocking_ast);
@@ -2092,6 +2241,7 @@ void ldlm_revoke_lock_cb(void *obj, void *data)
 
         LDLM_LOCK_GET(lock);
         unlock_res_and_lock(lock);
+        return 0;
 }
 
 void ldlm_revoke_export_locks(struct obd_export *exp)
@@ -2102,7 +2252,8 @@ void ldlm_revoke_export_locks(struct obd_export *exp)
         CFS_INIT_LIST_HEAD(&rpc_list);
         cfs_hash_for_each_empty(exp->exp_lock_hash, ldlm_revoke_lock_cb,
                                 &rpc_list);
-        ldlm_run_ast_work(&rpc_list, LDLM_WORK_REVOKE_AST);
+        ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list,
+                          LDLM_WORK_REVOKE_AST);
 
         EXIT;
 }
@@ -2151,7 +2302,7 @@ static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
         int rc;
 
         cfs_init_completion(&bltd.bltd_comp);
-        rc = cfs_kernel_thread(ldlm_bl_thread_main, &bltd, 0);
+        rc = cfs_create_thread(ldlm_bl_thread_main, &bltd, 0);
         if (rc < 0) {
                 CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %d\n",
                        cfs_atomic_read(&blp->blp_num_threads), rc);
@@ -2187,42 +2338,44 @@ static int ldlm_bl_thread_main(void *arg)
         while (1) {
                 struct l_wait_info lwi = { 0 };
                 struct ldlm_bl_work_item *blwi = NULL;
+                int busy;
 
                 blwi = ldlm_bl_get_work(blp);
 
                 if (blwi == NULL) {
-                        int busy;
-
                         cfs_atomic_dec(&blp->blp_busy_threads);
                         l_wait_event_exclusive(blp->blp_waitq,
                                          (blwi = ldlm_bl_get_work(blp)) != NULL,
                                          &lwi);
                         busy = cfs_atomic_inc_return(&blp->blp_busy_threads);
-
-                        if (blwi->blwi_ns == NULL)
-                                /* added by ldlm_cleanup() */
-                                break;
-
-                        /* Not fatal if racy and have a few too many threads */
-                        if (unlikely(busy < blp->blp_max_threads &&
-                                     busy >= cfs_atomic_read(&blp->blp_num_threads)))
-                                /* discard the return value, we tried */
-                                ldlm_bl_thread_start(blp);
                 } else {
-                        if (blwi->blwi_ns == NULL)
-                                /* added by ldlm_cleanup() */
-                                break;
+                        busy = cfs_atomic_read(&blp->blp_busy_threads);
                 }
+
+                if (blwi->blwi_ns == NULL)
+                        /* added by ldlm_cleanup() */
+                        break;
+
+                /* Not fatal if racy and have a few too many threads */
+                if (unlikely(busy < blp->blp_max_threads &&
+                             busy >= cfs_atomic_read(&blp->blp_num_threads) &&
+                             !blwi->blwi_mem_pressure))
+                        /* discard the return value, we tried */
+                        ldlm_bl_thread_start(blp);
+
                 if (blwi->blwi_mem_pressure)
                         cfs_memory_pressure_set();
 
                 if (blwi->blwi_count) {
+                        int count;
                         /* The special case when we cancel locks in lru
                          * asynchronously, we pass the list of locks here.
-                         * Thus lock is marked LDLM_FL_CANCELING, and already
-                         * canceled locally. */
-                        ldlm_cli_cancel_list(&blwi->blwi_head,
-                                             blwi->blwi_count, NULL, 0);
+                         * Thus locks are marked LDLM_FL_CANCELING, but NOT
+                         * canceled locally yet. */
+                        count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
+                                                           blwi->blwi_count,
+                                                           LCF_BL_AST);
+                        ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL, 0);
                 } else {
                         ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
                                                 blwi->blwi_lock);
@@ -2284,7 +2437,7 @@ void ldlm_put_ref(void)
  * Export handle<->lock hash operations.
  */
 static unsigned
-ldlm_export_lock_hash(cfs_hash_t *hs, void *key, unsigned mask)
+ldlm_export_lock_hash(cfs_hash_t *hs, const void *key, unsigned mask)
 {
         return cfs_hash_u64_hash(((struct lustre_handle *)key)->cookie, mask);
 }
@@ -2293,49 +2446,59 @@ static void *
 ldlm_export_lock_key(cfs_hlist_node_t *hnode)
 {
         struct ldlm_lock *lock;
-        ENTRY;
 
         lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
-        RETURN(&lock->l_remote_handle);
+        return &lock->l_remote_handle;
+}
+
+static void
+ldlm_export_lock_keycpy(cfs_hlist_node_t *hnode, void *key)
+{
+        struct ldlm_lock *lock;
+
+        lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
+        lock->l_remote_handle = *(struct lustre_handle *)key;
 }
 
 static int
-ldlm_export_lock_compare(void *key, cfs_hlist_node_t *hnode)
+ldlm_export_lock_keycmp(const void *key, cfs_hlist_node_t *hnode)
 {
-        ENTRY;
-        RETURN(lustre_handle_equal(ldlm_export_lock_key(hnode), key));
+        return lustre_handle_equal(ldlm_export_lock_key(hnode), key);
 }
 
 static void *
-ldlm_export_lock_get(cfs_hlist_node_t *hnode)
+ldlm_export_lock_object(cfs_hlist_node_t *hnode)
+{
+        return cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
+}
+
+static void
+ldlm_export_lock_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
 {
         struct ldlm_lock *lock;
-        ENTRY;
 
         lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
         LDLM_LOCK_GET(lock);
-
-        RETURN(lock);
 }
 
-static void *
-ldlm_export_lock_put(cfs_hlist_node_t *hnode)
+static void
+ldlm_export_lock_put(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
 {
         struct ldlm_lock *lock;
-        ENTRY;
 
         lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
         LDLM_LOCK_RELEASE(lock);
-
-        RETURN(lock);
 }
 
 static cfs_hash_ops_t ldlm_export_lock_ops = {
-        .hs_hash    = ldlm_export_lock_hash,
-        .hs_key     = ldlm_export_lock_key,
-        .hs_compare = ldlm_export_lock_compare,
-        .hs_get     = ldlm_export_lock_get,
-        .hs_put     = ldlm_export_lock_put
+        .hs_hash        = ldlm_export_lock_hash,
+        .hs_key         = ldlm_export_lock_key,
+        .hs_keycmp      = ldlm_export_lock_keycmp,
+        .hs_keycpy      = ldlm_export_lock_keycpy,
+        .hs_object      = ldlm_export_lock_object,
+        .hs_get         = ldlm_export_lock_get,
+        .hs_put         = ldlm_export_lock_put,
+        .hs_put_locked  = ldlm_export_lock_put,
 };
 
 int ldlm_init_export(struct obd_export *exp)
@@ -2344,8 +2507,13 @@ int ldlm_init_export(struct obd_export *exp)
 
         exp->exp_lock_hash =
                 cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
-                                HASH_EXP_LOCK_CUR_BITS, HASH_EXP_LOCK_MAX_BITS,
-                                &ldlm_export_lock_ops, CFS_HASH_REHASH);
+                                HASH_EXP_LOCK_CUR_BITS,
+                                HASH_EXP_LOCK_MAX_BITS,
+                                HASH_EXP_LOCK_BKT_BITS, 0,
+                                CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
&ldlm_export_lock_ops, + CFS_HASH_DEFAULT | CFS_HASH_REHASH_KEY | + CFS_HASH_NBLK_CHANGE); if (!exp->exp_lock_hash) RETURN(-ENOMEM); @@ -2422,7 +2590,7 @@ static int ldlm_setup(void) ldlm_min_threads, ldlm_max_threads, "ldlm_cn", LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD, - NULL); + ldlm_hpreq_handler); if (!ldlm_state->ldlm_cancel_service) { CERROR("failed to start service\n"); @@ -2450,11 +2618,11 @@ static int ldlm_setup(void) GOTO(out_thread, rc); } - rc = ptlrpc_start_threads(NULL, ldlm_state->ldlm_cancel_service); + rc = ptlrpc_start_threads(ldlm_state->ldlm_cancel_service); if (rc) GOTO(out_thread, rc); - rc = ptlrpc_start_threads(NULL, ldlm_state->ldlm_cb_service); + rc = ptlrpc_start_threads(ldlm_state->ldlm_cb_service); if (rc) GOTO(out_thread, rc); @@ -2466,7 +2634,7 @@ static int ldlm_setup(void) cfs_spin_lock_init(&waiting_locks_spinlock); cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, 0); - rc = cfs_kernel_thread(expired_lock_main, NULL, CLONE_VM | CLONE_FILES); + rc = cfs_create_thread(expired_lock_main, NULL, CFS_DAEMON_FLAGS); if (rc < 0) { CERROR("Cannot start ldlm expired-lock thread: %d\n", rc); GOTO(out_thread, rc); @@ -2552,7 +2720,7 @@ static int ldlm_cleanup(void) RETURN(0); } -int __init ldlm_init(void) +int ldlm_init(void) { cfs_init_mutex(&ldlm_ref_sem); cfs_init_mutex(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER)); @@ -2585,7 +2753,7 @@ int __init ldlm_init(void) return 0; } -void __exit ldlm_exit(void) +void ldlm_exit(void) { int rc; if (ldlm_refcount) @@ -2653,9 +2821,9 @@ EXPORT_SYMBOL(ldlm_cli_cancel_req); EXPORT_SYMBOL(ldlm_replay_locks); EXPORT_SYMBOL(ldlm_resource_foreach); EXPORT_SYMBOL(ldlm_namespace_foreach); -EXPORT_SYMBOL(ldlm_namespace_foreach_res); EXPORT_SYMBOL(ldlm_resource_iterate); EXPORT_SYMBOL(ldlm_cancel_resource_local); +EXPORT_SYMBOL(ldlm_cli_cancel_list_local); EXPORT_SYMBOL(ldlm_cli_cancel_list); /* ldlm_lockd.c */