- landing of b_hd_cleanup_merge to HEAD.
[fs/lustre-release.git] / lustre/llite/rw.c
index d95dbf5..abda07f 100644
@@ -60,27 +60,30 @@ static int ll_brw(int cmd, struct inode *inode, struct obdo *oa,
 {
         struct ll_inode_info *lli = ll_i2info(inode);
         struct lov_stripe_md *lsm = lli->lli_smd;
+        struct timeval start;
         struct brw_page pg;
         int rc;
         ENTRY;
 
+        do_gettimeofday(&start);
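+        /* the start time is folded into the sbi brw stats by the
+         * ll_stime_record() call at the end of this function */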
+
         pg.pg = page;
-        pg.off = ((obd_off)page->index) << PAGE_SHIFT;
+        pg.disk_offset = pg.page_offset = ((obd_off)page->index) << PAGE_SHIFT;
 
         if (cmd == OBD_BRW_WRITE &&
-            (pg.off + PAGE_SIZE > inode->i_size))
+            (pg.disk_offset + PAGE_SIZE > inode->i_size))
                 pg.count = inode->i_size % PAGE_SIZE;
         else
                 pg.count = PAGE_SIZE;
 
         CDEBUG(D_PAGE, "%s %d bytes ino %lu at "LPU64"/"LPX64"\n",
                cmd & OBD_BRW_WRITE ? "write" : "read", pg.count, inode->i_ino,
-               pg.off, pg.off);
+               pg.disk_offset, pg.disk_offset);
         if (pg.count == 0) {
                 CERROR("ZERO COUNT: ino %lu: size %p:%Lu(%p:%Lu) idx %lu off "
                        LPU64"\n", inode->i_ino, inode, inode->i_size,
                        page->mapping->host, page->mapping->host->i_size,
-                       page->index, pg.off);
+                       page->index, pg.disk_offset);
         }
 
         pg.flag = flags;
@@ -96,6 +99,8 @@ static int ll_brw(int cmd, struct inode *inode, struct obdo *oa,
                 obdo_to_inode(inode, oa, OBD_MD_FLBLOCKS);
         else if (rc != -EIO)
                 CERROR("error from obd_brw: rc = %d\n", rc);
+        ll_stime_record(ll_i2sbi(inode), &start,
+                        &ll_i2sbi(inode)->ll_brw_stime);
         RETURN(rc);
 }
 
@@ -161,7 +166,7 @@ int ll_prepare_write(struct file *file, struct page *page, unsigned from,
 
         /* Check to see if we should return -EIO right away */
         pga.pg = page;
-        pga.off = offset;
+        pga.disk_offset = pga.page_offset = offset;
         pga.count = PAGE_SIZE;
         pga.flag = 0;
 
@@ -385,7 +390,7 @@ struct ll_async_page *llap_from_page(struct page *page)
         CDEBUG(D_CACHE, "llap %p page %p cookie %p obj off "LPU64"\n", llap,
                page, llap->llap_cookie, (obd_off)page->index << PAGE_SHIFT);
         /* also zeroing the PRIVBITS low order bitflags */
-        page->private = (unsigned long)llap;
+        __set_page_ll_data(page, llap);
         llap->llap_page = page;
 
         spin_lock(&sbi->ll_lock);
@@ -396,10 +401,59 @@ struct ll_async_page *llap_from_page(struct page *page)
         RETURN(llap);
 }
 
+static int queue_or_sync_write(struct obd_export *exp,
+                               struct lov_stripe_md *lsm,
+                               struct ll_async_page *llap,
+                               unsigned to,
+                               obd_flag async_flags)
+{
+        struct obd_io_group *oig;
+        int rc;
+        ENTRY;
+
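+        /* try the cheap path first: queue the page for asynchronous
+         * write-out.  if the async queue refuses it, fall back to the
+         * synchronous group I/O below and wait for it in-line. */
+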
+        /* _make_ready only sees llap once we've unlocked the page */
+        llap->llap_write_queued = 1;
+        rc = obd_queue_async_io(exp, lsm, NULL, llap->llap_cookie,
+                                OBD_BRW_WRITE, 0, 0, 0, async_flags);
+        if (rc == 0) {
+                LL_CDEBUG_PAGE(D_PAGE, llap->llap_page, "write queued\n");
+                //llap_write_pending(inode, llap);
+                GOTO(out, 0);
+        }
+
+        llap->llap_write_queued = 0;
+
+        rc = oig_init(&oig);
+        if (rc)
+                GOTO(out, rc);
+
+        rc = obd_queue_group_io(exp, lsm, NULL, oig, llap->llap_cookie,
+                                OBD_BRW_WRITE, 0, to, 0, ASYNC_READY |
+                                ASYNC_URGENT | ASYNC_COUNT_STABLE |
+                                ASYNC_GROUP_SYNC);
+        if (rc)
+                GOTO(free_oig, rc);
+
+        rc = obd_trigger_group_io(exp, lsm, NULL, oig);
+        if (rc)
+                GOTO(free_oig, rc);
+
+        rc = oig_wait(oig);
+
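+        /* a caller passing ASYNC_READY (ll_writepage) gave us a locked
+         * page; the synchronous path completes the I/O right here, so
+         * unlock the page before returning */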
+        if (!rc && async_flags & ASYNC_READY)
+                unlock_page(llap->llap_page);
+
+        LL_CDEBUG_PAGE(D_PAGE, llap->llap_page, "sync write returned %d\n",
+                       rc);
+
+free_oig:
+        oig_release(oig);
+out:
+        RETURN(rc);
+}
+
 void lov_increase_kms(struct obd_export *exp, struct lov_stripe_md *lsm,
                       obd_off size);
-/* update our write count to account for i_size increases that may have
- * happened since we've queued the page for io. */
 
 /* be careful not to return success without setting the page Uptodate or
  * the next pass through prepare_write will read in stale data from disk. */
@@ -436,46 +490,20 @@ int ll_commit_write(struct file *file, struct page *page, unsigned from,
                 if (exp == NULL)
                         RETURN(-EINVAL);
 
-                /* _make_ready only sees llap once we've unlocked the page */
-                llap->llap_write_queued = 1;
-                rc = obd_queue_async_io(exp, lsm, NULL, llap->llap_cookie,
-                                        OBD_BRW_WRITE, 0, 0, 0, 0);
-                if (rc != 0) { /* async failed, try sync.. */
-                        struct obd_io_group *oig;
-                        rc = oig_init(&oig);
-                        if (rc)
-                                GOTO(out, rc);
-
-                        llap->llap_write_queued = 0;
-                        rc = obd_queue_group_io(exp, lsm, NULL, oig,
-                                                llap->llap_cookie,
-                                                OBD_BRW_WRITE, 0, to, 0,
-                                                ASYNC_READY | ASYNC_URGENT |
-                                                ASYNC_COUNT_STABLE |
-                                                ASYNC_GROUP_SYNC);
-
-                        if (rc)
-                                GOTO(free_oig, rc);
-
-                        rc = obd_trigger_group_io(exp, lsm, NULL, oig);
-                        if (rc)
-                                GOTO(free_oig, rc);
-
-                        rc = oig_wait(oig);
-free_oig:
-                        oig_release(oig);
+                rc = queue_or_sync_write(exp, ll_i2info(inode)->lli_smd, llap,
+                                         to, 0);
+                if (rc)
                         GOTO(out, rc);
-                }
-                LL_CDEBUG_PAGE(D_PAGE, page, "write queued\n");
-                //llap_write_pending(inode, llap);
         } else {
                 lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats,
                                      LPROC_LL_DIRTY_HITS);
         }
 
         /* put the page in the page cache, from now on ll_removepage is
-         * responsible for cleaning up the llap */
-        set_page_dirty(page);
+         * responsible for cleaning up the llap.
+         * don't re-dirty the page if it has already been written out
+         * synchronously by queue_or_sync_write() */
+        if (llap->llap_write_queued)
+                set_page_dirty(page);
 
 out:
         if (rc == 0) {
@@ -487,16 +515,54 @@ out:
         }
         RETURN(rc);
 }
+
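+/* the VM is pushing a locked page to disk.  if its llap is already
+ * queued for async write-out, just escalate it to urgent; otherwise
+ * write it out now via queue_or_sync_write() */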
+int ll_writepage(struct page *page)
+{
+        struct inode *inode = page->mapping->host;
+        struct obd_export *exp;
+        struct ll_async_page *llap;
+        int rc = 0;
+        ENTRY;
+
+        LASSERT(!PageDirty(page));
+        LASSERT(PageLocked(page));
+
+        exp = ll_i2obdexp(inode);
+        if (exp == NULL)
+                GOTO(out, rc = -EINVAL);
+
+        llap = llap_from_page(page);
+        if (IS_ERR(llap))
+                GOTO(out, rc = PTR_ERR(llap));
+
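+        /* take an extra page reference for the duration of the write;
+         * dropped below on error, or by the completion path otherwise */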
+        page_cache_get(page);
+        if (llap->llap_write_queued) {
+                LL_CDEBUG_PAGE(D_PAGE, page, "marking urgent\n");
+                rc = obd_set_async_flags(exp, ll_i2info(inode)->lli_smd, NULL,
+                                         llap->llap_cookie,
+                                         ASYNC_READY | ASYNC_URGENT);
+        } else {
+                rc = queue_or_sync_write(exp, ll_i2info(inode)->lli_smd, llap,
+                                         PAGE_SIZE, ASYNC_READY |
+                                         ASYNC_URGENT);
+        }
+        if (rc)
+                page_cache_release(page);
+out:
+        if (rc)
+                unlock_page(page);
+        RETURN(rc);
+}
 
 static unsigned long ll_ra_count_get(struct ll_sb_info *sbi, unsigned long len)
 {
+        struct ll_ra_info *ra = &sbi->ll_ra_info;
         unsigned long ret;
         ENTRY;
 
         spin_lock(&sbi->ll_lock);
-        ret = min(sbi->ll_max_read_ahead_pages - sbi->ll_read_ahead_pages,
-                  len);
-        sbi->ll_read_ahead_pages += ret;
+        ret = min(ra->ra_max_pages - ra->ra_cur_pages, len);
+        ra->ra_cur_pages += ret;
         spin_unlock(&sbi->ll_lock);
 
         RETURN(ret);
@@ -504,10 +570,11 @@ static unsigned long ll_ra_count_get(struct ll_sb_info *sbi, unsigned long len)
 
 static void ll_ra_count_put(struct ll_sb_info *sbi, unsigned long len)
 {
+        struct ll_ra_info *ra = &sbi->ll_ra_info;
         spin_lock(&sbi->ll_lock);
-        LASSERTF(sbi->ll_read_ahead_pages >= len, "r_a_p %lu len %lu\n",
-                 sbi->ll_read_ahead_pages, len);
-        sbi->ll_read_ahead_pages -= len;
+        LASSERTF(ra->ra_cur_pages >= len, "r_c_p %lu len %lu\n",
+                 ra->ra_cur_pages, len);
+        ra->ra_cur_pages -= len;
         spin_unlock(&sbi->ll_lock);
 }
 
@@ -552,7 +619,10 @@ void ll_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
                 llap_write_complete(page->mapping->host, llap);
                 ll_try_done_writing(page->mapping->host);
         }
-
+
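+        /* if the page was marked for writeback, clear the flag and wake
+         * any waiters now that the I/O has completed */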
+        if (PageWriteback(page)) {
+                end_page_writeback(page);
+        }
         page_cache_release(page);
         EXIT;
 }
@@ -604,7 +674,7 @@ void ll_removepage(struct page *page)
 
         /* this unconditional free is only safe because the page lock
          * is providing exclusivity to memory pressure/truncate/writeback..*/
-        page->private = 0;
+        __clear_page_ll_data(page);
 
         spin_lock(&sbi->ll_lock);
         if (!list_empty(&llap->llap_proc_item))
@@ -646,6 +716,7 @@ static int ll_issue_page_read(struct obd_export *exp,
 
         page_cache_get(page);
         llap->llap_defer_uptodate = defer;
+        llap->llap_ra_used = 0;
         rc = obd_queue_group_io(exp, ll_i2info(page->mapping->host)->lli_smd,
                                 NULL, oig, llap->llap_cookie, OBD_BRW_READ, 0,
                                 PAGE_SIZE, 0, ASYNC_COUNT_STABLE | ASYNC_READY
@@ -657,12 +728,55 @@ static int ll_issue_page_read(struct obd_export *exp,
         RETURN(rc);
 }
 
+static void ll_ra_stats_inc_unlocked(struct ll_ra_info *ra, enum ra_stat which)
+{
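+        /* caller must hold sbi->ll_lock */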
+        LASSERTF(which >= 0 && which < _NR_RA_STAT, "which: %u\n", which);
+        ra->ra_stats[which]++;
+}
+
+static void ll_ra_stats_inc(struct address_space *mapping, enum ra_stat which)
+{
+        struct ll_sb_info *sbi = ll_i2sbi(mapping->host);
+        struct ll_ra_info *ra = &ll_i2sbi(mapping->host)->ll_ra_info;
+
+        spin_lock(&sbi->ll_lock);
+        ll_ra_stats_inc_unlocked(ra, which);
+        spin_unlock(&sbi->ll_lock);
+}
+
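+/* accounting for a page being discarded: if it was filled by read-ahead
+ * (llap_defer_uptodate) but never consumed by a reader (llap_ra_used
+ * unset), count it as a discarded read-ahead page */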
+void ll_ra_accounting(struct page *page, struct address_space *mapping)
+{
+        struct ll_async_page *llap;
+
+        llap = llap_from_page(page);
+        if (IS_ERR(llap))
+                return;
+
+        if (!llap->llap_defer_uptodate || llap->llap_ra_used)
+                return;
+
+        ll_ra_stats_inc(mapping, RA_STAT_DISCARDED);
+}
+
 #define RAS_CDEBUG(ras) \
         CDEBUG(D_READA, "lrp %lu c %lu ws %lu wl %lu nra %lu\n",        \
                ras->ras_last_readpage, ras->ras_consecutive,            \
                ras->ras_window_start, ras->ras_window_len,              \
                ras->ras_next_readahead);
 
+static int index_in_window(unsigned long index, unsigned long point,
+                           unsigned long before, unsigned long after)
+{
+        unsigned long start = point - before, end = point + after;
+
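+        /* clamp against unsigned wraparound: a range that underflows
+         * saturates at 0, one that overflows saturates at the largest
+         * possible index */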
+        if (start > point)
+                start = 0;
+        if (end < point)
+                end = ~0;
+
+        return start <= index && index <= end;
+}
+
 static int ll_readahead(struct ll_readahead_state *ras,
                          struct obd_export *exp, struct address_space *mapping,
                          struct obd_io_group *oig, int flags)
@@ -670,16 +784,18 @@ static int ll_readahead(struct ll_readahead_state *ras,
         unsigned long i, start = 0, end = 0, reserved;
         struct ll_async_page *llap;
         struct page *page;
-        int rc, ret = 0;
+        int rc, ret = 0, match_failed = 0;
         __u64 kms;
         ENTRY;
 
         kms = lov_merge_size(ll_i2info(mapping->host)->lli_smd, 1);
-        if (kms == 0)
+        if (kms == 0) {
+                ll_ra_stats_inc(mapping, RA_STAT_ZERO_LEN);
                 RETURN(0);
-
+        }
         spin_lock(&ras->ras_lock);
 
+        /* reserve the part of the read-ahead window we'll issue I/O for */
         if (ras->ras_window_len) {
                 start = ras->ras_next_readahead;
                 end = ras->ras_window_start + ras->ras_window_len - 1;
@@ -691,12 +807,16 @@ static int ll_readahead(struct ll_readahead_state *ras,
 
         spin_unlock(&ras->ras_lock);
 
-        if (end == 0)
+        if (end == 0) {
+                ll_ra_stats_inc(mapping, RA_STAT_ZERO_WINDOW);
                 RETURN(0);
+        }
 
         reserved = ll_ra_count_get(ll_i2sbi(mapping->host), end - start + 1);
+        if (reserved < end - start + 1)
+                ll_ra_stats_inc(mapping, RA_STAT_MAX_IN_FLIGHT);
 
-        for (i = start; reserved > 0 && i <= end; i++) {
+        for (i = start; reserved > 0 && !match_failed && i <= end; i++) {
                 /* skip locked pages from previous readpage calls */
                 page = grab_cache_page_nowait(mapping, i);
                 if (page == NULL) {
@@ -718,7 +838,8 @@ static int ll_readahead(struct ll_readahead_state *ras,
                 if ((rc = ll_page_matches(page, flags)) <= 0) {
                         LL_CDEBUG_PAGE(D_READA | D_PAGE, page,
                                        "lock match failed: rc %d\n", rc);
-                        i = end;
+                        ll_ra_stats_inc(mapping, RA_STAT_FAILED_MATCH);
+                        match_failed = 1;
                         goto next_page;
                 }
 
@@ -742,20 +863,36 @@ static int ll_readahead(struct ll_readahead_state *ras,
         LASSERTF(reserved >= 0, "reserved %lu\n", reserved);
         if (reserved != 0)
                 ll_ra_count_put(ll_i2sbi(mapping->host), reserved);
+
+        if (i == end + 1 && end == (kms >> PAGE_CACHE_SHIFT))
+                ll_ra_stats_inc(mapping, RA_STAT_EOF);
+
+        /* if we didn't reach the end of the region we reserved from the
+         * ras, go back and update the ras so that the next read-ahead
+         * resumes where we left off.  we only do so if the region we
+         * failed to issue read-ahead for is still ahead of the app and
+         * behind the next index read-ahead would otherwise start from */
+        if (i != end + 1) {
+                spin_lock(&ras->ras_lock);
+                if (i < ras->ras_next_readahead &&
+                    index_in_window(i, ras->ras_window_start, 0,
+                                    ras->ras_window_len)) {
+                        ras->ras_next_readahead = i;
+                        RAS_CDEBUG(ras);
+                }
+                spin_unlock(&ras->ras_lock);
+        }
+
         RETURN(ret);
 }
 
-static void ras_set_start(struct ll_readahead_state *ras,
-                               unsigned long index)
+static void ras_set_start(struct ll_readahead_state *ras, unsigned long index)
 {
         ras->ras_window_start = index & (~(PTLRPC_MAX_BRW_PAGES - 1));
-        ras->ras_next_readahead = max(ras->ras_window_start,
-                                      ras->ras_next_readahead);
 }
 
 /* called with the ras_lock held or from places where it doesn't matter */
-static void ras_reset(struct ll_readahead_state *ras,
-                      unsigned long index)
+static void ras_reset(struct ll_readahead_state *ras, unsigned long index)
 {
         ras->ras_last_readpage = index;
         ras->ras_consecutive = 1;
@@ -772,14 +909,35 @@ void ll_readahead_init(struct inode *inode, struct ll_readahead_state *ras)
         ras_reset(ras, 0);
 }
 
-static void ras_update(struct ll_readahead_state *ras,
-                       unsigned long index, unsigned long max)
+static void ras_update(struct ll_sb_info *sbi, struct ll_readahead_state *ras,
+                       unsigned long index, unsigned hit)
 {
+        struct ll_ra_info *ra = &sbi->ll_ra_info;
+        int zero = 0;
         ENTRY;
 
+        spin_lock(&sbi->ll_lock);
         spin_lock(&ras->ras_lock);
+
+        ll_ra_stats_inc_unlocked(ra, hit ? RA_STAT_HIT : RA_STAT_MISS);
+
+        /* reset the read-ahead window in two cases.  First, when the
+         * app seeks or reads to some other part of the file.  Second,
+         * when we get a read-ahead miss on a page we believe we have
+         * previously issued read-ahead for; this can be a symptom of
+         * there being so many read-ahead pages that the VM reclaims
+         * them before we get to them. */
+        if (!index_in_window(index, ras->ras_last_readpage, 8, 8)) {
+                zero = 1;
+                ll_ra_stats_inc_unlocked(ra, RA_STAT_DISTANT_READPAGE);
+        } else if (!hit && ras->ras_window_len &&
+                   index < ras->ras_next_readahead &&
+                   index_in_window(index, ras->ras_window_start, 0,
+                                   ras->ras_window_len)) {
+                zero = 1;
+                ll_ra_stats_inc_unlocked(ra, RA_STAT_MISS_IN_WINDOW);
+        }
 
-        if (index != ras->ras_last_readpage + 1) {
+        if (zero) {
                 ras_reset(ras, index);
                 GOTO(out_unlock, 0);
         }
@@ -787,8 +945,12 @@ static void ras_update(struct ll_readahead_state *ras,
         ras->ras_last_readpage = index;
         ras->ras_consecutive++;
         ras_set_start(ras, index);
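+        /* the aligned window start may have moved ahead of the old
+         * next-readahead index; don't let read-ahead lag behind it */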
+        ras->ras_next_readahead = max(ras->ras_window_start,
+                                      ras->ras_next_readahead);
 
-        if (ras->ras_consecutive == 2) {
+        /* wait for a few pages to arrive before issuing read-ahead, to
+         * avoid the worst over-fetching */
+        if (ras->ras_consecutive == 3) {
                 ras->ras_window_len = PTLRPC_MAX_BRW_PAGES;
                 GOTO(out_unlock, 0);
         }
@@ -798,13 +960,16 @@ static void ras_update(struct ll_readahead_state *ras,
         if ((index & (PTLRPC_MAX_BRW_PAGES - 1)) == 
             (PTLRPC_MAX_BRW_PAGES >> 1)) {
                 ras->ras_window_len += PTLRPC_MAX_BRW_PAGES;
-                ras->ras_window_len = min(ras->ras_window_len, max);
+                ras->ras_window_len = min(ras->ras_window_len,
+                                          ra->ra_max_pages);
         }
 
         EXIT;
 out_unlock:
         RAS_CDEBUG(ras);
         spin_unlock(&ras->ras_lock);
+        spin_unlock(&sbi->ll_lock);
         return;
 }
 
@@ -847,10 +1012,11 @@ int ll_readpage(struct file *filp, struct page *page)
                 GOTO(out, rc = PTR_ERR(llap));
 
         if (ll_i2sbi(inode)->ll_flags & LL_SBI_READAHEAD)
-                ras_update(&fd->fd_ras, page->index, 
-                           ll_i2sbi(inode)->ll_max_read_ahead_pages);
-
+                ras_update(ll_i2sbi(inode), &fd->fd_ras, page->index,
+                           llap->llap_defer_uptodate);
+
         if (llap->llap_defer_uptodate) {
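+                /* the page was filled by read-ahead and is now being
+                 * consumed, so don't let ll_ra_accounting() count it
+                 * as discarded */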
+                llap->llap_ra_used = 1;
                 rc = ll_readahead(&fd->fd_ras, exp, page->mapping, oig,
                                   fd->fd_flags);
                 if (rc > 0)
@@ -869,17 +1035,10 @@ int ll_readpage(struct file *filp, struct page *page)
         }
 
         if (rc == 0) {
-                static unsigned long next_print;
-                CDEBUG(D_INODE, "ino %lu page %lu (%llu) didn't match a lock\n",
-                       inode->i_ino, page->index,
-                       (long long)page->index << PAGE_CACHE_SHIFT);
-                if (time_after(jiffies, next_print)) {
-                        CWARN("ino %lu page %lu (%llu) not covered by "
-                               "a lock (mmap?).  check debug logs.\n",
-                               inode->i_ino, page->index,
-                               (long long)page->index << PAGE_CACHE_SHIFT);
-                        next_print = jiffies + 30 * HZ;
-                }
+                CWARN("ino %lu page %lu (%llu) not covered by "
+                      "a lock (mmap?).  check debug logs.\n",
+                      inode->i_ino, page->index,
+                      (long long)page->index << PAGE_CACHE_SHIFT);
         }
 
         rc = ll_issue_page_read(exp, llap, oig, 0);