X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fldlm%2Fldlm_request.c;h=ff5ae53aa8fcfe746e38cede78f75373626607d1;hp=df59c6b83ac985adfcd03ece22793df2b8e19ebc;hb=6052cc88eb1232ac3b0193f0d47881887a2dcfdc;hpb=a71948a4e6de838190e9da99500556e98a84e196

diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c
index df59c6b..ff5ae53 100644
--- a/lustre/ldlm/ldlm_request.c
+++ b/lustre/ldlm/ldlm_request.c
@@ -67,14 +67,11 @@ unsigned int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT;
 module_param(ldlm_enqueue_min, uint, 0644);
 MODULE_PARM_DESC(ldlm_enqueue_min, "lock enqueue timeout minimum");
+EXPORT_SYMBOL(ldlm_enqueue_min);
 
 /* in client side, whether the cached locks will be canceled before replay */
 unsigned int ldlm_cancel_unused_locks_before_replay = 1;
 
-static void interrupted_completion_wait(void *data)
-{
-}
-
 struct lock_wait_data {
 	struct ldlm_lock *lwd_lock;
 	__u32 lwd_conn_cnt;
@@ -111,9 +108,8 @@ int ldlm_request_bufsize(int count, int type)
 	return sizeof(struct ldlm_request) + avail;
 }
 
-int ldlm_expired_completion_wait(void *data)
+void ldlm_expired_completion_wait(struct lock_wait_data *lwd)
 {
-	struct lock_wait_data *lwd = data;
 	struct ldlm_lock *lock = lwd->lwd_lock;
 	struct obd_import *imp;
 	struct obd_device *obd;
@@ -124,9 +120,8 @@ int ldlm_expired_completion_wait(void *data)
 		LDLM_ERROR(lock,
 			   "lock timed out (enqueued at %lld, %llds ago); not entering recovery in server code, just going back to sleep",
-			   (s64)lock->l_activity,
-			   (s64)(ktime_get_real_seconds() -
-				 lock->l_activity));
+			   lock->l_activity,
+			   ktime_get_real_seconds() - lock->l_activity);
 		if (ktime_get_seconds() > next_dump) {
 			last_dump = next_dump;
 			next_dump = ktime_get_seconds() + 300;
@@ -135,7 +130,7 @@ int ldlm_expired_completion_wait(void *data)
 			if (last_dump == 0)
 				libcfs_debug_dumplog();
 		}
-		RETURN(0);
+		RETURN_EXIT;
 	}
 
 	obd = lock->l_conn_export->exp_obd;
@@ -143,11 +138,11 @@ int ldlm_expired_completion_wait(void *data)
 	ptlrpc_fail_import(imp, lwd->lwd_conn_cnt);
 	LDLM_ERROR(lock,
 		   "lock timed out (enqueued at %lld, %llds ago), entering recovery for %s@%s",
-		   (s64)lock->l_activity,
-		   (s64)(ktime_get_real_seconds() - lock->l_activity),
+		   lock->l_activity,
+		   ktime_get_real_seconds() - lock->l_activity,
 		   obd2cli_tgt(obd), imp->imp_connection->c_remote_uuid.uuid);
 
-	RETURN(0);
+	EXIT;
 }
 
 int is_granted_or_cancelled_nolock(struct ldlm_lock *lock)
@@ -176,9 +171,9 @@ EXPORT_SYMBOL(is_granted_or_cancelled_nolock);
  * We use the same basis for both server side and client side functions
  * from a single node.
  */
-static time64_t ldlm_cp_timeout(struct ldlm_lock *lock)
+static timeout_t ldlm_cp_timeout(struct ldlm_lock *lock)
 {
-	time64_t timeout;
+	timeout_t timeout;
 
 	if (AT_OFF)
 		return obd_timeout;
@@ -189,7 +184,7 @@ static time64_t ldlm_cp_timeout(struct ldlm_lock *lock)
 	 * doesn't respond reasonably, and then give us the lock.
 	 */
 	timeout = at_get(ldlm_lock_to_ns_at(lock));
-	return max(3 * timeout, (time64_t) ldlm_enqueue_min);
+	return max(3 * timeout, (timeout_t)ldlm_enqueue_min);
 }
 
 /**
@@ -198,7 +193,6 @@ static time64_t ldlm_cp_timeout(struct ldlm_lock *lock)
  */
 static int ldlm_completion_tail(struct ldlm_lock *lock, void *data)
 {
-	time64_t delay;
 	int result = 0;
 
 	if (ldlm_is_destroyed(lock) || ldlm_is_failed(lock)) {
@@ -208,10 +202,16 @@ static int ldlm_completion_tail(struct ldlm_lock *lock, void *data)
 		LDLM_DEBUG(lock, "client-side enqueue: granted");
 	} else {
 		/* Take into AT only CP RPC, not immediately granted locks */
-		delay = ktime_get_real_seconds() - lock->l_activity;
-		LDLM_DEBUG(lock, "client-side enqueue: granted after %llds",
-			   (s64)delay);
+		timeout_t delay = 0;
+		/* Discard negative timeouts. We should also limit the
+		 * maximum value of the timeout
+		 */
+		if (ktime_get_real_seconds() > lock->l_activity)
+			delay = ktime_get_real_seconds() - lock->l_activity;
+
+		LDLM_DEBUG(lock, "client-side enqueue: granted after %ds",
+			   delay);
 
 		/* Update our time estimate */
 		at_measured(ldlm_lock_to_ns_at(lock), delay);
 	}
@@ -239,7 +239,7 @@ int ldlm_completion_ast_async(struct ldlm_lock *lock, __u64 flags, void *data)
 
 	LDLM_DEBUG(lock,
 		   "client-side enqueue returned a blocked lock, going forward");
-	ldlm_reprocess_all(lock->l_resource);
+	ldlm_reprocess_all(lock->l_resource, NULL);
 	RETURN(0);
 }
 EXPORT_SYMBOL(ldlm_completion_ast_async);
@@ -269,8 +269,7 @@ int ldlm_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
 	struct lock_wait_data lwd;
 	struct obd_device *obd;
 	struct obd_import *imp = NULL;
-	struct l_wait_info lwi;
-	time64_t timeout;
+	timeout_t timeout;
 	int rc = 0;
 
 	ENTRY;
@@ -300,15 +299,6 @@ noreproc:
 	lwd.lwd_lock = lock;
 	lock->l_activity = ktime_get_real_seconds();
 
-	if (ldlm_is_no_timeout(lock)) {
-		LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
-		lwi = LWI_INTR(interrupted_completion_wait, &lwd);
-	} else {
-		lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
-				       ldlm_expired_completion_wait,
-				       interrupted_completion_wait, &lwd);
-	}
-
 	if (imp != NULL) {
 		spin_lock(&imp->imp_lock);
 		lwd.lwd_conn_cnt = imp->imp_conn_cnt;
@@ -322,8 +312,22 @@ noreproc:
 		rc = -EINTR;
 	} else {
 		/* Go to sleep until the lock is granted or cancelled. */
-		rc = l_wait_event(lock->l_waitq,
-				  is_granted_or_cancelled(lock), &lwi);
+		if (ldlm_is_no_timeout(lock)) {
+			LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
+			rc = l_wait_event_abortable(
+				lock->l_waitq,
+				is_granted_or_cancelled(lock));
+		} else {
+			if (wait_event_idle_timeout(
+				lock->l_waitq,
+				is_granted_or_cancelled(lock),
+				cfs_time_seconds(timeout)) == 0) {
+				ldlm_expired_completion_wait(&lwd);
+				rc = l_wait_event_abortable(
+					lock->l_waitq,
+					is_granted_or_cancelled(lock));
+			}
+		}
 	}
 
 	if (rc) {
@@ -583,6 +587,11 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns,
 	}
 }
 
+static bool ldlm_request_slot_needed(enum ldlm_type type)
+{
+	return type == LDLM_FLOCK || type == LDLM_IBITS;
+}
+
 /**
  * Finishing portion of client lock enqueue code.
  *
@@ -603,6 +612,11 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
 
 	ENTRY;
 
+	if (ldlm_request_slot_needed(type))
+		obd_put_request_slot(&req->rq_import->imp_obd->u.cli);
+
+	ptlrpc_put_mod_rpc_slot(req);
+
 	if (req && req->rq_svc_thread)
 		env = req->rq_svc_thread->t_env;
 
@@ -725,6 +739,7 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
 
 	if ((*flags) & LDLM_FL_AST_SENT) {
 		lock_res_and_lock(lock);
+		ldlm_bl_desc2lock(&reply->lock_desc, lock);
 		lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
 		unlock_res_and_lock(lock);
 		LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
@@ -837,9 +852,8 @@ int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
 	struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
 	struct req_capsule *pill = &req->rq_pill;
 	struct ldlm_request *dlm = NULL;
-	struct list_head head = LIST_HEAD_INIT(head);
-	enum ldlm_lru_flags lru_flags;
-	int avail, to_free, pack = 0;
+	LIST_HEAD(head);
+	int avail, to_free = 0, pack = 0;
 	int rc;
 
 	ENTRY;
@@ -851,10 +865,10 @@ int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
 		req_capsule_filled_sizes(pill, RCL_CLIENT);
 		avail = ldlm_capsule_handles_avail(pill, RCL_CLIENT, canceloff);
 
-		lru_flags = LDLM_LRU_FLAG_NO_WAIT | (ns_connect_lru_resize(ns) ?
-			LDLM_LRU_FLAG_LRUR : LDLM_LRU_FLAG_AGED);
-		to_free = !ns_connect_lru_resize(ns) &&
-			opc == LDLM_ENQUEUE ? 1 : 0;
+		/* If we have reached the limit, free +1 slot for the new one */
+		if (!ns_connect_lru_resize(ns) && opc == LDLM_ENQUEUE &&
+		    ns->ns_nr_unused >= ns->ns_max_unused)
+			to_free = 1;
 
 		/*
 		 * Cancel LRU locks here _only_ if the server supports
@@ -864,7 +878,7 @@ int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
 		if (avail > count)
 			count += ldlm_cancel_lru_local(ns, cancels, to_free,
 						       avail - count, 0,
-						       lru_flags);
+						       LDLM_LRU_FLAG_NO_WAIT);
 		if (avail > count)
 			pack = count;
 		else
@@ -1008,7 +1022,8 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
 		lock->l_conn_export = exp;
 		lock->l_export = NULL;
 		lock->l_blocking_ast = einfo->ei_cb_bl;
-		lock->l_flags |= (*flags & (LDLM_FL_NO_LRU | LDLM_FL_EXCL));
+		lock->l_flags |= (*flags & (LDLM_FL_NO_LRU | LDLM_FL_EXCL |
+					    LDLM_FL_ATOMIC_CB));
 		lock->l_activity = ktime_get_real_seconds();
 
 		/* lock not sent to server yet */
@@ -1034,7 +1049,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
 	}
 
 	if (*flags & LDLM_FL_NDELAY) {
-		DEBUG_REQ(D_DLMTRACE, req, "enque lock with no delay\n");
+		DEBUG_REQ(D_DLMTRACE, req, "enqueue lock with no delay");
 		req->rq_no_resend = req->rq_no_delay = 1;
 		/*
 		 * probably set a shorter timeout value and handle ETIMEDOUT
@@ -1071,6 +1086,24 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
 					LDLM_GLIMPSE_ENQUEUE);
 	}
 
+	/* It is important to obtain modify RPC slot first (if applicable), so
+	 * that threads that are waiting for a modify RPC slot are not polluting
+	 * our rpcs in flight counter. */
+
+	if (einfo->ei_enq_slot)
+		ptlrpc_get_mod_rpc_slot(req);
+
+	if (ldlm_request_slot_needed(einfo->ei_type)) {
+		rc = obd_get_request_slot(&req->rq_import->imp_obd->u.cli);
+		if (rc) {
+			if (einfo->ei_enq_slot)
+				ptlrpc_put_mod_rpc_slot(req);
+			failed_lock_cleanup(ns, lock, einfo->ei_mode);
+			LDLM_LOCK_RELEASE(lock);
+			GOTO(out, rc);
+		}
+	}
+
 	if (async) {
 		LASSERT(reqp != NULL);
 		RETURN(0);
@@ -1093,6 +1126,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
 	else
 		rc = err;
 
+out:
 	if (!req_passed_in && req != NULL) {
 		ptlrpc_req_finished(req);
 		if (reqp)
@@ -1104,128 +1138,6 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
 EXPORT_SYMBOL(ldlm_cli_enqueue);
 
 /**
- * Client-side lock convert reply handling.
- *
- * Finish client lock converting, checks for concurrent converts
- * and clear 'converting' flag so lock can be placed back into LRU.
- */
-static int lock_convert_interpret(const struct lu_env *env,
-				  struct ptlrpc_request *req,
-				  void *args, int rc)
-{
-	struct ldlm_async_args *aa = args;
-	struct ldlm_lock *lock;
-	struct ldlm_reply *reply;
-
-	ENTRY;
-
-	lock = ldlm_handle2lock(&aa->lock_handle);
-	if (!lock) {
-		LDLM_DEBUG_NOLOCK("convert ACK for unknown local cookie %#llx",
-				  aa->lock_handle.cookie);
-		RETURN(-ESTALE);
-	}
-
-	LDLM_DEBUG(lock, "CONVERTED lock:");
-
-	if (rc != ELDLM_OK)
-		GOTO(out, rc);
-
-	reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
-	if (reply == NULL)
-		GOTO(out, rc = -EPROTO);
-
-	if (reply->lock_handle.cookie != aa->lock_handle.cookie) {
-		LDLM_ERROR(lock,
-			   "convert ACK with wrong lock cookie %#llx but cookie %#llx from server %s id %s\n",
-			   aa->lock_handle.cookie, reply->lock_handle.cookie,
-			   req->rq_export->exp_client_uuid.uuid,
-			   libcfs_id2str(req->rq_peer));
-		GOTO(out, rc = ELDLM_NO_LOCK_DATA);
-	}
-
-	lock_res_and_lock(lock);
-	/*
-	 * Lock convert is sent for any new bits to drop, the converting flag
-	 * is dropped when ibits on server are the same as on client. Meanwhile
-	 * that can be so that more later convert will be replied first with
-	 * and clear converting flag, so in case of such race just exit here.
-	 * if lock has no converting bits then
-	 */
-	if (!ldlm_is_converting(lock)) {
-		LDLM_DEBUG(lock,
-			   "convert ACK for lock without converting flag, reply ibits %#llx",
-			   reply->lock_desc.l_policy_data.l_inodebits.bits);
-	} else if (reply->lock_desc.l_policy_data.l_inodebits.bits !=
-		   lock->l_policy_data.l_inodebits.bits) {
-		/*
-		 * Compare server returned lock ibits and local lock ibits
-		 * if they are the same we consider convertion is done,
-		 * otherwise we have more converts inflight and keep
-		 * converting flag.
-		 */
-		LDLM_DEBUG(lock, "convert ACK with ibits %#llx\n",
-			   reply->lock_desc.l_policy_data.l_inodebits.bits);
-	} else {
-		ldlm_clear_converting(lock);
-
-		/*
-		 * Concurrent BL AST may arrive and cause another convert
-		 * or cancel so just do nothing here if bl_ast is set,
-		 * finish with convert otherwise.
-		 */
-		if (!ldlm_is_bl_ast(lock)) {
-			struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
-
-			/*
-			 * Drop cancel_bits since there are no more converts
-			 * and put lock into LRU if it is still not used and
-			 * is not there yet.
-			 */
-			lock->l_policy_data.l_inodebits.cancel_bits = 0;
-			if (!lock->l_readers && !lock->l_writers &&
-			    !ldlm_is_canceling(lock)) {
-				spin_lock(&ns->ns_lock);
-				/* there is check for list_empty() inside */
-				ldlm_lock_remove_from_lru_nolock(lock);
-				ldlm_lock_add_to_lru_nolock(lock);
-				spin_unlock(&ns->ns_lock);
-			}
-		}
-	}
-	unlock_res_and_lock(lock);
-out:
-	if (rc) {
-		int flag;
-
-		lock_res_and_lock(lock);
-		if (ldlm_is_converting(lock)) {
-			ldlm_clear_converting(lock);
-			ldlm_set_cbpending(lock);
-			ldlm_set_bl_ast(lock);
-			lock->l_policy_data.l_inodebits.cancel_bits = 0;
-		}
-		unlock_res_and_lock(lock);
-
-		/*
-		 * fallback to normal lock cancel. If rc means there is no
-		 * valid lock on server, do only local cancel
-		 */
-		if (rc == ELDLM_NO_LOCK_DATA)
-			flag = LCF_LOCAL;
-		else
-			flag = LCF_ASYNC;
-
-		rc = ldlm_cli_cancel(&aa->lock_handle, flag);
-		if (rc < 0)
-			LDLM_DEBUG(lock, "failed to cancel lock: rc = %d\n",
-				   rc);
-	}
-	LDLM_LOCK_PUT(lock);
-	RETURN(rc);
-}
-
-/**
  * Client-side IBITS lock convert.
  *
  * Inform server that lock has been converted instead of canceling.
@@ -1237,19 +1149,15 @@ out:
  * is made asynchronous.
  *
  */
-int ldlm_cli_convert(struct ldlm_lock *lock, __u32 *flags)
+int ldlm_cli_convert_req(struct ldlm_lock *lock, __u32 *flags, __u64 new_bits)
 {
 	struct ldlm_request *body;
 	struct ptlrpc_request *req;
-	struct ldlm_async_args *aa;
 	struct obd_export *exp = lock->l_conn_export;
 
 	ENTRY;
 
-	if (exp == NULL) {
-		LDLM_ERROR(lock, "convert must not be called on local locks.");
-		RETURN(-EINVAL);
-	}
+	LASSERT(exp != NULL);
 
 	/*
 	 * this is better to check earlier and it is done so already,
@@ -1280,8 +1188,7 @@ int ldlm_cli_convert(struct ldlm_lock *lock, __u32 *flags)
 	body->lock_desc.l_req_mode = lock->l_req_mode;
 	body->lock_desc.l_granted_mode = lock->l_granted_mode;
-	body->lock_desc.l_policy_data.l_inodebits.bits =
-		lock->l_policy_data.l_inodebits.bits;
+	body->lock_desc.l_policy_data.l_inodebits.bits = new_bits;
 	body->lock_desc.l_policy_data.l_inodebits.cancel_bits = 0;
 
 	body->lock_flags = ldlm_flags_to_wire(*flags);
@@ -1301,10 +1208,6 @@ int ldlm_cli_convert(struct ldlm_lock *lock, __u32 *flags)
 		lprocfs_counter_incr(exp->exp_obd->obd_svc_stats,
 				     LDLM_CONVERT - LDLM_FIRST_OPC);
 
-	aa = ptlrpc_req_async_args(req);
-	ldlm_lock2handle(lock, &aa->lock_handle);
-	req->rq_interpret_reply = lock_convert_interpret;
-
 	ptlrpcd_add_req(req);
 	RETURN(0);
 }
@@ -1326,6 +1229,9 @@ static __u64 ldlm_cli_cancel_local(struct ldlm_lock *lock)
 		bool local_only;
 
 		LDLM_DEBUG(lock, "client-side cancel");
+		OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL_LOCAL,
+				 cfs_fail_val);
+
 		/* Set this flag to prevent others from getting new references*/
 		lock_res_and_lock(lock);
 		ldlm_set_cbpending(lock);
@@ -1349,7 +1255,7 @@ static __u64 ldlm_cli_cancel_local(struct ldlm_lock *lock)
 		}
 		LDLM_DEBUG(lock, "server-side local cancel");
 		ldlm_lock_cancel(lock);
-		ldlm_reprocess_all(lock->l_resource);
+		ldlm_reprocess_all(lock->l_resource, lock);
 	}
 
 	RETURN(rc);
@@ -1532,7 +1438,7 @@ int ldlm_cli_update_pool(struct ptlrpc_request *req)
 	if (lustre_msg_get_slv(req->rq_repmsg) == 0 ||
 	    lustre_msg_get_limit(req->rq_repmsg) == 0) {
 		DEBUG_REQ(D_HA, req,
-			  "Zero SLV or Limit found (SLV: %llu, Limit: %u)",
+			  "Zero SLV or limit found (SLV=%llu, limit=%u)",
 			  lustre_msg_get_slv(req->rq_repmsg),
 			  lustre_msg_get_limit(req->rq_repmsg));
 		RETURN(0);
@@ -1542,6 +1448,14 @@ int ldlm_cli_update_pool(struct ptlrpc_request *req)
 	new_slv = lustre_msg_get_slv(req->rq_repmsg);
 	obd = req->rq_import->imp_obd;
 
+	read_lock(&obd->obd_pool_lock);
+	if (obd->obd_pool_slv == new_slv &&
+	    obd->obd_pool_limit == new_limit) {
+		read_unlock(&obd->obd_pool_lock);
+		RETURN(0);
+	}
+	read_unlock(&obd->obd_pool_lock);
+
 	/*
 	 * Set new SLV and limit in OBD fields to make them accessible
 	 * to the pool thread. We do not access obd_namespace and pool
@@ -1557,6 +1471,27 @@ int ldlm_cli_update_pool(struct ptlrpc_request *req)
 	RETURN(0);
 }
 
+int ldlm_cli_convert(struct ldlm_lock *lock,
+		     enum ldlm_cancel_flags cancel_flags)
+{
+	int rc = -EINVAL;
+
+	LASSERT(!lock->l_readers && !lock->l_writers);
+	LDLM_DEBUG(lock, "client lock convert START");
+
+	if (lock->l_resource->lr_type == LDLM_IBITS) {
+		lock_res_and_lock(lock);
+		do {
+			rc = ldlm_cli_inodebits_convert(lock, cancel_flags);
+		} while (rc == -EAGAIN);
+		unlock_res_and_lock(lock);
+	}
+
+	LDLM_DEBUG(lock, "client lock convert END");
+	RETURN(rc);
+}
+EXPORT_SYMBOL(ldlm_cli_convert);
+
 /**
  * Client side lock cancel.
  *
@@ -1566,12 +1501,11 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
 		    enum ldlm_cancel_flags cancel_flags)
 {
 	struct obd_export *exp;
-	enum ldlm_lru_flags lru_flags;
 	int avail, count = 1;
 	__u64 rc = 0;
 	struct ldlm_namespace *ns;
 	struct ldlm_lock *lock;
-	struct list_head cancels = LIST_HEAD_INIT(cancels);
+	LIST_HEAD(cancels);
 
 	ENTRY;
 
@@ -1581,44 +1515,21 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
 		RETURN(0);
 	}
 
-	/* Convert lock bits instead of cancel for IBITS locks */
-	if (cancel_flags & LCF_CONVERT) {
-		LASSERT(lock->l_resource->lr_type == LDLM_IBITS);
-		LASSERT(lock->l_policy_data.l_inodebits.cancel_bits != 0);
-
-		rc = ldlm_cli_dropbits(lock,
-				lock->l_policy_data.l_inodebits.cancel_bits);
-		if (rc == 0) {
-			LDLM_LOCK_RELEASE(lock);
-			RETURN(0);
-		}
-	}
-
 	lock_res_and_lock(lock);
+	LASSERT(!ldlm_is_converting(lock));
+
 	/* Lock is being canceled and the caller doesn't want to wait */
 	if (ldlm_is_canceling(lock)) {
 		if (cancel_flags & LCF_ASYNC) {
 			unlock_res_and_lock(lock);
 		} else {
-			struct l_wait_info lwi = { 0 };
-
 			unlock_res_and_lock(lock);
-			l_wait_event(lock->l_waitq, is_bl_done(lock), &lwi);
+			wait_event_idle(lock->l_waitq, is_bl_done(lock));
 		}
 		LDLM_LOCK_RELEASE(lock);
 		RETURN(0);
 	}
 
-	/*
-	 * Lock is being converted, cancel it immediately.
-	 * When convert will end, it releases lock and it will be gone.
-	 */
-	if (ldlm_is_converting(lock)) {
-		/* set back flags removed by convert */
-		ldlm_set_cbpending(lock);
-		ldlm_set_bl_ast(lock);
-	}
-
 	ldlm_set_canceling(lock);
 	unlock_res_and_lock(lock);
@@ -1647,10 +1558,8 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
 		LASSERT(avail > 0);
 
 		ns = ldlm_lock_to_ns(lock);
-		lru_flags = ns_connect_lru_resize(ns) ?
-			LDLM_LRU_FLAG_LRUR : LDLM_LRU_FLAG_AGED;
 		count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
-					       LCF_BL_AST, lru_flags);
+					       LCF_BL_AST, 0);
 	}
 	ldlm_cli_cancel_list(&cancels, count, NULL, cancel_flags);
 	RETURN(0);
 }
@@ -1664,7 +1573,7 @@ EXPORT_SYMBOL(ldlm_cli_cancel);
 int ldlm_cli_cancel_list_local(struct list_head *cancels, int count,
 			       enum ldlm_cancel_flags cancel_flags)
 {
-	struct list_head head = LIST_HEAD_INIT(head);
+	LIST_HEAD(head);
 	struct ldlm_lock *lock, *next;
 	int left = 0, bl_ast = 0;
 	__u64 rc;
@@ -1714,12 +1623,12 @@ int ldlm_cli_cancel_list_local(struct list_head *cancels, int count,
  */
 static enum ldlm_policy_res
 ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns, struct ldlm_lock *lock,
-			   int unused, int added, int count)
+			   int added, int min)
 {
 	enum ldlm_policy_res result = LDLM_POLICY_CANCEL_LOCK;
 
 	/*
-	 * don't check added & count since we want to process all locks
+	 * don't check @added & @min since we want to process all locks
 	 * from unused list.
 	 * It's fine to not take lock to access lock->l_resource since
 	 * the lock has already been granted so it won't change.
@@ -1729,6 +1638,7 @@ ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns, struct ldlm_lock *lock,
 	case LDLM_IBITS:
 		if (ns->ns_cancel != NULL && ns->ns_cancel(lock) != 0)
 			break;
+		/* fallthrough */
 	default:
 		result = LDLM_POLICY_SKIP_LOCK;
 		break;
@@ -1739,8 +1649,8 @@ ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns, struct ldlm_lock *lock,
 
 /**
  * Callback function for LRU-resize policy. Decides whether to keep
- * \a lock in LRU for current \a LRU size \a unused, added in current
- * scan \a added and number of locks to be preferably canceled \a count.
+ * \a lock in LRU for \a added in current scan and \a min number of locks
+ * to be preferably canceled.
 *
 * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU in stop scanning
 *
@@ -1748,34 +1658,28 @@ ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns, struct ldlm_lock *lock,
 */
 static enum ldlm_policy_res ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
						    struct ldlm_lock *lock,
-						    int unused, int added,
-						    int count)
+						    int added, int min)
 {
	ktime_t cur = ktime_get();
	struct ldlm_pool *pl = &ns->ns_pool;
	u64 slv, lvf, lv;
	s64 la;
 
-	/*
-	 * Stop LRU processing when we reach past @count or have checked all
-	 * locks in LRU.
-	 */
-	if (count && added >= count)
-		return LDLM_POLICY_KEEP_LOCK;
+	if (added < min)
+		return LDLM_POLICY_CANCEL_LOCK;
 
	/*
	 * Despite of the LV, It doesn't make sense to keep the lock which
	 * is unused for ns_max_age time.
	 */
-	if (ktime_after(ktime_get(),
-			ktime_add(lock->l_last_used, ns->ns_max_age)))
+	if (ktime_after(cur, ktime_add(lock->l_last_used, ns->ns_max_age)))
		return LDLM_POLICY_CANCEL_LOCK;
 
	slv = ldlm_pool_get_slv(pl);
	lvf = ldlm_pool_get_lvf(pl);
	la = div_u64(ktime_to_ns(ktime_sub(cur, lock->l_last_used)),
		     NSEC_PER_SEC);
-	lv = lvf * la * unused;
+	lv = lvf * la * ns->ns_nr_unused;
 
	/* Inform pool about current CLV to see it via debugfs. */
	ldlm_pool_set_clv(pl, lv);
@@ -1793,44 +1697,21 @@ static enum ldlm_policy_res ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
 static enum ldlm_policy_res
 ldlm_cancel_lrur_no_wait_policy(struct ldlm_namespace *ns,
				struct ldlm_lock *lock,
-				int unused, int added,
-				int count)
+				int added, int min)
 {
	enum ldlm_policy_res result;
 
-	result = ldlm_cancel_lrur_policy(ns, lock, unused, added, count);
+	result = ldlm_cancel_lrur_policy(ns, lock, added, min);
	if (result == LDLM_POLICY_KEEP_LOCK)
		return result;
 
-	return ldlm_cancel_no_wait_policy(ns, lock, unused, added, count);
-}
-
-/**
- * Callback function for debugfs used policy. Makes decision whether to keep
- * \a lock in LRU for current \a LRU size \a unused, added in current scan \a
- * added and number of locks to be preferably canceled \a count.
- *
- * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU in stop scanning
- *
- * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
- */
-static enum ldlm_policy_res ldlm_cancel_passed_policy(struct ldlm_namespace *ns,
-						      struct ldlm_lock *lock,
-						      int unused, int added,
-						      int count)
-{
-	/*
-	 * Stop LRU processing when we reach past @count or have checked all
-	 * locks in LRU.
-	 */
-	return (added >= count) ?
-		LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
+	return ldlm_cancel_no_wait_policy(ns, lock, added, min);
 }
 
 /**
- * Callback function for aged policy. Makes decision whether to keep \a lock in
- * LRU for current LRU size \a unused, added in current scan \a added and
- * number of locks to be preferably canceled \a count.
+ * Callback function for aged policy. Decides whether to keep
+ * \a lock in LRU for \a added in current scan and \a min number of locks
+ * to be preferably canceled.
 *
 * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU in stop scanning
 *
@@ -1838,10 +1719,9 @@ static enum ldlm_policy_res ldlm_cancel_passed_policy(struct ldlm_namespace *ns,
 static enum ldlm_policy_res ldlm_cancel_aged_policy(struct ldlm_namespace *ns,
						    struct ldlm_lock *lock,
-						    int unused, int added,
-						    int count)
+						    int added, int min)
 {
-	if ((added >= count) &&
+	if ((added >= min) &&
	    ktime_before(ktime_get(),
			 ktime_add(lock->l_last_used, ns->ns_max_age)))
		return LDLM_POLICY_KEEP_LOCK;
@@ -1852,78 +1732,43 @@ static enum ldlm_policy_res ldlm_cancel_aged_policy(struct ldlm_namespace *ns,
 static enum ldlm_policy_res
 ldlm_cancel_aged_no_wait_policy(struct ldlm_namespace *ns,
				struct ldlm_lock *lock,
-				int unused, int added, int count)
+				int added, int min)
 {
	enum ldlm_policy_res result;
 
-	result = ldlm_cancel_aged_policy(ns, lock, unused, added, count);
+	result = ldlm_cancel_aged_policy(ns, lock, added, min);
	if (result == LDLM_POLICY_KEEP_LOCK)
		return result;
 
-	return ldlm_cancel_no_wait_policy(ns, lock, unused, added, count);
-}
-
-/**
- * Callback function for default policy. Makes decision whether to keep \a lock
- * in LRU for current LRU size \a unused, added in current scan \a added and
- * number of locks to be preferably canceled \a count.
- *
- * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU in stop scanning
- *
- * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
- */
-static
-enum ldlm_policy_res ldlm_cancel_default_policy(struct ldlm_namespace *ns,
-						struct ldlm_lock *lock,
-						int unused, int added,
-						int count)
-{
-	/*
-	 * Stop LRU processing when we reach past count or have checked all
-	 * locks in LRU.
-	 */
-	return (added >= count) ?
-		LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
+	return ldlm_cancel_no_wait_policy(ns, lock, added, min);
 }
 
 typedef enum ldlm_policy_res
 (*ldlm_cancel_lru_policy_t)(struct ldlm_namespace *ns, struct ldlm_lock *lock,
-			    int unused, int added, int count);
+			    int added, int min);
 
 static ldlm_cancel_lru_policy_t
 ldlm_cancel_lru_policy(struct ldlm_namespace *ns, enum ldlm_lru_flags lru_flags)
 {
	if (ns_connect_lru_resize(ns)) {
-		if (lru_flags & LDLM_LRU_FLAG_SHRINK)
-			/* We kill passed number of old locks. */
-			return ldlm_cancel_passed_policy;
-		if (lru_flags & LDLM_LRU_FLAG_LRUR) {
-			if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
-				return ldlm_cancel_lrur_no_wait_policy;
-			else
-				return ldlm_cancel_lrur_policy;
-		}
-		if (lru_flags & LDLM_LRU_FLAG_PASSED)
-			return ldlm_cancel_passed_policy;
+		if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
+			return ldlm_cancel_lrur_no_wait_policy;
+		else
+			return ldlm_cancel_lrur_policy;
	} else {
-		if (lru_flags & LDLM_LRU_FLAG_AGED) {
-			if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
-				return ldlm_cancel_aged_no_wait_policy;
-			else
-				return ldlm_cancel_aged_policy;
-		}
+		if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
+			return ldlm_cancel_aged_no_wait_policy;
+		else
+			return ldlm_cancel_aged_policy;
	}
-	if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
-		return ldlm_cancel_no_wait_policy;
-
-	return ldlm_cancel_default_policy;
 }
 
 /**
 * - Free space in LRU for \a min new locks,
 *   redundant unused locks are canceled locally;
 * - also cancel locally unused aged locks;
 * - do not cancel more than \a max locks;
+ * - if some locks are cancelled, try to cancel at least \a batch locks
 * - GET the found locks and add them into the \a cancels list.
 *
 * A client lock can be added to the l_bl_ast list only when it is
@@ -1934,40 +1779,49 @@ ldlm_cancel_lru_policy(struct ldlm_namespace *ns, enum ldlm_lru_flags lru_flags)
 * attempt to cancel a lock rely on this flag, l_bl_ast list is accessed
 * later without any special locking.
 *
- * Calling policies for enabled LRU resize:
- * ----------------------------------------
- * flags & LDLM_LRU_FLAG_LRUR	- use LRU resize policy (SLV from server) to
- *				  cancel not more than \a count locks;
- *
- * flags & LDLM_LRU_FLAG_PASSED - cancel \a count number of old locks (located
- *				  at the beginning of LRU list);
- *
- * flags & LDLM_LRU_FLAG_SHRINK - cancel not more than \a count locks according
- *				  to memory pressre policy function;
+ * Locks are cancelled according to the LRU resize policy (SLV from server)
+ * if LRU resize is enabled; otherwise, the "aged policy" is used;
 *
- * flags & LDLM_LRU_FLAG_AGED	- cancel \a count locks according to "aged policy"
+ * LRU flags:
+ * ----------------------------------------
 *
- * flags & LDLM_LRU_FLAG_NO_WAIT - cancel as many unused locks as possible
- *				   (typically before replaying locks) w/o
- *				   sending any RPCs or waiting for any
- *				   outstanding RPC to complete.
+ * flags & LDLM_LRU_FLAG_NO_WAIT - cancel locks w/o sending any RPCs or waiting
+ *				   for any outstanding RPC to complete.
 *
 * flags & LDLM_CANCEL_CLEANUP - when cancelling read locks, do not check for
-*				 other read locks covering the same pages, just
-*				 discard those pages.
+*				other read locks covering the same pages, just
+*				discard those pages.
 */
 static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
-				 struct list_head *cancels, int count, int max,
+				 struct list_head *cancels,
+				 int min, int max, int batch,
				 enum ldlm_lru_flags lru_flags)
 {
	ldlm_cancel_lru_policy_t pf;
	int added = 0;
	int no_wait = lru_flags & LDLM_LRU_FLAG_NO_WAIT;
-
	ENTRY;
 
+	/*
+	 * Let only 1 thread to proceed. However, not for those which have the
+	 * @max limit given (ELC), as LRU may be left not cleaned up in full.
+	 */
+	if (max == 0) {
+		if (test_and_set_bit(LDLM_LRU_CANCEL, &ns->ns_flags))
+			RETURN(0);
+	} else if (test_bit(LDLM_LRU_CANCEL, &ns->ns_flags))
+		RETURN(0);
+
+	LASSERT(ergo(max, min <= max));
+	/* No sense to give @batch for ELC */
+	LASSERT(ergo(max, batch == 0));
+
	if (!ns_connect_lru_resize(ns))
-		count += ns->ns_nr_unused - ns->ns_max_unused;
+		min = max_t(int, min, ns->ns_nr_unused - ns->ns_max_unused);
+
+	/* If at least 1 lock is to be cancelled, cancel at least @batch locks */
+	if (min && min < batch)
+		min = batch;
 
	pf = ldlm_cancel_lru_policy(ns, lru_flags);
	LASSERT(pf != NULL);
@@ -1989,8 +1843,7 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
			/* No locks which got blocking requests. */
			LASSERT(!ldlm_is_bl_ast(lock));
 
-			if (!ldlm_is_canceling(lock) &&
-			    !ldlm_is_converting(lock))
+			if (!ldlm_is_canceling(lock))
				break;
 
			/*
@@ -2025,7 +1878,7 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
		 * their weight. Big extent locks will stay in
		 * the cache. */
-		result = pf(ns, lock, ns->ns_nr_unused, added, count);
+		result = pf(ns, lock, added, min);
		if (result == LDLM_POLICY_KEEP_LOCK) {
			lu_ref_del(&lock->l_reference, __func__, current);
			LDLM_LOCK_RELEASE(lock);
@@ -2048,7 +1901,7 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
 
		lock_res_and_lock(lock);
		/* Check flags again under the lock. */
-		if (ldlm_is_canceling(lock) || ldlm_is_converting(lock) ||
+		if (ldlm_is_canceling(lock) ||
		    ldlm_lock_remove_from_lru_check(lock, last_use) == 0) {
			/*
			 * Another thread is removing lock from LRU, or
@@ -2102,18 +1955,25 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
		unlock_res_and_lock(lock);
		lu_ref_del(&lock->l_reference, __FUNCTION__, current);
		added++;
+		/* Once a lock added, batch the requested amount */
+		if (min == 0)
+			min = batch;
	}
+
+	if (max == 0)
+		clear_bit(LDLM_LRU_CANCEL, &ns->ns_flags);
+
	RETURN(added);
 }
 
 int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
-			  int count, int max,
+			  int min, int max,
			  enum ldlm_cancel_flags cancel_flags,
			  enum ldlm_lru_flags lru_flags)
 {
	int added;
 
-	added = ldlm_prepare_lru_list(ns, cancels, count, max, lru_flags);
+	added = ldlm_prepare_lru_list(ns, cancels, min, max, 0, lru_flags);
	if (added <= 0)
		return added;
@@ -2121,18 +1981,18 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
 }
 
 /**
- * Cancel at least \a nr locks from given namespace LRU.
+ * Cancel at least \a min locks from given namespace LRU.
 *
 * When called with LCF_ASYNC the blocking callback will be handled
 * in a thread and this function will return after the thread has been
 * asked to call the callback. When called with LCF_ASYNC the blocking
 * callback will be performed in this function.
 */
-int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr,
+int ldlm_cancel_lru(struct ldlm_namespace *ns, int min,
		    enum ldlm_cancel_flags cancel_flags,
		    enum ldlm_lru_flags lru_flags)
 {
-	struct list_head cancels = LIST_HEAD_INIT(cancels);
+	LIST_HEAD(cancels);
	int count, rc;
 
	ENTRY;
@@ -2141,7 +2001,8 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr,
	 * Just prepare the list of locks, do not actually cancel them yet.
	 * Locks are cancelled later in a separate thread.
	 */
-	count = ldlm_prepare_lru_list(ns, &cancels, nr, 0, lru_flags);
+	count = ldlm_prepare_lru_list(ns, &cancels, min, 0,
+				      ns->ns_cancel_batch, lru_flags);
	rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count, cancel_flags);
	if (rc == 0)
		RETURN(count);
@@ -2178,11 +2039,10 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
			continue;
 
		/*
-		 * If somebody is already doing CANCEL, or blocking AST came,
-		 * or lock is being converted then skip this lock.
+		 * If somebody is already doing CANCEL, or blocking AST came
+		 * then skip this lock.
		 */
-		if (ldlm_is_bl_ast(lock) || ldlm_is_canceling(lock) ||
-		    ldlm_is_converting(lock))
+		if (ldlm_is_bl_ast(lock) || ldlm_is_canceling(lock))
			continue;
 
		if (lockmode_compat(lock->l_granted_mode, mode))
@@ -2208,7 +2068,6 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
		/* See CBPENDING comment in ldlm_cancel_lru */
		lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING |
				 lock_flags;
-		LASSERT(list_empty(&lock->l_bl_ast));
		list_add(&lock->l_bl_ast, cancels);
		LDLM_LOCK_GET(lock);
 
@@ -2295,7 +2154,7 @@ int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
			       enum ldlm_cancel_flags flags, void *opaque)
 {
	struct ldlm_resource *res;
-	struct list_head cancels = LIST_HEAD_INIT(cancels);
+	LIST_HEAD(cancels);
	int count;
	int rc;
 
@@ -2501,6 +2360,8 @@ static int replay_lock_interpret(const struct lu_env *env,
	ENTRY;
 
	atomic_dec(&req->rq_import->imp_replay_inflight);
+	wake_up(&req->rq_import->imp_replay_waitq);
+
	if (rc != ELDLM_OK)
		GOTO(out, rc);
 
@@ -2616,9 +2477,8 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
 
	LDLM_DEBUG(lock, "replaying lock:");
 
-	atomic_inc(&req->rq_import->imp_replay_inflight);
-	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
-	aa = ptlrpc_req_async_args(req);
+	atomic_inc(&imp->imp_replay_inflight);
+	aa = ptlrpc_req_async_args(aa, req);
	aa->lock_handle = body->lock_handle[0];
	req->rq_interpret_reply = replay_lock_interpret;
	ptlrpcd_add_req(req);
@@ -2639,7 +2499,7 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
 static void ldlm_cancel_unused_locks_for_replay(struct ldlm_namespace *ns)
 {
	int canceled;
-	struct list_head cancels = LIST_HEAD_INIT(cancels);
+	LIST_HEAD(cancels);
 
	CDEBUG(D_DLMTRACE,
	       "Dropping as many unused locks as possible before replay for namespace %s (%d)\n",
@@ -2657,24 +2517,34 @@ static void ldlm_cancel_unused_locks_for_replay(struct ldlm_namespace *ns)
	       canceled, ldlm_ns_name(ns));
 }
 
-int ldlm_replay_locks(struct obd_import *imp)
+static int lock_can_replay(struct obd_import *imp)
+{
+	struct client_obd *cli = &imp->imp_obd->u.cli;
+
+	CDEBUG(D_HA, "check lock replay limit, inflights = %u(%u)\n",
+	       atomic_read(&imp->imp_replay_inflight) - 1,
+	       cli->cl_max_rpcs_in_flight);
+
+	/* +1 due to ldlm_lock_replay() increment */
+	return atomic_read(&imp->imp_replay_inflight) <
+	       1 + min_t(u32, cli->cl_max_rpcs_in_flight, 8);
+}
+
+int __ldlm_replay_locks(struct obd_import *imp, bool rate_limit)
 {
	struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
-	struct list_head list = LIST_HEAD_INIT(list);
+	LIST_HEAD(list);
	struct ldlm_lock *lock, *next;
	int rc = 0;
 
	ENTRY;
 
-	LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
+	LASSERT(atomic_read(&imp->imp_replay_inflight) == 1);
 
	/* don't replay locks if import failed recovery */
	if (imp->imp_vbr_failed)
		RETURN(0);
 
-	/* ensure this doesn't fall to 0 before all have been queued */
-	atomic_inc(&imp->imp_replay_inflight);
-
	if (ldlm_cancel_unused_locks_before_replay)
		ldlm_cancel_unused_locks_for_replay(ns);
@@ -2688,9 +2558,54 @@ int ldlm_replay_locks(struct obd_import *imp)
		}
		rc = replay_one_lock(imp, lock);
		LDLM_LOCK_RELEASE(lock);
+
+		if (rate_limit)
+			wait_event_idle_exclusive(imp->imp_replay_waitq,
+						  lock_can_replay(imp));
	}
 
+	RETURN(rc);
+}
+
+/**
+ * Lock replay uses rate control and can sleep waiting so
+ * must be in separate thread from ptlrpcd itself
+ */
+static int ldlm_lock_replay_thread(void *data)
+{
+	struct obd_import *imp = data;
+
+	CDEBUG(D_HA, "lock replay thread %s to %s@%s\n",
+	       imp->imp_obd->obd_name, obd2cli_tgt(imp->imp_obd),
+	       imp->imp_connection->c_remote_uuid.uuid);
+
+	__ldlm_replay_locks(imp, true);
	atomic_dec(&imp->imp_replay_inflight);
+	ptlrpc_import_recovery_state_machine(imp);
+	class_import_put(imp);
 
-	RETURN(rc);
+	return 0;
+}
+
+int ldlm_replay_locks(struct obd_import *imp)
+{
+	struct task_struct *task;
+	int rc = 0;
+
+	class_import_get(imp);
+	/* ensure this doesn't fall to 0 before all have been queued */
+	atomic_inc(&imp->imp_replay_inflight);
+
+	task = kthread_run(ldlm_lock_replay_thread, imp, "ldlm_lock_replay");
+	if (IS_ERR(task)) {
+		rc = PTR_ERR(task);
+		CDEBUG(D_HA, "can't start lock replay thread: rc = %d\n", rc);
+
+		/* run lock replay without rate control */
+		rc = __ldlm_replay_locks(imp, false);
+		atomic_dec(&imp->imp_replay_inflight);
+		class_import_put(imp);
+	}
+
+	return rc;
 }