X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fldlm%2Fldlm_lockd.c;h=d009a1fe7a88afb10846e08930ebeca43938117c;hp=cd8623d70b8b0d6b14cf27ff955866bf376e6aae;hb=0bd27be7f20a671e7128f341a070838a2bd318dc;hpb=f95393b0d0a59cf3dc2f29cffc35dcc4cc9d7728 diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index cd8623d..d009a1f 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -28,6 +28,9 @@ /* * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * + * Copyright (c) 2011 Whamcloud, Inc. + * */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -66,6 +69,11 @@ extern cfs_mem_cache_t *ldlm_lock_slab; static cfs_semaphore_t ldlm_ref_sem; static int ldlm_refcount; +struct ldlm_cb_async_args { + struct ldlm_cb_set_arg *ca_set_arg; + struct ldlm_lock *ca_lock; +}; + /* LDLM state */ static struct ldlm_state *ldlm_state; @@ -133,6 +141,9 @@ struct ldlm_bl_work_item { struct ldlm_lock *blwi_lock; cfs_list_t blwi_head; int blwi_count; + cfs_completion_t blwi_comp; + int blwi_mode; + int blwi_mem_pressure; }; #ifdef __KERNEL__ @@ -252,7 +263,7 @@ static int ldlm_lock_busy(struct ldlm_lock *lock) if (lock->l_export == NULL) return 0; - cfs_spin_lock(&lock->l_export->exp_lock); + cfs_spin_lock_bh(&lock->l_export->exp_rpc_lock); cfs_list_for_each_entry(req, &lock->l_export->exp_queued_rpc, rq_exp_list) { if (req->rq_ops->hpreq_lock_match) { @@ -261,14 +272,14 @@ static int ldlm_lock_busy(struct ldlm_lock *lock) break; } } - cfs_spin_unlock(&lock->l_export->exp_lock); + cfs_spin_unlock_bh(&lock->l_export->exp_rpc_lock); RETURN(match); } /* This is called from within a timer interrupt and cannot schedule */ static void waiting_locks_callback(unsigned long unused) { - struct ldlm_lock *lock, *last = NULL; + struct ldlm_lock *lock; repeat: cfs_spin_lock_bh(&waiting_locks_spinlock); @@ -337,15 +348,13 @@ repeat: LDLM_LOCK_RELEASE(lock); 
continue; } - lock->l_resource->lr_namespace->ns_timeouts++; + ldlm_lock_to_ns(lock)->ns_timeouts++; LDLM_ERROR(lock, "lock callback timer expired after %lds: " "evicting client at %s ", cfs_time_current_sec()- lock->l_last_activity, libcfs_nid2str( lock->l_export->exp_connection->c_peer.nid)); - last = lock; - /* no needs to take an extra ref on the lock since it was in * the waiting_locks_list and ldlm_add_waiting_lock() * already grabbed a ref */ @@ -603,19 +612,29 @@ static int ldlm_handle_ast_error(struct ldlm_lock *lock, ldlm_failed_ast(lock, rc, ast_type); } } else if (rc) { - if (rc == -EINVAL) + if (rc == -EINVAL) { + struct ldlm_resource *res = lock->l_resource; LDLM_DEBUG(lock, "client (nid %s) returned %d" " from %s AST - normal race", libcfs_nid2str(peer.nid), req->rq_repmsg ? lustre_msg_get_status(req->rq_repmsg) : -1, ast_type); - else + if (res) { + /* update lvbo to return proper attributes. + * see bug 23174 */ + ldlm_resource_getref(res); + ldlm_res_lvbo_update(res, NULL, 1); + ldlm_resource_putref(res); + } + + } else { LDLM_ERROR(lock, "client (nid %s) returned %d " "from %s AST", libcfs_nid2str(peer.nid), (req->rq_repmsg != NULL) ? lustre_msg_get_status(req->rq_repmsg) : 0, ast_type); + } ldlm_lock_cancel(lock); /* Server-side AST functions are called from ldlm_reprocess_all, * which needs to be told to please restart its reprocessing. 
*/ @@ -628,35 +647,27 @@ static int ldlm_handle_ast_error(struct ldlm_lock *lock, static int ldlm_cb_interpret(const struct lu_env *env, struct ptlrpc_request *req, void *data, int rc) { - struct ldlm_cb_set_arg *arg; - struct ldlm_lock *lock; + struct ldlm_cb_async_args *ca = data; + struct ldlm_lock *lock = ca->ca_lock; + struct ldlm_cb_set_arg *arg = ca->ca_set_arg; ENTRY; - LASSERT(data != NULL); - - arg = req->rq_async_args.pointer_arg[0]; - lock = req->rq_async_args.pointer_arg[1]; LASSERT(lock != NULL); if (rc != 0) { - /* If client canceled the lock but the cancel has not - * been received yet, we need to update lvbo to have the - * proper attributes cached. */ - if (rc == -EINVAL && arg->type == LDLM_BL_CALLBACK) - ldlm_res_lvbo_update(lock->l_resource, NULL, 1); rc = ldlm_handle_ast_error(lock, req, rc, arg->type == LDLM_BL_CALLBACK ? "blocking" : "completion"); + if (rc == -ERESTART) + cfs_atomic_inc(&arg->restart); } - LDLM_LOCK_RELEASE(lock); - if (rc == -ERESTART) - cfs_atomic_set(&arg->restart, 1); - + if (cfs_atomic_dec_return(&arg->rpcs) < arg->threshold) + cfs_waitq_signal(&arg->waitq); RETURN(0); } -static inline int ldlm_bl_and_cp_ast_fini(struct ptlrpc_request *req, +static inline int ldlm_bl_and_cp_ast_tail(struct ptlrpc_request *req, struct ldlm_cb_set_arg *arg, struct ldlm_lock *lock, int instant_cancel) @@ -668,12 +679,11 @@ static inline int ldlm_bl_and_cp_ast_fini(struct ptlrpc_request *req, rc = ptl_send_rpc(req, 1); ptlrpc_req_finished(req); if (rc == 0) - /* If we cancelled the lock, we need to restart - * ldlm_reprocess_queue */ - cfs_atomic_set(&arg->restart, 1); + cfs_atomic_inc(&arg->restart); } else { LDLM_LOCK_GET(lock); - ptlrpc_set_add_req(arg->set, req); + ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); + cfs_atomic_inc(&arg->rpcs); } RETURN(rc); @@ -693,14 +703,14 @@ static void ldlm_lock_reorder_req(struct ldlm_lock *lock) RETURN_EXIT; } - cfs_spin_lock(&lock->l_export->exp_lock); + 
cfs_spin_lock_bh(&lock->l_export->exp_rpc_lock); cfs_list_for_each_entry(req, &lock->l_export->exp_queued_rpc, rq_exp_list) { if (!req->rq_hp && req->rq_ops->hpreq_lock_match && req->rq_ops->hpreq_lock_match(req, lock)) ptlrpc_hpreq_reorder(req); } - cfs_spin_unlock(&lock->l_export->exp_lock); + cfs_spin_unlock_bh(&lock->l_export->exp_rpc_lock); EXIT; } @@ -715,6 +725,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, void *data, int flag) { + struct ldlm_cb_async_args *ca; struct ldlm_cb_set_arg *arg = data; struct ldlm_request *body; struct ptlrpc_request *req; @@ -741,8 +752,11 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, if (req == NULL) RETURN(-ENOMEM); - req->rq_async_args.pointer_arg[0] = arg; - req->rq_async_args.pointer_arg[1] = lock; + CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args)); + ca = ptlrpc_req_async_args(req); + ca->ca_set_arg = arg; + ca->ca_lock = lock; + req->rq_interpret_reply = ldlm_cb_interpret; req->rq_no_resend = 1; @@ -793,7 +807,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats, LDLM_BL_CALLBACK - LDLM_FIRST_OPC); - rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel); + rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel); RETURN(rc); } @@ -803,6 +817,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) struct ldlm_cb_set_arg *arg = data; struct ldlm_request *body; struct ptlrpc_request *req; + struct ldlm_cb_async_args *ca; long total_enqueue_wait; int instant_cancel = 0; int rc = 0; @@ -819,11 +834,11 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) if (req == NULL) RETURN(-ENOMEM); - lock_res_and_lock(lock); - if (lock->l_resource->lr_lvb_len) + /* server namespace, doesn't need lock */ + if (lock->l_resource->lr_lvb_len) { req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT, lock->l_resource->lr_lvb_len); - 
unlock_res_and_lock(lock); + } rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK); if (rc) { @@ -831,8 +846,11 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) RETURN(rc); } - req->rq_async_args.pointer_arg[0] = arg; - req->rq_async_args.pointer_arg[1] = lock; + CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args)); + ca = ptlrpc_req_async_args(req); + ca->ca_set_arg = arg; + ca->ca_lock = lock; + req->rq_interpret_reply = ldlm_cb_interpret; req->rq_no_resend = 1; body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ); @@ -843,10 +861,10 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) if (lock->l_resource->lr_lvb_len) { void *lvb = req_capsule_client_get(&req->rq_pill, &RMF_DLM_LVB); - lock_res_and_lock(lock); + lock_res(lock->l_resource); memcpy(lvb, lock->l_resource->lr_lvb_data, lock->l_resource->lr_lvb_len); - unlock_res_and_lock(lock); + unlock_res(lock->l_resource); } LDLM_DEBUG(lock, "server preparing completion AST (after %lds wait)", @@ -855,7 +873,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) /* Server-side enqueue wait time estimate, used in __ldlm_add_waiting_lock to set future enqueue timers */ if (total_enqueue_wait < ldlm_get_enq_timeout(lock)) - at_measured(&lock->l_resource->lr_namespace->ns_at_estimate, + at_measured(ldlm_lock_to_ns_at(lock), total_enqueue_wait); else /* bz18618. Don't add lock enqueue time we spend waiting for a @@ -865,7 +883,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) LDLM_DEBUG(lock, "lock completed after %lus; estimate was %ds. 
" "It is likely that a previous callback timed out.", total_enqueue_wait, - at_get(&lock->l_resource->lr_namespace->ns_at_estimate)); + at_get(ldlm_lock_to_ns_at(lock))); ptlrpc_request_set_replen(req); @@ -904,7 +922,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats, LDLM_CP_CALLBACK - LDLM_FIRST_OPC); - rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel); + rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel); RETURN(rc); } @@ -930,10 +948,9 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data) body->lock_handle[0] = lock->l_remote_handle; ldlm_lock2desc(lock, &body->lock_desc); - lock_res_and_lock(lock); + /* server namespace, doesn't need lock */ req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lock->l_resource->lr_lvb_len); - unlock_res_and_lock(lock); res = lock->l_resource; ptlrpc_request_set_replen(req); @@ -949,12 +966,20 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data) LDLM_GL_CALLBACK - LDLM_FIRST_OPC); rc = ptlrpc_queue_wait(req); - if (rc == -ELDLM_NO_LOCK_DATA) + /* Update the LVB from disk if the AST failed (this is a legal race) + * + * - Glimpse callback of local lock just return -ELDLM_NO_LOCK_DATA. + * - Glimpse callback of remote lock might return -ELDLM_NO_LOCK_DATA + * when inode is cleared. 
LU-274 + */ + if (rc == -ELDLM_NO_LOCK_DATA) { LDLM_DEBUG(lock, "lost race - client has a lock but no inode"); - else if (rc != 0) + ldlm_res_lvbo_update(res, NULL, 1); + } else if (rc != 0) { rc = ldlm_handle_ast_error(lock, req, rc, "glimpse"); - else + } else { rc = ldlm_res_lvbo_update(res, req, 1); + } ptlrpc_req_finished(req); if (rc == -ERESTART) @@ -1108,10 +1133,12 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns, LDLM_DEBUG(lock, "server-side enqueue handler, new lock created"); OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2); - /* Don't enqueue a lock onto the export if it has already - * been evicted. Cancel it now instead. (bug 3822) */ - if (req->rq_export->exp_failed) { - LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export); + /* Don't enqueue a lock onto the export if it has been disconnected + * due to eviction (bug 3822) or server umount (bug 24324). + * Cancel it now instead. */ + if (req->rq_export->exp_disconnected) { + LDLM_ERROR(lock, "lock on disconnected export %p", + req->rq_export); GOTO(out, rc = -ENOTCONN); } @@ -1128,13 +1155,14 @@ existing_lock: * local_lock_enqueue by the policy function. 
*/ cookie = req; } else { - lock_res_and_lock(lock); + /* based on the assumption that lvb size never changes during + * resource lifetime; otherwise it needs resource->lr_lock's + * protection */ if (lock->l_resource->lr_lvb_len) { req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lock->l_resource->lr_lvb_len); } - unlock_res_and_lock(lock); if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR)) GOTO(out, rc = -ENOMEM); @@ -1145,7 +1173,10 @@ existing_lock: } if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN) - lock->l_policy_data = dlm_req->lock_desc.l_policy_data; + ldlm_convert_policy_to_local( + dlm_req->lock_desc.l_resource.lr_type, + &dlm_req->lock_desc.l_policy_data, + &lock->l_policy_data); if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) lock->l_req_extent = lock->l_policy_data.l_extent; @@ -1168,9 +1199,10 @@ existing_lock: dlm_rep->lock_flags |= dlm_req->lock_flags & LDLM_INHERIT_FLAGS; lock->l_flags |= dlm_req->lock_flags & LDLM_INHERIT_FLAGS; - /* Don't move a pending lock onto the export if it has already - * been evicted. Cancel it now instead. (bug 5683) */ - if (unlikely(req->rq_export->exp_failed || + /* Don't move a pending lock onto the export if it has already been + * disconnected due to eviction (bug 5683) or server umount (bug 24324). + * Cancel it now instead. 
*/ + if (unlikely(req->rq_export->exp_disconnected || OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_OLD_EXPORT))) { LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export); rc = -ENOTCONN; @@ -1234,24 +1266,28 @@ existing_lock: LDLM_DEBUG(lock, "server-side enqueue handler, sending reply" "(err=%d, rc=%d)", err, rc); - lock_res_and_lock(lock); if (rc == 0) { if (lock->l_resource->lr_lvb_len > 0) { + /* MDT path won't handle lr_lvb_data, so + * lock/unlock better be contained in the + * if block */ void *lvb; lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB); LASSERTF(lvb != NULL, "req %p, lock %p\n", req, lock); - + lock_res(lock->l_resource); memcpy(lvb, lock->l_resource->lr_lvb_data, lock->l_resource->lr_lvb_len); + unlock_res(lock->l_resource); } } else { + lock_res_and_lock(lock); ldlm_resource_unlink_lock(lock); ldlm_lock_destroy_nolock(lock); + unlock_res_and_lock(lock); } - unlock_res_and_lock(lock); if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK) ldlm_reprocess_all(lock->l_resource); @@ -1436,10 +1472,7 @@ int ldlm_handle_cancel(struct ptlrpc_request *req) if (!ldlm_request_cancel(req, dlm_req, 0)) req->rq_status = ESTALE; - if (ptlrpc_reply(req) != 0) - LBUG(); - - RETURN(0); + RETURN(ptlrpc_reply(req)); } void ldlm_handle_bl_callback(struct ldlm_namespace *ns, @@ -1515,7 +1548,10 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, } if (lock->l_resource->lr_type != LDLM_PLAIN) { - lock->l_policy_data = dlm_req->lock_desc.l_policy_data; + ldlm_convert_policy_to_local( + dlm_req->lock_desc.l_resource.lr_type, + &dlm_req->lock_desc.l_policy_data, + &lock->l_policy_data); LDLM_DEBUG(lock, "completion AST, new policy data"); } @@ -1561,7 +1597,11 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work"); - ldlm_run_ast_work(&ast_list, LDLM_WORK_CP_AST); + /* Let Enqueue to call osc_lock_upcall() and initialize + * l_ast_data */ + 
OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2); + + ldlm_run_ast_work(ns, &ast_list, LDLM_WORK_CP_AST); LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)", lock); @@ -1622,22 +1662,46 @@ static int ldlm_callback_reply(struct ptlrpc_request *req, int rc) } #ifdef __KERNEL__ -static int ldlm_bl_to_thread(struct ldlm_namespace *ns, - struct ldlm_lock_desc *ld, struct ldlm_lock *lock, - cfs_list_t *cancels, int count) +static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi, int mode) { struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool; - struct ldlm_bl_work_item *blwi; ENTRY; - if (cancels && count == 0) - RETURN(0); + cfs_spin_lock(&blp->blp_lock); + if (blwi->blwi_lock && blwi->blwi_lock->l_flags & LDLM_FL_DISCARD_DATA) { + /* add LDLM_FL_DISCARD_DATA requests to the priority list */ + cfs_list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list); + } else { + /* other blocking callbacks are added to the regular list */ + cfs_list_add_tail(&blwi->blwi_entry, &blp->blp_list); + } + cfs_spin_unlock(&blp->blp_lock); - OBD_ALLOC(blwi, sizeof(*blwi)); - if (blwi == NULL) - RETURN(-ENOMEM); + cfs_waitq_signal(&blp->blp_waitq); + + /* can not use blwi->blwi_mode as blwi could be already freed in + LDLM_ASYNC mode */ + if (mode == LDLM_SYNC) + cfs_wait_for_completion(&blwi->blwi_comp); + + RETURN(0); +} + +static inline void init_blwi(struct ldlm_bl_work_item *blwi, + struct ldlm_namespace *ns, + struct ldlm_lock_desc *ld, + cfs_list_t *cancels, int count, + struct ldlm_lock *lock, + int mode) +{ + cfs_init_completion(&blwi->blwi_comp); + CFS_INIT_LIST_HEAD(&blwi->blwi_head); + + if (cfs_memory_pressure_get()) + blwi->blwi_mem_pressure = 1; blwi->blwi_ns = ns; + blwi->blwi_mode = mode; if (ld != NULL) blwi->blwi_ld = *ld; if (count) { @@ -1647,36 +1711,53 @@ static int ldlm_bl_to_thread(struct ldlm_namespace *ns, } else { blwi->blwi_lock = lock; } - cfs_spin_lock(&blp->blp_lock); - if (lock && lock->l_flags & LDLM_FL_DISCARD_DATA) { - /* add 
LDLM_FL_DISCARD_DATA requests to the priority list */ - cfs_list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list); +} + +static int ldlm_bl_to_thread(struct ldlm_namespace *ns, + struct ldlm_lock_desc *ld, struct ldlm_lock *lock, + cfs_list_t *cancels, int count, int mode) +{ + ENTRY; + + if (cancels && count == 0) + RETURN(0); + + if (mode == LDLM_SYNC) { + /* if it is synchronous call do minimum mem alloc, as it could + * be triggered from kernel shrinker + */ + struct ldlm_bl_work_item blwi; + memset(&blwi, 0, sizeof(blwi)); + init_blwi(&blwi, ns, ld, cancels, count, lock, LDLM_SYNC); + RETURN(__ldlm_bl_to_thread(&blwi, LDLM_SYNC)); } else { - /* other blocking callbacks are added to the regular list */ - cfs_list_add_tail(&blwi->blwi_entry, &blp->blp_list); - } - cfs_waitq_signal(&blp->blp_waitq); - cfs_spin_unlock(&blp->blp_lock); + struct ldlm_bl_work_item *blwi; + OBD_ALLOC(blwi, sizeof(*blwi)); + if (blwi == NULL) + RETURN(-ENOMEM); + init_blwi(blwi, ns, ld, cancels, count, lock, LDLM_ASYNC); - RETURN(0); + RETURN(__ldlm_bl_to_thread(blwi, LDLM_ASYNC)); + } } + #endif int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, struct ldlm_lock *lock) { #ifdef __KERNEL__ - RETURN(ldlm_bl_to_thread(ns, ld, lock, NULL, 0)); + RETURN(ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LDLM_ASYNC)); #else RETURN(-ENOSYS); #endif } int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, - cfs_list_t *cancels, int count) + cfs_list_t *cancels, int count, int mode) { #ifdef __KERNEL__ - RETURN(ldlm_bl_to_thread(ns, ld, NULL, cancels, count)); + RETURN(ldlm_bl_to_thread(ns, ld, NULL, cancels, count, mode)); #else RETURN(-ENOSYS); #endif @@ -1964,9 +2045,11 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req) if (req->rq_export == NULL) { struct ldlm_request *dlm_req; - CERROR("operation %d from %s with bad export cookie "LPU64"\n", - lustre_msg_get_opc(req->rq_reqmsg), - libcfs_id2str(req->rq_peer), + CERROR("%s from %s 
arrived at %lu with bad export cookie " + LPU64"\n", + ll_opcode2str(lustre_msg_get_opc(req->rq_reqmsg)), + libcfs_nid2str(req->rq_peer.nid), + req->rq_arrival_time.tv_sec, lustre_msg_get_handle(req->rq_reqmsg)->cookie); if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_CANCEL) { @@ -2012,28 +2095,30 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req) RETURN(0); } -void ldlm_revoke_lock_cb(void *obj, void *data) +int ldlm_revoke_lock_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd, + cfs_hlist_node_t *hnode, void *data) + { cfs_list_t *rpc_list = data; - struct ldlm_lock *lock = obj; + struct ldlm_lock *lock = cfs_hash_object(hs, hnode); lock_res_and_lock(lock); if (lock->l_req_mode != lock->l_granted_mode) { unlock_res_and_lock(lock); - return; + return 0; } LASSERT(lock->l_resource); if (lock->l_resource->lr_type != LDLM_IBITS && lock->l_resource->lr_type != LDLM_PLAIN) { unlock_res_and_lock(lock); - return; + return 0; } if (lock->l_flags & LDLM_FL_AST_SENT) { unlock_res_and_lock(lock); - return; + return 0; } LASSERT(lock->l_blocking_ast); @@ -2048,6 +2133,7 @@ void ldlm_revoke_lock_cb(void *obj, void *data) LDLM_LOCK_GET(lock); unlock_res_and_lock(lock); + return 0; } void ldlm_revoke_export_locks(struct obd_export *exp) @@ -2058,7 +2144,8 @@ void ldlm_revoke_export_locks(struct obd_export *exp) CFS_INIT_LIST_HEAD(&rpc_list); cfs_hash_for_each_empty(exp->exp_lock_hash, ldlm_revoke_lock_cb, &rpc_list); - ldlm_run_ast_work(&rpc_list, LDLM_WORK_REVOKE_AST); + ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list, + LDLM_WORK_REVOKE_AST); EXIT; } @@ -2107,7 +2194,7 @@ static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp) int rc; cfs_init_completion(&bltd.bltd_comp); - rc = cfs_kernel_thread(ldlm_bl_thread_main, &bltd, 0); + rc = cfs_create_thread(ldlm_bl_thread_main, &bltd, 0); if (rc < 0) { CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %d\n", cfs_atomic_read(&blp->blp_num_threads), rc); @@ -2161,7 +2248,8 @@ static int ldlm_bl_thread_main(void *arg) /* Not 
fatal if racy and have a few too many threads */ if (unlikely(busy < blp->blp_max_threads && - busy >= cfs_atomic_read(&blp->blp_num_threads))) + busy >= cfs_atomic_read(&blp->blp_num_threads) && + !blwi->blwi_mem_pressure)) /* discard the return value, we tried */ ldlm_bl_thread_start(blp); } else { @@ -2169,19 +2257,30 @@ static int ldlm_bl_thread_main(void *arg) /* added by ldlm_cleanup() */ break; } + if (blwi->blwi_mem_pressure) + cfs_memory_pressure_set(); if (blwi->blwi_count) { + int count; /* The special case when we cancel locks in lru * asynchronously, we pass the list of locks here. - * Thus lock is marked LDLM_FL_CANCELING, and already - * canceled locally. */ - ldlm_cli_cancel_list(&blwi->blwi_head, - blwi->blwi_count, NULL, 0); + * Thus locks are marked LDLM_FL_CANCELING, but NOT + * canceled locally yet. */ + count = ldlm_cli_cancel_list_local(&blwi->blwi_head, + blwi->blwi_count, + LCF_BL_AST); + ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL, 0); } else { ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld, blwi->blwi_lock); } - OBD_FREE(blwi, sizeof(*blwi)); + if (blwi->blwi_mem_pressure) + cfs_memory_pressure_clr(); + + if (blwi->blwi_mode == LDLM_ASYNC) + OBD_FREE(blwi, sizeof(*blwi)); + else + cfs_complete(&blwi->blwi_comp); } cfs_atomic_dec(&blp->blp_busy_threads); @@ -2232,7 +2331,7 @@ void ldlm_put_ref(void) * Export handle<->lock hash operations. 
*/ static unsigned -ldlm_export_lock_hash(cfs_hash_t *hs, void *key, unsigned mask) +ldlm_export_lock_hash(cfs_hash_t *hs, const void *key, unsigned mask) { return cfs_hash_u64_hash(((struct lustre_handle *)key)->cookie, mask); } @@ -2241,49 +2340,59 @@ static void * ldlm_export_lock_key(cfs_hlist_node_t *hnode) { struct ldlm_lock *lock; - ENTRY; lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash); - RETURN(&lock->l_remote_handle); + return &lock->l_remote_handle; +} + +static void +ldlm_export_lock_keycpy(cfs_hlist_node_t *hnode, void *key) +{ + struct ldlm_lock *lock; + + lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash); + lock->l_remote_handle = *(struct lustre_handle *)key; } static int -ldlm_export_lock_compare(void *key, cfs_hlist_node_t *hnode) +ldlm_export_lock_keycmp(const void *key, cfs_hlist_node_t *hnode) { - ENTRY; - RETURN(lustre_handle_equal(ldlm_export_lock_key(hnode), key)); + return lustre_handle_equal(ldlm_export_lock_key(hnode), key); } static void * -ldlm_export_lock_get(cfs_hlist_node_t *hnode) +ldlm_export_lock_object(cfs_hlist_node_t *hnode) +{ + return cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash); +} + +static void +ldlm_export_lock_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode) { struct ldlm_lock *lock; - ENTRY; lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash); LDLM_LOCK_GET(lock); - - RETURN(lock); } -static void * -ldlm_export_lock_put(cfs_hlist_node_t *hnode) +static void +ldlm_export_lock_put(cfs_hash_t *hs, cfs_hlist_node_t *hnode) { struct ldlm_lock *lock; - ENTRY; lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash); LDLM_LOCK_RELEASE(lock); - - RETURN(lock); } static cfs_hash_ops_t ldlm_export_lock_ops = { - .hs_hash = ldlm_export_lock_hash, - .hs_key = ldlm_export_lock_key, - .hs_compare = ldlm_export_lock_compare, - .hs_get = ldlm_export_lock_get, - .hs_put = ldlm_export_lock_put + .hs_hash = ldlm_export_lock_hash, + .hs_key = ldlm_export_lock_key, + .hs_keycmp = ldlm_export_lock_keycmp, 
+ .hs_keycpy = ldlm_export_lock_keycpy, + .hs_object = ldlm_export_lock_object, + .hs_get = ldlm_export_lock_get, + .hs_put = ldlm_export_lock_put, + .hs_put_locked = ldlm_export_lock_put, }; int ldlm_init_export(struct obd_export *exp) @@ -2292,8 +2401,13 @@ int ldlm_init_export(struct obd_export *exp) exp->exp_lock_hash = cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid), - HASH_EXP_LOCK_CUR_BITS, HASH_EXP_LOCK_MAX_BITS, - &ldlm_export_lock_ops, CFS_HASH_REHASH); + HASH_EXP_LOCK_CUR_BITS, + HASH_EXP_LOCK_MAX_BITS, + HASH_EXP_LOCK_BKT_BITS, 0, + CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA, + &ldlm_export_lock_ops, + CFS_HASH_DEFAULT | CFS_HASH_REHASH_KEY | + CFS_HASH_NBLK_CHANGE); if (!exp->exp_lock_hash) RETURN(-ENOMEM); @@ -2398,11 +2512,11 @@ static int ldlm_setup(void) GOTO(out_thread, rc); } - rc = ptlrpc_start_threads(NULL, ldlm_state->ldlm_cancel_service); + rc = ptlrpc_start_threads(ldlm_state->ldlm_cancel_service); if (rc) GOTO(out_thread, rc); - rc = ptlrpc_start_threads(NULL, ldlm_state->ldlm_cb_service); + rc = ptlrpc_start_threads(ldlm_state->ldlm_cb_service); if (rc) GOTO(out_thread, rc); @@ -2414,7 +2528,7 @@ static int ldlm_setup(void) cfs_spin_lock_init(&waiting_locks_spinlock); cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, 0); - rc = cfs_kernel_thread(expired_lock_main, NULL, CLONE_VM | CLONE_FILES); + rc = cfs_create_thread(expired_lock_main, NULL, CFS_DAEMON_FLAGS); if (rc < 0) { CERROR("Cannot start ldlm expired-lock thread: %d\n", rc); GOTO(out_thread, rc); @@ -2601,9 +2715,9 @@ EXPORT_SYMBOL(ldlm_cli_cancel_req); EXPORT_SYMBOL(ldlm_replay_locks); EXPORT_SYMBOL(ldlm_resource_foreach); EXPORT_SYMBOL(ldlm_namespace_foreach); -EXPORT_SYMBOL(ldlm_namespace_foreach_res); EXPORT_SYMBOL(ldlm_resource_iterate); EXPORT_SYMBOL(ldlm_cancel_resource_local); +EXPORT_SYMBOL(ldlm_cli_cancel_list_local); EXPORT_SYMBOL(ldlm_cli_cancel_list); /* ldlm_lockd.c */