X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fldlm%2Fldlm_lockd.c;h=6a6eabcc448605e202ffc43ff2da585fd8bdfeb1;hp=39025215c85bae5f99ecf4a588a5fa982fe89ada;hb=ae0d69437e35961c257f076da6dcc1842a55456d;hpb=fc086a8623ae899ed0def8f7e7f622edfbd7a31b diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 3902521..6a6eabc 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -1,6 +1,4 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * +/* * GPL HEADER START * * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. @@ -41,9 +39,6 @@ * Author: Phil Schwan */ -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif #define DEBUG_SUBSYSTEM S_LDLM #ifdef __KERNEL__ @@ -57,11 +52,13 @@ #include #include "ldlm_internal.h" -#ifdef __KERNEL__ static int ldlm_num_threads; CFS_MODULE_PARM(ldlm_num_threads, "i", int, 0444, "number of DLM service threads to start"); -#endif + +static char *ldlm_cpts; +CFS_MODULE_PARM(ldlm_cpts, "s", charp, 0444, + "CPU partitions ldlm threads should run on"); extern cfs_mem_cache_t *ldlm_resource_slab; extern cfs_mem_cache_t *ldlm_lock_slab; @@ -91,20 +88,6 @@ static inline unsigned int ldlm_get_rq_timeout(void) return timeout < 1 ? 1 : timeout; } -#ifdef __KERNEL__ -/* w_l_spinlock protects both waiting_locks_list and expired_lock_thread */ -static cfs_spinlock_t waiting_locks_spinlock; /* BH lock (timer) */ -static cfs_list_t waiting_locks_list; -static cfs_timer_t waiting_locks_timer; - -static struct expired_lock_thread { - cfs_waitq_t elt_waitq; - int elt_state; - int elt_dump; - cfs_list_t elt_expired_locks; -} expired_lock_thread; -#endif - #define ELT_STOPPED 0 #define ELT_READY 1 #define ELT_TERMINATE 2 @@ -145,7 +128,19 @@ struct ldlm_bl_work_item { int blwi_mem_pressure; }; -#ifdef __KERNEL__ +#if defined(HAVE_SERVER_SUPPORT) && defined(__KERNEL__) + +/* w_l_spinlock protects both waiting_locks_list and expired_lock_thread */ +static cfs_spinlock_t waiting_locks_spinlock; /* BH lock (timer) */ +static cfs_list_t waiting_locks_list; +static cfs_timer_t waiting_locks_timer; + +static struct expired_lock_thread { + cfs_waitq_t elt_waitq; + int elt_state; + int elt_dump; + cfs_list_t elt_expired_locks; +} expired_lock_thread; static inline int have_expired_locks(void) { @@ -220,6 +215,13 @@ static int expired_lock_main(void *arg) LDLM_LOCK_RELEASE(lock); continue; } + + if (lock->l_destroyed) { + /* release the lock refcount where + * waiting_locks_callback() founds */ + LDLM_LOCK_RELEASE(lock); + continue; + } export = class_export_lock_get(lock->l_export, lock); cfs_spin_unlock_bh(&waiting_locks_spinlock); @@ -250,6 +252,7 @@ static int expired_lock_main(void *arg) } static int ldlm_add_waiting_lock(struct ldlm_lock *lock); +static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, int seconds); /** * Check if there is a request in the export request list @@ -280,9 +283,9 @@ static int ldlm_lock_busy(struct ldlm_lock *lock) /* This is called from within a timer interrupt and cannot schedule */ static void waiting_locks_callback(unsigned long unused) { - struct ldlm_lock *lock; + struct ldlm_lock *lock; + int need_dump = 0; -repeat: cfs_spin_lock_bh(&waiting_locks_spinlock); while (!cfs_list_empty(&waiting_locks_list)) { lock = cfs_list_entry(waiting_locks_list.next, struct ldlm_lock, @@ -304,9 +307,16 @@ repeat: libcfs_nid2str(lock->l_export->exp_connection->c_peer.nid)); cfs_list_del_init(&lock->l_pending_chain); - cfs_spin_unlock_bh(&waiting_locks_spinlock); - ldlm_add_waiting_lock(lock); - goto repeat; + if (lock->l_destroyed) { + /* relay the lock refcount decrease to + * expired lock thread */ + cfs_list_add(&lock->l_pending_chain, + &expired_lock_thread.elt_expired_locks); + } else { + __ldlm_add_waiting_lock(lock, + ldlm_get_enq_timeout(lock)); + } + continue; } /* if timeout overlaps the activation time of suspended timeouts @@ -320,9 +330,16 @@ repeat: libcfs_nid2str(lock->l_export->exp_connection->c_peer.nid)); cfs_list_del_init(&lock->l_pending_chain); - cfs_spin_unlock_bh(&waiting_locks_spinlock); - ldlm_add_waiting_lock(lock); - goto repeat; + if (lock->l_destroyed) { + /* relay the lock refcount decrease to + * expired lock thread */ + cfs_list_add(&lock->l_pending_chain, + &expired_lock_thread.elt_expired_locks); + } else { + __ldlm_add_waiting_lock(lock, + ldlm_get_enq_timeout(lock)); + } + continue; } /* Check if we need to prolong timeout */ @@ -362,14 +379,15 @@ repeat: cfs_list_del(&lock->l_pending_chain); cfs_list_add(&lock->l_pending_chain, &expired_lock_thread.elt_expired_locks); - } + need_dump = 1; + } - if (!cfs_list_empty(&expired_lock_thread.elt_expired_locks)) { - if (obd_dump_on_timeout) - expired_lock_thread.elt_dump = __LINE__; + if (!cfs_list_empty(&expired_lock_thread.elt_expired_locks)) { + if (obd_dump_on_timeout && need_dump) + expired_lock_thread.elt_dump = __LINE__; - cfs_waitq_signal(&expired_lock_thread.elt_waitq); - } + cfs_waitq_signal(&expired_lock_thread.elt_waitq); + } /* * Make sure the timer will fire again if we have any locks @@ -427,10 +445,14 @@ static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, int seconds) static int ldlm_add_waiting_lock(struct ldlm_lock *lock) { - int ret; - int timeout = ldlm_get_enq_timeout(lock); + int ret; + int timeout = ldlm_get_enq_timeout(lock); - LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)); + /* NB: must be called with hold of lock_res_and_lock() */ + LASSERT(lock->l_res_locked); + lock->l_waited = 1; + + LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)); cfs_spin_lock_bh(&waiting_locks_spinlock); if (lock->l_destroyed) { @@ -560,7 +582,8 @@ int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout) LDLM_DEBUG(lock, "refreshed"); return 1; } -#else /* !__KERNEL__ */ + +#else /* !HAVE_SERVER_SUPPORT || !__KERNEL__ */ int ldlm_del_waiting_lock(struct ldlm_lock *lock) { @@ -571,16 +594,19 @@ int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout) { RETURN(0); } -#endif /* __KERNEL__ */ -#ifdef HAVE_SERVER_SUPPORT -# ifndef __KERNEL__ +# ifdef HAVE_SERVER_SUPPORT static int ldlm_add_waiting_lock(struct ldlm_lock *lock) { - LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)); - RETURN(1); + LASSERT(lock->l_res_locked); + LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)); + RETURN(1); } + # endif +#endif /* HAVE_SERVER_SUPPORT && __KERNEL__ */ + +#ifdef HAVE_SERVER_SUPPORT static void ldlm_failed_ast(struct ldlm_lock *lock, int rc, const char *ast_type) @@ -673,43 +699,69 @@ static int ldlm_cb_interpret(const struct lu_env *env, ENTRY; LASSERT(lock != NULL); - if (rc != 0) { - rc = ldlm_handle_ast_error(lock, req, rc, - arg->type == LDLM_BL_CALLBACK - ? "blocking" : "completion"); - if (rc == -ERESTART) - cfs_atomic_inc(&arg->restart); - } + + switch (arg->type) { + case LDLM_GL_CALLBACK: + /* Update the LVB from disk if the AST failed + * (this is a legal race) + * + * - Glimpse callback of local lock just returns + * -ELDLM_NO_LOCK_DATA. + * - Glimpse callback of remote lock might return + * -ELDLM_NO_LOCK_DATA when inode is cleared. LU-274 + */ + if (rc == -ELDLM_NO_LOCK_DATA) { + LDLM_DEBUG(lock, "lost race - client has a lock but no " + "inode"); + ldlm_res_lvbo_update(lock->l_resource, NULL, 1); + } else if (rc != 0) { + rc = ldlm_handle_ast_error(lock, req, rc, "glimpse"); + } else { + rc = ldlm_res_lvbo_update(lock->l_resource, req, 1); + } + break; + case LDLM_BL_CALLBACK: + if (rc != 0) + rc = ldlm_handle_ast_error(lock, req, rc, "blocking"); + break; + case LDLM_CP_CALLBACK: + if (rc != 0) + rc = ldlm_handle_ast_error(lock, req, rc, "completion"); + break; + default: + LDLM_ERROR(lock, "invalid opcode for lock callback %d", + arg->type); + LBUG(); + } + + /* release extra reference taken in ldlm_ast_fini() */ LDLM_LOCK_RELEASE(lock); - if (cfs_atomic_dec_return(&arg->rpcs) < arg->threshold) - cfs_waitq_signal(&arg->waitq); + if (rc == -ERESTART) + cfs_atomic_inc(&arg->restart); - ldlm_csa_put(arg); RETURN(0); } -static inline int ldlm_bl_and_cp_ast_tail(struct ptlrpc_request *req, - struct ldlm_cb_set_arg *arg, - struct ldlm_lock *lock, - int instant_cancel) +static inline int ldlm_ast_fini(struct ptlrpc_request *req, + struct ldlm_cb_set_arg *arg, + struct ldlm_lock *lock, + int instant_cancel) { - int rc = 0; - ENTRY; + int rc = 0; + ENTRY; - if (unlikely(instant_cancel)) { - rc = ptl_send_rpc(req, 1); - ptlrpc_req_finished(req); - if (rc == 0) - cfs_atomic_inc(&arg->restart); - } else { - LDLM_LOCK_GET(lock); - cfs_atomic_inc(&arg->rpcs); - cfs_atomic_inc(&arg->refcount); - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); - } + if (unlikely(instant_cancel)) { + rc = ptl_send_rpc(req, 1); + ptlrpc_req_finished(req); + if (rc == 0) + cfs_atomic_inc(&arg->restart); + } else { + LDLM_LOCK_GET(lock); + ptlrpc_set_add_req(arg->set, req); + } - RETURN(rc); + RETURN(rc); } /** @@ -766,10 +818,8 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, LASSERT(lock); LASSERT(data != NULL); - if (lock->l_export->exp_obd->obd_recovering != 0) { + if (lock->l_export->exp_obd->obd_recovering != 0) LDLM_ERROR(lock, "BUG 6063: lock collide during recovery"); - ldlm_lock_dump(D_ERROR, lock, 0); - } ldlm_lock_reorder_req(lock); @@ -787,22 +837,23 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, req->rq_interpret_reply = ldlm_cb_interpret; req->rq_no_resend = 1; - lock_res(lock->l_resource); - if (lock->l_granted_mode != lock->l_req_mode) { - /* this blocking AST will be communicated as part of the - * completion AST instead */ - unlock_res(lock->l_resource); - ptlrpc_req_finished(req); - LDLM_DEBUG(lock, "lock not granted, not sending blocking AST"); - RETURN(0); - } + lock_res_and_lock(lock); + if (lock->l_granted_mode != lock->l_req_mode) { + /* this blocking AST will be communicated as part of the + * completion AST instead */ + unlock_res_and_lock(lock); - if (lock->l_destroyed) { - /* What's the point? */ - unlock_res(lock->l_resource); - ptlrpc_req_finished(req); - RETURN(0); - } + ptlrpc_req_finished(req); + LDLM_DEBUG(lock, "lock not granted, not sending blocking AST"); + RETURN(0); + } + + if (lock->l_destroyed) { + /* What's the point? */ + unlock_res_and_lock(lock); + ptlrpc_req_finished(req); + RETURN(0); + } if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) instant_cancel = 1; @@ -815,14 +866,14 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, LDLM_DEBUG(lock, "server preparing blocking AST"); ptlrpc_request_set_replen(req); - if (instant_cancel) { - unlock_res(lock->l_resource); - ldlm_lock_cancel(lock); - } else { - LASSERT(lock->l_granted_mode == lock->l_req_mode); - ldlm_add_waiting_lock(lock); - unlock_res(lock->l_resource); - } + if (instant_cancel) { + unlock_res_and_lock(lock); + ldlm_lock_cancel(lock); + } else { + LASSERT(lock->l_granted_mode == lock->l_req_mode); + ldlm_add_waiting_lock(lock); + unlock_res_and_lock(lock); + } req->rq_send_state = LUSTRE_IMP_FULL; /* ptlrpc_request_alloc_pack already set timeout */ @@ -834,7 +885,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats, LDLM_BL_CALLBACK - LDLM_FIRST_OPC); - rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel); + rc = ldlm_ast_fini(req, arg, lock, instant_cancel); RETURN(rc); } @@ -949,17 +1000,18 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats, LDLM_CP_CALLBACK - LDLM_FIRST_OPC); - rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel); + rc = ldlm_ast_fini(req, arg, lock, instant_cancel); RETURN(rc); } int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data) { - struct ldlm_resource *res = lock->l_resource; - struct ldlm_request *body; - struct ptlrpc_request *req; - int rc; + struct ldlm_cb_set_arg *arg = data; + struct ldlm_request *body; + struct ptlrpc_request *req; + struct ldlm_cb_async_args *ca; + int rc; ENTRY; LASSERT(lock != NULL); @@ -975,44 +1027,44 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data) body->lock_handle[0] = lock->l_remote_handle; ldlm_lock2desc(lock, &body->lock_desc); + CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args)); + ca = ptlrpc_req_async_args(req); + ca->ca_set_arg = arg; + ca->ca_lock = lock; + /* server namespace, doesn't need lock */ req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lock->l_resource->lr_lvb_len); - res = lock->l_resource; ptlrpc_request_set_replen(req); - req->rq_send_state = LUSTRE_IMP_FULL; /* ptlrpc_request_alloc_pack already set timeout */ if (AT_OFF) req->rq_timeout = ldlm_get_rq_timeout(); + req->rq_interpret_reply = ldlm_cb_interpret; + if (lock->l_export && lock->l_export->exp_nid_stats && lock->l_export->exp_nid_stats->nid_ldlm_stats) lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats, LDLM_GL_CALLBACK - LDLM_FIRST_OPC); - rc = ptlrpc_queue_wait(req); - /* Update the LVB from disk if the AST failed (this is a legal race) - * - * - Glimpse callback of local lock just return -ELDLM_NO_LOCK_DATA. - * - Glimpse callback of remote lock might return -ELDLM_NO_LOCK_DATA - * when inode is cleared. LU-274 - */ - if (rc == -ELDLM_NO_LOCK_DATA) { - LDLM_DEBUG(lock, "lost race - client has a lock but no inode"); - ldlm_res_lvbo_update(res, NULL, 1); - } else if (rc != 0) { - rc = ldlm_handle_ast_error(lock, req, rc, "glimpse"); - } else { - rc = ldlm_res_lvbo_update(res, req, 1); - } + rc = ldlm_ast_fini(req, arg, lock, 0); - ptlrpc_req_finished(req); - if (rc == -ERESTART) - ldlm_reprocess_all(res); + RETURN(rc); +} - RETURN(rc); +int ldlm_glimpse_locks(struct ldlm_resource *res, cfs_list_t *gl_work_list) +{ + int rc; + ENTRY; + + rc = ldlm_run_ast_work(ldlm_res_to_ns(res), gl_work_list, + LDLM_WORK_GL_AST); + if (rc == -ERESTART) + ldlm_reprocess_all(res); + + RETURN(rc); } static void ldlm_svc_get_eopc(const struct ldlm_request *dlm_req, @@ -1073,9 +1125,8 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns, LASSERT(req->rq_export); - if (req->rq_rqbd->rqbd_service->srv_stats) - ldlm_svc_get_eopc(dlm_req, - req->rq_rqbd->rqbd_service->srv_stats); + if (ptlrpc_req2svc(req)->srv_stats != NULL) + ldlm_svc_get_eopc(dlm_req, ptlrpc_req2svc(req)->srv_stats); if (req->rq_export && req->rq_export->exp_nid_stats && req->rq_export->exp_nid_stats->nid_ldlm_stats) @@ -1535,6 +1586,7 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, struct ldlm_request *dlm_req, struct ldlm_lock *lock) { + int lvb_len; CFS_LIST_HEAD(ast_list); ENTRY; @@ -1551,6 +1603,33 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, } } + lvb_len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT); + if (lvb_len > 0) { + if (lock->l_lvb_len > 0) { + /* for extent lock, lvb contains ost_lvb{}. */ + LASSERT(lock->l_lvb_data != NULL); + LASSERTF(lock->l_lvb_len == lvb_len, + "preallocated %d, actual %d.\n", + lock->l_lvb_len, lvb_len); + } else { /* for layout lock, lvb has variable length */ + void *lvb_data; + + OBD_ALLOC(lvb_data, lvb_len); + if (lvb_data == NULL) + LDLM_ERROR(lock, "no memory.\n"); + + lock_res_and_lock(lock); + if (lvb_data == NULL) { + lock->l_flags |= LDLM_FL_FAILED; + } else { + LASSERT(lock->l_lvb_data == NULL); + lock->l_lvb_data = lvb_data; + lock->l_lvb_len = lvb_len; + } + unlock_res_and_lock(lock); + } + } + lock_res_and_lock(lock); if (lock->l_destroyed || lock->l_granted_mode == lock->l_req_mode) { @@ -1818,7 +1897,8 @@ static int ldlm_handle_setinfo(struct ptlrpc_request *req) if (KEY_IS(KEY_HSM_COPYTOOL_SEND)) /* Pass it on to mdc (the "export" in this case) */ - rc = obd_set_info_async(req->rq_export, + rc = obd_set_info_async(req->rq_svc_thread->t_env, + req->rq_export, sizeof(KEY_HSM_COPYTOOL_SEND), KEY_HSM_COPYTOOL_SEND, vallen, val, NULL); @@ -1916,7 +1996,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE); if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET)) RETURN(0); - rc = llog_origin_handle_create(req); + rc = llog_origin_handle_open(req); ldlm_callback_reply(req, rc); RETURN(0); case LLOG_ORIGIN_HANDLE_NEXT_BLOCK: @@ -2183,7 +2263,8 @@ static int ldlm_cancel_hpreq_check(struct ptlrpc_request *req) static struct ptlrpc_hpreq_ops ldlm_cancel_hpreq_ops = { .hpreq_lock_match = ldlm_cancel_hpreq_lock_match, - .hpreq_check = ldlm_cancel_hpreq_check + .hpreq_check = ldlm_cancel_hpreq_check, + .hpreq_fini = NULL, }; static int ldlm_hpreq_handler(struct ptlrpc_request *req) @@ -2232,10 +2313,13 @@ int ldlm_revoke_lock_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd, LASSERT(!lock->l_blocking_lock); lock->l_flags |= LDLM_FL_AST_SENT; - if (lock->l_export && lock->l_export->exp_lock_hash && - !cfs_hlist_unhashed(&lock->l_exp_hash)) - cfs_hash_del(lock->l_export->exp_lock_hash, - &lock->l_remote_handle, &lock->l_exp_hash); + if (lock->l_export && lock->l_export->exp_lock_hash) { + /* NB: it's safe to call cfs_hash_del() even lock isn't + * in exp_lock_hash. */ + cfs_hash_del(lock->l_export->exp_lock_hash, + &lock->l_remote_handle, &lock->l_exp_hash); + } + cfs_list_add_tail(&lock->l_rk_ast, rpc_list); LDLM_LOCK_GET(lock); @@ -2527,16 +2611,17 @@ void ldlm_destroy_export(struct obd_export *exp) ENTRY; cfs_hash_putref(exp->exp_lock_hash); exp->exp_lock_hash = NULL; + + ldlm_destroy_flock_export(exp); EXIT; } EXPORT_SYMBOL(ldlm_destroy_export); static int ldlm_setup(void) { - struct ldlm_bl_pool *blp; + static struct ptlrpc_service_conf conf; + struct ldlm_bl_pool *blp = NULL; int rc = 0; - int ldlm_min_threads = LDLM_THREADS_AUTO_MIN; - int ldlm_max_threads = LDLM_THREADS_AUTO_MAX; #ifdef __KERNEL__ int i; #endif @@ -2552,56 +2637,94 @@ static int ldlm_setup(void) #ifdef LPROCFS rc = ldlm_proc_setup(); if (rc != 0) - GOTO(out_free, rc); -#endif - -#ifdef __KERNEL__ - if (ldlm_num_threads) { - /* If ldlm_num_threads is set, it is the min and the max. */ - if (ldlm_num_threads > LDLM_THREADS_AUTO_MAX) - ldlm_num_threads = LDLM_THREADS_AUTO_MAX; - if (ldlm_num_threads < LDLM_THREADS_AUTO_MIN) - ldlm_num_threads = LDLM_THREADS_AUTO_MIN; - ldlm_min_threads = ldlm_max_threads = ldlm_num_threads; - } + GOTO(out, rc); #endif - ldlm_state->ldlm_cb_service = - ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE, - LDLM_MAXREPSIZE, LDLM_CB_REQUEST_PORTAL, - LDLM_CB_REPLY_PORTAL, 2, - ldlm_callback_handler, "ldlm_cbd", - ldlm_svc_proc_dir, NULL, - ldlm_min_threads, ldlm_max_threads, - "ldlm_cb", - LCT_MD_THREAD|LCT_DT_THREAD, NULL); - - if (!ldlm_state->ldlm_cb_service) { - CERROR("failed to start service\n"); - GOTO(out_proc, rc = -ENOMEM); - } + memset(&conf, 0, sizeof(conf)); + conf = (typeof(conf)) { + .psc_name = "ldlm_cbd", + .psc_watchdog_factor = 2, + .psc_buf = { + .bc_nbufs = LDLM_NBUFS, + .bc_buf_size = LDLM_BUFSIZE, + .bc_req_max_size = LDLM_MAXREQSIZE, + .bc_rep_max_size = LDLM_MAXREPSIZE, + .bc_req_portal = LDLM_CB_REQUEST_PORTAL, + .bc_rep_portal = LDLM_CB_REPLY_PORTAL, + }, + .psc_thr = { + .tc_thr_name = "ldlm_cb", + .tc_thr_factor = LDLM_THR_FACTOR, + .tc_nthrs_init = LDLM_NTHRS_INIT, + .tc_nthrs_base = LDLM_NTHRS_BASE, + .tc_nthrs_max = LDLM_NTHRS_MAX, + .tc_nthrs_user = ldlm_num_threads, + .tc_cpu_affinity = 1, + .tc_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD, + }, + .psc_cpt = { + .cc_pattern = ldlm_cpts, + }, + .psc_ops = { + .so_req_handler = ldlm_callback_handler, + }, + }; + ldlm_state->ldlm_cb_service = \ + ptlrpc_register_service(&conf, ldlm_svc_proc_dir); + if (IS_ERR(ldlm_state->ldlm_cb_service)) { + CERROR("failed to start service\n"); + rc = PTR_ERR(ldlm_state->ldlm_cb_service); + ldlm_state->ldlm_cb_service = NULL; + GOTO(out, rc); + } #ifdef HAVE_SERVER_SUPPORT - ldlm_state->ldlm_cancel_service = - ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE, - LDLM_MAXREPSIZE, LDLM_CANCEL_REQUEST_PORTAL, - LDLM_CANCEL_REPLY_PORTAL, 6, - ldlm_cancel_handler, "ldlm_canceld", - ldlm_svc_proc_dir, NULL, - ldlm_min_threads, ldlm_max_threads, - "ldlm_cn", - LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD, - ldlm_hpreq_handler); - - if (!ldlm_state->ldlm_cancel_service) { - CERROR("failed to start service\n"); - GOTO(out_proc, rc = -ENOMEM); - } + memset(&conf, 0, sizeof(conf)); + conf = (typeof(conf)) { + .psc_name = "ldlm_canceld", + .psc_watchdog_factor = 6, + .psc_buf = { + .bc_nbufs = LDLM_NBUFS, + .bc_buf_size = LDLM_BUFSIZE, + .bc_req_max_size = LDLM_MAXREQSIZE, + .bc_rep_max_size = LDLM_MAXREPSIZE, + .bc_req_portal = LDLM_CANCEL_REQUEST_PORTAL, + .bc_rep_portal = LDLM_CANCEL_REPLY_PORTAL, + + }, + .psc_thr = { + .tc_thr_name = "ldlm_cn", + .tc_thr_factor = LDLM_THR_FACTOR, + .tc_nthrs_init = LDLM_NTHRS_INIT, + .tc_nthrs_base = LDLM_NTHRS_BASE, + .tc_nthrs_max = LDLM_NTHRS_MAX, + .tc_nthrs_user = ldlm_num_threads, + .tc_cpu_affinity = 1, + .tc_ctx_tags = LCT_MD_THREAD | \ + LCT_DT_THREAD | \ + LCT_CL_THREAD, + }, + .psc_cpt = { + .cc_pattern = ldlm_cpts, + }, + .psc_ops = { + .so_req_handler = ldlm_cancel_handler, + .so_hpreq_handler = ldlm_hpreq_handler, + }, + }; + ldlm_state->ldlm_cancel_service = \ + ptlrpc_register_service(&conf, ldlm_svc_proc_dir); + if (IS_ERR(ldlm_state->ldlm_cancel_service)) { + CERROR("failed to start service\n"); + rc = PTR_ERR(ldlm_state->ldlm_cancel_service); + ldlm_state->ldlm_cancel_service = NULL; + GOTO(out, rc); + } #endif - OBD_ALLOC(blp, sizeof(*blp)); - if (blp == NULL) - GOTO(out_proc, rc = -ENOMEM); + OBD_ALLOC(blp, sizeof(*blp)); + if (blp == NULL) + GOTO(out, rc = -ENOMEM); ldlm_state->ldlm_bl_pool = blp; cfs_spin_lock_init(&blp->blp_lock); @@ -2610,26 +2733,24 @@ static int ldlm_setup(void) cfs_waitq_init(&blp->blp_waitq); cfs_atomic_set(&blp->blp_num_threads, 0); cfs_atomic_set(&blp->blp_busy_threads, 0); - blp->blp_min_threads = ldlm_min_threads; - blp->blp_max_threads = ldlm_max_threads; #ifdef __KERNEL__ - for (i = 0; i < blp->blp_min_threads; i++) { - rc = ldlm_bl_thread_start(blp); - if (rc < 0) - GOTO(out_thread, rc); - } + if (ldlm_num_threads == 0) { + blp->blp_min_threads = LDLM_NTHRS_INIT; + blp->blp_max_threads = LDLM_NTHRS_MAX; + } else { + blp->blp_min_threads = blp->blp_max_threads = \ + min_t(int, LDLM_NTHRS_MAX, max_t(int, LDLM_NTHRS_INIT, + ldlm_num_threads)); + } + + for (i = 0; i < blp->blp_min_threads; i++) { + rc = ldlm_bl_thread_start(blp); + if (rc < 0) + GOTO(out, rc); + } # ifdef HAVE_SERVER_SUPPORT - rc = ptlrpc_start_threads(ldlm_state->ldlm_cancel_service); - if (rc) - GOTO(out_thread, rc); -# endif - - rc = ptlrpc_start_threads(ldlm_state->ldlm_cb_service); - if (rc) - GOTO(out_thread, rc); - CFS_INIT_LIST_HEAD(&expired_lock_thread.elt_expired_locks); expired_lock_thread.elt_state = ELT_STOPPED; cfs_waitq_init(&expired_lock_thread.elt_waitq); @@ -2639,45 +2760,30 @@ static int ldlm_setup(void) cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, 0); rc = cfs_create_thread(expired_lock_main, NULL, CFS_DAEMON_FLAGS); - if (rc < 0) { - CERROR("Cannot start ldlm expired-lock thread: %d\n", rc); - GOTO(out_thread, rc); - } + if (rc < 0) { + CERROR("Cannot start ldlm expired-lock thread: %d\n", rc); + GOTO(out, rc); + } cfs_wait_event(expired_lock_thread.elt_waitq, expired_lock_thread.elt_state == ELT_READY); -#endif - -#ifdef __KERNEL__ - rc = ldlm_pools_init(); - if (rc) - GOTO(out_thread, rc); -#endif - RETURN(0); +# endif /* HAVE_SERVER_SUPPORT */ -#ifdef __KERNEL__ - out_thread: -# ifdef HAVE_SERVER_SUPPORT - ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service); -# endif - ptlrpc_unregister_service(ldlm_state->ldlm_cb_service); + rc = ldlm_pools_init(); + if (rc) { + CERROR("Failed to initialize LDLM pools: %d\n", rc); + GOTO(out, rc); + } #endif + RETURN(0); - out_proc: -#ifdef LPROCFS - ldlm_proc_cleanup(); - out_free: -#endif - OBD_FREE(ldlm_state, sizeof(*ldlm_state)); - ldlm_state = NULL; - return rc; + out: + ldlm_cleanup(); + RETURN(rc); } static int ldlm_cleanup(void) { -#ifdef __KERNEL__ - struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool; -#endif ENTRY; if (!cfs_list_empty(ldlm_namespace_list(LDLM_NAMESPACE_SERVER)) || @@ -2690,37 +2796,44 @@ static int ldlm_cleanup(void) #ifdef __KERNEL__ ldlm_pools_fini(); -#endif -#ifdef __KERNEL__ - while (cfs_atomic_read(&blp->blp_num_threads) > 0) { - struct ldlm_bl_work_item blwi = { .blwi_ns = NULL }; + if (ldlm_state->ldlm_bl_pool != NULL) { + struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool; - cfs_init_completion(&blp->blp_comp); + while (cfs_atomic_read(&blp->blp_num_threads) > 0) { + struct ldlm_bl_work_item blwi = { .blwi_ns = NULL }; - cfs_spin_lock(&blp->blp_lock); - cfs_list_add_tail(&blwi.blwi_entry, &blp->blp_list); - cfs_waitq_signal(&blp->blp_waitq); - cfs_spin_unlock(&blp->blp_lock); + cfs_init_completion(&blp->blp_comp); - cfs_wait_for_completion(&blp->blp_comp); - } - OBD_FREE(blp, sizeof(*blp)); + cfs_spin_lock(&blp->blp_lock); + cfs_list_add_tail(&blwi.blwi_entry, &blp->blp_list); + cfs_waitq_signal(&blp->blp_waitq); + cfs_spin_unlock(&blp->blp_lock); - ptlrpc_unregister_service(ldlm_state->ldlm_cb_service); + cfs_wait_for_completion(&blp->blp_comp); + } + + OBD_FREE(blp, sizeof(*blp)); + } +#endif /* __KERNEL__ */ + + if (ldlm_state->ldlm_cb_service != NULL) + ptlrpc_unregister_service(ldlm_state->ldlm_cb_service); # ifdef HAVE_SERVER_SUPPORT - ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service); + if (ldlm_state->ldlm_cancel_service != NULL) + ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service); # endif - ldlm_proc_cleanup(); - expired_lock_thread.elt_state = ELT_TERMINATE; - cfs_waitq_signal(&expired_lock_thread.elt_waitq); - cfs_wait_event(expired_lock_thread.elt_waitq, - expired_lock_thread.elt_state == ELT_STOPPED); -#else /* !__KERNEL__ */ - ptlrpc_unregister_service(ldlm_state->ldlm_cb_service); +#ifdef __KERNEL__ + ldlm_proc_cleanup(); + # ifdef HAVE_SERVER_SUPPORT - ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service); + if (expired_lock_thread.elt_state != ELT_STOPPED) { + expired_lock_thread.elt_state = ELT_TERMINATE; + cfs_waitq_signal(&expired_lock_thread.elt_waitq); + cfs_wait_event(expired_lock_thread.elt_waitq, + expired_lock_thread.elt_state == ELT_STOPPED); + } # endif #endif /* __KERNEL__ */ @@ -2805,7 +2918,6 @@ EXPORT_SYMBOL(ldlm_lock_decref); EXPORT_SYMBOL(ldlm_lock_decref_and_cancel); EXPORT_SYMBOL(ldlm_lock_change_resource); EXPORT_SYMBOL(ldlm_it2str); -EXPORT_SYMBOL(ldlm_lock_dump); EXPORT_SYMBOL(ldlm_lock_dump_handle); EXPORT_SYMBOL(ldlm_reprocess_all_ns); EXPORT_SYMBOL(ldlm_lock_allow_match_locked); @@ -2843,6 +2955,7 @@ EXPORT_SYMBOL(ldlm_cli_cancel_list); EXPORT_SYMBOL(ldlm_server_blocking_ast); EXPORT_SYMBOL(ldlm_server_completion_ast); EXPORT_SYMBOL(ldlm_server_glimpse_ast); +EXPORT_SYMBOL(ldlm_glimpse_locks); EXPORT_SYMBOL(ldlm_handle_enqueue); EXPORT_SYMBOL(ldlm_handle_enqueue0); EXPORT_SYMBOL(ldlm_handle_cancel);