void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode);
void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode);
void ldlm_lock_allow_match(struct ldlm_lock *lock);
-int ldlm_lock_fast_match(struct ldlm_lock *, int, obd_off, obd_off, void **);
-void ldlm_lock_fast_release(void *, int);
+int ldlm_lock_fast_match(struct ldlm_lock *, int, obd_off, obd_off,
+ struct lustre_handle *);
ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags,
struct ldlm_res_id *, ldlm_type_t type,
ldlm_policy_data_t *, ldlm_mode_t mode,
struct lov_oinfo *loi,
cfs_page_t *page, obd_off offset,
struct obd_async_page_ops *ops, void *data,
- void **res, int nocache,
+ void **res, int flags,
struct lustre_handle *lockh);
- int (*o_reget_short_lock)(struct obd_export *exp,
- struct lov_stripe_md *lsm,
- void **res, int rw,
- obd_off start, obd_off end,
- void **cookie);
- int (*o_release_short_lock)(struct obd_export *exp,
- struct lov_stripe_md *lsm, obd_off end,
- void *cookie, int rw);
+ int (*o_get_lock)(struct obd_export *exp, struct lov_stripe_md *lsm,
+ void **res, int rw, obd_off start, obd_off end,
+ struct lustre_handle *lockh, int flags);
int (*o_queue_async_io)(struct obd_export *exp,
struct lov_stripe_md *lsm,
struct lov_oinfo *loi, void *cookie,
int (*o_change_cbdata)(struct obd_export *, struct lov_stripe_md *,
ldlm_iterator_t it, void *data);
int (*o_cancel)(struct obd_export *, struct lov_stripe_md *md,
- __u32 mode, struct lustre_handle *);
+ __u32 mode, struct lustre_handle *, int flags,
+ obd_off end);
int (*o_cancel_unused)(struct obd_export *, struct lov_stripe_md *,
int flags, void *opaque);
int (*o_join_lru)(struct obd_export *, struct lov_stripe_md *,
RETURN(rc);
}
+/* flags used by obd_prep_async_page */
+#define OBD_PAGE_NO_CACHE 0x00000001 /* don't add to cache */
+#define OBD_FAST_LOCK 0x00000002 /* lockh refers to a "fast lock" */
+
static inline int obd_prep_async_page(struct obd_export *exp,
struct lov_stripe_md *lsm,
struct lov_oinfo *loi,
cfs_page_t *page, obd_off offset,
struct obd_async_page_ops *ops,
- void *data, void **res, int nocache,
+ void *data, void **res, int flags,
struct lustre_handle *lockh)
{
int ret;
EXP_COUNTER_INCREMENT(exp, prep_async_page);
ret = OBP(exp->exp_obd, prep_async_page)(exp, lsm, loi, page, offset,
- ops, data, res, nocache,
+ ops, data, res, flags,
lockh);
RETURN(ret);
}
-static inline int obd_reget_short_lock(struct obd_export *exp,
- struct lov_stripe_md *lsm,
- void **res, int rw,
- obd_off start, obd_off end,
- void **cookie)
-{
- ENTRY;
-
- OBD_CHECK_OP(exp->exp_obd, reget_short_lock, -EOPNOTSUPP);
- EXP_COUNTER_INCREMENT(exp, reget_short_lock);
-
- RETURN(OBP(exp->exp_obd, reget_short_lock)(exp, lsm, res, rw,
- start, end, cookie));
-}
-
-static inline int obd_release_short_lock(struct obd_export *exp,
- struct lov_stripe_md *lsm, obd_off end,
- void *cookie, int rw)
+static inline int obd_get_lock(struct obd_export *exp,
+ struct lov_stripe_md *lsm, void **res, int rw,
+ obd_off start, obd_off end,
+ struct lustre_handle *lockh, int flags)
{
ENTRY;
- OBD_CHECK_OP(exp->exp_obd, release_short_lock, -EOPNOTSUPP);
- EXP_COUNTER_INCREMENT(exp, release_short_lock);
+ OBD_CHECK_OP(exp->exp_obd, get_lock, -EOPNOTSUPP);
+ EXP_COUNTER_INCREMENT(exp, get_lock);
- RETURN(OBP(exp->exp_obd, release_short_lock)(exp, lsm, end,
- cookie, rw));
+ RETURN(OBP(exp->exp_obd, get_lock)(exp, lsm, res, rw, start, end,
+ lockh, flags));
}
static inline int obd_queue_async_io(struct obd_export *exp,
RETURN(rc);
}
-static inline int obd_cancel(struct obd_export *exp,
- struct lov_stripe_md *ea, __u32 mode,
- struct lustre_handle *lockh)
+static inline int obd_cancel(struct obd_export *exp, struct lov_stripe_md *ea,
+ __u32 mode, struct lustre_handle *lockh, int flags,
+ obd_off end)
{
int rc;
ENTRY;
EXP_CHECK_OP(exp, cancel);
EXP_COUNTER_INCREMENT(exp, cancel);
- rc = OBP(exp->exp_obd, cancel)(exp, ea, mode, lockh);
+ rc = OBP(exp->exp_obd, cancel)(exp, ea, mode, lockh, flags, end);
RETURN(rc);
}
int ldlm_lock_fast_match(struct ldlm_lock *lock, int rw,
obd_off start, obd_off end,
- void **cookie)
+ struct lustre_handle *lockh)
{
LASSERT(rw == OBD_BRW_READ || rw == OBD_BRW_WRITE);
+ LASSERT(lockh != NULL);
if (!lock)
return 0;
!lock->l_writers && !lock->l_readers)
goto no_match;
- ldlm_lock_addref_internal_nolock(lock, rw == OBD_BRW_WRITE ? LCK_PW : LCK_PR);
+ ldlm_lock_addref_internal_nolock(lock,
+ rw == OBD_BRW_WRITE ? LCK_PW : LCK_PR);
unlock_res_and_lock(lock);
- *cookie = (void *)lock;
+ ldlm_lock2handle(lock, lockh);
return 1; /* avoid using rc for stack relief */
no_match:
return 0;
}
-void ldlm_lock_fast_release(void *cookie, int rw)
-{
- struct ldlm_lock *lock = (struct ldlm_lock *)cookie;
-
- LASSERT(lock != NULL);
- LASSERT(rw == OBD_BRW_READ || rw == OBD_BRW_WRITE);
- LASSERT(rw == OBD_BRW_READ || (lock->l_granted_mode & (LCK_PW | LCK_GROUP)));
- ldlm_lock_decref_internal(lock, rw == OBD_BRW_WRITE ? LCK_PW : LCK_PR);
-}
-
/* Can be called in two ways:
*
* If 'ns' is NULL, then lockh describes an existing lock that we want to look
EXPORT_SYMBOL(ldlm_lock_get);
EXPORT_SYMBOL(ldlm_lock_put);
EXPORT_SYMBOL(ldlm_lock_fast_match);
-EXPORT_SYMBOL(ldlm_lock_fast_release);
EXPORT_SYMBOL(ldlm_lock_match);
EXPORT_SYMBOL(ldlm_lock_cancel);
EXPORT_SYMBOL(ldlm_lock_addref);
(sbi->ll_flags & LL_SBI_NOLCK) || mode == LCK_NL)
RETURN(0);
- rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh);
+ rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh, 0, 0);
RETURN(rc);
}
(obd_off)page->index << CFS_PAGE_SHIFT,
&llu_async_page_ops,
llap, &llap->llap_cookie,
- 1 /* no cache in liblustre at all */,
+ /* no cache in liblustre at all */
+ OBD_PAGE_NO_CACHE,
NULL);
if (rc) {
LASSERT(rc < 0);
(sbi->ll_flags & LL_SBI_NOLCK))
RETURN(0);
- rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh);
+ rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh, 0, 0);
RETURN(rc);
}
return 0;
}
-static int ll_reget_short_lock(struct page *page, int rw,
- obd_off start, obd_off end,
- void **cookie)
+static int ll_get_short_lock(struct page *page, int rw, obd_off start,
+ obd_off end, struct lustre_handle *lockh)
{
struct ll_async_page *llap;
struct obd_export *exp;
if (llap == NULL)
RETURN(0);
- RETURN(obd_reget_short_lock(exp, ll_i2info(inode)->lli_smd,
- &llap->llap_cookie, rw, start, end,
- cookie));
+ RETURN(obd_get_lock(exp, ll_i2info(inode)->lli_smd,
+ &llap->llap_cookie, rw, start, end, lockh,
+ OBD_FAST_LOCK));
}
static void ll_release_short_lock(struct inode *inode, obd_off end,
- void *cookie, int rw)
+ struct lustre_handle *lockh, int rw)
{
struct obd_export *exp;
int rc;
if (exp == NULL)
return;
- rc = obd_release_short_lock(exp, ll_i2info(inode)->lli_smd, end,
- cookie, rw);
+ rc = obd_cancel(exp, ll_i2info(inode)->lli_smd,
+                        rw == OBD_BRW_READ ? LCK_PR : LCK_PW, lockh,
+ OBD_FAST_LOCK, end);
if (rc < 0)
CERROR("unlock failed (%d)\n", rc);
}
obd_off ppos, obd_off end,
const struct iovec *iov,
unsigned long nr_segs,
- void **cookie, int rw)
+ struct lustre_handle *lockh,
+ int rw)
{
int rc = 0, seg;
struct page *page;
page = find_lock_page(file->f_dentry->d_inode->i_mapping,
ppos >> CFS_PAGE_SHIFT);
if (page) {
- if (ll_reget_short_lock(page, rw, ppos, end, cookie))
+ if (ll_get_short_lock(page, rw, ppos, end, lockh))
rc = 1;
unlock_page(page);
}
static inline void ll_file_put_fast_lock(struct inode *inode, obd_off end,
- void *cookie, int rw)
+ struct lustre_handle *lockh, int rw)
{
- ll_release_short_lock(inode, end, cookie, rw);
+ ll_release_short_lock(inode, end, lockh, rw);
}
-enum ll_lock_style {
- LL_LOCK_STYLE_NOLOCK = 0,
- LL_LOCK_STYLE_FASTLOCK = 1,
- LL_LOCK_STYLE_TREELOCK = 2
-};
-
static inline int ll_file_get_lock(struct file *file, obd_off ppos,
obd_off end, const struct iovec *iov,
- unsigned long nr_segs, void **cookie,
+ unsigned long nr_segs,
+ struct lustre_handle *lockh,
struct ll_lock_tree *tree, int rw)
{
int rc;
ENTRY;
- if (ll_file_get_fast_lock(file, ppos, end, iov, nr_segs, cookie, rw))
+ if (ll_file_get_fast_lock(file, ppos, end, iov, nr_segs, lockh, rw))
RETURN(LL_LOCK_STYLE_FASTLOCK);
rc = ll_file_get_tree_lock_iov(tree, file, iov, nr_segs,
static inline void ll_file_put_lock(struct inode *inode, obd_off end,
enum ll_lock_style lock_style,
- void *cookie, struct ll_lock_tree *tree,
- int rw)
+ struct lustre_handle *lockh,
+ struct ll_lock_tree *tree, int rw)
{
switch (lock_style) {
ll_tree_unlock(tree);
break;
case LL_LOCK_STYLE_FASTLOCK:
- ll_file_put_fast_lock(inode, end, cookie, rw);
+ ll_file_put_fast_lock(inode, end, lockh, rw);
break;
default:
CERROR("invalid locking style (%d)\n", lock_style);
struct ll_inode_info *lli = ll_i2info(inode);
struct lov_stripe_md *lsm = lli->lli_smd;
struct ll_sb_info *sbi = ll_i2sbi(inode);
- struct ll_lock_tree tree;
+ struct ll_thread_data ltd = { 0 };
struct ost_lvb lvb;
struct ll_ra_read bead;
int ra = 0;
obd_off end;
ssize_t retval, chunk, sum = 0;
- int lock_style;
struct iovec *iov_copy = NULL;
unsigned long nrsegs_copy, nrsegs_orig = 0;
size_t count, iov_offset = 0;
__u64 kms;
- void *cookie;
ENTRY;
count = ll_file_get_iov_count(iov, &nr_segs);
RETURN(sum);
}
+ ltd.ltd_magic = LTD_MAGIC;
+        ll_td_set(&ltd);
repeat:
+        memset(&ltd, 0, sizeof(ltd));
+ ltd.ltd_magic = LTD_MAGIC;
if (sbi->ll_max_rw_chunk != 0 && !(file->f_flags & O_DIRECT)) {
/* first, let's know the end of the current stripe */
end = *ppos;
down_read(&lli->lli_truncate_rwsem); /* Bug 18233 */
- lock_style = ll_file_get_lock(file, (obd_off)(*ppos), end,
- iov_copy, nrsegs_copy, &cookie, &tree,
- OBD_BRW_READ);
- if (lock_style < 0 || lock_style == LL_LOCK_STYLE_NOLOCK)
+ ltd.lock_style = ll_file_get_lock(file, (obd_off)(*ppos), end,
+ iov_copy, nrsegs_copy,
+                                      &ltd.u.lockh, &ltd.u.tree,
+ OBD_BRW_READ);
+ if (ltd.lock_style < 0 || ltd.lock_style == LL_LOCK_STYLE_NOLOCK)
up_read(&lli->lli_truncate_rwsem);
- if (lock_style < 0)
- GOTO(out, retval = lock_style);
+ if (ltd.lock_style < 0)
+ GOTO(out, retval = ltd.lock_style);
ll_inode_size_lock(inode, 1);
/*
ll_inode_size_unlock(inode, 1);
retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
if (retval) {
- if (lock_style != LL_LOCK_STYLE_NOLOCK) {
- ll_file_put_lock(inode, end, lock_style,
- cookie, &tree, OBD_BRW_READ);
+ if (ltd.lock_style != LL_LOCK_STYLE_NOLOCK) {
+ ll_file_put_lock(inode, end, ltd.lock_style,
+                                                 &ltd.u.lockh, &ltd.u.tree,
+ OBD_BRW_READ);
up_read(&lli->lli_truncate_rwsem);
}
goto out;
if ((size == 0 && cur_index != 0) ||
(((size - 1) >> CFS_PAGE_SHIFT) < cur_index)) {
- if (lock_style != LL_LOCK_STYLE_NOLOCK) {
- ll_file_put_lock(inode, end, lock_style,
- cookie, &tree,
+ if (ltd.lock_style != LL_LOCK_STYLE_NOLOCK) {
+
+ ll_file_put_lock(inode, end,
+ ltd.lock_style,
+                                                 &ltd.u.lockh,
+                                                 &ltd.u.tree,
OBD_BRW_READ);
up_read(&lli->lli_truncate_rwsem);
}
inode->i_ino, chunk, *ppos, i_size_read(inode));
/* turn off the kernel's read-ahead */
- if (lock_style != LL_LOCK_STYLE_NOLOCK) {
+ if (ltd.lock_style != LL_LOCK_STYLE_NOLOCK) {
struct ost_lvb *xtimes;
/* read under locks
*
* ll_glimpse_size) could get correct values in lsm */
OBD_ALLOC_PTR(xtimes);
if (NULL == xtimes) {
- ll_file_put_lock(inode, end, lock_style, cookie,
- &tree, OBD_BRW_READ);
+ ll_file_put_lock(inode, end, ltd.lock_style,
+                                                 &ltd.u.lockh, &ltd.u.tree,
+ OBD_BRW_READ);
up_read(&lli->lli_truncate_rwsem);
GOTO(out, retval = -ENOMEM);
}
retval = generic_file_aio_read(iocb, iov_copy, nrsegs_copy,
*ppos);
#endif
- ll_file_put_lock(inode, end, lock_style, cookie,
- &tree, OBD_BRW_READ);
+                ll_file_put_lock(inode, end, ltd.lock_style, &ltd.u.lockh,
+                                 &ltd.u.tree, OBD_BRW_READ);
up_read(&lli->lli_truncate_rwsem);
} else {
retval = ll_direct_IO(READ, file, iov_copy, *ppos, nr_segs, 0);
}
out:
+ ll_td_set(NULL);
if (ra != 0)
ll_ra_read_ex(file, &bead);
retval = (sum > 0) ? sum : retval;
struct inode *inode = file->f_dentry->d_inode;
struct ll_sb_info *sbi = ll_i2sbi(inode);
struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
- struct ll_lock_tree tree;
+ struct ll_thread_data ltd = { 0 };
loff_t maxbytes = ll_file_maxbytes(inode);
loff_t lock_start, lock_end, end;
ssize_t retval, chunk, sum = 0;
if (down_interruptible(&ll_i2info(inode)->lli_write_sem))
RETURN(-ERESTARTSYS);
+ ltd.ltd_magic = LTD_MAGIC;
+        ll_td_set(&ltd);
repeat:
+        memset(&ltd, 0, sizeof(ltd));
+ ltd.ltd_magic = LTD_MAGIC;
+
chunk = 0; /* just to fix gcc's warning */
end = *ppos + count - 1;
nrsegs_copy = nr_segs;
}
- tree_locked = ll_file_get_tree_lock_iov(&tree, file, iov_copy,
+        tree_locked = ll_file_get_tree_lock_iov(&ltd.u.tree, file, iov_copy,
nrsegs_copy,
(obd_off)lock_start,
(obd_off)lock_end,
lov_stripe_unlock(lsm);
OBD_FREE_PTR(xtimes);
+ ltd.lock_style = LL_LOCK_STYLE_TREELOCK;
+
#ifdef HAVE_FILE_WRITEV
retval = generic_file_writev(file, iov_copy, nrsegs_copy, ppos);
#else
out_unlock:
if (tree_locked)
- ll_tree_unlock(&tree);
+                ll_tree_unlock(&ltd.u.tree);
out:
if (retval > 0) {
up(&ll_i2info(inode)->lli_write_sem);
+ ll_td_set(NULL);
if (iov_copy && iov_copy != iov)
OBD_FREE(iov_copy, sizeof(*iov) * nrsegs_orig);
int ast_flags);
int ll_tree_unlock(struct ll_lock_tree *tree);
+enum ll_lock_style {
+ LL_LOCK_STYLE_NOLOCK = 0,
+ LL_LOCK_STYLE_FASTLOCK = 1,
+ LL_LOCK_STYLE_TREELOCK = 2
+};
+
+struct ll_thread_data {
+ int ltd_magic;
+ int lock_style;
+ struct list_head *tree_list;
+ union {
+ struct ll_lock_tree tree;
+ struct lustre_handle lockh;
+ } u;
+};
+struct ll_thread_data *ll_td_get(void);
+void ll_td_set(struct ll_thread_data *ltd);
+struct lustre_handle *ltd2lockh(struct ll_thread_data *ltd, __u64 start,
+ __u64 end);
+
#define ll_s2sbi(sb) (s2lsi(sb)->lsi_llsbi)
static inline __u64 ll_ts2u64(struct timespec *time)
int err;
err = (local_lock == 2) ?
- obd_cancel(sbi->ll_osc_exp, lsm, LCK_PW, &lockh):
+ obd_cancel(sbi->ll_osc_exp, lsm, LCK_PW, &lockh, 0, 0):
ll_extent_unlock(NULL, inode, lsm, LCK_PW, &lockh);
if (unlikely(err != 0)){
CERROR("extent unlock failed: err=%d,"
ll_extent_unlock(fd, inode, ll_i2info(inode)->lli_smd, mode, lockh);
}
+struct lustre_handle *ltd2lockh(struct ll_thread_data *ltd,
+ __u64 start, __u64 end) {
+ ENTRY;
+ if (NULL == ltd)
+ RETURN(NULL);
+ switch(ltd->lock_style) {
+ case LL_LOCK_STYLE_FASTLOCK:
+                RETURN(&ltd->u.lockh);
+ break;
+ case LL_LOCK_STYLE_TREELOCK: {
+ struct ll_lock_tree_node *node;
+ if (ltd->tree_list == NULL)
+                        ltd->tree_list = &ltd->u.tree.lt_locked_list;
+
+ list_for_each_entry(node, ltd->tree_list, lt_locked_item) {
+ if (node->lt_policy.l_extent.start <= start &&
+ node->lt_policy.l_extent.end >= end) {
+ ltd->tree_list = node->lt_locked_item.prev;
+ RETURN(&node->lt_lockh);
+ }
+ }
+ }
+ default:
+ break;
+ }
+ RETURN(NULL);
+}
+
#ifndef HAVE_VM_OP_FAULT
/**
* Page fault handler.
static struct ll_async_page *llap_from_page_with_lockh(struct page *page,
unsigned origin,
- struct lustre_handle *lockh)
+ struct lustre_handle *lockh,
+ int flags)
{
struct ll_async_page *llap;
struct obd_export *exp;
LASSERT(ll_async_page_slab);
LASSERTF(origin < LLAP__ORIGIN_MAX, "%u\n", origin);
+ exp = ll_i2obdexp(page->mapping->host);
+ if (exp == NULL)
+ RETURN(ERR_PTR(-EINVAL));
+
llap = llap_cast_private(page);
if (llap != NULL) {
+ if (origin == LLAP_ORIGIN_READAHEAD && lockh) {
+ /* the page could belong to another lock for which
+ * we don't hold a reference. We need to check that
+ * a reference is taken on a lock covering this page.
+ * For readpage origin, this is fine because
+ * ll_file_readv() took a reference on lock(s) covering
+ * the whole read. However, for readhead, we don't have
+ * this guarantee, so we need to check that the lock
+ * matched in ll_file_readv() also covers this page */
+ __u64 offset = ((loff_t)page->index) << CFS_PAGE_SHIFT;
+ if (!obd_get_lock(exp, ll_i2info(inode)->lli_smd,
+ &llap->llap_cookie, OBD_BRW_READ,
+ offset, offset + CFS_PAGE_SIZE - 1,
+ lockh, flags))
+ RETURN(ERR_PTR(-ENOLCK));
+ }
/* move to end of LRU list, except when page is just about to
* die */
if (origin != LLAP_ORIGIN_REMOVEPAGE) {
GOTO(out, llap);
}
- exp = ll_i2obdexp(page->mapping->host);
- if (exp == NULL)
- RETURN(ERR_PTR(-EINVAL));
-
/* limit the number of lustre-cached pages */
cpu = cfs_get_cpu();
pd = LL_PGLIST_DATA(sbi);
rc = obd_prep_async_page(exp, ll_i2info(inode)->lli_smd, NULL, page,
(obd_off)page->index << CFS_PAGE_SHIFT,
&ll_async_page_ops, llap, &llap->llap_cookie,
- 0, lockh);
+ flags, lockh);
if (rc) {
OBD_SLAB_FREE(llap, ll_async_page_slab,
ll_async_page_slab_size);
static inline struct ll_async_page *llap_from_page(struct page *page,
unsigned origin)
{
- return llap_from_page_with_lockh(page, origin, NULL);
+ return llap_from_page_with_lockh(page, origin, NULL, 0);
}
static int queue_or_sync_write(struct obd_export *exp, struct inode *inode,
if (fd->fd_flags & LL_FILE_GROUP_LOCKED)
lockh = &fd->fd_cwlockh;
- llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_COMMIT_WRITE, lockh);
+ llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_COMMIT_WRITE, lockh,
+ 0);
if (IS_ERR(llap))
RETURN(PTR_ERR(llap));
return start <= index && index <= end;
}
+struct ll_thread_data *ll_td_get(void)
+{
+ struct ll_thread_data *ltd = current->journal_info;
+
+ LASSERT(ltd == NULL || ltd->ltd_magic == LTD_MAGIC);
+ return ltd;
+}
+
+void ll_td_set(struct ll_thread_data *ltd)
+{
+ if (ltd == NULL) {
+ ltd = current->journal_info;
+ LASSERT(ltd == NULL || ltd->ltd_magic == LTD_MAGIC);
+ current->journal_info = NULL;
+ return;
+ }
+
+ LASSERT(current->journal_info == NULL);
+ LASSERT(ltd->ltd_magic == LTD_MAGIC);
+ current->journal_info = ltd;
+}
+
static struct ll_readahead_state *ll_ras_get(struct file *f)
{
struct ll_file_data *fd;
struct ll_async_page *llap;
struct page *page;
unsigned int gfp_mask = 0;
- int rc = 0;
+ int rc = 0, flags = 0;
+ struct ll_thread_data *ltd;
+ struct lustre_handle *lockh = NULL;
gfp_mask = GFP_HIGHUSER & ~__GFP_WAIT;
#ifdef __GFP_NOWARN
GOTO(unlock_page, rc = 0);
}
+ ltd = ll_td_get();
+ if (ltd && ltd->lock_style > 0) {
+ __u64 offset = ((loff_t)page->index) << CFS_PAGE_SHIFT;
+ lockh = ltd2lockh(ltd, offset,
+ offset + CFS_PAGE_SIZE - 1);
+ if (ltd->lock_style == LL_LOCK_STYLE_FASTLOCK)
+ flags = OBD_FAST_LOCK;
+ }
+
/* we do this first so that we can see the page in the /proc
* accounting */
- llap = llap_from_page(page, LLAP_ORIGIN_READAHEAD);
+ llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_READAHEAD, lockh,
+ flags);
if (IS_ERR(llap) || llap->llap_defer_uptodate) {
if (PTR_ERR(llap) == -ENOLCK) {
ll_ra_stats_inc(mapping, RA_STAT_FAILED_MATCH);
}
static int ll_readahead(struct ll_readahead_state *ras,
- struct obd_export *exp, struct address_space *mapping,
- struct obd_io_group *oig, int flags)
+ struct obd_export *exp, struct address_space *mapping,
+ struct obd_io_group *oig, int flags)
{
unsigned long start = 0, end = 0, reserved;
unsigned long ra_end, len;
struct ll_inode_info *lli = ll_i2info(inode);
struct obd_export *exp;
struct ll_async_page *llap;
+ struct ll_thread_data *ltd;
+ struct lustre_handle *lockh = NULL;
int rc = 0;
ENTRY;
if (exp == NULL)
GOTO(out, rc = -EINVAL);
- llap = llap_from_page(page, LLAP_ORIGIN_WRITEPAGE);
+ ltd = ll_td_get();
+ /* currently, no FAST lock in write path */
+ if (ltd && ltd->lock_style == LL_LOCK_STYLE_TREELOCK) {
+ __u64 offset = ((loff_t)page->index) << CFS_PAGE_SHIFT;
+ lockh = ltd2lockh(ltd, offset, offset + CFS_PAGE_SIZE - 1);
+ }
+
+ llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_WRITEPAGE, lockh, 0);
if (IS_ERR(llap))
GOTO(out, rc = PTR_ERR(llap));
struct ll_async_page *llap;
struct obd_io_group *oig = NULL;
struct lustre_handle *lockh = NULL;
- int rc;
+ int rc, flags = 0;
ENTRY;
LASSERT(PageLocked(page));
if (exp == NULL)
GOTO(out, rc = -EINVAL);
- if (fd->fd_flags & LL_FILE_GROUP_LOCKED)
+ if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
lockh = &fd->fd_cwlockh;
+ } else {
+ struct ll_thread_data *ltd;
+ ltd = ll_td_get();
+ if (ltd && ltd->lock_style > 0) {
+ __u64 offset = ((loff_t)page->index) << CFS_PAGE_SHIFT;
+ lockh = ltd2lockh(ltd, offset,
+ offset + CFS_PAGE_SIZE - 1);
+ if (ltd->lock_style == LL_LOCK_STYLE_FASTLOCK)
+ flags = OBD_FAST_LOCK;
+ }
+ }
- llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_READPAGE, lockh);
+ llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_READPAGE, lockh,
+ flags);
if (IS_ERR(llap)) {
if (PTR_ERR(llap) == -ENOLCK) {
CWARN("ino %lu page %lu (%llu) not covered by "
};
int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
- struct lov_oinfo *loi, cfs_page_t *page,
- obd_off offset, struct obd_async_page_ops *ops,
- void *data, void **res, int nocache,
- struct lustre_handle *lockh)
+ struct lov_oinfo *loi, cfs_page_t *page,
+ obd_off offset, struct obd_async_page_ops *ops,
+ void *data, void **res, int flags,
+ struct lustre_handle *lockh)
{
struct lov_obd *lov = &exp->exp_obd->u.lov;
struct lov_async_page *lap;
lap->lap_sub_cookie = (void *)lap + size_round(sizeof(*lap));
- if (lockh) {
+ if (lockh && !(flags & OBD_FAST_LOCK)) {
lov_lockh = lov_handle2llh(lockh);
if (lov_lockh) {
lockh = lov_lockh->llh_handles + lap->lap_stripe;
rc = obd_prep_async_page(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
lsm, loi, page, lap->lap_sub_offset,
&lov_async_page_ops, lap,
- &lap->lap_sub_cookie, nocache, lockh);
+ &lap->lap_sub_cookie, flags, lockh);
if (lov_lockh)
lov_llh_put(lov_lockh);
if (rc)
}
static int lov_cancel(struct obd_export *exp, struct lov_stripe_md *lsm,
- __u32 mode, struct lustre_handle *lockh)
+ __u32 mode, struct lustre_handle *lockh, int flags,
+ obd_off end)
{
struct lov_request_set *set;
struct obd_info oinfo;
LASSERT(lockh);
lov = &exp->exp_obd->u.lov;
+ if (flags & OBD_FAST_LOCK) {
+ int stripe = lov_stripe_number(lsm, end);
+ RETURN(obd_cancel(lov->lov_tgts[lsm->lsm_oinfo[stripe]->
+ loi_ost_idx]->ltd_exp, NULL, mode, lockh,
+ flags, end));
+ }
+
rc = lov_prep_cancel_set(exp, &oinfo, lsm, mode, lockh, &set);
if (rc)
RETURN(rc);
this_mode = mode;
rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
- req->rq_oi.oi_md, this_mode, lov_lockhp);
+ req->rq_oi.oi_md, this_mode, lov_lockhp, flags,
+ end);
rc = lov_update_common_set(set, req, rc);
if (rc) {
CERROR("error: cancel objid "LPX64" subobj "
}
EXPORT_SYMBOL(lov_stripe_unlock);
-static int lov_reget_short_lock(struct obd_export *exp,
- struct lov_stripe_md *lsm,
- void **res, int rw,
- obd_off start, obd_off end,
- void **cookie)
+static int lov_get_lock(struct obd_export *exp, struct lov_stripe_md *lsm,
+ void **res, int rw, obd_off start, obd_off end,
+ struct lustre_handle *lockh, int flags)
{
struct lov_async_page *l = *res;
obd_off stripe_start, stripe_end = start;
+ struct lov_lock_handles *lov_lockh = NULL;
+ int rc;
ENTRY;
+ if (lockh && lustre_handle_is_used(lockh) &&
+ !(flags & OBD_FAST_LOCK)) {
+ lov_lockh = lov_handle2llh(lockh);
+ if (lov_lockh == NULL) {
+ CERROR("LOV: invalid lov lock handle %p\n", lockh);
+ RETURN(-EINVAL);
+ }
+ lockh = lov_lockh->llh_handles + l->lap_stripe;
+ }
+
/* ensure we don't cross stripe boundaries */
lov_extent_calc(exp, lsm, OBD_CALC_STRIPE_END, &stripe_end);
if (stripe_end <= end)
- RETURN(0);
+ GOTO(out, rc = 0);
/* map the region limits to the object limits */
lov_stripe_offset(lsm, start, l->lap_stripe, &stripe_start);
lov_stripe_offset(lsm, end, l->lap_stripe, &stripe_end);
- RETURN(obd_reget_short_lock(exp->exp_obd->u.lov.lov_tgts[lsm->
- lsm_oinfo[l->lap_stripe]->loi_ost_idx]->
- ltd_exp, NULL, &l->lap_sub_cookie,
- rw, stripe_start, stripe_end, cookie));
-}
-
-static int lov_release_short_lock(struct obd_export *exp,
- struct lov_stripe_md *lsm, obd_off end,
- void *cookie, int rw)
-{
- int stripe;
-
- ENTRY;
-
- stripe = lov_stripe_number(lsm, end);
-
- RETURN(obd_release_short_lock(exp->exp_obd->u.lov.lov_tgts[lsm->
- lsm_oinfo[stripe]->loi_ost_idx]->
- ltd_exp, NULL, end, cookie, rw));
+ rc = obd_get_lock(exp->exp_obd->u.lov.lov_tgts[lsm->
+ lsm_oinfo[l->lap_stripe]->loi_ost_idx]->
+ ltd_exp, NULL, &l->lap_sub_cookie,
+ rw, stripe_start, stripe_end, lockh, flags);
+out:
+ if (lov_lockh != NULL)
+ lov_llh_put(lov_lockh);
+ RETURN(rc);
}
struct obd_ops lov_obd_ops = {
.o_brw = lov_brw,
.o_brw_async = lov_brw_async,
.o_prep_async_page = lov_prep_async_page,
- .o_reget_short_lock = lov_reget_short_lock,
- .o_release_short_lock = lov_release_short_lock,
+ .o_get_lock = lov_get_lock,
.o_queue_async_io = lov_queue_async_io,
.o_set_async_flags = lov_set_async_flags,
.o_queue_group_io = lov_queue_group_io,
continue;
rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
- req->rq_oi.oi_md, mode, lov_lockhp);
+ req->rq_oi.oi_md, mode, lov_lockhp, 0, 0);
if (rc && lov->lov_tgts[req->rq_idx] &&
lov->lov_tgts[req->rq_idx]->ltd_active)
CERROR("cancelling obdjid "LPX64" on OST "
}
static int mgc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
- __u32 mode, struct lustre_handle *lockh)
+ __u32 mode, struct lustre_handle *lockh, int flags,
+ obd_off end)
{
ENTRY;
/* Now drop the lock so MGS can revoke it */
if (!rcl) {
rcl = mgc_cancel(mgc->u.cli.cl_mgc_mgsexp, NULL,
- LCK_CR, &lockh);
+ LCK_CR, &lockh, 0, 0);
if (rcl)
CERROR("Can't drop cfg lock: %d\n", rcl);
}
LPROCFS_OBD_OP_INIT(num_private_stats, stats, brw);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, brw_async);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, prep_async_page);
- LPROCFS_OBD_OP_INIT(num_private_stats, stats, reget_short_lock);
- LPROCFS_OBD_OP_INIT(num_private_stats, stats, release_short_lock);
+ LPROCFS_OBD_OP_INIT(num_private_stats, stats, get_lock);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, queue_async_io);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, queue_group_io);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, trigger_group_io);
rc = obd_prep_async_page(exp, lsm, NULL, eap->eap_page,
eap->eap_off, &ec_async_page_ops,
- eap, &eap->eap_cookie, 1, NULL);
+ eap, &eap->eap_cookie,
+ OBD_PAGE_NO_CACHE, NULL);
if (rc) {
spin_lock(&eas->eas_lock);
eas->eas_rc = rc;
return (-ENOENT);
rc = obd_cancel(ec->ec_exp, ecl->ecl_object->eco_lsm, ecl->ecl_mode,
- &ecl->ecl_lock_handle);
+ &ecl->ecl_lock_handle, 0, 0);
echo_put_object (ecl->ecl_object);
OBD_FREE (ecl, sizeof (*ecl));
list_del (&ecl->ecl_exp_chain);
rc = obd_cancel(ec->ec_exp, ecl->ecl_object->eco_lsm,
- ecl->ecl_mode, &ecl->ecl_lock_handle);
+ ecl->ecl_mode, &ecl->ecl_lock_handle, 0, 0);
CDEBUG (D_INFO, "Cancel lock on object "LPX64" on disconnect "
"(%d)\n", ecl->ecl_object->eco_id, rc);
if (!lock)
RETURN(-ENOLCK);
- LASSERTF(lock->l_policy_data.l_extent.start <=
- extent->oap_obj_off &&
- extent->oap_obj_off + CFS_PAGE_SIZE - 1 <=
- lock->l_policy_data.l_extent.end,
- "Got wrong lock [" LPU64 "," LPU64 "] for page with "
- "offset " LPU64 "\n",
- lock->l_policy_data.l_extent.start,
- lock->l_policy_data.l_extent.end, extent->oap_obj_off);
+ if(lock->l_policy_data.l_extent.start > extent->oap_obj_off ||
+ extent->oap_obj_off + CFS_PAGE_SIZE - 1 >
+ lock->l_policy_data.l_extent.end) {
+ CDEBUG(D_CACHE, "Got wrong lock [" LPU64 "," LPU64 "] "
+ "for page with offset " LPU64 "\n",
+ lock->l_policy_data.l_extent.start,
+ lock->l_policy_data.l_extent.end,
+ extent->oap_obj_off);
+ RETURN(-ENOLCK);
+ }
} else {
/* Real extent width calculation here once we have real
* extents
RETURN(-EDQUOT);
}
-static int osc_reget_short_lock(struct obd_export *exp,
- struct lov_stripe_md *lsm,
- void **res, int rw,
- obd_off start, obd_off end,
- void **cookie)
+static int osc_get_lock(struct obd_export *exp, struct lov_stripe_md *lsm,
+ void **res, int rw, obd_off start, obd_off end,
+ struct lustre_handle *lockh, int flags)
{
- struct osc_async_page *oap = *res;
- int rc;
+ struct ldlm_lock *lock = NULL;
+ int rc, release = 0;
ENTRY;
- spin_lock(&oap->oap_lock);
- rc = ldlm_lock_fast_match(oap->oap_ldlm_lock, rw,
- start, end, cookie);
- spin_unlock(&oap->oap_lock);
+ if (lockh && lustre_handle_is_used(lockh)) {
+ /* if a valid lockh is passed, just check that the corresponding
+ * lock covers the extent */
+ lock = ldlm_handle2lock(lockh);
+ release = 1;
+ } else {
+ struct osc_async_page *oap = *res;
+ spin_lock(&oap->oap_lock);
+ lock = oap->oap_ldlm_lock;
+ LDLM_LOCK_GET(lock);
+ spin_unlock(&oap->oap_lock);
+ }
+ rc = ldlm_lock_fast_match(lock, rw, start, end, lockh);
+ if (release == 1 && rc == 1)
+ /* if a valid lockh was passed, we just need to check
+ * that the lock covers the page, no reference should be
+ * taken*/
+ ldlm_lock_decref(lockh,
+ rw == OBD_BRW_WRITE ? LCK_PW : LCK_PR);
+ LDLM_LOCK_PUT(lock);
RETURN(rc);
}
-static int osc_release_short_lock(struct obd_export *exp,
- struct lov_stripe_md *lsm, obd_off end,
- void *cookie, int rw)
-{
- ENTRY;
- ldlm_lock_fast_release(cookie, rw);
- /* no error could have happened at this layer */
- RETURN(0);
-}
-
int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
struct lov_oinfo *loi, cfs_page_t *page,
obd_off offset, struct obd_async_page_ops *ops,
- void *data, void **res, int nocache,
+ void *data, void **res, int flags,
struct lustre_handle *lockh)
{
struct osc_async_page *oap;
spin_lock_init(&oap->oap_lock);
/* If the page was marked as notcacheable - don't add to any locks */
- if (!nocache) {
+ if (!(flags & OBD_PAGE_NO_CACHE)) {
osc_build_res_name(loi->loi_id, loi->loi_gr, &oid);
/* This is the only place where we can call cache_add_extent
without oap_lock, because this page is locked now, and
}
static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
- __u32 mode, struct lustre_handle *lockh)
+ __u32 mode, struct lustre_handle *lockh, int flags,
+ obd_off end)
{
ENTRY;
.o_brw = osc_brw,
.o_brw_async = osc_brw_async,
.o_prep_async_page = osc_prep_async_page,
- .o_reget_short_lock = osc_reget_short_lock,
- .o_release_short_lock = osc_release_short_lock,
+ .o_get_lock = osc_get_lock,
.o_queue_async_io = osc_queue_async_io,
.o_set_async_flags = osc_set_async_flags,
.o_queue_group_io = osc_queue_group_io,