X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fldlm%2Fldlm_lockd.c;h=27178d1754c6a1e908b9c517569740b2f048e27c;hb=2ad695f40cf1920033d77723311ed5ed7a3b07cc;hp=da9a6f76a8afafe7080a4d1f30c2893d801b49d8;hpb=30d23e8d36d8b0b4d2571e7801e3643b0e7ab22e;p=fs%2Flustre-release.git diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index da9a6f7..27178d1 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -1,6 +1,4 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * +/* * GPL HEADER START * * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. @@ -29,8 +27,7 @@ * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011 Whamcloud, Inc. - * + * Copyright (c) 2011, 2012, Whamcloud, Inc. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -42,9 +39,6 @@ * Author: Phil Schwan */ -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif #define DEBUG_SUBSYSTEM S_LDLM #ifdef __KERNEL__ @@ -58,15 +52,17 @@ #include #include "ldlm_internal.h" -#ifdef __KERNEL__ static int ldlm_num_threads; CFS_MODULE_PARM(ldlm_num_threads, "i", int, 0444, "number of DLM service threads to start"); -#endif + +static char *ldlm_cpts; +CFS_MODULE_PARM(ldlm_cpts, "s", charp, 0444, + "CPU partitions ldlm threads should run on"); extern cfs_mem_cache_t *ldlm_resource_slab; extern cfs_mem_cache_t *ldlm_lock_slab; -static cfs_semaphore_t ldlm_ref_sem; +static cfs_mutex_t ldlm_ref_mutex; static int ldlm_refcount; struct ldlm_cb_async_args { @@ -92,20 +88,6 @@ static inline unsigned int ldlm_get_rq_timeout(void) return timeout < 1 ? 1 : timeout; } -#ifdef __KERNEL__ -/* w_l_spinlock protects both waiting_locks_list and expired_lock_thread */ -static cfs_spinlock_t waiting_locks_spinlock; /* BH lock (timer) */ -static cfs_list_t waiting_locks_list; -static cfs_timer_t waiting_locks_timer; - -static struct expired_lock_thread { - cfs_waitq_t elt_waitq; - int elt_state; - int elt_dump; - cfs_list_t elt_expired_locks; -} expired_lock_thread; -#endif - #define ELT_STOPPED 0 #define ELT_READY 1 #define ELT_TERMINATE 2 @@ -146,7 +128,19 @@ struct ldlm_bl_work_item { int blwi_mem_pressure; }; -#ifdef __KERNEL__ +#if defined(HAVE_SERVER_SUPPORT) && defined(__KERNEL__) + +/* w_l_spinlock protects both waiting_locks_list and expired_lock_thread */ +static cfs_spinlock_t waiting_locks_spinlock; /* BH lock (timer) */ +static cfs_list_t waiting_locks_list; +static cfs_timer_t waiting_locks_timer; + +static struct expired_lock_thread { + cfs_waitq_t elt_waitq; + int elt_state; + int elt_dump; + cfs_list_t elt_expired_locks; +} expired_lock_thread; static inline int have_expired_locks(void) { @@ -180,13 +174,15 @@ static int expired_lock_main(void *arg) cfs_spin_lock_bh(&waiting_locks_spinlock); if (expired_lock_thread.elt_dump) { + struct libcfs_debug_msg_data msgdata = { + .msg_file = __FILE__, + .msg_fn = "waiting_locks_callback", + .msg_line = expired_lock_thread.elt_dump }; cfs_spin_unlock_bh(&waiting_locks_spinlock); /* from waiting_locks_callback, but not in timer */ libcfs_debug_dumplog(); - libcfs_run_lbug_upcall(__FILE__, - "waiting_locks_callback", - expired_lock_thread.elt_dump); + libcfs_run_lbug_upcall(&msgdata); cfs_spin_lock_bh(&waiting_locks_spinlock); expired_lock_thread.elt_dump = 0; @@ -219,6 +215,13 @@ static int expired_lock_main(void *arg) LDLM_LOCK_RELEASE(lock); continue; } + + if (lock->l_destroyed) { + /* release the lock refcount where + * waiting_locks_callback() founds */ + LDLM_LOCK_RELEASE(lock); + continue; + } export = class_export_lock_get(lock->l_export, lock); cfs_spin_unlock_bh(&waiting_locks_spinlock); @@ -249,6 +252,7 @@ static int expired_lock_main(void *arg) } static int ldlm_add_waiting_lock(struct ldlm_lock *lock); +static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, int seconds); /** * Check if there is a request in the export request list @@ -264,7 +268,7 @@ static int ldlm_lock_busy(struct ldlm_lock *lock) return 0; cfs_spin_lock_bh(&lock->l_export->exp_rpc_lock); - cfs_list_for_each_entry(req, &lock->l_export->exp_queued_rpc, + cfs_list_for_each_entry(req, &lock->l_export->exp_hp_rpcs, rq_exp_list) { if (req->rq_ops->hpreq_lock_match) { match = req->rq_ops->hpreq_lock_match(req, lock); @@ -279,9 +283,9 @@ static int ldlm_lock_busy(struct ldlm_lock *lock) /* This is called from within a timer interrupt and cannot schedule */ static void waiting_locks_callback(unsigned long unused) { - struct ldlm_lock *lock; + struct ldlm_lock *lock; + int need_dump = 0; -repeat: cfs_spin_lock_bh(&waiting_locks_spinlock); while (!cfs_list_empty(&waiting_locks_list)) { lock = cfs_list_entry(waiting_locks_list.next, struct ldlm_lock, @@ -303,9 +307,16 @@ repeat: libcfs_nid2str(lock->l_export->exp_connection->c_peer.nid)); cfs_list_del_init(&lock->l_pending_chain); - cfs_spin_unlock_bh(&waiting_locks_spinlock); - ldlm_add_waiting_lock(lock); - goto repeat; + if (lock->l_destroyed) { + /* relay the lock refcount decrease to + * expired lock thread */ + cfs_list_add(&lock->l_pending_chain, + &expired_lock_thread.elt_expired_locks); + } else { + __ldlm_add_waiting_lock(lock, + ldlm_get_enq_timeout(lock)); + } + continue; } /* if timeout overlaps the activation time of suspended timeouts @@ -319,9 +330,16 @@ repeat: libcfs_nid2str(lock->l_export->exp_connection->c_peer.nid)); cfs_list_del_init(&lock->l_pending_chain); - cfs_spin_unlock_bh(&waiting_locks_spinlock); - ldlm_add_waiting_lock(lock); - goto repeat; + if (lock->l_destroyed) { + /* relay the lock refcount decrease to + * expired lock thread */ + cfs_list_add(&lock->l_pending_chain, + &expired_lock_thread.elt_expired_locks); + } else { + __ldlm_add_waiting_lock(lock, + ldlm_get_enq_timeout(lock)); + } + continue; } /* Check if we need to prolong timeout */ @@ -361,14 +379,15 @@ repeat: cfs_list_del(&lock->l_pending_chain); cfs_list_add(&lock->l_pending_chain, &expired_lock_thread.elt_expired_locks); - } + need_dump = 1; + } - if (!cfs_list_empty(&expired_lock_thread.elt_expired_locks)) { - if (obd_dump_on_timeout) - expired_lock_thread.elt_dump = __LINE__; + if (!cfs_list_empty(&expired_lock_thread.elt_expired_locks)) { + if (obd_dump_on_timeout && need_dump) + expired_lock_thread.elt_dump = __LINE__; - cfs_waitq_signal(&expired_lock_thread.elt_waitq); - } + cfs_waitq_signal(&expired_lock_thread.elt_waitq); + } /* * Make sure the timer will fire again if we have any locks @@ -426,10 +445,14 @@ static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, int seconds) static int ldlm_add_waiting_lock(struct ldlm_lock *lock) { - int ret; - int timeout = ldlm_get_enq_timeout(lock); + int ret; + int timeout = ldlm_get_enq_timeout(lock); - LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)); + /* NB: must be called with hold of lock_res_and_lock() */ + LASSERT(lock->l_res_locked); + lock->l_waited = 1; + + LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)); cfs_spin_lock_bh(&waiting_locks_spinlock); if (lock->l_destroyed) { @@ -444,12 +467,21 @@ static int ldlm_add_waiting_lock(struct ldlm_lock *lock) } ret = __ldlm_add_waiting_lock(lock, timeout); - if (ret) + if (ret) { /* grab ref on the lock if it has been added to the * waiting list */ LDLM_LOCK_GET(lock); + } cfs_spin_unlock_bh(&waiting_locks_spinlock); + if (ret) { + cfs_spin_lock_bh(&lock->l_export->exp_bl_list_lock); + if (cfs_list_empty(&lock->l_exp_list)) + cfs_list_add(&lock->l_exp_list, + &lock->l_export->exp_bl_list); + cfs_spin_unlock_bh(&lock->l_export->exp_bl_list_lock); + } + LDLM_DEBUG(lock, "%sadding to wait list(timeout: %d, AT: %s)", ret == 0 ? "not re-" : "", timeout, AT_OFF ? "off" : "on"); @@ -504,14 +536,22 @@ int ldlm_del_waiting_lock(struct ldlm_lock *lock) cfs_spin_lock_bh(&waiting_locks_spinlock); ret = __ldlm_del_waiting_lock(lock); cfs_spin_unlock_bh(&waiting_locks_spinlock); - if (ret) + + /* remove the lock out of export blocking list */ + cfs_spin_lock_bh(&lock->l_export->exp_bl_list_lock); + cfs_list_del_init(&lock->l_exp_list); + cfs_spin_unlock_bh(&lock->l_export->exp_bl_list_lock); + + if (ret) { /* release lock ref if it has indeed been removed * from a list */ LDLM_LOCK_RELEASE(lock); + } LDLM_DEBUG(lock, "%s", ret == 0 ? "wasn't waiting" : "removed"); return ret; } +EXPORT_SYMBOL(ldlm_del_waiting_lock); /* * Prolong the lock @@ -543,13 +583,9 @@ int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout) LDLM_DEBUG(lock, "refreshed"); return 1; } -#else /* !__KERNEL__ */ +EXPORT_SYMBOL(ldlm_refresh_waiting_lock); -static int ldlm_add_waiting_lock(struct ldlm_lock *lock) -{ - LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)); - RETURN(1); -} +#else /* !HAVE_SERVER_SUPPORT || !__KERNEL__ */ int ldlm_del_waiting_lock(struct ldlm_lock *lock) { @@ -560,7 +596,19 @@ int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout) { RETURN(0); } -#endif /* __KERNEL__ */ + +# ifdef HAVE_SERVER_SUPPORT +static int ldlm_add_waiting_lock(struct ldlm_lock *lock) +{ + LASSERT(lock->l_res_locked); + LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)); + RETURN(1); +} + +# endif +#endif /* HAVE_SERVER_SUPPORT && __KERNEL__ */ + +#ifdef HAVE_SERVER_SUPPORT static void ldlm_failed_ast(struct ldlm_lock *lock, int rc, const char *ast_type) @@ -653,40 +701,69 @@ static int ldlm_cb_interpret(const struct lu_env *env, ENTRY; LASSERT(lock != NULL); - if (rc != 0) { - rc = ldlm_handle_ast_error(lock, req, rc, - arg->type == LDLM_BL_CALLBACK - ? "blocking" : "completion"); - if (rc == -ERESTART) - cfs_atomic_inc(&arg->restart); - } + + switch (arg->type) { + case LDLM_GL_CALLBACK: + /* Update the LVB from disk if the AST failed + * (this is a legal race) + * + * - Glimpse callback of local lock just returns + * -ELDLM_NO_LOCK_DATA. + * - Glimpse callback of remote lock might return + * -ELDLM_NO_LOCK_DATA when inode is cleared. LU-274 + */ + if (rc == -ELDLM_NO_LOCK_DATA) { + LDLM_DEBUG(lock, "lost race - client has a lock but no " + "inode"); + ldlm_res_lvbo_update(lock->l_resource, NULL, 1); + } else if (rc != 0) { + rc = ldlm_handle_ast_error(lock, req, rc, "glimpse"); + } else { + rc = ldlm_res_lvbo_update(lock->l_resource, req, 1); + } + break; + case LDLM_BL_CALLBACK: + if (rc != 0) + rc = ldlm_handle_ast_error(lock, req, rc, "blocking"); + break; + case LDLM_CP_CALLBACK: + if (rc != 0) + rc = ldlm_handle_ast_error(lock, req, rc, "completion"); + break; + default: + LDLM_ERROR(lock, "invalid opcode for lock callback %d", + arg->type); + LBUG(); + } + + /* release extra reference taken in ldlm_ast_fini() */ LDLM_LOCK_RELEASE(lock); - if (cfs_atomic_dec_return(&arg->rpcs) < arg->threshold) - cfs_waitq_signal(&arg->waitq); + if (rc == -ERESTART) + cfs_atomic_inc(&arg->restart); + RETURN(0); } -static inline int ldlm_bl_and_cp_ast_tail(struct ptlrpc_request *req, - struct ldlm_cb_set_arg *arg, - struct ldlm_lock *lock, - int instant_cancel) +static inline int ldlm_ast_fini(struct ptlrpc_request *req, + struct ldlm_cb_set_arg *arg, + struct ldlm_lock *lock, + int instant_cancel) { - int rc = 0; - ENTRY; - - if (unlikely(instant_cancel)) { - rc = ptl_send_rpc(req, 1); - ptlrpc_req_finished(req); - if (rc == 0) - cfs_atomic_inc(&arg->restart); - } else { - LDLM_LOCK_GET(lock); - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); - cfs_atomic_inc(&arg->rpcs); - } - - RETURN(rc); + int rc = 0; + ENTRY; + + if (unlikely(instant_cancel)) { + rc = ptl_send_rpc(req, 1); + ptlrpc_req_finished(req); + if (rc == 0) + cfs_atomic_inc(&arg->restart); + } else { + LDLM_LOCK_GET(lock); + ptlrpc_set_add_req(arg->set, req); + } + + RETURN(rc); } /** @@ -704,9 +781,13 @@ static void ldlm_lock_reorder_req(struct ldlm_lock *lock) } cfs_spin_lock_bh(&lock->l_export->exp_rpc_lock); - cfs_list_for_each_entry(req, &lock->l_export->exp_queued_rpc, + cfs_list_for_each_entry(req, &lock->l_export->exp_hp_rpcs, rq_exp_list) { - if (!req->rq_hp && req->rq_ops->hpreq_lock_match && + /* Do not process requests that were not yet added to there + * incoming queue or were already removed from there for + * processing */ + if (!req->rq_hp && !cfs_list_empty(&req->rq_list) && + req->rq_ops->hpreq_lock_match && req->rq_ops->hpreq_lock_match(req, lock)) ptlrpc_hpreq_reorder(req); } @@ -739,10 +820,8 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, LASSERT(lock); LASSERT(data != NULL); - if (lock->l_export->exp_obd->obd_recovering != 0) { + if (lock->l_export->exp_obd->obd_recovering != 0) LDLM_ERROR(lock, "BUG 6063: lock collide during recovery"); - ldlm_lock_dump(D_ERROR, lock, 0); - } ldlm_lock_reorder_req(lock); @@ -760,22 +839,23 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, req->rq_interpret_reply = ldlm_cb_interpret; req->rq_no_resend = 1; - lock_res(lock->l_resource); - if (lock->l_granted_mode != lock->l_req_mode) { - /* this blocking AST will be communicated as part of the - * completion AST instead */ - unlock_res(lock->l_resource); - ptlrpc_req_finished(req); - LDLM_DEBUG(lock, "lock not granted, not sending blocking AST"); - RETURN(0); - } + lock_res_and_lock(lock); + if (lock->l_granted_mode != lock->l_req_mode) { + /* this blocking AST will be communicated as part of the + * completion AST instead */ + unlock_res_and_lock(lock); - if (lock->l_destroyed) { - /* What's the point? */ - unlock_res(lock->l_resource); - ptlrpc_req_finished(req); - RETURN(0); - } + ptlrpc_req_finished(req); + LDLM_DEBUG(lock, "lock not granted, not sending blocking AST"); + RETURN(0); + } + + if (lock->l_destroyed) { + /* What's the point? */ + unlock_res_and_lock(lock); + ptlrpc_req_finished(req); + RETURN(0); + } if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) instant_cancel = 1; @@ -788,14 +868,14 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, LDLM_DEBUG(lock, "server preparing blocking AST"); ptlrpc_request_set_replen(req); - if (instant_cancel) { - unlock_res(lock->l_resource); - ldlm_lock_cancel(lock); - } else { - LASSERT(lock->l_granted_mode == lock->l_req_mode); - ldlm_add_waiting_lock(lock); - unlock_res(lock->l_resource); - } + if (instant_cancel) { + unlock_res_and_lock(lock); + ldlm_lock_cancel(lock); + } else { + LASSERT(lock->l_granted_mode == lock->l_req_mode); + ldlm_add_waiting_lock(lock); + unlock_res_and_lock(lock); + } req->rq_send_state = LUSTRE_IMP_FULL; /* ptlrpc_request_alloc_pack already set timeout */ @@ -807,10 +887,11 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats, LDLM_BL_CALLBACK - LDLM_FIRST_OPC); - rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel); + rc = ldlm_ast_fini(req, arg, lock, instant_cancel); RETURN(rc); } +EXPORT_SYMBOL(ldlm_server_blocking_ast); int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) { @@ -821,6 +902,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) long total_enqueue_wait; int instant_cancel = 0; int rc = 0; + int lvb_len; ENTRY; LASSERT(lock != NULL); @@ -834,11 +916,11 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) if (req == NULL) RETURN(-ENOMEM); - /* server namespace, doesn't need lock */ - if (lock->l_resource->lr_lvb_len) { + /* server namespace, doesn't need lock */ + lvb_len = ldlm_lvbo_size(lock); + if (lvb_len > 0) req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT, - lock->l_resource->lr_lvb_len); - } + lvb_len); rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK); if (rc) { @@ -858,13 +940,12 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) body->lock_handle[0] = lock->l_remote_handle; body->lock_flags = flags; ldlm_lock2desc(lock, &body->lock_desc); - if (lock->l_resource->lr_lvb_len) { - void *lvb = req_capsule_client_get(&req->rq_pill, &RMF_DLM_LVB); + if (lvb_len > 0) { + void *lvb = req_capsule_client_get(&req->rq_pill, &RMF_DLM_LVB); - lock_res(lock->l_resource); - memcpy(lvb, lock->l_resource->lr_lvb_data, - lock->l_resource->lr_lvb_len); - unlock_res(lock->l_resource); + lvb_len = ldlm_lvbo_fill(lock, lvb, lvb_len); + req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, + lvb_len, RCL_CLIENT); } LDLM_DEBUG(lock, "server preparing completion AST (after %lds wait)", @@ -922,77 +1003,105 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats, LDLM_CP_CALLBACK - LDLM_FIRST_OPC); - rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel); + rc = ldlm_ast_fini(req, arg, lock, instant_cancel); RETURN(rc); } +EXPORT_SYMBOL(ldlm_server_completion_ast); int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data) { - struct ldlm_resource *res = lock->l_resource; - struct ldlm_request *body; - struct ptlrpc_request *req; - int rc; + struct ldlm_cb_set_arg *arg = data; + struct ldlm_request *body; + struct ptlrpc_request *req; + struct ldlm_cb_async_args *ca; + int rc; + struct req_format *req_fmt; ENTRY; LASSERT(lock != NULL); + if (arg->gl_desc != NULL) + /* There is a glimpse descriptor to pack */ + req_fmt = &RQF_LDLM_GL_DESC_CALLBACK; + else + req_fmt = &RQF_LDLM_GL_CALLBACK; + req = ptlrpc_request_alloc_pack(lock->l_export->exp_imp_reverse, - &RQF_LDLM_GL_CALLBACK, - LUSTRE_DLM_VERSION, LDLM_GL_CALLBACK); + req_fmt, LUSTRE_DLM_VERSION, + LDLM_GL_CALLBACK); if (req == NULL) RETURN(-ENOMEM); + if (arg->gl_desc != NULL) { + /* copy the GL descriptor */ + union ldlm_gl_desc *desc; + desc = req_capsule_client_get(&req->rq_pill, &RMF_DLM_GL_DESC); + *desc = *arg->gl_desc; + } + body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ); body->lock_handle[0] = lock->l_remote_handle; ldlm_lock2desc(lock, &body->lock_desc); + CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args)); + ca = ptlrpc_req_async_args(req); + ca->ca_set_arg = arg; + ca->ca_lock = lock; + /* server namespace, doesn't need lock */ req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, - lock->l_resource->lr_lvb_len); - res = lock->l_resource; + ldlm_lvbo_size(lock)); ptlrpc_request_set_replen(req); - req->rq_send_state = LUSTRE_IMP_FULL; /* ptlrpc_request_alloc_pack already set timeout */ if (AT_OFF) req->rq_timeout = ldlm_get_rq_timeout(); + req->rq_interpret_reply = ldlm_cb_interpret; + if (lock->l_export && lock->l_export->exp_nid_stats && lock->l_export->exp_nid_stats->nid_ldlm_stats) lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats, LDLM_GL_CALLBACK - LDLM_FIRST_OPC); - rc = ptlrpc_queue_wait(req); - /* Update the LVB from disk if the AST failed (this is a legal race) - * - * - Glimpse callback of local lock just return -ELDLM_NO_LOCK_DATA. - * - Glimpse callback of remote lock might return -ELDLM_NO_LOCK_DATA - * when inode is cleared. LU-274 - */ - if (rc == -ELDLM_NO_LOCK_DATA) { - LDLM_DEBUG(lock, "lost race - client has a lock but no inode"); - ldlm_res_lvbo_update(res, NULL, 1); - } else if (rc != 0) { - rc = ldlm_handle_ast_error(lock, req, rc, "glimpse"); - } else { - rc = ldlm_res_lvbo_update(res, req, 1); - } + rc = ldlm_ast_fini(req, arg, lock, 0); - ptlrpc_req_finished(req); - if (rc == -ERESTART) - ldlm_reprocess_all(res); + RETURN(rc); +} +EXPORT_SYMBOL(ldlm_server_glimpse_ast); - RETURN(rc); +int ldlm_glimpse_locks(struct ldlm_resource *res, cfs_list_t *gl_work_list) +{ + int rc; + ENTRY; + + rc = ldlm_run_ast_work(ldlm_res_to_ns(res), gl_work_list, + LDLM_WORK_GL_AST); + if (rc == -ERESTART) + ldlm_reprocess_all(res); + + RETURN(rc); } +EXPORT_SYMBOL(ldlm_glimpse_locks); -#ifdef __KERNEL__ -extern unsigned long long lu_time_stamp_get(void); -#else -#define lu_time_stamp_get() time(NULL) -#endif +/* return ldlm lock associated with a lock callback request */ +struct ldlm_lock *ldlm_request_lock(struct ptlrpc_request *req) +{ + struct ldlm_cb_async_args *ca; + struct ldlm_lock *lock; + ENTRY; + + ca = ptlrpc_req_async_args(req); + lock = ca->ca_lock; + if (lock == NULL) + RETURN(ERR_PTR(-EFAULT)); + + RETURN(lock); +} +EXPORT_SYMBOL(ldlm_request_lock); static void ldlm_svc_get_eopc(const struct ldlm_request *dlm_req, struct lprocfs_stats *srv_stats) @@ -1052,9 +1161,8 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns, LASSERT(req->rq_export); - if (req->rq_rqbd->rqbd_service->srv_stats) - ldlm_svc_get_eopc(dlm_req, - req->rq_rqbd->rqbd_service->srv_stats); + if (ptlrpc_req2svc(req)->srv_stats != NULL) + ldlm_svc_get_eopc(dlm_req, ptlrpc_req2svc(req)->srv_stats); if (req->rq_export && req->rq_export->exp_nid_stats && req->rq_export->exp_nid_stats->nid_ldlm_stats) @@ -1110,6 +1218,9 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns, if (unlikely(flags & LDLM_FL_REPLAY)) { /* Find an existing lock in the per-export lock hash */ + /* In the function below, .hs_keycmp resolves to + * ldlm_export_lock_keycmp() */ + /* coverity[overrun-buffer-val] */ lock = cfs_hash_lookup(req->rq_export->exp_lock_hash, (void *)&dlm_req->lock_handle[0]); if (lock != NULL) { @@ -1158,11 +1269,8 @@ existing_lock: /* based on the assumption that lvb size never changes during * resource life time otherwise it need resource->lr_lock's * protection */ - if (lock->l_resource->lr_lvb_len) { - req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, - RCL_SERVER, - lock->l_resource->lr_lvb_len); - } + req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, + RCL_SERVER, ldlm_lvbo_size(lock)); if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR)) GOTO(out, rc = -ENOMEM); @@ -1173,7 +1281,7 @@ existing_lock: } if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN) - ldlm_convert_policy_to_local( + ldlm_convert_policy_to_local(req->rq_export, dlm_req->lock_desc.l_resource.lr_type, &dlm_req->lock_desc.l_policy_data, &lock->l_policy_data); @@ -1267,21 +1375,22 @@ existing_lock: "(err=%d, rc=%d)", err, rc); if (rc == 0) { - if (lock->l_resource->lr_lvb_len > 0) { - /* MDT path won't handle lr_lvb_data, so - * lock/unlock better be contained in the - * if block */ - void *lvb; - - lvb = req_capsule_server_get(&req->rq_pill, - &RMF_DLM_LVB); - LASSERTF(lvb != NULL, "req %p, lock %p\n", - req, lock); - lock_res(lock->l_resource); - memcpy(lvb, lock->l_resource->lr_lvb_data, - lock->l_resource->lr_lvb_len); - unlock_res(lock->l_resource); - } + int lvb_len = ldlm_lvbo_size(lock); + + if (lvb_len > 0) { + void *buf; + int buflen; + + buf = req_capsule_server_get(&req->rq_pill, + &RMF_DLM_LVB); + LASSERTF(buf != NULL, "req %p, lock %p\n", + req, lock); + buflen = req_capsule_get_size(&req->rq_pill, + &RMF_DLM_LVB, RCL_SERVER); + buflen = ldlm_lvbo_fill(lock, buf, buflen); + req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, + buflen, RCL_SERVER); + } } else { lock_res_and_lock(lock); ldlm_resource_unlink_lock(lock); @@ -1300,6 +1409,7 @@ existing_lock: return rc; } +EXPORT_SYMBOL(ldlm_handle_enqueue0); int ldlm_handle_enqueue(struct ptlrpc_request *req, ldlm_completion_callback completion_callback, @@ -1323,6 +1433,7 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req, } return rc; } +EXPORT_SYMBOL(ldlm_handle_enqueue); int ldlm_handle_convert0(struct ptlrpc_request *req, const struct ldlm_request *dlm_req) @@ -1374,6 +1485,7 @@ int ldlm_handle_convert0(struct ptlrpc_request *req, RETURN(0); } +EXPORT_SYMBOL(ldlm_handle_convert0); int ldlm_handle_convert(struct ptlrpc_request *req) { @@ -1389,6 +1501,7 @@ int ldlm_handle_convert(struct ptlrpc_request *req) } return rc; } +EXPORT_SYMBOL(ldlm_handle_convert); /* Cancel all the locks whos handles are packed into ldlm_request */ int ldlm_request_cancel(struct ptlrpc_request *req, @@ -1403,6 +1516,9 @@ int ldlm_request_cancel(struct ptlrpc_request *req, if (first >= count) RETURN(0); + if (count == 1 && dlm_req->lock_handle[0].cookie == 0) + RETURN(0); + /* There is no lock on the server at the replay time, * skip lock cancelling to make replay tests to pass. */ if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) @@ -1447,6 +1563,7 @@ int ldlm_request_cancel(struct ptlrpc_request *req, LDLM_DEBUG_NOLOCK("server-side cancel handler END"); RETURN(done); } +EXPORT_SYMBOL(ldlm_request_cancel); int ldlm_handle_cancel(struct ptlrpc_request *req) { @@ -1474,6 +1591,8 @@ int ldlm_handle_cancel(struct ptlrpc_request *req) RETURN(ptlrpc_reply(req)); } +EXPORT_SYMBOL(ldlm_handle_cancel); +#endif /* HAVE_SERVER_SUPPORT */ void ldlm_handle_bl_callback(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, struct ldlm_lock *lock) @@ -1513,6 +1632,7 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, struct ldlm_request *dlm_req, struct ldlm_lock *lock) { + int lvb_len; CFS_LIST_HEAD(ast_list); ENTRY; @@ -1529,6 +1649,33 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, } } + lvb_len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT); + if (lvb_len > 0) { + if (lock->l_lvb_len > 0) { + /* for extent lock, lvb contains ost_lvb{}. */ + LASSERT(lock->l_lvb_data != NULL); + LASSERTF(lock->l_lvb_len == lvb_len, + "preallocated %d, actual %d.\n", + lock->l_lvb_len, lvb_len); + } else { /* for layout lock, lvb has variable length */ + void *lvb_data; + + OBD_ALLOC(lvb_data, lvb_len); + if (lvb_data == NULL) + LDLM_ERROR(lock, "no memory.\n"); + + lock_res_and_lock(lock); + if (lvb_data == NULL) { + lock->l_flags |= LDLM_FL_FAILED; + } else { + LASSERT(lock->l_lvb_data == NULL); + lock->l_lvb_data = lvb_data; + lock->l_lvb_len = lvb_len; + } + unlock_res_and_lock(lock); + } + } + lock_res_and_lock(lock); if (lock->l_destroyed || lock->l_granted_mode == lock->l_req_mode) { @@ -1548,7 +1695,7 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, } if (lock->l_resource->lr_type != LDLM_PLAIN) { - ldlm_convert_policy_to_local( + ldlm_convert_policy_to_local(req->rq_export, dlm_req->lock_desc.l_resource.lr_type, &dlm_req->lock_desc.l_policy_data, &lock->l_policy_data); @@ -1796,7 +1943,8 @@ static int ldlm_handle_setinfo(struct ptlrpc_request *req) if (KEY_IS(KEY_HSM_COPYTOOL_SEND)) /* Pass it on to mdc (the "export" in this case) */ - rc = obd_set_info_async(req->rq_export, + rc = obd_set_info_async(req->rq_svc_thread->t_env, + req->rq_export, sizeof(KEY_HSM_COPYTOOL_SEND), KEY_HSM_COPYTOOL_SEND, vallen, val, NULL); @@ -1850,19 +1998,19 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) LASSERT(req->rq_export != NULL); LASSERT(req->rq_export->exp_obd != NULL); - switch (lustre_msg_get_opc(req->rq_reqmsg)) { - case LDLM_BL_CALLBACK: - if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK)) - RETURN(0); - break; - case LDLM_CP_CALLBACK: - if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CP_CALLBACK)) - RETURN(0); - break; - case LDLM_GL_CALLBACK: - if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_GL_CALLBACK)) - RETURN(0); - break; + switch (lustre_msg_get_opc(req->rq_reqmsg)) { + case LDLM_BL_CALLBACK: + if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET)) + RETURN(0); + break; + case LDLM_CP_CALLBACK: + if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CP_CALLBACK_NET)) + RETURN(0); + break; + case LDLM_GL_CALLBACK: + if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_GL_CALLBACK_NET)) + RETURN(0); + break; case LDLM_SET_INFO: rc = ldlm_handle_setinfo(req); ldlm_callback_reply(req, rc); @@ -1877,24 +2025,11 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) RETURN(0); ldlm_callback_reply(req, rc); RETURN(0); - case OBD_QC_CALLBACK: - req_capsule_set(&req->rq_pill, &RQF_QC_CALLBACK); - if (OBD_FAIL_CHECK(OBD_FAIL_OBD_QC_CALLBACK_NET)) - RETURN(0); - rc = target_handle_qc_callback(req); - ldlm_callback_reply(req, rc); - RETURN(0); - case QUOTA_DQACQ: - case QUOTA_DQREL: - /* reply in handler */ - req_capsule_set(&req->rq_pill, &RQF_MDS_QUOTA_DQACQ); - rc = target_handle_dqacq_callback(req); - RETURN(0); case LLOG_ORIGIN_HANDLE_CREATE: req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE); if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET)) RETURN(0); - rc = llog_origin_handle_create(req); + rc = llog_origin_handle_open(req); ldlm_callback_reply(req, rc); RETURN(0); case LLOG_ORIGIN_HANDLE_NEXT_BLOCK: @@ -2030,6 +2165,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) RETURN(0); } +#ifdef HAVE_SERVER_SUPPORT static int ldlm_cancel_handler(struct ptlrpc_request *req) { int rc; @@ -2070,8 +2206,8 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req) case LDLM_CANCEL: req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL); CDEBUG(D_INODE, "cancel\n"); - if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL)) - RETURN(0); + if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_NET)) + RETURN(0); rc = ldlm_handle_cancel(req); if (rc) break; @@ -2095,6 +2231,91 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req) RETURN(0); } +static int ldlm_cancel_hpreq_lock_match(struct ptlrpc_request *req, + struct ldlm_lock *lock) +{ + struct ldlm_request *dlm_req; + struct lustre_handle lockh; + int rc = 0; + int i; + ENTRY; + + dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ); + if (dlm_req == NULL) + RETURN(0); + + ldlm_lock2handle(lock, &lockh); + for (i = 0; i < dlm_req->lock_count; i++) { + if (lustre_handle_equal(&dlm_req->lock_handle[i], + &lockh)) { + DEBUG_REQ(D_RPCTRACE, req, + "Prio raised by lock "LPX64".", lockh.cookie); + + rc = 1; + break; + } + } + + RETURN(rc); + +} + +static int ldlm_cancel_hpreq_check(struct ptlrpc_request *req) +{ + struct ldlm_request *dlm_req; + int rc = 0; + int i; + ENTRY; + + /* no prolong in recovery */ + if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) + RETURN(0); + + dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ); + if (dlm_req == NULL) + RETURN(-EFAULT); + + for (i = 0; i < dlm_req->lock_count; i++) { + struct ldlm_lock *lock; + + lock = ldlm_handle2lock(&dlm_req->lock_handle[i]); + if (lock == NULL) + continue; + + rc = !!(lock->l_flags & LDLM_FL_AST_SENT); + if (rc) + LDLM_DEBUG(lock, "hpreq cancel lock"); + LDLM_LOCK_PUT(lock); + + if (rc) + break; + } + + RETURN(rc); +} + +static struct ptlrpc_hpreq_ops ldlm_cancel_hpreq_ops = { + .hpreq_lock_match = ldlm_cancel_hpreq_lock_match, + .hpreq_check = ldlm_cancel_hpreq_check, + .hpreq_fini = NULL, +}; + +static int ldlm_hpreq_handler(struct ptlrpc_request *req) +{ + ENTRY; + + req_capsule_init(&req->rq_pill, req, RCL_SERVER); + + if (req->rq_export == NULL) + RETURN(0); + + if (LDLM_CANCEL == lustre_msg_get_opc(req->rq_reqmsg)) { + req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL); + req->rq_ops = &ldlm_cancel_hpreq_ops; + } + RETURN(0); +} + int ldlm_revoke_lock_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd, cfs_hlist_node_t *hnode, void *data) @@ -2125,10 +2346,16 @@ int ldlm_revoke_lock_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd, LASSERT(!lock->l_blocking_lock); lock->l_flags |= LDLM_FL_AST_SENT; - if (lock->l_export && lock->l_export->exp_lock_hash && - !cfs_hlist_unhashed(&lock->l_exp_hash)) - cfs_hash_del(lock->l_export->exp_lock_hash, - &lock->l_remote_handle, &lock->l_exp_hash); + if (lock->l_export && lock->l_export->exp_lock_hash) { + /* NB: it's safe to call cfs_hash_del() even lock isn't + * in exp_lock_hash. */ + /* In the function below, .hs_keycmp resolves to + * ldlm_export_lock_keycmp() */ + /* coverity[overrun-buffer-val] */ + cfs_hash_del(lock->l_export->exp_lock_hash, + &lock->l_remote_handle, &lock->l_exp_hash); + } + cfs_list_add_tail(&lock->l_rk_ast, rpc_list); LDLM_LOCK_GET(lock); @@ -2149,6 +2376,8 @@ void ldlm_revoke_export_locks(struct obd_export *exp) EXIT; } +EXPORT_SYMBOL(ldlm_revoke_export_locks); +#endif /* HAVE_SERVER_SUPPORT */ #ifdef __KERNEL__ static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp) @@ -2230,33 +2459,31 @@ static int ldlm_bl_thread_main(void *arg) while (1) { struct l_wait_info lwi = { 0 }; struct ldlm_bl_work_item *blwi = NULL; + int busy; blwi = ldlm_bl_get_work(blp); if (blwi == NULL) { - int busy; - cfs_atomic_dec(&blp->blp_busy_threads); l_wait_event_exclusive(blp->blp_waitq, (blwi = ldlm_bl_get_work(blp)) != NULL, &lwi); busy = cfs_atomic_inc_return(&blp->blp_busy_threads); - - if (blwi->blwi_ns == NULL) - /* added by ldlm_cleanup() */ - break; - - /* Not fatal if racy and have a few too many threads */ - if (unlikely(busy < blp->blp_max_threads && - busy >= cfs_atomic_read(&blp->blp_num_threads) && - !blwi->blwi_mem_pressure)) - /* discard the return value, we tried */ - ldlm_bl_thread_start(blp); } else { - if (blwi->blwi_ns == NULL) - /* added by ldlm_cleanup() */ - break; + busy = cfs_atomic_read(&blp->blp_busy_threads); } + + if (blwi->blwi_ns == NULL) + /* added by ldlm_cleanup() */ + break; + + /* Not fatal if racy and have a few too many threads */ + if (unlikely(busy < blp->blp_max_threads && + busy >= cfs_atomic_read(&blp->blp_num_threads) && + !blwi->blwi_mem_pressure)) + /* discard the return value, we tried */ + ldlm_bl_thread_start(blp); + if (blwi->blwi_mem_pressure) cfs_memory_pressure_set(); @@ -2298,21 +2525,22 @@ int ldlm_get_ref(void) { int rc = 0; ENTRY; - cfs_mutex_down(&ldlm_ref_sem); + cfs_mutex_lock(&ldlm_ref_mutex); if (++ldlm_refcount == 1) { rc = ldlm_setup(); if (rc) ldlm_refcount--; } - cfs_mutex_up(&ldlm_ref_sem); + cfs_mutex_unlock(&ldlm_ref_mutex); RETURN(rc); } +EXPORT_SYMBOL(ldlm_get_ref); void ldlm_put_ref(void) { ENTRY; - cfs_mutex_down(&ldlm_ref_sem); + cfs_mutex_lock(&ldlm_ref_mutex); if (ldlm_refcount == 1) { int rc = ldlm_cleanup(); if (rc) @@ -2322,10 +2550,11 @@ void ldlm_put_ref(void) } else { ldlm_refcount--; } - cfs_mutex_up(&ldlm_ref_sem); + cfs_mutex_unlock(&ldlm_ref_mutex); EXIT; } +EXPORT_SYMBOL(ldlm_put_ref); /* * Export handle<->lock hash operations. @@ -2421,16 +2650,17 @@ void ldlm_destroy_export(struct obd_export *exp) ENTRY; cfs_hash_putref(exp->exp_lock_hash); exp->exp_lock_hash = NULL; + + ldlm_destroy_flock_export(exp); EXIT; } EXPORT_SYMBOL(ldlm_destroy_export); static int ldlm_setup(void) { - struct ldlm_bl_pool *blp; + static struct ptlrpc_service_conf conf; + struct ldlm_bl_pool *blp = NULL; int rc = 0; - int ldlm_min_threads = LDLM_THREADS_AUTO_MIN; - int ldlm_max_threads = LDLM_THREADS_AUTO_MAX; #ifdef __KERNEL__ int i; #endif @@ -2446,54 +2676,94 @@ static int ldlm_setup(void) #ifdef LPROCFS rc = ldlm_proc_setup(); if (rc != 0) - GOTO(out_free, rc); + GOTO(out, rc); #endif -#ifdef __KERNEL__ - if (ldlm_num_threads) { - /* If ldlm_num_threads is set, it is the min and the max. */ - if (ldlm_num_threads > LDLM_THREADS_AUTO_MAX) - ldlm_num_threads = LDLM_THREADS_AUTO_MAX; - if (ldlm_num_threads < LDLM_THREADS_AUTO_MIN) - ldlm_num_threads = LDLM_THREADS_AUTO_MIN; - ldlm_min_threads = ldlm_max_threads = ldlm_num_threads; - } + memset(&conf, 0, sizeof(conf)); + conf = (typeof(conf)) { + .psc_name = "ldlm_cbd", + .psc_watchdog_factor = 2, + .psc_buf = { + .bc_nbufs = LDLM_NBUFS, + .bc_buf_size = LDLM_BUFSIZE, + .bc_req_max_size = LDLM_MAXREQSIZE, + .bc_rep_max_size = LDLM_MAXREPSIZE, + .bc_req_portal = LDLM_CB_REQUEST_PORTAL, + .bc_rep_portal = LDLM_CB_REPLY_PORTAL, + }, + .psc_thr = { + .tc_thr_name = "ldlm_cb", + .tc_thr_factor = LDLM_THR_FACTOR, + .tc_nthrs_init = LDLM_NTHRS_INIT, + .tc_nthrs_base = LDLM_NTHRS_BASE, + .tc_nthrs_max = LDLM_NTHRS_MAX, + .tc_nthrs_user = ldlm_num_threads, + .tc_cpu_affinity = 1, + .tc_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD, + }, + .psc_cpt = { + .cc_pattern = ldlm_cpts, + }, + .psc_ops = { + .so_req_handler = ldlm_callback_handler, + }, + }; + ldlm_state->ldlm_cb_service = \ + ptlrpc_register_service(&conf, ldlm_svc_proc_dir); + if (IS_ERR(ldlm_state->ldlm_cb_service)) { + CERROR("failed to start service\n"); + rc = PTR_ERR(ldlm_state->ldlm_cb_service); + ldlm_state->ldlm_cb_service = NULL; + GOTO(out, rc); + } + +#ifdef HAVE_SERVER_SUPPORT + memset(&conf, 0, sizeof(conf)); + conf = (typeof(conf)) { + .psc_name = "ldlm_canceld", + .psc_watchdog_factor = 6, + .psc_buf = { + .bc_nbufs = LDLM_NBUFS, + .bc_buf_size = LDLM_BUFSIZE, + .bc_req_max_size = LDLM_MAXREQSIZE, + .bc_rep_max_size = LDLM_MAXREPSIZE, + .bc_req_portal = LDLM_CANCEL_REQUEST_PORTAL, + .bc_rep_portal = LDLM_CANCEL_REPLY_PORTAL, + + }, + .psc_thr = { + .tc_thr_name = "ldlm_cn", + .tc_thr_factor = LDLM_THR_FACTOR, + .tc_nthrs_init = LDLM_NTHRS_INIT, + .tc_nthrs_base = LDLM_NTHRS_BASE, + .tc_nthrs_max = LDLM_NTHRS_MAX, + .tc_nthrs_user = ldlm_num_threads, + .tc_cpu_affinity = 1, + .tc_ctx_tags = LCT_MD_THREAD | \ + LCT_DT_THREAD | \ + LCT_CL_THREAD, + }, + .psc_cpt = { + .cc_pattern = ldlm_cpts, + }, + .psc_ops = { + .so_req_handler = ldlm_cancel_handler, + .so_hpreq_handler = ldlm_hpreq_handler, + }, + }; + ldlm_state->ldlm_cancel_service = \ + ptlrpc_register_service(&conf, ldlm_svc_proc_dir); + if (IS_ERR(ldlm_state->ldlm_cancel_service)) { + CERROR("failed to start service\n"); + rc = PTR_ERR(ldlm_state->ldlm_cancel_service); + ldlm_state->ldlm_cancel_service = NULL; + GOTO(out, rc); + } #endif - ldlm_state->ldlm_cb_service = - ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE, - LDLM_MAXREPSIZE, LDLM_CB_REQUEST_PORTAL, - LDLM_CB_REPLY_PORTAL, 2, - ldlm_callback_handler, "ldlm_cbd", - ldlm_svc_proc_dir, NULL, - ldlm_min_threads, ldlm_max_threads, - "ldlm_cb", - LCT_MD_THREAD|LCT_DT_THREAD, NULL); - - if (!ldlm_state->ldlm_cb_service) { - CERROR("failed to start service\n"); - GOTO(out_proc, rc = -ENOMEM); - } - - ldlm_state->ldlm_cancel_service = - ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE, - LDLM_MAXREPSIZE, LDLM_CANCEL_REQUEST_PORTAL, - LDLM_CANCEL_REPLY_PORTAL, 6, - ldlm_cancel_handler, "ldlm_canceld", - ldlm_svc_proc_dir, NULL, - ldlm_min_threads, ldlm_max_threads, - "ldlm_cn", - LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD, - NULL); - - if (!ldlm_state->ldlm_cancel_service) { - CERROR("failed to start service\n"); - GOTO(out_proc, rc = -ENOMEM); - } - - OBD_ALLOC(blp, sizeof(*blp)); - if (blp == NULL) - GOTO(out_proc, rc = -ENOMEM); + OBD_ALLOC(blp, sizeof(*blp)); + if (blp == NULL) + GOTO(out, rc = -ENOMEM); ldlm_state->ldlm_bl_pool = blp; cfs_spin_lock_init(&blp->blp_lock); @@ -2502,24 +2772,24 @@ static int ldlm_setup(void) cfs_waitq_init(&blp->blp_waitq); cfs_atomic_set(&blp->blp_num_threads, 0); cfs_atomic_set(&blp->blp_busy_threads, 0); - blp->blp_min_threads = ldlm_min_threads; - blp->blp_max_threads = ldlm_max_threads; #ifdef __KERNEL__ - for (i = 0; i < blp->blp_min_threads; i++) { - rc = ldlm_bl_thread_start(blp); - if (rc < 0) - GOTO(out_thread, rc); - } - - rc = ptlrpc_start_threads(ldlm_state->ldlm_cancel_service); - if (rc) - GOTO(out_thread, rc); - - rc = ptlrpc_start_threads(ldlm_state->ldlm_cb_service); - if (rc) - GOTO(out_thread, rc); - + if (ldlm_num_threads == 0) { + blp->blp_min_threads = LDLM_NTHRS_INIT; + blp->blp_max_threads = LDLM_NTHRS_MAX; + } else { + blp->blp_min_threads = blp->blp_max_threads = \ + min_t(int, LDLM_NTHRS_MAX, max_t(int, LDLM_NTHRS_INIT, + ldlm_num_threads)); + } + + for (i = 0; i < blp->blp_min_threads; i++) { + rc = ldlm_bl_thread_start(blp); + if (rc < 0) + GOTO(out, rc); + } + +# ifdef HAVE_SERVER_SUPPORT CFS_INIT_LIST_HEAD(&expired_lock_thread.elt_expired_locks); expired_lock_thread.elt_state = ELT_STOPPED; cfs_waitq_init(&expired_lock_thread.elt_waitq); @@ -2529,43 +2799,30 @@ static int ldlm_setup(void) cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, 0); rc = cfs_create_thread(expired_lock_main, NULL, CFS_DAEMON_FLAGS); - if (rc < 0) { - CERROR("Cannot start ldlm expired-lock thread: %d\n", rc); - GOTO(out_thread, rc); - } + if (rc < 0) { + CERROR("Cannot start ldlm expired-lock thread: %d\n", rc); + GOTO(out, rc); + } cfs_wait_event(expired_lock_thread.elt_waitq, expired_lock_thread.elt_state == ELT_READY); -#endif - -#ifdef __KERNEL__ - rc = ldlm_pools_init(); - if (rc) - GOTO(out_thread, rc); -#endif - RETURN(0); +# endif /* HAVE_SERVER_SUPPORT */ -#ifdef __KERNEL__ - out_thread: - ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service); - ptlrpc_unregister_service(ldlm_state->ldlm_cb_service); + rc = ldlm_pools_init(); + if (rc) { + CERROR("Failed to initialize LDLM pools: %d\n", rc); + GOTO(out, rc); + } #endif + RETURN(0); - out_proc: -#ifdef LPROCFS - ldlm_proc_cleanup(); - out_free: -#endif - OBD_FREE(ldlm_state, sizeof(*ldlm_state)); - ldlm_state = NULL; - return rc; + out: + ldlm_cleanup(); + RETURN(rc); } static int ldlm_cleanup(void) { -#ifdef __KERNEL__ - struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool; -#endif ENTRY; if (!cfs_list_empty(ldlm_namespace_list(LDLM_NAMESPACE_SERVER)) || @@ -2578,35 +2835,46 @@ static int ldlm_cleanup(void) #ifdef __KERNEL__ ldlm_pools_fini(); -#endif -#ifdef __KERNEL__ - while (cfs_atomic_read(&blp->blp_num_threads) > 0) { - struct ldlm_bl_work_item blwi = { .blwi_ns = NULL }; + if (ldlm_state->ldlm_bl_pool != NULL) { + struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool; - cfs_init_completion(&blp->blp_comp); + while (cfs_atomic_read(&blp->blp_num_threads) > 0) { + struct ldlm_bl_work_item blwi = { .blwi_ns = NULL }; - cfs_spin_lock(&blp->blp_lock); - cfs_list_add_tail(&blwi.blwi_entry, &blp->blp_list); - cfs_waitq_signal(&blp->blp_waitq); - cfs_spin_unlock(&blp->blp_lock); + cfs_init_completion(&blp->blp_comp); - cfs_wait_for_completion(&blp->blp_comp); - } - OBD_FREE(blp, sizeof(*blp)); + cfs_spin_lock(&blp->blp_lock); + cfs_list_add_tail(&blwi.blwi_entry, &blp->blp_list); + cfs_waitq_signal(&blp->blp_waitq); + cfs_spin_unlock(&blp->blp_lock); - ptlrpc_unregister_service(ldlm_state->ldlm_cb_service); - ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service); - ldlm_proc_cleanup(); + cfs_wait_for_completion(&blp->blp_comp); + } - expired_lock_thread.elt_state = ELT_TERMINATE; - cfs_waitq_signal(&expired_lock_thread.elt_waitq); - cfs_wait_event(expired_lock_thread.elt_waitq, - expired_lock_thread.elt_state == ELT_STOPPED); -#else - ptlrpc_unregister_service(ldlm_state->ldlm_cb_service); - ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service); -#endif + OBD_FREE(blp, sizeof(*blp)); + } +#endif /* __KERNEL__ */ + + if (ldlm_state->ldlm_cb_service != NULL) + ptlrpc_unregister_service(ldlm_state->ldlm_cb_service); +# ifdef HAVE_SERVER_SUPPORT + if (ldlm_state->ldlm_cancel_service != NULL) + ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service); +# endif + +#ifdef __KERNEL__ + ldlm_proc_cleanup(); + +# ifdef HAVE_SERVER_SUPPORT + if (expired_lock_thread.elt_state != ELT_STOPPED) { + expired_lock_thread.elt_state = ELT_TERMINATE; + cfs_waitq_signal(&expired_lock_thread.elt_waitq); + cfs_wait_event(expired_lock_thread.elt_waitq, + expired_lock_thread.elt_state == ELT_STOPPED); + } +# endif +#endif /* __KERNEL__ */ OBD_FREE(ldlm_state, sizeof(*ldlm_state)); ldlm_state = NULL; @@ -2616,22 +2884,22 @@ static int ldlm_cleanup(void) int ldlm_init(void) { - cfs_init_mutex(&ldlm_ref_sem); - cfs_init_mutex(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER)); - cfs_init_mutex(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT)); + cfs_mutex_init(&ldlm_ref_mutex); + cfs_mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER)); + cfs_mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT)); ldlm_resource_slab = cfs_mem_cache_create("ldlm_resources", sizeof(struct ldlm_resource), 0, CFS_SLAB_HWCACHE_ALIGN); if (ldlm_resource_slab == NULL) return -ENOMEM; - ldlm_lock_slab = cfs_mem_cache_create("ldlm_locks", - sizeof(struct ldlm_lock), 0, - CFS_SLAB_HWCACHE_ALIGN | CFS_SLAB_DESTROY_BY_RCU); - if (ldlm_lock_slab == NULL) { - cfs_mem_cache_destroy(ldlm_resource_slab); - return -ENOMEM; - } + ldlm_lock_slab = cfs_mem_cache_create("ldlm_locks", + sizeof(struct ldlm_lock), 0, + CFS_SLAB_HWCACHE_ALIGN | SLAB_DESTROY_BY_RCU); + if (ldlm_lock_slab == NULL) { + cfs_mem_cache_destroy(ldlm_resource_slab); + return -ENOMEM; + } ldlm_interval_slab = cfs_mem_cache_create("interval_node", sizeof(struct ldlm_interval), @@ -2665,106 +2933,3 @@ void ldlm_exit(void) rc = cfs_mem_cache_destroy(ldlm_interval_slab); LASSERTF(rc == 0, "couldn't free interval node slab\n"); } - -/* ldlm_extent.c */ -EXPORT_SYMBOL(ldlm_extent_shift_kms); - -/* ldlm_lock.c */ -EXPORT_SYMBOL(ldlm_get_processing_policy); -EXPORT_SYMBOL(ldlm_lock2desc); -EXPORT_SYMBOL(ldlm_register_intent); -EXPORT_SYMBOL(ldlm_lockname); -EXPORT_SYMBOL(ldlm_typename); -EXPORT_SYMBOL(ldlm_lock2handle); -EXPORT_SYMBOL(__ldlm_handle2lock); -EXPORT_SYMBOL(ldlm_lock_get); -EXPORT_SYMBOL(ldlm_lock_put); -EXPORT_SYMBOL(ldlm_lock_match); -EXPORT_SYMBOL(ldlm_lock_cancel); -EXPORT_SYMBOL(ldlm_lock_addref); -EXPORT_SYMBOL(ldlm_lock_addref_try); -EXPORT_SYMBOL(ldlm_lock_decref); -EXPORT_SYMBOL(ldlm_lock_decref_and_cancel); -EXPORT_SYMBOL(ldlm_lock_change_resource); -EXPORT_SYMBOL(ldlm_it2str); -EXPORT_SYMBOL(ldlm_lock_dump); -EXPORT_SYMBOL(ldlm_lock_dump_handle); -EXPORT_SYMBOL(ldlm_reprocess_all_ns); -EXPORT_SYMBOL(ldlm_lock_allow_match_locked); -EXPORT_SYMBOL(ldlm_lock_allow_match); -EXPORT_SYMBOL(ldlm_lock_downgrade); -EXPORT_SYMBOL(ldlm_lock_convert); - -/* ldlm_request.c */ -EXPORT_SYMBOL(ldlm_completion_ast_async); -EXPORT_SYMBOL(ldlm_blocking_ast_nocheck); -EXPORT_SYMBOL(ldlm_completion_ast); -EXPORT_SYMBOL(ldlm_blocking_ast); -EXPORT_SYMBOL(ldlm_glimpse_ast); -EXPORT_SYMBOL(ldlm_expired_completion_wait); -EXPORT_SYMBOL(ldlm_prep_enqueue_req); -EXPORT_SYMBOL(ldlm_prep_elc_req); -EXPORT_SYMBOL(ldlm_cli_convert); -EXPORT_SYMBOL(ldlm_cli_enqueue); -EXPORT_SYMBOL(ldlm_cli_enqueue_fini); -EXPORT_SYMBOL(ldlm_cli_enqueue_local); -EXPORT_SYMBOL(ldlm_cli_cancel); -EXPORT_SYMBOL(ldlm_cli_cancel_unused); -EXPORT_SYMBOL(ldlm_cli_cancel_unused_resource); -EXPORT_SYMBOL(ldlm_cli_cancel_req); -EXPORT_SYMBOL(ldlm_replay_locks); -EXPORT_SYMBOL(ldlm_resource_foreach); -EXPORT_SYMBOL(ldlm_namespace_foreach); -EXPORT_SYMBOL(ldlm_resource_iterate); -EXPORT_SYMBOL(ldlm_cancel_resource_local); -EXPORT_SYMBOL(ldlm_cli_cancel_list_local); -EXPORT_SYMBOL(ldlm_cli_cancel_list); - -/* ldlm_lockd.c */ -EXPORT_SYMBOL(ldlm_server_blocking_ast); -EXPORT_SYMBOL(ldlm_server_completion_ast); -EXPORT_SYMBOL(ldlm_server_glimpse_ast); -EXPORT_SYMBOL(ldlm_handle_enqueue); -EXPORT_SYMBOL(ldlm_handle_enqueue0); -EXPORT_SYMBOL(ldlm_handle_cancel); -EXPORT_SYMBOL(ldlm_request_cancel); -EXPORT_SYMBOL(ldlm_handle_convert); -EXPORT_SYMBOL(ldlm_handle_convert0); -EXPORT_SYMBOL(ldlm_del_waiting_lock); -EXPORT_SYMBOL(ldlm_get_ref); -EXPORT_SYMBOL(ldlm_put_ref); -EXPORT_SYMBOL(ldlm_refresh_waiting_lock); -EXPORT_SYMBOL(ldlm_revoke_export_locks); - -/* ldlm_resource.c */ -EXPORT_SYMBOL(ldlm_namespace_new); -EXPORT_SYMBOL(ldlm_namespace_cleanup); -EXPORT_SYMBOL(ldlm_namespace_free); -EXPORT_SYMBOL(ldlm_namespace_dump); -EXPORT_SYMBOL(ldlm_dump_all_namespaces); -EXPORT_SYMBOL(ldlm_resource_get); -EXPORT_SYMBOL(ldlm_resource_putref); -EXPORT_SYMBOL(ldlm_resource_unlink_lock); - -/* ldlm_lib.c */ -EXPORT_SYMBOL(client_import_add_conn); -EXPORT_SYMBOL(client_import_del_conn); -EXPORT_SYMBOL(client_obd_setup); -EXPORT_SYMBOL(client_obd_cleanup); -EXPORT_SYMBOL(client_connect_import); -EXPORT_SYMBOL(client_disconnect_export); -EXPORT_SYMBOL(server_disconnect_export); -EXPORT_SYMBOL(target_stop_recovery_thread); -EXPORT_SYMBOL(target_handle_connect); -EXPORT_SYMBOL(target_cleanup_recovery); -EXPORT_SYMBOL(target_destroy_export); -EXPORT_SYMBOL(target_cancel_recovery_timer); -EXPORT_SYMBOL(target_send_reply); -EXPORT_SYMBOL(target_queue_recovery_request); -EXPORT_SYMBOL(target_handle_ping); -EXPORT_SYMBOL(target_pack_pool_reply); -EXPORT_SYMBOL(target_handle_disconnect); - -/* l_lock.c */ -EXPORT_SYMBOL(lock_res_and_lock); -EXPORT_SYMBOL(unlock_res_and_lock);