/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
* vim:expandtab:shiftwidth=8:tabstop=8:
*
- * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
+ * GPL HEADER START
*
- * This file is part of the Lustre file system, http://www.lustre.org
- * Lustre is a trademark of Cluster File Systems, Inc.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
- * You may have signed or agreed to another license before downloading
- * this software. If so, you are bound by the terms and conditions
- * of that agreement, and the following does not apply to you. See the
- * LICENSE file included with this distribution for more information.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
*
- * If you did not agree to a different license, then this copy of Lustre
- * is open source software; you can redistribute it and/or modify it
- * under the terms of version 2 of the GNU General Public License as
- * published by the Free Software Foundation.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
*
- * In either case, Lustre is distributed in the hope that it will be
- * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
- * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * license text for more details.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
*/
#define DEBUG_SUBSYSTEM S_LDLM
timeout = timeout + (timeout >> 1); /* 150% */
return max(timeout, ldlm_enqueue_min);
}
+EXPORT_SYMBOL(ldlm_get_enq_timeout);
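/*
 * Illustrative aside (not part of the patch): the padding above relies on the
 * identity t + (t >> 1) == t + t/2, i.e. roughly 150% of the observed enqueue
 * time, clamped from below by ldlm_enqueue_min. A self-contained sketch of
 * the same arithmetic, with hypothetical names:
 */
static inline int demo_enq_timeout(int observed, int enqueue_min)
{
        int padded = observed + (observed >> 1);        /* ~150% */

        return padded > enqueue_min ? padded : enqueue_min;
}
/* e.g. demo_enq_timeout(10, 5) == 15, demo_enq_timeout(2, 5) == 5. */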
static int is_granted_or_cancelled(struct ldlm_lock *lock)
{
spin_unlock(&imp->imp_lock);
}
- /* Go to sleep until the lock is granted or cancelled. */
- rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);
+ if (ns_is_client(lock->l_resource->lr_namespace) &&
+ lock->l_resource->lr_type == LDLM_EXTENT &&
+ OBD_FAIL_CHECK(OBD_FAIL_LDLM_INTR_CP_AST | OBD_FAIL_ONCE)) {
+ obd_fail_loc = OBD_FAIL_LDLM_CP_BL_RACE | OBD_FAIL_ONCE;
+ lock->l_flags |= LDLM_FL_FAIL_LOC;
+ rc = -EINTR;
+ } else {
+ /* Go to sleep until the lock is granted or cancelled. */
+ rc = l_wait_event(lock->l_waitq,
+ is_granted_or_cancelled(lock), &lwi);
+ }
if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED) {
LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
struct ldlm_lock *lock,
struct lustre_handle *lockh, int mode)
{
+ int need_cancel = 0;
+
/* Set a flag to prevent us from sending a CANCEL (bug 407) */
lock_res_and_lock(lock);
- lock->l_flags |= LDLM_FL_LOCAL_ONLY;
+ /* Check that the lock is not granted or failed; we might race. */
+ if ((lock->l_req_mode != lock->l_granted_mode) &&
+ !(lock->l_flags & LDLM_FL_FAILED)) {
+ /* Make sure that this lock cannot be found by a racing
+ * bl_ast and that an -EINVAL reply is sent to the server
+ * anyway. bug 17645 */
+ lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_FAILED |
+ LDLM_FL_ATOMIC_CB;
+ need_cancel = 1;
+ }
unlock_res_and_lock(lock);
- LDLM_DEBUG(lock, "setting FL_LOCAL_ONLY");
-
- ldlm_lock_decref_and_cancel(lockh, mode);
+
+ if (need_cancel) {
+ LDLM_DEBUG(lock,
+ "setting FL_LOCAL_ONLY | LDLM_FL_FAILED | "
+ "LDLM_FL_ATOMIC_CB");
+ ldlm_lock_decref_and_cancel(lockh, mode);
+ } else {
+ LDLM_DEBUG(lock, "lock was granted or failed in race");
+ ldlm_lock_decref(lockh, mode);
+ }
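/*
 * The shape of the fix above, in isolation: take the resource lock, decide
 * whether the lock is still neither granted nor failed, mark it so racing
 * blocking ASTs cannot find it, remember the decision, drop the lock, and
 * only then issue the cancel. A hedged sketch with pthreads and made-up
 * names; the real code uses lock_res_and_lock() and the LDLM_FL_* flags.
 */
#include <pthread.h>

struct demo_lock {
        pthread_mutex_t dl_mutex;
        int             dl_granted;
        int             dl_failed;
};

static void demo_failed_lock_cleanup(struct demo_lock *dl)
{
        int need_cancel = 0;

        pthread_mutex_lock(&dl->dl_mutex);
        if (!dl->dl_granted && !dl->dl_failed) {
                dl->dl_failed = 1;      /* hide the lock from racing ASTs */
                need_cancel = 1;
        }
        pthread_mutex_unlock(&dl->dl_mutex);

        if (need_cancel) {
                /* send the cancel here, outside the lock */
        } else {
                /* the lock was granted (or already failed) in the race;
                 * just drop the reference */
        }
}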
/* XXX - HACK because we shouldn't call ldlm_lock_destroy()
* from llite/file.c/ll_file_flock(). */
{
struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
int is_replay = *flags & LDLM_FL_REPLAY;
+ struct lustre_handle old_hash_key;
struct ldlm_lock *lock;
struct ldlm_reply *reply;
int cleanup_phase = 1;
cleanup_phase = 0;
lock_res_and_lock(lock);
+ old_hash_key = lock->l_remote_handle;
lock->l_remote_handle = reply->lock_handle;
+
+ /* The key changed: rehash the lock in the per-export hash under the new key */
+ if (exp->exp_lock_hash)
+ lustre_hash_rehash_key(exp->exp_lock_hash, &old_hash_key,
+ &lock->l_remote_handle,
+ &lock->l_exp_hash);
+
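/*
 * Why the rehash is needed: the per-export hash is keyed by the remote lock
 * handle, and the reply just replaced that handle, so the entry has to move
 * from the bucket of the old key to the bucket of the new one. A minimal,
 * self-contained sketch of the same operation on a toy chained hash (not the
 * lustre_hash API):
 */
#define DEMO_BUCKETS 16

struct demo_entry {
        unsigned long      de_key;      /* stands in for l_remote_handle */
        struct demo_entry *de_next;
};

struct demo_hash {
        struct demo_entry *dh_bucket[DEMO_BUCKETS];
};

static void demo_hash_add(struct demo_hash *h, struct demo_entry *e)
{
        unsigned int b = (unsigned int)(e->de_key % DEMO_BUCKETS);

        e->de_next = h->dh_bucket[b];
        h->dh_bucket[b] = e;
}

static void demo_hash_del(struct demo_hash *h, struct demo_entry *e)
{
        struct demo_entry **p = &h->dh_bucket[e->de_key % DEMO_BUCKETS];

        while (*p != NULL && *p != e)
                p = &(*p)->de_next;
        if (*p != NULL)
                *p = e->de_next;
}

static void demo_rehash_key(struct demo_hash *h, struct demo_entry *e,
                            unsigned long new_key)
{
        demo_hash_del(h, e);            /* unlink under the old key */
        e->de_key = new_key;
        demo_hash_add(h, e);            /* reinsert under the new key */
}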
*flags = reply->lock_flags;
lock->l_flags |= reply->lock_flags & LDLM_INHERIT_FLAGS;
/* move NO_TIMEOUT flag to the lock to force ldlm_lock_match()
lock->l_req_mode = newmode;
}
- if (reply->lock_desc.l_resource.lr_name.name[0] !=
- lock->l_resource->lr_name.name[0]) {
- CDEBUG(D_INFO, "remote intent success, locking %ld "
- "instead of %ld\n",
- (long)reply->lock_desc.l_resource.lr_name.name[0],
- (long)lock->l_resource->lr_name.name[0]);
+ if (memcmp(reply->lock_desc.l_resource.lr_name.name,
+ lock->l_resource->lr_name.name,
+ sizeof(struct ldlm_res_id))) {
+ CDEBUG(D_INFO, "remote intent success, locking "
+ "("LPU64"/"LPU64"/"LPU64") instead of "
+ "("LPU64"/"LPU64"/"LPU64")\n",
+ reply->lock_desc.l_resource.lr_name.name[0],
+ reply->lock_desc.l_resource.lr_name.name[1],
+ reply->lock_desc.l_resource.lr_name.name[2],
+ lock->l_resource->lr_name.name[0],
+ lock->l_resource->lr_name.name[1],
+ lock->l_resource->lr_name.name[2]);
rc = ldlm_lock_change_resource(ns, lock,
reply->lock_desc.l_resource.lr_name);
* a single page on the send/receive side. XXX: 512 should be changed
* to a more adequate value. */
static inline int ldlm_req_handles_avail(struct obd_export *exp,
- int *size, int bufcount, int off)
+ __u32 *size, int bufcount, int off)
{
int avail = min_t(int, LDLM_MAXREQSIZE, CFS_PAGE_SIZE - 512);
avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
bufcount, size);
- avail /= sizeof(struct lustre_handle);
+ if (likely(avail >= 0))
+ avail /= (int)sizeof(struct lustre_handle);
+ else
+ avail = 0;
avail += LDLM_LOCKREQ_HANDLES - off;
return avail;
static inline int ldlm_cancel_handles_avail(struct obd_export *exp)
{
- int size[2] = { sizeof(struct ptlrpc_body),
+ __u32 size[2] = { sizeof(struct ptlrpc_body),
sizeof(struct ldlm_request) };
return ldlm_req_handles_avail(exp, size, 2, 0);
}
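/*
 * Rough worked example of the calculation above. The numbers are assumptions,
 * not the real wire sizes: a 4096-byte page, a 16-byte struct lustre_handle,
 * about 200 bytes for the ptlrpc_body + ldlm_request buffers, and 2 handles
 * already embedded in struct ldlm_request.
 */
static int demo_handles_avail(int max_req, int page_size, int hdr_size,
                              int handle_size, int inline_handles, int off)
{
        int room = (max_req < page_size - 512 ? max_req : page_size - 512);
        int avail = room - hdr_size;

        if (avail >= 0)
                avail /= handle_size;   /* extra handles past the header */
        else
                avail = 0;              /* the guard the patch adds */
        return avail + inline_handles - off;
}
/*
 * demo_handles_avail(4096, 4096, 200, 16, 2, 0)
 *   = (3584 - 200) / 16 + 2 = 211 + 2 = 213 handles per early-cancel RPC.
 */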
/* Cancel lru locks and pack them into the enqueue request. Also pack the given
* @count locks from @cancels into it. */
struct ptlrpc_request *ldlm_prep_elc_req(struct obd_export *exp, int version,
- int opc, int bufcount, int *size,
+ int opc, int bufcount, __u32 *size,
int bufoff, int canceloff,
struct list_head *cancels, int count)
{
LASSERT(bufoff < bufcount);
avail = ldlm_req_handles_avail(exp, size, bufcount, canceloff);
- flags = ns_connect_lru_resize(ns) ?
+ flags = ns_connect_lru_resize(ns) ?
LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
to_free = !ns_connect_lru_resize(ns) &&
opc == LDLM_ENQUEUE ? 1 : 0;
- /* Cancel lru locks here _only_ if the server supports
+ /* Cancel lru locks here _only_ if the server supports
* EARLY_CANCEL. Otherwise we have to send an extra CANCEL
* RPC, which will make us slower. */
if (avail > count)
pack = avail;
size[bufoff] = ldlm_request_bufsize(pack, opc);
}
+
req = ptlrpc_prep_req(class_exp2cliimp(exp), version,
opc, bufcount, size, NULL);
+ if (req != NULL)
+         req->rq_export = class_export_get(exp);
if (exp_connect_cancelset(exp) && req) {
if (canceloff) {
dlm = lustre_msg_buf(req->rq_reqmsg, bufoff,
}
struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp,
- int bufcount, int *size,
+ int bufcount, __u32 *size,
struct list_head *cancels,
int count)
{
struct ldlm_lock *lock;
struct ldlm_request *body;
struct ldlm_reply *reply;
- int size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
+ __u32 size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
[DLM_LOCKREQ_OFF] = sizeof(*body),
- [DLM_REPLY_REC_OFF] = lvb_len };
+ [DLM_REPLY_REC_OFF] = lvb_len ? lvb_len :
+ sizeof(struct ost_lvb) };
int is_replay = *flags & LDLM_FL_REPLAY;
int req_passed_in = 1, rc, err;
struct ptlrpc_request *req;
/* Continue as normal. */
if (!req_passed_in) {
size[DLM_LOCKREPLY_OFF] = sizeof(*reply);
- ptlrpc_req_set_repsize(req, 2 + (lvb_len > 0), size);
+ ptlrpc_req_set_repsize(req, 3, size);
}
/*
struct ldlm_lock *lock;
struct ldlm_resource *res;
struct ptlrpc_request *req;
- int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
+ __u32 size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
[DLM_LOCKREQ_OFF] = sizeof(*body) };
int rc;
ENTRY;
{
struct ptlrpc_request *req = NULL;
struct ldlm_request *body;
- int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
+ __u32 size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
[DLM_LOCKREQ_OFF] = sizeof(*body) };
struct obd_import *imp;
int free, sent = 0;
__u64 old_slv, new_slv;
__u32 new_limit;
ENTRY;
-
- if (unlikely(!req->rq_import || !req->rq_import->imp_obd ||
+
+ if (unlikely(!req->rq_import || !req->rq_import->imp_obd ||
!imp_connect_lru_resize(req->rq_import)))
{
- /*
- * Do nothing for corner cases.
+ /*
+ * Do nothing for corner cases.
*/
RETURN(0);
}
- /*
- * In some cases RPC may contain slv and limit zeroed out. This is
+ /*
+ * In some cases RPC may contain slv and limit zeroed out. This is
* the case when server does not support lru resize feature. This is
* also possible in some recovery cases when server side reqs have no
- * ref to obd export and thus access to server side namespace is no
- * possible.
+ * ref to the obd export and thus access to the server-side namespace is not
+ * possible.
*/
- if (lustre_msg_get_slv(req->rq_repmsg) == 0 ||
+ if (lustre_msg_get_slv(req->rq_repmsg) == 0 ||
lustre_msg_get_limit(req->rq_repmsg) == 0) {
DEBUG_REQ(D_HA, req, "Zero SLV or Limit found "
- "(SLV: "LPU64", Limit: %u)",
- lustre_msg_get_slv(req->rq_repmsg),
+ "(SLV: "LPU64", Limit: %u)",
+ lustre_msg_get_slv(req->rq_repmsg),
lustre_msg_get_limit(req->rq_repmsg));
RETURN(0);
}
new_slv = lustre_msg_get_slv(req->rq_repmsg);
obd = req->rq_import->imp_obd;
- /*
- * Set new SLV and Limit to obd fields to make accessible for pool
+ /*
+ * Set the new SLV and limit in the obd fields to make them accessible to the pool
* thread. We do not access obd_namespace and pool directly here
* as there is no reliable way to make sure that they are still
* alive in cleanup time. Evil races are possible which may cause
- * oops in that time.
+ * oops in that time.
*/
write_lock(&obd->obd_pool_lock);
old_slv = obd->obd_pool_slv;
obd->obd_pool_limit = new_limit;
write_unlock(&obd->obd_pool_lock);
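/*
 * The same publish/consume pattern with generic primitives: the reply path
 * stores the new SLV and limit in plain obd fields under a dedicated rwlock,
 * and the pool thread later reads them under the same lock, so neither side
 * has to reach into the namespace or pool structures whose lifetime is
 * uncertain during cleanup. A hedged pthreads sketch; all names are made up.
 */
#include <pthread.h>

struct demo_obd {
        pthread_rwlock_t   do_pool_lock;
        unsigned long long do_pool_slv;
        unsigned int       do_pool_limit;
};

static void demo_publish_pool(struct demo_obd *o, unsigned long long slv,
                              unsigned int limit)
{
        pthread_rwlock_wrlock(&o->do_pool_lock);
        o->do_pool_slv = slv;           /* visible to the pool thread */
        o->do_pool_limit = limit;
        pthread_rwlock_unlock(&o->do_pool_lock);
}

static unsigned long long demo_read_slv(struct demo_obd *o)
{
        unsigned long long slv;

        pthread_rwlock_rdlock(&o->do_pool_lock);
        slv = o->do_pool_slv;
        pthread_rwlock_unlock(&o->do_pool_lock);
        return slv;
}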
- /*
- * Check if we need to wakeup pools thread for fast SLV change.
- * This is only done when threads period is noticably long like
- * 10s or more.
- */
-#if defined(__KERNEL__) && (LDLM_POOLS_THREAD_PERIOD >= 10)
- if (old_slv > 0) {
- __u64 fast_change = old_slv * LDLM_POOLS_FAST_SLV_CHANGE;
- do_div(fast_change, 100);
-
- /*
- * Wake up pools thread only if SLV has changed more than
- * 50% since last update. In this case we want to react asap.
- * Otherwise it is no sense to wake up pools as they are
- * re-calculated every LDLM_POOLS_THREAD_PERIOD anyways.
- */
- if (old_slv > new_slv && old_slv - new_slv > fast_change)
- ldlm_pools_wakeup();
- }
-#endif
RETURN(0);
}
EXPORT_SYMBOL(ldlm_cli_update_pool);
LDLM_DEBUG_NOLOCK("lock is already being destroyed\n");
RETURN(0);
}
-
+
rc = ldlm_cli_cancel_local(lock);
if (rc < 0 || rc == LDLM_FL_LOCAL_ONLY) {
LDLM_LOCK_PUT(lock);
RETURN(count);
}
-/* Return 1 to stop lru processing and keep current lock cached. Return zero
+/* Return 1 to stop lru processing and keep current lock cached. Return zero
* otherwise. */
static ldlm_policy_res_t ldlm_cancel_shrink_policy(struct ldlm_namespace *ns,
struct ldlm_lock *lock,
- int unused, int added,
+ int unused, int added,
int count)
{
int lock_cost;
__u64 page_nr;
- /* Stop lru processing when we reached passed @count or checked all
+ /* Stop lru processing when we have reached the passed @count or checked all
* locks in lru. */
if (count && added >= count)
return LDLM_POLICY_KEEP_LOCK;
#ifdef __KERNEL__
/* XXX: In fact this is evil hack, we can't access inode
* here. For doing it right we need somehow to have number
- * of covered by lock. This should be fixed later when 10718
+ * of pages covered by the lock. This should be fixed later when bug 10718
* is landed. */
if (lock->l_ast_data != NULL) {
struct inode *inode = lock->l_ast_data;
/* Keep all expensive locks in lru for the memory pressure time
* cancel policy. They may anyway be canceled by the lru resize
* policy if their CLV is not small enough. */
- return lock_cost > ns->ns_shrink_thumb ?
+ return lock_cost > ns->ns_shrink_thumb ?
LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
}
-/* Return 1 to stop lru processing and keep current lock cached. Return zero
+/* Return 1 to stop lru processing and keep current lock cached. Return zero
* otherwise. */
static ldlm_policy_res_t ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
- struct ldlm_lock *lock,
- int unused, int added,
+ struct ldlm_lock *lock,
+ int unused, int added,
int count)
{
cfs_time_t cur = cfs_time_current();
__u64 slv, lvf, lv;
cfs_time_t la;
- /* Stop lru processing when we reached passed @count or checked all
+ /* Stop lru processing when we have reached the passed @count or checked all
* locks in lru. */
if (count && added >= count)
return LDLM_POLICY_KEEP_LOCK;
slv = ldlm_pool_get_slv(pl);
lvf = ldlm_pool_get_lvf(pl);
- la = cfs_duration_sec(cfs_time_sub(cur,
+ la = cfs_duration_sec(cfs_time_sub(cur,
lock->l_last_used));
- /* Stop when slv is not yet come from server or
+ /* Stop when the slv has not yet come from the server or
* lv is smaller than it is. */
lv = lvf * la * unused;
/* Inform pool about current CLV to see it via proc. */
ldlm_pool_set_clv(pl, lv);
- return (slv == 1 || lv < slv) ?
+ return (slv == 1 || lv < slv) ?
LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
}
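/*
 * The decision above, reduced to its arithmetic: the lock "volume"
 * lv = lvf * la * unused grows with the lock-volume factor, the seconds since
 * last use and the number of still-unused lru locks, and the lock is
 * cancelled once lv reaches the server-supplied slv. slv == 1 means no real
 * value has arrived from the server yet, so everything is kept. Sketch only;
 * the real code also honours @count and reads slv/lvf from the pool.
 */
static int demo_lrur_cancel(unsigned long long slv, unsigned long long lvf,
                            unsigned long long la, unsigned long long unused)
{
        unsigned long long lv = lvf * la * unused;

        if (slv == 1 || lv < slv)
                return 0;               /* keep the lock cached */
        return 1;                       /* cancel it */
}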
-/* Return 1 to stop lru processing and keep current lock cached. Return zero
+/* Return 1 to stop lru processing and keep current lock cached. Return zero
* otherwise. */
static ldlm_policy_res_t ldlm_cancel_passed_policy(struct ldlm_namespace *ns,
- struct ldlm_lock *lock,
+ struct ldlm_lock *lock,
int unused, int added,
int count)
{
- /* Stop lru processing when we reached passed @count or checked all
+ /* Stop lru processing when we have reached the passed @count or checked all
* locks in lru. */
- return (added >= count) ?
+ return (added >= count) ?
LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
}
-/* Return 1 to stop lru processing and keep current lock cached. Return zero
+/* Return 1 to stop lru processing and keep current lock cached. Return zero
* otherwise. */
static ldlm_policy_res_t ldlm_cancel_aged_policy(struct ldlm_namespace *ns,
- struct ldlm_lock *lock,
+ struct ldlm_lock *lock,
int unused, int added,
int count)
{
- /* Stop lru processing if young lock is found and we reached passed
+ /* Stop lru processing if a young lock is found and we have reached the passed
* @count. */
- return ((added >= count) &&
+ return ((added >= count) &&
cfs_time_before(cfs_time_current(),
cfs_time_add(lock->l_last_used,
- ns->ns_max_age))) ?
+ ns->ns_max_age))) ?
LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
}
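/*
 * The age test above, spelled out: once @count locks have been gathered, a
 * further lock is kept only while it was used within the namespace's maximum
 * age. Plain time_t sketch (the kernel code uses the cfs_time_* jiffies
 * helpers); returns 1 to keep the lock, 0 to cancel it.
 */
#include <time.h>

static int demo_aged_keep(int added, int count, time_t last_used,
                          time_t max_age, time_t now)
{
        return added >= count && now < last_used + max_age;
}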
-/* Return 1 to stop lru processing and keep current lock cached. Return zero
+/* Return 1 to stop lru processing and keep current lock cached. Return zero
* otherwise. */
static ldlm_policy_res_t ldlm_cancel_default_policy(struct ldlm_namespace *ns,
- struct ldlm_lock *lock,
+ struct ldlm_lock *lock,
int unused, int added,
int count)
{
- /* Stop lru processing when we reached passed @count or checked all
+ /* Stop lru processing when we have reached the passed @count or checked all
* locks in lru. */
- return (added >= count) ?
+ return (added >= count) ?
LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
}
-typedef ldlm_policy_res_t (*ldlm_cancel_lru_policy_t)(struct ldlm_namespace *,
- struct ldlm_lock *, int,
+typedef ldlm_policy_res_t (*ldlm_cancel_lru_policy_t)(struct ldlm_namespace *,
+ struct ldlm_lock *, int,
int, int);
static ldlm_cancel_lru_policy_t
if (flags & LDLM_CANCEL_AGED)
return ldlm_cancel_aged_policy;
}
-
+
return ldlm_cancel_default_policy;
}
-
+
/* - Free space in lru for @count new locks,
* redundant unused locks are canceled locally;
* - also cancel locally unused aged locks;
pf = ldlm_cancel_lru_policy(ns, flags);
LASSERT(pf != NULL);
-
+
while (!list_empty(&ns->ns_unused_list)) {
/* For any flags, stop scanning if @max is reached. */
if (max && added >= max)
* we find a lock that should stay in the cache.
* We should take into account lock age anyway,
* as a new lock, even if it is of small weight, is a
- * valuable resource.
+ * valuable resource.
*
* That is, for shrinker policy we drop only
* old locks, but additionally chose them by
- * their weight. Big extent locks will stay in
+ * their weight. Big extent locks will stay in
* the cache. */
if (pf(ns, lock, unused, added, count) == LDLM_POLICY_KEEP_LOCK)
break;
/* If we have chosen to cancel this lock voluntarily, we
* better send cancel notification to server, so that it
- * frees appropriate state. This might lead to a race
- * where while we are doing cancel here, server is also
+ * frees the appropriate state. This might lead to a race
+ * where, while we are doing the cancel here, the server is also
* silently cancelling this lock. */
lock->l_flags &= ~LDLM_FL_CANCEL_ON_BLOCK;
RETURN(ldlm_cancel_list(cancels, added, cancel_flags));
}
-/* Returns number of locks which could be canceled next time when
+/* Returns the number of locks which could be canceled the next time
* ldlm_cancel_lru() is called. Used from locks pool shrinker. */
int ldlm_cancel_lru_estimate(struct ldlm_namespace *ns,
int count, int max, int flags)
break;
/* Somebody is already doing CANCEL or there is a
- * blocking request will send cancel. Let's not count
+ * blocking request that will send a cancel. Let's not count
* this lock. */
if ((lock->l_flags & LDLM_FL_CANCELING) ||
- (lock->l_flags & LDLM_FL_BL_AST))
+ (lock->l_flags & LDLM_FL_BL_AST))
continue;
/* Pass the lock through the policy filter and see if it
* in a thread and this function will return after the thread has been
* asked to call the callback. When called with LDLM_SYNC the blocking
* callback will be performed in this function. */
-int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t sync,
+int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t sync,
int flags)
{
CFS_LIST_HEAD(cancels);
/* If somebody is already doing CANCEL, or blocking ast came,
* skip this lock. */
- if (lock->l_flags & LDLM_FL_BL_AST ||
+ if (lock->l_flags & LDLM_FL_BL_AST ||
lock->l_flags & LDLM_FL_CANCELING)
continue;
static int replay_lock_interpret(struct ptlrpc_request *req,
struct ldlm_async_args *aa, int rc)
{
- struct ldlm_lock *lock;
- struct ldlm_reply *reply;
+ struct lustre_handle old_hash_key;
+ struct ldlm_lock *lock;
+ struct ldlm_reply *reply;
+ struct obd_export *exp;
ENTRY;
atomic_dec(&req->rq_import->imp_replay_inflight);
if (rc != ELDLM_OK)
GOTO(out, rc);
-
reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF, sizeof(*reply),
lustre_swab_ldlm_reply);
if (reply == NULL) {
GOTO(out, rc = -ESTALE);
}
+ old_hash_key = lock->l_remote_handle;
lock->l_remote_handle = reply->lock_handle;
+
+ /* The key changed: rehash the lock in the per-export hash under the new key */
+ exp = req->rq_export;
+ if (exp && exp->exp_lock_hash)
+ lustre_hash_rehash_key(exp->exp_lock_hash, &old_hash_key,
+ &lock->l_remote_handle,
+ &lock->l_exp_hash);
+
LDLM_DEBUG(lock, "replayed lock:");
ptlrpc_import_recovery_state_machine(req->rq_import);
LDLM_LOCK_PUT(lock);
struct ldlm_reply *reply;
struct ldlm_async_args *aa;
int buffers = 2;
- int size[3] = { sizeof(struct ptlrpc_body) };
+ __u32 size[3] = { sizeof(struct ptlrpc_body) };
int flags;
ENTRY;
ldlm_lock_cancel(lock);
RETURN(0);
}
+
/*
* If granted mode matches the requested mode, this lock is granted.
*
atomic_inc(&req->rq_import->imp_replay_inflight);
CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
- aa = (struct ldlm_async_args *)&req->rq_async_args;
+ aa = ptlrpc_req_async_args(req);
aa->lock_handle = body->lock_handle[0];
req->rq_interpret_reply = replay_lock_interpret;
ptlrpcd_add_req(req);
/* ensure this doesn't fall to 0 before all have been queued */
atomic_inc(&imp->imp_replay_inflight);
- (void)ldlm_namespace_foreach(ns, ldlm_chain_lock_for_replay, &list);
-
- list_for_each_entry_safe(lock, next, &list, l_pending_chain) {
- list_del_init(&lock->l_pending_chain);
- if (rc)
- continue; /* or try to do the rest? */
- rc = replay_one_lock(imp, lock);
+ if (imp->imp_no_lock_replay) {
+ /* VBR: locks should be cancelled here */
+ ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
+ spin_lock(&imp->imp_lock);
+ imp->imp_no_lock_replay = 0;
+ spin_unlock(&imp->imp_lock);
+ } else {
+ (void)ldlm_namespace_foreach(ns, ldlm_chain_lock_for_replay,
+ &list);
+ list_for_each_entry_safe(lock, next, &list, l_pending_chain) {
+ list_del_init(&lock->l_pending_chain);
+ if (rc)
+ continue; /* or try to do the rest? */
+ rc = replay_one_lock(imp, lock);
+ }
}
-
atomic_dec(&imp->imp_replay_inflight);
RETURN(rc);