* owner of the conflicting lock, that it can drop dirty pages
* protected by this lock, without sending them to the server.
*/
- CEF_DISCARD_DATA = 0x00000004
+ CEF_DISCARD_DATA = 0x00000004,
+ /**
+ * Tell the sub layers that this must be a `real' lock.
+ */
+ CEF_MUST = 0x00000008,
+ /**
+ * Tell the sub layers to never request a `real' lock.
+ * Currently, CEF_MUST and CEF_NEVER are only used for mmap locks.
+ * On cl_io::ci_lockreq versus these two flags: ci_lockreq describes
+ * the generic lock requirement of this IO, especially for locks that
+ * belong to the object doing the IO, whereas an individual lock may
+ * have more precise requirements, which these flags describe.
+ */
+ CEF_NEVER = 0x00000010,
+ /**
+ * mask of enq_flags.
+ */
+ CEF_MASK = 0x0000001f
};
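As a rough illustration of how these flags are meant to interact with cl_io::ci_lockreq (the helper below is hypothetical, not part of the patch): ci_lockreq carries the IO-wide policy, while CEF_MUST and CEF_NEVER pin down a single lock:

    /* Hypothetical sketch: derive per-lock enqueue flags from the
     * IO-wide policy in cl_io::ci_lockreq. */
    static __u32 pick_enq_flags(const struct cl_io *io)
    {
            switch (io->ci_lockreq) {
            case CILR_MANDATORY:
                    return CEF_MUST;   /* a `real' DLM lock is required */
            case CILR_NEVER:
                    return CEF_NEVER;  /* never take a `real' lock */
            default:
                    return 0;          /* CILR_MAYBE: let the osc layer decide */
            }
    }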
/**
int cl_io_lock_add (const struct lu_env *env, struct cl_io *io,
struct cl_io_lock_link *link);
int cl_io_lock_alloc_add(const struct lu_env *env, struct cl_io *io,
- struct cl_lock_descr *descr);
+ struct cl_lock_descr *descr, int enqflags);
int cl_io_read_page (const struct lu_env *env, struct cl_io *io,
struct cl_page *page);
int cl_io_prepare_write(const struct lu_env *env, struct cl_io *io,
struct cl_fault_io *fio;
io = &ccc_env_info(env)->cti_io;
+ memset(io, 0, sizeof(*io));
io->ci_obj = ll_i2info(inode)->lli_clob;
LASSERT(io->ci_obj != NULL);
if (cl_io_init(env, io, CIT_FAULT, io->ci_obj) == 0) {
struct vvp_io *vio = vvp_env_io(env);
struct ccc_io *cio = ccc_env_io(env);
+ struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
LASSERT(cio->cui_cl.cis_io == io);
+ /* mmap locks must be either MANDATORY or NEVER. */
+ if (fd->fd_flags & LL_FILE_IGNORE_LOCK ||
+ sbi->ll_flags & LL_SBI_NOLCK)
+ io->ci_lockreq = CILR_NEVER;
+ else
+ io->ci_lockreq = CILR_MANDATORY;
+
vio->u.fault.ft_vma = vma;
vio->u.fault.ft_address = address;
vio->u.fault.ft_type = type;
- cio->cui_fd = LUSTRE_FPRIVATE(file);
+ cio->cui_fd = fd;
result = cl_io_loop(env, io);
if (result == 0) {
count += addr & (~CFS_PAGE_MASK);
addr &= CFS_PAGE_MASK;
while((vma = our_vma(addr, count)) != NULL) {
- LASSERT(vma->vm_file);
+ struct file *file = vma->vm_file;
+ struct ll_file_data *fd;
+
+ LASSERT(file);
+ fd = LUSTRE_FPRIVATE(file);
+
+ inode = file->f_dentry->d_inode;
+ if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK ||
+ ll_i2sbi(inode)->ll_flags & LL_SBI_NOLCK))
+ goto cont;
- inode = vma->vm_file->f_dentry->d_inode;
/*
* XXX: Required lock mode can be weakened: CIT_WRITE
* io only ever reads user level buffer, and CIT_READ
policy.l_extent.start);
descr->cld_end = cl_index(descr->cld_obj,
policy.l_extent.end);
- result = cl_io_lock_alloc_add(env, io, descr);
+ result = cl_io_lock_alloc_add(env, io, descr, CEF_MUST);
if (result < 0)
RETURN(result);
+
+ cont:
if (vma->vm_end - addr >= count)
break;
count -= vma->vm_end - addr;
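The `goto cont' above skips building a lock for regions whose file was opened with LL_FILE_IGNORE_LOCK or that live on a no-lock (LL_SBI_NOLCK) mount. Condensed into a standalone predicate (a sketch; the helper name is invented):

    /* Sketch: does this mmap'd region need a DLM lock at all? */
    static inline int ll_region_needs_lock(const struct ll_file_data *fd,
                                           const struct ll_sb_info *sbi)
    {
            return !(fd->fd_flags & LL_FILE_IGNORE_LOCK ||
                     sbi->ll_flags & LL_SBI_NOLCK);
    }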
struct cl_lock *lss_active;
};
+/**
+ * Describes the environment (env, IO and sub-IO) used to execute
+ * sublock operations.
+ */
+struct lov_sublock_env {
+ const struct lu_env *lse_env;
+ struct cl_io *lse_io;
+ struct lov_io_sub *lse_sub;
+};
+
struct lovsub_page {
struct cl_page_slice lsb_cl;
};
};
struct lov_session {
- struct lov_io ls_io;
+ struct lov_io ls_io;
+ struct lov_sublock_env ls_subenv;
};
/**
void lov_lock_unlink (const struct lu_env *env, struct lov_lock_link *link,
struct lovsub_lock *sub);
-void lov_sub_put (struct lov_io_sub *sub);
+struct lov_io_sub *lov_sub_get(const struct lu_env *env, struct lov_io *lio,
+ int stripe);
+void lov_sub_put (struct lov_io_sub *sub);
int lov_sublock_modify (const struct lu_env *env, struct lov_lock *lov,
struct lovsub_lock *sublock,
const struct cl_lock_descr *d, int idx);
RETURN(result);
}
-static struct lov_io_sub *lov_sub_get(const struct lu_env *env,
- struct lov_io *lio, int stripe)
+struct lov_io_sub *lov_sub_get(const struct lu_env *env,
+ struct lov_io *lio, int stripe)
{
int rc;
struct lov_io_sub *sub = &lio->lis_subs[stripe];
stripe, start, end);
} else
rc = PTR_ERR(sub);
+
if (!rc)
list_add_tail(&sub->sub_linkage, &lio->lis_active);
else
*
*/
+static struct lov_sublock_env *lov_sublock_env_get(const struct lu_env *env,
+ struct cl_lock *parent,
+ struct lov_lock_sub *lls)
+{
+ struct lov_sublock_env *subenv;
+ struct lov_io *lio = lov_env_io(env);
+ struct cl_io *io = lio->lis_cl.cis_io;
+ struct lov_io_sub *sub;
+
+ subenv = &lov_env_session(env)->ls_subenv;
+
+ /*
+ * FIXME: we tend to use the sub-IO's env and IO to call the sublock
+ * operations, because an osc lock sometimes stores control variables
+ * in the thread's IO information (currently only the lockless state).
+ * However, if the lock's host object differs from the object of the
+ * current IO, there is no way to get the subenv and sub-IO, because
+ * they are not initialized at all. As a temporary fix, in this case
+ * we still borrow the parent's env to call the sublock operations.
+ */
+ if (!cl_object_same(io->ci_obj, parent->cll_descr.cld_obj)) {
+ subenv->lse_env = env;
+ subenv->lse_io = io;
+ subenv->lse_sub = NULL;
+ } else {
+ LASSERT(io != NULL);
+ sub = lov_sub_get(env, lio, lls->sub_stripe);
+ if (!IS_ERR(sub)) {
+ subenv->lse_env = sub->sub_env;
+ subenv->lse_io = sub->sub_io;
+ subenv->lse_sub = sub;
+ } else {
+ subenv = (void*)sub;
+ }
+ }
+ return subenv;
+}
+
+static void lov_sublock_env_put(struct lov_sublock_env *subenv)
+{
+ if (subenv && subenv->lse_sub)
+ lov_sub_put(subenv->lse_sub);
+}
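Users of lov_sublock_env_get() are expected to check IS_ERR() and to pair each successful get with lov_sublock_env_put(); a minimal sketch of that discipline (the wrapper function is invented for illustration):

    /* Sketch: the expected get/put pattern around sublock operations. */
    static int lov_sublock_op_sketch(const struct lu_env *env,
                                     struct cl_lock *parent,
                                     struct lov_lock_sub *lls)
    {
            struct lov_sublock_env *subenv;

            subenv = lov_sublock_env_get(env, parent, lls);
            if (IS_ERR(subenv))
                    return PTR_ERR(subenv);
            /* ... call sublock operations with subenv->lse_env and
             * subenv->lse_io; lse_sub is NULL when the parent's env
             * was borrowed ... */
            lov_sublock_env_put(subenv);
            return 0;
    }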
+
static void lov_sublock_adopt(const struct lu_env *env, struct lov_lock *lck,
struct cl_lock *sublock, int idx,
struct lov_lock_link *link)
OBD_SLAB_ALLOC_PTR(link, lov_lock_link_kmem);
if (link != NULL) {
- struct lov_lock_sub *sub;
+ struct lov_sublock_env *subenv;
+ struct lov_lock_sub *lls;
struct cl_lock_descr *descr;
parent = lck->lls_cl.cls_lock;
- sub = &lck->lls_sub[idx];
- descr = &sub->sub_descr;
+ lls = &lck->lls_sub[idx];
+ descr = &lls->sub_descr;
+
+ subenv = lov_sublock_env_get(env, parent, lls);
+ if (!IS_ERR(subenv)) {
+ /* CAVEAT: don't try to add a field to lov_lock_sub
+ * to remember the sub-IO: a lock can be cached, but
+ * an IO cannot, so a sublock may be referenced from
+ * different IO contexts. -jay */
+
+ sublock = cl_lock_hold(subenv->lse_env, subenv->lse_io,
+ descr, "lov-parent", parent);
+ lov_sublock_env_put(subenv);
+ } else {
+ /* an error occurred */
+ sublock = (void*)subenv;
+ }
- /* XXX maybe sub-io? */
- sublock = cl_lock_hold(env, io, descr, "lov-parent", parent);
if (!IS_ERR(sublock))
*out = link;
else
static void lov_sublock_unlock(const struct lu_env *env,
struct lovsub_lock *lsl,
- struct cl_lock_closure *closure)
+ struct cl_lock_closure *closure,
+ struct lov_sublock_env *subenv)
{
ENTRY;
+ lov_sublock_env_put(subenv);
lsl->lss_active = NULL;
cl_lock_disclosure(env, closure);
EXIT;
}
-static int lov_sublock_lock(const struct lu_env *env, struct lovsub_lock *lsl,
- struct cl_lock_closure *closure)
+static int lov_sublock_lock(const struct lu_env *env,
+ struct lov_lock_sub *lls,
+ struct cl_lock_closure *closure,
+ struct lov_sublock_env **lsep)
{
struct cl_lock *child;
- int result;
+ int result = 0;
+ ENTRY;
LASSERT(list_empty(&closure->clc_list));
- ENTRY;
- child = lsl->lss_cl.cls_lock;
+ child = lls->sub_lock->lss_cl.cls_lock;
result = cl_lock_closure_build(env, child, closure);
if (result == 0) {
+ struct cl_lock *parent = closure->clc_origin;
+
LASSERT(cl_lock_is_mutexed(child));
- lsl->lss_active = closure->clc_origin;
+ lls->sub_lock->lss_active = parent;
+
+ if (lsep) {
+ struct lov_sublock_env *subenv;
+ subenv = lov_sublock_env_get(env, parent, lls);
+ if (IS_ERR(subenv)) {
+ lov_sublock_unlock(env, lls->sub_lock,
+ closure, NULL);
+ result = PTR_ERR(subenv);
+ } else {
+ *lsep = subenv;
+ }
+ }
}
RETURN(result);
}
ENTRY;
if (lck->lls_sub[i].sub_flags & LSF_HELD) {
- struct cl_lock *sublock;
+ struct cl_lock *sublock;
int dying;
LASSERT(lck->lls_sub[i].sub_lock != NULL);
struct cl_io *io, __u32 enqflags, int last)
{
int result;
-
ENTRY;
+
/* first, try to enqueue a sub-lock ... */
result = cl_enqueue_try(env, sublock, io, enqflags);
if (sublock->cll_state == CLS_ENQUEUED)
for (result = 0, minstate = CLS_FREEING, i = 0; i < lck->lls_nr; ++i) {
int rc;
- struct lovsub_lock *sub;
- struct cl_lock *sublock;
+ struct lovsub_lock *sub;
+ struct lov_lock_sub *lls;
+ struct cl_lock *sublock;
+ struct lov_sublock_env *subenv;
if (lock->cll_state != CLS_QUEUING) {
/*
break;
}
- sub = lck->lls_sub[i].sub_lock;
+ lls = &lck->lls_sub[i];
+ sub = lls->sub_lock;
/*
* Sub-lock might have been canceled, while top-lock was
* cached.
break;
}
sublock = sub->lss_cl.cls_lock;
- rc = lov_sublock_lock(env, sub, closure);
+ rc = lov_sublock_lock(env, lls, closure, &subenv);
if (rc == 0) {
lov_sublock_hold(env, lck, i);
- rc = lov_lock_enqueue_one(env, lck, sublock, io,
- enqflags,
+ rc = lov_lock_enqueue_one(subenv->lse_env, lck, sublock,
+ subenv->lse_io, enqflags,
i == lck->lls_nr - 1);
minstate = min(minstate, sublock->cll_state);
/*
*/
if (sublock->cll_state > CLS_HELD)
rc = lov_sublock_release(env, lck, i, 1, rc);
- lov_sublock_unlock(env, sub, closure);
+ lov_sublock_unlock(env, sub, closure, subenv);
}
result = lov_subresult(result, rc);
if (result < 0)
for (result = 0, i = 0; i < lck->lls_nr; ++i) {
int rc;
- struct lovsub_lock *sub;
- struct cl_lock *sublock;
+ struct lovsub_lock *sub;
+ struct cl_lock *sublock;
+ struct lov_lock_sub *lls;
+ struct lov_sublock_env *subenv;
/* top-lock state cannot change concurrently, because single
* thread (one that released the last hold) carries unlocking
* to the completion. */
LASSERT(slice->cls_lock->cll_state == CLS_UNLOCKING);
- sub = lck->lls_sub[i].sub_lock;
+ lls = &lck->lls_sub[i];
+ sub = lls->sub_lock;
if (sub == NULL)
continue;
sublock = sub->lss_cl.cls_lock;
- rc = lov_sublock_lock(env, sub, closure);
+ rc = lov_sublock_lock(env, lls, closure, &subenv);
if (rc == 0) {
if (lck->lls_sub[i].sub_flags & LSF_HELD) {
LASSERT(sublock->cll_state == CLS_HELD);
- rc = cl_unuse_try(env, sublock);
+ rc = cl_unuse_try(subenv->lse_env, sublock);
if (rc != CLO_WAIT)
rc = lov_sublock_release(env, lck,
i, 0, rc);
}
- lov_sublock_unlock(env, sub, closure);
+ lov_sublock_unlock(env, sub, closure, subenv);
}
result = lov_subresult(result, rc);
if (result < 0)
for (result = 0, minstate = CLS_FREEING, i = 0; i < lck->lls_nr; ++i) {
int rc;
- struct lovsub_lock *sub;
- struct cl_lock *sublock;
+ struct lovsub_lock *sub;
+ struct cl_lock *sublock;
+ struct lov_lock_sub *lls;
+ struct lov_sublock_env *subenv;
- sub = lck->lls_sub[i].sub_lock;
+ lls = &lck->lls_sub[i];
+ sub = lls->sub_lock;
LASSERT(sub != NULL);
sublock = sub->lss_cl.cls_lock;
- rc = lov_sublock_lock(env, sub, closure);
+ rc = lov_sublock_lock(env, lls, closure, &subenv);
if (rc == 0) {
LASSERT(sublock->cll_state >= CLS_ENQUEUED);
if (sublock->cll_state < CLS_HELD)
rc = cl_wait_try(env, sublock);
+
minstate = min(minstate, sublock->cll_state);
- lov_sublock_unlock(env, sub, closure);
+ lov_sublock_unlock(env, sub, closure, subenv);
}
result = lov_subresult(result, rc);
if (result < 0)
for (result = 0, i = 0; i < lck->lls_nr; ++i) {
int rc;
- struct lovsub_lock *sub;
- struct cl_lock *sublock;
+ struct lovsub_lock *sub;
+ struct cl_lock *sublock;
+ struct lov_lock_sub *lls;
+ struct lov_sublock_env *subenv;
if (slice->cls_lock->cll_state != CLS_CACHED) {
/* see comment in lov_lock_enqueue(). */
* CLS_CACHED state, top-lock would have been moved into
* CLS_NEW state, so all sub-locks have to be in place.
*/
- sub = lck->lls_sub[i].sub_lock;
+ lls = &lck->lls_sub[i];
+ sub = lls->sub_lock;
LASSERT(sub != NULL);
sublock = sub->lss_cl.cls_lock;
- rc = lov_sublock_lock(env, sub, closure);
+ rc = lov_sublock_lock(env, lls, closure, &subenv);
if (rc == 0) {
LASSERT(sublock->cll_state != CLS_FREEING);
lov_sublock_hold(env, lck, i);
if (sublock->cll_state == CLS_CACHED) {
- rc = cl_use_try(env, sublock);
+ rc = cl_use_try(subenv->lse_env, sublock);
if (rc != 0)
rc = lov_sublock_release(env, lck,
i, 1, rc);
} else
rc = 0;
- lov_sublock_unlock(env, sub, closure);
+ lov_sublock_unlock(env, sub, closure, subenv);
}
result = lov_subresult(result, rc);
if (result < 0)
ENTRY;
for (i = 0; i < lck->lls_nr; ++i) {
- struct lovsub_lock *lsl;
- struct cl_lock *sublock;
+ struct lov_lock_sub *lls;
+ struct lovsub_lock *lsl;
+ struct cl_lock *sublock;
int rc;
- lsl = lck->lls_sub[i].sub_lock;
+ lls = &lck->lls_sub[i];
+ lsl = lls->sub_lock;
if (lsl == NULL)
continue;
sublock = lsl->lss_cl.cls_lock;
- rc = lov_sublock_lock(env, lsl, closure);
+ rc = lov_sublock_lock(env, lls, closure, NULL);
if (rc == 0) {
if (lck->lls_sub[i].sub_flags & LSF_HELD)
lov_sublock_release(env, lck, i, 1, 0);
lov_lock_unlink(env, link, lsl);
LASSERT(lck->lls_sub[i].sub_lock == NULL);
}
- lov_sublock_unlock(env, lsl, closure);
+ lov_sublock_unlock(env, lsl, closure, NULL);
} else if (rc == CLO_REPEAT) {
--i; /* repeat with this lock */
} else {
* Allocates new lock link, and uses it to add a lock to a lockset.
*/
int cl_io_lock_alloc_add(const struct lu_env *env, struct cl_io *io,
- struct cl_lock_descr *descr)
+ struct cl_lock_descr *descr, int enqflags)
{
struct cl_io_lock_link *link;
int result;
ENTRY;
OBD_ALLOC_PTR(link);
if (link != NULL) {
- link->cill_descr = *descr;
- link->cill_fini = cl_free_io_lock_link;
+ link->cill_descr = *descr;
+ link->cill_enq_flags = enqflags;
+ link->cill_fini = cl_free_io_lock_link;
result = cl_io_lock_add(env, io, link);
if (result) /* lock match */
link->cill_fini(env, link);
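With the new parameter, callers state the per-lock requirement at allocation time, as the mmap path above does with CEF_MUST. A hedged sketch of such a caller (the descriptor values are assumptions for illustration):

    /* Sketch: add a mandatory read lock over one page to io's lockset. */
    static int add_must_lock_sketch(const struct lu_env *env,
                                    struct cl_io *io, struct cl_object *obj,
                                    pgoff_t index)
    {
            struct cl_lock_descr descr;

            memset(&descr, 0, sizeof(descr));
            descr.cld_obj   = obj;
            descr.cld_mode  = CLM_READ;
            descr.cld_start = index;
            descr.cld_end   = index;
            return cl_io_lock_alloc_add(env, io, &descr, CEF_MUST);
    }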
struct osc_punch_cbargs *cbargs = &oio->oi_punch_cbarg;
struct obd_capa *capa;
loff_t size = io->u.ci_truncate.tr_size;
- int result;
+ int result = 0;
+
memset(oa, 0, sizeof(*oa));
osc_trunc_check(env, io, oio, size);
- cl_object_attr_lock(obj);
- result = cl_object_attr_get(env, obj, attr);
- if (result == 0) {
- attr->cat_size = attr->cat_kms = size;
- result = cl_object_attr_set(env, obj, attr, CAT_SIZE|CAT_KMS);
+ if (oio->oi_lockless == 0) {
+ cl_object_attr_lock(obj);
+ result = cl_object_attr_get(env, obj, attr);
+ if (result == 0) {
+ attr->cat_size = attr->cat_kms = size;
+ result = cl_object_attr_set(env, obj, attr,
+ CAT_SIZE|CAT_KMS);
+ }
+ cl_object_attr_unlock(obj);
}
- cl_object_attr_unlock(obj);
if (result == 0) {
oa->o_id = loi->loi_id;
static const struct cl_lock_operations osc_lock_ops;
static const struct cl_lock_operations osc_lock_lockless_ops;
+static void osc_lock_to_lockless(const struct lu_env *env,
+ struct osc_lock *ols, int force);
int osc_lock_is_lockless(const struct osc_lock *olck)
{
{
int result = 0;
- LASSERT((enqflags & ~(CEF_NONBLOCK|CEF_ASYNC|CEF_DISCARD_DATA)) == 0);
+ LASSERT((enqflags & ~CEF_MASK) == 0);
if (enqflags & CEF_NONBLOCK)
result |= LDLM_FL_BLOCK_NOWAIT;
cl_lock_put(env, lock);
}
-static void osc_lock_to_lockless(struct osc_lock *olck)
-{
- struct cl_lock_slice *slice = &olck->ols_cl;
- struct cl_lock *lock = slice->cls_lock;
-
- /*
- * TODO: Discover which locks we need to convert the lock
- * to ldlmlockless.
- */
- LASSERT(cl_lock_is_mutexed(lock));
- slice->cls_ops = &osc_lock_lockless_ops;
-}
-
/**
* Updates object attributes from a lock value block (lvb) received together
* with the DLM lock reply from the server. Copy of osc_update_enqueue()
LASSERT(slice->cls_ops == &osc_lock_ops);
/* Change this lock to ldlmlock-less lock. */
- osc_lock_to_lockless(olck);
+ osc_lock_to_lockless(env, olck, 1);
olck->ols_state = OLS_GRANTED;
rc = 0;
} else if (olck->ols_glimpse && rc == -ENAVAIL) {
}
/**
+ * Determine whether the lock should be converted into a lockless lock.
+ *
+ * Steps checked:
+ * - whether the lock has an explicit requirement for a non-lockless lock;
+ * - the IO's lock request type, cl_io::ci_lockreq;
+ * - the enqueue RPC is sent to the OST, which makes the final decision;
+ * - lockless truncate gets special treatment.
+ *
+ * Additional policy can be implemented here, e.g., never do lockless IO
+ * for large extents.
+ */
+static void osc_lock_to_lockless(const struct lu_env *env,
+ struct osc_lock *ols, int force)
+{
+ struct cl_lock_slice *slice = &ols->ols_cl;
+ struct cl_lock *lock = slice->cls_lock;
+
+ LASSERT(ols->ols_state == OLS_NEW ||
+ ols->ols_state == OLS_UPCALL_RECEIVED);
+
+ if (force) {
+ ols->ols_locklessable = 1;
+ LASSERT(cl_lock_is_mutexed(lock));
+ slice->cls_ops = &osc_lock_lockless_ops;
+ } else {
+ struct osc_io *oio = osc_env_io(env);
+ struct cl_io *io = oio->oi_cl.cis_io;
+ struct cl_object *obj = slice->cls_obj;
+ struct osc_object *oob = cl2osc(obj);
+ const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
+ struct obd_connect_data *ocd;
+
+ LASSERT(io->ci_lockreq == CILR_MANDATORY ||
+ io->ci_lockreq == CILR_MAYBE ||
+ io->ci_lockreq == CILR_NEVER);
+
+ ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data;
+ ols->ols_locklessable = (io->ci_type != CIT_TRUNC) &&
+ (io->ci_lockreq == CILR_MAYBE) &&
+ (ocd->ocd_connect_flags & OBD_CONNECT_SRVLOCK);
+ if (io->ci_lockreq == CILR_NEVER ||
+ /* lockless IO */
+ (ols->ols_locklessable && osc_object_is_contended(oob)) ||
+ /* lockless truncate */
+ (io->ci_type == CIT_TRUNC &&
+ (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK) &&
+ osd->od_lockless_truncate)) {
+ ols->ols_locklessable = 1;
+ slice->cls_ops = &osc_lock_lockless_ops;
+ }
+ }
+}
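For reference, the non-forced branch above reduces to a single predicate; restating it (this is a paraphrase of the code just shown, not a second implementation):

    /* Sketch: when does the non-forced path pick lockless operations? */
    static int osc_goes_lockless(struct cl_io *io,
                                 struct obd_connect_data *ocd,
                                 struct osc_object *oob,
                                 struct osc_device *osd,
                                 int locklessable)
    {
            return io->ci_lockreq == CILR_NEVER ||
                   /* lockless IO on a contended stripe */
                   (locklessable && osc_object_is_contended(oob)) ||
                   /* server-side (lockless) truncate */
                   (io->ci_type == CIT_TRUNC &&
                    (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK) &&
                    osd->od_lockless_truncate);
    }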
+
+/**
* Cancel all conflicting locks and wait for them to be destroyed.
*
* This function is used for two purposes:
osc_lock_build_res(env, obj, resname);
osc_lock_build_policy(env, lock, policy);
ols->ols_flags = osc_enq2ldlm_flags(enqflags);
- if (ols->ols_locklessable)
- ols->ols_flags |= LDLM_FL_DENY_ON_CONTENTION;
if (osc_deadlock_is_possible(env, lock))
ols->ols_flags |= LDLM_FL_BLOCK_GRANTED;
if (ols->ols_flags & LDLM_FL_HAS_INTENT)
result = osc_lock_enqueue_wait(env, ols);
if (result == 0) {
- /* a reference for lock, passed as an upcall cookie */
- cl_lock_get(lock);
- lu_ref_add(&lock->cll_reference, "upcall", lock);
- ols->ols_state = OLS_ENQUEUED;
+ if (!(enqflags & CEF_MUST))
+ /* try to convert this lock to a lockless lock */
+ osc_lock_to_lockless(env, ols, (enqflags & CEF_NEVER));
+ if (!osc_lock_is_lockless(ols)) {
+ if (ols->ols_locklessable)
+ ols->ols_flags |= LDLM_FL_DENY_ON_CONTENTION;
+
+ /* a reference for lock, passed as an upcall cookie */
+ cl_lock_get(lock);
+ lu_ref_add(&lock->cll_reference, "upcall", lock);
+ ols->ols_state = OLS_ENQUEUED;
- /*
- * XXX: this is possible blocking point as
- * ldlm_lock_match(LDLM_FL_LVB_READY) waits for
- * LDLM_CP_CALLBACK.
- */
- result = osc_enqueue_base(osc_export(obj), resname,
+ /*
+ * XXX: this is possible blocking point as
+ * ldlm_lock_match(LDLM_FL_LVB_READY) waits for
+ * LDLM_CP_CALLBACK.
+ */
+ result = osc_enqueue_base(osc_export(obj), resname,
&ols->ols_flags, policy,
&ols->ols_lvb,
obj->oo_oinfo->loi_kms_valid,
osc_lock_upcall,
ols, einfo, &ols->ols_handle,
PTLRPCD_SET, 1);
- if (result != 0) {
- lu_ref_del(&lock->cll_reference, "upcall", lock);
- cl_lock_put(env, lock);
+ if (result != 0) {
+ lu_ref_del(&lock->cll_reference,
+ "upcall", lock);
+ cl_lock_put(env, lock);
+ }
+ } else {
+ ols->ols_state = OLS_GRANTED;
}
}
const struct cl_lock_slice *slice,
struct cl_io *_, __u32 enqflags)
{
- struct osc_lock *ols = cl2osc_lock(slice);
- struct cl_lock *lock = ols->ols_cl.cls_lock;
- int result;
-
- LASSERT(cl_lock_is_mutexed(lock));
- LASSERT(lock->cll_state == CLS_QUEUING);
- LASSERT(ols->ols_state == OLS_NEW);
-
- result = osc_lock_enqueue_wait(env, ols);
- if (result == 0)
- ols->ols_state = OLS_GRANTED;
- return result;
+ LBUG();
+ return 0;
}
static int osc_lock_lockless_unuse(const struct lu_env *env,
if (state == CLS_HELD) {
LASSERT(lock->ols_owner == NULL);
lock->ols_owner = oio;
- oio->oi_lockless = 1;
+
+ /* set the IO to be lockless if this lock is for the
+ * IO's host object */
+ if (cl_object_same(oio->oi_cl.cis_obj, slice->cls_obj))
+ oio->oi_lockless = 1;
} else
lock->ols_owner = NULL;
}
int osc_lock_init(const struct lu_env *env,
struct cl_object *obj, struct cl_lock *lock,
- const struct cl_io *io)
+ const struct cl_io *_)
{
- struct osc_lock *clk;
- struct osc_io *oio = osc_env_io(env);
- struct osc_object *oob = cl2osc(obj);
+ struct osc_lock *clk;
int result;
OBD_SLAB_ALLOC_PTR(clk, osc_lock_kmem);
if (clk != NULL) {
- const struct cl_lock_operations *ops;
- const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
- struct obd_connect_data *ocd;
-
osc_lock_build_einfo(env, lock, clk, &clk->ols_einfo);
clk->ols_state = OLS_NEW;
-
- /*
- * Check if we need to do lockless IO here.
- * Following conditions must be satisfied:
- * - the current IO must be locklessable;
- * - the stripe is in contention;
- * - requested lock is not a glimpse.
- *
- * if not, we have to inherit the locklessable flag to
- * osc_lock, and let ost make the decision.
- *
- * Additional policy can be implemented here, e.g., never do
- * lockless-io for large extents.
- */
- LASSERT(io->ci_lockreq == CILR_MANDATORY ||
- io->ci_lockreq == CILR_MAYBE ||
- io->ci_lockreq == CILR_NEVER);
- ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data;
- clk->ols_locklessable = (io->ci_type != CIT_TRUNC) &&
- (io->ci_lockreq == CILR_MAYBE) &&
- (ocd->ocd_connect_flags & OBD_CONNECT_SRVLOCK);
- ops = &osc_lock_ops;
- if (io->ci_lockreq == CILR_NEVER ||
- /* lockless IO */
- (clk->ols_locklessable && osc_object_is_contended(oob)) ||
- /* lockless truncate */
- (io->ci_type == CIT_TRUNC &&
- (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK) &&
- osd->od_lockless_truncate)) {
- ops = &osc_lock_lockless_ops;
- oio->oi_lockless = 1;
- clk->ols_locklessable = 1;
- }
-
- cl_lock_slice_add(lock, &clk->ols_cl, obj, ops);
+ cl_lock_slice_add(lock, &clk->ols_cl, obj, &osc_lock_ops);
result = 0;
} else
result = -ENOMEM;
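After this simplification every osc lock starts out as a regular lock; the lockless decision is deferred to enqueue time. The resulting lifecycle, summarized from the hunks above:

    /*
     * Post-patch lifecycle (summary of the hunks above):
     *
     *   osc_lock_init()     -> OLS_NEW, slice ops = osc_lock_ops
     *   osc_lock_enqueue()  -> if !(enqflags & CEF_MUST), try
     *                          osc_lock_to_lockless(env, ols,
     *                                               enqflags & CEF_NEVER);
     *                          lockless: OLS_GRANTED, osc_lock_lockless_ops
     *                          otherwise: OLS_ENQUEUED + osc_enqueue_base()
     *   osc_lock_upcall()   -> may still force-convert to lockless when
     *                          the server grants a lockless lock
     */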
local p="$TMP/sanityN-$TESTNAME.parameters"
save_lustre_params $HOSTNAME osc.*.lockless_truncate > $p
cancel_lru_locks osc
- clear_osc_stats
enable_lockless_truncate 1
+ rm -f $DIR1/$tfile
+ lfs setstripe -c -1 $DIR1/$tfile
dd if=/dev/zero of=$DIR1/$tfile count=10 bs=1M > /dev/null 2>&1
+ clear_osc_stats
log "checking cached lockless truncate"
$TRUNCATE $DIR1/$tfile 8000000