* \see cl_page_is_under_lock()
*/
int (*cpo_is_under_lock)(const struct lu_env *env,
- const struct cl_page_slice *slice,
- struct cl_io *io);
+ const struct cl_page_slice *slice,
+ struct cl_io *io, pgoff_t *max);
/**
* Optional debugging helper. Prints given page slice.
}
void cl_page_slice_add(struct cl_page *page, struct cl_page_slice *slice,
- struct cl_object *obj,
- const struct cl_page_operations *ops);
+ struct cl_object *obj, pgoff_t index,
+ const struct cl_page_operations *ops);
void cl_lock_slice_add(struct cl_lock *lock, struct cl_lock_slice *slice,
struct cl_object *obj,
const struct cl_lock_operations *ops);
void cl_page_export (const struct lu_env *env,
struct cl_page *pg, int uptodate);
int cl_page_is_under_lock(const struct lu_env *env, struct cl_io *io,
- struct cl_page *page);
+ struct cl_page *page, pgoff_t *max_index);
loff_t cl_offset (const struct cl_object *obj, pgoff_t idx);
pgoff_t cl_index (const struct cl_object *obj, loff_t offset);
int cl_page_size (const struct cl_object *obj);
const struct cl_object *obj, struct ost_lvb *lvb);
int ccc_conf_set(const struct lu_env *env, struct cl_object *obj,
const struct cl_object_conf *conf);
-int ccc_page_is_under_lock(const struct lu_env *env,
- const struct cl_page_slice *slice, struct cl_io *io);
int ccc_fail(const struct lu_env *env, const struct cl_page_slice *slice);
void ccc_transient_page_verify(const struct cl_page *page);
int ccc_transient_page_own(const struct lu_env *env,
*
*/
-int ccc_page_is_under_lock(const struct lu_env *env,
- const struct cl_page_slice *slice,
- struct cl_io *io)
-{
- struct ccc_io *cio = ccc_env_io(env);
- struct cl_lock_descr *desc = &ccc_env_info(env)->cti_descr;
- struct cl_page *page = slice->cpl_page;
-
- int result;
-
- ENTRY;
-
- if (io->ci_type == CIT_READ || io->ci_type == CIT_WRITE ||
- io->ci_type == CIT_FAULT) {
- if (cio->cui_fd->fd_flags & LL_FILE_GROUP_LOCKED)
- result = -EBUSY;
- else {
- desc->cld_start = ccc_index(cl2ccc_page(slice));
- desc->cld_end = ccc_index(cl2ccc_page(slice));
- desc->cld_obj = page->cp_obj;
- desc->cld_mode = CLM_READ;
- result = cl_queue_match(&io->ci_lockset.cls_done,
- desc) ? -EBUSY : 0;
- }
- } else
- result = 0;
- RETURN(result);
-}
-
int ccc_fail(const struct lu_env *env, const struct cl_page_slice *slice)
{
/*
} else {
struct ccc_object *clobj = cl2ccc(obj);
- cl_page_slice_add(page, &cpg->cpg_cl, obj,
- &slp_transient_page_ops);
+ cl_page_slice_add(page, &cpg->cpg_cl, obj, index,
+ &slp_transient_page_ops);
clobj->cob_transient_pages++;
}
.cpo_discard = ccc_transient_page_discard,
.cpo_is_vmlocked = slp_page_is_vmlocked,
.cpo_fini = slp_transient_page_fini,
- .cpo_is_under_lock = ccc_page_is_under_lock,
.io = {
[CRT_READ] = {
.cpo_completion = slp_page_completion_read,
RA_STAT_EOF,
RA_STAT_MAX_IN_FLIGHT,
RA_STAT_WRONG_GRAB_PAGE,
- _NR_RA_STAT,
+ RA_STAT_FAILED_REACH_END,
+ _NR_RA_STAT,
};
struct ll_ra_info {
void ll_clear_file_contended(struct inode*);
int ll_sync_page_range(struct inode *, struct address_space *, loff_t, size_t);
int ll_readahead(const struct lu_env *env, struct cl_io *io,
- struct ll_readahead_state *ras, struct address_space *mapping,
- struct cl_page_list *queue, int flags);
+ struct cl_page_list *queue, struct ll_readahead_state *ras,
+ bool hit);
int vvp_io_write_commit(const struct lu_env *env, struct cl_io *io);
struct ll_cl_context *ll_cl_init(struct file *file, struct page *vmpage);
void ll_cl_fini(struct ll_cl_context *lcc);
unsigned hit);
void ll_ra_count_put(struct ll_sb_info *sbi, unsigned long len);
int ll_is_file_contended(struct file *file);
-void ll_ra_stats_inc(struct address_space *mapping, enum ra_stat which);
+void ll_ra_stats_inc(struct inode *inode, enum ra_stat which);
/* llite/llite_rmtacl.c */
#ifdef CONFIG_FS_POSIX_ACL
EXPORT_SYMBOL(ll_stats_ops_tally);
static const char *ra_stat_string[] = {
- [RA_STAT_HIT] = "hits",
- [RA_STAT_MISS] = "misses",
- [RA_STAT_DISTANT_READPAGE] = "readpage not consecutive",
- [RA_STAT_MISS_IN_WINDOW] = "miss inside window",
- [RA_STAT_FAILED_GRAB_PAGE] = "failed grab_cache_page",
- [RA_STAT_FAILED_MATCH] = "failed lock match",
- [RA_STAT_DISCARDED] = "read but discarded",
- [RA_STAT_ZERO_LEN] = "zero length file",
- [RA_STAT_ZERO_WINDOW] = "zero size window",
- [RA_STAT_EOF] = "read-ahead to EOF",
- [RA_STAT_MAX_IN_FLIGHT] = "hit max r-a issue",
- [RA_STAT_WRONG_GRAB_PAGE] = "wrong page from grab_cache_page",
+ [RA_STAT_HIT] = "hits",
+ [RA_STAT_MISS] = "misses",
+ [RA_STAT_DISTANT_READPAGE] = "readpage not consecutive",
+ [RA_STAT_MISS_IN_WINDOW] = "miss inside window",
+ [RA_STAT_FAILED_GRAB_PAGE] = "failed grab_cache_page",
+ [RA_STAT_FAILED_MATCH] = "failed lock match",
+ [RA_STAT_DISCARDED] = "read but discarded",
+ [RA_STAT_ZERO_LEN] = "zero length file",
+ [RA_STAT_ZERO_WINDOW] = "zero size window",
+ [RA_STAT_EOF] = "read-ahead to EOF",
+ [RA_STAT_MAX_IN_FLIGHT] = "hit max r-a issue",
+ [RA_STAT_WRONG_GRAB_PAGE] = "wrong page from grab_cache_page",
+	[RA_STAT_FAILED_REACH_END] = "failed to reach end",
};
LPROC_SEQ_FOPS_RO_TYPE(llite, name);
* get a zero ra window, although there is still ra space remaining. - Jay */
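+/* @pages is the number of pages the caller would like to read ahead;
+ * @min is a floor that is granted even if it overruns the RA budget, so
+ * that the window always covers the read currently in progress. */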
static unsigned long ll_ra_count_get(struct ll_sb_info *sbi,
- struct ra_io_arg *ria,
- unsigned long pages)
+ struct ra_io_arg *ria,
+ unsigned long pages, unsigned long min)
{
struct ll_ra_info *ra = &sbi->ll_ra_info;
long ret;
	/* If read-ahead pages left are less than 1M, do not do read-ahead,
	 * otherwise it will form small read RPCs (< 1M), which hurt server
	 * performance a lot. */
- ret = min(ra->ra_max_pages - cfs_atomic_read(&ra->ra_cur_pages), pages);
+ ret = min(ra->ra_max_pages - atomic_read(&ra->ra_cur_pages), pages);
if (ret < 0 || ret < min_t(long, PTLRPC_MAX_BRW_PAGES, pages))
GOTO(out, ret = 0);
ret -= beyond_rpc;
}
- if (cfs_atomic_add_return(ret, &ra->ra_cur_pages) > ra->ra_max_pages) {
- cfs_atomic_sub(ret, &ra->ra_cur_pages);
- ret = 0;
- }
+ if (atomic_add_return(ret, &ra->ra_cur_pages) > ra->ra_max_pages) {
+ atomic_sub(ret, &ra->ra_cur_pages);
+ ret = 0;
+ }
out:
- RETURN(ret);
+ if (ret < min) {
+		/* force the minimum window needed to cover the current
+		 * read, even if it exceeds the readahead limit */
+ atomic_add(min - ret, &ra->ra_cur_pages);
+ ret = min;
+ }
+ RETURN(ret);
}
void ll_ra_count_put(struct ll_sb_info *sbi, unsigned long len)
{
- struct ll_ra_info *ra = &sbi->ll_ra_info;
- cfs_atomic_sub(len, &ra->ra_cur_pages);
+ struct ll_ra_info *ra = &sbi->ll_ra_info;
+ atomic_sub(len, &ra->ra_cur_pages);
}
static void ll_ra_stats_inc_sbi(struct ll_sb_info *sbi, enum ra_stat which)
lprocfs_counter_incr(sbi->ll_ra_stats, which);
}
-void ll_ra_stats_inc(struct address_space *mapping, enum ra_stat which)
+void ll_ra_stats_inc(struct inode *inode, enum ra_stat which)
{
- struct ll_sb_info *sbi = ll_i2sbi(mapping->host);
- ll_ra_stats_inc_sbi(sbi, which);
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+ ll_ra_stats_inc_sbi(sbi, which);
}
#define RAS_CDEBUG(ras) \
static int cl_read_ahead_page(const struct lu_env *env, struct cl_io *io,
struct cl_page_list *queue, struct cl_page *page,
- struct cl_object *clob)
+ struct cl_object *clob, pgoff_t *max_index)
{
struct page *vmpage = page->cp_vmpage;
struct ccc_page *cp;
lu_ref_add(&page->cp_reference, "ra", current);
cp = cl2ccc_page(cl_object_page_slice(clob, page));
if (!cp->cpg_defer_uptodate && !PageUptodate(vmpage)) {
- rc = cl_page_is_under_lock(env, io, page);
- if (rc == -EBUSY) {
+ CDEBUG(D_READA, "page index %lu, max_index: %lu\n",
+ ccc_index(cp), *max_index);
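+		/* pages up to *max_index are already known to be covered
+		 * by a DLM lock, so the per-page lookup can be skipped */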
+ if (*max_index == 0 || ccc_index(cp) > *max_index)
+ rc = cl_page_is_under_lock(env, io, page, max_index);
+ if (rc == 0) {
cp->cpg_defer_uptodate = 1;
cp->cpg_ra_used = 0;
cl_page_list_add(queue, page);
* \retval -ve, 0: page wasn't added to \a queue for other reason.
*/
static int ll_read_ahead_page(const struct lu_env *env, struct cl_io *io,
- struct cl_page_list *queue,
- pgoff_t index, struct address_space *mapping)
+ struct cl_page_list *queue,
+ pgoff_t index, pgoff_t *max_index)
{
+ struct cl_object *clob = io->ci_obj;
+ struct inode *inode = ccc_object_inode(clob);
struct page *vmpage;
- struct cl_object *clob = ll_i2info(mapping->host)->lli_clob;
struct cl_page *page;
enum ra_stat which = _NR_RA_STAT; /* keep gcc happy */
unsigned int gfp_mask;
#ifdef __GFP_NOWARN
gfp_mask |= __GFP_NOWARN;
#endif
- vmpage = grab_cache_page_nowait(mapping, index);
- if (vmpage != NULL) {
- /* Check if vmpage was truncated or reclaimed */
- if (vmpage->mapping == mapping) {
- page = cl_page_find(env, clob, vmpage->index,
- vmpage, CPT_CACHEABLE);
- if (!IS_ERR(page)) {
- rc = cl_read_ahead_page(env, io, queue,
- page, clob);
+ vmpage = grab_cache_page_nowait(inode->i_mapping, index);
+ if (vmpage != NULL) {
+ /* Check if vmpage was truncated or reclaimed */
+ if (vmpage->mapping == inode->i_mapping) {
+ page = cl_page_find(env, clob, vmpage->index,
+ vmpage, CPT_CACHEABLE);
+ if (!IS_ERR(page)) {
+ rc = cl_read_ahead_page(env, io, queue,
+ page, clob, max_index);
if (rc == -ENOLCK) {
which = RA_STAT_FAILED_MATCH;
msg = "lock match failed";
which = RA_STAT_FAILED_GRAB_PAGE;
msg = "g_c_p_n failed";
}
- if (msg != NULL) {
- ll_ra_stats_inc(mapping, which);
- CDEBUG(D_READA, "%s\n", msg);
- }
- RETURN(rc);
+ if (msg != NULL) {
+ ll_ra_stats_inc(inode, which);
+ CDEBUG(D_READA, "%s\n", msg);
+ }
+ RETURN(rc);
}
#define RIA_DEBUG(ria) \
struct cl_io *io, struct cl_page_list *queue,
struct ra_io_arg *ria,
unsigned long *reserved_pages,
- struct address_space *mapping,
unsigned long *ra_end)
{
- int rc, count = 0, stride_ria;
- unsigned long page_idx;
-
- LASSERT(ria != NULL);
- RIA_DEBUG(ria);
-
- stride_ria = ria->ria_length > ria->ria_pages && ria->ria_pages > 0;
- for (page_idx = ria->ria_start; page_idx <= ria->ria_end &&
- *reserved_pages > 0; page_idx++) {
- if (ras_inside_ra_window(page_idx, ria)) {
- /* If the page is inside the read-ahead window*/
- rc = ll_read_ahead_page(env, io, queue,
- page_idx, mapping);
+ int rc, count = 0;
+ bool stride_ria;
+ pgoff_t page_idx;
+ pgoff_t max_index = 0;
+
+ LASSERT(ria != NULL);
+ RIA_DEBUG(ria);
+
+ stride_ria = ria->ria_length > ria->ria_pages && ria->ria_pages > 0;
+ for (page_idx = ria->ria_start;
+ page_idx <= ria->ria_end && *reserved_pages > 0; page_idx++) {
+ if (ras_inside_ra_window(page_idx, ria)) {
+			/* If the page is inside the read-ahead window */
+ rc = ll_read_ahead_page(env, io, queue,
+ page_idx, &max_index);
if (rc == 1) {
(*reserved_pages)--;
- count ++;
+ count++;
} else if (rc == -ENOLCK)
break;
} else if (stride_ria) {
}
int ll_readahead(const struct lu_env *env, struct cl_io *io,
- struct ll_readahead_state *ras, struct address_space *mapping,
- struct cl_page_list *queue, int flags)
+ struct cl_page_list *queue, struct ll_readahead_state *ras,
+ bool hit)
{
- struct vvp_io *vio = vvp_env_io(env);
- struct vvp_thread_info *vti = vvp_env_info(env);
- struct cl_attr *attr = ccc_env_thread_attr(env);
- unsigned long start = 0, end = 0, reserved;
- unsigned long ra_end, len;
- struct inode *inode;
- struct ll_ra_read *bead;
- struct ra_io_arg *ria = &vti->vti_ria;
- struct ll_inode_info *lli;
- struct cl_object *clob;
- int ret = 0;
- __u64 kms;
- ENTRY;
+ struct vvp_io *vio = vvp_env_io(env);
+ struct vvp_thread_info *vti = vvp_env_info(env);
+ struct cl_attr *attr = ccc_env_thread_attr(env);
+ unsigned long start = 0, end = 0, reserved;
+ unsigned long ra_end, len, mlen = 0;
+ struct inode *inode;
+ struct ll_ra_read *bead;
+ struct ra_io_arg *ria = &vti->vti_ria;
+ struct cl_object *clob;
+ int ret = 0;
+ __u64 kms;
+ ENTRY;
- inode = mapping->host;
- lli = ll_i2info(inode);
- clob = lli->lli_clob;
+ clob = io->ci_obj;
+ inode = ccc_object_inode(clob);
- memset(ria, 0, sizeof *ria);
+	memset(ria, 0, sizeof(*ria));
- cl_object_attr_lock(clob);
- ret = cl_object_attr_get(env, clob, attr);
- cl_object_attr_unlock(clob);
+ cl_object_attr_lock(clob);
+ ret = cl_object_attr_get(env, clob, attr);
+ cl_object_attr_unlock(clob);
- if (ret != 0)
- RETURN(ret);
- kms = attr->cat_kms;
- if (kms == 0) {
- ll_ra_stats_inc(mapping, RA_STAT_ZERO_LEN);
- RETURN(0);
- }
+ if (ret != 0)
+ RETURN(ret);
+ kms = attr->cat_kms;
+ if (kms == 0) {
+ ll_ra_stats_inc(inode, RA_STAT_ZERO_LEN);
+ RETURN(0);
+ }
spin_lock(&ras->ras_lock);
if (vio->cui_ra_window_set)
}
spin_unlock(&ras->ras_lock);
- if (end == 0) {
- ll_ra_stats_inc(mapping, RA_STAT_ZERO_WINDOW);
- RETURN(0);
- }
- len = ria_page_count(ria);
- if (len == 0)
- RETURN(0);
+ if (end == 0) {
+ ll_ra_stats_inc(inode, RA_STAT_ZERO_WINDOW);
+ RETURN(0);
+ }
+ len = ria_page_count(ria);
+ if (len == 0) {
+ ll_ra_stats_inc(inode, RA_STAT_ZERO_WINDOW);
+ RETURN(0);
+ }
- reserved = ll_ra_count_get(ll_i2sbi(inode), ria, len);
- if (reserved < len)
- ll_ra_stats_inc(mapping, RA_STAT_MAX_IN_FLIGHT);
+ CDEBUG(D_READA, DFID": ria: %lu/%lu, bead: %lu/%lu, hit: %d\n",
+ PFID(lu_object_fid(&clob->co_lu)),
+ ria->ria_start, ria->ria_end,
+ bead == NULL ? 0 : bead->lrr_start,
+ bead == NULL ? 0 : bead->lrr_count,
+ hit);
+
+	/* at least extend the readahead window to cover the current read */
+ if (!hit && bead != NULL &&
+ bead->lrr_start + bead->lrr_count > ria->ria_start) {
+ /* to the end of current read window. */
+ mlen = bead->lrr_start + bead->lrr_count - ria->ria_start;
+ /* trim to RPC boundary */
+ start = ria->ria_start & (PTLRPC_MAX_BRW_PAGES - 1);
+ mlen = min(mlen, PTLRPC_MAX_BRW_PAGES - start);
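+		/* mlen is the floor passed to ll_ra_count_get() below */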
+ }
+
+ reserved = ll_ra_count_get(ll_i2sbi(inode), ria, len, mlen);
+ if (reserved < len)
+ ll_ra_stats_inc(inode, RA_STAT_MAX_IN_FLIGHT);
- CDEBUG(D_READA, "reserved page %lu ra_cur %d ra_max %lu\n", reserved,
- cfs_atomic_read(&ll_i2sbi(inode)->ll_ra_info.ra_cur_pages),
- ll_i2sbi(inode)->ll_ra_info.ra_max_pages);
+ CDEBUG(D_READA, "reserved pages: %lu/%lu/%lu, ra_cur %d, ra_max %lu\n",
+ reserved, len, mlen,
+ atomic_read(&ll_i2sbi(inode)->ll_ra_info.ra_cur_pages),
+ ll_i2sbi(inode)->ll_ra_info.ra_max_pages);
- ret = ll_read_ahead_pages(env, io, queue,
- ria, &reserved, mapping, &ra_end);
+ ret = ll_read_ahead_pages(env, io, queue, ria, &reserved, &ra_end);
- if (reserved != 0)
- ll_ra_count_put(ll_i2sbi(inode), reserved);
+ if (reserved != 0)
+ ll_ra_count_put(ll_i2sbi(inode), reserved);
if (ra_end == end + 1 && ra_end == (kms >> PAGE_CACHE_SHIFT))
- ll_ra_stats_inc(mapping, RA_STAT_EOF);
+ ll_ra_stats_inc(inode, RA_STAT_EOF);
- /* if we didn't get to the end of the region we reserved from
- * the ras we need to go back and update the ras so that the
- * next read-ahead tries from where we left off. we only do so
- * if the region we failed to issue read-ahead on is still ahead
- * of the app and behind the next index to start read-ahead from */
- CDEBUG(D_READA, "ra_end %lu end %lu stride end %lu \n",
- ra_end, end, ria->ria_end);
+ /* if we didn't get to the end of the region we reserved from
+ * the ras we need to go back and update the ras so that the
+ * next read-ahead tries from where we left off. we only do so
+ * if the region we failed to issue read-ahead on is still ahead
+ * of the app and behind the next index to start read-ahead from */
+ CDEBUG(D_READA, "ra_end %lu end %lu stride end %lu \n",
+ ra_end, end, ria->ria_end);
if (ra_end != end + 1) {
+ ll_ra_stats_inc(inode, RA_STAT_FAILED_REACH_END);
spin_lock(&ras->ras_lock);
if (ra_end < ras->ras_next_readahead &&
index_in_window(ra_end, ras->ras_window_start, 0,
ras->ras_last_readpage = index;
ras_set_start(inode, ras, index);
- if (stride_io_mode(ras))
+ if (stride_io_mode(ras)) {
		/* Since stride readahead is sensitive to the offset
		 * of read-ahead, we use the original offset here,
		 * instead of ras_window_start, which is RPC aligned */
ras->ras_next_readahead = max(index, ras->ras_next_readahead);
- else
- ras->ras_next_readahead = max(ras->ras_window_start,
- ras->ras_next_readahead);
+ } else {
+ if (ras->ras_next_readahead < ras->ras_window_start)
+ ras->ras_next_readahead = ras->ras_window_start;
+ if (!hit)
+ ras->ras_next_readahead = index + 1;
+ }
RAS_CDEBUG(ras);
/* Trigger RA in the mmap case where ras_consecutive_requests
const struct cl_io_slice *ios,
const struct cl_page_slice *slice)
{
- struct cl_io *io = ios->cis_io;
- struct cl_object *obj = slice->cpl_obj;
- struct ccc_page *cp = cl2ccc_page(slice);
- struct cl_page *page = slice->cpl_page;
- struct inode *inode = ccc_object_inode(obj);
- struct ll_sb_info *sbi = ll_i2sbi(inode);
- struct ll_file_data *fd = cl2ccc_io(env, ios)->cui_fd;
- struct ll_readahead_state *ras = &fd->fd_ras;
- struct page *vmpage = cp->cpg_page;
- struct cl_2queue *queue = &io->ci_queue;
- int rc;
+ struct cl_io *io = ios->cis_io;
+ struct ccc_page *cp = cl2ccc_page(slice);
+ struct cl_page *page = slice->cpl_page;
+ struct inode *inode = ccc_object_inode(slice->cpl_obj);
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+ struct ll_file_data *fd = cl2ccc_io(env, ios)->cui_fd;
+ struct ll_readahead_state *ras = &fd->fd_ras;
+ struct cl_2queue *queue = &io->ci_queue;
- CLOBINVRNT(env, obj, ccc_object_invariant(obj));
- LASSERT(slice->cpl_obj == obj);
-
- ENTRY;
+ ENTRY;
- if (sbi->ll_ra_info.ra_max_pages_per_file &&
- sbi->ll_ra_info.ra_max_pages)
+ if (sbi->ll_ra_info.ra_max_pages_per_file > 0 &&
+ sbi->ll_ra_info.ra_max_pages > 0)
ras_update(sbi, inode, ras, ccc_index(cp),
cp->cpg_defer_uptodate);
- /* Sanity check whether the page is protected by a lock. */
- rc = cl_page_is_under_lock(env, io, page);
- if (rc != -EBUSY) {
- CL_PAGE_HEADER(D_WARNING, env, page, "%s: %d\n",
- rc == -ENODATA ? "without a lock" :
- "match failed", rc);
- if (rc != -ENODATA)
- RETURN(rc);
- }
-
if (cp->cpg_defer_uptodate) {
cp->cpg_ra_used = 1;
cl_page_export(env, page, 1);
}
+
/*
* Add page into the queue even when it is marked uptodate above.
	 * This will unlock it automatically as part of cl_page_list_disown().
*/
cl_2queue_add(queue, page);
- if (sbi->ll_ra_info.ra_max_pages_per_file &&
- sbi->ll_ra_info.ra_max_pages)
- ll_readahead(env, io, ras,
- vmpage->mapping, &queue->c2_qin, fd->fd_flags);
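+	/* cpg_defer_uptodate means the page was produced by an earlier
+	 * readahead, i.e. this read is a readahead hit */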
+ if (sbi->ll_ra_info.ra_max_pages_per_file > 0 &&
+ sbi->ll_ra_info.ra_max_pages > 0)
+ ll_readahead(env, io, &queue->c2_qin, ras,
+ cp->cpg_defer_uptodate);
- RETURN(0);
+ RETURN(0);
}
static const struct cl_io_operations vvp_io_ops = {
LASSERT(PageLocked(vmpage));
if (cpg->cpg_defer_uptodate && !cpg->cpg_ra_used)
- ll_ra_stats_inc(vmpage->mapping, RA_STAT_DISCARDED);
+ ll_ra_stats_inc(vmpage->mapping->host, RA_STAT_DISCARDED);
ll_invalidate_page(vmpage);
}
RETURN(result);
}
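+/* The vvp layer only handles the group-lock case here: a group lock
+ * covers the whole file, so every page is under lock up to EOF.  For
+ * regular locks *max_index has already been filled in by the osc and
+ * lov slices, which run earlier in the bottom-to-top walk. */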
+static int vvp_page_is_under_lock(const struct lu_env *env,
+ const struct cl_page_slice *slice,
+ struct cl_io *io, pgoff_t *max_index)
+{
+ ENTRY;
+
+ if (io->ci_type == CIT_READ || io->ci_type == CIT_WRITE ||
+ io->ci_type == CIT_FAULT) {
+ struct ccc_io *cio = ccc_env_io(env);
+
+ if (unlikely(cio->cui_fd->fd_flags & LL_FILE_GROUP_LOCKED))
+ *max_index = CL_PAGE_EOF;
+ }
+ RETURN(0);
+}
+
static int vvp_page_print(const struct lu_env *env,
const struct cl_page_slice *slice,
void *cookie, lu_printer_t printer)
.cpo_is_vmlocked = vvp_page_is_vmlocked,
.cpo_fini = vvp_page_fini,
.cpo_print = vvp_page_print,
- .cpo_is_under_lock = ccc_page_is_under_lock,
+ .cpo_is_under_lock = vvp_page_is_under_lock,
.io = {
[CRT_READ] = {
.cpo_prep = vvp_page_prep_read,
}
static const struct cl_page_operations vvp_transient_page_ops = {
- .cpo_own = vvp_transient_page_own,
- .cpo_assume = vvp_transient_page_assume,
- .cpo_unassume = vvp_transient_page_unassume,
- .cpo_disown = vvp_transient_page_disown,
- .cpo_discard = vvp_transient_page_discard,
- .cpo_fini = vvp_transient_page_fini,
- .cpo_is_vmlocked = vvp_transient_page_is_vmlocked,
- .cpo_print = vvp_page_print,
- .cpo_is_under_lock = ccc_page_is_under_lock,
- .io = {
- [CRT_READ] = {
- .cpo_prep = ccc_transient_page_prep,
- .cpo_completion = vvp_transient_page_completion,
- },
- [CRT_WRITE] = {
- .cpo_prep = ccc_transient_page_prep,
- .cpo_completion = vvp_transient_page_completion,
- }
- }
+ .cpo_own = vvp_transient_page_own,
+ .cpo_assume = vvp_transient_page_assume,
+ .cpo_unassume = vvp_transient_page_unassume,
+ .cpo_disown = vvp_transient_page_disown,
+ .cpo_discard = vvp_transient_page_discard,
+ .cpo_fini = vvp_transient_page_fini,
+ .cpo_is_vmlocked = vvp_transient_page_is_vmlocked,
+ .cpo_print = vvp_page_print,
+ .cpo_is_under_lock = vvp_page_is_under_lock,
+ .io = {
+ [CRT_READ] = {
+ .cpo_prep = ccc_transient_page_prep,
+ .cpo_completion = vvp_transient_page_completion,
+ },
+ [CRT_WRITE] = {
+ .cpo_prep = ccc_transient_page_prep,
+ .cpo_completion = vvp_transient_page_completion,
+ }
+ }
};
int vvp_page_init(const struct lu_env *env, struct cl_object *obj,
CLOBINVRNT(env, obj, ccc_object_invariant(obj));
- cpg->cpg_cl.cpl_index = index;
cpg->cpg_page = vmpage;
page_cache_get(vmpage);
atomic_inc(&page->cp_ref);
SetPagePrivate(vmpage);
vmpage->private = (unsigned long)page;
- cl_page_slice_add(page, &cpg->cpg_cl, obj,
+ cl_page_slice_add(page, &cpg->cpg_cl, obj, index,
&vvp_page_ops);
} else {
struct ccc_object *clobj = cl2ccc(obj);
LASSERT(!mutex_trylock(&clobj->cob_inode->i_mutex));
- cl_page_slice_add(page, &cpg->cpg_cl, obj,
+ cl_page_slice_add(page, &cpg->cpg_cl, obj, index,
&vvp_transient_page_ops);
clobj->cob_transient_pages++;
}
const struct cl_page_slice *slice);
struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov);
+int lov_page_stripe(const struct cl_page *page);
#define lov_foreach_target(lov, var) \
for (var = 0; var < lov_targets_nr(lov); ++var)
obd_off start, obd_off end,
obd_off *obd_start, obd_off *obd_end);
int lov_stripe_number(struct lov_stripe_md *lsm, obd_off lov_off);
+pgoff_t lov_stripe_pgoff(struct lov_stripe_md *lsm, pgoff_t stripe_index,
+ int stripe);
/* lov_request.c */
void lov_set_add_req(struct lov_request *req, struct lov_request_set *set);
*
*/
-static int lov_page_stripe(const struct cl_page *page)
+int lov_page_stripe(const struct cl_page *page)
{
struct lovsub_object *subobj;
const struct cl_page_slice *slice;
RETURN(lov_size);
}
+/**
+ * Compute the file-level page index from a stripe-level page index.
+ */
+pgoff_t lov_stripe_pgoff(struct lov_stripe_md *lsm, pgoff_t stripe_index,
+ int stripe)
+{
+ obd_off offset;
+
+ offset = lov_stripe_size(lsm, stripe_index << PAGE_CACHE_SHIFT,
+ stripe);
+ return offset >> PAGE_CACHE_SHIFT;
+}
+
/* we have an offset in file backed by an lov and want to find out where
* that offset lands in our given stripe of the file. for the easy
* case where the offset is within the stripe, we just have to scale the
*
*/
-static int lov_page_print(const struct lu_env *env,
- const struct cl_page_slice *slice,
- void *cookie, lu_printer_t printer)
+/**
+ * Adjust @max_index for the raid0 layout. @max_index is the maximum
+ * page index covered by an underlying DLM lock.
+ * This function converts @max_index from stripe level to file level and
+ * makes sure it does not extend beyond the current stripe.
+ */
+static int lov_raid0_page_is_under_lock(const struct lu_env *env,
+ const struct cl_page_slice *slice,
+ struct cl_io *unused,
+ pgoff_t *max_index)
{
- struct lov_page *lp = cl2lov_page(slice);
+ struct lov_object *loo = cl2lov(slice->cpl_obj);
+ struct lov_layout_raid0 *r0 = lov_r0(loo);
+ pgoff_t index = *max_index;
+ unsigned int pps; /* pages per stripe */
+ ENTRY;
+
+ CDEBUG(D_READA, "*max_index = %lu, nr = %d\n", index, r0->lo_nr);
+ if (index == 0) /* the page is not covered by any lock */
+ RETURN(0);
+
+ if (r0->lo_nr == 1) /* single stripe file */
+ RETURN(0);
+
+ /* max_index is stripe level, convert it into file level */
+ if (index != CL_PAGE_EOF) {
+ int stripeno = lov_page_stripe(slice->cpl_page);
+ *max_index = lov_stripe_pgoff(loo->lo_lsm, index, stripeno);
+ }
+
+ /* calculate the end of current stripe */
+ pps = loo->lo_lsm->lsm_stripe_size >> PAGE_CACHE_SHIFT;
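+	/* round cpl_index up to the next stripe boundary, then step back
+	 * one page: the last file-level index inside the current stripe
+	 * (pps is assumed to be a power of two) */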
+ index = ((slice->cpl_index + pps) & ~(pps - 1)) - 1;
+
+ /* never exceed the end of the stripe */
+ *max_index = min_t(pgoff_t, *max_index, index);
+ RETURN(0);
+}
+
+static int lov_raid0_page_print(const struct lu_env *env,
+ const struct cl_page_slice *slice,
+ void *cookie, lu_printer_t printer)
+{
+ struct lov_page *lp = cl2lov_page(slice);
- return (*printer)(env, cookie, LUSTRE_LOV_NAME"-page@%p\n", lp);
+ return (*printer)(env, cookie, LUSTRE_LOV_NAME"-page@%p, raid0\n", lp);
}
-static const struct cl_page_operations lov_page_ops = {
- .cpo_print = lov_page_print
+static const struct cl_page_operations lov_raid0_page_ops = {
+ .cpo_is_under_lock = lov_raid0_page_is_under_lock,
+ .cpo_print = lov_raid0_page_print
};
int lov_page_init_raid0(const struct lu_env *env, struct cl_object *obj,
&suboff);
LASSERT(rc == 0);
- cl_page_slice_add(page, &lpg->lps_cl, obj, &lov_page_ops);
+ cl_page_slice_add(page, &lpg->lps_cl, obj, index, &lov_raid0_page_ops);
sub = lov_sub_get(env, lio, stripe);
if (IS_ERR(sub))
RETURN(rc);
}
-static int lov_page_empty_print(const struct lu_env *env,
+static int lov_empty_page_print(const struct lu_env *env,
const struct cl_page_slice *slice,
void *cookie, lu_printer_t printer)
{
}
static const struct cl_page_operations lov_empty_page_ops = {
- .cpo_print = lov_page_empty_print
+ .cpo_print = lov_empty_page_print
};
int lov_page_init_empty(const struct lu_env *env, struct cl_object *obj,
void *addr;
ENTRY;
- cl_page_slice_add(page, &lpg->lps_cl, obj, &lov_empty_page_ops);
+ cl_page_slice_add(page, &lpg->lps_cl, obj, index, &lov_empty_page_ops);
addr = kmap(page->cp_vmpage);
memset(addr, 0, cl_page_size(obj));
kunmap(page->cp_vmpage);
};
int lovsub_page_init(const struct lu_env *env, struct cl_object *obj,
- struct cl_page *page, pgoff_t ind)
+ struct cl_page *page, pgoff_t index)
{
struct lovsub_page *lsb = cl_object_page_slice(obj, page);
ENTRY;
- cl_page_slice_add(page, &lsb->lsb_cl, obj, &lovsub_page_ops);
+ cl_page_slice_add(page, &lsb->lsb_cl, obj, index, &lovsub_page_ops);
RETURN(0);
}
break;
}
}
- if (result == 0)
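+	/* the queue may be empty if every page was already uptodate;
+	 * only submit an RPC when there is something to read */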
+ if (result == 0 && queue->c2_qin.pl_nr > 0)
result = cl_io_submit_rw(env, io, CRT_READ, queue);
/*
* Unlock unsent pages in case of error.
__result; \
})
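+/*
+ * Invoke a page operation on each slice in reverse (bottom-to-top)
+ * order.  A non-zero return stops the walk; a positive value is
+ * treated as success and reported to the caller as 0.
+ */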
+#define CL_PAGE_INVOKE_REVERSE(_env, _page, _op, _proto, ...) \
+({ \
+ const struct lu_env *__env = (_env); \
+ struct cl_page *__page = (_page); \
+ const struct cl_page_slice *__scan; \
+ int __result; \
+ ptrdiff_t __op = (_op); \
+ int (*__method)_proto; \
+ \
+ __result = 0; \
+ list_for_each_entry_reverse(__scan, &__page->cp_layers, \
+ cpl_linkage) { \
+ __method = *(void **)((char *)__scan->cpl_ops + __op); \
+ if (__method != NULL) { \
+ __result = (*__method)(__env, __scan, ## __VA_ARGS__); \
+ if (__result != 0) \
+ break; \
+ } \
+ } \
+ if (__result > 0) \
+ __result = 0; \
+ __result; \
+})
+
#define CL_PAGE_INVOID(_env, _page, _op, _proto, ...) \
do { \
const struct lu_env *__env = (_env); \
* \see cl_page_operations::cpo_is_under_lock()
*/
int cl_page_is_under_lock(const struct lu_env *env, struct cl_io *io,
- struct cl_page *page)
+ struct cl_page *page, pgoff_t *max_index)
{
- int rc;
+ int rc;
- PINVRNT(env, page, cl_page_invariant(page));
+ PINVRNT(env, page, cl_page_invariant(page));
- ENTRY;
- rc = CL_PAGE_INVOKE(env, page, CL_PAGE_OP(cpo_is_under_lock),
- (const struct lu_env *,
- const struct cl_page_slice *, struct cl_io *),
- io);
- PASSERT(env, page, rc != 0);
- RETURN(rc);
+ ENTRY;
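+	/* walk bottom-to-top: osc reports the covered extent of the
+	 * matching DLM lock, lov maps it to file-level indices, and vvp
+	 * handles the group-lock case */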
+ rc = CL_PAGE_INVOKE_REVERSE(env, page, CL_PAGE_OP(cpo_is_under_lock),
+ (const struct lu_env *,
+ const struct cl_page_slice *,
+ struct cl_io *, pgoff_t *),
+ io, max_index);
+ RETURN(rc);
}
EXPORT_SYMBOL(cl_page_is_under_lock);
* \see cl_lock_slice_add(), cl_req_slice_add(), cl_io_slice_add()
*/
void cl_page_slice_add(struct cl_page *page, struct cl_page_slice *slice,
- struct cl_object *obj,
- const struct cl_page_operations *ops)
+ struct cl_object *obj, pgoff_t index,
+ const struct cl_page_operations *ops)
{
- ENTRY;
- cfs_list_add_tail(&slice->cpl_linkage, &page->cp_layers);
- slice->cpl_obj = obj;
- slice->cpl_ops = ops;
- slice->cpl_page = page;
- EXIT;
+ ENTRY;
+ list_add_tail(&slice->cpl_linkage, &page->cp_layers);
+ slice->cpl_obj = obj;
+ slice->cpl_index = index;
+ slice->cpl_ops = ops;
+ slice->cpl_page = page;
+ EXIT;
}
EXPORT_SYMBOL(cl_page_slice_add);
page_cache_get(page->cp_vmpage);
mutex_init(&ep->ep_lock);
- cl_page_slice_add(page, &ep->ep_cl, obj, &echo_page_ops);
+ cl_page_slice_add(page, &ep->ep_cl, obj, index, &echo_page_ops);
atomic_inc(&eco->eo_npages);
RETURN(0);
}
}
static int osc_page_is_under_lock(const struct lu_env *env,
- const struct cl_page_slice *slice,
- struct cl_io *unused)
+ const struct cl_page_slice *slice,
+ struct cl_io *unused, pgoff_t *max_index)
{
struct osc_page *opg = cl2osc_page(slice);
struct cl_lock *lock;
int result = -ENODATA;
-
ENTRY;
+
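+	/* *max_index == 0 means no lock covers this page; otherwise it is
+	 * the last (stripe-level) page index covered by the matched lock */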
+ *max_index = 0;
lock = cl_lock_at_pgoff(env, slice->cpl_obj, osc_index(opg),
- NULL, 1, 0);
+ NULL, 1, 0);
if (lock != NULL) {
+ *max_index = lock->cll_descr.cld_end;
cl_lock_put(env, lock);
- result = -EBUSY;
+ result = 0;
}
RETURN(result);
}
opg->ops_from = 0;
opg->ops_to = PAGE_CACHE_SIZE;
- opg->ops_cl.cpl_index = index;
result = osc_prep_async_page(osc, opg, page->cp_vmpage,
cl_offset(obj, index));
if (result == 0) {
struct osc_io *oio = osc_env_io(env);
opg->ops_srvlock = osc_io_srvlock(oio);
- cl_page_slice_add(page, &opg->ops_cl, obj,
- &osc_page_ops);
+ cl_page_slice_add(page, &opg->ops_cl, obj, index,
+ &osc_page_ops);
}
/*
* Cannot assert osc_page_protected() here as read-ahead