From: jxiong Date: Fri, 19 Dec 2008 02:58:02 +0000 (+0000) Subject: b=17633 X-Git-Tag: v1_9_130~4 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=89f659febd4220cc517a181f8ac6cc52235b76ca;p=fs%2Flustre-release.git b=17633 r=nikita,zam lockless io fixes for multistripe objects. --- diff --git a/lustre/include/cl_object.h b/lustre/include/cl_object.h index 3343139..ad4b468 100644 --- a/lustre/include/cl_object.h +++ b/lustre/include/cl_object.h @@ -2098,7 +2098,24 @@ enum cl_enq_flags { * owner of the conflicting lock, that it can drop dirty pages * protected by this lock, without sending them to the server. */ - CEF_DISCARD_DATA = 0x00000004 + CEF_DISCARD_DATA = 0x00000004, + /** + * tell the sub layers that it must be a `real' lock. + */ + CEF_MUST = 0x00000008, + /** + * tell the sub layers never to request a `real' lock. + * Currently, CEF_MUST and CEF_NEVER are only used for mmap locks. + * They complement cl_io::ci_lockreq: ci_lockreq describes the generic + * locking requirement of the whole IO, in particular for locks that + * belong to the object the IO operates on, while an individual lock + * may have more precise requirements, expressed by these flags. + */ + CEF_NEVER = 0x00000010, + /** + * mask of enq_flags. + */ + CEF_MASK = 0x0000001f }; /** @@ -2837,7 +2854,7 @@ void cl_io_end (const struct lu_env *env, struct cl_io *io); int cl_io_lock_add (const struct lu_env *env, struct cl_io *io, struct cl_io_lock_link *link); int cl_io_lock_alloc_add(const struct lu_env *env, struct cl_io *io, - struct cl_lock_descr *descr); + struct cl_lock_descr *descr, int enqflags); int cl_io_read_page (const struct lu_env *env, struct cl_io *io, struct cl_page *page); int cl_io_prepare_write(const struct lu_env *env, struct cl_io *io, diff --git a/lustre/llite/llite_mmap.c b/lustre/llite/llite_mmap.c index 4126a83..52a8970 100644 --- a/lustre/llite/llite_mmap.c +++ b/lustre/llite/llite_mmap.c @@ -150,6 +150,7 @@ struct page *ll_nopage(struct vm_area_struct *vma, unsigned long address, struct cl_fault_io *fio; io = &ccc_env_info(env)->cti_io; + memset(io, 0, sizeof(*io)); io->ci_obj = ll_i2info(inode)->lli_clob; LASSERT(io->ci_obj != NULL); @@ -174,13 +175,22 @@ struct page *ll_nopage(struct vm_area_struct *vma, unsigned long address, if (cl_io_init(env, io, CIT_FAULT, io->ci_obj) == 0) { struct vvp_io *vio = vvp_env_io(env); struct ccc_io *cio = ccc_env_io(env); + struct ll_file_data *fd = LUSTRE_FPRIVATE(file); + struct ll_sb_info *sbi = ll_i2sbi(inode); LASSERT(cio->cui_cl.cis_io == io); + /* mmap lock should be MANDATORY or NEVER.
*/ + if (fd->fd_flags & LL_FILE_IGNORE_LOCK || + sbi->ll_flags & LL_SBI_NOLCK) + io->ci_lockreq = CILR_NEVER; + else + io->ci_lockreq = CILR_MANDATORY; + vio->u.fault.ft_vma = vma; vio->u.fault.ft_address = address; vio->u.fault.ft_type = type; - cio->cui_fd = LUSTRE_FPRIVATE(file); + cio->cui_fd = fd; result = cl_io_loop(env, io); if (result == 0) { diff --git a/lustre/llite/vvp_io.c b/lustre/llite/vvp_io.c index aa2dcf0..890997e 100644 --- a/lustre/llite/vvp_io.c +++ b/lustre/llite/vvp_io.c @@ -146,9 +146,17 @@ static int vvp_mmap_locks(const struct lu_env *env, count += addr & (~CFS_PAGE_MASK); addr &= CFS_PAGE_MASK; while((vma = our_vma(addr, count)) != NULL) { - LASSERT(vma->vm_file); + struct file *file = vma->vm_file; + struct ll_file_data *fd; + + LASSERT(file); + fd = LUSTRE_FPRIVATE(file); + + inode = file->f_dentry->d_inode; + if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK || + ll_i2sbi(inode)->ll_flags & LL_SBI_NOLCK)) + goto cont; - inode = vma->vm_file->f_dentry->d_inode; /* * XXX: Required lock mode can be weakened: CIT_WRITE * io only ever reads user level buffer, and CIT_READ @@ -161,9 +169,11 @@ static int vvp_mmap_locks(const struct lu_env *env, policy.l_extent.start); descr->cld_end = cl_index(descr->cld_obj, policy.l_extent.end); - result = cl_io_lock_alloc_add(env, io, descr); + result = cl_io_lock_alloc_add(env, io, descr, CEF_MUST); if (result < 0) RETURN(result); + + cont: if (vma->vm_end - addr >= count) break; count -= vma->vm_end - addr; diff --git a/lustre/lov/lov_cl_internal.h b/lustre/lov/lov_cl_internal.h index 90f53fb..6a98fbc 100644 --- a/lustre/lov/lov_cl_internal.h +++ b/lustre/lov/lov_cl_internal.h @@ -396,6 +396,15 @@ struct lovsub_lock { struct cl_lock *lss_active; }; +/** + * Describe the environment settings for sublocks. 
+ */ +struct lov_sublock_env { + const struct lu_env *lse_env; + struct cl_io *lse_io; + struct lov_io_sub *lse_sub; +}; + struct lovsub_page { struct cl_page_slice lsb_cl; }; @@ -507,7 +516,8 @@ struct lov_io { }; struct lov_session { - struct lov_io ls_io; + struct lov_io ls_io; + struct lov_sublock_env ls_subenv; }; /** @@ -564,7 +574,9 @@ int lov_io_init_empty (const struct lu_env *env, struct cl_object *obj, void lov_lock_unlink (const struct lu_env *env, struct lov_lock_link *link, struct lovsub_lock *sub); -void lov_sub_put (struct lov_io_sub *sub); +struct lov_io_sub *lov_sub_get(const struct lu_env *env, struct lov_io *lio, + int stripe); +void lov_sub_put (struct lov_io_sub *sub); int lov_sublock_modify (const struct lu_env *env, struct lov_lock *lov, struct lovsub_lock *sublock, const struct cl_lock_descr *d, int idx); diff --git a/lustre/lov/lov_io.c b/lustre/lov/lov_io.c index 346a992..4ab7d25 100644 --- a/lustre/lov/lov_io.c +++ b/lustre/lov/lov_io.c @@ -201,8 +201,8 @@ static int lov_io_sub_init(const struct lu_env *env, struct lov_io *lio, RETURN(result); } -static struct lov_io_sub *lov_sub_get(const struct lu_env *env, - struct lov_io *lio, int stripe) +struct lov_io_sub *lov_sub_get(const struct lu_env *env, + struct lov_io *lio, int stripe) { int rc; struct lov_io_sub *sub = &lio->lis_subs[stripe]; @@ -391,6 +391,7 @@ static int lov_io_iter_init(const struct lu_env *env, stripe, start, end); } else rc = PTR_ERR(sub); + if (!rc) list_add_tail(&sub->sub_linkage, &lio->lis_active); else diff --git a/lustre/lov/lov_lock.c b/lustre/lov/lov_lock.c index 14ecd68..905b6cc 100644 --- a/lustre/lov/lov_lock.c +++ b/lustre/lov/lov_lock.c @@ -53,6 +53,50 @@ static struct cl_lock_closure *lov_closure_get(const struct lu_env *env, * */ +static struct lov_sublock_env *lov_sublock_env_get(const struct lu_env *env, + struct cl_lock *parent, + struct lov_lock_sub *lls) +{ + struct lov_sublock_env *subenv; + struct lov_io *lio = lov_env_io(env); + struct cl_io *io = lio->lis_cl.cis_io; + struct lov_io_sub *sub; + + subenv = &lov_env_session(env)->ls_subenv; + + /* + * FIXME: We tend to use the subio's env & io to call the sublock + * lock operations because osc lock sometimes stores some control + * variables in thread's IO infomation(Now only lockless information). + * However, if the lock's host(object) is different from the object + * for current IO, we have no way to get the subenv and subio because + * they are not initialized at all. As a temp fix, in this case, + * we still borrow the parent's env to call sublock operations. 
+ */ + if (!cl_object_same(io->ci_obj, parent->cll_descr.cld_obj)) { + subenv->lse_env = env; + subenv->lse_io = io; + subenv->lse_sub = NULL; + } else { + LASSERT(io != NULL); + sub = lov_sub_get(env, lio, lls->sub_stripe); + if (!IS_ERR(sub)) { + subenv->lse_env = sub->sub_env; + subenv->lse_io = sub->sub_io; + subenv->lse_sub = sub; + } else { + subenv = (void*)sub; + } + } + return subenv; +} + +static void lov_sublock_env_put(struct lov_sublock_env *subenv) +{ + if (subenv && subenv->lse_sub) + lov_sub_put(subenv->lse_sub); +} + static void lov_sublock_adopt(const struct lu_env *env, struct lov_lock *lck, struct cl_lock *sublock, int idx, struct lov_lock_link *link) @@ -102,15 +146,30 @@ static struct cl_lock *lov_sublock_alloc(const struct lu_env *env, OBD_SLAB_ALLOC_PTR(link, lov_lock_link_kmem); if (link != NULL) { - struct lov_lock_sub *sub; + struct lov_sublock_env *subenv; + struct lov_lock_sub *lls; struct cl_lock_descr *descr; parent = lck->lls_cl.cls_lock; - sub = &lck->lls_sub[idx]; - descr = &sub->sub_descr; + lls = &lck->lls_sub[idx]; + descr = &lls->sub_descr; + + subenv = lov_sublock_env_get(env, parent, lls); + if (!IS_ERR(subenv)) { + /* CAVEAT: Don't try to add a field in lov_lock_sub + * to remember the subio. This is because lock is able + * to be cached, but this is not true for IO. This + * further means a sublock might be referenced in + * different io context. -jay */ + + sublock = cl_lock_hold(subenv->lse_env, subenv->lse_io, + descr, "lov-parent", parent); + lov_sublock_env_put(subenv); + } else { + /* error occurs. */ + sublock = (void*)subenv; + } - /* XXX maybe sub-io? */ - sublock = cl_lock_hold(env, io, descr, "lov-parent", parent); if (!IS_ERR(sublock)) *out = link; else @@ -122,28 +181,46 @@ static struct cl_lock *lov_sublock_alloc(const struct lu_env *env, static void lov_sublock_unlock(const struct lu_env *env, struct lovsub_lock *lsl, - struct cl_lock_closure *closure) + struct cl_lock_closure *closure, + struct lov_sublock_env *subenv) { ENTRY; + lov_sublock_env_put(subenv); lsl->lss_active = NULL; cl_lock_disclosure(env, closure); EXIT; } -static int lov_sublock_lock(const struct lu_env *env, struct lovsub_lock *lsl, - struct cl_lock_closure *closure) +static int lov_sublock_lock(const struct lu_env *env, + struct lov_lock_sub *lls, + struct cl_lock_closure *closure, + struct lov_sublock_env **lsep) { struct cl_lock *child; - int result; + int result = 0; + ENTRY; LASSERT(list_empty(&closure->clc_list)); - ENTRY; - child = lsl->lss_cl.cls_lock; + child = lls->sub_lock->lss_cl.cls_lock; result = cl_lock_closure_build(env, child, closure); if (result == 0) { + struct cl_lock *parent = closure->clc_origin; + LASSERT(cl_lock_is_mutexed(child)); - lsl->lss_active = closure->clc_origin; + lls->sub_lock->lss_active = parent; + + if (lsep) { + struct lov_sublock_env *subenv; + subenv = lov_sublock_env_get(env, parent, lls); + if (IS_ERR(subenv)) { + lov_sublock_unlock(env, lls->sub_lock, + closure, NULL); + result = PTR_ERR(subenv); + } else { + *lsep = subenv; + } + } } RETURN(result); } @@ -308,7 +385,7 @@ static int lov_sublock_release(const struct lu_env *env, struct lov_lock *lck, ENTRY; if (lck->lls_sub[i].sub_flags & LSF_HELD) { - struct cl_lock *sublock; + struct cl_lock *sublock; int dying; LASSERT(lck->lls_sub[i].sub_lock != NULL); @@ -404,8 +481,8 @@ static int lov_lock_enqueue_one(const struct lu_env *env, struct lov_lock *lck, struct cl_io *io, __u32 enqflags, int last) { int result; - ENTRY; + /* first, try to enqueue a sub-lock ... 
*/ result = cl_enqueue_try(env, sublock, io, enqflags); if (sublock->cll_state == CLS_ENQUEUED) @@ -480,8 +557,10 @@ static int lov_lock_enqueue(const struct lu_env *env, for (result = 0, minstate = CLS_FREEING, i = 0; i < lck->lls_nr; ++i) { int rc; - struct lovsub_lock *sub; - struct cl_lock *sublock; + struct lovsub_lock *sub; + struct lov_lock_sub *lls; + struct cl_lock *sublock; + struct lov_sublock_env *subenv; if (lock->cll_state != CLS_QUEUING) { /* @@ -493,7 +572,8 @@ static int lov_lock_enqueue(const struct lu_env *env, break; } - sub = lck->lls_sub[i].sub_lock; + lls = &lck->lls_sub[i]; + sub = lls->sub_lock; /* * Sub-lock might have been canceled, while top-lock was * cached. @@ -505,11 +585,11 @@ static int lov_lock_enqueue(const struct lu_env *env, break; } sublock = sub->lss_cl.cls_lock; - rc = lov_sublock_lock(env, sub, closure); + rc = lov_sublock_lock(env, lls, closure, &subenv); if (rc == 0) { lov_sublock_hold(env, lck, i); - rc = lov_lock_enqueue_one(env, lck, sublock, io, - enqflags, + rc = lov_lock_enqueue_one(subenv->lse_env, lck, sublock, + subenv->lse_io, enqflags, i == lck->lls_nr - 1); minstate = min(minstate, sublock->cll_state); /* @@ -518,7 +598,7 @@ static int lov_lock_enqueue(const struct lu_env *env, */ if (sublock->cll_state > CLS_HELD) rc = lov_sublock_release(env, lck, i, 1, rc); - lov_sublock_unlock(env, sub, closure); + lov_sublock_unlock(env, sub, closure, subenv); } result = lov_subresult(result, rc); if (result < 0) @@ -540,28 +620,31 @@ static int lov_lock_unuse(const struct lu_env *env, for (result = 0, i = 0; i < lck->lls_nr; ++i) { int rc; - struct lovsub_lock *sub; - struct cl_lock *sublock; + struct lovsub_lock *sub; + struct cl_lock *sublock; + struct lov_lock_sub *lls; + struct lov_sublock_env *subenv; /* top-lock state cannot change concurrently, because single * thread (one that released the last hold) carries unlocking * to the completion. 
*/ LASSERT(slice->cls_lock->cll_state == CLS_UNLOCKING); - sub = lck->lls_sub[i].sub_lock; + lls = &lck->lls_sub[i]; + sub = lls->sub_lock; if (sub == NULL) continue; sublock = sub->lss_cl.cls_lock; - rc = lov_sublock_lock(env, sub, closure); + rc = lov_sublock_lock(env, lls, closure, &subenv); if (rc == 0) { if (lck->lls_sub[i].sub_flags & LSF_HELD) { LASSERT(sublock->cll_state == CLS_HELD); - rc = cl_unuse_try(env, sublock); + rc = cl_unuse_try(subenv->lse_env, sublock); if (rc != CLO_WAIT) rc = lov_sublock_release(env, lck, i, 0, rc); } - lov_sublock_unlock(env, sub, closure); + lov_sublock_unlock(env, sub, closure, subenv); } result = lov_subresult(result, rc); if (result < 0) @@ -588,19 +671,23 @@ static int lov_lock_wait(const struct lu_env *env, for (result = 0, minstate = CLS_FREEING, i = 0; i < lck->lls_nr; ++i) { int rc; - struct lovsub_lock *sub; - struct cl_lock *sublock; + struct lovsub_lock *sub; + struct cl_lock *sublock; + struct lov_lock_sub *lls; + struct lov_sublock_env *subenv; - sub = lck->lls_sub[i].sub_lock; + lls = &lck->lls_sub[i]; + sub = lls->sub_lock; LASSERT(sub != NULL); sublock = sub->lss_cl.cls_lock; - rc = lov_sublock_lock(env, sub, closure); + rc = lov_sublock_lock(env, lls, closure, &subenv); if (rc == 0) { LASSERT(sublock->cll_state >= CLS_ENQUEUED); if (sublock->cll_state < CLS_HELD) rc = cl_wait_try(env, sublock); + minstate = min(minstate, sublock->cll_state); - lov_sublock_unlock(env, sub, closure); + lov_sublock_unlock(env, sub, closure, subenv); } result = lov_subresult(result, rc); if (result < 0) @@ -623,8 +710,10 @@ static int lov_lock_use(const struct lu_env *env, for (result = 0, i = 0; i < lck->lls_nr; ++i) { int rc; - struct lovsub_lock *sub; - struct cl_lock *sublock; + struct lovsub_lock *sub; + struct cl_lock *sublock; + struct lov_lock_sub *lls; + struct lov_sublock_env *subenv; if (slice->cls_lock->cll_state != CLS_CACHED) { /* see comment in lov_lock_enqueue(). */ @@ -636,21 +725,22 @@ static int lov_lock_use(const struct lu_env *env, * CLS_CACHED state, top-lock would have been moved into * CLS_NEW state, so all sub-locks have to be in place. 
*/ - sub = lck->lls_sub[i].sub_lock; + lls = &lck->lls_sub[i]; + sub = lls->sub_lock; LASSERT(sub != NULL); sublock = sub->lss_cl.cls_lock; - rc = lov_sublock_lock(env, sub, closure); + rc = lov_sublock_lock(env, lls, closure, &subenv); if (rc == 0) { LASSERT(sublock->cll_state != CLS_FREEING); lov_sublock_hold(env, lck, i); if (sublock->cll_state == CLS_CACHED) { - rc = cl_use_try(env, sublock); + rc = cl_use_try(subenv->lse_env, sublock); if (rc != 0) rc = lov_sublock_release(env, lck, i, 1, rc); } else rc = 0; - lov_sublock_unlock(env, sub, closure); + lov_sublock_unlock(env, sub, closure, subenv); } result = lov_subresult(result, rc); if (result < 0) @@ -838,16 +928,18 @@ static void lov_lock_delete(const struct lu_env *env, ENTRY; for (i = 0; i < lck->lls_nr; ++i) { - struct lovsub_lock *lsl; - struct cl_lock *sublock; + struct lov_lock_sub *lls; + struct lovsub_lock *lsl; + struct cl_lock *sublock; int rc; - lsl = lck->lls_sub[i].sub_lock; + lls = &lck->lls_sub[i]; + lsl = lls->sub_lock; if (lsl == NULL) continue; sublock = lsl->lss_cl.cls_lock; - rc = lov_sublock_lock(env, lsl, closure); + rc = lov_sublock_lock(env, lls, closure, NULL); if (rc == 0) { if (lck->lls_sub[i].sub_flags & LSF_HELD) lov_sublock_release(env, lck, i, 1, 0); @@ -859,7 +951,7 @@ static void lov_lock_delete(const struct lu_env *env, lov_lock_unlink(env, link, lsl); LASSERT(lck->lls_sub[i].sub_lock == NULL); } - lov_sublock_unlock(env, lsl, closure); + lov_sublock_unlock(env, lsl, closure, NULL); } else if (rc == CLO_REPEAT) { --i; /* repeat with this lock */ } else { diff --git a/lustre/obdclass/cl_io.c b/lustre/obdclass/cl_io.c index 62357e7..0e2f342 100644 --- a/lustre/obdclass/cl_io.c +++ b/lustre/obdclass/cl_io.c @@ -582,7 +582,7 @@ static void cl_free_io_lock_link(const struct lu_env *env, * Allocates new lock link, and uses it to add a lock to a lockset. 
*/ int cl_io_lock_alloc_add(const struct lu_env *env, struct cl_io *io, - struct cl_lock_descr *descr) + struct cl_lock_descr *descr, int enqflags) { struct cl_io_lock_link *link; int result; @@ -590,8 +590,9 @@ int cl_io_lock_alloc_add(const struct lu_env *env, struct cl_io *io, ENTRY; OBD_ALLOC_PTR(link); if (link != NULL) { - link->cill_descr = *descr; - link->cill_fini = cl_free_io_lock_link; + link->cill_descr = *descr; + link->cill_enq_flags = enqflags; + link->cill_fini = cl_free_io_lock_link; result = cl_io_lock_add(env, io, link); if (result) /* lock match */ link->cill_fini(env, link); diff --git a/lustre/osc/osc_io.c b/lustre/osc/osc_io.c index 180e4f1..fd38a1c 100644 --- a/lustre/osc/osc_io.c +++ b/lustre/osc/osc_io.c @@ -422,19 +422,23 @@ static int osc_io_trunc_start(const struct lu_env *env, struct osc_punch_cbargs *cbargs = &oio->oi_punch_cbarg; struct obd_capa *capa; loff_t size = io->u.ci_truncate.tr_size; - int result; + int result = 0; + memset(oa, 0, sizeof(*oa)); osc_trunc_check(env, io, oio, size); - cl_object_attr_lock(obj); - result = cl_object_attr_get(env, obj, attr); - if (result == 0) { - attr->cat_size = attr->cat_kms = size; - result = cl_object_attr_set(env, obj, attr, CAT_SIZE|CAT_KMS); + if (oio->oi_lockless == 0) { + cl_object_attr_lock(obj); + result = cl_object_attr_get(env, obj, attr); + if (result == 0) { + attr->cat_size = attr->cat_kms = size; + result = cl_object_attr_set(env, obj, attr, + CAT_SIZE|CAT_KMS); + } + cl_object_attr_unlock(obj); } - cl_object_attr_unlock(obj); if (result == 0) { oa->o_id = loi->loi_id; diff --git a/lustre/osc/osc_lock.c b/lustre/osc/osc_lock.c index a0e3190..8f2b2b5 100644 --- a/lustre/osc/osc_lock.c +++ b/lustre/osc/osc_lock.c @@ -60,6 +60,8 @@ static const struct cl_lock_operations osc_lock_ops; static const struct cl_lock_operations osc_lock_lockless_ops; +static void osc_lock_to_lockless(const struct lu_env *env, + struct osc_lock *ols, int force); int osc_lock_is_lockless(const struct osc_lock *olck) { @@ -247,7 +249,7 @@ static int osc_enq2ldlm_flags(__u32 enqflags) { int result = 0; - LASSERT((enqflags & ~(CEF_NONBLOCK|CEF_ASYNC|CEF_DISCARD_DATA)) == 0); + LASSERT((enqflags & ~CEF_MASK) == 0); if (enqflags & CEF_NONBLOCK) result |= LDLM_FL_BLOCK_NOWAIT; @@ -303,19 +305,6 @@ static void osc_ast_data_put(const struct lu_env *env, struct osc_lock *olck) cl_lock_put(env, lock); } -static void osc_lock_to_lockless(struct osc_lock *olck) -{ - struct cl_lock_slice *slice = &olck->ols_cl; - struct cl_lock *lock = slice->cls_lock; - - /* - * TODO: Discover which locks we need to convert the lock - * to ldlmlockless. - */ - LASSERT(cl_lock_is_mutexed(lock)); - slice->cls_ops = &osc_lock_lockless_ops; -} - /** * Updates object attributes from a lock value block (lvb) received together * with the DLM lock reply from the server. Copy of osc_update_enqueue() @@ -519,7 +508,7 @@ static int osc_lock_upcall(void *cookie, int errcode) LASSERT(slice->cls_ops == &osc_lock_ops); /* Change this lock to ldlmlock-less lock. */ - osc_lock_to_lockless(olck); + osc_lock_to_lockless(env, olck, 1); olck->ols_state = OLS_GRANTED; rc = 0; } else if (olck->ols_glimpse && rc == -ENAVAIL) { @@ -998,6 +987,60 @@ static int osc_lock_cancel_wait(const struct lu_env *env, struct cl_lock *lock, } /** + * Determine if the lock should be converted into a lockless lock. 
* + * Steps to check: + * - whether the lock has an explicit requirement for a non-lockless lock; + * - the IO lock request type, cl_io::ci_lockreq; + * - send the enqueue RPC to the OST to make the final decision; + * - special handling for lockless truncate locks. + * + * Additional policy can be implemented here, e.g., never do lockless-io + * for large extents. + */ +static void osc_lock_to_lockless(const struct lu_env *env, + struct osc_lock *ols, int force) +{ + struct cl_lock_slice *slice = &ols->ols_cl; + struct cl_lock *lock = slice->cls_lock; + + LASSERT(ols->ols_state == OLS_NEW || + ols->ols_state == OLS_UPCALL_RECEIVED); + + if (force) { + ols->ols_locklessable = 1; + LASSERT(cl_lock_is_mutexed(lock)); + slice->cls_ops = &osc_lock_lockless_ops; + } else { + struct osc_io *oio = osc_env_io(env); + struct cl_io *io = oio->oi_cl.cis_io; + struct cl_object *obj = slice->cls_obj; + struct osc_object *oob = cl2osc(obj); + const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev); + struct obd_connect_data *ocd; + + LASSERT(io->ci_lockreq == CILR_MANDATORY || + io->ci_lockreq == CILR_MAYBE || + io->ci_lockreq == CILR_NEVER); + + ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data; + ols->ols_locklessable = (io->ci_type != CIT_TRUNC) && + (io->ci_lockreq == CILR_MAYBE) && + (ocd->ocd_connect_flags & OBD_CONNECT_SRVLOCK); + if (io->ci_lockreq == CILR_NEVER || + /* lockless IO */ + (ols->ols_locklessable && osc_object_is_contended(oob)) || + /* lockless truncate */ + (io->ci_type == CIT_TRUNC && + (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK) && + osd->od_lockless_truncate)) { + ols->ols_locklessable = 1; + slice->cls_ops = &osc_lock_lockless_ops; + } + } +} + +/** * Cancel all conflicting locks and wait for them to be destroyed. * * This function is used for two purposes: @@ -1190,8 +1233,6 @@ static int osc_lock_enqueue(const struct lu_env *env, osc_lock_build_res(env, obj, resname); osc_lock_build_policy(env, lock, policy); ols->ols_flags = osc_enq2ldlm_flags(enqflags); - if (ols->ols_locklessable) - ols->ols_flags |= LDLM_FL_DENY_ON_CONTENTION; if (osc_deadlock_is_possible(env, lock)) ols->ols_flags |= LDLM_FL_BLOCK_GRANTED; if (ols->ols_flags & LDLM_FL_HAS_INTENT) @@ -1199,26 +1240,37 @@ static int osc_lock_enqueue(const struct lu_env *env, result = osc_lock_enqueue_wait(env, ols); if (result == 0) { - /* a reference for lock, passed as an upcall cookie */ - cl_lock_get(lock); - lu_ref_add(&lock->cll_reference, "upcall", lock); - ols->ols_state = OLS_ENQUEUED; + if (!(enqflags & CEF_MUST)) + /* try to convert this lock to a lockless lock */ + osc_lock_to_lockless(env, ols, (enqflags & CEF_NEVER)); + if (!osc_lock_is_lockless(ols)) { + if (ols->ols_locklessable) + ols->ols_flags |= LDLM_FL_DENY_ON_CONTENTION; + + /* a reference for lock, passed as an upcall cookie */ + cl_lock_get(lock); + lu_ref_add(&lock->cll_reference, "upcall", lock); + ols->ols_state = OLS_ENQUEUED; - /* - * XXX: this is possible blocking point as - * ldlm_lock_match(LDLM_FL_LVB_READY) waits for - * LDLM_CP_CALLBACK.
+ */ + result = osc_enqueue_base(osc_export(obj), resname, &ols->ols_flags, policy, &ols->ols_lvb, obj->oo_oinfo->loi_kms_valid, osc_lock_upcall, ols, einfo, &ols->ols_handle, PTLRPCD_SET, 1); - if (result != 0) { - lu_ref_del(&lock->cll_reference, "upcall", lock); - cl_lock_put(env, lock); + if (result != 0) { + lu_ref_del(&lock->cll_reference, + "upcall", lock); + cl_lock_put(env, lock); + } + } else { + ols->ols_state = OLS_GRANTED; } } @@ -1473,18 +1525,8 @@ static int osc_lock_lockless_enqueue(const struct lu_env *env, const struct cl_lock_slice *slice, struct cl_io *_, __u32 enqflags) { - struct osc_lock *ols = cl2osc_lock(slice); - struct cl_lock *lock = ols->ols_cl.cls_lock; - int result; - - LASSERT(cl_lock_is_mutexed(lock)); - LASSERT(lock->cll_state == CLS_QUEUING); - LASSERT(ols->ols_state == OLS_NEW); - - result = osc_lock_enqueue_wait(env, ols); - if (result == 0) - ols->ols_state = OLS_GRANTED; - return result; + LBUG(); + return 0; } static int osc_lock_lockless_unuse(const struct lu_env *env, @@ -1537,7 +1579,11 @@ static void osc_lock_lockless_state(const struct lu_env *env, if (state == CLS_HELD) { LASSERT(lock->ols_owner == NULL); lock->ols_owner = oio; - oio->oi_lockless = 1; + + /* set the io to be lockless if this lock is for io's + * host object */ + if (cl_object_same(oio->oi_cl.cis_obj, slice->cls_obj)) + oio->oi_lockless = 1; } else lock->ols_owner = NULL; } @@ -1563,56 +1609,16 @@ static const struct cl_lock_operations osc_lock_lockless_ops = { int osc_lock_init(const struct lu_env *env, struct cl_object *obj, struct cl_lock *lock, - const struct cl_io *io) + const struct cl_io *_) { - struct osc_lock *clk; - struct osc_io *oio = osc_env_io(env); - struct osc_object *oob = cl2osc(obj); + struct osc_lock *clk; int result; OBD_SLAB_ALLOC_PTR(clk, osc_lock_kmem); if (clk != NULL) { - const struct cl_lock_operations *ops; - const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev); - struct obd_connect_data *ocd; - osc_lock_build_einfo(env, lock, clk, &clk->ols_einfo); clk->ols_state = OLS_NEW; - - /* - * Check if we need to do lockless IO here. - * Following conditions must be satisfied: - * - the current IO must be locklessable; - * - the stripe is in contention; - * - requested lock is not a glimpse. - * - * if not, we have to inherit the locklessable flag to - * osc_lock, and let ost make the decision. - * - * Additional policy can be implemented here, e.g., never do - * lockless-io for large extents. 
- */ - LASSERT(io->ci_lockreq == CILR_MANDATORY || - io->ci_lockreq == CILR_MAYBE || - io->ci_lockreq == CILR_NEVER); - ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data; - clk->ols_locklessable = (io->ci_type != CIT_TRUNC) && - (io->ci_lockreq == CILR_MAYBE) && - (ocd->ocd_connect_flags & OBD_CONNECT_SRVLOCK); - ops = &osc_lock_ops; - if (io->ci_lockreq == CILR_NEVER || - /* lockless IO */ - (clk->ols_locklessable && osc_object_is_contended(oob)) || - /* lockless truncate */ - (io->ci_type == CIT_TRUNC && - (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK) && - osd->od_lockless_truncate)) { - ops = &osc_lock_lockless_ops; - oio->oi_lockless = 1; - clk->ols_locklessable = 1; - } - - cl_lock_slice_add(lock, &clk->ols_cl, obj, ops); + cl_lock_slice_add(lock, &clk->ols_cl, obj, &osc_lock_ops); result = 0; } else result = -ENOMEM; diff --git a/lustre/tests/sanityN.sh b/lustre/tests/sanityN.sh index 89cbd93..591024d 100644 --- a/lustre/tests/sanityN.sh +++ b/lustre/tests/sanityN.sh @@ -636,9 +636,11 @@ test_32a() { # bug 11270 local p="$TMP/sanityN-$TESTNAME.parameters" save_lustre_params $HOSTNAME osc.*.lockless_truncate > $p cancel_lru_locks osc - clear_osc_stats enable_lockless_truncate 1 + rm -f $DIR1/$tfile + lfs setstripe -c -1 $DIR1/$tfile dd if=/dev/zero of=$DIR1/$tfile count=10 bs=1M > /dev/null 2>&1 + clear_osc_stats log "checking cached lockless truncate" $TRUNCATE $DIR1/$tfile 8000000
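The decision flow this patch moves into osc_lock_to_lockless() and osc_lock_enqueue() can be summarized outside the kernel sources. The sketch below is a small stand-alone user-space model, not part of the patch: the identifiers (enq_ctx, lockless_mode, REQ_*, IO_*) are invented for illustration, and only the branching mirrors the patch. CEF_MUST, which vvp_mmap_locks() now passes for extents covered by mmap'ed regions, always keeps a real DLM lock; CEF_NEVER forces the lockless path; otherwise the choice depends on ci_lockreq, object contention, the server's OBD_CONNECT_SRVLOCK / OBD_CONNECT_TRUNCLOCK connect flags and the lockless_truncate tunable.

/*
 * Illustrative sketch only (assumed simplification, not the Lustre code):
 * a user-space model of the lockless-lock policy in this commit.
 */
#include <stdbool.h>
#include <stdio.h>

enum lockreq { REQ_MANDATORY, REQ_MAYBE, REQ_NEVER };  /* ~ io->ci_lockreq */
enum iotype  { IO_RW, IO_TRUNC };                       /* ~ io->ci_type    */

struct enq_ctx {
        bool cef_must;          /* CEF_MUST: a "real" lock is required (mmap) */
        bool cef_never;         /* CEF_NEVER: never take a "real" lock        */
        enum lockreq lockreq;   /* generic requirement of the whole IO        */
        enum iotype  iotype;
        bool srvlock;           /* server sent OBD_CONNECT_SRVLOCK            */
        bool trunclock;         /* server sent OBD_CONNECT_TRUNCLOCK          */
        bool contended;         /* ~ osc_object_is_contended()                */
        bool lockless_truncate; /* ~ osc.*.lockless_truncate tunable          */
};

/* Return true when the lock should be served without an LDLM lock. */
static bool lockless_mode(const struct enq_ctx *c)
{
        if (c->cef_must)                /* e.g. locks covering mmap'ed ranges */
                return false;
        if (c->cef_never)               /* forced lockless                    */
                return true;

        bool locklessable = c->iotype != IO_TRUNC &&
                            c->lockreq == REQ_MAYBE && c->srvlock;

        return c->lockreq == REQ_NEVER ||
               (locklessable && c->contended) ||          /* lockless IO       */
               (c->iotype == IO_TRUNC && c->trunclock &&
                c->lockless_truncate);                    /* lockless truncate */
}

int main(void)
{
        struct enq_ctx c = { .lockreq = REQ_MAYBE, .srvlock = true,
                             .contended = true };
        printf("contended write        -> %s\n",
               lockless_mode(&c) ? "lockless" : "DLM lock");

        c.cef_must = true;              /* same extent, but under mmap */
        printf("same extent under mmap -> %s\n",
               lockless_mode(&c) ? "lockless" : "DLM lock");
        return 0;
}

Built with any C99 compiler, the two printf lines show the same extent switching from a lockless grant to a real DLM lock once CEF_MUST is set, which is the mmap case handled by the llite_mmap.c and vvp_io.c hunks above; in the real code the non-forced decision is made at enqueue time, after osc_lock_enqueue_wait(), rather than at lock initialization.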