Whamcloud - gitweb
LU-11221 osd: allow concurrent bulks from pagecache
[fs/lustre-release.git] / lustre / osd-ldiskfs / osd_io.c
index ea8dde5..f140e2e 100644 (file)
@@ -630,6 +630,7 @@ static int osd_map_remote_to_local(loff_t offset, ssize_t len, int *nrpages,
                lnb->lnb_rc = 0;
                lnb->lnb_guard_rpc = 0;
                lnb->lnb_guard_disk = 0;
+               lnb->lnb_locked = 0;
 
                 LASSERTF(plen <= len, "plen %u, len %lld\n", plen,
                          (long long) len);
@@ -737,17 +738,18 @@ static int osd_bufs_put(const struct lu_env *env, struct dt_object *dt,
 
                if (page == NULL)
                        continue;
-               LASSERT(PageLocked(page));
 
                /* if the page isn't cached, then reset uptodate
                 * to prevent reuse */
                if (test_bit(PG_private_2, &page->flags)) {
                        clear_bit(PG_private_2, &page->flags);
                        ClearPageUptodate(page);
-                       unlock_page(page);
+                       if (lnb[i].lnb_locked)
+                               unlock_page(page);
                        oti->oti_dio_pages_used--;
                } else {
-                       unlock_page(page);
+                       if (lnb[i].lnb_locked)
+                               unlock_page(page);
                        if (pagevec_add(&pvec, page) == 0)
                                pagevec_release(&pvec);
                }
@@ -819,6 +821,7 @@ static int osd_bufs_get(const struct lu_env *env, struct dt_object *dt,
                if (lnb->lnb_page == NULL)
                        GOTO(cleanup, rc = -ENOMEM);
 
+               lnb->lnb_locked = 1;
                wait_on_page_writeback(lnb->lnb_page);
                BUG_ON(PageWriteback(lnb->lnb_page));
 
@@ -1316,16 +1319,23 @@ static int osd_read_prep(const struct lu_env *env, struct dt_object *dt,
                if (OBD_FAIL_CHECK(OBD_FAIL_OST_FAKE_RW))
                        SetPageUptodate(lnb[i].lnb_page);
 
+               if (cache == 0)
+                       generic_error_remove_page(inode->i_mapping,
+                                                 lnb[i].lnb_page);
+
                if (PageUptodate(lnb[i].lnb_page)) {
                        cache_hits++;
+                       unlock_page(lnb[i].lnb_page);
                } else {
                        cache_misses++;
                        osd_iobuf_add_page(iobuf, &lnb[i]);
                }
-
-               if (cache == 0)
-                       generic_error_remove_page(inode->i_mapping,
-                                                 lnb[i].lnb_page);
+               /* no need to unlock in osd_bufs_put(), the sooner page is
+                * unlocked, the earlier another client can access it.
+                * notice real unlock_page() can be called few lines
+                * below after osd_do_bio(). lnb is a per-thread, so it's
+                * fine to have PG_locked and lnb_locked inconsistent here */
+               lnb[i].lnb_locked = 0;
        }
        end = ktime_get();
        timediff = ktime_us_delta(end, start);
@@ -1341,16 +1351,22 @@ static int osd_read_prep(const struct lu_env *env, struct dt_object *dt,
                lprocfs_counter_add(osd->od_stats, LPROC_OSD_CACHE_ACCESS,
                                    cache_hits + cache_misses);
 
-        if (iobuf->dr_npages) {
+       if (iobuf->dr_npages) {
                rc = osd_ldiskfs_map_inode_pages(inode, iobuf->dr_pages,
                                                 iobuf->dr_npages,
                                                 iobuf->dr_blocks, 0);
-                rc = osd_do_bio(osd, inode, iobuf);
+               rc = osd_do_bio(osd, inode, iobuf);
 
-                /* IO stats will be done in osd_bufs_put() */
-        }
+               /* IO stats will be done in osd_bufs_put() */
 
-        RETURN(rc);
+               /* early release to let others read data during the bulk */
+               for (i = 0; i < iobuf->dr_npages; i++) {
+                       LASSERT(PageLocked(iobuf->dr_pages[i]));
+                       unlock_page(iobuf->dr_pages[i]);
+               }
+       }
+
+       RETURN(rc);
 }
 
 /*