*/
int cl_lock_mode_match(enum cl_lock_mode has, enum cl_lock_mode need)
{
- LINVRNT(need == CLM_READ || need == CLM_WRITE || need == CLM_PHANTOM);
- LINVRNT(has == CLM_READ || has == CLM_WRITE || has == CLM_PHANTOM);
+ LINVRNT(need == CLM_READ || need == CLM_WRITE ||
+ need == CLM_PHANTOM || need == CLM_GROUP);
+ LINVRNT(has == CLM_READ || has == CLM_WRITE ||
+ has == CLM_PHANTOM || has == CLM_GROUP);
CLASSERT(CLM_PHANTOM < CLM_READ);
CLASSERT(CLM_READ < CLM_WRITE);
+ CLASSERT(CLM_WRITE < CLM_GROUP);
- return need <= has;
+ if (has != CLM_GROUP)
+ return need <= has;
+ else
+ return need == has;
}
EXPORT_SYMBOL(cl_lock_mode_match);
return
has->cld_start <= need->cld_start &&
has->cld_end >= need->cld_end &&
- cl_lock_mode_match(has->cld_mode, need->cld_mode);
+ cl_lock_mode_match(has->cld_mode, need->cld_mode) &&
+ (has->cld_mode != CLM_GROUP || has->cld_gid == need->cld_gid);
}
EXPORT_SYMBOL(cl_lock_ext_match);
if (ok) {
cl_lock_hold_add(env, lock, scope, source);
cl_lock_user_add(env, lock);
+ cl_lock_put(env, lock);
}
cl_lock_mutex_put(env, lock);
if (!ok) {
* and head->coh_nesting == 1 check assumes two level top-sub
* hierarchy.
*/
- LASSERT(ergo(head->coh_nesting == 1 &&
- list_empty(&head->coh_locks), !head->coh_pages));
+ /*
+ * The count of pages of this object may NOT be zero because
+ * we don't cleanup the pages if they are in CPS_FREEING state.
+ * See cl_page_gang_lookup().
+ *
+ * It is safe to leave the CPS_FREEING pages in cache w/o
+ * a lock, because those pages must not be uptodate.
+ * See cl_page_delete0 for details.
+ */
+ /* LASSERT(ergo(head->coh_nesting == 1 &&
+ list_empty(&head->coh_locks), !head->coh_pages)); */
spin_unlock(&head->coh_lock_guard);
/*
* From now on, no new references to this lock can be acquired
lu_ref_del(&lock->cll_holders, scope, source);
cl_lock_hold_mod(env, lock, -1);
if (lock->cll_holds == 0) {
- if (lock->cll_descr.cld_mode == CLM_PHANTOM)
+ if (lock->cll_descr.cld_mode == CLM_PHANTOM ||
+ lock->cll_descr.cld_mode == CLM_GROUP)
/*
- * If lock is still phantom when user is done with
- * it---destroy the lock.
+ * If the lock is still a phantom or group lock when the
+ * user is done with it---destroy the lock.
*/
lock->cll_flags |= CLF_CANCELPEND|CLF_DOOMED;
if (lock->cll_flags & CLF_CANCELPEND) {
list_for_each_entry(scan, &head->coh_locks, cll_linkage) {
if (scan != except &&
cl_lock_ext_match(&scan->cll_descr, need) &&
+ scan->cll_state >= CLS_HELD &&
scan->cll_state < CLS_FREEING &&
/*
* This check is racy as the lock can be canceled right
struct cl_io *io = &info->clt_io;
struct cl_2queue *queue = &info->clt_queue;
struct cl_lock_descr *descr = &lock->cll_descr;
- int result;
- int rc0;
- int rc1;
+ long page_count;
+ int result;
LINVRNT(cl_lock_invariant(env, lock));
ENTRY;
io->ci_obj = cl_object_top(descr->cld_obj);
result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
if (result == 0) {
-
cl_2queue_init(queue);
cl_page_gang_lookup(env, descr->cld_obj, io, descr->cld_start,
descr->cld_end, &queue->c2_qin);
- if (queue->c2_qin.pl_nr > 0) {
+ page_count = queue->c2_qin.pl_nr;
+ if (page_count > 0) {
result = cl_page_list_unmap(env, io, &queue->c2_qin);
if (!discard) {
- rc0 = cl_io_submit_rw(env, io, CRT_WRITE,
- queue, CRP_CANCEL);
- rc1 = cl_page_list_own(env, io,
- &queue->c2_qout);
- result = result ?: rc0 ?: rc1;
+ long timeout = 600; /* 10 minutes. */
+ /* For debugging: if this request cannot be
+ * finished within 10 minutes, emit a warning
+ * so that we are notified.
+ */
+ result = cl_io_submit_sync(env, io, CRT_WRITE,
+ queue, CRP_CANCEL,
+ timeout);
+ if (result)
+ CWARN("Writing %lu pages error: %d\n",
+ page_count, result);
}
cl_lock_page_list_fixup(env, io, lock, &queue->c2_qout);
cl_2queue_discard(env, io, queue);
static const char *names[] = {
[CLM_PHANTOM] = "PHANTOM",
[CLM_READ] = "READ",
- [CLM_WRITE] = "WRITE"
+ [CLM_WRITE] = "WRITE",
+ [CLM_GROUP] = "GROUP"
};
if (0 <= mode && mode < ARRAY_SIZE(names))
return names[mode];