X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fldlm%2Fldlm_lockd.c;h=980d22512e42ddd868eb447378551578f67a71d0;hp=6edef77ea264841983485e0b2ef12759109e71f0;hb=3cce65712d94cffe8f1626545845b95b88aef672;hpb=2a520282888d4fd1b7e3b791959a265cd9b8b9bf diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 6edef77..980d2251 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -49,6 +49,11 @@ static int ldlm_num_threads; module_param(ldlm_num_threads, int, 0444); MODULE_PARM_DESC(ldlm_num_threads, "number of DLM service threads to start"); +static unsigned int ldlm_cpu_bind = 1; +module_param(ldlm_cpu_bind, uint, 0444); +MODULE_PARM_DESC(ldlm_cpu_bind, + "bind DLM service threads to particular CPU partitions"); + static char *ldlm_cpts; module_param(ldlm_cpts, charp, 0444); MODULE_PARM_DESC(ldlm_cpts, "CPU partitions ldlm threads should run on"); @@ -64,7 +69,8 @@ struct kset *ldlm_svc_kset; static struct ldlm_state *ldlm_state; -/* timeout for initial callback (AST) reply (bz10399) +/* + * timeout for initial callback (AST) reply (bz10399) * Due to having to send a 32 bit time value over the * wire return it as time_t instead of time64_t */ @@ -77,27 +83,27 @@ static inline time_t ldlm_get_rq_timeout(void) } struct ldlm_bl_pool { - spinlock_t blp_lock; + spinlock_t blp_lock; /* * blp_prio_list is used for callbacks that should be handled * as a priority. It is used for LDLM_FL_DISCARD_DATA requests. - * see bug 13843 + * see b=13843 */ - struct list_head blp_prio_list; + struct list_head blp_prio_list; /* * blp_list is used for all other callbacks which are likely * to take longer to process. */ - struct list_head blp_list; - - wait_queue_head_t blp_waitq; - struct completion blp_comp; - atomic_t blp_num_threads; - atomic_t blp_busy_threads; - int blp_min_threads; - int blp_max_threads; + struct list_head blp_list; + + wait_queue_head_t blp_waitq; + struct completion blp_comp; + atomic_t blp_num_threads; + atomic_t blp_busy_threads; + int blp_min_threads; + int blp_max_threads; }; struct ldlm_bl_work_item { @@ -214,16 +220,20 @@ static int expired_lock_main(void *arg) lock->l_export); lock->l_export = NULL; LDLM_ERROR(lock, "free export"); - /* release extra ref grabbed by + /* + * release extra ref grabbed by * ldlm_add_waiting_lock() or - * ldlm_failed_ast() */ + * ldlm_failed_ast() + */ LDLM_LOCK_RELEASE(lock); continue; } if (ldlm_is_destroyed(lock)) { - /* release the lock refcount where - * waiting_locks_callback() founds */ + /* + * release the lock refcount where + * waiting_locks_callback() founds + */ LDLM_LOCK_RELEASE(lock); continue; } @@ -254,8 +264,10 @@ static int expired_lock_main(void *arg) class_fail_export(export); } class_export_lock_put(export, lock); - /* release extra ref grabbed by ldlm_add_waiting_lock() - * or ldlm_failed_ast() */ + /* + * release extra ref grabbed by ldlm_add_waiting_lock() + * or ldlm_failed_ast() + */ LDLM_LOCK_RELEASE(lock); spin_lock_bh(&waiting_locks_spinlock); @@ -284,6 +296,7 @@ static int ldlm_lock_busy(struct ldlm_lock *lock) { struct ptlrpc_request *req; int match = 0; + ENTRY; if (lock->l_export == NULL) @@ -305,8 +318,8 @@ static int ldlm_lock_busy(struct ldlm_lock *lock) /* This is called from within a timer interrupt and cannot schedule */ static void waiting_locks_callback(TIMER_DATA_TYPE unused) { - struct ldlm_lock *lock; - int need_dump = 0; + struct ldlm_lock *lock; + int need_dump = 0; spin_lock_bh(&waiting_locks_spinlock); while 
(!list_empty(&waiting_locks_list)) { @@ -316,9 +329,11 @@ static void waiting_locks_callback(TIMER_DATA_TYPE unused) lock->l_req_mode == LCK_GROUP) break; - /* no needs to take an extra ref on the lock since it was in - * the waiting_locks_list and ldlm_add_waiting_lock() - * already grabbed a ref */ + /* + * no needs to take an extra ref on the lock since it was in + * the waiting_locks_list and ldlm_add_waiting_lock() + * already grabbed a ref + */ list_del(&lock->l_pending_chain); list_add(&lock->l_pending_chain, &expired_lock_list); need_dump = 1; @@ -364,15 +379,15 @@ static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, time64_t seconds) time64_t timeout; if (!list_empty(&lock->l_pending_chain)) - return 0; + return 0; - if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT) || - OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT)) - seconds = 1; + if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT) || + OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT)) + seconds = 1; timeout = ktime_get_seconds() + seconds; if (likely(timeout > lock->l_callback_timeout)) - lock->l_callback_timeout = timeout; + lock->l_callback_timeout = timeout; timeout_jiffies = cfs_time_seconds(lock->l_callback_timeout); @@ -380,7 +395,8 @@ static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, time64_t seconds) !timer_pending(&waiting_locks_timer)) mod_timer(&waiting_locks_timer, timeout_jiffies); - /* if the new lock has a shorter timeout than something earlier on + /* + * if the new lock has a shorter timeout than something earlier on * the list, we'll wait the longer amount of time; no big deal. */ /* FIFO */ @@ -392,7 +408,7 @@ static void ldlm_add_blocked_lock(struct ldlm_lock *lock) { spin_lock_bh(&lock->l_export->exp_bl_list_lock); if (list_empty(&lock->l_exp_list)) { - if (lock->l_granted_mode != lock->l_req_mode) + if (!ldlm_is_granted(lock)) list_add_tail(&lock->l_exp_list, &lock->l_export->exp_bl_list); else @@ -401,11 +417,13 @@ static void ldlm_add_blocked_lock(struct ldlm_lock *lock) } spin_unlock_bh(&lock->l_export->exp_bl_list_lock); - /* A blocked lock is added. Adjust the position in + /* + * A blocked lock is added. Adjust the position in * the stale list if the export is in the list. * If export is stale and not in the list - it is being * processed and will be placed on the right position - * on obd_stale_export_put(). */ + * on obd_stale_export_put(). 
+ */ if (!list_empty(&lock->l_export->exp_stale_list)) obd_stale_export_adjust(lock->l_export); } @@ -418,8 +436,10 @@ static int ldlm_add_waiting_lock(struct ldlm_lock *lock, time64_t timeout) LASSERT(ldlm_is_res_locked(lock)); LASSERT(!ldlm_is_cancel_on_block(lock)); - /* Do not put cross-MDT lock in the waiting list, since we - * will not evict it due to timeout for now */ + /* + * Do not put cross-MDT lock in the waiting list, since we + * will not evict it due to timeout for now + */ if (lock->l_export != NULL && (exp_connect_flags(lock->l_export) & OBD_CONNECT_MDS_MDS)) return 0; @@ -434,7 +454,7 @@ static int ldlm_add_waiting_lock(struct ldlm_lock *lock, time64_t timeout) static time64_t next; spin_unlock_bh(&waiting_locks_spinlock); - LDLM_ERROR(lock, "not waiting on destroyed lock (bug 5653)"); + LDLM_ERROR(lock, "not waiting on destroyed lock (b=5653)"); if (ktime_get_seconds() > next) { next = ktime_get_seconds() + 14400; libcfs_debug_dumpstack(NULL); @@ -446,8 +466,10 @@ static int ldlm_add_waiting_lock(struct ldlm_lock *lock, time64_t timeout) lock->l_blast_sent = ktime_get_real_seconds(); ret = __ldlm_add_waiting_lock(lock, timeout); if (ret) { - /* grab ref on the lock if it has been added to the - * waiting list */ + /* + * grab ref on the lock if it has been added to the + * waiting list + */ LDLM_LOCK_GET(lock); } spin_unlock_bh(&waiting_locks_spinlock); @@ -475,37 +497,37 @@ static int __ldlm_del_waiting_lock(struct ldlm_lock *lock) struct list_head *list_next; if (list_empty(&lock->l_pending_chain)) - return 0; + return 0; - list_next = lock->l_pending_chain.next; - if (lock->l_pending_chain.prev == &waiting_locks_list) { - /* Removing the head of the list, adjust timer. */ - if (list_next == &waiting_locks_list) { - /* No more, just cancel. */ + list_next = lock->l_pending_chain.next; + if (lock->l_pending_chain.prev == &waiting_locks_list) { + /* Removing the head of the list, adjust timer. */ + if (list_next == &waiting_locks_list) { + /* No more, just cancel. */ del_timer(&waiting_locks_timer); - } else { - struct ldlm_lock *next; + } else { + struct ldlm_lock *next; next = list_entry(list_next, struct ldlm_lock, l_pending_chain); mod_timer(&waiting_locks_timer, cfs_time_seconds(next->l_callback_timeout)); - } - } + } + } list_del_init(&lock->l_pending_chain); - return 1; + return 1; } int ldlm_del_waiting_lock(struct ldlm_lock *lock) { - int ret; + int ret; - if (lock->l_export == NULL) { - /* We don't have a "waiting locks list" on clients. */ - CDEBUG(D_DLMTRACE, "Client lock %p : no-op\n", lock); - return 0; - } + if (lock->l_export == NULL) { + /* We don't have a "waiting locks list" on clients. */ + CDEBUG(D_DLMTRACE, "Client lock %p : no-op\n", lock); + return 0; + } spin_lock_bh(&waiting_locks_spinlock); ret = __ldlm_del_waiting_lock(lock); @@ -517,14 +539,16 @@ int ldlm_del_waiting_lock(struct ldlm_lock *lock) list_del_init(&lock->l_exp_list); spin_unlock_bh(&lock->l_export->exp_bl_list_lock); - if (ret) { - /* release lock ref if it has indeed been removed - * from a list */ - LDLM_LOCK_RELEASE(lock); - } + if (ret) { + /* + * release lock ref if it has indeed been removed + * from a list + */ + LDLM_LOCK_RELEASE(lock); + } - LDLM_DEBUG(lock, "%s", ret == 0 ? "wasn't waiting" : "removed"); - return ret; + LDLM_DEBUG(lock, "%s", ret == 0 ? 
"wasn't waiting" : "removed"); + return ret; } /** @@ -554,8 +578,10 @@ int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, time64_t timeout) return 0; } - /* we remove/add the lock to the waiting list, so no needs to - * release/take a lock reference */ + /* + * we remove/add the lock to the waiting list, so no needs to + * release/take a lock reference + */ __ldlm_del_waiting_lock(lock); __ldlm_add_waiting_lock(lock, timeout); spin_unlock_bh(&waiting_locks_spinlock); @@ -569,12 +595,12 @@ EXPORT_SYMBOL(ldlm_refresh_waiting_lock); int ldlm_del_waiting_lock(struct ldlm_lock *lock) { - RETURN(0); + RETURN(0); } int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, time64_t timeout) { - RETURN(0); + RETURN(0); } #endif /* !HAVE_SERVER_SUPPORT */ @@ -597,10 +623,12 @@ time64_t ldlm_bl_timeout(struct ldlm_lock *lock) if (AT_OFF) return obd_timeout / 2; - /* Since these are non-updating timeouts, we should be conservative. + /* + * Since these are non-updating timeouts, we should be conservative. * Take more than usually, 150% * It would be nice to have some kind of "early reply" mechanism for - * lock callbacks too... */ + * lock callbacks too... + */ timeout = at_get(&lock->l_export->exp_bl_lock_at); return max(timeout + (timeout >> 1), (time64_t)ldlm_enqueue_min); } @@ -610,19 +638,21 @@ EXPORT_SYMBOL(ldlm_bl_timeout); * Perform lock cleanup if AST sending failed. */ static void ldlm_failed_ast(struct ldlm_lock *lock, int rc, - const char *ast_type) + const char *ast_type) { - LCONSOLE_ERROR_MSG(0x138, "%s: A client on nid %s was evicted due " - "to a lock %s callback time out: rc %d\n", - lock->l_export->exp_obd->obd_name, - obd_export_nid2str(lock->l_export), ast_type, rc); + LCONSOLE_ERROR_MSG(0x138, + "%s: A client on nid %s was evicted due to a lock %s callback time out: rc %d\n", + lock->l_export->exp_obd->obd_name, + obd_export_nid2str(lock->l_export), ast_type, rc); - if (obd_dump_on_timeout) - libcfs_debug_dumplog(); + if (obd_dump_on_timeout) + libcfs_debug_dumplog(); spin_lock_bh(&waiting_locks_spinlock); if (__ldlm_del_waiting_lock(lock) == 0) - /* the lock was not in any list, grab an extra ref before adding - * the lock to the expired list */ + /* + * the lock was not in any list, grab an extra ref before adding + * the lock to the expired list + */ LDLM_LOCK_GET(lock); lock->l_callback_timeout = 0; /* differentiate it from expired locks */ list_add(&lock->l_pending_chain, &expired_lock_list); @@ -633,8 +663,7 @@ static void ldlm_failed_ast(struct ldlm_lock *lock, int rc, /** * Perform lock cleanup if AST reply came with error. */ -static int ldlm_handle_ast_error(const struct lu_env *env, - struct ldlm_lock *lock, +static int ldlm_handle_ast_error(struct ldlm_lock *lock, struct ptlrpc_request *req, int rc, const char *ast_type) { @@ -651,7 +680,8 @@ static int ldlm_handle_ast_error(const struct lu_env *env, } else if (rc == -ENODEV || rc == -ESHUTDOWN || (rc == -EIO && req->rq_import->imp_state == LUSTRE_IMP_CLOSED)) { - /* Upon umount process the AST fails because cannot be + /* + * Upon umount process the AST fails because cannot be * sent. This shouldn't lead to the client eviction. * -ENODEV error is returned by ptl_send_rpc() for * new request in such import. @@ -661,10 +691,10 @@ static int ldlm_handle_ast_error(const struct lu_env *env, * in ptlrpc_import_delay_req() as well with -EIO code. * In all such cases errors are ignored. 
*/ - LDLM_DEBUG(lock, "%s AST can't be sent due to a server" - " %s failure or umount process: rc = %d\n", - ast_type, - req->rq_import->imp_obd->obd_name, rc); + LDLM_DEBUG(lock, + "%s AST can't be sent due to a server %s failure or umount process: rc = %d\n", + ast_type, + req->rq_import->imp_obd->obd_name, rc); } else { LDLM_ERROR(lock, "client (nid %s) %s %s AST (req@%p x%llu status %d rc %d), evict it", @@ -690,10 +720,12 @@ static int ldlm_handle_ast_error(const struct lu_env *env, lustre_msg_get_status(req->rq_repmsg) : -1, ast_type, req, req->rq_xid); if (res) { - /* update lvbo to return proper attributes. - * see bug 23174 */ + /* + * update lvbo to return proper attributes. + * see b=23174 + */ ldlm_resource_getref(res); - ldlm_lvbo_update(env, res, lock, NULL, 1); + ldlm_lvbo_update(res, lock, NULL, 1); ldlm_resource_putref(res); } ldlm_lock_cancel(lock); @@ -704,18 +736,20 @@ static int ldlm_handle_ast_error(const struct lu_env *env, } static int ldlm_cb_interpret(const struct lu_env *env, - struct ptlrpc_request *req, void *data, int rc) + struct ptlrpc_request *req, void *args, int rc) { - struct ldlm_cb_async_args *ca = data; - struct ldlm_lock *lock = ca->ca_lock; - struct ldlm_cb_set_arg *arg = ca->ca_set_arg; - ENTRY; + struct ldlm_cb_async_args *ca = args; + struct ldlm_lock *lock = ca->ca_lock; + struct ldlm_cb_set_arg *arg = ca->ca_set_arg; + + ENTRY; - LASSERT(lock != NULL); + LASSERT(lock != NULL); switch (arg->type) { case LDLM_GL_CALLBACK: - /* Update the LVB from disk if the AST failed + /* + * Update the LVB from disk if the AST failed * (this is a legal race) * * - Glimpse callback of local lock just returns @@ -724,28 +758,25 @@ static int ldlm_cb_interpret(const struct lu_env *env, * -ELDLM_NO_LOCK_DATA when inode is cleared. 
LU-274 */ if (unlikely(arg->gl_interpret_reply)) { - rc = arg->gl_interpret_reply(env, req, data, rc); + rc = arg->gl_interpret_reply(NULL, req, args, rc); } else if (rc == -ELDLM_NO_LOCK_DATA) { - LDLM_DEBUG(lock, "lost race - client has a lock but no " - "inode"); - ldlm_lvbo_update(env, lock->l_resource, lock, NULL, 1); + LDLM_DEBUG(lock, + "lost race - client has a lock but no inode"); + ldlm_lvbo_update(lock->l_resource, lock, NULL, 1); } else if (rc != 0) { - rc = ldlm_handle_ast_error(env, lock, req, - rc, "glimpse"); + rc = ldlm_handle_ast_error(lock, req, rc, "glimpse"); } else { - rc = ldlm_lvbo_update(env, lock->l_resource, + rc = ldlm_lvbo_update(lock->l_resource, lock, req, 1); } break; case LDLM_BL_CALLBACK: if (rc != 0) - rc = ldlm_handle_ast_error(env, lock, req, - rc, "blocking"); + rc = ldlm_handle_ast_error(lock, req, rc, "blocking"); break; case LDLM_CP_CALLBACK: if (rc != 0) - rc = ldlm_handle_ast_error(env, lock, req, - rc, "completion"); + rc = ldlm_handle_ast_error(lock, req, rc, "completion"); break; default: LDLM_ERROR(lock, "invalid opcode for lock callback %d", @@ -754,7 +785,7 @@ static int ldlm_cb_interpret(const struct lu_env *env, } /* release extra reference taken in ldlm_ast_fini() */ - LDLM_LOCK_RELEASE(lock); + LDLM_LOCK_RELEASE(lock); if (rc == -ERESTART) atomic_inc(&arg->restart); @@ -776,6 +807,7 @@ static inline int ldlm_ast_fini(struct ptlrpc_request *req, int instant_cancel) { int rc = 0; + ENTRY; if (unlikely(instant_cancel)) { @@ -798,6 +830,7 @@ static inline int ldlm_ast_fini(struct ptlrpc_request *req, static void ldlm_lock_reorder_req(struct ldlm_lock *lock) { struct ptlrpc_request *req; + ENTRY; if (lock->l_export == NULL) { @@ -808,7 +841,8 @@ static void ldlm_lock_reorder_req(struct ldlm_lock *lock) spin_lock(&lock->l_export->exp_rpc_lock); list_for_each_entry(req, &lock->l_export->exp_hp_rpcs, rq_exp_list) { - /* Do not process requests that were not yet added to there + /* + * Do not process requests that were not yet added to there * incoming queue or were already removed from there for * processing. We evaluate ptlrpc_nrs_req_can_move() without * holding svcpt->scp_req_lock, and then redo the check with @@ -831,32 +865,33 @@ static void ldlm_lock_reorder_req(struct ldlm_lock *lock) * to wait for client response. */ int ldlm_server_blocking_ast(struct ldlm_lock *lock, - struct ldlm_lock_desc *desc, - void *data, int flag) -{ - struct ldlm_cb_async_args *ca; - struct ldlm_cb_set_arg *arg = data; - struct ldlm_request *body; - struct ptlrpc_request *req; - int instant_cancel = 0; - int rc = 0; - ENTRY; + struct ldlm_lock_desc *desc, + void *data, int flag) +{ + struct ldlm_cb_async_args *ca; + struct ldlm_cb_set_arg *arg = data; + struct ldlm_request *body; + struct ptlrpc_request *req; + int instant_cancel = 0; + int rc = 0; + + ENTRY; - if (flag == LDLM_CB_CANCELING) - /* Don't need to do anything here. */ - RETURN(0); + if (flag == LDLM_CB_CANCELING) + /* Don't need to do anything here. 
*/ + RETURN(0); if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_SRV_BL_AST)) { LDLM_DEBUG(lock, "dropping BL AST"); RETURN(0); } - LASSERT(lock); - LASSERT(data != NULL); - if (lock->l_export->exp_obd->obd_recovering != 0) - LDLM_ERROR(lock, "BUG 6063: lock collide during recovery"); + LASSERT(lock); + LASSERT(data != NULL); + if (lock->l_export->exp_obd->obd_recovering != 0) + LDLM_ERROR(lock, "BUG 6063: lock collide during recovery"); - ldlm_lock_reorder_req(lock); + ldlm_lock_reorder_req(lock); req = ptlrpc_request_alloc_pack(lock->l_export->exp_imp_reverse, &RQF_LDLM_BL_CALLBACK, @@ -879,9 +914,11 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, RETURN(0); } - if (lock->l_granted_mode != lock->l_req_mode) { - /* this blocking AST will be communicated as part of the - * completion AST instead */ + if (!ldlm_is_granted(lock)) { + /* + * this blocking AST will be communicated as part of the + * completion AST instead + */ ldlm_add_blocked_lock(lock); ldlm_set_waited(lock); unlock_res_and_lock(lock); @@ -892,16 +929,16 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, } if (ldlm_is_cancel_on_block(lock)) - instant_cancel = 1; + instant_cancel = 1; - body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ); - body->lock_handle[0] = lock->l_remote_handle; - body->lock_desc = *desc; + body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ); + body->lock_handle[0] = lock->l_remote_handle; + body->lock_desc = *desc; body->lock_flags |= ldlm_flags_to_wire(lock->l_flags & LDLM_FL_AST_MASK); - LDLM_DEBUG(lock, "server preparing blocking AST"); + LDLM_DEBUG(lock, "server preparing blocking AST"); - ptlrpc_request_set_replen(req); + ptlrpc_request_set_replen(req); ldlm_set_cbpending(lock); if (instant_cancel) { unlock_res_and_lock(lock); @@ -909,7 +946,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, req->rq_no_resend = 1; } else { - LASSERT(lock->l_granted_mode == lock->l_req_mode); + LASSERT(ldlm_is_granted(lock)); ldlm_add_waiting_lock(lock, ldlm_bl_timeout(lock)); unlock_res_and_lock(lock); @@ -918,19 +955,19 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, req->rq_resend_cb = ldlm_update_resend; } - req->rq_send_state = LUSTRE_IMP_FULL; - /* ptlrpc_request_alloc_pack already set timeout */ - if (AT_OFF) - req->rq_timeout = ldlm_get_rq_timeout(); + req->rq_send_state = LUSTRE_IMP_FULL; + /* ptlrpc_request_alloc_pack already set timeout */ + if (AT_OFF) + req->rq_timeout = ldlm_get_rq_timeout(); - if (lock->l_export && lock->l_export->exp_nid_stats && - lock->l_export->exp_nid_stats->nid_ldlm_stats) - lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats, - LDLM_BL_CALLBACK - LDLM_FIRST_OPC); + if (lock->l_export && lock->l_export->exp_nid_stats && + lock->l_export->exp_nid_stats->nid_ldlm_stats) + lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats, + LDLM_BL_CALLBACK - LDLM_FIRST_OPC); rc = ldlm_ast_fini(req, arg, lock, instant_cancel); - RETURN(rc); + RETURN(rc); } /** @@ -942,34 +979,37 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, */ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data) { - struct ldlm_cb_set_arg *arg = data; - struct ldlm_request *body; - struct ptlrpc_request *req; - struct ldlm_cb_async_args *ca; - int instant_cancel = 0; - int rc = 0; - int lvb_len; - ENTRY; + struct ldlm_cb_set_arg *arg = data; + struct ldlm_request *body; + struct ptlrpc_request *req; + struct ldlm_cb_async_args *ca; + int instant_cancel = 0; + int rc = 0; + int lvb_len; - LASSERT(lock != NULL); - LASSERT(data != 
NULL); + ENTRY; + + LASSERT(lock != NULL); + LASSERT(data != NULL); if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_SRV_CP_AST)) { LDLM_DEBUG(lock, "dropping CP AST"); RETURN(0); } - req = ptlrpc_request_alloc(lock->l_export->exp_imp_reverse, - &RQF_LDLM_CP_CALLBACK); - if (req == NULL) - RETURN(-ENOMEM); + req = ptlrpc_request_alloc(lock->l_export->exp_imp_reverse, + &RQF_LDLM_CP_CALLBACK); + if (req == NULL) + RETURN(-ENOMEM); /* server namespace, doesn't need lock */ lvb_len = ldlm_lvbo_size(lock); - /* LU-3124 & LU-2187: to not return layout in completion AST because + /* + * LU-3124 & LU-2187: to not return layout in completion AST because * it may deadlock for LU-2187, or client may not have enough space * for large layout. The layout will be returned to client with an - * extra RPC to fetch xattr.lov */ + * extra RPC to fetch xattr.lov + */ if (ldlm_has_layout(lock)) lvb_len = 0; @@ -990,22 +1030,19 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data) body->lock_handle[0] = lock->l_remote_handle; body->lock_flags = ldlm_flags_to_wire(flags); - ldlm_lock2desc(lock, &body->lock_desc); + ldlm_lock2desc(lock, &body->lock_desc); if (lvb_len > 0) { void *lvb = req_capsule_client_get(&req->rq_pill, &RMF_DLM_LVB); - const struct lu_env *env = NULL; - - if (req->rq_svc_thread) - env = req->rq_svc_thread->t_env; - - lvb_len = ldlm_lvbo_fill(env, lock, lvb, lvb_len); + lvb_len = ldlm_lvbo_fill(lock, lvb, &lvb_len); if (lvb_len < 0) { - /* We still need to send the RPC to wake up the blocked + /* + * We still need to send the RPC to wake up the blocked * enqueue thread on the client. * * Consider old client, there is no better way to notify * the failure, just zero-sized the LVB, then the client - * will fail out as "-EPROTO". */ + * will fail out as "-EPROTO". + */ req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, 0, RCL_CLIENT); instant_cancel = 1; @@ -1013,31 +1050,33 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data) req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, lvb_len, RCL_CLIENT); } - } + } LDLM_DEBUG(lock, "server preparing completion AST"); - ptlrpc_request_set_replen(req); + ptlrpc_request_set_replen(req); - req->rq_send_state = LUSTRE_IMP_FULL; - /* ptlrpc_request_pack already set timeout */ - if (AT_OFF) - req->rq_timeout = ldlm_get_rq_timeout(); + req->rq_send_state = LUSTRE_IMP_FULL; + /* ptlrpc_request_pack already set timeout */ + if (AT_OFF) + req->rq_timeout = ldlm_get_rq_timeout(); - /* We only send real blocking ASTs after the lock is granted */ - lock_res_and_lock(lock); + /* We only send real blocking ASTs after the lock is granted */ + lock_res_and_lock(lock); if (ldlm_is_ast_sent(lock)) { body->lock_flags |= ldlm_flags_to_wire(LDLM_FL_AST_SENT); /* Copy AST flags like LDLM_FL_DISCARD_DATA. */ body->lock_flags |= ldlm_flags_to_wire(lock->l_flags & LDLM_FL_AST_MASK); - /* We might get here prior to ldlm_handle_enqueue setting - * LDLM_FL_CANCEL_ON_BLOCK flag. Then we will put this lock - * into waiting list, but this is safe and similar code in - * ldlm_handle_enqueue will call ldlm_lock_cancel() still, - * that would not only cancel the lock, but will also remove - * it from waiting list */ + /* + * We might get here prior to ldlm_handle_enqueue setting + * LDLM_FL_CANCEL_ON_BLOCK flag. 
Then we will put this lock + * into waiting list, but this is safe and similar code in + * ldlm_handle_enqueue will call ldlm_lock_cancel() still, + * that would not only cancel the lock, but will also remove + * it from waiting list + */ if (ldlm_is_cancel_on_block(lock)) { unlock_res_and_lock(lock); ldlm_lock_cancel(lock); @@ -1053,13 +1092,13 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data) req->rq_delay_limit = ldlm_bl_timeout(lock); req->rq_resend_cb = ldlm_update_resend; } - } - unlock_res_and_lock(lock); + } + unlock_res_and_lock(lock); - if (lock->l_export && lock->l_export->exp_nid_stats && - lock->l_export->exp_nid_stats->nid_ldlm_stats) - lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats, - LDLM_CP_CALLBACK - LDLM_FIRST_OPC); + if (lock->l_export && lock->l_export->exp_nid_stats && + lock->l_export->exp_nid_stats->nid_ldlm_stats) + lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats, + LDLM_CP_CALLBACK - LDLM_FIRST_OPC); rc = ldlm_ast_fini(req, arg, lock, instant_cancel); @@ -1074,15 +1113,16 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data) */ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data) { - struct ldlm_cb_set_arg *arg = data; - struct ldlm_request *body; - struct ptlrpc_request *req; - struct ldlm_cb_async_args *ca; - int rc; - struct req_format *req_fmt; - ENTRY; + struct ldlm_cb_set_arg *arg = data; + struct ldlm_request *body; + struct ptlrpc_request *req; + struct ldlm_cb_async_args *ca; + int rc; + struct req_format *req_fmt; + + ENTRY; - LASSERT(lock != NULL); + LASSERT(lock != NULL); if (arg->gl_desc != NULL) /* There is a glimpse descriptor to pack */ @@ -1090,16 +1130,17 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data) else req_fmt = &RQF_LDLM_GL_CALLBACK; - req = ptlrpc_request_alloc_pack(lock->l_export->exp_imp_reverse, + req = ptlrpc_request_alloc_pack(lock->l_export->exp_imp_reverse, req_fmt, LUSTRE_DLM_VERSION, LDLM_GL_CALLBACK); - if (req == NULL) - RETURN(-ENOMEM); + if (req == NULL) + RETURN(-ENOMEM); if (arg->gl_desc != NULL) { /* copy the GL descriptor */ union ldlm_gl_desc *desc; + desc = req_capsule_client_get(&req->rq_pill, &RMF_DLM_GL_DESC); *desc = *arg->gl_desc; } @@ -1113,22 +1154,24 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data) ca->ca_set_arg = arg; ca->ca_lock = lock; - /* server namespace, doesn't need lock */ - req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, - ldlm_lvbo_size(lock)); - ptlrpc_request_set_replen(req); + /* server namespace, doesn't need lock */ + req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, + ldlm_lvbo_size(lock)); + ptlrpc_request_set_replen(req); - req->rq_send_state = LUSTRE_IMP_FULL; - /* ptlrpc_request_alloc_pack already set timeout */ - if (AT_OFF) - req->rq_timeout = ldlm_get_rq_timeout(); + req->rq_send_state = LUSTRE_IMP_FULL; + /* ptlrpc_request_alloc_pack already set timeout */ + if (AT_OFF) + req->rq_timeout = ldlm_get_rq_timeout(); req->rq_interpret_reply = ldlm_cb_interpret; - if (lock->l_export && lock->l_export->exp_nid_stats && - lock->l_export->exp_nid_stats->nid_ldlm_stats) - lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats, - LDLM_GL_CALLBACK - LDLM_FIRST_OPC); + if (lock->l_export && lock->l_export->exp_nid_stats) { + struct nid_stat *nid_stats = lock->l_export->exp_nid_stats; + + lprocfs_counter_incr(nid_stats->nid_ldlm_stats, + LDLM_GL_CALLBACK - LDLM_FIRST_OPC); + } rc = ldlm_ast_fini(req, arg, lock, 0); @@ 
-1139,13 +1182,14 @@ EXPORT_SYMBOL(ldlm_server_glimpse_ast); int ldlm_glimpse_locks(struct ldlm_resource *res, struct list_head *gl_work_list) { - int rc; + int rc; + ENTRY; rc = ldlm_run_ast_work(ldlm_res_to_ns(res), gl_work_list, LDLM_WORK_GL_AST); if (rc == -ERESTART) - ldlm_reprocess_all(res); + ldlm_reprocess_all(res, NULL); RETURN(rc); } @@ -1154,8 +1198,9 @@ EXPORT_SYMBOL(ldlm_glimpse_locks); /* return LDLM lock associated with a lock callback request */ struct ldlm_lock *ldlm_request_lock(struct ptlrpc_request *req) { - struct ldlm_cb_async_args *ca; - struct ldlm_lock *lock; + struct ldlm_cb_async_args *ca; + struct ldlm_lock *lock; + ENTRY; ca = ptlrpc_req_async_args(req); @@ -1184,6 +1229,7 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns, int rc = 0; struct ldlm_resource *res = NULL; const struct lu_env *env = req->rq_svc_thread->t_env; + ENTRY; LDLM_DEBUG_NOLOCK("server-side enqueue handler START"); @@ -1198,46 +1244,48 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns, !(dlm_req->lock_flags & LDLM_FL_HAS_INTENT)) ldlm_svc_get_eopc(dlm_req, ptlrpc_req2svc(req)->srv_stats); - if (req->rq_export && req->rq_export->exp_nid_stats && - req->rq_export->exp_nid_stats->nid_ldlm_stats) - lprocfs_counter_incr(req->rq_export->exp_nid_stats->nid_ldlm_stats, - LDLM_ENQUEUE - LDLM_FIRST_OPC); - - if (unlikely(dlm_req->lock_desc.l_resource.lr_type < LDLM_MIN_TYPE || - dlm_req->lock_desc.l_resource.lr_type >= LDLM_MAX_TYPE)) { - DEBUG_REQ(D_ERROR, req, "invalid lock request type %d", - dlm_req->lock_desc.l_resource.lr_type); - GOTO(out, rc = -EFAULT); - } - - if (unlikely(dlm_req->lock_desc.l_req_mode <= LCK_MINMODE || - dlm_req->lock_desc.l_req_mode >= LCK_MAXMODE || - dlm_req->lock_desc.l_req_mode & - (dlm_req->lock_desc.l_req_mode-1))) { - DEBUG_REQ(D_ERROR, req, "invalid lock request mode %d", - dlm_req->lock_desc.l_req_mode); - GOTO(out, rc = -EFAULT); - } + if (req->rq_export && req->rq_export->exp_nid_stats && + req->rq_export->exp_nid_stats->nid_ldlm_stats) + lprocfs_counter_incr(req->rq_export->exp_nid_stats->nid_ldlm_stats, + LDLM_ENQUEUE - LDLM_FIRST_OPC); + + if (unlikely(dlm_req->lock_desc.l_resource.lr_type < LDLM_MIN_TYPE || + dlm_req->lock_desc.l_resource.lr_type >= LDLM_MAX_TYPE)) { + DEBUG_REQ(D_ERROR, req, "invalid lock request type %d", + dlm_req->lock_desc.l_resource.lr_type); + GOTO(out, rc = -EFAULT); + } + + if (unlikely(dlm_req->lock_desc.l_req_mode <= LCK_MINMODE || + dlm_req->lock_desc.l_req_mode >= LCK_MAXMODE || + dlm_req->lock_desc.l_req_mode & + (dlm_req->lock_desc.l_req_mode-1))) { + DEBUG_REQ(D_ERROR, req, "invalid lock request mode %d", + dlm_req->lock_desc.l_req_mode); + GOTO(out, rc = -EFAULT); + } if (unlikely((flags & LDLM_FL_REPLAY) || (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))) { - /* Find an existing lock in the per-export lock hash */ - /* In the function below, .hs_keycmp resolves to - * ldlm_export_lock_keycmp() */ + /* Find an existing lock in the per-export lock hash */ + /* + * In the function below, .hs_keycmp resolves to + * ldlm_export_lock_keycmp() + */ /* coverity[overrun-buffer-val] */ - lock = cfs_hash_lookup(req->rq_export->exp_lock_hash, - (void *)&dlm_req->lock_handle[0]); - if (lock != NULL) { - DEBUG_REQ(D_DLMTRACE, req, "found existing lock cookie %#llx", + lock = cfs_hash_lookup(req->rq_export->exp_lock_hash, + (void *)&dlm_req->lock_handle[0]); + if (lock != NULL) { + DEBUG_REQ(D_DLMTRACE, req, + "found existing lock cookie %#llx", lock->l_handle.h_cookie); flags |= LDLM_FL_RESENT; - GOTO(existing_lock, rc = 0); 
+ GOTO(existing_lock, rc = 0); } } else { if (ldlm_reclaim_full()) { - DEBUG_REQ(D_DLMTRACE, req, "Too many granted locks, " - "reject current enqueue request and let the " - "client retry later.\n"); + DEBUG_REQ(D_DLMTRACE, req, + "Too many granted locks, reject current enqueue request and let the client retry later.\n"); GOTO(out, rc = -EINPROGRESS); } } @@ -1253,10 +1301,11 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns, GOTO(out, rc); } - lock->l_remote_handle = dlm_req->lock_handle[0]; - LDLM_DEBUG(lock, "server-side enqueue handler, new lock created"); + lock->l_remote_handle = dlm_req->lock_handle[0]; + LDLM_DEBUG(lock, "server-side enqueue handler, new lock created"); - /* Initialize resource lvb but not for a lock being replayed since + /* + * Initialize resource lvb but not for a lock being replayed since * Client already got lvb sent in this case. * This must occur early since some policy methods assume resource * lvb is available (lr_lvb_data != NULL). @@ -1264,32 +1313,36 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns, res = lock->l_resource; if (!(flags & LDLM_FL_REPLAY)) { /* non-replayed lock, delayed lvb init may need to be done */ - rc = ldlm_lvbo_init(env, res); + rc = ldlm_lvbo_init(res); if (rc < 0) { LDLM_DEBUG(lock, "delayed lvb init failed (rc %d)", rc); GOTO(out, rc); } } - OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2); - /* Don't enqueue a lock onto the export if it is been disonnected - * due to eviction (bug 3822) or server umount (bug 24324). - * Cancel it now instead. */ - if (req->rq_export->exp_disconnected) { - LDLM_ERROR(lock, "lock on disconnected export %p", - req->rq_export); - GOTO(out, rc = -ENOTCONN); - } + OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2); + /* + * Don't enqueue a lock onto the export if it is been disonnected + * due to eviction (b=3822) or server umount (b=24324). + * Cancel it now instead. + */ + if (req->rq_export->exp_disconnected) { + LDLM_ERROR(lock, "lock on disconnected export %p", + req->rq_export); + GOTO(out, rc = -ENOTCONN); + } - lock->l_export = class_export_lock_get(req->rq_export, lock); - if (lock->l_export->exp_lock_hash) - cfs_hash_add(lock->l_export->exp_lock_hash, - &lock->l_remote_handle, - &lock->l_exp_hash); + lock->l_export = class_export_lock_get(req->rq_export, lock); + if (lock->l_export->exp_lock_hash) + cfs_hash_add(lock->l_export->exp_lock_hash, + &lock->l_remote_handle, + &lock->l_exp_hash); - /* Inherit the enqueue flags before the operation, because we do not + /* + * Inherit the enqueue flags before the operation, because we do not * keep the res lock on return and next operations (BL AST) may proceed - * without them. */ + * without them. + */ lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags & LDLM_FL_INHERIT_MASK); @@ -1301,14 +1354,18 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns, lock->l_req_extent = lock->l_policy_data.l_extent; existing_lock: - if (flags & LDLM_FL_HAS_INTENT) { - /* In this case, the reply buffer is allocated deep in - * local_lock_enqueue by the policy function. */ - cookie = req; - } else { - /* based on the assumption that lvb size never changes during - * resource life time otherwise it need resource->lr_lock's - * protection */ + if (flags & LDLM_FL_HAS_INTENT) { + /* + * In this case, the reply buffer is allocated deep in + * local_lock_enqueue by the policy function. 
+ */ + cookie = req; + } else { + /* + * based on the assumption that lvb size never changes during + * resource life time otherwise it need resource->lr_lock's + * protection + */ req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, ldlm_lvbo_size(lock)); @@ -1335,22 +1392,28 @@ existing_lock: if (lock && lock->l_resource->lr_type == LDLM_EXTENT) OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_BL_EVICT, 6); - /* We never send a blocking AST until the lock is granted, but - * we can tell it right now */ - lock_res_and_lock(lock); + /* + * We never send a blocking AST until the lock is granted, but + * we can tell it right now + */ + lock_res_and_lock(lock); - /* Now take into account flags to be inherited from original lock - request both in reply to client and in our own lock flags. */ + /* + * Now take into account flags to be inherited from original lock + * request both in reply to client and in our own lock flags. + */ dlm_rep->lock_flags = ldlm_flags_to_wire(flags); lock->l_flags |= flags & LDLM_FL_INHERIT_MASK; - /* Don't move a pending lock onto the export if it has already been - * disconnected due to eviction (bug 5683) or server umount (bug 24324). - * Cancel it now instead. */ - if (unlikely(req->rq_export->exp_disconnected || - OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_OLD_EXPORT))) { - LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export); - rc = -ENOTCONN; + /* + * Don't move a pending lock onto the export if it has already been + * disconnected due to eviction (b=5683) or server umount (b=24324). + * Cancel it now instead. + */ + if (unlikely(req->rq_export->exp_disconnected || + OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_OLD_EXPORT))) { + LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export); + rc = -ENOTCONN; } else if (ldlm_is_ast_sent(lock)) { /* fill lock desc for possible lock convert */ if (lock->l_blocking_lock && @@ -1361,7 +1424,8 @@ existing_lock: LDLM_DEBUG(lock, "save blocking bits %llx in granted lock", bl_lock->l_policy_data.l_inodebits.bits); - /* If lock is blocked then save blocking ibits + /* + * If lock is blocked then save blocking ibits * in returned lock policy for the possible lock * convert on a client. */ @@ -1369,78 +1433,99 @@ existing_lock: bl_lock->l_policy_data.l_inodebits.bits; } dlm_rep->lock_flags |= ldlm_flags_to_wire(LDLM_FL_AST_SENT); - if (lock->l_granted_mode == lock->l_req_mode) { - /* - * Only cancel lock if it was granted, because it would - * be destroyed immediately and would never be granted - * in the future, causing timeouts on client. Not - * granted lock will be cancelled immediately after - * sending completion AST. - */ - if (dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK) { - unlock_res_and_lock(lock); - ldlm_lock_cancel(lock); - lock_res_and_lock(lock); + if (ldlm_is_granted(lock)) { + /* + * Only cancel lock if it was granted, because it would + * be destroyed immediately and would never be granted + * in the future, causing timeouts on client. Not + * granted lock will be cancelled immediately after + * sending completion AST. 
+ */ + if (ldlm_is_cancel_on_block(lock)) { + unlock_res_and_lock(lock); + ldlm_lock_cancel(lock); + lock_res_and_lock(lock); } else { ldlm_add_waiting_lock(lock, ldlm_bl_timeout(lock)); } - } - } + } + } unlock_res_and_lock(lock); EXIT; - out: - req->rq_status = rc ?: err; /* return either error - bug 11190 */ - if (!req->rq_packed_final) { - err = lustre_pack_reply(req, 1, NULL, NULL); - if (rc == 0) - rc = err; - } - - /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this - * ldlm_reprocess_all. If this moves, revisit that code. -phil */ +out: + req->rq_status = rc ?: err; /* return either error - b=11190 */ + if (!req->rq_packed_final) { + err = lustre_pack_reply(req, 1, NULL, NULL); + if (rc == 0) + rc = err; + } + + /* + * The LOCK_CHANGED code in ldlm_lock_enqueue depends on this + * ldlm_reprocess_all. If this moves, revisit that code. -phil + */ if (lock != NULL) { - LDLM_DEBUG(lock, "server-side enqueue handler, sending reply" - "(err=%d, rc=%d)", err, rc); - - if (rc == 0) { - if (req_capsule_has_field(&req->rq_pill, &RMF_DLM_LVB, - RCL_SERVER) && - ldlm_lvbo_size(lock) > 0) { - void *buf; - int buflen; - - buf = req_capsule_server_get(&req->rq_pill, - &RMF_DLM_LVB); - LASSERTF(buf != NULL, "req %p, lock %p\n", - req, lock); - buflen = req_capsule_get_size(&req->rq_pill, - &RMF_DLM_LVB, RCL_SERVER); - /* non-replayed lock, delayed lvb init may - * need to be occur now */ - if ((buflen > 0) && !(flags & LDLM_FL_REPLAY)) { - buflen = ldlm_lvbo_fill(env, lock, buf, - buflen); - if (buflen >= 0) - req_capsule_shrink( + LDLM_DEBUG(lock, + "server-side enqueue handler, sending reply (err=%d, rc=%d)", + err, rc); + + if (rc == 0 && + req_capsule_has_field(&req->rq_pill, &RMF_DLM_LVB, + RCL_SERVER) && + ldlm_lvbo_size(lock) > 0) { + void *buf; + int buflen; + +retry: + buf = req_capsule_server_get(&req->rq_pill, + &RMF_DLM_LVB); + LASSERTF(buf != NULL, "req %p, lock %p\n", req, lock); + buflen = req_capsule_get_size(&req->rq_pill, + &RMF_DLM_LVB, RCL_SERVER); + /* + * non-replayed lock, delayed lvb init may + * need to be occur now + */ + if ((buflen > 0) && !(flags & LDLM_FL_REPLAY)) { + int rc2; + + rc2 = ldlm_lvbo_fill(lock, buf, &buflen); + if (rc2 >= 0) { + req_capsule_shrink(&req->rq_pill, + &RMF_DLM_LVB, + rc2, RCL_SERVER); + } else if (rc2 == -ERANGE) { + rc2 = req_capsule_server_grow( &req->rq_pill, - &RMF_DLM_LVB, - buflen, RCL_SERVER); - else - rc = buflen; - } else if (flags & LDLM_FL_REPLAY) { - /* no LVB resend upon replay */ - if (buflen > 0) + &RMF_DLM_LVB, buflen); + if (!rc2) { + goto retry; + } else { + /* + * if we can't grow the buffer, + * it's ok to return empty lvb + * to client. 
+ */ req_capsule_shrink( &req->rq_pill, - &RMF_DLM_LVB, - 0, RCL_SERVER); - else - rc = buflen; + &RMF_DLM_LVB, 0, + RCL_SERVER); + } } else { - rc = buflen; + rc = rc2; } + } else if (flags & LDLM_FL_REPLAY) { + /* no LVB resend upon replay */ + if (buflen > 0) + req_capsule_shrink(&req->rq_pill, + &RMF_DLM_LVB, + 0, RCL_SERVER); + else + rc = buflen; + } else { + rc = buflen; } } @@ -1458,7 +1543,7 @@ existing_lock: if (!err && !ldlm_is_cbpending(lock) && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK) - ldlm_reprocess_all(lock->l_resource); + ldlm_reprocess_all(lock->l_resource, lock); LDLM_LOCK_RELEASE(lock); } @@ -1466,7 +1551,33 @@ existing_lock: LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p, rc %d)", lock, rc); - return rc; + return rc; +} + +/* + * Clear the blocking lock, the race is possible between ldlm_handle_convert0() + * and ldlm_work_bl_ast_lock(), so this is done under lock with check for NULL. + */ +void ldlm_clear_blocking_lock(struct ldlm_lock *lock) +{ + if (lock->l_blocking_lock) { + LDLM_LOCK_RELEASE(lock->l_blocking_lock); + lock->l_blocking_lock = NULL; + } +} + +/* A lock can be converted to new ibits or mode and should be considered + * as new lock. Clear all states related to a previous blocking AST + * processing so new conflicts will cause new blocking ASTs. + * + * This is used during lock convert below and lock downgrade to COS mode in + * ldlm_lock_mode_downgrade(). + */ +void ldlm_clear_blocking_data(struct ldlm_lock *lock) +{ + ldlm_clear_ast_sent(lock); + lock->l_bl_ast_run = 0; + ldlm_clear_blocking_lock(lock); } /** @@ -1510,8 +1621,10 @@ int ldlm_handle_convert0(struct ptlrpc_request *req, LDLM_ERROR(lock, "lock mode differs!"); rc = ELDLM_NO_LOCK_DATA; } else if (bits == new) { - /* This can be valid situation if CONVERT RPCs are - * re-ordered. Just finish silently */ + /* + * This can be valid situation if CONVERT RPCs are + * re-ordered. Just finish silently + */ LDLM_DEBUG(lock, "lock is converted already!"); rc = ELDLM_OK; } else { @@ -1522,23 +1635,11 @@ int ldlm_handle_convert0(struct ptlrpc_request *req, ldlm_clear_cbpending(lock); lock->l_policy_data.l_inodebits.cancel_bits = 0; ldlm_inodebits_drop(lock, bits & ~new); - /* if lock is in a bl_ast list, remove it from the list - * here before reprocessing. - */ - if (!list_empty(&lock->l_bl_ast)) { - ldlm_discard_bl_lock(lock); - } else { - /* in this case lock was taken from bl_ast list - * already by ldlm_work_bl_ast_lock() and lock - * must clear only some remaining states. - */ - ldlm_clear_ast_sent(lock); - lock->l_bl_ast_run = 0; - ldlm_clear_blocking_lock(lock); - } + + ldlm_clear_blocking_data(lock); unlock_res_and_lock(lock); - ldlm_reprocess_all(lock->l_resource); + ldlm_reprocess_all(lock->l_resource, NULL); rc = ELDLM_OK; } @@ -1572,54 +1673,57 @@ int ldlm_request_cancel(struct ptlrpc_request *req, const struct ldlm_request *dlm_req, int first, enum lustre_at_flags flags) { - const struct lu_env *env = req->rq_svc_thread->t_env; - struct ldlm_resource *res, *pres = NULL; - struct ldlm_lock *lock; - int i, count, done = 0; - ENTRY; + struct ldlm_resource *res, *pres = NULL; + struct ldlm_lock *lock; + int i, count, done = 0; + + ENTRY; - count = dlm_req->lock_count ? dlm_req->lock_count : 1; - if (first >= count) - RETURN(0); + count = dlm_req->lock_count ? 
dlm_req->lock_count : 1; + if (first >= count) + RETURN(0); if (count == 1 && dlm_req->lock_handle[0].cookie == 0) RETURN(0); - /* There is no lock on the server at the replay time, - * skip lock cancelling to make replay tests to pass. */ - if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) - RETURN(0); + /* + * There is no lock on the server at the replay time, + * skip lock cancelling to make replay tests to pass. + */ + if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) + RETURN(0); - LDLM_DEBUG_NOLOCK("server-side cancel handler START: %d locks, " - "starting at %d", count, first); + LDLM_DEBUG_NOLOCK("server-side cancel handler START: %d locks, starting at %d", + count, first); - for (i = first; i < count; i++) { - lock = ldlm_handle2lock(&dlm_req->lock_handle[i]); - if (!lock) { - LDLM_DEBUG_NOLOCK("server-side cancel handler stale " - "lock (cookie %llu)", - dlm_req->lock_handle[i].cookie); - continue; - } + for (i = first; i < count; i++) { + lock = ldlm_handle2lock(&dlm_req->lock_handle[i]); + if (!lock) { + LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock (cookie %llu)", + dlm_req->lock_handle[i].cookie); + continue; + } - res = lock->l_resource; - done++; + res = lock->l_resource; + done++; - /* This code is an optimization to only attempt lock + /* + * This code is an optimization to only attempt lock * granting on the resource (that could be CPU-expensive) - * after we are done cancelling lock in that resource. */ - if (res != pres) { - if (pres != NULL) { - ldlm_reprocess_all(pres); - LDLM_RESOURCE_DELREF(pres); - ldlm_resource_putref(pres); - } + * after we are done cancelling lock in that resource. + */ + if (res != pres) { + if (pres != NULL) { + ldlm_reprocess_all(pres, NULL); + LDLM_RESOURCE_DELREF(pres); + ldlm_resource_putref(pres); + } if (res != NULL) { ldlm_resource_getref(res); LDLM_RESOURCE_ADDREF(res); if (!ldlm_is_discard_data(lock)) - ldlm_lvbo_update(env, res, lock, + ldlm_lvbo_update(res, lock, NULL, 1); } pres = res; @@ -1629,20 +1733,21 @@ int ldlm_request_cancel(struct ptlrpc_request *req, lock->l_blast_sent != 0) { time64_t delay = ktime_get_real_seconds() - lock->l_blast_sent; - LDLM_DEBUG(lock, "server cancels blocked lock after %llds", + LDLM_DEBUG(lock, + "server cancels blocked lock after %llds", (s64)delay); at_measured(&lock->l_export->exp_bl_lock_at, delay); } - ldlm_lock_cancel(lock); - LDLM_LOCK_PUT(lock); - } - if (pres != NULL) { - ldlm_reprocess_all(pres); - LDLM_RESOURCE_DELREF(pres); - ldlm_resource_putref(pres); - } - LDLM_DEBUG_NOLOCK("server-side cancel handler END"); - RETURN(done); + ldlm_lock_cancel(lock); + LDLM_LOCK_PUT(lock); + } + if (pres != NULL) { + ldlm_reprocess_all(pres, NULL); + LDLM_RESOURCE_DELREF(pres); + ldlm_resource_putref(pres); + } + LDLM_DEBUG_NOLOCK("server-side cancel handler END"); + RETURN(done); } EXPORT_SYMBOL(ldlm_request_cancel); @@ -1653,29 +1758,30 @@ EXPORT_SYMBOL(ldlm_request_cancel); */ int ldlm_handle_cancel(struct ptlrpc_request *req) { - struct ldlm_request *dlm_req; - int rc; - ENTRY; + struct ldlm_request *dlm_req; + int rc; - dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ); - if (dlm_req == NULL) { - CDEBUG(D_INFO, "bad request buffer for cancel\n"); - RETURN(-EFAULT); - } + ENTRY; - if (req->rq_export && req->rq_export->exp_nid_stats && - req->rq_export->exp_nid_stats->nid_ldlm_stats) - lprocfs_counter_incr(req->rq_export->exp_nid_stats->nid_ldlm_stats, - LDLM_CANCEL - LDLM_FIRST_OPC); + dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ); + if (dlm_req 
== NULL) { + CDEBUG(D_INFO, "bad request buffer for cancel\n"); + RETURN(-EFAULT); + } + + if (req->rq_export && req->rq_export->exp_nid_stats && + req->rq_export->exp_nid_stats->nid_ldlm_stats) + lprocfs_counter_incr(req->rq_export->exp_nid_stats->nid_ldlm_stats, + LDLM_CANCEL - LDLM_FIRST_OPC); - rc = req_capsule_server_pack(&req->rq_pill); - if (rc) - RETURN(rc); + rc = req_capsule_server_pack(&req->rq_pill); + if (rc) + RETURN(rc); if (!ldlm_request_cancel(req, dlm_req, 0, LATF_STATS)) req->rq_status = LUSTRE_ESTALE; - RETURN(ptlrpc_reply(req)); + RETURN(ptlrpc_reply(req)); } #endif /* HAVE_SERVER_SUPPORT */ @@ -1685,27 +1791,36 @@ int ldlm_handle_cancel(struct ptlrpc_request *req) * This can only happen on client side. */ void ldlm_handle_bl_callback(struct ldlm_namespace *ns, - struct ldlm_lock_desc *ld, struct ldlm_lock *lock) + struct ldlm_lock_desc *ld, struct ldlm_lock *lock) { - int do_ast; - ENTRY; + int do_ast; - LDLM_DEBUG(lock, "client blocking AST callback handler"); + ENTRY; - lock_res_and_lock(lock); + LDLM_DEBUG(lock, "client blocking AST callback handler"); + + lock_res_and_lock(lock); /* set bits to cancel for this lock for possible lock convert */ if (ns_is_client(ns) && (lock->l_resource->lr_type == LDLM_IBITS)) { - /* Lock description contains policy of blocking lock, + /* + * Lock description contains policy of blocking lock, * and its cancel_bits is used to pass conflicting bits. * NOTE: ld can be NULL or can be not NULL but zeroed if * passed from ldlm_bl_thread_blwi(), check below used bits * in ld to make sure it is valid description. + * + * If server may replace lock resource keeping the same cookie, + * never use cancel bits from different resource, full cancel + * is to be used. */ - if (ld && ld->l_policy_data.l_inodebits.bits) + if (ld && ld->l_policy_data.l_inodebits.bits && + ldlm_res_eq(&ld->l_resource.lr_name, + &lock->l_resource->lr_name)) lock->l_policy_data.l_inodebits.cancel_bits = ld->l_policy_data.l_inodebits.cancel_bits; - /* if there is no valid ld and lock is cbpending already + /* + * if there is no valid ld and lock is cbpending already * then cancel_bits should be kept, otherwise it is zeroed. */ else if (!ldlm_is_cbpending(lock)) @@ -1713,26 +1828,25 @@ void ldlm_handle_bl_callback(struct ldlm_namespace *ns, } ldlm_set_cbpending(lock); - if (ldlm_is_cancel_on_block(lock)) - ldlm_set_cancel(lock); - - do_ast = (!lock->l_readers && !lock->l_writers); - unlock_res_and_lock(lock); + do_ast = (!lock->l_readers && !lock->l_writers); + unlock_res_and_lock(lock); - if (do_ast) { - CDEBUG(D_DLMTRACE, "Lock %p already unused, calling callback (%p)\n", - lock, lock->l_blocking_ast); - if (lock->l_blocking_ast != NULL) - lock->l_blocking_ast(lock, ld, lock->l_ast_data, - LDLM_CB_BLOCKING); - } else { - CDEBUG(D_DLMTRACE, "Lock %p is referenced, will be cancelled later\n", - lock); - } + if (do_ast) { + CDEBUG(D_DLMTRACE, + "Lock %p already unused, calling callback (%p)\n", + lock, lock->l_blocking_ast); + if (lock->l_blocking_ast != NULL) + lock->l_blocking_ast(lock, ld, lock->l_ast_data, + LDLM_CB_BLOCKING); + } else { + CDEBUG(D_DLMTRACE, + "Lock %p is referenced, will be cancelled later\n", + lock); + } - LDLM_DEBUG(lock, "client blocking callback handler END"); - LDLM_LOCK_RELEASE(lock); - EXIT; + LDLM_DEBUG(lock, "client blocking callback handler END"); + LDLM_LOCK_RELEASE(lock); + EXIT; } /** @@ -1741,13 +1855,14 @@ void ldlm_handle_bl_callback(struct ldlm_namespace *ns, * This only can happen on client side. 
*/ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, - struct ldlm_namespace *ns, - struct ldlm_request *dlm_req, - struct ldlm_lock *lock) + struct ldlm_namespace *ns, + struct ldlm_request *dlm_req, + struct ldlm_lock *lock) { struct list_head ast_list; int lvb_len; int rc = 0; + ENTRY; LDLM_DEBUG(lock, "client completion callback handler START"); @@ -1759,7 +1874,7 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, while (to > 0) { set_current_state(TASK_INTERRUPTIBLE); schedule_timeout(to); - if (lock->l_granted_mode == lock->l_req_mode || + if (ldlm_is_granted(lock) || ldlm_is_destroyed(lock)) break; } @@ -1775,9 +1890,8 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, LASSERT(lock->l_lvb_data != NULL); if (unlikely(lock->l_lvb_len < lvb_len)) { - LDLM_ERROR(lock, "Replied LVB is larger than " - "expectation, expected = %d, " - "replied = %d", + LDLM_ERROR(lock, + "Replied LVB is larger than expectation, expected = %d, replied = %d", lock->l_lvb_len, lvb_len); GOTO(out, rc = -EINVAL); } @@ -1785,16 +1899,33 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, } lock_res_and_lock(lock); + + if (!ldlm_res_eq(&dlm_req->lock_desc.l_resource.lr_name, + &lock->l_resource->lr_name)) { + ldlm_resource_unlink_lock(lock); + unlock_res_and_lock(lock); + rc = ldlm_lock_change_resource(ns, lock, + &dlm_req->lock_desc.l_resource.lr_name); + if (rc < 0) { + LDLM_ERROR(lock, "Failed to allocate resource"); + GOTO(out, rc); + } + LDLM_DEBUG(lock, "completion AST, new resource"); + lock_res_and_lock(lock); + } + if (ldlm_is_destroyed(lock) || - lock->l_granted_mode == lock->l_req_mode) { - /* bug 11300: the lock has already been granted */ + ldlm_is_granted(lock)) { + /* b=11300: the lock has already been granted */ unlock_res_and_lock(lock); LDLM_DEBUG(lock, "Double grant race happened"); GOTO(out, rc = 0); } - /* If we receive the completion AST before the actual enqueue returned, - * then we might need to switch lock modes, resources, or extents. */ + /* + * If we receive the completion AST before the actual enqueue returned, + * then we might need to switch lock modes, resources, or extents. + */ if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) { lock->l_req_mode = dlm_req->lock_desc.l_granted_mode; LDLM_DEBUG(lock, "completion AST, new lock mode"); @@ -1808,29 +1939,17 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, LDLM_DEBUG(lock, "completion AST, new policy data"); } - ldlm_resource_unlink_lock(lock); - if (memcmp(&dlm_req->lock_desc.l_resource.lr_name, - &lock->l_resource->lr_name, - sizeof(lock->l_resource->lr_name)) != 0) { - unlock_res_and_lock(lock); - rc = ldlm_lock_change_resource(ns, lock, - &dlm_req->lock_desc.l_resource.lr_name); - if (rc < 0) { - LDLM_ERROR(lock, "Failed to allocate resource"); - GOTO(out, rc); - } - LDLM_DEBUG(lock, "completion AST, new resource"); - CERROR("change resource!\n"); - lock_res_and_lock(lock); - } - - if (dlm_req->lock_flags & LDLM_FL_AST_SENT) { - /* BL_AST locks are not needed in LRU. - * Let ldlm_cancel_lru() be fast. */ - ldlm_lock_remove_from_lru(lock); + ldlm_resource_unlink_lock(lock); + + if (dlm_req->lock_flags & LDLM_FL_AST_SENT) { + /* + * BL_AST locks are not needed in LRU. + * Let ldlm_cancel_lru() be fast. 
+ */ + ldlm_lock_remove_from_lru(lock); lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST; - LDLM_DEBUG(lock, "completion AST includes blocking AST"); - } + LDLM_DEBUG(lock, "completion AST includes blocking AST"); + } if (lock->l_lvb_len > 0) { rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_CLIENT, @@ -1841,19 +1960,21 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, } } - ldlm_grant_lock(lock, &ast_list); - unlock_res_and_lock(lock); + ldlm_grant_lock(lock, &ast_list); + unlock_res_and_lock(lock); - LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work"); + LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work"); - /* Let Enqueue to call osc_lock_upcall() and initialize - * l_ast_data */ - OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2); + /* + * Let Enqueue to call osc_lock_upcall() and initialize + * l_ast_data + */ + OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2); - ldlm_run_ast_work(ns, &ast_list, LDLM_WORK_CP_AST); + ldlm_run_ast_work(ns, &ast_list, LDLM_WORK_CP_AST); - LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)", - lock); + LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)", + lock); GOTO(out, rc); out: @@ -1874,61 +1995,63 @@ out: * long time. */ static void ldlm_handle_gl_callback(struct ptlrpc_request *req, - struct ldlm_namespace *ns, - struct ldlm_request *dlm_req, - struct ldlm_lock *lock) + struct ldlm_namespace *ns, + struct ldlm_request *dlm_req, + struct ldlm_lock *lock) { - int rc = -ENOSYS; - ENTRY; + int rc = -ENOSYS; - LDLM_DEBUG(lock, "client glimpse AST callback handler"); + ENTRY; - if (lock->l_glimpse_ast != NULL) - rc = lock->l_glimpse_ast(lock, req); + LDLM_DEBUG(lock, "client glimpse AST callback handler"); - if (req->rq_repmsg != NULL) { - ptlrpc_reply(req); - } else { - req->rq_status = rc; - ptlrpc_error(req); - } + if (lock->l_glimpse_ast != NULL) + rc = lock->l_glimpse_ast(lock, req); - lock_res_and_lock(lock); - if (lock->l_granted_mode == LCK_PW && - !lock->l_readers && !lock->l_writers && + if (req->rq_repmsg != NULL) { + ptlrpc_reply(req); + } else { + req->rq_status = rc; + ptlrpc_error(req); + } + + lock_res_and_lock(lock); + if (lock->l_granted_mode == LCK_PW && + !lock->l_readers && !lock->l_writers && ktime_after(ktime_get(), ktime_add(lock->l_last_used, ktime_set(ns->ns_dirty_age_limit, 0)))) { - unlock_res_and_lock(lock); - if (ldlm_bl_to_thread_lock(ns, NULL, lock)) - ldlm_handle_bl_callback(ns, NULL, lock); + unlock_res_and_lock(lock); + if (ldlm_bl_to_thread_lock(ns, NULL, lock)) + ldlm_handle_bl_callback(ns, NULL, lock); - EXIT; - return; - } - unlock_res_and_lock(lock); - LDLM_LOCK_RELEASE(lock); - EXIT; + EXIT; + return; + } + unlock_res_and_lock(lock); + LDLM_LOCK_RELEASE(lock); + EXIT; } static int ldlm_callback_reply(struct ptlrpc_request *req, int rc) { - if (req->rq_no_reply) - return 0; + if (req->rq_no_reply) + return 0; - req->rq_status = rc; - if (!req->rq_packed_final) { - rc = lustre_pack_reply(req, 1, NULL, NULL); - if (rc) - return rc; - } - return ptlrpc_reply(req); + req->rq_status = rc; + if (!req->rq_packed_final) { + rc = lustre_pack_reply(req, 1, NULL, NULL); + if (rc) + return rc; + } + return ptlrpc_reply(req); } static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi, enum ldlm_cancel_flags cancel_flags) { struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool; + ENTRY; spin_lock(&blp->blp_lock); @@ -1944,8 +2067,10 @@ static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi, wake_up(&blp->blp_waitq); - /* can not check 
blwi->blwi_flags as blwi could be already freed in - LCF_ASYNC mode */ + /* + * can not check blwi->blwi_flags as blwi could be already freed in + * LCF_ASYNC mode + */ if (!(cancel_flags & LCF_ASYNC)) wait_for_completion(&blwi->blwi_comp); @@ -1963,19 +2088,19 @@ static inline void init_blwi(struct ldlm_bl_work_item *blwi, INIT_LIST_HEAD(&blwi->blwi_head); if (memory_pressure_get()) - blwi->blwi_mem_pressure = 1; + blwi->blwi_mem_pressure = 1; - blwi->blwi_ns = ns; + blwi->blwi_ns = ns; blwi->blwi_flags = cancel_flags; - if (ld != NULL) - blwi->blwi_ld = *ld; - if (count) { + if (ld != NULL) + blwi->blwi_ld = *ld; + if (count) { list_add(&blwi->blwi_head, cancels); list_del_init(cancels); - blwi->blwi_count = count; - } else { - blwi->blwi_lock = lock; - } + blwi->blwi_count = count; + } else { + blwi->blwi_lock = lock; + } } /** @@ -2008,7 +2133,8 @@ static int ldlm_bl_to_thread(struct ldlm_namespace *ns, RETURN(__ldlm_bl_to_thread(blwi, cancel_flags)); } else { - /* if it is synchronous call do minimum mem alloc, as it could + /* + * if it is synchronous call do minimum mem alloc, as it could * be triggered from kernel shrinker */ struct ldlm_bl_work_item blwi; @@ -2042,90 +2168,94 @@ int ldlm_bl_thread_wakeup(void) /* Setinfo coming from Server (eg MDT) to Client (eg MDC)! */ static int ldlm_handle_setinfo(struct ptlrpc_request *req) { - struct obd_device *obd = req->rq_export->exp_obd; - char *key; - void *val; - int keylen, vallen; - int rc = -ENOSYS; - ENTRY; - - DEBUG_REQ(D_HSM, req, "%s: handle setinfo\n", obd->obd_name); - - req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO); - - key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY); - if (key == NULL) { - DEBUG_REQ(D_IOCTL, req, "no set_info key"); - RETURN(-EFAULT); - } - keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY, - RCL_CLIENT); - val = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL); - if (val == NULL) { - DEBUG_REQ(D_IOCTL, req, "no set_info val"); - RETURN(-EFAULT); - } - vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL, - RCL_CLIENT); - - /* We are responsible for swabbing contents of val */ - - if (KEY_IS(KEY_HSM_COPYTOOL_SEND)) - /* Pass it on to mdc (the "export" in this case) */ - rc = obd_set_info_async(req->rq_svc_thread->t_env, - req->rq_export, - sizeof(KEY_HSM_COPYTOOL_SEND), - KEY_HSM_COPYTOOL_SEND, - vallen, val, NULL); - else - DEBUG_REQ(D_WARNING, req, "ignoring unknown key %s", key); - - return rc; + struct obd_device *obd = req->rq_export->exp_obd; + char *key; + void *val; + int keylen, vallen; + int rc = -ENOSYS; + + ENTRY; + + DEBUG_REQ(D_HSM, req, "%s: handle setinfo\n", obd->obd_name); + + req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO); + + key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY); + if (key == NULL) { + DEBUG_REQ(D_IOCTL, req, "no set_info key"); + RETURN(-EFAULT); + } + keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY, + RCL_CLIENT); + val = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL); + if (val == NULL) { + DEBUG_REQ(D_IOCTL, req, "no set_info val"); + RETURN(-EFAULT); + } + vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL, + RCL_CLIENT); + + /* We are responsible for swabbing contents of val */ + + if (KEY_IS(KEY_HSM_COPYTOOL_SEND)) + /* Pass it on to mdc (the "export" in this case) */ + rc = obd_set_info_async(req->rq_svc_thread->t_env, + req->rq_export, + sizeof(KEY_HSM_COPYTOOL_SEND), + KEY_HSM_COPYTOOL_SEND, + vallen, val, NULL); + else + DEBUG_REQ(D_WARNING, req, "ignoring unknown key 
%s", key); + + return rc; } static inline void ldlm_callback_errmsg(struct ptlrpc_request *req, const char *msg, int rc, const struct lustre_handle *handle) { - DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req, + DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req, "%s: [nid %s] [rc %d] [lock %#llx]", - msg, libcfs_id2str(req->rq_peer), rc, - handle ? handle->cookie : 0); - if (req->rq_no_reply) - CWARN("No reply was sent, maybe cause bug 21636.\n"); - else if (rc) - CWARN("Send reply failed, maybe cause bug 21636.\n"); + msg, libcfs_id2str(req->rq_peer), rc, + handle ? handle->cookie : 0); + if (req->rq_no_reply) + CWARN("No reply was sent, maybe cause b=21636.\n"); + else if (rc) + CWARN("Send reply failed, maybe cause b=21636.\n"); } /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */ static int ldlm_callback_handler(struct ptlrpc_request *req) { - struct ldlm_namespace *ns; - struct ldlm_request *dlm_req; - struct ldlm_lock *lock; - int rc; - ENTRY; + struct ldlm_namespace *ns; + struct ldlm_request *dlm_req; + struct ldlm_lock *lock; + int rc; - /* Requests arrive in sender's byte order. The ptlrpc service - * handler has already checked and, if necessary, byte-swapped the - * incoming request message body, but I am responsible for the - * message buffers. */ + ENTRY; + + /* + * Requests arrive in sender's byte order. The ptlrpc service + * handler has already checked and, if necessary, byte-swapped the + * incoming request message body, but I am responsible for the + * message buffers. + */ - /* do nothing for sec context finalize */ - if (lustre_msg_get_opc(req->rq_reqmsg) == SEC_CTX_FINI) - RETURN(0); + /* do nothing for sec context finalize */ + if (lustre_msg_get_opc(req->rq_reqmsg) == SEC_CTX_FINI) + RETURN(0); - req_capsule_init(&req->rq_pill, req, RCL_SERVER); + req_capsule_init(&req->rq_pill, req, RCL_SERVER); - if (req->rq_export == NULL) { - rc = ldlm_callback_reply(req, -ENOTCONN); - ldlm_callback_errmsg(req, "Operate on unconnected server", - rc, NULL); - RETURN(0); - } + if (req->rq_export == NULL) { + rc = ldlm_callback_reply(req, -ENOTCONN); + ldlm_callback_errmsg(req, "Operate on unconnected server", + rc, NULL); + RETURN(0); + } - LASSERT(req->rq_export != NULL); - LASSERT(req->rq_export->exp_obd != NULL); + LASSERT(req->rq_export != NULL); + LASSERT(req->rq_export->exp_obd != NULL); switch (lustre_msg_get_opc(req->rq_reqmsg)) { case LDLM_BL_CALLBACK: @@ -2143,65 +2273,71 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_GL_CALLBACK_NET)) RETURN(0); break; - case LDLM_SET_INFO: - rc = ldlm_handle_setinfo(req); - ldlm_callback_reply(req, rc); - RETURN(0); - default: - CERROR("unknown opcode %u\n", - lustre_msg_get_opc(req->rq_reqmsg)); - ldlm_callback_reply(req, -EPROTO); - RETURN(0); - } - - ns = req->rq_export->exp_obd->obd_namespace; - LASSERT(ns != NULL); - - req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK); - - dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ); - if (dlm_req == NULL) { - rc = ldlm_callback_reply(req, -EPROTO); - ldlm_callback_errmsg(req, "Operate without parameter", rc, - NULL); - RETURN(0); - } - - /* Force a known safe race, send a cancel to the server for a lock - * which the server has already started a blocking callback on. 
*/ - if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE) && - lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) { + case LDLM_SET_INFO: + rc = ldlm_handle_setinfo(req); + ldlm_callback_reply(req, rc); + RETURN(0); + default: + CERROR("unknown opcode %u\n", + lustre_msg_get_opc(req->rq_reqmsg)); + ldlm_callback_reply(req, -EPROTO); + RETURN(0); + } + + ns = req->rq_export->exp_obd->obd_namespace; + LASSERT(ns != NULL); + + req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK); + + dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ); + if (dlm_req == NULL) { + rc = ldlm_callback_reply(req, -EPROTO); + ldlm_callback_errmsg(req, "Operate without parameter", rc, + NULL); + RETURN(0); + } + + /* + * Force a known safe race, send a cancel to the server for a lock + * which the server has already started a blocking callback on. + */ + if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE) && + lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) { rc = ldlm_cli_cancel(&dlm_req->lock_handle[0], 0); - if (rc < 0) - CERROR("ldlm_cli_cancel: %d\n", rc); - } - - lock = ldlm_handle2lock_long(&dlm_req->lock_handle[0], 0); - if (!lock) { - CDEBUG(D_DLMTRACE, "callback on lock %#llx - lock " - "disappeared\n", dlm_req->lock_handle[0].cookie); - rc = ldlm_callback_reply(req, -EINVAL); - ldlm_callback_errmsg(req, "Operate with invalid parameter", rc, - &dlm_req->lock_handle[0]); - RETURN(0); - } + if (rc < 0) + CERROR("ldlm_cli_cancel: %d\n", rc); + } + + lock = ldlm_handle2lock_long(&dlm_req->lock_handle[0], 0); + if (!lock) { + CDEBUG(D_DLMTRACE, + "callback on lock %#llx - lock disappeared\n", + dlm_req->lock_handle[0].cookie); + rc = ldlm_callback_reply(req, -EINVAL); + ldlm_callback_errmsg(req, "Operate with invalid parameter", rc, + &dlm_req->lock_handle[0]); + RETURN(0); + } if (ldlm_is_fail_loc(lock) && - lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) - OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE); + lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) + OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE); - /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */ - lock_res_and_lock(lock); + /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */ + lock_res_and_lock(lock); lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags & LDLM_FL_AST_MASK); if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) { - /* If somebody cancels lock and cache is already dropped, + /* + * If somebody cancels lock and cache is already dropped, * or lock is failed before cp_ast received on client, * we can tell the server we have no lock. Otherwise, we - * should send cancel after dropping the cache. */ + * should send cancel after dropping the cache. + */ if ((ldlm_is_canceling(lock) && ldlm_is_bl_done(lock)) || ldlm_is_failed(lock)) { - LDLM_DEBUG(lock, "callback on lock %llx - lock disappeared", + LDLM_DEBUG(lock, + "callback on lock %llx - lock disappeared", dlm_req->lock_handle[0].cookie); unlock_res_and_lock(lock); LDLM_LOCK_RELEASE(lock); @@ -2210,51 +2346,55 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) &dlm_req->lock_handle[0]); RETURN(0); } - /* BL_AST locks are not needed in LRU. - * Let ldlm_cancel_lru() be fast. */ + /* + * BL_AST locks are not needed in LRU. + * Let ldlm_cancel_lru() be fast. + */ ldlm_lock_remove_from_lru(lock); ldlm_set_bl_ast(lock); } - unlock_res_and_lock(lock); - - /* We want the ost thread to get this reply so that it can respond - * to ost requests (write cache writeback) that might be triggered - * in the callback. 
- * - * But we'd also like to be able to indicate in the reply that we're - * cancelling right now, because it's unused, or have an intent result - * in the reply, so we might have to push the responsibility for sending - * the reply down into the AST handlers, alas. */ - - switch (lustre_msg_get_opc(req->rq_reqmsg)) { - case LDLM_BL_CALLBACK: - CDEBUG(D_INODE, "blocking ast\n"); - req_capsule_extend(&req->rq_pill, &RQF_LDLM_BL_CALLBACK); + unlock_res_and_lock(lock); + + /* + * We want the ost thread to get this reply so that it can respond + * to ost requests (write cache writeback) that might be triggered + * in the callback. + * + * But we'd also like to be able to indicate in the reply that we're + * cancelling right now, because it's unused, or have an intent result + * in the reply, so we might have to push the responsibility for sending + * the reply down into the AST handlers, alas. + */ + + switch (lustre_msg_get_opc(req->rq_reqmsg)) { + case LDLM_BL_CALLBACK: + CDEBUG(D_INODE, "blocking ast\n"); + req_capsule_extend(&req->rq_pill, &RQF_LDLM_BL_CALLBACK); if (!ldlm_is_cancel_on_block(lock)) { - rc = ldlm_callback_reply(req, 0); - if (req->rq_no_reply || rc) - ldlm_callback_errmsg(req, "Normal process", rc, - &dlm_req->lock_handle[0]); - } - if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock)) - ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock); - break; - case LDLM_CP_CALLBACK: - CDEBUG(D_INODE, "completion ast\n"); - req_capsule_extend(&req->rq_pill, &RQF_LDLM_CP_CALLBACK); - ldlm_callback_reply(req, 0); - ldlm_handle_cp_callback(req, ns, dlm_req, lock); - break; - case LDLM_GL_CALLBACK: - CDEBUG(D_INODE, "glimpse ast\n"); - req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK); - ldlm_handle_gl_callback(req, ns, dlm_req, lock); - break; - default: - LBUG(); /* checked above */ - } - - RETURN(0); + rc = ldlm_callback_reply(req, 0); + if (req->rq_no_reply || rc) + ldlm_callback_errmsg(req, "Normal process", rc, + &dlm_req->lock_handle[0]); + } + if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock)) + ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock); + break; + case LDLM_CP_CALLBACK: + CDEBUG(D_INODE, "completion ast\n"); + req_capsule_extend(&req->rq_pill, &RQF_LDLM_CP_CALLBACK); + ldlm_callback_reply(req, 0); + ldlm_handle_cp_callback(req, ns, dlm_req, lock); + break; + case LDLM_GL_CALLBACK: + CDEBUG(D_INODE, "glimpse ast\n"); + req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK); + ldlm_handle_gl_callback(req, ns, dlm_req, lock); + break; + default: + LBUG(); /* checked above */ + } + + RETURN(0); } #ifdef HAVE_SERVER_SUPPORT @@ -2269,21 +2409,22 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req) ENTRY; - /* Requests arrive in sender's byte order. The ptlrpc service + /* + * Requests arrive in sender's byte order. The ptlrpc service * handler has already checked and, if necessary, byte-swapped the * incoming request message body, but I am responsible for the - * message buffers. */ + * message buffers. 
+ */ req_capsule_init(&req->rq_pill, req, RCL_SERVER); if (req->rq_export == NULL) { struct ldlm_request *dlm_req; - CERROR("%s from %s arrived at %lu with bad export cookie " - "%llu\n", + CERROR("%s from %s arrived at %llu with bad export cookie %llu\n", ll_opcode2str(lustre_msg_get_opc(req->rq_reqmsg)), libcfs_nid2str(req->rq_peer.nid), - req->rq_arrival_time.tv_sec, + (unsigned long long)req->rq_arrival_time.tv_sec, lustre_msg_get_handle(req->rq_reqmsg)->cookie); if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_CANCEL) { @@ -2299,7 +2440,7 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req) } switch (lustre_msg_get_opc(req->rq_reqmsg)) { - /* XXX FIXME move this back to mds/handler.c, bug 249 */ + /* XXX FIXME move this back to mds/handler.c, b=249 */ case LDLM_CANCEL: req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL); CDEBUG(D_INODE, "cancel\n"); @@ -2428,52 +2569,57 @@ static int ldlm_revoke_lock_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd, struct hlist_node *hnode, void *data) { - struct list_head *rpc_list = data; - struct ldlm_lock *lock = cfs_hash_object(hs, hnode); + struct list_head *rpc_list = data; + struct ldlm_lock *lock = cfs_hash_object(hs, hnode); - lock_res_and_lock(lock); + lock_res_and_lock(lock); - if (lock->l_req_mode != lock->l_granted_mode) { - unlock_res_and_lock(lock); - return 0; - } + if (!ldlm_is_granted(lock)) { + unlock_res_and_lock(lock); + return 0; + } - LASSERT(lock->l_resource); - if (lock->l_resource->lr_type != LDLM_IBITS && - lock->l_resource->lr_type != LDLM_PLAIN) { - unlock_res_and_lock(lock); - return 0; - } + LASSERT(lock->l_resource); + if (lock->l_resource->lr_type != LDLM_IBITS && + lock->l_resource->lr_type != LDLM_PLAIN) { + unlock_res_and_lock(lock); + return 0; + } if (ldlm_is_ast_sent(lock)) { - unlock_res_and_lock(lock); - return 0; - } + unlock_res_and_lock(lock); + return 0; + } - LASSERT(lock->l_blocking_ast); - LASSERT(!lock->l_blocking_lock); + LASSERT(lock->l_blocking_ast); + LASSERT(!lock->l_blocking_lock); ldlm_set_ast_sent(lock); - if (lock->l_export && lock->l_export->exp_lock_hash) { - /* NB: it's safe to call cfs_hash_del() even lock isn't - * in exp_lock_hash. */ - /* In the function below, .hs_keycmp resolves to - * ldlm_export_lock_keycmp() */ + if (lock->l_export && lock->l_export->exp_lock_hash) { + /* + * NB: it's safe to call cfs_hash_del() even lock isn't + * in exp_lock_hash. 
+ */ + /* + * In the function below, .hs_keycmp resolves to + * ldlm_export_lock_keycmp() + */ /* coverity[overrun-buffer-val] */ cfs_hash_del(lock->l_export->exp_lock_hash, &lock->l_remote_handle, &lock->l_exp_hash); } list_add_tail(&lock->l_rk_ast, rpc_list); - LDLM_LOCK_GET(lock); + LDLM_LOCK_GET(lock); - unlock_res_and_lock(lock); - return 0; + unlock_res_and_lock(lock); + return 0; } void ldlm_revoke_export_locks(struct obd_export *exp) { - struct list_head rpc_list; + struct list_head rpc_list; + ENTRY; INIT_LIST_HEAD(&rpc_list); @@ -2492,7 +2638,7 @@ static int ldlm_bl_get_work(struct ldlm_bl_pool *blp, struct obd_export **p_exp) { struct ldlm_bl_work_item *blwi = NULL; - static unsigned int num_bl = 0; + static unsigned int num_bl; static unsigned int num_stale; int num_th = atomic_read(&blp->blp_num_threads); @@ -2503,9 +2649,8 @@ static int ldlm_bl_get_work(struct ldlm_bl_pool *blp, if (num_th == 1 || ++num_stale < num_th) { spin_unlock(&blp->blp_lock); return 1; - } else { - num_stale = 0; } + num_stale = 0; } /* process a request from the blp_list at least every blp_num_threads */ @@ -2611,10 +2756,12 @@ static int ldlm_bl_thread_blwi(struct ldlm_bl_pool *blp, if (blwi->blwi_count) { int count; - /* The special case when we cancel locks in lru + /* + * The special case when we cancel locks in lru * asynchronously, we pass the list of locks here. * Thus locks are marked LDLM_FL_CANCELING, but NOT - * canceled locally yet. */ + * canceled locally yet. + */ count = ldlm_cli_cancel_list_local(&blwi->blwi_head, blwi->blwi_count, LCF_BL_AST); @@ -2645,6 +2792,7 @@ static int ldlm_bl_thread_exports(struct ldlm_bl_pool *blp, struct obd_export *exp) { int num; + ENTRY; OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_BL_EVICT, 4); @@ -2668,9 +2816,22 @@ static int ldlm_bl_thread_exports(struct ldlm_bl_pool *blp, */ static int ldlm_bl_thread_main(void *arg) { - struct ldlm_bl_pool *blp; + struct lu_env *env; + struct ldlm_bl_pool *blp; struct ldlm_bl_thread_data *bltd = arg; - ENTRY; + int rc; + + ENTRY; + + OBD_ALLOC_PTR(env); + if (!env) + RETURN(-ENOMEM); + rc = lu_env_init(env, LCT_DT_THREAD); + if (rc) + GOTO(out_env, rc); + rc = lu_env_add(env); + if (rc) + GOTO(out_env_fini, rc); blp = bltd->bltd_blp; @@ -2706,15 +2867,23 @@ static int ldlm_bl_thread_main(void *arg) if (rc == LDLM_ITER_STOP) break; - /* If there are many namespaces, we will not sleep waiting for + /* + * If there are many namespaces, we will not sleep waiting for * work, and must do a cond_resched to avoid holding the CPU - * for too long */ + * for too long + */ cond_resched(); } atomic_dec(&blp->blp_num_threads); complete(&blp->blp_comp); - RETURN(0); + + lu_env_remove(env); +out_env_fini: + lu_env_fini(env); +out_env: + OBD_FREE_PTR(env); + RETURN(rc); } @@ -2723,68 +2892,70 @@ static int ldlm_cleanup(void); int ldlm_get_ref(void) { - int rc = 0; - ENTRY; + int rc = 0; + + ENTRY; mutex_lock(&ldlm_ref_mutex); - if (++ldlm_refcount == 1) { - rc = ldlm_setup(); - if (rc) - ldlm_refcount--; - } + if (++ldlm_refcount == 1) { + rc = ldlm_setup(); + if (rc) + ldlm_refcount--; + } mutex_unlock(&ldlm_ref_mutex); - RETURN(rc); + RETURN(rc); } void ldlm_put_ref(void) { - ENTRY; + ENTRY; mutex_lock(&ldlm_ref_mutex); - if (ldlm_refcount == 1) { - int rc = ldlm_cleanup(); - if (rc) - CERROR("ldlm_cleanup failed: %d\n", rc); - else - ldlm_refcount--; - } else { - ldlm_refcount--; - } + if (ldlm_refcount == 1) { + int rc = ldlm_cleanup(); + + if (rc) + CERROR("ldlm_cleanup failed: %d\n", rc); + else + ldlm_refcount--; + } else { + 
ldlm_refcount--; + } mutex_unlock(&ldlm_ref_mutex); - EXIT; + EXIT; } /* * Export handle<->lock hash operations. */ static unsigned -ldlm_export_lock_hash(struct cfs_hash *hs, const void *key, unsigned mask) +ldlm_export_lock_hash(struct cfs_hash *hs, const void *key, unsigned int mask) { - return cfs_hash_u64_hash(((struct lustre_handle *)key)->cookie, mask); + return cfs_hash_u64_hash(((struct lustre_handle *)key)->cookie, mask); } static void * ldlm_export_lock_key(struct hlist_node *hnode) { - struct ldlm_lock *lock; + struct ldlm_lock *lock; lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash); - return &lock->l_remote_handle; + return &lock->l_remote_handle; } static void ldlm_export_lock_keycpy(struct hlist_node *hnode, void *key) { - struct ldlm_lock *lock; + struct ldlm_lock *lock; lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash); - lock->l_remote_handle = *(struct lustre_handle *)key; + lock->l_remote_handle = *(struct lustre_handle *)key; } static int ldlm_export_lock_keycmp(const void *key, struct hlist_node *hnode) { - return lustre_handle_equal(ldlm_export_lock_key(hnode), key); + return lustre_handle_equal(ldlm_export_lock_key(hnode), key); } static void * @@ -2796,55 +2967,56 @@ ldlm_export_lock_object(struct hlist_node *hnode) static void ldlm_export_lock_get(struct cfs_hash *hs, struct hlist_node *hnode) { - struct ldlm_lock *lock; + struct ldlm_lock *lock; lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash); - LDLM_LOCK_GET(lock); + LDLM_LOCK_GET(lock); } static void ldlm_export_lock_put(struct cfs_hash *hs, struct hlist_node *hnode) { - struct ldlm_lock *lock; + struct ldlm_lock *lock; lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash); - LDLM_LOCK_RELEASE(lock); + LDLM_LOCK_RELEASE(lock); } static struct cfs_hash_ops ldlm_export_lock_ops = { - .hs_hash = ldlm_export_lock_hash, - .hs_key = ldlm_export_lock_key, - .hs_keycmp = ldlm_export_lock_keycmp, - .hs_keycpy = ldlm_export_lock_keycpy, - .hs_object = ldlm_export_lock_object, - .hs_get = ldlm_export_lock_get, - .hs_put = ldlm_export_lock_put, - .hs_put_locked = ldlm_export_lock_put, + .hs_hash = ldlm_export_lock_hash, + .hs_key = ldlm_export_lock_key, + .hs_keycmp = ldlm_export_lock_keycmp, + .hs_keycpy = ldlm_export_lock_keycpy, + .hs_object = ldlm_export_lock_object, + .hs_get = ldlm_export_lock_get, + .hs_put = ldlm_export_lock_put, + .hs_put_locked = ldlm_export_lock_put, }; int ldlm_init_export(struct obd_export *exp) { int rc; - ENTRY; - exp->exp_lock_hash = - cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid), - HASH_EXP_LOCK_CUR_BITS, - HASH_EXP_LOCK_MAX_BITS, - HASH_EXP_LOCK_BKT_BITS, 0, - CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA, - &ldlm_export_lock_ops, - CFS_HASH_DEFAULT | CFS_HASH_REHASH_KEY | - CFS_HASH_NBLK_CHANGE); + ENTRY; - if (!exp->exp_lock_hash) - RETURN(-ENOMEM); + exp->exp_lock_hash = + cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid), + HASH_EXP_LOCK_CUR_BITS, + HASH_EXP_LOCK_MAX_BITS, + HASH_EXP_LOCK_BKT_BITS, 0, + CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA, + &ldlm_export_lock_ops, + CFS_HASH_DEFAULT | CFS_HASH_REHASH_KEY | + CFS_HASH_NBLK_CHANGE); + + if (!exp->exp_lock_hash) + RETURN(-ENOMEM); rc = ldlm_init_flock_export(exp); if (rc) GOTO(err, rc); - RETURN(0); + RETURN(0); err: ldlm_destroy_export(exp); RETURN(rc); @@ -2853,12 +3025,12 @@ EXPORT_SYMBOL(ldlm_init_export); void ldlm_destroy_export(struct obd_export *exp) { - ENTRY; - cfs_hash_putref(exp->exp_lock_hash); - exp->exp_lock_hash = NULL; + ENTRY; + cfs_hash_putref(exp->exp_lock_hash); + exp->exp_lock_hash = 
NULL; ldlm_destroy_flock_export(exp); - EXIT; + EXIT; } EXPORT_SYMBOL(ldlm_destroy_export); @@ -2906,14 +3078,14 @@ static int ldlm_setup(void) int i; int rc = 0; - ENTRY; + ENTRY; - if (ldlm_state != NULL) - RETURN(-EALREADY); + if (ldlm_state != NULL) + RETURN(-EALREADY); - OBD_ALLOC(ldlm_state, sizeof(*ldlm_state)); - if (ldlm_state == NULL) - RETURN(-ENOMEM); + OBD_ALLOC(ldlm_state, sizeof(*ldlm_state)); + if (ldlm_state == NULL) + RETURN(-ENOMEM); ldlm_kobj = kobject_create_and_add("ldlm", &lustre_kset->kobj); if (!ldlm_kobj) @@ -2954,11 +3126,12 @@ static int ldlm_setup(void) .tc_nthrs_base = LDLM_NTHRS_BASE, .tc_nthrs_max = LDLM_NTHRS_MAX, .tc_nthrs_user = ldlm_num_threads, - .tc_cpu_affinity = 1, + .tc_cpu_bind = ldlm_cpu_bind, .tc_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD, }, .psc_cpt = { .cc_pattern = ldlm_cpts, + .cc_affinity = true, }, .psc_ops = { .so_req_handler = ldlm_callback_handler, @@ -2995,13 +3168,14 @@ static int ldlm_setup(void) .tc_nthrs_base = LDLM_NTHRS_BASE, .tc_nthrs_max = LDLM_NTHRS_MAX, .tc_nthrs_user = ldlm_num_threads, - .tc_cpu_affinity = 1, + .tc_cpu_bind = ldlm_cpu_bind, .tc_ctx_tags = LCT_MD_THREAD | \ LCT_DT_THREAD | \ LCT_CL_THREAD, }, .psc_cpt = { .cc_pattern = ldlm_cpts, + .cc_affinity = true, }, .psc_ops = { .so_req_handler = ldlm_cancel_handler, @@ -3078,15 +3252,15 @@ static int ldlm_setup(void) static int ldlm_cleanup(void) { - ENTRY; + ENTRY; if (!list_empty(ldlm_namespace_list(LDLM_NAMESPACE_SERVER)) || !list_empty(ldlm_namespace_list(LDLM_NAMESPACE_CLIENT))) { - CERROR("ldlm still has namespaces; clean these up first.\n"); - ldlm_dump_all_namespaces(LDLM_NAMESPACE_SERVER, D_DLMTRACE); - ldlm_dump_all_namespaces(LDLM_NAMESPACE_CLIENT, D_DLMTRACE); - RETURN(-EBUSY); - } + CERROR("ldlm still has namespaces; clean these up first.\n"); + ldlm_dump_all_namespaces(LDLM_NAMESPACE_SERVER, D_DLMTRACE); + ldlm_dump_all_namespaces(LDLM_NAMESPACE_CLIENT, D_DLMTRACE); + RETURN(-EBUSY); + } ldlm_reclaim_cleanup(); ldlm_pools_fini(); @@ -3137,10 +3311,10 @@ static int ldlm_cleanup(void) } #endif - OBD_FREE(ldlm_state, sizeof(*ldlm_state)); - ldlm_state = NULL; + OBD_FREE(ldlm_state, sizeof(*ldlm_state)); + ldlm_state = NULL; - RETURN(0); + RETURN(0); } int ldlm_init(void) @@ -3153,12 +3327,12 @@ int ldlm_init(void) ldlm_lock_slab = kmem_cache_create("ldlm_locks", sizeof(struct ldlm_lock), 0, - SLAB_HWCACHE_ALIGN | SLAB_DESTROY_BY_RCU, NULL); + SLAB_HWCACHE_ALIGN, NULL); if (ldlm_lock_slab == NULL) goto out_resource; ldlm_interval_slab = kmem_cache_create("interval_node", - sizeof(struct ldlm_interval), + sizeof(struct ldlm_interval), 0, SLAB_HWCACHE_ALIGN, NULL); if (ldlm_interval_slab == NULL) goto out_lock; @@ -3170,11 +3344,17 @@ int ldlm_init(void) goto out_interval; #ifdef HAVE_SERVER_SUPPORT + ldlm_inodebits_slab = kmem_cache_create("ldlm_ibits_node", + sizeof(struct ldlm_ibits_node), + 0, SLAB_HWCACHE_ALIGN, NULL); + if (ldlm_inodebits_slab == NULL) + goto out_interval_tree; + ldlm_glimpse_work_kmem = kmem_cache_create("ldlm_glimpse_work_kmem", sizeof(struct ldlm_glimpse_work), 0, 0, NULL); if (ldlm_glimpse_work_kmem == NULL) - goto out_interval_tree; + goto out_inodebits; #endif #if LUSTRE_TRACKS_LOCK_EXP_REFS @@ -3182,6 +3362,8 @@ int ldlm_init(void) #endif return 0; #ifdef HAVE_SERVER_SUPPORT +out_inodebits: + kmem_cache_destroy(ldlm_inodebits_slab); out_interval_tree: kmem_cache_destroy(ldlm_interval_tree_slab); #endif @@ -3200,14 +3382,17 @@ void ldlm_exit(void) if (ldlm_refcount) CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount); 
 	kmem_cache_destroy(ldlm_resource_slab);
-	/* ldlm_lock_put() use RCU to call ldlm_lock_free, so need call
-	 * synchronize_rcu() to wait a grace period elapsed, so that
-	 * ldlm_lock_free() get a chance to be called. */
-	synchronize_rcu();
+	/*
+	 * ldlm_lock_put() uses RCU to call ldlm_lock_free(), so we need to
+	 * call rcu_barrier() to wait for all outstanding RCU callbacks to
+	 * complete, so that ldlm_lock_free() gets a chance to be called.
+	 */
+	rcu_barrier();
 	kmem_cache_destroy(ldlm_lock_slab);
 	kmem_cache_destroy(ldlm_interval_slab);
 	kmem_cache_destroy(ldlm_interval_tree_slab);
 #ifdef HAVE_SERVER_SUPPORT
+	kmem_cache_destroy(ldlm_inodebits_slab);
 	kmem_cache_destroy(ldlm_glimpse_work_kmem);
 #endif
 }
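
The final hunk replaces synchronize_rcu() with rcu_barrier() before destroying ldlm_lock_slab. The distinction matters because ldlm_lock_put() frees locks through an RCU callback: synchronize_rcu() only waits for a grace period to elapse, while rcu_barrier() also waits for the queued call_rcu() callbacks themselves to run, so no ldlm_lock_free() can race with kmem_cache_destroy(). A minimal sketch of this teardown pattern, using a hypothetical foo object and foo_cache slab (not Lustre code) to illustrate the ordering:

#include <linux/kernel.h>
#include <linux/slab.h>
#include <linux/rcupdate.h>

/* hypothetical object freed via call_rcu(); stands in for ldlm_lock here */
struct foo {
	struct rcu_head	f_rcu;
	int		f_data;
};

static struct kmem_cache *foo_cache;

static void foo_free_rcu(struct rcu_head *head)
{
	struct foo *obj = container_of(head, struct foo, f_rcu);

	/* runs after a grace period; returns the object to the slab cache */
	kmem_cache_free(foo_cache, obj);
}

static void foo_put(struct foo *obj)
{
	/* defer the actual free until current RCU readers are done */
	call_rcu(&obj->f_rcu, foo_free_rcu);
}

static void foo_cache_fini(void)
{
	/*
	 * rcu_barrier() waits for every pending call_rcu() callback to
	 * finish, so no foo_free_rcu() can touch foo_cache after this
	 * point; synchronize_rcu() alone would only wait for readers,
	 * not for the queued callbacks themselves.
	 */
	rcu_barrier();
	kmem_cache_destroy(foo_cache);
}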