X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fldlm%2Fldlm_lockd.c;h=d77a893b6e48cfe08663524cc19cce8a7a759d43;hb=58f98d4c4477f16557b54162a9566ac38a0fef38;hp=78375dd21562d86556fbfa7d489741df708efcec;hpb=6e3ec5812ebd1b5ecf7cae584f429b013ffe7431;p=fs%2Flustre-release.git

diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c
index 78375dd..d77a893 100644
--- a/lustre/ldlm/ldlm_lockd.c
+++ b/lustre/ldlm/ldlm_lockd.c
@@ -26,8 +26,11 @@
  * GPL HEADER END
  */
 /*
- * Copyright 2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2011 Whamcloud, Inc.
+ *
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -66,6 +69,11 @@ extern cfs_mem_cache_t *ldlm_lock_slab;
 static cfs_semaphore_t  ldlm_ref_sem;
 static int ldlm_refcount;
 
+struct ldlm_cb_async_args {
+        struct ldlm_cb_set_arg *ca_set_arg;
+        struct ldlm_lock       *ca_lock;
+};
+
 /* LDLM state */
 
 static struct ldlm_state *ldlm_state;
@@ -133,6 +141,9 @@ struct ldlm_bl_work_item {
         struct ldlm_lock        *blwi_lock;
         cfs_list_t              blwi_head;
         int                     blwi_count;
+        cfs_completion_t        blwi_comp;
+        int                     blwi_mode;
+        int                     blwi_mem_pressure;
 };
 
 #ifdef __KERNEL__
@@ -252,7 +263,7 @@ static int ldlm_lock_busy(struct ldlm_lock *lock)
         if (lock->l_export == NULL)
                 return 0;
 
-        cfs_spin_lock(&lock->l_export->exp_lock);
+        cfs_spin_lock_bh(&lock->l_export->exp_rpc_lock);
         cfs_list_for_each_entry(req, &lock->l_export->exp_queued_rpc,
                                 rq_exp_list) {
                 if (req->rq_ops->hpreq_lock_match) {
@@ -261,14 +272,14 @@ static int ldlm_lock_busy(struct ldlm_lock *lock)
                                 break;
                 }
         }
-        cfs_spin_unlock(&lock->l_export->exp_lock);
+        cfs_spin_unlock_bh(&lock->l_export->exp_rpc_lock);
         RETURN(match);
 }
 
 /* This is called from within a timer interrupt and cannot schedule */
 static void waiting_locks_callback(unsigned long unused)
 {
-        struct ldlm_lock *lock, *last = NULL;
+        struct ldlm_lock *lock;
 
 repeat:
         cfs_spin_lock_bh(&waiting_locks_spinlock);
@@ -337,15 +348,13 @@ repeat:
                         LDLM_LOCK_RELEASE(lock);
                         continue;
                 }
-                lock->l_resource->lr_namespace->ns_timeouts++;
+                ldlm_lock_to_ns(lock)->ns_timeouts++;
                 LDLM_ERROR(lock, "lock callback timer expired after %lds: "
                            "evicting client at %s ",
                            cfs_time_current_sec()- lock->l_last_activity,
                            libcfs_nid2str(
                                    lock->l_export->exp_connection->c_peer.nid));
 
-                last = lock;
-
                 /* no needs to take an extra ref on the lock since it was in
                  * the waiting_locks_list and ldlm_add_waiting_lock()
                  * already grabbed a ref */
@@ -603,19 +612,29 @@ static int ldlm_handle_ast_error(struct ldlm_lock *lock,
                         ldlm_failed_ast(lock, rc, ast_type);
                 }
         } else if (rc) {
-                if (rc == -EINVAL)
+                if (rc == -EINVAL) {
+                        struct ldlm_resource *res = lock->l_resource;
                         LDLM_DEBUG(lock, "client (nid %s) returned %d"
                                    " from %s AST - normal race",
                                    libcfs_nid2str(peer.nid),
                                    req->rq_repmsg ?
                                    lustre_msg_get_status(req->rq_repmsg) : -1,
                                    ast_type);
-                else
+                        if (res) {
+                                /* update lvbo to return proper attributes.
+                                 * see bug 23174 */
+                                ldlm_resource_getref(res);
+                                ldlm_res_lvbo_update(res, NULL, 1);
+                                ldlm_resource_putref(res);
+                        }
+
+                } else {
                         LDLM_ERROR(lock, "client (nid %s) returned %d "
                                    "from %s AST", libcfs_nid2str(peer.nid),
                                    (req->rq_repmsg != NULL) ?
                                    lustre_msg_get_status(req->rq_repmsg) : 0,
                                    ast_type);
+                }
                 ldlm_lock_cancel(lock);
                 /* Server-side AST functions are called from ldlm_reprocess_all,
                  * which needs to be told to please restart its reprocessing. */
@@ -628,35 +647,27 @@ static int ldlm_handle_ast_error(struct ldlm_lock *lock,
 static int ldlm_cb_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req, void *data, int rc)
 {
-        struct ldlm_cb_set_arg *arg;
-        struct ldlm_lock *lock;
+        struct ldlm_cb_async_args *ca = data;
+        struct ldlm_lock *lock = ca->ca_lock;
+        struct ldlm_cb_set_arg *arg = ca->ca_set_arg;
         ENTRY;
 
-        LASSERT(data != NULL);
-
-        arg = req->rq_async_args.pointer_arg[0];
-        lock = req->rq_async_args.pointer_arg[1];
         LASSERT(lock != NULL);
         if (rc != 0) {
-                /* If client canceled the lock but the cancel has not
-                 * been received yet, we need to update lvbo to have the
-                 * proper attributes cached. */
-                if (rc == -EINVAL && arg->type == LDLM_BL_CALLBACK)
-                        ldlm_res_lvbo_update(lock->l_resource, NULL, 1);
                 rc = ldlm_handle_ast_error(lock, req, rc,
                                            arg->type == LDLM_BL_CALLBACK
                                            ? "blocking" : "completion");
+                if (rc == -ERESTART)
+                        cfs_atomic_inc(&arg->restart);
         }
-
         LDLM_LOCK_RELEASE(lock);
 
-        if (rc == -ERESTART)
-                cfs_atomic_set(&arg->restart, 1);
-
+        if (cfs_atomic_dec_return(&arg->rpcs) < arg->threshold)
+                cfs_waitq_signal(&arg->waitq);
         RETURN(0);
 }
 
-static inline int ldlm_bl_and_cp_ast_fini(struct ptlrpc_request *req,
+static inline int ldlm_bl_and_cp_ast_tail(struct ptlrpc_request *req,
                                           struct ldlm_cb_set_arg *arg,
                                           struct ldlm_lock *lock,
                                           int instant_cancel)
@@ -668,12 +679,11 @@ static inline int ldlm_bl_and_cp_ast_fini(struct ptlrpc_request *req,
                 rc = ptl_send_rpc(req, 1);
                 ptlrpc_req_finished(req);
                 if (rc == 0)
-                        /* If we cancelled the lock, we need to restart
-                         * ldlm_reprocess_queue */
-                        cfs_atomic_set(&arg->restart, 1);
+                        cfs_atomic_inc(&arg->restart);
         } else {
                 LDLM_LOCK_GET(lock);
-                ptlrpc_set_add_req(arg->set, req);
+                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+                cfs_atomic_inc(&arg->rpcs);
         }
 
         RETURN(rc);
@@ -693,14 +703,18 @@ static void ldlm_lock_reorder_req(struct ldlm_lock *lock)
                 RETURN_EXIT;
         }
 
-        cfs_spin_lock(&lock->l_export->exp_lock);
+        cfs_spin_lock_bh(&lock->l_export->exp_rpc_lock);
         cfs_list_for_each_entry(req, &lock->l_export->exp_queued_rpc,
                                 rq_exp_list) {
-                if (!req->rq_hp && req->rq_ops->hpreq_lock_match &&
+                /* Do not process requests that were not yet added to their
+                 * incoming queue or were already removed from there for
+                 * processing */
+                if (!req->rq_hp && !cfs_list_empty(&req->rq_list) &&
+                    req->rq_ops->hpreq_lock_match &&
                     req->rq_ops->hpreq_lock_match(req, lock))
                         ptlrpc_hpreq_reorder(req);
         }
-        cfs_spin_unlock(&lock->l_export->exp_lock);
+        cfs_spin_unlock_bh(&lock->l_export->exp_rpc_lock);
         EXIT;
 }
 
@@ -715,6 +729,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                              struct ldlm_lock_desc *desc,
                              void *data, int flag)
 {
+        struct ldlm_cb_async_args *ca;
         struct ldlm_cb_set_arg *arg = data;
         struct ldlm_request *body;
         struct ptlrpc_request *req;
@@ -741,8 +756,11 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        req->rq_async_args.pointer_arg[0] = arg;
-        req->rq_async_args.pointer_arg[1] = lock;
+        CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
+        ca = ptlrpc_req_async_args(req);
+        ca->ca_set_arg = arg;
+        ca->ca_lock = lock;
+
         req->rq_interpret_reply = ldlm_cb_interpret;
         req->rq_no_resend = 1;
 
@@ -793,7 +811,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
                              LDLM_BL_CALLBACK - LDLM_FIRST_OPC);
 
-        rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel);
+        rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel);
 
         RETURN(rc);
 }
@@ -803,6 +821,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         struct ldlm_cb_set_arg *arg = data;
         struct ldlm_request *body;
         struct ptlrpc_request *req;
+        struct ldlm_cb_async_args *ca;
         long total_enqueue_wait;
         int instant_cancel = 0;
         int rc = 0;
@@ -819,11 +838,11 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        lock_res_and_lock(lock);
-        if (lock->l_resource->lr_lvb_len)
+        /* server namespace, doesn't need lock */
+        if (lock->l_resource->lr_lvb_len) {
                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT,
                                      lock->l_resource->lr_lvb_len);
-        unlock_res_and_lock(lock);
+        }
 
         rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK);
         if (rc) {
@@ -831,8 +850,11 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                 RETURN(rc);
         }
 
-        req->rq_async_args.pointer_arg[0] = arg;
-        req->rq_async_args.pointer_arg[1] = lock;
+        CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
+        ca = ptlrpc_req_async_args(req);
+        ca->ca_set_arg = arg;
+        ca->ca_lock = lock;
+
         req->rq_interpret_reply = ldlm_cb_interpret;
         req->rq_no_resend = 1;
         body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
@@ -843,10 +865,10 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         if (lock->l_resource->lr_lvb_len) {
                 void *lvb = req_capsule_client_get(&req->rq_pill,
                                                    &RMF_DLM_LVB);
-                lock_res_and_lock(lock);
+                lock_res(lock->l_resource);
                 memcpy(lvb, lock->l_resource->lr_lvb_data,
                        lock->l_resource->lr_lvb_len);
-                unlock_res_and_lock(lock);
+                unlock_res(lock->l_resource);
         }
 
         LDLM_DEBUG(lock, "server preparing completion AST (after %lds wait)",
@@ -855,8 +877,8 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         /* Server-side enqueue wait time estimate, used in
            __ldlm_add_waiting_lock to set future enqueue timers */
         if (total_enqueue_wait < ldlm_get_enq_timeout(lock))
-                at_add(&lock->l_resource->lr_namespace->ns_at_estimate,
-                       total_enqueue_wait);
+                at_measured(ldlm_lock_to_ns_at(lock),
+                            total_enqueue_wait);
         else
                 /* bz18618. Don't add lock enqueue time we spend waiting for a
                    previous callback to fail. Locks waiting legitimately will
@@ -865,7 +887,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                 LDLM_DEBUG(lock, "lock completed after %lus; estimate was %ds. "
" "It is likely that a previous callback timed out.", total_enqueue_wait, - at_get(&lock->l_resource->lr_namespace->ns_at_estimate)); + at_get(ldlm_lock_to_ns_at(lock))); ptlrpc_request_set_replen(req); @@ -904,7 +926,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats, LDLM_CP_CALLBACK - LDLM_FIRST_OPC); - rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel); + rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel); RETURN(rc); } @@ -930,10 +952,9 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data) body->lock_handle[0] = lock->l_remote_handle; ldlm_lock2desc(lock, &body->lock_desc); - lock_res_and_lock(lock); + /* server namespace, doesn't need lock */ req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lock->l_resource->lr_lvb_len); - unlock_res_and_lock(lock); res = lock->l_resource; ptlrpc_request_set_replen(req); @@ -949,12 +970,20 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data) LDLM_GL_CALLBACK - LDLM_FIRST_OPC); rc = ptlrpc_queue_wait(req); - if (rc == -ELDLM_NO_LOCK_DATA) + /* Update the LVB from disk if the AST failed (this is a legal race) + * + * - Glimpse callback of local lock just return -ELDLM_NO_LOCK_DATA. + * - Glimpse callback of remote lock might return -ELDLM_NO_LOCK_DATA + * when inode is cleared. LU-274 + */ + if (rc == -ELDLM_NO_LOCK_DATA) { LDLM_DEBUG(lock, "lost race - client has a lock but no inode"); - else if (rc != 0) + ldlm_res_lvbo_update(res, NULL, 1); + } else if (rc != 0) { rc = ldlm_handle_ast_error(lock, req, rc, "glimpse"); - else + } else { rc = ldlm_res_lvbo_update(res, req, 1); + } ptlrpc_req_finished(req); if (rc == -ERESTART) @@ -1108,10 +1137,12 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns, LDLM_DEBUG(lock, "server-side enqueue handler, new lock created"); OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2); - /* Don't enqueue a lock onto the export if it has already - * been evicted. Cancel it now instead. (bug 3822) */ - if (req->rq_export->exp_failed) { - LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export); + /* Don't enqueue a lock onto the export if it is been disonnected + * due to eviction (bug 3822) or server umount (bug 24324). + * Cancel it now instead. */ + if (req->rq_export->exp_disconnected) { + LDLM_ERROR(lock, "lock on disconnected export %p", + req->rq_export); GOTO(out, rc = -ENOTCONN); } @@ -1128,13 +1159,14 @@ existing_lock: * local_lock_enqueue by the policy function. 
                 cookie = req;
         } else {
-                lock_res_and_lock(lock);
+                /* based on the assumption that lvb size never changes during
+                 * resource life time otherwise it needs resource->lr_lock's
+                 * protection */
                 if (lock->l_resource->lr_lvb_len) {
                         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
                                              RCL_SERVER,
                                              lock->l_resource->lr_lvb_len);
                 }
-                unlock_res_and_lock(lock);
 
                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
                         GOTO(out, rc = -ENOMEM);
@@ -1145,7 +1177,10 @@ existing_lock:
         }
 
         if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN)
-                lock->l_policy_data = dlm_req->lock_desc.l_policy_data;
+                ldlm_convert_policy_to_local(
+                                          dlm_req->lock_desc.l_resource.lr_type,
+                                          &dlm_req->lock_desc.l_policy_data,
+                                          &lock->l_policy_data);
         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
                 lock->l_req_extent = lock->l_policy_data.l_extent;
 
@@ -1168,9 +1203,10 @@ existing_lock:
         dlm_rep->lock_flags |= dlm_req->lock_flags & LDLM_INHERIT_FLAGS;
         lock->l_flags |= dlm_req->lock_flags & LDLM_INHERIT_FLAGS;
 
-        /* Don't move a pending lock onto the export if it has already
-         * been evicted. Cancel it now instead. (bug 5683) */
-        if (unlikely(req->rq_export->exp_failed ||
+        /* Don't move a pending lock onto the export if it has already been
+         * disconnected due to eviction (bug 5683) or server umount (bug 24324).
+         * Cancel it now instead. */
+        if (unlikely(req->rq_export->exp_disconnected ||
                      OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_OLD_EXPORT))) {
                 LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export);
                 rc = -ENOTCONN;
@@ -1234,24 +1270,28 @@ existing_lock:
                 LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
                            "(err=%d, rc=%d)", err, rc);
 
-                lock_res_and_lock(lock);
                 if (rc == 0) {
                         if (lock->l_resource->lr_lvb_len > 0) {
+                                /* MDT path won't handle lr_lvb_data, so
+                                 * lock/unlock better be contained in the
+                                 * if block */
                                 void *lvb;
 
                                 lvb = req_capsule_server_get(&req->rq_pill,
                                                              &RMF_DLM_LVB);
                                 LASSERTF(lvb != NULL, "req %p, lock %p\n",
                                          req, lock);
-
+                                lock_res(lock->l_resource);
                                 memcpy(lvb, lock->l_resource->lr_lvb_data,
                                        lock->l_resource->lr_lvb_len);
+                                unlock_res(lock->l_resource);
                         }
                 } else {
+                        lock_res_and_lock(lock);
                         ldlm_resource_unlink_lock(lock);
                         ldlm_lock_destroy_nolock(lock);
+                        unlock_res_and_lock(lock);
                 }
-                unlock_res_and_lock(lock);
 
                 if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
                         ldlm_reprocess_all(lock->l_resource);
@@ -1436,10 +1476,7 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
         if (!ldlm_request_cancel(req, dlm_req, 0))
                 req->rq_status = ESTALE;
 
-        if (ptlrpc_reply(req) != 0)
-                LBUG();
-
-        RETURN(0);
+        RETURN(ptlrpc_reply(req));
 }
 
 void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
@@ -1515,7 +1552,10 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
         }
 
         if (lock->l_resource->lr_type != LDLM_PLAIN) {
-                lock->l_policy_data = dlm_req->lock_desc.l_policy_data;
+                ldlm_convert_policy_to_local(
+                                          dlm_req->lock_desc.l_resource.lr_type,
+                                          &dlm_req->lock_desc.l_policy_data,
+                                          &lock->l_policy_data);
                 LDLM_DEBUG(lock, "completion AST, new policy data");
         }
 
@@ -1561,7 +1601,11 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
 
         LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
 
-        ldlm_run_ast_work(&ast_list, LDLM_WORK_CP_AST);
+        /* Let Enqueue call osc_lock_upcall() and initialize
+         * l_ast_data */
+        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2);
+
+        ldlm_run_ast_work(ns, &ast_list, LDLM_WORK_CP_AST);
 
         LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
                           lock);
@@ -1622,22 +1666,46 @@ static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
 }
 
 #ifdef __KERNEL__
-static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
-                             struct ldlm_lock_desc *ld, struct ldlm_lock *lock,
-                             cfs_list_t *cancels, int count)
+static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi, int mode)
 {
         struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
-        struct ldlm_bl_work_item *blwi;
         ENTRY;
 
-        if (cancels && count == 0)
-                RETURN(0);
-
-        OBD_ALLOC(blwi, sizeof(*blwi));
-        if (blwi == NULL)
-                RETURN(-ENOMEM);
+        cfs_spin_lock(&blp->blp_lock);
+        if (blwi->blwi_lock && blwi->blwi_lock->l_flags & LDLM_FL_DISCARD_DATA) {
+                /* add LDLM_FL_DISCARD_DATA requests to the priority list */
+                cfs_list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
+        } else {
+                /* other blocking callbacks are added to the regular list */
+                cfs_list_add_tail(&blwi->blwi_entry, &blp->blp_list);
+        }
+        cfs_spin_unlock(&blp->blp_lock);
+
+        cfs_waitq_signal(&blp->blp_waitq);
+
+        /* can not use blwi->blwi_mode as blwi could be already freed in
+           LDLM_ASYNC mode */
+        if (mode == LDLM_SYNC)
+                cfs_wait_for_completion(&blwi->blwi_comp);
+
+        RETURN(0);
+}
+
+static inline void init_blwi(struct ldlm_bl_work_item *blwi,
+                             struct ldlm_namespace *ns,
+                             struct ldlm_lock_desc *ld,
+                             cfs_list_t *cancels, int count,
+                             struct ldlm_lock *lock,
+                             int mode)
+{
+        cfs_init_completion(&blwi->blwi_comp);
+        CFS_INIT_LIST_HEAD(&blwi->blwi_head);
+
+        if (cfs_memory_pressure_get())
+                blwi->blwi_mem_pressure = 1;
 
         blwi->blwi_ns = ns;
+        blwi->blwi_mode = mode;
         if (ld != NULL)
                 blwi->blwi_ld = *ld;
         if (count) {
@@ -1647,36 +1715,53 @@ static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
         } else {
                 blwi->blwi_lock = lock;
         }
-        cfs_spin_lock(&blp->blp_lock);
-        if (lock && lock->l_flags & LDLM_FL_DISCARD_DATA) {
-                /* add LDLM_FL_DISCARD_DATA requests to the priority list */
-                cfs_list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
+}
+
+static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
+                             struct ldlm_lock_desc *ld, struct ldlm_lock *lock,
+                             cfs_list_t *cancels, int count, int mode)
+{
+        ENTRY;
+
+        if (cancels && count == 0)
+                RETURN(0);
+
+        if (mode == LDLM_SYNC) {
+                /* if it is a synchronous call, do minimum mem alloc, as it
+                 * could be triggered from kernel shrinker
+                 */
+                struct ldlm_bl_work_item blwi;
+                memset(&blwi, 0, sizeof(blwi));
+                init_blwi(&blwi, ns, ld, cancels, count, lock, LDLM_SYNC);
+                RETURN(__ldlm_bl_to_thread(&blwi, LDLM_SYNC));
         } else {
-                /* other blocking callbacks are added to the regular list */
-                cfs_list_add_tail(&blwi->blwi_entry, &blp->blp_list);
-        }
-        cfs_waitq_signal(&blp->blp_waitq);
-        cfs_spin_unlock(&blp->blp_lock);
+                struct ldlm_bl_work_item *blwi;
+                OBD_ALLOC(blwi, sizeof(*blwi));
+                if (blwi == NULL)
+                        RETURN(-ENOMEM);
+                init_blwi(blwi, ns, ld, cancels, count, lock, LDLM_ASYNC);
 
-        RETURN(0);
+                RETURN(__ldlm_bl_to_thread(blwi, LDLM_ASYNC));
+        }
 }
+
 #endif
 
 int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
                            struct ldlm_lock *lock)
 {
 #ifdef __KERNEL__
-        RETURN(ldlm_bl_to_thread(ns, ld, lock, NULL, 0));
+        RETURN(ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LDLM_ASYNC));
 #else
         RETURN(-ENOSYS);
 #endif
 }
 
 int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
-                           cfs_list_t *cancels, int count)
+                           cfs_list_t *cancels, int count, int mode)
 {
 #ifdef __KERNEL__
-        RETURN(ldlm_bl_to_thread(ns, ld, NULL, cancels, count));
+        RETURN(ldlm_bl_to_thread(ns, ld, NULL, cancels, count, mode));
 #else
         RETURN(-ENOSYS);
 #endif
@@ -1692,7 +1777,7 @@ static int ldlm_handle_setinfo(struct ptlrpc_request *req)
         int rc = -ENOSYS;
         ENTRY;
 
-        DEBUG_REQ(D_ERROR, req, "%s: handle setinfo\n", obd->obd_name);
+        DEBUG_REQ(D_HSM, req, "%s: handle setinfo\n", obd->obd_name);
 
         req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);
 
@@ -1725,6 +1810,20 @@ static int ldlm_handle_setinfo(struct ptlrpc_request *req)
         return rc;
 }
 
+static inline void ldlm_callback_errmsg(struct ptlrpc_request *req,
+                                        const char *msg, int rc,
+                                        struct lustre_handle *handle)
+{
+        DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req,
+                  "%s: [nid %s] [rc %d] [lock "LPX64"]",
+                  msg, libcfs_id2str(req->rq_peer), rc,
+                  handle ? handle->cookie : 0);
+        if (req->rq_no_reply)
+                CWARN("No reply was sent, maybe cause bug 21636.\n");
+        else if (rc)
+                CWARN("Send reply failed, maybe cause bug 21636.\n");
+}
+
 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
 static int ldlm_callback_handler(struct ptlrpc_request *req)
 {
@@ -1746,7 +1845,9 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
 
         if (req->rq_export == NULL) {
-                ldlm_callback_reply(req, -ENOTCONN);
+                rc = ldlm_callback_reply(req, -ENOTCONN);
+                ldlm_callback_errmsg(req, "Operate on unconnected server",
+                                     rc, NULL);
                 RETURN(0);
         }
 
@@ -1836,7 +1937,9 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
 
         dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
         if (dlm_req == NULL) {
-                ldlm_callback_reply(req, -EPROTO);
+                rc = ldlm_callback_reply(req, -EPROTO);
+                ldlm_callback_errmsg(req, "Operate without parameter", rc,
+                                     NULL);
                 RETURN(0);
         }
 
@@ -1853,7 +1956,9 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
         if (!lock) {
                 CDEBUG(D_DLMTRACE, "callback on lock "LPX64" - lock "
                        "disappeared\n", dlm_req->lock_handle[0].cookie);
-                ldlm_callback_reply(req, -EINVAL);
+                rc = ldlm_callback_reply(req, -EINVAL);
+                ldlm_callback_errmsg(req, "Operate with invalid parameter", rc,
+                                     &dlm_req->lock_handle[0]);
                 RETURN(0);
         }
 
@@ -1877,7 +1982,9 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                                dlm_req->lock_handle[0].cookie);
                 unlock_res_and_lock(lock);
                 LDLM_LOCK_RELEASE(lock);
-                ldlm_callback_reply(req, -EINVAL);
+                rc = ldlm_callback_reply(req, -EINVAL);
+                ldlm_callback_errmsg(req, "Operate on stale lock", rc,
+                                     &dlm_req->lock_handle[0]);
                 RETURN(0);
         }
         /* BL_AST locks are not needed in lru.
@@ -1900,8 +2007,12 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
         case LDLM_BL_CALLBACK:
                 CDEBUG(D_INODE, "blocking ast\n");
                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_BL_CALLBACK);
-                if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK))
-                        ldlm_callback_reply(req, 0);
+                if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)) {
+                        rc = ldlm_callback_reply(req, 0);
+                        if (req->rq_no_reply || rc)
+                                ldlm_callback_errmsg(req, "Normal process", rc,
+                                                     &dlm_req->lock_handle[0]);
+                }
                 if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
                         ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
                 break;
@@ -1938,9 +2049,11 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req)
         if (req->rq_export == NULL) {
                 struct ldlm_request *dlm_req;
 
-                CERROR("operation %d from %s with bad export cookie "LPU64"\n",
-                       lustre_msg_get_opc(req->rq_reqmsg),
-                       libcfs_id2str(req->rq_peer),
+                CERROR("%s from %s arrived at %lu with bad export cookie "
+                       LPU64"\n",
+                       ll_opcode2str(lustre_msg_get_opc(req->rq_reqmsg)),
+                       libcfs_nid2str(req->rq_peer.nid),
+                       req->rq_arrival_time.tv_sec,
                        lustre_msg_get_handle(req->rq_reqmsg)->cookie);
 
                 if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_CANCEL) {
@@ -1986,28 +2099,114 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req)
         RETURN(0);
 }
 
-void ldlm_revoke_lock_cb(void *obj, void *data)
+static int ldlm_cancel_hpreq_lock_match(struct ptlrpc_request *req,
+                                        struct ldlm_lock *lock)
+{
+        struct ldlm_request *dlm_req;
+        struct lustre_handle lockh;
+        int rc = 0;
+        int i;
+        ENTRY;
+
+        dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
+        if (dlm_req == NULL)
+                RETURN(0);
+
+        ldlm_lock2handle(lock, &lockh);
+        for (i = 0; i < dlm_req->lock_count; i++) {
+                if (lustre_handle_equal(&dlm_req->lock_handle[i],
+                                        &lockh)) {
+                        DEBUG_REQ(D_RPCTRACE, req,
+                                  "Prio raised by lock "LPX64".", lockh.cookie);
+
+                        rc = 1;
+                        break;
+                }
+        }
+
+        RETURN(rc);
+
+}
+
+static int ldlm_cancel_hpreq_check(struct ptlrpc_request *req)
+{
+        struct ldlm_request *dlm_req;
+        int rc = 0;
+        int i;
+        ENTRY;
+
+        /* no prolong in recovery */
+        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
+                RETURN(0);
+
+        dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
+        if (dlm_req == NULL)
+                RETURN(-EFAULT);
+
+        for (i = 0; i < dlm_req->lock_count; i++) {
+                struct ldlm_lock *lock;
+
+                lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
+                if (lock == NULL)
+                        continue;
+
+                rc = !!(lock->l_flags & LDLM_FL_AST_SENT);
+                if (rc)
+                        LDLM_DEBUG(lock, "hpreq cancel lock");
+                LDLM_LOCK_PUT(lock);
+
+                if (rc)
+                        break;
+        }
+
+        RETURN(rc);
+}
+
+static struct ptlrpc_hpreq_ops ldlm_cancel_hpreq_ops = {
+        .hpreq_lock_match = ldlm_cancel_hpreq_lock_match,
+        .hpreq_check      = ldlm_cancel_hpreq_check
+};
+
+static int ldlm_hpreq_handler(struct ptlrpc_request *req)
+{
+        ENTRY;
+
+        req_capsule_init(&req->rq_pill, req, RCL_SERVER);
+
+        if (req->rq_export == NULL)
+                RETURN(0);
+
+        if (LDLM_CANCEL == lustre_msg_get_opc(req->rq_reqmsg)) {
+                req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
+                req->rq_ops = &ldlm_cancel_hpreq_ops;
+        }
+        RETURN(0);
+}
+
+int ldlm_revoke_lock_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
+                        cfs_hlist_node_t *hnode, void *data)
+
 {
         cfs_list_t         *rpc_list = data;
-        struct ldlm_lock   *lock = obj;
+        struct ldlm_lock   *lock = cfs_hash_object(hs, hnode);
 
         lock_res_and_lock(lock);
 
         if (lock->l_req_mode != lock->l_granted_mode) {
                 unlock_res_and_lock(lock);
-                return;
+                return 0;
         }
 
         LASSERT(lock->l_resource);
         if (lock->l_resource->lr_type != LDLM_IBITS &&
             lock->l_resource->lr_type != LDLM_PLAIN) {
                 unlock_res_and_lock(lock);
-                return;
+                return 0;
         }
 
         if (lock->l_flags & LDLM_FL_AST_SENT) {
                 unlock_res_and_lock(lock);
-                return;
+                return 0;
         }
 
         LASSERT(lock->l_blocking_ast);
@@ -2022,6 +2221,7 @@ void ldlm_revoke_lock_cb(void *obj, void *data)
 
         LDLM_LOCK_GET(lock);
         unlock_res_and_lock(lock);
+        return 0;
 }
 
 void ldlm_revoke_export_locks(struct obd_export *exp)
@@ -2032,7 +2232,8 @@ void ldlm_revoke_export_locks(struct obd_export *exp)
         CFS_INIT_LIST_HEAD(&rpc_list);
         cfs_hash_for_each_empty(exp->exp_lock_hash,
                                 ldlm_revoke_lock_cb, &rpc_list);
-        ldlm_run_ast_work(&rpc_list, LDLM_WORK_REVOKE_AST);
+        ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list,
+                          LDLM_WORK_REVOKE_AST);
 
         EXIT;
 }
@@ -2081,7 +2282,7 @@ static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
         int rc;
 
         cfs_init_completion(&bltd.bltd_comp);
-        rc = cfs_kernel_thread(ldlm_bl_thread_main, &bltd, 0);
+        rc = cfs_create_thread(ldlm_bl_thread_main, &bltd, 0);
         if (rc < 0) {
                 CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %d\n",
                        cfs_atomic_read(&blp->blp_num_threads), rc);
@@ -2117,45 +2318,55 @@ static int ldlm_bl_thread_main(void *arg)
         while (1) {
                 struct l_wait_info lwi = { 0 };
                 struct ldlm_bl_work_item *blwi = NULL;
+                int busy;
 
                 blwi = ldlm_bl_get_work(blp);
 
                 if (blwi == NULL) {
-                        int busy;
-
                         cfs_atomic_dec(&blp->blp_busy_threads);
                         l_wait_event_exclusive(blp->blp_waitq,
                                          (blwi = ldlm_bl_get_work(blp)) != NULL,
                                          &lwi);
                         busy = cfs_atomic_inc_return(&blp->blp_busy_threads);
-
-                        if (blwi->blwi_ns == NULL)
-                                /* added by ldlm_cleanup() */
-                                break;
-
-                        /* Not fatal if racy and have a few too many threads */
-                        if (unlikely(busy < blp->blp_max_threads &&
-                                     busy >= cfs_atomic_read(&blp->blp_num_threads)))
-                                /* discard the return value, we tried */
-                                ldlm_bl_thread_start(blp);
                 } else {
-                        if (blwi->blwi_ns == NULL)
-                                /* added by ldlm_cleanup() */
-                                break;
+                        busy = cfs_atomic_read(&blp->blp_busy_threads);
                 }
 
+                if (blwi->blwi_ns == NULL)
+                        /* added by ldlm_cleanup() */
+                        break;
+
+                /* Not fatal if racy and have a few too many threads */
+                if (unlikely(busy < blp->blp_max_threads &&
+                             busy >= cfs_atomic_read(&blp->blp_num_threads) &&
+                             !blwi->blwi_mem_pressure))
+                        /* discard the return value, we tried */
+                        ldlm_bl_thread_start(blp);
+
+                if (blwi->blwi_mem_pressure)
+                        cfs_memory_pressure_set();
+
                 if (blwi->blwi_count) {
+                        int count;
                         /* The special case when we cancel locks in lru
                          * asynchronously, we pass the list of locks here.
-                         * Thus lock is marked LDLM_FL_CANCELING, and already
-                         * canceled locally. */
-                        ldlm_cli_cancel_list(&blwi->blwi_head,
-                                             blwi->blwi_count, NULL, 0);
+                         * Thus locks are marked LDLM_FL_CANCELING, but NOT
+                         * canceled locally yet. */
+                        count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
+                                                           blwi->blwi_count,
+                                                           LCF_BL_AST);
+                        ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL, 0);
                 } else {
                         ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
                                                 blwi->blwi_lock);
                 }
-                OBD_FREE(blwi, sizeof(*blwi));
+                if (blwi->blwi_mem_pressure)
+                        cfs_memory_pressure_clr();
+
+                if (blwi->blwi_mode == LDLM_ASYNC)
+                        OBD_FREE(blwi, sizeof(*blwi));
+                else
+                        cfs_complete(&blwi->blwi_comp);
         }
 
         cfs_atomic_dec(&blp->blp_busy_threads);
@@ -2206,7 +2417,7 @@ void ldlm_put_ref(void)
  * Export handle<->lock hash operations.
  */
 static unsigned
-ldlm_export_lock_hash(cfs_hash_t *hs, void *key, unsigned mask)
+ldlm_export_lock_hash(cfs_hash_t *hs, const void *key, unsigned mask)
 {
         return cfs_hash_u64_hash(((struct lustre_handle *)key)->cookie, mask);
 }
@@ -2215,49 +2426,59 @@ static void *
 ldlm_export_lock_key(cfs_hlist_node_t *hnode)
 {
         struct ldlm_lock *lock;
-        ENTRY;
 
         lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
-        RETURN(&lock->l_remote_handle);
+        return &lock->l_remote_handle;
+}
+
+static void
+ldlm_export_lock_keycpy(cfs_hlist_node_t *hnode, void *key)
+{
+        struct ldlm_lock *lock;
+
+        lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
+        lock->l_remote_handle = *(struct lustre_handle *)key;
 }
 
 static int
-ldlm_export_lock_compare(void *key, cfs_hlist_node_t *hnode)
+ldlm_export_lock_keycmp(const void *key, cfs_hlist_node_t *hnode)
 {
-        ENTRY;
-        RETURN(lustre_handle_equal(ldlm_export_lock_key(hnode), key));
+        return lustre_handle_equal(ldlm_export_lock_key(hnode), key);
 }
 
 static void *
-ldlm_export_lock_get(cfs_hlist_node_t *hnode)
+ldlm_export_lock_object(cfs_hlist_node_t *hnode)
+{
+        return cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
+}
+
+static void
+ldlm_export_lock_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
 {
         struct ldlm_lock *lock;
-        ENTRY;
 
         lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
         LDLM_LOCK_GET(lock);
-
-        RETURN(lock);
 }
 
-static void *
-ldlm_export_lock_put(cfs_hlist_node_t *hnode)
+static void
+ldlm_export_lock_put(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
 {
         struct ldlm_lock *lock;
-        ENTRY;
 
         lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
         LDLM_LOCK_RELEASE(lock);
-
-        RETURN(lock);
 }
 
 static cfs_hash_ops_t ldlm_export_lock_ops = {
-        .hs_hash    = ldlm_export_lock_hash,
-        .hs_key     = ldlm_export_lock_key,
-        .hs_compare = ldlm_export_lock_compare,
-        .hs_get     = ldlm_export_lock_get,
-        .hs_put     = ldlm_export_lock_put
+        .hs_hash        = ldlm_export_lock_hash,
+        .hs_key         = ldlm_export_lock_key,
+        .hs_keycmp      = ldlm_export_lock_keycmp,
+        .hs_keycpy      = ldlm_export_lock_keycpy,
+        .hs_object      = ldlm_export_lock_object,
+        .hs_get         = ldlm_export_lock_get,
+        .hs_put         = ldlm_export_lock_put,
+        .hs_put_locked  = ldlm_export_lock_put,
 };
 
 int ldlm_init_export(struct obd_export *exp)
@@ -2266,8 +2487,13 @@ int ldlm_init_export(struct obd_export *exp)
 
         exp->exp_lock_hash =
                 cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
-                                HASH_EXP_LOCK_CUR_BITS, HASH_EXP_LOCK_MAX_BITS,
-                                &ldlm_export_lock_ops, CFS_HASH_REHASH);
+                                HASH_EXP_LOCK_CUR_BITS,
+                                HASH_EXP_LOCK_MAX_BITS,
+                                HASH_EXP_LOCK_BKT_BITS, 0,
+                                CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
+                                &ldlm_export_lock_ops,
+                                CFS_HASH_DEFAULT | CFS_HASH_REHASH_KEY |
+                                CFS_HASH_NBLK_CHANGE);
 
         if (!exp->exp_lock_hash)
                 RETURN(-ENOMEM);
@@ -2279,7 +2505,7 @@ EXPORT_SYMBOL(ldlm_init_export);
 void ldlm_destroy_export(struct obd_export *exp)
 {
         ENTRY;
-        cfs_hash_destroy(exp->exp_lock_hash);
+        cfs_hash_putref(exp->exp_lock_hash);
         exp->exp_lock_hash = NULL;
         EXIT;
 }
@@ -2344,7 +2570,7 @@ static int ldlm_setup(void)
                                 ldlm_min_threads, ldlm_max_threads,
                                 "ldlm_cn",
                                 LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD,
-                                NULL);
+                                ldlm_hpreq_handler);
 
         if (!ldlm_state->ldlm_cancel_service) {
                 CERROR("failed to start service\n");
@@ -2372,11 +2598,11 @@ static int ldlm_setup(void)
                 GOTO(out_thread, rc);
         }
 
-        rc = ptlrpc_start_threads(NULL, ldlm_state->ldlm_cancel_service);
+        rc = ptlrpc_start_threads(ldlm_state->ldlm_cancel_service);
         if (rc)
                 GOTO(out_thread, rc);
 
-        rc = ptlrpc_start_threads(NULL, ldlm_state->ldlm_cb_service);
+        rc = ptlrpc_start_threads(ldlm_state->ldlm_cb_service);
         if (rc)
                 GOTO(out_thread, rc);
 
@@ -2388,7 +2614,7 @@ static int ldlm_setup(void)
         cfs_spin_lock_init(&waiting_locks_spinlock);
         cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, 0);
 
-        rc = cfs_kernel_thread(expired_lock_main, NULL, CLONE_VM | CLONE_FILES);
+        rc = cfs_create_thread(expired_lock_main, NULL, CFS_DAEMON_FLAGS);
         if (rc < 0) {
                 CERROR("Cannot start ldlm expired-lock thread: %d\n", rc);
                 GOTO(out_thread, rc);
@@ -2474,7 +2700,7 @@ static int ldlm_cleanup(void)
         RETURN(0);
 }
 
-int __init ldlm_init(void)
+int ldlm_init(void)
 {
         cfs_init_mutex(&ldlm_ref_sem);
         cfs_init_mutex(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER));
@@ -2507,7 +2733,7 @@ int __init ldlm_init(void)
         return 0;
 }
 
-void __exit ldlm_exit(void)
+void ldlm_exit(void)
 {
         int rc;
         if (ldlm_refcount)
@@ -2575,9 +2801,9 @@ EXPORT_SYMBOL(ldlm_cli_cancel_req);
 EXPORT_SYMBOL(ldlm_replay_locks);
 EXPORT_SYMBOL(ldlm_resource_foreach);
 EXPORT_SYMBOL(ldlm_namespace_foreach);
-EXPORT_SYMBOL(ldlm_namespace_foreach_res);
 EXPORT_SYMBOL(ldlm_resource_iterate);
 EXPORT_SYMBOL(ldlm_cancel_resource_local);
+EXPORT_SYMBOL(ldlm_cli_cancel_list_local);
 EXPORT_SYMBOL(ldlm_cli_cancel_list);
 
 /* ldlm_lockd.c */