*/
enum cl_lock_transition {
/** operation cannot be completed immediately. Wait for state change. */
- CLO_WAIT = 1,
+ CLO_WAIT = 1,
/** operation had to release lock mutex, restart. */
- CLO_REPEAT = 2
+ CLO_REPEAT = 2,
+ /** the lower layer re-enqueued the lock; retry. */
+ CLO_REENQUEUED = 3,
};
/**
*/
CEF_NEVER = 0x00000010,
/**
+ * the enqueue is for an asynchronous glimpse lock (AGL).
+ */
+ CEF_AGL = 0x00000020,
+ /**
+ * do not trigger a re-enqueue if the lock has not been granted.
+ */
+ CEF_NO_REENQUEUE = 0x00000040,
+ /**
* mask of enq_flags.
*/
- CEF_MASK = 0x0000001f
+ CEF_MASK = 0x0000007f,
};
/**
/*
* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
+ *
+ * Copyright (c) 2011 Whamcloud, Inc.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
blkcnt_t dirty_cnt(struct inode *inode);
-int cl_glimpse_size(struct inode *inode);
+int cl_glimpse_size0(struct inode *inode, int agl);
int cl_glimpse_lock(const struct lu_env *env, struct cl_io *io,
- struct inode *inode, struct cl_object *clob);
+ struct inode *inode, struct cl_object *clob, int agl);
+
+static inline int cl_glimpse_size(struct inode *inode)
+{
+ return cl_glimpse_size0(inode, 0);
+}
+
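+/* trigger an asynchronous glimpse (AGL) to prefetch the size of \a inode */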
+static inline int cl_agl(struct inode *inode)
+{
+ return cl_glimpse_size0(inode, 1);
+}
/**
* Locking policy for setattr.
#define LDLM_FL_HAS_INTENT 0x001000 /* lock request has intent */
#define LDLM_FL_CANCELING 0x002000 /* lock cancel has already been sent */
#define LDLM_FL_LOCAL 0x004000 /* local lock (ie, no srv/cli split) */
-/* was LDLM_FL_WARN until 2.0.0 0x008000 */
#define LDLM_FL_DISCARD_DATA 0x010000 /* discard (no writeback) on cancel */
#define LDLM_FL_NO_TIMEOUT 0x020000 /* Blocked by group lock - wait
* indefinitely */
/* file & record locking */
-#define LDLM_FL_BLOCK_NOWAIT 0x040000 // server told not to wait if blocked
+#define LDLM_FL_BLOCK_NOWAIT 0x040000 /* server told not to wait if blocked.
+ * For AGL, the OST will not send a
+ * glimpse callback. */
#define LDLM_FL_TEST_LOCK 0x080000 // return blocking lock
/* XXX FIXME: This is being added to b_size as a low-risk fix to the fact that
* w/o involving separate thread. in order to decrease cs rate */
#define LDLM_FL_ATOMIC_CB 0x4000000
-/* was LDLM_FL_ASYNC until 2.0.0 0x8000000 */
-
/* It may happen that a client initiates 2 operations, e.g. unlink and mkdir,
 * such that the server sends blocking asts for conflicting locks to this client for
* the 1st operation, whereas the 2nd operation has canceled this lock and
* Client-side-only members.
*/
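+ /**
+ * If non-zero, the failure value with which pending and future matches
+ * on this lock fail; set by ldlm_lock_fail_match(), which also wakes up
+ * threads waiting in ldlm_lock_match().
+ */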
+ int l_fail_value;
/**
* Temporary storage for an LVB received during an enqueue operation.
*/
*/
__u32 l_pid;
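+ /** incremented once the blocking AST has been run, so it is not run twice */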
+ int l_bl_ast_run;
/**
* For ldlm_add_ast_work_item().
*/
cfs_list_t l_rk_ast;
struct ldlm_lock *l_blocking_lock;
- int l_bl_ast_run;
/**
* Protected by lr_lock, linkages to "skip lists".
int ldlm_lock_addref_try(struct lustre_handle *lockh, __u32 mode);
void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode);
void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode);
+void ldlm_lock_fail_match_locked(struct ldlm_lock *lock, int rc);
+void ldlm_lock_fail_match(struct ldlm_lock *lock, int rc);
void ldlm_lock_allow_match(struct ldlm_lock *lock);
void ldlm_lock_allow_match_locked(struct ldlm_lock *lock);
ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags,
struct ost_lvb *oa_lvb;
struct lustre_handle *oa_lockh;
struct ldlm_enqueue_info *oa_ei;
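+ /* the enqueue request is for an asynchronous glimpse lock (AGL) */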
+ unsigned int oa_agl:1;
};
#if 0
/*
* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
+ *
+ * Copyright (c) 2011 Whamcloud, Inc.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#define OBD_FAIL_LDLM_INTR_CP_AST 0x317
#define OBD_FAIL_LDLM_CP_BL_RACE 0x318
#define OBD_FAIL_LDLM_NEW_LOCK 0x319
+#define OBD_FAIL_LDLM_AGL_DELAY 0x31a
+#define OBD_FAIL_LDLM_AGL_NOLOCK 0x31b
/* LOCKLESS IO */
#define OBD_FAIL_LDLM_SET_CONTENTION 0x385
}
int cl_glimpse_lock(const struct lu_env *env, struct cl_io *io,
- struct inode *inode, struct cl_object *clob)
+ struct inode *inode, struct cl_object *clob, int agl)
{
struct cl_lock_descr *descr = &ccc_env_info(env)->cti_descr;
struct cl_inode_info *lli = cl_i2info(inode);
descr->cld_obj = clob;
descr->cld_mode = CLM_PHANTOM;
descr->cld_enq_flags = CEF_ASYNC | CEF_MUST;
+ if (agl)
+ descr->cld_enq_flags |= CEF_AGL;
cio->cui_glimpse = 1;
/*
* CEF_ASYNC is used because glimpse sub-locks cannot
cfs_current());
cio->cui_glimpse = 0;
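+ /* An AGL enqueue returns a NULL lock: the request was issued
+ * asynchronously and there is nothing to wait for. */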
+ if (lock == NULL)
+ RETURN(0);
+
if (IS_ERR(lock))
RETURN(PTR_ERR(lock));
+ LASSERT(agl == 0);
result = cl_wait(env, lock);
if (result == 0) {
cl_merge_lvb(inode);
return result;
}
-int cl_glimpse_size(struct inode *inode)
+int cl_glimpse_size0(struct inode *inode, int agl)
{
/*
* We don't need ast_flags argument to cl_glimpse_size(), because
*/
result = io->ci_result;
else if (result == 0)
- result = cl_glimpse_lock(env, io, inode, io->ci_obj);
+ result = cl_glimpse_lock(env, io, inode, io->ci_obj,
+ agl);
cl_io_fini(env, io);
cl_env_put(env, &refcheck);
}
* of the buffer (C)
*/
ccc_object_size_unlock(obj);
- result = cl_glimpse_lock(env, io, inode, obj);
+ result = cl_glimpse_lock(env, io, inode, obj, 0);
if (result == 0 && exceed != NULL) {
/* If the objective page index exceeds the end-of-file
 * page index, return directly. Do not expect
continue;
if (!unref &&
- (lock->l_destroyed || (lock->l_flags & LDLM_FL_FAILED)))
+ (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED ||
+ lock->l_fail_value != 0))
continue;
if ((flags & LDLM_FL_LOCAL_ONLY) &&
return NULL;
}
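+/**
+ * Mark \a lock as failed for matching: record \a rc in l_fail_value and
+ * wake up threads waiting for the lock in ldlm_lock_match(). The caller
+ * must hold the resource lock.
+ */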
+void ldlm_lock_fail_match_locked(struct ldlm_lock *lock, int rc)
+{
+ if (lock->l_fail_value == 0) {
+ lock->l_fail_value = rc;
+ cfs_waitq_signal(&lock->l_waitq);
+ }
+}
+EXPORT_SYMBOL(ldlm_lock_fail_match_locked);
+
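+/** Unlocked wrapper for ldlm_lock_fail_match_locked(). */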
+void ldlm_lock_fail_match(struct ldlm_lock *lock, int rc)
+{
+ lock_res_and_lock(lock);
+ ldlm_lock_fail_match_locked(lock, rc);
+ unlock_res_and_lock(lock);
+}
+EXPORT_SYMBOL(ldlm_lock_fail_match);
+
void ldlm_lock_allow_match_locked(struct ldlm_lock *lock)
{
lock->l_flags |= LDLM_FL_LVB_READY;
/* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
l_wait_event(lock->l_waitq,
- (lock->l_flags & LDLM_FL_LVB_READY), &lwi);
+ lock->l_flags & LDLM_FL_LVB_READY ||
+ lock->l_fail_value != 0,
+ &lwi);
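+ /* The lock did not become LVB-ready (e.g. its enqueue was
+ * aborted for AGL); drop it and report no match. */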
+ if (!(lock->l_flags & LDLM_FL_LVB_READY)) {
+ if (flags & LDLM_FL_TEST_LOCK)
+ LDLM_LOCK_RELEASE(lock);
+ else
+ ldlm_lock_decref_internal(lock, mode);
+ rc = 0;
+ }
}
}
out2:
lock = ldlm_handle2lock(lockh);
if (lock != NULL) {
lock_res_and_lock(lock);
- if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED)
+ if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED ||
+ lock->l_fail_value != 0)
GOTO(out, mode);
if (lock->l_flags & LDLM_FL_CBPENDING &&
EXIT;
}
-#ifndef HAVE_AGL_SUPPORT
-# define cl_agl(inode) do {} while (0)
-#endif
-
/* Do NOT forget to drop the inode refcount when it goes into sai_entries_agl. */
static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
{
/*
* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
+ *
+ * Copyright (c) 2011 Whamcloud, Inc.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
int result_rank;
int rc_rank;
+ ENTRY;
+
LASSERT(result <= 0 || result == CLO_REPEAT || result == CLO_WAIT);
LASSERT(rc <= 0 || rc == CLO_REPEAT || rc == CLO_WAIT);
CLASSERT(CLO_WAIT < CLO_REPEAT);
- ENTRY;
-
/* calculate ranks in the ordering above */
result_rank = result < 0 ? 1 + CLO_REPEAT : result;
rc_rank = rc < 0 ? 1 + CLO_REPEAT : rc;
/* first, try to enqueue a sub-lock ... */
result = cl_enqueue_try(env, sublock, io, enqflags);
- if (sublock->cll_state == CLS_ENQUEUED)
+ if ((sublock->cll_state == CLS_ENQUEUED) && !(enqflags & CEF_AGL))
/* if it is enqueued, try to `wait' on it---maybe it's already
* granted */
result = cl_wait_try(env, sublock);
* parallel, otherwise---enqueue has to wait until sub-lock is granted
* before proceeding to the next one.
*/
- if (result == CLO_WAIT && sublock->cll_state <= CLS_HELD &&
- enqflags & CEF_ASYNC && !last)
+ if ((result == CLO_WAIT) && (sublock->cll_state <= CLS_HELD) &&
+ (enqflags & CEF_ASYNC) && (!last || (enqflags & CEF_AGL)))
result = 0;
RETURN(result);
}
rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
if (rc == 0) {
if (lls->sub_flags & LSF_HELD) {
- LASSERT(sublock->cll_state == CLS_HELD);
+ LASSERT(sublock->cll_state == CLS_HELD ||
+ sublock->cll_state == CLS_ENQUEUED);
+ /* For the AGL case, the sub-lock state may not
+ * match the lower layer state, so sync them
+ * before unuse. */
+ if (sublock->cll_users == 1 &&
+ sublock->cll_state == CLS_ENQUEUED) {
+ __u32 save;
+
+ save = sublock->cll_descr.cld_enq_flags;
+ sublock->cll_descr.cld_enq_flags |=
+ CEF_NO_REENQUEUE;
+ cl_wait_try(env, sublock);
+ sublock->cll_descr.cld_enq_flags = save;
+ }
rc = cl_unuse_try(subenv->lse_env, sublock);
rc = lov_sublock_release(env, lck, i, 0, rc);
}
struct lov_lock *lck = cl2lov_lock(slice);
struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
enum cl_lock_state minstate;
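+ /* number of sub-locks re-enqueued by the lower layer in this pass */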
+ int reenqueued;
int result;
int i;
ENTRY;
- for (result = 0, minstate = CLS_FREEING, i = 0; i < lck->lls_nr; ++i) {
+again:
+ for (result = 0, minstate = CLS_FREEING, i = 0, reenqueued = 0;
+ i < lck->lls_nr; ++i) {
int rc;
struct lovsub_lock *sub;
struct cl_lock *sublock;
minstate = min(minstate, sublock->cll_state);
lov_sublock_unlock(env, sub, closure, subenv);
}
+ if (rc == CLO_REENQUEUED) {
+ reenqueued++;
+ rc = 0;
+ }
result = lov_subresult(result, rc);
if (result != 0)
break;
}
+ /* Each sub-lock can only be re-enqueued once, so this will not
+ * loop forever. */
+ if (result == 0 && reenqueued != 0)
+ goto again;
cl_lock_closure_fini(closure);
RETURN(result ?: minstate >= CLS_HELD ? 0 : CLO_WAIT);
}
if (rc != 0)
rc = lov_sublock_release(env, lck,
i, 1, rc);
+ } else if (sublock->cll_state == CLS_NEW) {
+ /* Sub-lock might have been canceled while the
+ * top-lock was cached. */
+ result = -ESTALE;
+ lov_sublock_release(env, lck, i, 1, result);
}
lov_sublock_unlock(env, sub, closure, subenv);
}
EXIT;
}
-
/**
* Waits until lock state is changed.
*
cl_lock_user_del(env, lock);
cl_lock_error(env, lock, result);
}
- LASSERT(ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
+ LASSERT(ergo(result == 0 && !(enqflags & CEF_AGL),
+ lock->cll_state == CLS_ENQUEUED ||
lock->cll_state == CLS_HELD));
RETURN(result);
}
ENTRY;
do {
lock = cl_lock_hold_mutex(env, io, need, scope, source);
- if (!IS_ERR(lock)) {
- rc = cl_enqueue_locked(env, lock, io, enqflags);
- if (rc == 0) {
- if (cl_lock_fits_into(env, lock, need, io)) {
+ if (IS_ERR(lock))
+ break;
+
+ rc = cl_enqueue_locked(env, lock, io, enqflags);
+ if (rc == 0) {
+ if (cl_lock_fits_into(env, lock, need, io)) {
+ if (!(enqflags & CEF_AGL)) {
cl_lock_mutex_put(env, lock);
- cl_lock_lockdep_acquire(env,
- lock, enqflags);
+ cl_lock_lockdep_acquire(env, lock,
+ enqflags);
break;
}
- cl_unuse_locked(env, lock);
+ rc = 1;
}
- cl_lock_trace(D_DLMTRACE, env, "enqueue failed", lock);
- cl_lock_hold_release(env, lock, scope, source);
- cl_lock_mutex_put(env, lock);
- lu_ref_del(&lock->cll_reference, scope, source);
- cl_lock_put(env, lock);
+ cl_unuse_locked(env, lock);
+ }
+ cl_lock_trace(D_DLMTRACE, env,
+ rc <= 0 ? "enqueue failed" : "agl succeeded", lock);
+ cl_lock_hold_release(env, lock, scope, source);
+ cl_lock_mutex_put(env, lock);
+ lu_ref_del(&lock->cll_reference, scope, source);
+ cl_lock_put(env, lock);
+ if (rc > 0) {
+ LASSERT(enqflags & CEF_AGL);
+ lock = NULL;
+ } else if (rc != 0) {
lock = ERR_PTR(rc);
- } else
- rc = PTR_ERR(lock);
+ }
} while (rc == 0);
RETURN(lock);
}
struct ldlm_lock **lockp, void *req_cookie,
ldlm_mode_t mode, int flags, void *data)
{
- CFS_LIST_HEAD(rpc_list);
struct ptlrpc_request *req = req_cookie;
struct ldlm_lock *lock = *lockp, *l = NULL;
struct ldlm_resource *res = lock->l_resource;
* lock, and should not be granted if the lock will be blocked.
*/
+ if (flags & LDLM_FL_BLOCK_NOWAIT) {
+ OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_AGL_DELAY, 5);
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_AGL_NOLOCK))
+ RETURN(ELDLM_LOCK_ABORTED);
+ }
+
LASSERT(ns == ldlm_res_to_ns(res));
lock_res(res);
- rc = policy(lock, &tmpflags, 0, &err, &rpc_list);
+ rc = policy(lock, &tmpflags, 0, &err, NULL);
check_res_locked(res);
- /* FIXME: we should change the policy function slightly, to not make
- * this list at all, since we just turn around and free it */
- while (!cfs_list_empty(&rpc_list)) {
- struct ldlm_lock *wlock =
- cfs_list_entry(rpc_list.next, struct ldlm_lock,
- l_cp_ast);
- LASSERT((lock->l_flags & LDLM_FL_AST_SENT) == 0);
- LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
- lock->l_flags &= ~LDLM_FL_CP_REQD;
- cfs_list_del_init(&wlock->l_cp_ast);
- LDLM_LOCK_RELEASE(wlock);
- }
-
/* The lock met with no resistance; we're finished. */
if (rc == LDLM_ITER_CONTINUE) {
/* do not grant locks to the liblustre clients: they cannot
}
unlock_res(res);
RETURN(err);
+ } else if (flags & LDLM_FL_BLOCK_NOWAIT) {
+ /* LDLM_FL_BLOCK_NOWAIT means this enqueue is for AGL. Do not
+ * send a glimpse callback to fetch the size; the real size user
+ * will trigger the glimpse callback when necessary. */
+ unlock_res(res);
+ RETURN(ELDLM_LOCK_ABORTED);
}
/* Do not grant any lock, but instead send GL callbacks. The extent
* granted.
* Glimpse lock should be destroyed immediately after use.
*/
- ols_glimpse:1;
+ ols_glimpse:1,
+ /**
+ * Set if the lock is an asynchronous glimpse lock (AGL).
+ */
+ ols_agl:1;
/**
* IO that owns this lock. This field is used for a dead-lock
* avoidance by osc_lock_enqueue_wait().
obd_enqueue_update_f upcall,
void *cookie, struct ldlm_enqueue_info *einfo,
struct lustre_handle *lockh,
- struct ptlrpc_request_set *rqset, int async);
+ struct ptlrpc_request_set *rqset, int async, int agl);
int osc_cancel_base(struct lustre_handle *lockh, __u32 mode);
int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
{
struct osc_lock *ols = cl2osc_lock(slice);
- LASSERT(ols->ols_state == OLS_GRANTED ||
- ols->ols_state == OLS_UPCALL_RECEIVED);
LINVRNT(osc_lock_invariant(ols));
- if (ols->ols_glimpse) {
- LASSERT(ols->ols_hold == 0);
+ switch (ols->ols_state) {
+ case OLS_NEW:
+ LASSERT(!ols->ols_hold);
+ LASSERT(ols->ols_agl);
+ return 0;
+ case OLS_UPCALL_RECEIVED:
+ LASSERT(!ols->ols_hold);
+ ols->ols_state = OLS_NEW;
return 0;
+ case OLS_GRANTED:
+ LASSERT(!ols->ols_glimpse);
+ LASSERT(ols->ols_hold);
+ /*
+ * Move lock into OLS_RELEASED state before calling
+ * osc_cancel_base() so that possible synchronous cancellation
+ * (that always happens e.g., for liblustre) sees that lock is
+ * released.
+ */
+ ols->ols_state = OLS_RELEASED;
+ return osc_lock_unhold(ols);
+ default:
+ CERROR("Impossible state: %d\n", ols->ols_state);
+ LBUG();
}
- LASSERT(ols->ols_hold);
-
- /*
- * Move lock into OLS_RELEASED state before calling osc_cancel_base()
- * so that possible synchronous cancellation (that always happens
- * e.g., for liblustre) sees that lock is released.
- */
- ols->ols_state = OLS_RELEASED;
- return osc_lock_unhold(ols);
}
static void osc_lock_fini(const struct lu_env *env,
ENTRY;
- if (!(olck->ols_flags & LDLM_FL_LVB_READY)) {
- EXIT;
- return;
- }
+ if (!(olck->ols_flags & LDLM_FL_LVB_READY))
+ RETURN_EXIT;
lvb = &olck->ols_lvb;
obj = olck->ols_cl.cls_obj;
dlmlock->l_ast_data = NULL;
olck->ols_handle.cookie = 0ULL;
cfs_spin_unlock(&osc_ast_guard);
+ ldlm_lock_fail_match_locked(dlmlock, rc);
unlock_res_and_lock(dlmlock);
LDLM_LOCK_PUT(dlmlock);
}
rc = 0;
}
- if (rc == 0)
- /* on error, lock was signaled by cl_lock_error() */
+ if (rc == 0) {
cl_lock_signal(env, lock);
- else
+ /* del user for lock upcall cookie */
+ cl_unuse_try(env, lock);
+ } else {
+ /* del user for lock upcall cookie */
+ cl_lock_user_del(env, lock);
cl_lock_error(env, lock, rc);
+ }
cl_lock_mutex_put(env, lock);
/* release cookie reference, acquired by osc_lock_enqueue() */
lu_ref_del(&lock->cll_reference, "upcall", lock);
cl_lock_put(env, lock);
+
cl_env_nested_put(&nest, env);
} else
/* should never happen, similar to osc_ldlm_blocking_ast(). */
ENTRY;
LASSERT(cl_lock_is_mutexed(lock));
- LASSERT(lock->cll_state == CLS_QUEUING);
/* make it enqueue anyway for glimpse lock, because we actually
* don't need to cancel any conflicting locks. */
ENTRY;
LASSERT(cl_lock_is_mutexed(lock));
- LASSERT(lock->cll_state == CLS_QUEUING);
- LASSERT(ols->ols_state == OLS_NEW);
+ LASSERTF(ols->ols_state == OLS_NEW,
+ "Impossible state: %d\n", ols->ols_state);
ols->ols_flags = osc_enq2ldlm_flags(enqflags);
+ if (enqflags & CEF_AGL) {
+ ols->ols_flags |= LDLM_FL_BLOCK_NOWAIT;
+ ols->ols_agl = 1;
+ }
if (ols->ols_flags & LDLM_FL_HAS_INTENT)
ols->ols_glimpse = 1;
if (!osc_lock_is_lockless(ols) && !(enqflags & CEF_MUST))
/* a reference for lock, passed as an upcall cookie */
cl_lock_get(lock);
lu_ref_add(&lock->cll_reference, "upcall", lock);
+ /* also a user for the lock, dropped by osc_lock_upcall() */
+ cl_lock_user_add(env, lock);
ols->ols_state = OLS_ENQUEUED;
/*
obj->oo_oinfo->loi_kms_valid,
osc_lock_upcall,
ols, einfo, &ols->ols_handle,
- PTLRPCD_SET, 1);
+ PTLRPCD_SET, 1, ols->ols_agl);
if (result != 0) {
+ cl_lock_user_del(env, lock);
lu_ref_del(&lock->cll_reference,
"upcall", lock);
cl_lock_put(env, lock);
+ if (unlikely(result == -ECANCELED)) {
+ ols->ols_state = OLS_NEW;
+ result = 0;
+ }
}
} else {
ols->ols_state = OLS_GRANTED;
struct cl_lock *lock = olck->ols_cl.cls_lock;
LINVRNT(osc_lock_invariant(olck));
- if (olck->ols_glimpse && olck->ols_state >= OLS_UPCALL_RECEIVED)
- return 0;
+
+ if (olck->ols_glimpse && olck->ols_state >= OLS_UPCALL_RECEIVED) {
+ if (olck->ols_flags & LDLM_FL_LVB_READY) {
+ return 0;
+ } else if (olck->ols_agl) {
+ olck->ols_state = OLS_NEW;
+ } else {
+ LASSERT(lock->cll_error);
+ return lock->cll_error;
+ }
+ }
+
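+ /* Only an AGL lock can still be in OLS_NEW state here; re-enqueue
+ * it synchronously unless re-enqueue is disallowed. */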
+ if (olck->ols_state == OLS_NEW) {
+ if (lock->cll_descr.cld_enq_flags & CEF_NO_REENQUEUE) {
+ return -ENAVAIL;
+ } else {
+ int rc;
+
+ LASSERT(olck->ols_agl);
+
+ rc = osc_lock_enqueue(env, slice, NULL, CEF_ASYNC |
+ CEF_MUST);
+ if (rc != 0)
+ return rc;
+ else
+ return CLO_REENQUEUED;
+ }
+ }
LASSERT(equi(olck->ols_state >= OLS_UPCALL_RECEIVED &&
lock->cll_error == 0, olck->ols_lock != NULL));
lock, result);
}
olck->ols_state = OLS_CANCELLED;
+ olck->ols_flags &= ~LDLM_FL_LVB_READY;
osc_lock_detach(env, olck);
}
return 0;
if (need->cld_mode == CLM_PHANTOM) {
+ if (ols->ols_agl)
+ return !(ols->ols_state > OLS_RELEASED);
+
/*
* Note: the QUEUED lock can't be matched here, otherwise
* it might cause the deadlocks.
static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
obd_enqueue_update_f upcall, void *cookie,
- int *flags, int rc)
+ int *flags, int agl, int rc)
{
int intent = *flags & LDLM_FL_HAS_INTENT;
ENTRY;
}
}
- if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
+ if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
+ (rc == 0)) {
*flags |= LDLM_FL_LVB_READY;
CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
struct ldlm_lock *lock;
struct lustre_handle handle;
__u32 mode;
+ struct ost_lvb *lvb;
+ __u32 lvb_len;
+ int *flags = aa->oa_flags;
/* Make a local copy of a lock handle and a mode, because aa->oa_*
* might be freed anytime after lock upcall has been called. */
/* Let CP AST to grant the lock first. */
OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
+ if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
+ lvb = NULL;
+ lvb_len = 0;
+ } else {
+ lvb = aa->oa_lvb;
+ lvb_len = sizeof(*aa->oa_lvb);
+ }
+
/* Complete obtaining the lock procedure. */
rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
- mode, aa->oa_flags, aa->oa_lvb,
- sizeof(*aa->oa_lvb), &handle, rc);
+ mode, flags, lvb, lvb_len, &handle, rc);
/* Complete osc stuff. */
- rc = osc_enqueue_fini(req, aa->oa_lvb,
- aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
+ rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
+ flags, aa->oa_agl, rc);
OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
struct lov_oinfo *loi, int flags,
struct ost_lvb *lvb, __u32 mode, int rc)
{
+ struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
+
if (rc == ELDLM_OK) {
- struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
__u64 tmp;
LASSERT(lock != NULL);
lock->l_policy_data.l_extent.end);
}
ldlm_lock_allow_match(lock);
- LDLM_LOCK_PUT(lock);
} else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
+ LASSERT(lock != NULL);
loi->loi_lvb = *lvb;
+ ldlm_lock_allow_match(lock);
CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
" kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
rc = ELDLM_OK;
}
+
+ if (lock != NULL) {
+ if (rc != ELDLM_OK)
+ ldlm_lock_fail_match(lock, rc);
+
+ LDLM_LOCK_PUT(lock);
+ }
}
EXPORT_SYMBOL(osc_update_enqueue);
obd_enqueue_update_f upcall, void *cookie,
struct ldlm_enqueue_info *einfo,
struct lustre_handle *lockh,
- struct ptlrpc_request_set *rqset, int async)
+ struct ptlrpc_request_set *rqset, int async, int agl)
{
struct obd_device *obd = exp->exp_obd;
struct ptlrpc_request *req = NULL;
int intent = *flags & LDLM_FL_HAS_INTENT;
+ int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
ldlm_mode_t mode;
int rc;
ENTRY;
mode = einfo->ei_mode;
if (einfo->ei_mode == LCK_PR)
mode |= LCK_PW;
- mode = ldlm_lock_match(obd->obd_namespace,
- *flags | LDLM_FL_LVB_READY, res_id,
+ mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
einfo->ei_type, policy, mode, lockh, 0);
if (mode) {
struct ldlm_lock *matched = ldlm_handle2lock(lockh);
- if (osc_set_lock_data_with_check(matched, einfo)) {
+ if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
+ /* For AGL, if an enqueue RPC has been sent but the lock
+ * has not been granted, skip processing this stripe.
+ * Return -ECANCELED to tell the caller. */
+ ldlm_lock_decref(lockh, mode);
+ LDLM_LOCK_PUT(matched);
+ RETURN(-ECANCELED);
+ } else if (osc_set_lock_data_with_check(matched, einfo)) {
+ *flags |= LDLM_FL_LVB_READY;
/* addref the lock only if not async requests and PW
* lock is matched whereas we asked for PR. */
if (!rqset && einfo->ei_mode != mode)
/* We already have a lock, and it's referenced */
(*upcall)(cookie, ELDLM_OK);
- /* For async requests, decref the lock. */
if (einfo->ei_mode != mode)
ldlm_lock_decref(lockh, LCK_PW);
else if (rqset)
+ /* For async requests, decref the lock. */
ldlm_lock_decref(lockh, einfo->ei_mode);
LDLM_LOCK_PUT(matched);
RETURN(ELDLM_OK);
- } else
+ } else {
ldlm_lock_decref(lockh, mode);
- LDLM_LOCK_PUT(matched);
+ LDLM_LOCK_PUT(matched);
+ }
}
no_match:
aa->oa_cookie = cookie;
aa->oa_lvb = lvb;
aa->oa_lockh = lockh;
+ aa->oa_agl = !!agl;
req->rq_interpret_reply =
(ptlrpc_interpterer_t)osc_enqueue_interpret;
RETURN(rc);
}
- rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
+ rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
if (intent)
ptlrpc_req_finished(req);
&oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
- rqset, rqset != NULL);
+ rqset, rqset != NULL, 0);
RETURN(rc);
}
}
run_test 221 "make sure fault and truncate race to not cause OOM"
+test_222a () {
+ rm -rf $DIR/$tdir
+ mkdir -p $DIR/$tdir
+ $LFS setstripe -c 1 -i 0 $DIR/$tdir
+ createmany -o $DIR/$tdir/$tfile 10
+ cancel_lru_locks mdc
+ cancel_lru_locks osc
+ #define OBD_FAIL_LDLM_AGL_DELAY 0x31a
+ $LCTL set_param fail_loc=0x31a
+ ls -l $DIR/$tdir > /dev/null || error "AGL for ls failed"
+ $LCTL set_param fail_loc=0
+ rm -r $DIR/$tdir
+}
+run_test 222a "AGL for ls should not trigger CLIO lock failure ================"
+
+test_222b () {
+ rm -rf $DIR/$tdir
+ mkdir -p $DIR/$tdir
+ $LFS setstripe -c 1 -i 0 $DIR/$tdir
+ createmany -o $DIR/$tdir/$tfile 10
+ cancel_lru_locks mdc
+ cancel_lru_locks osc
+ #define OBD_FAIL_LDLM_AGL_DELAY 0x31a
+ $LCTL set_param fail_loc=0x31a
+ rm -r $DIR/$tdir || error "AGL for rmdir failed"
+ $LCTL set_param fail_loc=0
+}
+run_test 222b "AGL for rmdir should not trigger CLIO lock failure ============="
+
+test_223 () {
+ rm -rf $DIR/$tdir
+ mkdir -p $DIR/$tdir
+ $LFS setstripe -c 1 -i 0 $DIR/$tdir
+ createmany -o $DIR/$tdir/$tfile 10
+ cancel_lru_locks mdc
+ cancel_lru_locks osc
+ #define OBD_FAIL_LDLM_AGL_NOLOCK 0x31b
+ $LCTL set_param fail_loc=0x31b
+ ls -l $DIR/$tdir > /dev/null || error "reenqueue failed"
+ $LCTL set_param fail_loc=0
+ rm -r $DIR/$tdir
+}
+run_test 223 "osc reenqueue if without AGL lock granted ======================="
+
#
# tests that do cleanup/setup should be run at the end
#