head = cl_object_header(obj);
site = cl_object_site(obj);
- CDEBUG(D_DLMTRACE, "releasing reference: %d %p %lu\n",
+ CDEBUG(D_TRACE, "releasing reference: %d %p %lu\n",
atomic_read(&lock->cll_ref), lock, RETIP);
if (atomic_dec_and_test(&lock->cll_ref)) {
void cl_lock_get(struct cl_lock *lock)
{
LINVRNT(cl_lock_invariant(NULL, lock));
- CDEBUG(D_DLMTRACE|D_TRACE, "acquiring reference: %d %p %lu\n",
+ CDEBUG(D_TRACE, "acquiring reference: %d %p %lu\n",
atomic_read(&lock->cll_ref), lock, RETIP);
atomic_inc(&lock->cll_ref);
}
struct cl_site *site = cl_object_site(lock->cll_descr.cld_obj);
LASSERT(cl_is_lock(lock));
- CDEBUG(D_DLMTRACE|D_TRACE, "acquiring trusted reference: %d %p %lu\n",
+ CDEBUG(D_TRACE, "acquiring trusted reference: %d %p %lu\n",
atomic_read(&lock->cll_ref), lock, RETIP);
if (atomic_inc_return(&lock->cll_ref) == 1)
atomic_inc(&site->cs_locks.cs_busy);
}
/**
+ * Transfer the lock into the INTRANSIT state and return the original state.
+ *
+ * \pre state: CLS_CACHED, CLS_HELD or CLS_ENQUEUED
+ * \post state: CLS_INTRANSIT
+ * \see CLS_INTRANSIT
+ */
+enum cl_lock_state cl_lock_intransit(const struct lu_env *env,
+ struct cl_lock *lock)
+{
+ enum cl_lock_state state = lock->cll_state;
+
+ LASSERT(cl_lock_is_mutexed(lock));
+ LASSERT(state != CLS_INTRANSIT);
+ LASSERTF(state >= CLS_ENQUEUED && state <= CLS_CACHED,
+ "Malformed lock state %d.\n", state);
+
+ cl_lock_state_set(env, lock, CLS_INTRANSIT);
+ lock->cll_intransit_owner = cfs_current();
+ cl_lock_hold_add(env, lock, "intransit", cfs_current());
+ return state;
+}
+EXPORT_SYMBOL(cl_lock_intransit);
+
+/**
+ * Exit the intransit state and restore the lock to its original state.
+ */
+void cl_lock_extransit(const struct lu_env *env, struct cl_lock *lock,
+ enum cl_lock_state state)
+{
+ LASSERT(cl_lock_is_mutexed(lock));
+ LASSERT(lock->cll_state == CLS_INTRANSIT);
+ LASSERT(state != CLS_INTRANSIT);
+ LASSERT(lock->cll_intransit_owner == cfs_current());
+
+ lock->cll_intransit_owner = NULL;
+ cl_lock_state_set(env, lock, state);
+ cl_lock_unhold(env, lock, "intransit", cfs_current());
+}
+EXPORT_SYMBOL(cl_lock_extransit);
+
+/**
+ * Check whether the lock is in the INTRANSIT state and owned by another
+ * thread; the intransit owner itself gets 0 back.
+ */
+int cl_lock_is_intransit(struct cl_lock *lock)
+{
+ LASSERT(cl_lock_is_mutexed(lock));
+ return lock->cll_state == CLS_INTRANSIT &&
+ lock->cll_intransit_owner != cfs_current();
+}
+EXPORT_SYMBOL(cl_lock_is_intransit);
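
Taken together, these three helpers form a hand-over-hand protocol: the owner
parks the lock in CLS_INTRANSIT, drops the mutex to do blocking work, and later
restores the saved state, while every other thread sees cl_lock_is_intransit()
return true and waits. A minimal sketch of an owner, assuming the caller
already holds the lock mutex and a reference; do_blocking_work() is a
hypothetical placeholder, not part of this patch:

static int example_intransit_owner(const struct lu_env *env,
                                   struct cl_lock *lock)
{
        enum cl_lock_state saved;
        int rc;

        /* Mutex is held: record the state and take intransit ownership. */
        saved = cl_lock_intransit(env, lock);
        cl_lock_mutex_put(env, lock);

        rc = do_blocking_work(env, lock);       /* hypothetical helper */

        cl_lock_mutex_get(env, lock);
        /* Restore the saved state (or CLS_NEW on failure); waiters on
         * cll_wq are woken by the state change. */
        cl_lock_extransit(env, lock, rc == 0 ? saved : CLS_NEW);
        return rc;
}

Note that cl_lock_is_intransit() deliberately returns 0 for the intransit
owner itself, so the owner's own state checks are not blocked by its lock.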
+/**
* Returns true iff lock is "suitable" for given io. E.g., locks acquired by
* truncate and O_APPEND cannot be reused for read/non-append-write, as they
* cover multiple stripes and can trigger cascading timeouts.
struct cl_object_header *head;
struct cl_object *obj;
struct cl_lock *lock;
+ int ok;
obj = need->cld_obj;
head = cl_object_header(obj);
lock = cl_lock_lookup(env, obj, io, need);
spin_unlock(&head->coh_lock_guard);
- if (lock != NULL) {
- int ok;
+ if (lock == NULL)
+ return NULL;
- cl_lock_mutex_get(env, lock);
- if (lock->cll_state == CLS_CACHED)
- cl_use_try(env, lock);
- ok = lock->cll_state == CLS_HELD;
- if (ok) {
- cl_lock_hold_add(env, lock, scope, source);
- cl_lock_user_add(env, lock);
- }
- cl_lock_mutex_put(env, lock);
- if (!ok) {
- cl_lock_put(env, lock);
- lock = NULL;
- }
+ cl_lock_mutex_get(env, lock);
+ if (lock->cll_state == CLS_INTRANSIT)
+ cl_lock_state_wait(env, lock); /* Ignore the return value. */
+ if (lock->cll_state == CLS_CACHED) {
+ int result;
+ result = cl_use_try(env, lock, 1);
+ if (result < 0)
+ cl_lock_error(env, lock, result);
+ }
+ ok = lock->cll_state == CLS_HELD;
+ if (ok) {
+ cl_lock_hold_add(env, lock, scope, source);
+ cl_lock_user_add(env, lock);
+ cl_lock_put(env, lock);
}
+ cl_lock_mutex_put(env, lock);
+ if (!ok) {
+ cl_lock_put(env, lock);
+ lock = NULL;
+ }
+
return lock;
}
EXPORT_SYMBOL(cl_lock_peek);
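
After this rewrite cl_lock_peek() returns either NULL or a lock that is
already in CLS_HELD with a hold and a user attached. A plausible caller,
sketched on the assumption that cl_lock_request() (whose signature changes at
the end of this patch) is the fallback path; the scope string "example" is
illustrative:

static struct cl_lock *example_get_lock(const struct lu_env *env,
                                        struct cl_io *io,
                                        const struct cl_lock_descr *need)
{
        struct cl_lock *lock;

        lock = cl_lock_peek(env, io, need, "example", cfs_current());
        if (lock == NULL)
                /* Nothing reusable was cached; enqueue a fresh lock. */
                lock = cl_lock_request(env, io, need, "example",
                                       cfs_current());
        return lock;
}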
EXPORT_SYMBOL(cl_lock_mutex_try);
/**
* Unlocks cl_lock object.
*
* \pre cl_lock_is_mutexed(lock)
*
spin_lock(&head->coh_lock_guard);
list_del_init(&lock->cll_linkage);
- /*
- * No locks, no pages. This is only valid for bottom sub-locks
- * and head->coh_nesting == 1 check assumes two level top-sub
- * hierarchy.
- */
- LASSERT(ergo(head->coh_nesting == 1 &&
- list_empty(&head->coh_locks), !head->coh_pages));
+
spin_unlock(&head->coh_lock_guard);
/*
* From now on, no new references to this lock can be acquired
LASSERT(lock->cll_state != CLS_FREEING); /* too late to wait */
result = lock->cll_error;
- if (result == 0 && !(lock->cll_flags & CLF_STATE)) {
+ if (result == 0) {
cfs_waitlink_init(&waiter);
cfs_waitq_add(&lock->cll_wq, &waiter);
set_current_state(CFS_TASK_INTERRUPTIBLE);
cfs_waitq_del(&lock->cll_wq, &waiter);
result = cfs_signal_pending() ? -EINTR : 0;
}
- lock->cll_flags &= ~CLF_STATE;
RETURN(result);
}
EXPORT_SYMBOL(cl_lock_state_wait);
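
cl_lock_state_wait() is the sleeping half of the CLO_WAIT protocol: when a
*_try() call cannot make progress it returns CLO_WAIT, the caller sleeps on
cll_wq, and retries once cl_lock_state_set() below broadcasts the wait queue.
A minimal sketch of the canonical retry loop (mirroring the back-off loop
added to cl_use_try() later in this patch), assuming the caller holds the
lock mutex:

static int example_use_loop(const struct lu_env *env, struct cl_lock *lock)
{
        int rc;

        while ((rc = cl_use_try(env, lock, 0)) == CLO_WAIT) {
                /* Sleeps until the lock state changes. */
                rc = cl_lock_state_wait(env, lock);
                if (rc < 0)
                        break;  /* interrupted by a signal (-EINTR) */
        }
        return rc;
}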
list_for_each_entry(slice, &lock->cll_layers, cls_linkage)
if (slice->cls_ops->clo_state != NULL)
slice->cls_ops->clo_state(env, slice, state);
- lock->cll_flags |= CLF_STATE;
cfs_waitq_broadcast(&lock->cll_wq);
EXIT;
}
LASSERT(lock->cll_state <= state ||
(lock->cll_state == CLS_CACHED &&
(state == CLS_HELD || /* lock found in cache */
- state == CLS_NEW /* sub-lock canceled */)) ||
- /* sub-lock canceled during unlocking */
- (lock->cll_state == CLS_UNLOCKING && state == CLS_NEW));
+ state == CLS_NEW || /* sub-lock canceled */
+ state == CLS_INTRANSIT)) ||
+ /* lock is in transit state */
+ lock->cll_state == CLS_INTRANSIT);
if (lock->cll_state != state) {
atomic_dec(&site->cs_locks_state[lock->cll_state]);
}
EXPORT_SYMBOL(cl_lock_state_set);
+static int cl_unuse_try_internal(const struct lu_env *env, struct cl_lock *lock)
+{
+ const struct cl_lock_slice *slice;
+ int result;
+
+ do {
+ result = 0;
+
+ if (lock->cll_error != 0)
+ break;
+
+ LINVRNT(cl_lock_is_mutexed(lock));
+ LINVRNT(cl_lock_invariant(env, lock));
+ LASSERT(lock->cll_state == CLS_INTRANSIT);
+ LASSERT(lock->cll_users > 0);
+ LASSERT(lock->cll_holds > 0);
+
+ result = -ENOSYS;
+ list_for_each_entry_reverse(slice, &lock->cll_layers,
+ cls_linkage) {
+ if (slice->cls_ops->clo_unuse != NULL) {
+ result = slice->cls_ops->clo_unuse(env, slice);
+ if (result != 0)
+ break;
+ }
+ }
+ LASSERT(result != -ENOSYS);
+ } while (result == CLO_REPEAT);
+
+ return result ?: lock->cll_error;
+}
+
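cl_unuse_try_internal() walks the layers in reverse list order; -ENOSYS is
only a sentinel meaning no layer implemented clo_unuse(), hence the LASSERT,
and CLO_REPEAT from any layer restarts the whole walk. A hypothetical layer
method illustrating how a slice feeds that loop (struct hypo_lock and
hl_flushing are made up for this sketch):

struct hypo_lock {
        struct cl_lock_slice hl_cl;
        int                  hl_flushing;
};

static int hypo_lock_unuse(const struct lu_env *env,
                           const struct cl_lock_slice *slice)
{
        const struct hypo_lock *hl;

        hl = container_of(slice, const struct hypo_lock, hl_cl);
        if (hl->hl_flushing)
                /* Not finished yet: restart the whole layer walk. */
                return CLO_REPEAT;
        /* 0 lets cl_unuse_try_internal() proceed to the next layer. */
        return 0;
}
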
/**
* Yanks lock from the cache (cl_lock_state::CLS_CACHED state) by calling
* cl_lock_operations::clo_use() top-to-bottom to notify layers.
+ * If @atomic is 1 and a layer's ->clo_use() fails, the lock is unused
+ * again so it returns to its pre-call state, keeping the use operation
+ * atomic.
*/
-int cl_use_try(const struct lu_env *env, struct cl_lock *lock)
+int cl_use_try(const struct lu_env *env, struct cl_lock *lock, int atomic)
{
- int result;
const struct cl_lock_slice *slice;
+ int result;
+ enum cl_lock_state state;
ENTRY;
result = -ENOSYS;
+
+ state = cl_lock_intransit(env, lock);
list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
if (slice->cls_ops->clo_use != NULL) {
result = slice->cls_ops->clo_use(env, slice);
}
}
LASSERT(result != -ENOSYS);
- if (result == 0)
- cl_lock_state_set(env, lock, CLS_HELD);
+
+ LASSERT(lock->cll_state == CLS_INTRANSIT);
+
+ if (result == 0) {
+ state = CLS_HELD;
+ } else {
+ if (result == -ESTALE) {
+ /*
+ * -ESTALE means a sublock was cancelled in the
+ * meantime; reset the lock state to CLS_NEW
+ * and ask the caller to repeat.
+ */
+ state = CLS_NEW;
+ result = CLO_REPEAT;
+ }
+
+ /* @atomic means back-off-on-failure. */
+ if (atomic) {
+ int rc;
+
+ do {
+ rc = cl_unuse_try_internal(env, lock);
+ if (rc == 0)
+ break;
+ if (rc == CLO_WAIT)
+ rc = cl_lock_state_wait(env, lock);
+ if (rc < 0)
+ break;
+ } while (1);
+
+ /* Vet the results. */
+ if (rc < 0 && result > 0)
+ result = rc;
+ }
+ }
+ cl_lock_extransit(env, lock, state);
RETURN(result);
}
EXPORT_SYMBOL(cl_use_try);
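
With @atomic set, a failing use backs the lock off to the state it was found
in, so callers never observe a half-used lock. A sketch mirroring how
cl_lock_peek() above consumes this guarantee:

static void example_atomic_use(const struct lu_env *env,
                               struct cl_lock *lock)
{
        cl_lock_mutex_get(env, lock);
        if (lock->cll_state == CLS_CACHED) {
                int rc = cl_use_try(env, lock, 1);

                if (rc < 0)
                        /* The back-off already ran; just record the error. */
                        cl_lock_error(env, lock, rc);
        }
        cl_lock_mutex_put(env, lock);
}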
if (result == 0)
cl_lock_state_set(env, lock, CLS_ENQUEUED);
break;
- case CLS_UNLOCKING:
- /* wait until unlocking finishes, and enqueue lock
- * afresh. */
+ case CLS_INTRANSIT:
+ LASSERT(cl_lock_is_intransit(lock));
result = CLO_WAIT;
break;
case CLS_CACHED:
/* yank lock from the cache. */
- result = cl_use_try(env, lock);
+ result = cl_use_try(env, lock, 0);
break;
case CLS_ENQUEUED:
case CLS_HELD:
* This function is called repeatedly by cl_unuse() until either lock is
* unlocked, or error occurs.
*
- * \ppre lock->cll_state <= CLS_HELD || lock->cll_state == CLS_UNLOCKING
+ * \pre lock->cll_state <= CLS_HELD || cl_lock_is_intransit(lock)
*
* \post ergo(result == 0, lock->cll_state == CLS_CACHED)
*
*/
int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock)
{
- const struct cl_lock_slice *slice;
int result;
+ enum cl_lock_state state = CLS_NEW;
ENTRY;
- if (lock->cll_state != CLS_UNLOCKING) {
+ if (lock->cll_state != CLS_INTRANSIT) {
if (lock->cll_users > 1) {
cl_lock_user_del(env, lock);
RETURN(0);
* CLS_CACHED, is reinitialized to CLS_NEW or fails into
* CLS_FREEING.
*/
- cl_lock_state_set(env, lock, CLS_UNLOCKING);
+ state = cl_lock_intransit(env, lock);
}
- do {
- result = 0;
- if (lock->cll_error != 0)
- break;
-
- LINVRNT(cl_lock_is_mutexed(lock));
- LINVRNT(cl_lock_invariant(env, lock));
- LASSERT(lock->cll_state == CLS_UNLOCKING);
- LASSERT(lock->cll_users > 0);
- LASSERT(lock->cll_holds > 0);
-
- result = -ENOSYS;
- list_for_each_entry_reverse(slice, &lock->cll_layers,
- cls_linkage) {
- if (slice->cls_ops->clo_unuse != NULL) {
- result = slice->cls_ops->clo_unuse(env, slice);
- if (result != 0)
- break;
- }
- }
- LASSERT(result != -ENOSYS);
- } while (result == CLO_REPEAT);
+ result = cl_unuse_try_internal(env, lock);
+ LASSERT(lock->cll_state == CLS_INTRANSIT);
if (result != CLO_WAIT)
/*
* Once there is no more need to iterate ->clo_unuse() calls,
*/
cl_lock_user_del(env, lock);
if (result == 0 || result == -ESTALE) {
- enum cl_lock_state state;
-
/*
* Return lock back to the cache. This is the only
* place where lock is moved into CLS_CACHED state.
* canceled while unlocking was in progress.
*/
state = result == 0 ? CLS_CACHED : CLS_NEW;
- cl_lock_state_set(env, lock, state);
+ cl_lock_extransit(env, lock, state);
/*
* Hide -ESTALE error.
* pages won't be written to OSTs. -jay
*/
result = 0;
+ } else {
+ CWARN("result = %d, this is unlikely!\n", result);
+ cl_lock_extransit(env, lock, state);
}
+
result = result ?: lock->cll_error;
if (result < 0)
cl_lock_error(env, lock, result);
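
Per the "called repeatedly by cl_unuse()" contract above, the driver (not
shown in this hunk) is expected to loop roughly as follows; a sketch assuming
the lock mutex is held:

static int example_unuse_loop(const struct lu_env *env, struct cl_lock *lock)
{
        int rc;

        do {
                rc = cl_unuse_try(env, lock);
                if (rc != CLO_WAIT)
                        break;
                /* Another thread holds the lock intransit: sleep, retry. */
                rc = cl_lock_state_wait(env, lock);
        } while (rc == 0);
        return rc;
}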
LINVRNT(cl_lock_is_mutexed(lock));
LINVRNT(cl_lock_invariant(env, lock));
LASSERT(lock->cll_state == CLS_ENQUEUED ||
- lock->cll_state == CLS_HELD);
+ lock->cll_state == CLS_HELD ||
+ lock->cll_state == CLS_INTRANSIT);
LASSERT(lock->cll_users > 0);
LASSERT(lock->cll_holds > 0);
result = 0;
if (lock->cll_error != 0)
break;
+
+ if (cl_lock_is_intransit(lock)) {
+ result = CLO_WAIT;
+ break;
+ }
+
if (lock->cll_state == CLS_HELD)
/* nothing to do */
break;
need->cld_mode = CLM_READ; /* CLM_READ matches both READ & WRITE, but
* not PHANTOM */
need->cld_start = need->cld_end = page->cp_index;
+ need->cld_enq_flags = 0;
spin_lock(&head->coh_lock_guard);
+ /* It is fine to match any group lock since there could be only one
+ * with a unique gid, and a group lock conflicts with all other lock
+ * modes too. */
list_for_each_entry(scan, &head->coh_locks, cll_linkage) {
if (scan != except &&
- cl_lock_ext_match(&scan->cll_descr, need) &&
+ (scan->cll_descr.cld_mode == CLM_GROUP ||
+ cl_lock_ext_match(&scan->cll_descr, need)) &&
+ scan->cll_state >= CLS_HELD &&
scan->cll_state < CLS_FREEING &&
/*
* This check is racy as the lock can be canceled right
struct cl_io *io = &info->clt_io;
struct cl_2queue *queue = &info->clt_queue;
struct cl_lock_descr *descr = &lock->cll_descr;
- int result;
- int rc0;
- int rc1;
+ long page_count;
+ int result;
LINVRNT(cl_lock_invariant(env, lock));
ENTRY;
io->ci_obj = cl_object_top(descr->cld_obj);
result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
if (result == 0) {
+ int nonblock = 1;
+restart:
cl_2queue_init(queue);
cl_page_gang_lookup(env, descr->cld_obj, io, descr->cld_start,
- descr->cld_end, &queue->c2_qin);
- if (queue->c2_qin.pl_nr > 0) {
+ descr->cld_end, &queue->c2_qin, nonblock);
+ page_count = queue->c2_qin.pl_nr;
+ if (page_count > 0) {
result = cl_page_list_unmap(env, io, &queue->c2_qin);
if (!discard) {
- rc0 = cl_io_submit_rw(env, io, CRT_WRITE,
- queue, CRP_CANCEL);
- rc1 = cl_page_list_own(env, io,
- &queue->c2_qout);
- result = result ?: rc0 ?: rc1;
+ long timeout = 600; /* 10 minutes, in seconds. */
+ /* For debugging purposes: if this request
+ * cannot finish within 10 minutes, time out
+ * so that we are notified of the stall.
+ */
+ result = cl_io_submit_sync(env, io, CRT_WRITE,
+ queue, CRP_CANCEL,
+ timeout);
+ if (result)
+ CWARN("Writing %lu pages error: %d\n",
+ page_count, result);
}
cl_lock_page_list_fixup(env, io, lock, &queue->c2_qout);
cl_2queue_discard(env, io, queue);
cl_2queue_disown(env, io, queue);
}
cl_2queue_fini(env, queue);
+
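+ /* The first pass was non-blocking; repeat once in blocking
+ * mode so pages that could not be locked immediately are
+ * flushed as well. */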
+ if (nonblock) {
+ nonblock = 0;
+ goto restart;
+ }
}
cl_io_fini(env, io);
RETURN(result);
*/
struct cl_lock *cl_lock_request(const struct lu_env *env, struct cl_io *io,
const struct cl_lock_descr *need,
- __u32 enqflags,
const char *scope, const void *source)
{
struct cl_lock *lock;
int rc;
int iter;
int warn;
+ __u32 enqflags = need->cld_enq_flags;
ENTRY;
fid = lu_object_fid(&io->ci_obj->co_lu);
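
Since enqueue flags now travel inside the descriptor (see cld_enq_flags in
cl_lock_at_page() above), callers fill the descriptor and drop the separate
argument. A sketch of the new calling convention, with illustrative field
values:

static struct cl_lock *example_request(const struct lu_env *env,
                                       struct cl_io *io,
                                       struct cl_object *obj,
                                       pgoff_t start, pgoff_t end)
{
        struct cl_lock_descr descr;

        descr.cld_obj       = obj;
        descr.cld_mode      = CLM_READ;
        descr.cld_start     = start;
        descr.cld_end       = end;
        descr.cld_enq_flags = 0;   /* formerly the separate enqflags arg */

        return cl_lock_request(env, io, &descr, "example", cfs_current());
}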