void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode);
void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode);
void ldlm_lock_allow_match(struct ldlm_lock *lock);
+int ldlm_lock_fast_match(struct ldlm_lock *, int, loff_t, loff_t, void **);
+void ldlm_lock_fast_release(void *, int);
ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags,
                            struct ldlm_res_id *, ldlm_type_t type,
                            ldlm_policy_data_t *, ldlm_mode_t mode,
                            struct lustre_handle *lockh);
                          struct obd_async_page_ops *ops, void *data,
                          void **res, int nocache,
                          struct lustre_handle *lockh);
+ int (*o_reget_short_lock)(struct obd_export *exp,
+ struct lov_stripe_md *lsm,
+ void **res, int rw,
+ loff_t start, loff_t end,
+ void **cookie);
+ int (*o_release_short_lock)(struct obd_export *exp,
+ struct lov_stripe_md *lsm, loff_t end,
+ void *cookie, int rw);
int (*o_queue_async_io)(struct obd_export *exp,
struct lov_stripe_md *lsm,
struct lov_oinfo *loi, void *cookie,
RETURN(ret);
}
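+/* Ask the underlying driver to match an already-held lock against the
+ * extent [start, end]; returns -EOPNOTSUPP if the driver does not
+ * implement the fast path. */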
+static inline int obd_reget_short_lock(struct obd_export *exp,
+ struct lov_stripe_md *lsm,
+ void **res, int rw,
+ loff_t start, loff_t end,
+ void **cookie)
+{
+ ENTRY;
+
+ OBD_CHECK_OP(exp->exp_obd, reget_short_lock, -EOPNOTSUPP);
+ EXP_COUNTER_INCREMENT(exp, reget_short_lock);
+
+ RETURN(OBP(exp->exp_obd, reget_short_lock)(exp, lsm, res, rw,
+ start, end, cookie));
+}
+
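+/* Release a lock reference obtained through obd_reget_short_lock(). */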
+static inline int obd_release_short_lock(struct obd_export *exp,
+ struct lov_stripe_md *lsm, loff_t end,
+ void *cookie, int rw)
+{
+ ENTRY;
+
+ OBD_CHECK_OP(exp->exp_obd, release_short_lock, -EOPNOTSUPP);
+ EXP_COUNTER_INCREMENT(exp, release_short_lock);
+
+ RETURN(OBP(exp->exp_obd, release_short_lock)(exp, lsm, end,
+ cookie, rw));
+}
+
static inline int obd_queue_async_io(struct obd_export *exp,
struct lov_stripe_md *lsm,
struct lov_oinfo *loi, void *cookie,
# include <libcfs/kp30.h>
#endif
+#include <linux/fs.h> /* READ, WRITE for ldlm_lock_fast_match() */
#include <obd_class.h>
#include "ldlm_internal.h"
unlock_res_and_lock(lock);
}
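+/* Check whether @lock already covers the extent [start, end] in a mode
+ * compatible with @rw; if so, take an extra reference so the caller can
+ * do I/O under the lock without a new enqueue.  Returns 1 on a match,
+ * 0 otherwise. */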
+int ldlm_lock_fast_match(struct ldlm_lock *lock, int rw,
+ loff_t start, loff_t end,
+ void **cookie)
+{
+ LASSERT(rw == READ || rw == WRITE);
+ /* should LCK_GROUP be handled in a special way? */
+	if (lock && (rw == READ ||
+	             (lock->l_granted_mode & (LCK_PW | LCK_GROUP))) &&
+ (lock->l_policy_data.l_extent.start <= start) &&
+ (lock->l_policy_data.l_extent.end >= end)) {
+ ldlm_lock_addref_internal(lock, rw == WRITE ? LCK_PW : LCK_PR);
+ *cookie = (void *)lock;
+ return 1; /* avoid using rc for stack relief */
+ }
+ return 0;
+}
+
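+/* Drop the reference taken by a successful ldlm_lock_fast_match(). */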
+void ldlm_lock_fast_release(void *cookie, int rw)
+{
+ struct ldlm_lock *lock = (struct ldlm_lock *)cookie;
+
+ LASSERT(lock != NULL);
+ LASSERT(rw == READ || rw == WRITE);
+ LASSERT(rw == READ || (lock->l_granted_mode & (LCK_PW | LCK_GROUP)));
+ ldlm_lock_decref_internal(lock, rw == WRITE ? LCK_PW : LCK_PR);
+}
+
/* Can be called in two ways:
*
* If 'ns' is NULL, then lockh describes an existing lock that we want to look
EXPORT_SYMBOL(__ldlm_handle2lock);
EXPORT_SYMBOL(ldlm_lock_get);
EXPORT_SYMBOL(ldlm_lock_put);
+EXPORT_SYMBOL(ldlm_lock_fast_match);
+EXPORT_SYMBOL(ldlm_lock_fast_release);
EXPORT_SYMBOL(ldlm_lock_match);
EXPORT_SYMBOL(ldlm_lock_cancel);
EXPORT_SYMBOL(ldlm_lock_addref);
return 0;
}
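+/* Try to cover [start, end] with the lock cached for @page's async page;
+ * on success *cookie identifies the matched lock for the later release. */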
+static int ll_reget_short_lock(struct page *page, int rw,
+ loff_t start, loff_t end,
+ void **cookie)
+{
+ struct ll_async_page *llap;
+ struct obd_export *exp;
+ struct inode *inode = page->mapping->host;
+
+ ENTRY;
+
+ exp = ll_i2obdexp(inode);
+ if (exp == NULL)
+ RETURN(0);
+
+ llap = llap_cast_private(page);
+ if (llap == NULL)
+ RETURN(0);
+
+ RETURN(obd_reget_short_lock(exp, ll_i2info(inode)->lli_smd,
+ &llap->llap_cookie, rw, start, end,
+ cookie));
+}
+
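+/* Give back a lock obtained through ll_reget_short_lock(). */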
+static void ll_release_short_lock(struct inode *inode, loff_t end,
+ void *cookie, int rw)
+{
+ struct obd_export *exp;
+ int rc;
+
+ exp = ll_i2obdexp(inode);
+ if (exp == NULL)
+ return;
+
+ rc = obd_release_short_lock(exp, ll_i2info(inode)->lli_smd, end,
+ cookie, rw);
+ if (rc < 0)
+ CERROR("unlock failed (%d)\n", rc);
+}
+
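+/* Fast-path locking: reuse a lock already cached on the first page of
+ * the I/O instead of taking a new tree lock.  Not attempted if any user
+ * buffer is mmapped, since faulting such a buffer in may itself need
+ * locks.  Returns 1 if the fast lock was taken, 0 otherwise. */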
+static inline int ll_file_get_fast_lock(struct file *file,
+ loff_t ppos, loff_t end,
+ const struct iovec *iov,
+ unsigned long nr_segs,
+ void **cookie, int rw)
+{
+	int rc = 0;
+	unsigned long seg;
+ struct page *page;
+
+ ENTRY;
+
+	/* we would like this I/O request to be lockfree */
+ for (seg = 0; seg < nr_segs; seg++) {
+ const struct iovec *iv = &iov[seg];
+ if (ll_region_mapped((unsigned long)iv->iov_base, iv->iov_len))
+ GOTO(out, rc);
+ }
+
+ page = find_lock_page(file->f_dentry->d_inode->i_mapping,
+ ppos >> CFS_PAGE_SHIFT);
+ if (page) {
+ if (ll_reget_short_lock(page, rw, ppos, end, cookie))
+ rc = 1;
+
+ unlock_page(page);
+ page_cache_release(page);
+ }
+
+out:
+ RETURN(rc);
+}
+
+static inline void ll_file_put_fast_lock(struct inode *inode, loff_t end,
+ void *cookie, int rw)
+{
+ ll_release_short_lock(inode, end, cookie, rw);
+}
+
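+/* which locking scheme, if any, protects the current I/O chunk */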
+enum ll_lock_style {
+ LL_LOCK_STYLE_NOLOCK = 0,
+ LL_LOCK_STYLE_FASTLOCK = 1,
+ LL_LOCK_STYLE_TREELOCK = 2
+};
+
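+/* Take a lock covering [ppos, end]: try the fast path first, then fall
+ * back to the lock tree.  Returns an ll_lock_style value on success or
+ * a negative errno on error. */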
+static inline int ll_file_get_lock(struct file *file, loff_t ppos, loff_t end,
+ const struct iovec *iov, unsigned long nr_segs,
+ void **cookie, struct ll_lock_tree *tree,
+ int rw)
+{
+ int rc;
+
+ ENTRY;
+
+ if (ll_file_get_fast_lock(file, ppos, end, iov, nr_segs, cookie, rw))
+ RETURN(LL_LOCK_STYLE_FASTLOCK);
+
+ rc = ll_file_get_tree_lock_iov(tree, file, iov, nr_segs,
+ ppos, end, rw);
+ /* rc: 1 for tree lock, 0 for no lock, <0 for error */
+ switch (rc) {
+ case 1:
+ RETURN(LL_LOCK_STYLE_TREELOCK);
+ case 0:
+ RETURN(LL_LOCK_STYLE_NOLOCK);
+ }
+
+	/* reaching this point means an error occurred; rc holds -errno */
+ RETURN(rc);
+}
+
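+/* Undo ll_file_get_lock() according to the lock style it returned. */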
+static inline void ll_file_put_lock(struct inode *inode, loff_t end,
+ enum ll_lock_style lock_style,
+                                    void *cookie, struct ll_lock_tree *tree,
+                                    int rw)
+{
+ switch (lock_style) {
+ case LL_LOCK_STYLE_TREELOCK:
+ ll_tree_unlock(tree);
+ break;
+ case LL_LOCK_STYLE_FASTLOCK:
+ ll_file_put_fast_lock(inode, end, cookie, rw);
+ break;
+ default:
+ CERROR("invalid locking style (%d)\n", lock_style);
+ }
+}
+
#ifdef HAVE_FILE_READV
static ssize_t ll_file_readv(struct file *file, const struct iovec *iov,
unsigned long nr_segs, loff_t *ppos)
int ra = 0;
loff_t end;
ssize_t retval, chunk, sum = 0;
- int tree_locked;
+ int lock_style;
struct iovec *iov_copy = NULL;
unsigned long nrsegs_copy, nrsegs_orig = 0;
size_t count, iov_offset = 0;
__u64 kms;
+ void *cookie;
ENTRY;
count = ll_file_get_iov_count(iov, &nr_segs);
RETURN(-EFAULT);
RETURN(sum);
}
+
repeat:
if (sbi->ll_max_rw_chunk != 0) {
/* first, let's know the end of the current stripe */
nrsegs_copy = nr_segs;
}
- tree_locked = ll_file_get_tree_lock_iov(&tree, file, iov_copy,
- nrsegs_copy, *ppos, end, READ);
- if (tree_locked < 0)
- GOTO(out, retval = tree_locked);
+ lock_style = ll_file_get_lock(file, *ppos, end, iov_copy, nrsegs_copy,
+ &cookie, &tree, READ);
+ if (lock_style < 0)
+ GOTO(out, retval = lock_style);
ll_inode_size_lock(inode, 1);
/*
ll_inode_size_unlock(inode, 1);
retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
if (retval) {
- if (tree_locked)
- ll_tree_unlock(&tree);
+ if (lock_style != LL_LOCK_STYLE_NOLOCK)
+ ll_file_put_lock(inode, end, lock_style,
+ cookie, &tree, READ);
goto out;
}
} else {
inode->i_ino, chunk, *ppos, i_size_read(inode));
/* turn off the kernel's read-ahead */
- if (tree_locked) {
+ if (lock_style != LL_LOCK_STYLE_NOLOCK) {
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
file->f_ramax = 0;
#else
retval = generic_file_aio_read(iocb, iov_copy, nrsegs_copy,
*ppos);
#endif
- ll_tree_unlock(&tree);
+ ll_file_put_lock(inode, end, lock_style, cookie, &tree, READ);
} else {
retval = ll_file_lockless_io(file, iov_copy, nrsegs_copy, ppos,
READ, chunk);
extern struct inode_operations ll_file_inode_operations;
extern int ll_inode_revalidate_it(struct dentry *, struct lookup_intent *);
extern int ll_have_md_lock(struct inode *inode, __u64 bits);
+int ll_region_mapped(unsigned long addr, size_t count);
int ll_extent_lock(struct ll_file_data *, struct inode *,
struct lov_stripe_md *, int mode, ldlm_policy_data_t *,
struct lustre_handle *, int ast_flags);
RETURN(ret);
}
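+/* non-zero iff part of [addr, addr + count) is mmapped from a Lustre file */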
+int ll_region_mapped(unsigned long addr, size_t count)
+{
+ return !!our_vma(addr, count);
+}
+
int lt_get_mmap_locks(struct ll_lock_tree *tree,
unsigned long addr, size_t count)
{
}
EXPORT_SYMBOL(lov_stripe_unlock);
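+/* LOV-level fast path: only extents that fit within a single stripe
+ * qualify; the file region is mapped to per-object offsets and the
+ * match is delegated to the OSC of the stripe's target. */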
+static int lov_reget_short_lock(struct obd_export *exp,
+ struct lov_stripe_md *lsm,
+ void **res, int rw,
+ loff_t start, loff_t end,
+ void **cookie)
+{
+ struct lov_async_page *l = *res;
+ loff_t stripe_start, stripe_end = start;
+
+ ENTRY;
+
+	/* ensure we don't cross stripe boundaries: lov_extent_calc() takes
+	 * the start offset in *stripe_end and replaces it with the last
+	 * byte of the stripe that contains it */
+	lov_extent_calc(exp, lsm, OBD_CALC_STRIPE_END, (obd_off *)&stripe_end);
+ if (stripe_end <= end)
+ RETURN(0);
+
+ /* map the region limits to the object limits */
+ lov_stripe_offset(lsm, start, l->lap_stripe, &stripe_start);
+ lov_stripe_offset(lsm, end, l->lap_stripe, &stripe_end);
+
+ RETURN(obd_reget_short_lock(exp->exp_obd->u.lov.lov_tgts[lsm->
+ lsm_oinfo[l->lap_stripe]->loi_ost_idx]->
+ ltd_exp, NULL, &l->lap_sub_cookie,
+ rw, stripe_start, stripe_end, cookie));
+}
+
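+/* Release a fast lock on the stripe that covers @end. */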
+static int lov_release_short_lock(struct obd_export *exp,
+ struct lov_stripe_md *lsm, loff_t end,
+ void *cookie, int rw)
+{
+ int stripe;
+
+ ENTRY;
+
+ stripe = lov_stripe_number(lsm, end);
+
+ RETURN(obd_release_short_lock(exp->exp_obd->u.lov.lov_tgts[lsm->
+ lsm_oinfo[stripe]->loi_ost_idx]->
+ ltd_exp, NULL, end, cookie, rw));
+}
+
struct obd_ops lov_obd_ops = {
.o_owner = THIS_MODULE,
.o_setup = lov_setup,
.o_brw = lov_brw,
.o_brw_async = lov_brw_async,
.o_prep_async_page = lov_prep_async_page,
+ .o_reget_short_lock = lov_reget_short_lock,
+ .o_release_short_lock = lov_release_short_lock,
.o_queue_async_io = lov_queue_async_io,
.o_set_async_flags = lov_set_async_flags,
.o_queue_group_io = lov_queue_group_io,
LPROCFS_OBD_OP_INIT(num_private_stats, stats, brw);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, brw_async);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, prep_async_page);
+ LPROCFS_OBD_OP_INIT(num_private_stats, stats, reget_short_lock);
+ LPROCFS_OBD_OP_INIT(num_private_stats, stats, release_short_lock);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, queue_async_io);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, queue_group_io);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, trigger_group_io);
RETURN(-EDQUOT);
}
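+/* OSC-level fast path: match against the lock cached on the async page,
+ * under oap_lock so that oap_ldlm_lock cannot change underneath us. */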
+static int osc_reget_short_lock(struct obd_export *exp,
+ struct lov_stripe_md *lsm,
+ void **res, int rw,
+ loff_t start, loff_t end,
+ void **cookie)
+{
+ struct osc_async_page *oap = *res;
+ int rc;
+
+ ENTRY;
+
+ spin_lock(&oap->oap_lock);
+ rc = ldlm_lock_fast_match(oap->oap_ldlm_lock, rw,
+ start, end, cookie);
+ spin_unlock(&oap->oap_lock);
+
+ RETURN(rc);
+}
+
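+/* Drop the reference taken by osc_reget_short_lock(). */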
+static int osc_release_short_lock(struct obd_export *exp,
+ struct lov_stripe_md *lsm, loff_t end,
+ void *cookie, int rw)
+{
+ ENTRY;
+ ldlm_lock_fast_release(cookie, rw);
+	/* no error can occur at this layer */
+ RETURN(0);
+}
+
int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
struct lov_oinfo *loi, cfs_page_t *page,
obd_off offset, struct obd_async_page_ops *ops,
struct osc_async_page *oap;
struct ldlm_res_id oid = {{0}};
int rc = 0;
-
+
ENTRY;
if (!page)
.o_brw = osc_brw,
.o_brw_async = osc_brw_async,
.o_prep_async_page = osc_prep_async_page,
+ .o_reget_short_lock = osc_reget_short_lock,
+ .o_release_short_lock = osc_release_short_lock,
.o_queue_async_io = osc_queue_async_io,
.o_set_async_flags = osc_set_async_flags,
.o_queue_group_io = osc_queue_group_io,