X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fldlm%2Fldlm_request.c;h=4458ebcc438684d0dd71b785d5b4d53b4f4a0ce2;hb=3116b9e19dc09a4a8b73c2c4733df5fe4596e041;hp=ac63e6946dcf3ff1fd80ce6ee3a0fe4812cfab53;hpb=9c028e74c2202a8a481557c4cb22225734aaf19f;p=fs%2Flustre-release.git

diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c
index ac63e69..4458ebc 100644
--- a/lustre/ldlm/ldlm_request.c
+++ b/lustre/ldlm/ldlm_request.c
@@ -67,14 +67,11 @@
 unsigned int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT;
 module_param(ldlm_enqueue_min, uint, 0644);
 MODULE_PARM_DESC(ldlm_enqueue_min, "lock enqueue timeout minimum");
+EXPORT_SYMBOL(ldlm_enqueue_min);
 
 /* in client side, whether the cached locks will be canceled before replay */
 unsigned int ldlm_cancel_unused_locks_before_replay = 1;
 
-static void interrupted_completion_wait(void *data)
-{
-}
-
 struct lock_wait_data {
         struct ldlm_lock *lwd_lock;
         __u32 lwd_conn_cnt;
@@ -111,9 +108,8 @@ int ldlm_request_bufsize(int count, int type)
         return sizeof(struct ldlm_request) + avail;
 }
 
-int ldlm_expired_completion_wait(void *data)
+void ldlm_expired_completion_wait(struct lock_wait_data *lwd)
 {
-        struct lock_wait_data *lwd = data;
         struct ldlm_lock *lock = lwd->lwd_lock;
         struct obd_import *imp;
         struct obd_device *obd;
@@ -124,9 +120,8 @@ int ldlm_expired_completion_wait(void *data)
                 LDLM_ERROR(lock, "lock timed out (enqueued at %lld, %llds ago); not entering recovery in server code, just going back to sleep",
-                           (s64)lock->l_activity,
-                           (s64)(ktime_get_real_seconds() -
-                                 lock->l_activity));
+                           lock->l_activity,
+                           ktime_get_real_seconds() - lock->l_activity);
                 if (ktime_get_seconds() > next_dump) {
                         last_dump = next_dump;
                         next_dump = ktime_get_seconds() + 300;
@@ -135,7 +130,7 @@ int ldlm_expired_completion_wait(void *data)
                         if (last_dump == 0)
                                 libcfs_debug_dumplog();
                 }
-                RETURN(0);
+                RETURN_EXIT;
         }
 
         obd = lock->l_conn_export->exp_obd;
@@ -143,11 +138,11 @@
         ptlrpc_fail_import(imp, lwd->lwd_conn_cnt);
         LDLM_ERROR(lock, "lock timed out (enqueued at %lld, %llds ago), entering recovery for %s@%s",
-                   (s64)lock->l_activity,
-                   (s64)(ktime_get_real_seconds() - lock->l_activity),
+                   lock->l_activity,
+                   ktime_get_real_seconds() - lock->l_activity,
                    obd2cli_tgt(obd),
                    imp->imp_connection->c_remote_uuid.uuid);
 
-        RETURN(0);
+        EXIT;
 }
 
 int is_granted_or_cancelled_nolock(struct ldlm_lock *lock)
@@ -176,9 +171,9 @@ EXPORT_SYMBOL(is_granted_or_cancelled_nolock);
  * We use the same basis for both server side and client side functions
  * from a single node.
  */
-static time64_t ldlm_cp_timeout(struct ldlm_lock *lock)
+static timeout_t ldlm_cp_timeout(struct ldlm_lock *lock)
 {
-        time64_t timeout;
+        timeout_t timeout;
 
         if (AT_OFF)
                 return obd_timeout;
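Note: the hunks above remove the l_wait_event() callback plumbing: interrupted_completion_wait() (an empty handler) is gone, and ldlm_expired_completion_wait() turns from an int-returning callback into a void helper that the wait loop calls directly. A minimal sketch of the resulting pattern, reconstructed from the ldlm_completion_ast() hunk further down (illustrative only, not patch content; names as in ldlm_completion_ast()):

        struct lock_wait_data lwd = { .lwd_lock = lock };

        if (wait_event_idle_timeout(lock->l_waitq,
                                    is_granted_or_cancelled(lock),
                                    cfs_time_seconds(timeout)) == 0) {
                /* timed out: log once and maybe fail the import ... */
                ldlm_expired_completion_wait(&lwd);
                /* ... then keep waiting, now abortable by a signal */
                rc = l_wait_event_abortable(lock->l_waitq,
                                            is_granted_or_cancelled(lock));
        }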
@@ -189,7 +184,7 @@ static time64_t ldlm_cp_timeout(struct ldlm_lock *lock)
         * doesn't respond reasonably, and then give us the lock.
         */
        timeout = at_get(ldlm_lock_to_ns_at(lock));
-       return max(3 * timeout, (time64_t) ldlm_enqueue_min);
+       return max(3 * timeout, (timeout_t)ldlm_enqueue_min);
 }
 
 /**
@@ -198,7 +193,6 @@ static time64_t ldlm_cp_timeout(struct ldlm_lock *lock)
  */
 static int ldlm_completion_tail(struct ldlm_lock *lock, void *data)
 {
-        time64_t delay;
         int result = 0;
 
         if (ldlm_is_destroyed(lock) || ldlm_is_failed(lock)) {
@@ -208,10 +202,16 @@ static int ldlm_completion_tail(struct ldlm_lock *lock, void *data)
                 LDLM_DEBUG(lock, "client-side enqueue: granted");
         } else {
                 /* Take into AT only CP RPC, not immediately granted locks */
-                delay = ktime_get_real_seconds() - lock->l_activity;
-                LDLM_DEBUG(lock, "client-side enqueue: granted after %llds",
-                           (s64)delay);
+                timeout_t delay = 0;
+
+                /* Discard negative timeouts. We should also limit the
+                 * maximum value of the timeout
+                 */
+                if (ktime_get_real_seconds() > lock->l_activity)
+                        delay = ktime_get_real_seconds() - lock->l_activity;
+
+                LDLM_DEBUG(lock, "client-side enqueue: granted after %ds",
+                           delay);
 
                 /* Update our time estimate */
                 at_measured(ldlm_lock_to_ns_at(lock), delay);
@@ -239,7 +239,7 @@ int ldlm_completion_ast_async(struct ldlm_lock *lock, __u64 flags, void *data)
         LDLM_DEBUG(lock,
                    "client-side enqueue returned a blocked lock, going forward");
-        ldlm_reprocess_all(lock->l_resource);
+        ldlm_reprocess_all(lock->l_resource, NULL);
         RETURN(0);
 }
 EXPORT_SYMBOL(ldlm_completion_ast_async);
@@ -269,8 +269,7 @@ int ldlm_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
         struct lock_wait_data lwd;
         struct obd_device *obd;
         struct obd_import *imp = NULL;
-        struct l_wait_info lwi;
-        time64_t timeout;
+        timeout_t timeout;
         int rc = 0;
 
         ENTRY;
@@ -300,15 +299,6 @@ noreproc:
         lwd.lwd_lock = lock;
         lock->l_activity = ktime_get_real_seconds();
 
-        if (ldlm_is_no_timeout(lock)) {
-                LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
-                lwi = LWI_INTR(interrupted_completion_wait, &lwd);
-        } else {
-                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
-                                       ldlm_expired_completion_wait,
-                                       interrupted_completion_wait, &lwd);
-        }
-
         if (imp != NULL) {
                 spin_lock(&imp->imp_lock);
                 lwd.lwd_conn_cnt = imp->imp_conn_cnt;
@@ -322,8 +312,22 @@ noreproc:
                 rc = -EINTR;
         } else {
                 /* Go to sleep until the lock is granted or cancelled. */
-                rc = l_wait_event(lock->l_waitq,
-                                  is_granted_or_cancelled(lock), &lwi);
+                if (ldlm_is_no_timeout(lock)) {
+                        LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
+                        rc = l_wait_event_abortable(
+                                lock->l_waitq,
+                                is_granted_or_cancelled(lock));
+                } else {
+                        if (wait_event_idle_timeout(
+                                    lock->l_waitq,
+                                    is_granted_or_cancelled(lock),
+                                    cfs_time_seconds(timeout)) == 0) {
+                                ldlm_expired_completion_wait(&lwd);
+                                rc = l_wait_event_abortable(
+                                        lock->l_waitq,
+                                        is_granted_or_cancelled(lock));
+                        }
+                }
         }
 
         if (rc) {
@@ -583,6 +587,11 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns,
         }
 }
 
+static bool ldlm_request_slot_needed(enum ldlm_type type)
+{
+        return type == LDLM_FLOCK || type == LDLM_IBITS;
+}
+
 /**
  * Finishing portion of client lock enqueue code.
 *
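Note: ldlm_request_slot_needed(), added above, gates the new per-import enqueue-slot accounting by lock type; only FLOCK and IBITS enqueues take a slot, presumably because those enqueues can stay blocked on the server for an arbitrarily long time (rationale inferred, not stated in the patch). The pairing, condensed from the ldlm_cli_enqueue()/ldlm_cli_enqueue_fini() hunks below (sketch, not patch content):

        /* enqueue path */
        if (ldlm_request_slot_needed(einfo->ei_type)) {
                rc = obd_get_request_slot(&req->rq_import->imp_obd->u.cli);
                if (rc)
                        GOTO(out, rc);          /* nothing left held */
        }

        /* reply path, in ldlm_cli_enqueue_fini() */
        if (ldlm_request_slot_needed(type))
                obd_put_request_slot(&req->rq_import->imp_obd->u.cli);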
@@ -603,6 +612,11 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
 
         ENTRY;
 
+        if (ldlm_request_slot_needed(type))
+                obd_put_request_slot(&req->rq_import->imp_obd->u.cli);
+
+        ptlrpc_put_mod_rpc_slot(req);
+
         if (req && req->rq_svc_thread)
                 env = req->rq_svc_thread->t_env;
 
@@ -725,6 +739,7 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
 
         if ((*flags) & LDLM_FL_AST_SENT) {
                 lock_res_and_lock(lock);
+                ldlm_bl_desc2lock(&reply->lock_desc, lock);
                 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
                 unlock_res_and_lock(lock);
                 LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
@@ -837,7 +852,7 @@ int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
         struct req_capsule *pill = &req->rq_pill;
         struct ldlm_request *dlm = NULL;
-        struct list_head head = LIST_HEAD_INIT(head);
+        LIST_HEAD(head);
         enum ldlm_lru_flags lru_flags;
         int avail, to_free, pack = 0;
         int rc;
@@ -1008,7 +1023,8 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
         lock->l_conn_export = exp;
         lock->l_export = NULL;
         lock->l_blocking_ast = einfo->ei_cb_bl;
-        lock->l_flags |= (*flags & (LDLM_FL_NO_LRU | LDLM_FL_EXCL));
+        lock->l_flags |= (*flags & (LDLM_FL_NO_LRU | LDLM_FL_EXCL |
+                                    LDLM_FL_ATOMIC_CB));
         lock->l_activity = ktime_get_real_seconds();
 
         /* lock not sent to server yet */
@@ -1034,7 +1050,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
         }
 
         if (*flags & LDLM_FL_NDELAY) {
-                DEBUG_REQ(D_DLMTRACE, req, "enque lock with no delay\n");
+                DEBUG_REQ(D_DLMTRACE, req, "enqueue lock with no delay");
                 req->rq_no_resend = req->rq_no_delay = 1;
                 /*
                  * probably set a shorter timeout value and handle ETIMEDOUT
@@ -1071,6 +1087,24 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
                                              LDLM_GLIMPSE_ENQUEUE);
         }
 
+        /* It is important to obtain modify RPC slot first (if applicable), so
+         * that threads that are waiting for a modify RPC slot are not polluting
+         * our rpcs in flight counter. */
+
+        if (einfo->ei_enq_slot)
+                ptlrpc_get_mod_rpc_slot(req);
+
+        if (ldlm_request_slot_needed(einfo->ei_type)) {
+                rc = obd_get_request_slot(&req->rq_import->imp_obd->u.cli);
+                if (rc) {
+                        if (einfo->ei_enq_slot)
+                                ptlrpc_put_mod_rpc_slot(req);
+                        failed_lock_cleanup(ns, lock, einfo->ei_mode);
+                        LDLM_LOCK_RELEASE(lock);
+                        GOTO(out, rc);
+                }
+        }
+
         if (async) {
                 LASSERT(reqp != NULL);
                 RETURN(0);
@@ -1093,6 +1127,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
         else
                 rc = err;
 
+out:
         if (!req_passed_in && req != NULL) {
                 ptlrpc_req_finished(req);
                 if (reqp)
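Note: the ordering in the ldlm_cli_enqueue() hunk above is deliberate, per its comment: the modify-RPC slot is taken before the request slot so that threads parked on the modify slot do not inflate the RPCs-in-flight accounting, and the failure path gives back what was taken, in reverse, before jumping to the new out: label. Condensed sketch of the discipline (illustrative):

        if (einfo->ei_enq_slot)
                ptlrpc_get_mod_rpc_slot(req);                   /* first */
        if (ldlm_request_slot_needed(einfo->ei_type)) {
                rc = obd_get_request_slot(&req->rq_import->imp_obd->u.cli);
                if (rc) {                                       /* second failed */
                        if (einfo->ei_enq_slot)
                                ptlrpc_put_mod_rpc_slot(req);   /* undo first */
                        GOTO(out, rc);
                }
        }
        /* ldlm_cli_enqueue_fini() releases both on reply:
         * obd_put_request_slot() first, then ptlrpc_put_mod_rpc_slot() */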
@@ -1104,128 +1139,6 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
 EXPORT_SYMBOL(ldlm_cli_enqueue);
 
 /**
- * Client-side lock convert reply handling.
- *
- * Finish client lock converting, checks for concurrent converts
- * and clear 'converting' flag so lock can be placed back into LRU.
- */
-static int lock_convert_interpret(const struct lu_env *env,
-                                  struct ptlrpc_request *req,
-                                  void *args, int rc)
-{
-        struct ldlm_async_args *aa = args;
-        struct ldlm_lock *lock;
-        struct ldlm_reply *reply;
-
-        ENTRY;
-
-        lock = ldlm_handle2lock(&aa->lock_handle);
-        if (!lock) {
-                LDLM_DEBUG_NOLOCK("convert ACK for unknown local cookie %#llx",
-                                  aa->lock_handle.cookie);
-                RETURN(-ESTALE);
-        }
-
-        LDLM_DEBUG(lock, "CONVERTED lock:");
-
-        if (rc != ELDLM_OK)
-                GOTO(out, rc);
-
-        reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
-        if (reply == NULL)
-                GOTO(out, rc = -EPROTO);
-
-        if (reply->lock_handle.cookie != aa->lock_handle.cookie) {
-                LDLM_ERROR(lock,
-                           "convert ACK with wrong lock cookie %#llx but cookie %#llx from server %s id %s\n",
-                           aa->lock_handle.cookie, reply->lock_handle.cookie,
-                           req->rq_export->exp_client_uuid.uuid,
-                           libcfs_id2str(req->rq_peer));
-                GOTO(out, rc = ELDLM_NO_LOCK_DATA);
-        }
-
-        lock_res_and_lock(lock);
-        /*
-         * Lock convert is sent for any new bits to drop, the converting flag
-         * is dropped when ibits on server are the same as on client. Meanwhile
-         * that can be so that more later convert will be replied first with
-         * and clear converting flag, so in case of such race just exit here.
-         * if lock has no converting bits then
-         */
-        if (!ldlm_is_converting(lock)) {
-                LDLM_DEBUG(lock,
-                           "convert ACK for lock without converting flag, reply ibits %#llx",
-                           reply->lock_desc.l_policy_data.l_inodebits.bits);
-        } else if (reply->lock_desc.l_policy_data.l_inodebits.bits !=
-                   lock->l_policy_data.l_inodebits.bits) {
-                /*
-                 * Compare server returned lock ibits and local lock ibits
-                 * if they are the same we consider convertion is done,
-                 * otherwise we have more converts inflight and keep
-                 * converting flag.
-                 */
-                LDLM_DEBUG(lock, "convert ACK with ibits %#llx\n",
-                           reply->lock_desc.l_policy_data.l_inodebits.bits);
-        } else {
-                ldlm_clear_converting(lock);
-
-                /*
-                 * Concurrent BL AST may arrive and cause another convert
-                 * or cancel so just do nothing here if bl_ast is set,
-                 * finish with convert otherwise.
-                 */
-                if (!ldlm_is_bl_ast(lock)) {
-                        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
-
-                        /*
-                         * Drop cancel_bits since there are no more converts
-                         * and put lock into LRU if it is still not used and
-                         * is not there yet.
-                         */
-                        lock->l_policy_data.l_inodebits.cancel_bits = 0;
-                        if (!lock->l_readers && !lock->l_writers &&
-                            !ldlm_is_canceling(lock)) {
-                                spin_lock(&ns->ns_lock);
-                                /* there is check for list_empty() inside */
-                                ldlm_lock_remove_from_lru_nolock(lock);
-                                ldlm_lock_add_to_lru_nolock(lock);
-                                spin_unlock(&ns->ns_lock);
-                        }
-                }
-        }
-        unlock_res_and_lock(lock);
-out:
-        if (rc) {
-                int flag;
-
-                lock_res_and_lock(lock);
-                if (ldlm_is_converting(lock)) {
-                        ldlm_clear_converting(lock);
-                        ldlm_set_cbpending(lock);
-                        ldlm_set_bl_ast(lock);
-                        lock->l_policy_data.l_inodebits.cancel_bits = 0;
-                }
-                unlock_res_and_lock(lock);
-
-                /*
-                 * fallback to normal lock cancel. If rc means there is no
-                 * valid lock on server, do only local cancel
-                 */
-                if (rc == ELDLM_NO_LOCK_DATA)
-                        flag = LCF_LOCAL;
-                else
-                        flag = LCF_ASYNC;
-
-                rc = ldlm_cli_cancel(&aa->lock_handle, flag);
-                if (rc < 0)
-                        LDLM_DEBUG(lock, "failed to cancel lock: rc = %d\n",
-                                   rc);
-        }
-        LDLM_LOCK_PUT(lock);
-        RETURN(rc);
-}
-
-/**
  * Client-side IBITS lock convert.
  *
  * Inform server that lock has been converted instead of canceling.
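Note: lock_convert_interpret() is deleted wholesale here; the LDLM_CONVERT RPC no longer installs a reply-interpret callback, so the reply-time races against the 'converting' flag that the removed comments describe simply disappear. The client-side bookkeeping moves into the new ldlm_cli_convert() added further down, which retries under the resource lock (sketch of that loop, repeated for orientation; the -EAGAIN meaning is inferred, not stated in the patch):

        if (lock->l_resource->lr_type == LDLM_IBITS) {
                lock_res_and_lock(lock);
                do {
                        /* -EAGAIN: lock state changed underneath,
                         * re-evaluate which ibits can be dropped */
                        rc = ldlm_cli_inodebits_convert(lock, cancel_flags);
                } while (rc == -EAGAIN);
                unlock_res_and_lock(lock);
        }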
@@ -1237,19 +1150,15 @@ out:
  * is made asynchronous.
  */
-int ldlm_cli_convert(struct ldlm_lock *lock, __u32 *flags)
+int ldlm_cli_convert_req(struct ldlm_lock *lock, __u32 *flags, __u64 new_bits)
 {
         struct ldlm_request *body;
         struct ptlrpc_request *req;
-        struct ldlm_async_args *aa;
         struct obd_export *exp = lock->l_conn_export;
 
         ENTRY;
 
-        if (exp == NULL) {
-                LDLM_ERROR(lock, "convert must not be called on local locks.");
-                RETURN(-EINVAL);
-        }
+        LASSERT(exp != NULL);
 
         /*
          * this is better to check earlier and it is done so already,
@@ -1280,8 +1189,7 @@ int ldlm_cli_convert(struct ldlm_lock *lock, __u32 *flags)
         body->lock_desc.l_req_mode = lock->l_req_mode;
         body->lock_desc.l_granted_mode = lock->l_granted_mode;
 
-        body->lock_desc.l_policy_data.l_inodebits.bits =
-                lock->l_policy_data.l_inodebits.bits;
+        body->lock_desc.l_policy_data.l_inodebits.bits = new_bits;
         body->lock_desc.l_policy_data.l_inodebits.cancel_bits = 0;
 
         body->lock_flags = ldlm_flags_to_wire(*flags);
@@ -1301,10 +1209,6 @@ int ldlm_cli_convert(struct ldlm_lock *lock, __u32 *flags)
         lprocfs_counter_incr(exp->exp_obd->obd_svc_stats,
                              LDLM_CONVERT - LDLM_FIRST_OPC);
 
-        aa = ptlrpc_req_async_args(req);
-        ldlm_lock2handle(lock, &aa->lock_handle);
-        req->rq_interpret_reply = lock_convert_interpret;
-
         ptlrpcd_add_req(req);
         RETURN(0);
 }
@@ -1326,6 +1230,9 @@ static __u64 ldlm_cli_cancel_local(struct ldlm_lock *lock)
         bool local_only;
 
         LDLM_DEBUG(lock, "client-side cancel");
+        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL_LOCAL,
+                         cfs_fail_val);
+
         /* Set this flag to prevent others from getting new references*/
         lock_res_and_lock(lock);
         ldlm_set_cbpending(lock);
@@ -1349,7 +1256,7 @@ static __u64 ldlm_cli_cancel_local(struct ldlm_lock *lock)
                 }
                 LDLM_DEBUG(lock, "server-side local cancel");
                 ldlm_lock_cancel(lock);
-                ldlm_reprocess_all(lock->l_resource);
+                ldlm_reprocess_all(lock->l_resource, lock);
         }
 
         RETURN(rc);
@@ -1532,7 +1439,7 @@ int ldlm_cli_update_pool(struct ptlrpc_request *req)
         if (lustre_msg_get_slv(req->rq_repmsg) == 0 ||
             lustre_msg_get_limit(req->rq_repmsg) == 0) {
                 DEBUG_REQ(D_HA, req,
-                          "Zero SLV or Limit found (SLV: %llu, Limit: %u)",
+                          "Zero SLV or limit found (SLV=%llu, limit=%u)",
                           lustre_msg_get_slv(req->rq_repmsg),
                           lustre_msg_get_limit(req->rq_repmsg));
                 RETURN(0);
         }
@@ -1542,6 +1449,14 @@ int ldlm_cli_update_pool(struct ptlrpc_request *req)
         new_slv = lustre_msg_get_slv(req->rq_repmsg);
         obd = req->rq_import->imp_obd;
 
+        read_lock(&obd->obd_pool_lock);
+        if (obd->obd_pool_slv == new_slv &&
+            obd->obd_pool_limit == new_limit) {
+                read_unlock(&obd->obd_pool_lock);
+                RETURN(0);
+        }
+        read_unlock(&obd->obd_pool_lock);
+
         /*
          * Set new SLV and limit in OBD fields to make them accessible
         * to the pool thread. We do not access obd_namespace and pool
@@ -1557,6 +1472,27 @@ int ldlm_cli_update_pool(struct ptlrpc_request *req)
         RETURN(0);
 }
 
+int ldlm_cli_convert(struct ldlm_lock *lock,
+                     enum ldlm_cancel_flags cancel_flags)
+{
+        int rc = -EINVAL;
+
+        LASSERT(!lock->l_readers && !lock->l_writers);
+        LDLM_DEBUG(lock, "client lock convert START");
+
+        if (lock->l_resource->lr_type == LDLM_IBITS) {
+                lock_res_and_lock(lock);
+                do {
+                        rc = ldlm_cli_inodebits_convert(lock, cancel_flags);
+                } while (rc == -EAGAIN);
+                unlock_res_and_lock(lock);
+        }
+
+        LDLM_DEBUG(lock, "client lock convert END");
+        RETURN(rc);
+}
+EXPORT_SYMBOL(ldlm_cli_convert);
+
 /**
  * Client side lock cancel.
  *
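Note: ldlm_cli_update_pool() above gains a read-locked fast path. Every reply carries an SLV/limit pair and it is usually unchanged, so the common case now avoids the write_lock() update entirely (sketch, duplicating the hunk for emphasis):

        read_lock(&obd->obd_pool_lock);
        if (obd->obd_pool_slv == new_slv &&
            obd->obd_pool_limit == new_limit) {
                /* nothing new for the pool thread to see */
                read_unlock(&obd->obd_pool_lock);
                RETURN(0);
        }
        read_unlock(&obd->obd_pool_lock);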
@@ -1571,7 +1507,7 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
         __u64 rc = 0;
         struct ldlm_namespace *ns;
         struct ldlm_lock *lock;
-        struct list_head cancels = LIST_HEAD_INIT(cancels);
+        LIST_HEAD(cancels);
 
         ENTRY;
 
@@ -1581,44 +1517,21 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
                 RETURN(0);
         }
 
-        /* Convert lock bits instead of cancel for IBITS locks */
-        if (cancel_flags & LCF_CONVERT) {
-                LASSERT(lock->l_resource->lr_type == LDLM_IBITS);
-                LASSERT(lock->l_policy_data.l_inodebits.cancel_bits != 0);
-
-                rc = ldlm_cli_dropbits(lock,
-                                lock->l_policy_data.l_inodebits.cancel_bits);
-                if (rc == 0) {
-                        LDLM_LOCK_RELEASE(lock);
-                        RETURN(0);
-                }
-        }
-
         lock_res_and_lock(lock);
+
+        LASSERT(!ldlm_is_converting(lock));
+
         /* Lock is being canceled and the caller doesn't want to wait */
         if (ldlm_is_canceling(lock)) {
                 if (cancel_flags & LCF_ASYNC) {
                         unlock_res_and_lock(lock);
                 } else {
-                        struct l_wait_info lwi = { 0 };
-
                         unlock_res_and_lock(lock);
-                        l_wait_event(lock->l_waitq, is_bl_done(lock), &lwi);
+                        wait_event_idle(lock->l_waitq, is_bl_done(lock));
                 }
                 LDLM_LOCK_RELEASE(lock);
                 RETURN(0);
         }
 
-        /*
-         * Lock is being converted, cancel it immediately.
-         * When convert will end, it releases lock and it will be gone.
-         */
-        if (ldlm_is_converting(lock)) {
-                /* set back flags removed by convert */
-                ldlm_set_cbpending(lock);
-                ldlm_set_bl_ast(lock);
-        }
-
         ldlm_set_canceling(lock);
         unlock_res_and_lock(lock);
 
@@ -1664,7 +1577,7 @@ EXPORT_SYMBOL(ldlm_cli_cancel);
 int ldlm_cli_cancel_list_local(struct list_head *cancels, int count,
                                enum ldlm_cancel_flags cancel_flags)
 {
-        struct list_head head = LIST_HEAD_INIT(head);
+        LIST_HEAD(head);
         struct ldlm_lock *lock, *next;
         int left = 0, bl_ast = 0;
         __u64 rc;
@@ -1688,8 +1601,7 @@ int ldlm_cli_cancel_list_local(struct list_head *cancels, int count,
                  */
                 if (!(cancel_flags & LCF_BL_AST) && (rc == LDLM_FL_BL_AST)) {
                         LDLM_DEBUG(lock, "Cancel lock separately");
-                        list_del_init(&lock->l_bl_ast);
-                        list_add(&lock->l_bl_ast, &head);
+                        list_move(&lock->l_bl_ast, &head);
                         bl_ast++;
                         continue;
                 }
@@ -1730,6 +1642,7 @@ ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns, struct ldlm_lock *lock,
         case LDLM_IBITS:
                 if (ns->ns_cancel != NULL && ns->ns_cancel(lock) != 0)
                         break;
+                /* fallthrough */
         default:
                 result = LDLM_POLICY_SKIP_LOCK;
                 break;
@@ -1990,8 +1903,7 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
                         /* No locks which got blocking requests. */
                         LASSERT(!ldlm_is_bl_ast(lock));
 
-                        if (!ldlm_is_canceling(lock) &&
-                            !ldlm_is_converting(lock))
+                        if (!ldlm_is_canceling(lock))
                                 break;
 
                         /*
@@ -2049,7 +1961,7 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
                 lock_res_and_lock(lock);
                 /* Check flags again under the lock. */
-                if (ldlm_is_canceling(lock) || ldlm_is_converting(lock) ||
+                if (ldlm_is_canceling(lock) ||
                     ldlm_lock_remove_from_lru_check(lock, last_use) == 0) {
                         /*
                          * Another thread is removing lock from LRU, or
@@ -2133,7 +2045,7 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr,
                     enum ldlm_cancel_flags cancel_flags,
                     enum ldlm_lru_flags lru_flags)
 {
-        struct list_head cancels = LIST_HEAD_INIT(cancels);
+        LIST_HEAD(cancels);
         int count, rc;
 
         ENTRY;
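Note: two simplifications land in the ldlm_cli_cancel() hunks above: the LCF_CONVERT escape (convert-instead-of-cancel) is gone now that callers use ldlm_cli_convert() directly, and a lock can no longer reach cancellation while in 'converting' state, hence the new LASSERT. The synchronous wait for a concurrent cancel also loses its l_wait_info. Condensed sketch (illustrative, not patch content):

        if (ldlm_is_canceling(lock)) {
                unlock_res_and_lock(lock);
                if (!(cancel_flags & LCF_ASYNC))
                        /* uninterruptible, like the old zeroed l_wait_info,
                         * but without inflating the load average */
                        wait_event_idle(lock->l_waitq, is_bl_done(lock));
                LDLM_LOCK_RELEASE(lock);
                RETURN(0);
        }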
@@ -2179,11 +2091,10 @@
                         continue;
 
                 /*
-                 * If somebody is already doing CANCEL, or blocking AST came,
-                 * or lock is being converted then skip this lock.
+                 * If somebody is already doing CANCEL, or blocking AST came
+                 * then skip this lock.
                  */
-                if (ldlm_is_bl_ast(lock) || ldlm_is_canceling(lock) ||
-                    ldlm_is_converting(lock))
+                if (ldlm_is_bl_ast(lock) || ldlm_is_canceling(lock))
                         continue;
 
                 if (lockmode_compat(lock->l_granted_mode, mode))
@@ -2192,17 +2103,23 @@
                         continue;
 
                 /*
                  * If policy is given and this is IBITS lock, add to list only
                  * those locks that match by policy.
-                 * Skip locks with DoM bit always to don't flush data.
                  */
-                if (policy && (lock->l_resource->lr_type == LDLM_IBITS) &&
-                    (!(lock->l_policy_data.l_inodebits.bits &
-                       policy->l_inodebits.bits) || ldlm_has_dom(lock)))
-                        continue;
+                if (policy && (lock->l_resource->lr_type == LDLM_IBITS)) {
+                        if (!(lock->l_policy_data.l_inodebits.bits &
+                              policy->l_inodebits.bits))
+                                continue;
+                        /* Skip locks with DoM bit if it is not set in policy
+                         * to don't flush data by side-bits. Lock convert will
+                         * drop those bits separately.
+                         */
+                        if (ldlm_has_dom(lock) &&
+                            !(policy->l_inodebits.bits & MDS_INODELOCK_DOM))
+                                continue;
+                }
 
                 /* See CBPENDING comment in ldlm_cancel_lru */
                 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING |
                                  lock_flags;
-
-                LASSERT(list_empty(&lock->l_bl_ast));
                 list_add(&lock->l_bl_ast, cancels);
                 LDLM_LOCK_GET(lock);
@@ -2289,7 +2206,7 @@ int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
                                     enum ldlm_cancel_flags flags, void *opaque)
 {
         struct ldlm_resource *res;
-        struct list_head cancels = LIST_HEAD_INIT(cancels);
+        LIST_HEAD(cancels);
         int count;
         int rc;
@@ -2495,6 +2412,8 @@ static int replay_lock_interpret(const struct lu_env *env,
         ENTRY;
 
         atomic_dec(&req->rq_import->imp_replay_inflight);
+        wake_up(&req->rq_import->imp_replay_waitq);
+
         if (rc != ELDLM_OK)
                 GOTO(out, rc);
@@ -2610,9 +2529,8 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
 
         LDLM_DEBUG(lock, "replaying lock:");
 
-        atomic_inc(&req->rq_import->imp_replay_inflight);
-        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
-        aa = ptlrpc_req_async_args(req);
+        atomic_inc(&imp->imp_replay_inflight);
+        aa = ptlrpc_req_async_args(aa, req);
         aa->lock_handle = body->lock_handle[0];
         req->rq_interpret_reply = replay_lock_interpret;
         ptlrpcd_add_req(req);
@@ -2633,7 +2551,7 @@ static void ldlm_cancel_unused_locks_for_replay(struct ldlm_namespace *ns)
 {
         int canceled;
-        struct list_head cancels = LIST_HEAD_INIT(cancels);
+        LIST_HEAD(cancels);
 
         CDEBUG(D_DLMTRACE,
                "Dropping as many unused locks as possible before replay for namespace %s (%d)\n",
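Note: the replay hunks cooperate as a producer/consumer pair: replay_lock_interpret() above now wakes imp_replay_waitq as each replay reply arrives, and lock_can_replay() (added just below) is the condition the queuing thread sleeps on. imp_replay_inflight counts the thread itself plus one per outstanding replay RPC, hence the '+1'. Worked example, assuming the usual cl_max_rpcs_in_flight default of 8 (sketch):

        /* inflight = 1 (thread) + N (replay RPCs in flight);
         * proceed while 1 + N < 1 + min(cl_max_rpcs_in_flight, 8),
         * i.e. keep at most 8 replay RPCs outstanding (fewer if the
         * import allows fewer RPCs in flight) */
        if (rate_limit)
                wait_event_idle_exclusive(imp->imp_replay_waitq,
                                          lock_can_replay(imp));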
@@ -2651,24 +2569,34 @@ static void ldlm_cancel_unused_locks_for_replay(struct ldlm_namespace *ns)
                canceled, ldlm_ns_name(ns));
 }
 
-int ldlm_replay_locks(struct obd_import *imp)
+static int lock_can_replay(struct obd_import *imp)
+{
+        struct client_obd *cli = &imp->imp_obd->u.cli;
+
+        CDEBUG(D_HA, "check lock replay limit, inflights = %u(%u)\n",
+               atomic_read(&imp->imp_replay_inflight) - 1,
+               cli->cl_max_rpcs_in_flight);
+
+        /* +1 due to ldlm_lock_replay() increment */
+        return atomic_read(&imp->imp_replay_inflight) <
+               1 + min_t(u32, cli->cl_max_rpcs_in_flight, 8);
+}
+
+int __ldlm_replay_locks(struct obd_import *imp, bool rate_limit)
 {
         struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
-        struct list_head list = LIST_HEAD_INIT(list);
+        LIST_HEAD(list);
         struct ldlm_lock *lock, *next;
         int rc = 0;
 
         ENTRY;
 
-        LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
+        LASSERT(atomic_read(&imp->imp_replay_inflight) == 1);
 
         /* don't replay locks if import failed recovery */
         if (imp->imp_vbr_failed)
                 RETURN(0);
 
-        /* ensure this doesn't fall to 0 before all have been queued */
-        atomic_inc(&imp->imp_replay_inflight);
-
         if (ldlm_cancel_unused_locks_before_replay)
                 ldlm_cancel_unused_locks_for_replay(ns);
 
@@ -2682,9 +2610,56 @@ int ldlm_replay_locks(struct obd_import *imp)
                 }
                 rc = replay_one_lock(imp, lock);
                 LDLM_LOCK_RELEASE(lock);
+
+                if (rate_limit)
+                        wait_event_idle_exclusive(imp->imp_replay_waitq,
+                                                  lock_can_replay(imp));
         }
 
+        RETURN(rc);
+}
+
+/**
+ * Lock replay uses rate control and can sleep waiting so
+ * must be in separate thread from ptlrpcd itself
+ */
+static int ldlm_lock_replay_thread(void *data)
+{
+        struct obd_import *imp = data;
+
+        unshare_fs_struct();
+
+        CDEBUG(D_HA, "lock replay thread %s to %s@%s\n",
+               imp->imp_obd->obd_name, obd2cli_tgt(imp->imp_obd),
+               imp->imp_connection->c_remote_uuid.uuid);
+
+        __ldlm_replay_locks(imp, true);
         atomic_dec(&imp->imp_replay_inflight);
+        ptlrpc_import_recovery_state_machine(imp);
+        class_import_put(imp);
 
-        RETURN(rc);
+        return 0;
+}
+
+int ldlm_replay_locks(struct obd_import *imp)
+{
+        struct task_struct *task;
+        int rc = 0;
+
+        class_import_get(imp);
+        /* ensure this doesn't fall to 0 before all have been queued */
+        atomic_inc(&imp->imp_replay_inflight);
+
+        task = kthread_run(ldlm_lock_replay_thread, imp, "ldlm_lock_replay");
+        if (IS_ERR(task)) {
+                rc = PTR_ERR(task);
+                CDEBUG(D_HA, "can't start lock replay thread: rc = %d\n", rc);
+
+                /* run lock replay without rate control */
+                rc = __ldlm_replay_locks(imp, false);
+                atomic_dec(&imp->imp_replay_inflight);
+                class_import_put(imp);
+        }
+
+        return rc;
 }
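Note: the tail of the patch moves lock replay out of ptlrpcd into a dedicated kthread, since a rate-limited loop may now sleep; if the thread cannot be spawned, replay still happens, inline and unthrottled. Condensed sketch of the resulting flow (illustrative, not patch content):

        int ldlm_replay_locks(struct obd_import *imp)
        {
                struct task_struct *task;
                int rc = 0;

                class_import_get(imp);
                atomic_inc(&imp->imp_replay_inflight); /* thread's own +1 */

                task = kthread_run(ldlm_lock_replay_thread, imp,
                                   "ldlm_lock_replay");
                if (!IS_ERR(task))
                        return 0;       /* thread drops both references */

                /* fallback: replay synchronously, clean up here */
                rc = __ldlm_replay_locks(imp, false);
                atomic_dec(&imp->imp_replay_inflight);
                class_import_put(imp);
                return rc;
        }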