Whamcloud - gitweb
Land b_smallfix onto HEAD (20040223_1817)
[fs/lustre-release.git] / lustre / llite / rw.c
index bf0594c..c9ee1db 100644 (file)
@@ -55,7 +55,7 @@
 #endif
 
 /* SYNCHRONOUS I/O to object storage for an inode */
-static int ll_brw(int cmd, struct inode *inode, struct obdo *oa, 
+static int ll_brw(int cmd, struct inode *inode, struct obdo *oa,
                   struct page *page, int flags)
 {
         struct ll_inode_info *lli = ll_i2info(inode);
@@ -243,13 +243,8 @@ static int ll_ap_make_ready(void *data, int cmd)
 
         page = llap->llap_page;
 
-        if (cmd == OBD_BRW_READ) {
-                /* _sync_page beat us to it and is about to call
-                 * _set_async_flags which will fire off rpcs again */
-               if (!test_and_clear_bit(LL_PRIVBITS_READ, &page->private))
-                        RETURN(-EAGAIN);
+        if (cmd == OBD_BRW_READ)
                 RETURN(0);
-        }
 
         /* we're trying to write, but the page is locked.. come back later */
         if (TryLockPage(page))
@@ -327,9 +322,6 @@ static struct obd_async_page_ops ll_async_page_ops = {
         .ap_completion =        ll_ap_completion,
 };
 
-#define page_llap(page) \
-        ((struct ll_async_page *)((page)->private & ~LL_PRIVBITS_MASK))
-
 /* XXX have the exp be an argument? */
 struct ll_async_page *llap_from_page(struct page *page)
 {
@@ -340,12 +332,12 @@ struct ll_async_page *llap_from_page(struct page *page)
         int rc;
         ENTRY;
 
-        llap = page_llap(page);
+        llap = (struct ll_async_page *)page->private;
         if (llap != NULL) {
                 if (llap->llap_magic != LLAP_MAGIC)
                         RETURN(ERR_PTR(-EINVAL));
                 RETURN(llap);
-        } 
+        }
 
         exp = ll_i2obdexp(page->mapping->host);
         if (exp == NULL)
@@ -355,8 +347,7 @@ struct ll_async_page *llap_from_page(struct page *page)
         if (llap == NULL)
                 RETURN(ERR_PTR(-ENOMEM));
         llap->llap_magic = LLAP_MAGIC;
-        rc = obd_prep_async_page(exp, ll_i2info(inode)->lli_smd,
-                                 NULL, page, 
+        rc = obd_prep_async_page(exp, ll_i2info(inode)->lli_smd, NULL, page,
                                  (obd_off)page->index << PAGE_SHIFT,
                                  &ll_async_page_ops, llap, &llap->llap_cookie);
         if (rc) {
@@ -364,9 +355,9 @@ struct ll_async_page *llap_from_page(struct page *page)
                 RETURN(ERR_PTR(rc));
         }
 
-        CDEBUG(D_CACHE, "llap %p page %p cookie %p obj off "LPU64"\n", llap, 
+        CDEBUG(D_CACHE, "llap %p page %p cookie %p obj off "LPU64"\n", llap,
                page, llap->llap_cookie, (obd_off)page->index << PAGE_SHIFT);
-        /* also zeroing the PRIVBITS low order bitflags */ 
+        /* also zeroing the PRIVBITS low order bitflags */
         page->private = (unsigned long)llap;
         llap->llap_page = page;
 
@@ -423,23 +414,29 @@ int ll_commit_write(struct file *file, struct page *page, unsigned from,
                 rc = obd_queue_async_io(exp, lsm, NULL, llap->llap_cookie,
                                         OBD_BRW_WRITE, 0, 0, 0, 0);
                 if (rc != 0) { /* async failed, try sync.. */
-                        struct obd_sync_io_container *osic;
-                        osic_init(&osic);
+                        struct obd_io_group *oig;
+                        rc = oig_init(&oig);
+                        if (rc)
+                                GOTO(out, rc);
 
                         llap->llap_write_queued = 0;
-                        rc = obd_queue_sync_io(exp, lsm, NULL, osic,
-                                               llap->llap_cookie,
-                                               OBD_BRW_WRITE, 0, to, 0);
+                        rc = obd_queue_group_io(exp, lsm, NULL, oig,
+                                                llap->llap_cookie,
+                                                OBD_BRW_WRITE, 0, to, 0,
+                                                ASYNC_READY | ASYNC_URGENT |
+                                                ASYNC_COUNT_STABLE |
+                                                ASYNC_GROUP_SYNC);
+
                         if (rc)
-                                GOTO(free_osic, rc);
+                                GOTO(free_oig, rc);
 
-                        rc = obd_trigger_sync_io(exp, lsm, NULL, osic);
+                        rc = obd_trigger_group_io(exp, lsm, NULL, oig);
                         if (rc)
-                                GOTO(free_osic, rc);
+                                GOTO(free_oig, rc);
 
-                        rc = osic_wait(osic);
-free_osic:
-                        osic_release(osic);
+                        rc = oig_wait(oig);
+free_oig:
+                        oig_release(oig);
                         GOTO(out, rc);
                 }
                 LL_CDEBUG_PAGE(page, "write queued\n");
@@ -449,7 +446,7 @@ free_osic:
                                      LPROC_LL_DIRTY_HITS);
         }
 
-        /* put the page in the page cache, from now on ll_removepage is 
+        /* put the page in the page cache, from now on ll_removepage is
          * responsible for cleaning up the llap */
         set_page_dirty(page);
 
@@ -490,22 +487,21 @@ void ll_removepage(struct page *page)
 
         exp = ll_i2obdexp(inode);
         if (exp == NULL) {
-                CERROR("page %p ind %lu gave null export\n", page, 
-                       page->index);
+                CERROR("page %p ind %lu gave null export\n", page, page->index);
                 EXIT;
                 return;
         }
 
         llap = llap_from_page(page);
         if (IS_ERR(llap)) {
-                CERROR("page %p ind %lu couldn't find llap: %ld\n", page, 
+                CERROR("page %p ind %lu couldn't find llap: %ld\n", page,
                        page->index, PTR_ERR(llap));
                 EXIT;
                 return;
         }
 
         //llap_write_complete(inode, llap);
-        rc = obd_teardown_async_page(exp, ll_i2info(inode)->lli_smd, NULL, 
+        rc = obd_teardown_async_page(exp, ll_i2info(inode)->lli_smd, NULL,
                                      llap->llap_cookie);
         if (rc != 0)
                 CERROR("page %p ind %lu failed: %d\n", page, page->index, rc);
@@ -535,46 +531,45 @@ static int ll_page_matches(struct page *page)
         page_extent.l_extent.end =
                 page_extent.l_extent.start + PAGE_CACHE_SIZE - 1;
         flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
-        matches = obd_match(ll_i2sbi(inode)->ll_osc_exp, 
-                            ll_i2info(inode)->lli_smd, LDLM_EXTENT, 
+        matches = obd_match(ll_i2sbi(inode)->ll_osc_exp,
+                            ll_i2info(inode)->lli_smd, LDLM_EXTENT,
                             &page_extent, LCK_PR, &flags, inode, &match_lockh);
         if (matches < 0) {
                 LL_CDEBUG_PAGE(page, "lock match failed\n");
                 RETURN(matches);
-        } 
+        }
         if (matches) {
-                obd_cancel(ll_i2sbi(inode)->ll_osc_exp, 
+                obd_cancel(ll_i2sbi(inode)->ll_osc_exp,
                            ll_i2info(inode)->lli_smd, LCK_PR, &match_lockh);
         }
         RETURN(matches);
 }
 
 static int ll_issue_page_read(struct obd_export *exp,
-                              struct ll_async_page *llap, int defer_uptodate)
+                              struct ll_async_page *llap,
+                              struct obd_io_group *oig, int defer)
 {
         struct page *page = llap->llap_page;
         int rc;
 
-        /* we don't issue this page as URGENT so that it can be batched
-         * with other pages by the kernel's read-ahead.  We have a strong
-         * requirement that readpage() callers must call wait_on_page()
-         * or lock_page() to get into ->sync_page() to trigger the IO */
-        llap->llap_defer_uptodate = defer_uptodate;
         page_cache_get(page);
-        set_bit(LL_PRIVBITS_READ, &page->private); /* see ll_sync_page() */
-        rc = obd_queue_async_io(exp, ll_i2info(page->mapping->host)->lli_smd,
-                                NULL, llap->llap_cookie, OBD_BRW_READ, 0,
+        llap->llap_defer_uptodate = defer;
+        rc = obd_queue_group_io(exp, ll_i2info(page->mapping->host)->lli_smd,
+                                NULL, oig, llap->llap_cookie, OBD_BRW_READ, 0,
                                 PAGE_SIZE, 0, ASYNC_COUNT_STABLE);
         if (rc) {
                 LL_CDEBUG_PAGE(page, "read queueing failed\n");
-                clear_bit(LL_PRIVBITS_READ, &page->private);
                 page_cache_release(page);
         }
         RETURN(rc);
 }
 
-static void ll_readahead(struct ll_readahead_state *ras, 
-                         struct obd_export *exp, struct address_space *mapping)
+#define LL_RA_MIN(inode) ((unsigned long)PTL_MD_MAX_PAGES / 2)
+#define LL_RA_MAX(inode) (inode->i_blksize * 3)
+
+static void ll_readahead(struct ll_readahead_state *ras,
+                         struct obd_export *exp, struct address_space *mapping,
+                         struct obd_io_group *oig)
 {
         unsigned long i, start, end;
         struct ll_async_page *llap;
@@ -602,7 +597,7 @@ static void ll_readahead(struct ll_readahead_state *ras,
 
         CDEBUG(D_READA, "ni %lu last %lu win %lu: reading from %lu to %lu\n",
                ras->ras_next_index, ras->ras_last, ras->ras_window,
-               start, end); 
+               start, end);
 
         spin_unlock(&ras->ras_lock);
 
@@ -615,8 +610,8 @@ static void ll_readahead(struct ll_readahead_state *ras,
                  * truncating the page (page->mapping == NULL) */
                 page = grab_cache_page_nowait(mapping, i);
                 if (page == NULL)
-                       continue;
-  
+                       break;
+
                 /* the book-keeping above promises that we've tried
                  * all the indices from start to end, so we don't
                  * stop if anyone returns an error. This may not be good. */
@@ -627,7 +622,7 @@ static void ll_readahead(struct ll_readahead_state *ras,
                 if (IS_ERR(llap) || llap->llap_defer_uptodate)
                         goto next_page;
 
-                rc = ll_issue_page_read(exp, llap, 1);
+                rc = ll_issue_page_read(exp, llap, oig, 1);
                 if (rc == 0)
                         LL_CDEBUG_PAGE(page, "started read-ahead\n");
                 if (rc) {
@@ -640,18 +635,15 @@ static void ll_readahead(struct ll_readahead_state *ras,
         }
 }
 
-/* XXX this should really bubble up somehow.  */
-#define LL_RA_MIN ((unsigned long)PTL_MD_MAX_PAGES / 2)
-#define LL_RA_MAX ((unsigned long)(32 * PTL_MD_MAX_PAGES))
-
 /* called with the ras_lock held or from places where it doesn't matter */
-static void ll_readahead_set(struct ll_readahead_state *ras, 
+static void ll_readahead_set(struct inode *inode,
+                             struct ll_readahead_state *ras,
                              unsigned long index)
 {
         ras->ras_next_index = index;
         if (ras->ras_next_index != ~0UL)
                 ras->ras_next_index++;
-        ras->ras_window = LL_RA_MIN;
+        ras->ras_window = LL_RA_MIN(inode);
         ras->ras_last = ras->ras_next_index + ras->ras_window;
         if (ras->ras_last < ras->ras_next_index)
                 ras->ras_last = ~0UL;
@@ -660,63 +652,64 @@ static void ll_readahead_set(struct ll_readahead_state *ras,
                index);
 }
 
-void ll_readahead_init(struct ll_readahead_state *ras)
+void ll_readahead_init(struct inode *inode, struct ll_readahead_state *ras)
 {
         spin_lock_init(&ras->ras_lock);
-        ll_readahead_set(ras, 0);
+        ll_readahead_set(inode, ras, 0);
 }
 
-static void ll_readahead_update(struct ll_readahead_state *ras, 
+static void ll_readahead_update(struct inode *inode,
+                                struct ll_readahead_state *ras,
                                 unsigned long index, int hit)
 {
         unsigned long issued_start, new_last;
 
         spin_lock(&ras->ras_lock);
 
-        /* we're interested in noticing the index's relation to the 
+        /* we're interested in noticing the index's relation to the
          * previously issued read-ahead pages */
         issued_start = ras->ras_next_index - ras->ras_window - 1;
         if (issued_start > ras->ras_next_index)
                 issued_start = 0;
 
-        CDEBUG(D_READA, "ni %lu last %lu win %lu: %s ind %lu start %lu\n", 
+        CDEBUG(D_READA, "ni %lu last %lu win %lu: %s ind %lu start %lu\n",
                ras->ras_next_index, ras->ras_last, ras->ras_window,
                hit ? "hit" : "miss", index, issued_start);
-        if (!hit && 
+        if (!hit &&
             index == ras->ras_next_index && index == ras->ras_last + 1) {
                 /* special case the kernel's read-ahead running into the
                  * page just beyond our read-ahead window as an extension
                  * of our read-ahead.  sigh.  wishing it was easier to
                  * turn off 2.4's read-ahead. */
-                ras->ras_window = min(LL_RA_MAX, ras->ras_window + 1);
+                ras->ras_window = min(LL_RA_MAX(inode), ras->ras_window + 1);
                 if (index != ~0UL)
                         ras->ras_next_index = index + 1;
                 ras->ras_last = index;
-        } else if (!hit && 
+        } else if (!hit &&
                    (index > issued_start || ras->ras_next_index >= index)) {
                 /* deal with a miss way out of the window.  we interpret
                  * this as a seek and restart the window */
-                ll_readahead_set(ras, index);
+                ll_readahead_set(inode, ras, index);
 
-        } else if (!hit && 
+        } else if (!hit &&
                    issued_start <= index && index < ras->ras_next_index) {
                 /* a miss inside the window?  surely its memory pressure
                  * evicting our read pages before the app can see them.
                  * we shrink the window aggressively */
                 unsigned long old_window = ras->ras_window;
 
-                ras->ras_window = max(ras->ras_window / 2, LL_RA_MIN);
+                ras->ras_window = max(ras->ras_window / 2, LL_RA_MIN(inode));
                 ras->ras_last -= old_window - ras->ras_window;
                 if (ras->ras_next_index > ras->ras_last)
                         ras->ras_next_index = ras->ras_last + 1;
                 CDEBUG(D_READA, "ni %lu last %lu win %lu: miss inside\n",
                        ras->ras_next_index, ras->ras_last, ras->ras_window);
 
-        } else if (hit && 
+        } else if (hit &&
                    issued_start <= index && index < ras->ras_next_index) {
-                /* a hit inside the window.  grow the window by twice the 
+                /* a hit inside the window.  grow the window by twice the
                  * number of pages that are satisified within the window.  */
-                ras->ras_window = min(LL_RA_MAX, ras->ras_window + 2);
+                ras->ras_window = min(LL_RA_MAX(inode), ras->ras_window + 2);
 
                 /* we want the next readahead pass to issue a windows worth
                  * beyond where the app currently is */
@@ -745,8 +738,9 @@ int ll_readpage(struct file *filp, struct page *page)
         struct ll_file_data *fd = filp->private_data;
         struct inode *inode = page->mapping->host;
         struct obd_export *exp;
-        int rc;
         struct ll_async_page *llap;
+        struct obd_io_group *oig = NULL;
+        int rc;
         ENTRY;
 
         LASSERT(PageLocked(page));
@@ -756,6 +750,10 @@ int ll_readpage(struct file *filp, struct page *page)
                (((obd_off)page->index) << PAGE_SHIFT));
         LASSERT(atomic_read(&filp->f_dentry->d_inode->i_count) > 0);
 
+        rc = oig_init(&oig);
+        if (rc < 0)
+                GOTO(out, rc);
+
         exp = ll_i2obdexp(inode);
         if (exp == NULL)
                 GOTO(out, rc = -EINVAL);
@@ -765,15 +763,17 @@ int ll_readpage(struct file *filp, struct page *page)
                 GOTO(out, rc = PTR_ERR(llap));
 
         if (llap->llap_defer_uptodate) {
-                ll_readahead_update(&fd->fd_ras, page->index, 1);
+                ll_readahead_update(inode, &fd->fd_ras, page->index, 1);
+                ll_readahead(&fd->fd_ras, exp, page->mapping, oig);
+                obd_trigger_group_io(exp, ll_i2info(inode)->lli_smd, NULL,
+                                     oig);
                 LL_CDEBUG_PAGE(page, "marking uptodate from defer\n");
                 SetPageUptodate(page);
-                ll_readahead(&fd->fd_ras, exp, page->mapping);
                 unlock_page(page);
-                RETURN(0);
+                GOTO(out_oig, rc = 0);
         }
 
-        ll_readahead_update(&fd->fd_ras, page->index, 0);
+        ll_readahead_update(inode, &fd->fd_ras, page->index, 0);
 
         rc = ll_page_matches(page);
         if (rc < 0)
@@ -789,18 +789,26 @@ int ll_readpage(struct file *filp, struct page *page)
                 }
         }
 
-        rc = ll_issue_page_read(exp, llap, 0);
-        if (rc == 0) {
-                LL_CDEBUG_PAGE(page, "queued readpage\n");
-                if ((ll_i2sbi(inode)->ll_flags & LL_SBI_READAHEAD))
-                        ll_readahead(&fd->fd_ras, exp, page->mapping);
-        }
+        rc = ll_issue_page_read(exp, llap, oig, 0);
+        if (rc)
+                GOTO(out, rc);
+
+        LL_CDEBUG_PAGE(page, "queued readpage\n");
+        if ((ll_i2sbi(inode)->ll_flags & LL_SBI_READAHEAD))
+                ll_readahead(&fd->fd_ras, exp, page->mapping, oig);
+
+        rc = obd_trigger_group_io(exp, ll_i2info(inode)->lli_smd, NULL, oig);
+
 out:
-        if (rc) 
+        if (rc)
                 unlock_page(page);
+out_oig:
+        if (oig != NULL)
+                oig_release(oig);
         RETURN(rc);
 }
 
+#if 0
 /* this is for read pages.  we issue them as ready but not urgent.  when
  * someone waits on them we fire them off, hopefully merged with adjacent
  * reads that were queued by read-ahead.  */
@@ -821,15 +829,16 @@ int ll_sync_page(struct page *page)
         exp = ll_i2obdexp(page->mapping->host);
         if (exp == NULL)
                 RETURN(-EINVAL);
-  
+
         llap = llap_from_page(page);
         if (IS_ERR(llap))
                 RETURN(PTR_ERR(llap));
 
         LL_CDEBUG_PAGE(page, "setting ready|urgent\n");
 
-        rc = obd_set_async_flags(exp, ll_i2info(page->mapping->host)->lli_smd, 
-                                 NULL, llap->llap_cookie, 
+        rc = obd_set_async_flags(exp, ll_i2info(page->mapping->host)->lli_smd,
+                                 NULL, llap->llap_cookie,
                                  ASYNC_READY|ASYNC_URGENT);
         return rc;
 }
+#endif