Whamcloud - gitweb
git://git.whamcloud.com
/
fs
/
lustre-release.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
| inline |
side by side
- landing of b_hd_cleanup_merge to HEAD.
[fs/lustre-release.git]
/
lustre
/
llite
/
rw.c
diff --git
a/lustre/llite/rw.c
b/lustre/llite/rw.c
index
d95dbf5
..
abda07f
100644
(file)
--- a/
lustre/llite/rw.c
+++ b/
lustre/llite/rw.c
@@
-60,27
+60,30
@@
static int ll_brw(int cmd, struct inode *inode, struct obdo *oa,
{
struct ll_inode_info *lli = ll_i2info(inode);
struct lov_stripe_md *lsm = lli->lli_smd;
+ struct timeval start;
struct brw_page pg;
int rc;
ENTRY;
+ do_gettimeofday(&start);
+
pg.pg = page;
- pg.
off
= ((obd_off)page->index) << PAGE_SHIFT;
+ pg.
disk_offset = pg.page_offset
= ((obd_off)page->index) << PAGE_SHIFT;
if (cmd == OBD_BRW_WRITE &&
- (pg.
off
+ PAGE_SIZE > inode->i_size))
+ (pg.
disk_offset
+ PAGE_SIZE > inode->i_size))
pg.count = inode->i_size % PAGE_SIZE;
else
pg.count = PAGE_SIZE;
CDEBUG(D_PAGE, "%s %d bytes ino %lu at "LPU64"/"LPX64"\n",
cmd & OBD_BRW_WRITE ? "write" : "read", pg.count, inode->i_ino,
- pg.
off, pg.off
);
+ pg.
disk_offset, pg.disk_offset
);
if (pg.count == 0) {
CERROR("ZERO COUNT: ino %lu: size %p:%Lu(%p:%Lu) idx %lu off "
LPU64"\n", inode->i_ino, inode, inode->i_size,
page->mapping->host, page->mapping->host->i_size,
- page->index, pg.
off
);
+ page->index, pg.
disk_offset
);
}
pg.flag = flags;
@@
-96,6
+99,8
@@
static int ll_brw(int cmd, struct inode *inode, struct obdo *oa,
obdo_to_inode(inode, oa, OBD_MD_FLBLOCKS);
else if (rc != -EIO)
CERROR("error from obd_brw: rc = %d\n", rc);
+ ll_stime_record(ll_i2sbi(inode), &start,
+ &ll_i2sbi(inode)->ll_brw_stime);
RETURN(rc);
}
@@
-161,7
+166,7
@@
int ll_prepare_write(struct file *file, struct page *page, unsigned from,
/* Check to see if we should return -EIO right away */
pga.pg = page;
- pga.
off
= offset;
+ pga.
disk_offset = pga.page_offset
= offset;
pga.count = PAGE_SIZE;
pga.flag = 0;
@@
-385,7
+390,7
@@
struct ll_async_page *llap_from_page(struct page *page)
CDEBUG(D_CACHE, "llap %p page %p cookie %p obj off "LPU64"\n", llap,
page, llap->llap_cookie, (obd_off)page->index << PAGE_SHIFT);
/* also zeroing the PRIVBITS low order bitflags */
-
page->private = (unsigned long)llap
;
+
__set_page_ll_data(page, llap)
;
llap->llap_page = page;
spin_lock(&sbi->ll_lock);
@@
-396,10
+401,59
@@
struct ll_async_page *llap_from_page(struct page *page)
RETURN(llap);
}
/* Try to queue the page described by @llap for asynchronous write-back on
 * @exp/@lsm; if the async queue refuses it, fall back to a synchronous
 * group write of the first @to bytes and wait for completion.
 * Returns 0 on success or the error from the obd layer. */
static int queue_or_sync_write(struct obd_export *exp,
                               struct lov_stripe_md *lsm,
                               struct ll_async_page *llap,
                               unsigned to,
                               obd_flag async_flags)
{
        struct obd_io_group *oig;
        int rc;
        ENTRY;

        /* _make_ready only sees llap once we've unlocked the page */
        llap->llap_write_queued = 1;
        rc = obd_queue_async_io(exp, lsm, NULL, llap->llap_cookie,
                                OBD_BRW_WRITE, 0, 0, 0, async_flags);
        if (rc == 0) {
                LL_CDEBUG_PAGE(D_PAGE, llap->llap_page, "write queued\n");
                //llap_write_pending(inode, llap);
                GOTO(out, 0);
        }

        /* async queueing failed: undo the flag and fall back to a
         * synchronous group write instead */
        llap->llap_write_queued = 0;

        rc = oig_init(&oig);
        if (rc)
                GOTO(out, rc);

        rc = obd_queue_group_io(exp, lsm, NULL, oig, llap->llap_cookie,
                                OBD_BRW_WRITE, 0, to, 0, ASYNC_READY |
                                ASYNC_URGENT | ASYNC_COUNT_STABLE |
                                ASYNC_GROUP_SYNC);
        if (rc)
                GOTO(free_oig, rc);

        rc = obd_trigger_group_io(exp, lsm, NULL, oig);
        if (rc)
                GOTO(free_oig, rc);

        /* block until the group I/O completes */
        rc = oig_wait(oig);

        /* if the caller asked for ASYNC_READY semantics, the async path
         * would have left the page unlocked after I/O; mirror that here
         * on the successful sync fallback */
        if (!rc && async_flags & ASYNC_READY)
                unlock_page(llap->llap_page);

        LL_CDEBUG_PAGE(D_PAGE, llap->llap_page, "sync write returned %d\n",
                       rc);

free_oig:
        oig_release(oig);
out:
        RETURN(rc);
}
+
void lov_increase_kms(struct obd_export *exp, struct lov_stripe_md *lsm,
obd_off size);
-/* update our write count to account for i_size increases that may have
- * happened since we've queued the page for io. */
/* be careful not to return success without setting the page Uptodate or
* the next pass through prepare_write will read in stale data from disk. */
@@
-436,46
+490,20
@@
int ll_commit_write(struct file *file, struct page *page, unsigned from,
if (exp == NULL)
RETURN(-EINVAL);
- /* _make_ready only sees llap once we've unlocked the page */
- llap->llap_write_queued = 1;
- rc = obd_queue_async_io(exp, lsm, NULL, llap->llap_cookie,
- OBD_BRW_WRITE, 0, 0, 0, 0);
- if (rc != 0) { /* async failed, try sync.. */
- struct obd_io_group *oig;
- rc = oig_init(&oig);
- if (rc)
- GOTO(out, rc);
-
- llap->llap_write_queued = 0;
- rc = obd_queue_group_io(exp, lsm, NULL, oig,
- llap->llap_cookie,
- OBD_BRW_WRITE, 0, to, 0,
- ASYNC_READY | ASYNC_URGENT |
- ASYNC_COUNT_STABLE |
- ASYNC_GROUP_SYNC);
-
- if (rc)
- GOTO(free_oig, rc);
-
- rc = obd_trigger_group_io(exp, lsm, NULL, oig);
- if (rc)
- GOTO(free_oig, rc);
-
- rc = oig_wait(oig);
-free_oig:
- oig_release(oig);
+ rc = queue_or_sync_write(exp, ll_i2info(inode)->lli_smd, llap,
+ to, 0);
+ if (rc)
GOTO(out, rc);
- }
- LL_CDEBUG_PAGE(D_PAGE, page, "write queued\n");
- //llap_write_pending(inode, llap);
} else {
lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats,
LPROC_LL_DIRTY_HITS);
}
/* put the page in the page cache, from now on ll_removepage is
- * responsible for cleaning up the llap */
- set_page_dirty(page);
+ * responsible for cleaning up the llap.
+ * don't dirty the page if it has been write out in q_o_s_w */
+ if (llap->llap_write_queued)
+ set_page_dirty(page);
out:
if (rc == 0) {
@@
-487,16
+515,54
@@
out:
}
RETURN(rc);
}
+
/* Write-back entry point for a single locked page.  If the page is already
 * queued for async I/O, just escalate it to ready+urgent; otherwise queue
 * it (or write it synchronously) via queue_or_sync_write().
 * Returns 0 on success; on failure the page is unlocked before returning. */
int ll_writepage(struct page *page)
{
        struct inode *inode = page->mapping->host;
        struct obd_export *exp;
        struct ll_async_page *llap;
        int rc = 0;
        ENTRY;

        LASSERT(!PageDirty(page));
        LASSERT(PageLocked(page));

        exp = ll_i2obdexp(inode);
        if (exp == NULL)
                GOTO(out, rc = -EINVAL);

        llap = llap_from_page(page);
        if (IS_ERR(llap))
                GOTO(out, rc = PTR_ERR(llap));

        /* pin the page for the duration of the I/O; dropped below on
         * error — presumably the I/O completion path releases it on
         * success (TODO(review): confirm against ll_ap_completion) */
        page_cache_get(page);
        if (llap->llap_write_queued) {
                /* already queued by an earlier commit_write: just flag it */
                LL_CDEBUG_PAGE(D_PAGE, page, "marking urgent\n");
                rc = obd_set_async_flags(exp, ll_i2info(inode)->lli_smd, NULL,
                                         llap->llap_cookie,
                                         ASYNC_READY | ASYNC_URGENT);
        } else {
                rc = queue_or_sync_write(exp, ll_i2info(inode)->lli_smd, llap,
                                         PAGE_SIZE, ASYNC_READY |
                                         ASYNC_URGENT);
        }
        if (rc)
                page_cache_release(page);
out:
        /* on any failure the page never reached the I/O path; unlock it
         * here so the caller doesn't deadlock on it */
        if (rc)
                unlock_page(page);
        RETURN(rc);
}
static unsigned long ll_ra_count_get(struct ll_sb_info *sbi, unsigned long len)
{
+ struct ll_ra_info *ra = &sbi->ll_ra_info;
unsigned long ret;
ENTRY;
spin_lock(&sbi->ll_lock);
- ret = min(sbi->ll_max_read_ahead_pages - sbi->ll_read_ahead_pages,
- len);
- sbi->ll_read_ahead_pages += ret;
+ ret = min(ra->ra_max_pages - ra->ra_cur_pages, len);
+ ra->ra_cur_pages += ret;
spin_unlock(&sbi->ll_lock);
RETURN(ret);
@@
-504,10
+570,11
@@
static unsigned long ll_ra_count_get(struct ll_sb_info *sbi, unsigned long len)
static void ll_ra_count_put(struct ll_sb_info *sbi, unsigned long len)
{
+ struct ll_ra_info *ra = &sbi->ll_ra_info;
spin_lock(&sbi->ll_lock);
- LASSERTF(
sbi->ll_read_ahead_pages >= len, "r_a
_p %lu len %lu\n",
-
sbi->ll_read_ahead
_pages, len);
-
sbi->ll_read_ahead
_pages -= len;
+ LASSERTF(
ra->ra_cur_pages >= len, "r_c
_p %lu len %lu\n",
+
ra->ra_cur
_pages, len);
+
ra->ra_cur
_pages -= len;
spin_unlock(&sbi->ll_lock);
}
@@
-552,7
+619,10
@@
void ll_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
llap_write_complete(page->mapping->host, llap);
ll_try_done_writing(page->mapping->host);
}
-
+
+ if (PageWriteback(page)) {
+ end_page_writeback(page);
+ }
page_cache_release(page);
EXIT;
}
@@
-604,7
+674,7
@@
void ll_removepage(struct page *page)
/* this unconditional free is only safe because the page lock
* is providing exclusivity to memory pressure/truncate/writeback..*/
-
page->private = 0
;
+
__clear_page_ll_data(page)
;
spin_lock(&sbi->ll_lock);
if (!list_empty(&llap->llap_proc_item))
@@
-646,6
+716,7
@@
static int ll_issue_page_read(struct obd_export *exp,
page_cache_get(page);
llap->llap_defer_uptodate = defer;
+ llap->llap_ra_used = 0;
rc = obd_queue_group_io(exp, ll_i2info(page->mapping->host)->lli_smd,
NULL, oig, llap->llap_cookie, OBD_BRW_READ, 0,
PAGE_SIZE, 0, ASYNC_COUNT_STABLE | ASYNC_READY
@@
-657,12
+728,55
@@
static int ll_issue_page_read(struct obd_export *exp,
RETURN(rc);
}
/* Bump one read-ahead statistics counter.  No locking here — the caller
 * must hold sbi->ll_lock (see ll_ra_stats_inc() for the locked wrapper). */
static void ll_ra_stats_inc_unlocked(struct ll_ra_info *ra, enum ra_stat which)
{
        LASSERTF(which >= 0 && which < _NR_RA_STAT, "which: %u\n", which);
        ra->ra_stats[which]++;
}
+
+static void ll_ra_stats_inc(struct address_space *mapping, enum ra_stat which)
+{
+ struct ll_sb_info *sbi = ll_i2sbi(mapping->host);
+ struct ll_ra_info *ra = &ll_i2sbi(mapping->host)->ll_ra_info;
+
+ spin_lock(&sbi->ll_lock);
+ ll_ra_stats_inc_unlocked(ra, which);
+ spin_unlock(&sbi->ll_lock);
+}
+
/* Read-ahead waste accounting for @page as it leaves @mapping: if the page
 * was populated by read-ahead (llap_defer_uptodate) but never consumed by
 * a reader (llap_ra_used still 0), record it as RA_STAT_DISCARDED.
 * NOTE(review): presumably called from the page-discard path — confirm
 * against the callers. */
void ll_ra_accounting(struct page *page, struct address_space *mapping)
{
        struct ll_async_page *llap;

        llap = llap_from_page(page);
        if (IS_ERR(llap))
                return;

        if (!llap->llap_defer_uptodate || llap->llap_ra_used)
                return;

        ll_ra_stats_inc(mapping, RA_STAT_DISCARDED);
}
+
#define RAS_CDEBUG(ras) \
CDEBUG(D_READA, "lrp %lu c %lu ws %lu wl %lu nra %lu\n", \
ras->ras_last_readpage, ras->ras_consecutive, \
ras->ras_window_start, ras->ras_window_len, \
ras->ras_next_readahead);
/* Return non-zero iff @index lies within [@point - @before, @point + @after],
 * clamping the window bounds at the ends of the unsigned long range when
 * the subtraction underflows or the addition overflows. */
static int index_in_window(unsigned long index, unsigned long point,
                           unsigned long before, unsigned long after)
{
        unsigned long lo, hi;

        /* clamp the lower bound at 0 if point - before would wrap */
        lo = (before > point) ? 0 : point - before;

        /* clamp the upper bound at ULONG_MAX if point + after wraps */
        hi = point + after;
        if (hi < point)
                hi = ~0UL;

        return (lo <= index) && (index <= hi);
}
+
static int ll_readahead(struct ll_readahead_state *ras,
struct obd_export *exp, struct address_space *mapping,
struct obd_io_group *oig, int flags)
@@
-670,16
+784,18
@@
static int ll_readahead(struct ll_readahead_state *ras,
unsigned long i, start = 0, end = 0, reserved;
struct ll_async_page *llap;
struct page *page;
- int rc, ret = 0;
+ int rc, ret = 0
, match_failed = 0
;
__u64 kms;
ENTRY;
kms = lov_merge_size(ll_i2info(mapping->host)->lli_smd, 1);
- if (kms == 0)
+ if (kms == 0) {
+ ll_ra_stats_inc(mapping, RA_STAT_ZERO_LEN);
RETURN(0);
-
+ }
spin_lock(&ras->ras_lock);
+ /* reserve a part of the read-ahead window that we'll be issuing */
if (ras->ras_window_len) {
start = ras->ras_next_readahead;
end = ras->ras_window_start + ras->ras_window_len - 1;
@@
-691,12
+807,16
@@
static int ll_readahead(struct ll_readahead_state *ras,
spin_unlock(&ras->ras_lock);
- if (end == 0)
+ if (end == 0) {
+ ll_ra_stats_inc(mapping, RA_STAT_ZERO_WINDOW);
RETURN(0);
+ }
reserved = ll_ra_count_get(ll_i2sbi(mapping->host), end - start + 1);
+ if (reserved < end - start + 1)
+ ll_ra_stats_inc(mapping, RA_STAT_MAX_IN_FLIGHT);
- for (i = start; reserved > 0 && i <= end; i++) {
+ for (i = start; reserved > 0 &&
!match_failed &&
i <= end; i++) {
/* skip locked pages from previous readpage calls */
page = grab_cache_page_nowait(mapping, i);
if (page == NULL) {
@@
-718,7
+838,8
@@
static int ll_readahead(struct ll_readahead_state *ras,
if ((rc = ll_page_matches(page, flags)) <= 0) {
LL_CDEBUG_PAGE(D_READA | D_PAGE, page,
"lock match failed: rc %d\n", rc);
- i = end;
+ ll_ra_stats_inc(mapping, RA_STAT_FAILED_MATCH);
+ match_failed = 1;
goto next_page;
}
@@
-742,20
+863,36
@@
static int ll_readahead(struct ll_readahead_state *ras,
LASSERTF(reserved >= 0, "reserved %lu\n", reserved);
if (reserved != 0)
ll_ra_count_put(ll_i2sbi(mapping->host), reserved);
+
+ if (i == end + 1 && end == (kms >> PAGE_CACHE_SHIFT))
+ ll_ra_stats_inc(mapping, RA_STAT_EOF);
+
+ /* if we didn't get to the end of the region we reserved from
+ * the ras we need to go back and update the ras so that the
+ * next read-ahead tries from where we left off. we only do so
+ * if the region we failed to issue read-ahead on is still ahead
+ * of the app and behind the next index to start read-ahead from */
+ if (i != end + 1) {
+ spin_lock(&ras->ras_lock);
+ if (i < ras->ras_next_readahead &&
+ index_in_window(i, ras->ras_window_start, 0,
+ ras->ras_window_len)) {
+ ras->ras_next_readahead = i;
+ RAS_CDEBUG(ras);
+ }
+ spin_unlock(&ras->ras_lock);
+ }
+
RETURN(ret);
}
-static void ras_set_start(struct ll_readahead_state *ras,
- unsigned long index)
+static void ras_set_start(struct ll_readahead_state *ras, unsigned long index)
{
ras->ras_window_start = index & (~(PTLRPC_MAX_BRW_PAGES - 1));
- ras->ras_next_readahead = max(ras->ras_window_start,
- ras->ras_next_readahead);
}
/* called with the ras_lock held or from places where it doesn't matter */
-static void ras_reset(struct ll_readahead_state *ras,
- unsigned long index)
+static void ras_reset(struct ll_readahead_state *ras, unsigned long index)
{
ras->ras_last_readpage = index;
ras->ras_consecutive = 1;
@@
-772,14
+909,35
@@
void ll_readahead_init(struct inode *inode, struct ll_readahead_state *ras)
ras_reset(ras, 0);
}
-static void ras_update(struct ll_readahead_state *ras,
- unsigned long index, unsigned
long max
)
+static void ras_update(struct ll_
sb_info *sbi, struct ll_
readahead_state *ras,
+ unsigned long index, unsigned
hit
)
{
+ struct ll_ra_info *ra = &sbi->ll_ra_info;
+ int zero = 0;
ENTRY;
+ spin_lock(&sbi->ll_lock);
spin_lock(&ras->ras_lock);
+
+ ll_ra_stats_inc_unlocked(ra, hit ? RA_STAT_HIT : RA_STAT_MISS);
+
+ /* reset the read-ahead window in two cases. First when the app seeks
+ * or reads to some other part of the file. Secondly if we get a
+ * read-ahead miss that we think we've previously issued. This can
+ * be a symptom of there being so many read-ahead pages that the VM is
+ * reclaiming it before we get to it. */
+ if (!index_in_window(index, ras->ras_last_readpage, 8, 8)) {
+ zero = 1;
+ ll_ra_stats_inc_unlocked(ra, RA_STAT_DISTANT_READPAGE);
+ } else if (!hit && ras->ras_window_len &&
+ index < ras->ras_next_readahead &&
+ index_in_window(index, ras->ras_window_start, 0,
+ ras->ras_window_len)) {
+ zero = 1;
+ ll_ra_stats_inc_unlocked(ra, RA_STAT_MISS_IN_WINDOW);
+ }
- if (
index != ras->ras_last_readpage + 1
) {
+ if (
zero
) {
ras_reset(ras, index);
GOTO(out_unlock, 0);
}
@@
-787,8
+945,12
@@
static void ras_update(struct ll_readahead_state *ras,
ras->ras_last_readpage = index;
ras->ras_consecutive++;
ras_set_start(ras, index);
+ ras->ras_next_readahead = max(ras->ras_window_start,
+ ras->ras_next_readahead);
- if (ras->ras_consecutive == 2) {
+ /* wait for a few pages to arrive before issuing readahead to avoid
+ * the worst overutilization */
+ if (ras->ras_consecutive == 3) {
ras->ras_window_len = PTLRPC_MAX_BRW_PAGES;
GOTO(out_unlock, 0);
}
@@
-798,13
+960,16
@@
static void ras_update(struct ll_readahead_state *ras,
if ((index & (PTLRPC_MAX_BRW_PAGES - 1)) ==
(PTLRPC_MAX_BRW_PAGES >> 1)) {
ras->ras_window_len += PTLRPC_MAX_BRW_PAGES;
- ras->ras_window_len = min(ras->ras_window_len, max);
+ ras->ras_window_len = min(ras->ras_window_len,
+ ra->ra_max_pages);
+
}
EXIT;
out_unlock:
RAS_CDEBUG(ras);
spin_unlock(&ras->ras_lock);
+ spin_unlock(&sbi->ll_lock);
return;
}
@@
-847,10
+1012,11
@@
int ll_readpage(struct file *filp, struct page *page)
GOTO(out, rc = PTR_ERR(llap));
if (ll_i2sbi(inode)->ll_flags & LL_SBI_READAHEAD)
- ras_update(
&fd->fd_ras, page->index,
- ll
_i2sbi(inode)->ll_max_read_ahead_pages
);
-
+ ras_update(
ll_i2sbi(inode), &fd->fd_ras, page->index,
+ ll
ap->llap_defer_uptodate
);
+
if (llap->llap_defer_uptodate) {
+ llap->llap_ra_used = 1;
rc = ll_readahead(&fd->fd_ras, exp, page->mapping, oig,
fd->fd_flags);
if (rc > 0)
@@
-869,17
+1035,10
@@
int ll_readpage(struct file *filp, struct page *page)
}
if (rc == 0) {
- static unsigned long next_print;
- CDEBUG(D_INODE, "ino %lu page %lu (%llu) didn't match a lock\n",
- inode->i_ino, page->index,
- (long long)page->index << PAGE_CACHE_SHIFT);
- if (time_after(jiffies, next_print)) {
- CWARN("ino %lu page %lu (%llu) not covered by "
- "a lock (mmap?). check debug logs.\n",
- inode->i_ino, page->index,
- (long long)page->index << PAGE_CACHE_SHIFT);
- next_print = jiffies + 30 * HZ;
- }
+ CWARN("ino %lu page %lu (%llu) not covered by "
+ "a lock (mmap?). check debug logs.\n",
+ inode->i_ino, page->index,
+ (long long)page->index << PAGE_CACHE_SHIFT);
}
rc = ll_issue_page_read(exp, llap, oig, 0);