Whamcloud - gitweb
b=12182
author alex <alex>
Mon, 13 Oct 2008 11:35:15 +0000 (11:35 +0000)
committer alex <alex>
Mon, 13 Oct 2008 11:35:15 +0000 (11:35 +0000)
i=adilger
i=green
i=nikita

 - oss read-only cache feature

14 files changed:
lustre/autoconf/lustre-core.m4
lustre/include/linux/lustre_compat25.h
lustre/include/obd.h
lustre/include/obd_class.h
lustre/include/obd_support.h
lustre/obdecho/echo.c
lustre/obdecho/echo_client.c
lustre/obdfilter/filter.c
lustre/obdfilter/filter_internal.h
lustre/obdfilter/filter_io.c
lustre/obdfilter/filter_io_26.c
lustre/obdfilter/lproc_obdfilter.c
lustre/ost/ost_handler.c
lustre/tests/sanity.sh

index 8ddecff..09567c3 100644 (file)
@@ -1249,6 +1249,19 @@ AC_DEFINE(HAVE___D_MOVE, 1,
 ])
 ])
 
+
+#
+# LC_EXPORT_INVALIDATE_MAPPING_PAGES
+#
+# Find an exported kernel routine for dropping page-cache pages.
+# invalidate_mapping_pages is tried first (HAVE_INVALIDATE_MAPPING_PAGES);
+# otherwise invalidate_inode_pages is tried (HAVE_INVALIDATE_INODE_PAGES).
+# If neither check succeeds, configure stops with an error.
+# NOTE(review): assumes LB_CHECK_SYMBOL_EXPORT takes (symbol, file,
+# action-if-exported, action-if-not) -- confirm against its definition.
+#
+AC_DEFUN([LC_EXPORT_INVALIDATE_MAPPING_PAGES],
+    [LB_CHECK_SYMBOL_EXPORT([invalidate_mapping_pages], [mm/truncate.c], [
+         AC_DEFINE(HAVE_INVALIDATE_MAPPING_PAGES, 1,
+                        [exported invalidate_mapping_pages])],
+    [LB_CHECK_SYMBOL_EXPORT([invalidate_inode_pages], [mm/truncate.c], [
+         AC_DEFINE(HAVE_INVALIDATE_INODE_PAGES, 1,
+                        [exported invalidate_inode_pages])], [
+       AC_MSG_ERROR([no way to invalidate pages])
+  ])
+    ],[])
+])
+
 # The actual symbol exported varies among architectures, so we need
 # to check many symbols (but only in the current architecture.)  No
 # matter what symbol is exported, the kernel #defines node_to_cpumask
@@ -1572,6 +1585,9 @@ AC_DEFUN([LC_PROG_LINUX],
          LC_VFS_KERN_MOUNT
          LC_INVALIDATEPAGE_RETURN_INT
          LC_UMOUNTBEGIN_HAS_VFSMOUNT
+         if test x$enable_server = xyes ; then
+                LC_EXPORT_INVALIDATE_MAPPING_PAGES
+         fi
 
          #2.6.18 + RHEL5 (fc6)
          LC_PG_FS_MISC
index b6c1496..9a57ebd 100644 (file)
@@ -627,5 +627,9 @@ static inline long labs(long x)
 #define __fls fls
 #endif
 
+/*
+ * When configure found invalidate_inode_pages() rather than
+ * invalidate_mapping_pages(), emulate the latter with the former.
+ * The start/end arguments are not passed on, so the requested range is
+ * ignored by this fallback.
+ */
+#ifdef HAVE_INVALIDATE_INODE_PAGES
+#define invalidate_mapping_pages(mapping,s,e) invalidate_inode_pages(mapping)
+#endif
+
 #endif /* __KERNEL__ */
 #endif /* _COMPAT25_H */
index 72b61d7..8b2507e 100644 (file)
@@ -356,6 +356,8 @@ struct filter_obd {
         obd_size             fo_tot_pending;
 
         obd_size             fo_readcache_max_filesize;
+        int                  fo_read_cache;
+        int                  fo_writethrough_cache;
 
         struct obd_import   *fo_mdc_imp;
         struct obd_uuid      fo_mdc_uuid;
@@ -1329,12 +1331,14 @@ struct obd_ops {
                          obd_id *startid, obd_gr group, void *data);
         int (*o_preprw)(int cmd, struct obd_export *exp, struct obdo *oa,
                         int objcount, struct obd_ioobj *obj,
-                        int niocount, struct niobuf_remote *remote,
-                        struct niobuf_local *local, struct obd_trans_info *oti,
+                        struct niobuf_remote *remote, int *nr_pages,
+                        struct niobuf_local *local,
+                        struct obd_trans_info *oti,
                         struct lustre_capa *capa);
         int (*o_commitrw)(int cmd, struct obd_export *exp, struct obdo *oa,
                           int objcount, struct obd_ioobj *obj,
-                          int niocount, struct niobuf_local *local,
+                          struct niobuf_remote *remote, int pages,
+                          struct niobuf_local *local,
                           struct obd_trans_info *oti, int rc);
         int (*o_enqueue)(struct obd_export *, struct obd_info *oinfo,
                          struct ldlm_enqueue_info *einfo,
index fa44819..45e544e 100644 (file)
@@ -1443,7 +1443,7 @@ static inline int obd_teardown_async_page(struct obd_export *exp,
 
 static inline int obd_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
                              int objcount, struct obd_ioobj *obj,
-                             int niocount, struct niobuf_remote *remote,
+                             struct niobuf_remote *remote, int *pages,
                              struct niobuf_local *local,
                              struct obd_trans_info *oti,
                              struct lustre_capa *capa)
@@ -1454,14 +1454,15 @@ static inline int obd_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
         EXP_CHECK_DT_OP(exp, preprw);
         EXP_COUNTER_INCREMENT(exp, preprw);
 
-        rc = OBP(exp->exp_obd, preprw)(cmd, exp, oa, objcount, obj, niocount,
-                                       remote, local, oti, capa);
+        rc = OBP(exp->exp_obd, preprw)(cmd, exp, oa, objcount, obj, remote,
+                                       pages, local, oti, capa);
         RETURN(rc);
 }
 
 static inline int obd_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
                                int objcount, struct obd_ioobj *obj,
-                               int niocount, struct niobuf_local *local,
+                               struct niobuf_remote *rnb, int pages,
+                               struct niobuf_local *local,
                                struct obd_trans_info *oti, int rc)
 {
         ENTRY;
@@ -1469,8 +1470,8 @@ static inline int obd_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
         EXP_CHECK_DT_OP(exp, commitrw);
         EXP_COUNTER_INCREMENT(exp, commitrw);
 
-        rc = OBP(exp->exp_obd, commitrw)(cmd, exp, oa, objcount, obj, niocount,
-                                         local, oti, rc);
+        rc = OBP(exp->exp_obd, commitrw)(cmd, exp, oa, objcount, obj,
+                                         rnb, pages, local, oti, rc);
         RETURN(rc);
 }
 
index 4a9a82f..7b5d777 100644 (file)
@@ -229,6 +229,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_OST_PAUSE_CREATE        0x223
 #define OBD_FAIL_OST_BRW_PAUSE_PACK      0x224
 #define OBD_FAIL_OST_CONNECT_NET2        0x225
+#define OBD_FAIL_OST_NOMEM               0x226
 
 #define OBD_FAIL_LDLM                    0x300
 #define OBD_FAIL_LDLM_NAMESPACE_NEW      0x301
index f90afb1..5c77003 100644 (file)
@@ -289,16 +289,79 @@ echo_page_debug_check(cfs_page_t *page, obd_id id,
 /* This allows us to verify that desc_private is passed unmolested */
 #define DESC_PRIV 0x10293847
 
+/*
+ * Split one remote niobuf into page-sized pieces and set up a local niobuf
+ * with a page for each piece.
+ *
+ * oa    - obdo of the request; its flags decide whether the debug pattern
+ *         is set up in the pages
+ * obj   - ioobj the remote buffer belongs to
+ * nb    - remote buffer to map
+ * pages - incremented once for every local buffer filled in
+ * lb    - first local buffer to fill
+ * cmd   - OBD_BRW_* command; for reads the local rc is preset to the length
+ * left  - number of local buffers still free, decremented as they are used
+ *
+ * Returns 0 on success, -EINVAL when the local buffers run out and -ENOMEM
+ * when a page cannot be allocated.  On failure the pages attached so far
+ * stay in lb and remain counted in *pages; nothing is released here.
+ */
+static int echo_map_nb_to_lb(struct obdo *oa, struct obd_ioobj *obj,
+                             struct niobuf_remote *nb, int *pages,
+                             struct niobuf_local *lb, int cmd, int *left)
+{
+        int gfp_mask = (obj->ioo_id & 1) ? CFS_ALLOC_HIGHUSER : CFS_ALLOC_STD;
+        int ispersistent = obj->ioo_id == ECHO_PERSISTENT_OBJID;
+        int debug_setup = (!ispersistent &&
+                           (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
+                           (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
+        struct niobuf_local *res = lb;
+        obd_off offset = nb->offset;
+        /* unsigned, as filter_map_remote_to_local() does: a huge length must
+         * not turn negative and make the loop below a silent no-op */
+        unsigned int len = nb->len;
+
+        while (len > 0) {
+                /* bytes from offset up to the end of its page */
+                unsigned int plen = CFS_PAGE_SIZE -
+                                    (offset & (CFS_PAGE_SIZE - 1));
+                if (len < plen)
+                        plen = len;
+
+                /* check for local buf overflow */
+                if (*left == 0)
+                        return -EINVAL;
+
+                res->offset = offset;
+                res->len = plen;
+                LASSERT((res->offset & ~CFS_PAGE_MASK) + res->len <= CFS_PAGE_SIZE);
+
+                if (ispersistent &&
+                    (res->offset >> CFS_PAGE_SHIFT) < ECHO_PERSISTENT_PAGES) {
+                        res->page = echo_persistent_pages[res->offset >>
+                                CFS_PAGE_SHIFT];
+                        /* Take extra ref so __free_pages() can be called OK */
+                        cfs_get_page (res->page);
+                } else {
+                        OBD_PAGE_ALLOC(res->page, gfp_mask);
+                        if (res->page == NULL) {
+                                CERROR("can't get page for id " LPU64"\n",
+                                       obj->ioo_id);
+                                return -ENOMEM;
+                        }
+                }
+
+                CDEBUG(D_PAGE, "$$$$ get page %p @ "LPU64" for %d\n",
+                       res->page, res->offset, res->len);
+
+                if (cmd & OBD_BRW_READ)
+                        res->rc = res->len;
+
+                if (debug_setup)
+                        echo_page_debug_setup(res->page, cmd, obj->ioo_id,
+                                              res->offset, res->len);
+
+                offset += plen;
+                len -= plen;
+                res++;
+
+                /* one more local buffer consumed and handed to the caller */
+                (*left)--;
+                (*pages)++;
+        }
+
+        return 0;
+}
+
 int echo_preprw(int cmd, struct obd_export *export, struct obdo *oa,
-                int objcount, struct obd_ioobj *obj, int niocount,
-                struct niobuf_remote *nb, struct niobuf_local *res,
+                int objcount, struct obd_ioobj *obj, struct niobuf_remote *nb,
+                int *pages, struct niobuf_local *res,
                 struct obd_trans_info *oti, struct lustre_capa *unused)
 {
         struct obd_device *obd;
         struct niobuf_local *r = res;
         int tot_bytes = 0;
         int rc = 0;
-        int i;
+        int i, left;
         ENTRY;
 
         obd = export->exp_obd;
@@ -308,59 +371,33 @@ int echo_preprw(int cmd, struct obd_export *export, struct obdo *oa,
         /* Temp fix to stop falling foul of osc_announce_cached() */
         oa->o_valid &= ~(OBD_MD_FLBLOCKS | OBD_MD_FLGRANT);
 
-        memset(res, 0, sizeof(*res) * niocount);
+        memset(res, 0, sizeof(*res) * *pages);
 
         CDEBUG(D_PAGE, "%s %d obdos with %d IOs\n",
-               cmd == OBD_BRW_READ ? "reading" : "writing", objcount, niocount);
+               cmd == OBD_BRW_READ ? "reading" : "writing", objcount, *pages);
 
         if (oti)
                 oti->oti_handle = (void *)DESC_PRIV;
 
+        left = *pages;
+        *pages = 0;
+
         for (i = 0; i < objcount; i++, obj++) {
-                int gfp_mask = (obj->ioo_id & 1) ? CFS_ALLOC_HIGHUSER : CFS_ALLOC_STD;
-                int ispersistent = obj->ioo_id == ECHO_PERSISTENT_OBJID;
-                int debug_setup = (!ispersistent &&
-                                   (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
-                                   (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
                 int j;
 
                 for (j = 0 ; j < obj->ioo_bufcnt ; j++, nb++, r++) {
 
-                        if (ispersistent &&
-                            (nb->offset >> CFS_PAGE_SHIFT) < ECHO_PERSISTENT_PAGES) {
-                                r->page = echo_persistent_pages[nb->offset >>
-                                                                CFS_PAGE_SHIFT];
-                                /* Take extra ref so __free_pages() can be called OK */
-                                cfs_get_page (r->page);
-                        } else {
-                                OBD_PAGE_ALLOC(r->page, gfp_mask);
-                                if (r->page == NULL) {
-                                        CERROR("can't get page %u/%u for id "
-                                               LPU64"\n",
-                                               j, obj->ioo_bufcnt, obj->ioo_id);
-                                        GOTO(preprw_cleanup, rc = -ENOMEM);
-                                }
-                        }
+                        rc = echo_map_nb_to_lb(oa, obj, nb, pages,
+                                               res + *pages, cmd, &left);
+                        if (rc)
+                                GOTO(preprw_cleanup, rc);
 
                         tot_bytes += nb->len;
-
-                        atomic_inc(&obd->u.echo.eo_prep);
-
-                        r->offset = nb->offset;
-                        r->len = nb->len;
-                        LASSERT((r->offset & ~CFS_PAGE_MASK) + r->len <= CFS_PAGE_SIZE);
-
-                        CDEBUG(D_PAGE, "$$$$ get page %p @ "LPU64" for %d\n",
-                               r->page, r->offset, r->len);
-
-                        if (cmd & OBD_BRW_READ)
-                                r->rc = r->len;
-
-                        if (debug_setup)
-                                echo_page_debug_setup(r->page, cmd, obj->ioo_id,
-                                                      r->offset, r->len);
                 }
         }
+
+        atomic_add(*pages, &obd->u.echo.eo_prep);
+
         if (cmd & OBD_BRW_READ)
                 lprocfs_counter_add(obd->obd_stats, LPROC_ECHO_READ_BYTES,
                                     tot_bytes);
@@ -379,21 +416,22 @@ preprw_cleanup:
          * all down again.  I believe that this is what the in-kernel
          * prep/commit operations do.
          */
-        CERROR("cleaning up %ld pages (%d obdos)\n", (long)(r - res), objcount);
-        while (r-- > res) {
-                cfs_kunmap(r->page);
+        CERROR("cleaning up %u pages (%d obdos)\n", *pages, objcount);
+        for (i = 0; i < *pages; i++) {
+                cfs_kunmap(res[i].page);
                 /* NB if this is a persistent page, __free_pages will just
                  * lose the extra ref gained above */
-                OBD_PAGE_FREE(r->page);
+                OBD_PAGE_FREE(res[i].page);
+                res[i].page = NULL;
                 atomic_dec(&obd->u.echo.eo_prep);
         }
-        memset(res, 0, sizeof(*res) * niocount);
 
         return rc;
 }
 
 int echo_commitrw(int cmd, struct obd_export *export, struct obdo *oa,
-                  int objcount, struct obd_ioobj *obj, int niocount,
+                  int objcount, struct obd_ioobj *obj,
+                  struct niobuf_remote *rb, int niocount,
                   struct niobuf_local *res, struct obd_trans_info *oti, int rc)
 {
         struct obd_device *obd;
index 72297c3..1be3ce0 100644 (file)
@@ -656,8 +656,8 @@ static void ec_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
 
 static int ec_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
 {
-        struct echo_async_state *eas;
         struct echo_async_page *eap = eap_from_cookie(data);
+        struct echo_async_state *eas;
 
         eas = eap->eap_eas;
 
@@ -878,6 +878,8 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw,
         off = offset;
 
         for(; tot_pages; tot_pages -= npages) {
+                int lpages;
+
                 if (tot_pages < npages)
                         npages = tot_pages;
 
@@ -889,12 +891,14 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw,
                 ioo.ioo_bufcnt = npages;
                 oti->oti_transno = 0;
 
-                ret = obd_preprw(rw, exp, oa, 1, &ioo, npages, rnb, lnb, oti,
+                lpages = npages;
+                ret = obd_preprw(rw, exp, oa, 1, &ioo, rnb, &lpages, lnb, oti,
                                  NULL);
                 if (ret != 0)
                         GOTO(out, ret);
+                LASSERT(lpages == npages);
 
-                for (i = 0; i < npages; i++) {
+                for (i = 0; i < lpages; i++) {
                         cfs_page_t *page = lnb[i].page;
 
                         /* read past eof? */
@@ -918,7 +922,7 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw,
                                                              rnb[i].len);
                 }
 
-                ret = obd_commitrw(rw, exp, oa, 1, &ioo, npages, lnb, oti, ret);
+                ret = obd_commitrw(rw, exp, oa, 1,&ioo,rnb,npages,lnb,oti,ret);
                 if (ret != 0)
                         GOTO(out, ret);
         }
index 55989ca..343b777 100644 (file)
@@ -2013,6 +2013,8 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg,
         CFS_INIT_LIST_HEAD(&filter->fo_export_list);
         sema_init(&filter->fo_alloc_lock, 1);
         init_brw_stats(&filter->fo_filter_stats);
+        filter->fo_read_cache = 1; /* enable read-only cache by default */
+        filter->fo_writethrough_cache = 0; /* disable writethrough cache */
         filter->fo_readcache_max_filesize = FILTER_MAX_CACHE_SIZE;
         filter->fo_fmd_max_num = FILTER_FMD_MAX_NUM_DEFAULT;
         filter->fo_fmd_max_age = FILTER_FMD_MAX_AGE_DEFAULT;
@@ -2136,6 +2138,21 @@ static int filter_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
                 lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
                                      LPROCFS_CNTR_AVGMINMAX,
                                      "write_bytes", "bytes");
+                lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_GET_PAGE,
+                                     LPROCFS_CNTR_AVGMINMAX|LPROCFS_CNTR_STDDEV,
+                                     "get_page", "usec");
+                lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_NO_PAGE,
+                                     LPROCFS_CNTR_AVGMINMAX,
+                                     "get_page_failures", "num");
+                lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_CACHE_ACCESS,
+                                     LPROCFS_CNTR_AVGMINMAX,
+                                     "cache_access", "pages");
+                lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_CACHE_HIT,
+                                     LPROCFS_CNTR_AVGMINMAX,
+                                     "cache_hit", "pages");
+                lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_CACHE_MISS,
+                                     LPROCFS_CNTR_AVGMINMAX,
+                                     "cache_miss", "pages");
 
                 lproc_filter_attach_seqstat(obd);
                 obd->obd_proc_exports_entry = lprocfs_register("exports",
@@ -3207,13 +3224,8 @@ int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry,
         }
 
         if (locked) {
-                /* Let's flush truncated page on disk immediately, then we can
-                 * avoid need to search for page aliases before directio writes
-                 * and this sort of stuff at expense of somewhat slower
-                 * truncates not on a page boundary. I believe this is the only
-                 * place in filter code that can lead to pages getting to
-                 * pagecache so far. */
-                filter_clear_truncated_page(inode);
+                /* truncate can leave dirty pages in the cache.
+                 * we'll take care of them in write path -bzzz */
                 UNLOCK_INODE_MUTEX(inode);
                 locked = 0;
         }
index cbd0cc0..38a941c 100644 (file)
@@ -103,6 +103,11 @@ void filter_fmd_expire(struct obd_export *exp);
 enum {
         LPROC_FILTER_READ_BYTES = 0,
         LPROC_FILTER_WRITE_BYTES = 1,
+        LPROC_FILTER_GET_PAGE = 2,      /* time to get pages for a read, usec */
+        LPROC_FILTER_NO_PAGE = 3,       /* page lookups/allocations that failed */
+        LPROC_FILTER_CACHE_ACCESS = 4,  /* pages obtained for reads */
+        LPROC_FILTER_CACHE_HIT = 5,     /* ... that were already uptodate */
+        LPROC_FILTER_CACHE_MISS = 6,    /* ... that had to be queued for read */
         LPROC_FILTER_LAST,
 };
 
@@ -155,20 +160,21 @@ extern struct ldlm_valblock_ops filter_lvbo;
 
 /* filter_io.c */
 int filter_preprw(int cmd, struct obd_export *, struct obdo *, int objcount,
-                  struct obd_ioobj *, int niocount, struct niobuf_remote *,
-                  struct niobuf_local *, struct obd_trans_info *,
+                  struct obd_ioobj *, struct niobuf_remote *,
+                  int *, struct niobuf_local *, struct obd_trans_info *,
                   struct lustre_capa *);
 int filter_commitrw(int cmd, struct obd_export *, struct obdo *, int objcount,
-                    struct obd_ioobj *, int niocount, struct niobuf_local *,
-                    struct obd_trans_info *, int rc);
+                    struct obd_ioobj *, struct niobuf_remote *,  int,
+                    struct niobuf_local *, struct obd_trans_info *, int rc);
 int filter_brw(int cmd, struct obd_export *, struct obd_info *oinfo,
                obd_count oa_bufs, struct brw_page *pga, struct obd_trans_info *);
-void flip_into_page_cache(struct inode *inode, struct page *new_page);
+void filter_invalidate_cache(struct obd_device *, struct obd_ioobj *,
+                             struct niobuf_remote *, struct inode *);
 
 /* filter_io_*.c */
 struct filter_iobuf;
 int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount,
-                          struct obd_ioobj *obj, int niocount,
+                          struct obd_ioobj *obj, struct niobuf_remote *, int,
                           struct niobuf_local *res, struct obd_trans_info *oti,
                           int rc);
 obd_size filter_grant_space_left(struct obd_export *exp);
index fd449de..05e158e 100644 (file)
 
 int *obdfilter_created_scratchpad;
 
-static int filter_alloc_dio_page(struct obd_device *obd, struct inode *inode,
-                                 struct niobuf_local *lnb)
-{
-        struct page *page;
-
-        LASSERT(lnb->page != NULL);
-
-        page = lnb->page;
-#if 0
-        POISON_PAGE(page, 0xf1);
-        if (lnb->len != CFS_PAGE_SIZE) {
-                memset(kmap(page) + lnb->len, 0, CFS_PAGE_SIZE - lnb->len);
-                kunmap(page);
-        }
-#endif
-        page->index = lnb->offset >> CFS_PAGE_SHIFT;
-
-        RETURN(0);
-}
-
-static void filter_free_dio_pages(int objcount, struct obd_ioobj *obj,
-                           int niocount, struct niobuf_local *res)
-{
-        int i, j;
-
-        for (i = 0; i < objcount; i++, obj++) {
-                for (j = 0 ; j < obj->ioo_bufcnt ; j++, res++)
-                                res->page = NULL;
-        }
-}
-
 /* Grab the dirty and seen grant announcements from the incoming obdo.
  * We will later calculate the clients new grant and return it.
  * Caller must hold osfs lock */
@@ -272,22 +241,118 @@ long filter_grant(struct obd_export *exp, obd_size current_grant,
         return grant;
 }
 
+/*
+ * Look up, or create, the page-cache page that covers \a offset of \a inode.
+ *
+ * GFP_NOFS keeps the allocation from re-entering the filesystem: a client
+ * may run on this very node and we could end up waiting on a page it sent
+ * in the request being served.
+ *
+ * __GFP_NORETRY keeps the allocator from trying too hard, so that a luckier
+ * thread may get enough memory to finish its own request; for this request
+ * the client will hopefully resend. -bzzz
+ *
+ * Returns the page, or NULL after accounting the failure in
+ * LPROC_FILTER_NO_PAGE.
+ */
+static struct page * filter_get_page(struct obd_device *obd,
+                                     struct inode *inode,
+                                     obd_off offset)
+{
+        struct page *pg = find_or_create_page(inode->i_mapping,
+                                              offset >> CFS_PAGE_SHIFT,
+                                              GFP_NOFS | __GFP_NORETRY);
+
+        if (likely(pg != NULL))
+                return pg;
+
+        /* no page: count the failure and let the caller handle it */
+        lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_NO_PAGE, 1);
+        return NULL;
+}
+
+/*
+ * Initialize the array of local niobufs from the remote ones.
+ *
+ * Every remote buffer is cut at page boundaries and each piece gets its own
+ * local niobuf: offset, length and flags are filled in, while the page, rc
+ * and grant fields are cleared.
+ *
+ * On entry *nrpages is the capacity of res; on return it holds the number
+ * of local niobufs filled in.  Returns 0, or -EINVAL if res is too small.
+ */
+static int filter_map_remote_to_local(int objcount, struct obd_ioobj *obj,
+                                      struct niobuf_remote *nb,
+                                      int *nrpages, struct niobuf_local *res)
+{
+        struct niobuf_remote *rnb;
+        struct niobuf_local *lnb;
+        int i, max;
+        ENTRY;
+
+        /* we don't support multiobject RPC yet
+         * ost_brw_read() and ost_brw_write() check this */
+        LASSERT(objcount == 1);
+
+        max = *nrpages;
+        *nrpages = 0;
+        for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt; i++, rnb++) {
+                obd_off offset = rnb->offset;
+                unsigned int len = rnb->len;
+
+                while (len > 0) {
+                        /* unsigned like len: no mixed-sign comparison below
+                         * and the %u in the assertion matches the type */
+                        unsigned int poff = offset & (CFS_PAGE_SIZE - 1);
+                        unsigned int plen = CFS_PAGE_SIZE - poff;
+
+                        if (*nrpages >= max) {
+                                CERROR("small array of local bufs: %d\n", max);
+                                RETURN(-EINVAL);
+                        }
+
+                        /* the last piece of a buffer may end mid-page */
+                        if (plen > len)
+                                plen = len;
+                        lnb->offset = offset;
+                        lnb->len = plen;
+                        lnb->flags = rnb->flags;
+                        lnb->page = NULL;
+                        lnb->rc = 0;
+                        lnb->lnb_grant_used = 0;
+
+                        LASSERTF(plen <= len, "plen %u, len %u\n", plen, len);
+                        offset += plen;
+                        len -= plen;
+                        lnb++;
+                        (*nrpages)++;
+                }
+        }
+        RETURN(0);
+}
+
+/*
+ * Free the cached pages used for a request, to mimic a cacheless OSS which
+ * doesn't occupy much memory.
+ *
+ * For every remote niobuf of obj the pages covering its byte range are
+ * passed to invalidate_mapping_pages().  That call takes an inclusive end
+ * index, so the index of the last byte of the buffer is used; using the
+ * first byte after the buffer would also drop the following page whenever
+ * the buffer ends on a page boundary.
+ */
+void filter_invalidate_cache(struct obd_device *obd, struct obd_ioobj *obj,
+                             struct niobuf_remote *nb, struct inode *inode)
+{
+        struct niobuf_remote *rnb;
+        int i;
+
+        LASSERT(inode != NULL);
+
+        for (i = 0, rnb = nb; i < obj->ioo_bufcnt; i++, rnb++) {
+                /* an empty buffer covers no page; skipping it also keeps
+                 * "offset + len - 1" below from wrapping at offset 0 */
+                if (rnb->len == 0)
+                        continue;
+
+                invalidate_mapping_pages(inode->i_mapping,
+                                rnb->offset >> CFS_PAGE_SHIFT,
+                                (rnb->offset + rnb->len - 1) >> CFS_PAGE_SHIFT);
+        }
+}
+
 static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
                               int objcount, struct obd_ioobj *obj,
-                              int niocount, struct niobuf_remote *nb,
-                              struct niobuf_local *res,
+                              struct niobuf_remote *nb,
+                              int *pages, struct niobuf_local *res,
                               struct obd_trans_info *oti,
                               struct lustre_capa *capa)
 {
         struct obd_device *obd = exp->exp_obd;
+        struct filter_obd *fo = &obd->u.filter;
+        struct timeval start, end;
         struct lvfs_run_ctxt saved;
-        struct niobuf_remote *rnb;
         struct niobuf_local *lnb;
         struct dentry *dentry = NULL;
-        struct inode *inode;
+        struct inode *inode = NULL;
         void *iobuf = NULL;
         int rc = 0, i, tot_bytes = 0;
         unsigned long now = jiffies;
+        long timediff;
         ENTRY;
 
         /* We are currently not supporting multi-obj BRW_READ RPCS at all.
@@ -324,28 +389,29 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
         inode = dentry->d_inode;
 
         obdo_to_inode(inode, oa, OBD_MD_FLATIME);
+
+        rc = filter_map_remote_to_local(objcount, obj, nb, pages, res);
+        if (rc)
+                GOTO(cleanup, rc);
+
         fsfilt_check_slow(obd, now, "preprw_read setup");
 
-        for (i = 0, lnb = res, rnb = nb; i < obj->ioo_bufcnt;
-             i++, rnb++, lnb++) {
+        /* find pages for all segments, fill array with them */
+        do_gettimeofday(&start);
+        for (i = 0, lnb = res; i < *pages; i++, lnb++) {
+
                 lnb->dentry = dentry;
-                lnb->offset = rnb->offset;
-                lnb->len    = rnb->len;
-                lnb->flags  = rnb->flags;
-
-                /*
-                 * ost_brw_write()->ost_nio_pages_get() already initialized
-                 * lnb->page to point to the page from the per-thread page
-                 * pool (bug 5137), initialize page.
-                 */
-                LASSERT(lnb->page != NULL);
-
-                if (i_size_read(inode) <= rnb->offset)
+
+                if (i_size_read(inode) <= lnb->offset)
                         /* If there's no more data, abort early.  lnb->rc == 0,
                          * so it's easy to detect later. */
                         break;
-                else
-                        filter_alloc_dio_page(obd, inode, lnb);
+
+                lnb->page = filter_get_page(obd, inode, lnb->offset);
+                if (lnb->page == NULL)
+                        GOTO(cleanup, rc = -ENOMEM);
+
+                lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_CACHE_ACCESS, 1);
 
                 if (i_size_read(inode) < lnb->offset + lnb->len - 1)
                         lnb->rc = i_size_read(inode) - lnb->offset;
@@ -354,8 +420,21 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
 
                 tot_bytes += lnb->rc;
 
+                if (PageUptodate(lnb->page)) {
+                        lprocfs_counter_add(obd->obd_stats,
+                                            LPROC_FILTER_CACHE_HIT, 1);
+                        continue;
+                }
+
+                lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_CACHE_MISS, 1);
                 filter_iobuf_add_page(obd, iobuf, inode, lnb->page);
         }
+        do_gettimeofday(&end);
+        timediff = cfs_timeval_sub(&end, &start, NULL);
+        lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_GET_PAGE, timediff);
+
+        if (OBD_FAIL_CHECK(OBD_FAIL_OST_NOMEM))
+                GOTO(cleanup, rc = -ENOMEM);
 
         fsfilt_check_slow(obd, now, "start_page_read");
 
@@ -373,9 +452,24 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
         EXIT;
 
  cleanup:
-        if (rc != 0) {
-                filter_free_dio_pages(objcount, obj, niocount, res);
+        /* unlock pages to allow access from concurrent OST_READ */
+        for (i = 0, lnb = res; i < *pages; i++, lnb++) {
+                if (lnb->page) {
+                        LASSERT(PageLocked(lnb->page));
+                        unlock_page(lnb->page);
+
+                        if (rc) {
+                                page_cache_release(lnb->page);
+                                lnb->page = NULL;
+                        }
+                }
+        }
 
+        if (inode && (fo->fo_read_cache == 0 ||
+                        i_size_read(inode) > fo->fo_readcache_max_filesize))
+                filter_invalidate_cache(obd, obj, nb, inode);
+
+        if (rc != 0) {
                 if (dentry != NULL)
                         f_dput(dentry);
         }
@@ -399,9 +493,8 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
  * Caller must hold obd_osfs_lock. */
 static int filter_grant_check(struct obd_export *exp, struct obdo *oa, 
                               int objcount, struct fsfilt_objinfo *fso, 
-                              int niocount, struct niobuf_remote *rnb,
-                              struct niobuf_local *lnb, obd_size *left,
-                              struct inode *inode)
+                              int niocount, struct niobuf_local *lnb,
+                              obd_size *left, struct inode *inode)
 {
         struct filter_export_data *fed = &exp->exp_filter_data;
         int blocksize = exp->exp_obd->u.obt.obt_sb->s_blocksize;
@@ -415,13 +508,13 @@ static int filter_grant_check(struct obd_export *exp, struct obdo *oa,
                         int tmp, bytes;
 
                         /* should match the code in osc_exit_cache */
-                        bytes = rnb[n].len;
-                        bytes += rnb[n].offset & (blocksize - 1);
-                        tmp = (rnb[n].offset + rnb[n].len) & (blocksize - 1);
+                        bytes = lnb[n].len;
+                        bytes += lnb[n].offset & (blocksize - 1);
+                        tmp = (lnb[n].offset + lnb[n].len) & (blocksize - 1);
                         if (tmp)
                                 bytes += blocksize - tmp;
 
-                        if ((rnb[n].flags & OBD_BRW_FROM_GRANT) &&
+                        if ((lnb[n].flags & OBD_BRW_FROM_GRANT) &&
                             (oa->o_valid & OBD_MD_FLGRANT)) {
                                 if (fed->fed_grant < used + bytes) {
                                         CDEBUG(D_CACHE,
@@ -432,7 +525,7 @@ static int filter_grant_check(struct obd_export *exp, struct obdo *oa,
                                                used, bytes, fed->fed_grant, n);
                                 } else {
                                         used += bytes;
-                                        rnb[n].flags |= OBD_BRW_GRANTED;
+                                        lnb[n].flags |= OBD_BRW_GRANTED;
                                         lnb[n].lnb_grant_used = bytes;
                                         CDEBUG(0, "idx %d used=%lu\n", n, used);
                                         rc = 0;
@@ -442,7 +535,7 @@ static int filter_grant_check(struct obd_export *exp, struct obdo *oa,
                         if (*left > ungranted + bytes) {
                                 /* if enough space, pretend it was granted */
                                 ungranted += bytes;
-                                rnb[n].flags |= OBD_BRW_GRANTED;
+                                lnb[n].flags |= OBD_BRW_GRANTED;
                                 lnb[n].lnb_grant_used = bytes;
                                 CDEBUG(0, "idx %d ungranted=%lu\n",n,ungranted);
                                 rc = 0;
@@ -456,7 +549,7 @@ static int filter_grant_check(struct obd_export *exp, struct obdo *oa,
                          * marked BRW_GRANTED are already mapped and we can
                          * ignore this error. */
                         lnb[n].rc = -ENOSPC;
-                        rnb[n].flags &= ~OBD_BRW_GRANTED;
+                        lnb[n].flags &= ~OBD_BRW_GRANTED;
                         CDEBUG(D_CACHE,"%s: cli %s/%p idx %d no space for %d\n",
                                exp->exp_obd->obd_name,
                                exp->exp_client_uuid.uuid, exp, n, bytes);
@@ -517,20 +610,21 @@ static int filter_grant_check(struct obd_export *exp, struct obdo *oa,
  * bug) or ensure we get the page locks in an appropriate order. */
 static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
                                int objcount, struct obd_ioobj *obj,
-                               int niocount, struct niobuf_remote *nb,
+                               struct niobuf_remote *nb, int *pages,
                                struct niobuf_local *res,
                                struct obd_trans_info *oti,
                                struct lustre_capa *capa)
 {
+        struct obd_device *obd = exp->exp_obd;
+        struct timeval start, end;
         struct lvfs_run_ctxt saved;
-        struct niobuf_remote *rnb;
         struct niobuf_local *lnb = res;
         struct fsfilt_objinfo fso;
         struct filter_mod_data *fmd;
         struct dentry *dentry = NULL;
         void *iobuf;
         obd_size left;
-        unsigned long now = jiffies;
+        unsigned long now = jiffies, timediff;
         int rc = 0, i, tot_bytes = 0, cleanup_phase = 0;
         ENTRY;
         LASSERT(objcount == 1);
@@ -559,8 +653,9 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
                 GOTO(cleanup, rc = -ENOENT);
         }
 
-        fso.fso_dentry = dentry;
-        fso.fso_bufcnt = obj->ioo_bufcnt;
+        rc = filter_map_remote_to_local(objcount, obj, nb, pages, res);
+        if (rc)
+                GOTO(cleanup, rc);
 
         fsfilt_check_slow(exp->exp_obd, now, "preprw_write setup");
 
@@ -584,7 +679,10 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
 
         left = filter_grant_space_left(exp);
 
-        rc = filter_grant_check(exp, oa, objcount, &fso, niocount, nb, res,
+        fso.fso_dentry = dentry;
+        fso.fso_bufcnt = *pages;
+
+        rc = filter_grant_check(exp, oa, objcount, &fso, *pages, res,
                                 &left, dentry->d_inode);
 
         /* do not zero out oa->o_valid as it is used in filter_commitrw_write()
@@ -598,31 +696,29 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
         if (rc)
                 GOTO(cleanup, rc);
 
-        for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt;
-             i++, lnb++, rnb++) {
+        do_gettimeofday(&start);
+        for (i = 0, lnb = res; i < *pages; i++, lnb++) {
+
                 /* We still set up for ungranted pages so that granted pages
                  * can be written to disk as they were promised, and portals
                  * needs to keep the pages all aligned properly. */
                 lnb->dentry = dentry;
-                lnb->offset = rnb->offset;
-                lnb->len    = rnb->len;
-                lnb->flags  = rnb->flags;
-
-                /*
-                 * ost_brw_write()->ost_nio_pages_get() already initialized
-                 * lnb->page to point to the page from the per-thread page
-                 * pool (bug 5137), initialize page.
-                 */
-                LASSERT(lnb->page != NULL);
-                if (lnb->len != CFS_PAGE_SIZE) {
-                        memset(kmap(lnb->page) + lnb->len,
-                               0, CFS_PAGE_SIZE - lnb->len);
-                        kunmap(lnb->page);
-                }
-                lnb->page->index = lnb->offset >> CFS_PAGE_SHIFT;
 
+                lnb->page = filter_get_page(obd, dentry->d_inode, lnb->offset);
+                if (lnb->page == NULL)
+                        GOTO(cleanup, rc = -ENOMEM);
                 cleanup_phase = 4;
 
+                /* DLM locking protects us from write and truncate competing
+                 * for the same region, but truncate can leave a dirty page in
+                 * the cache. it's possible that writeout of such a page is in
+                 * progress when we access it. it's also possible that during
+                 * this writeout we put new (partial) data, but then won't
+                 * be able to proceed in filter_commitrw_write(). thus let's
+                 * just wait for writeout completion, should be rare enough.
+                 * -bzzz */
+                wait_on_page_writeback(lnb->page);
+
                 /* If the filter writes a partial page, then has the file
                  * extended, the client will read in the whole page.  the
                  * filter has to be careful to zero the rest of the partial
@@ -658,7 +754,14 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
                 if (lnb->rc == 0)
                         tot_bytes += lnb->len;
         }
+        do_gettimeofday(&end);
+        timediff = cfs_timeval_sub(&end, &start, NULL);
+        lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_GET_PAGE, timediff);
+
+        if (OBD_FAIL_CHECK(OBD_FAIL_OST_NOMEM))
+                GOTO(cleanup, rc = -ENOMEM);
 
+        /* don't unlock pages to prevent any access */
         rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf, exp,
                               NULL, NULL, NULL);
 
@@ -671,6 +774,15 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
 cleanup:
         switch(cleanup_phase) {
         case 4:
+                if (rc) {
+                        for (i = 0, lnb = res; i < *pages; i++, lnb++) {
+                                if (lnb->page != NULL) {
+                                        unlock_page(lnb->page);
+                                        page_cache_release(lnb->page);
+                                        lnb->page = NULL;
+                                }
+                        }
+                }
         case 3:
                 filter_iobuf_put(&exp->exp_obd->u.filter, iobuf, oti);
         case 2:
@@ -693,47 +805,33 @@ cleanup:
 }
 
 int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
-                  int objcount, struct obd_ioobj *obj, int niocount,
-                  struct niobuf_remote *nb, struct niobuf_local *res,
-                  struct obd_trans_info *oti, struct lustre_capa *capa)
+                  int objcount, struct obd_ioobj *obj,
+                  struct niobuf_remote *nb, int *pages,
+                  struct niobuf_local *res, struct obd_trans_info *oti,
+                  struct lustre_capa *capa)
 {
         if (cmd == OBD_BRW_WRITE)
                 return filter_preprw_write(cmd, exp, oa, objcount, obj,
-                                           niocount, nb, res, oti, capa);
+                                           nb, pages, res, oti, capa);
         if (cmd == OBD_BRW_READ)
                 return filter_preprw_read(cmd, exp, oa, objcount, obj,
-                                          niocount, nb, res, oti, capa);
+                                          nb, pages, res, oti, capa);
         LBUG();
         return -EPROTO;
 }
 
-void filter_release_read_page(struct filter_obd *filter, struct inode *inode,
-                              struct page *page)
-{
-        int drop = 0;
-
-        if (inode != NULL &&
-            (i_size_read(inode) > filter->fo_readcache_max_filesize))
-                drop = 1;
-
-        /* drop from cache like truncate_list_pages() */
-        if (drop && !TryLockPage(page)) {
-                if (page->mapping)
-                        ll_truncate_complete_page(page);
-                unlock_page(page);
-        }
-        page_cache_release(page);
-}
-
 static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa,
                                 int objcount, struct obd_ioobj *obj,
-                                int niocount, struct niobuf_local *res,
+                                struct niobuf_remote *rnb,
+                                int pages, struct niobuf_local *res,
                                 struct obd_trans_info *oti, int rc)
 {
         struct inode *inode = NULL;
         struct ldlm_res_id res_id;
         struct ldlm_resource *resource = NULL;
         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
+        struct niobuf_local *lnb;
+        int i;
         ENTRY;
 
         osc_build_res_name(obj->ioo_id, obj->ioo_gr, &res_id);
@@ -752,52 +850,18 @@ static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa,
         if (res->dentry != NULL)
                 inode = res->dentry->d_inode;
 
-        filter_free_dio_pages(objcount, obj, niocount, res);
+        for (i = 0, lnb = res; i < pages; i++, lnb++) {
+                if (lnb->page != NULL) {
+                        page_cache_release(lnb->page);
+                        lnb->page = NULL;
+                }
+        }
 
         if (res->dentry != NULL)
                 f_dput(res->dentry);
         RETURN(rc);
 }
 
-void flip_into_page_cache(struct inode *inode, struct page *new_page)
-{
-        struct page *old_page;
-        int rc;
-
-        do {
-                /* the dlm is protecting us from read/write concurrency, so we
-                 * expect this find_lock_page to return quickly.  even if we
-                 * race with another writer it won't be doing much work with
-                 * the page locked.  we do this 'cause t_c_p expects a
-                 * locked page, and it wants to grab the pagecache lock
-                 * as well. */
-                old_page = find_lock_page(inode->i_mapping, new_page->index);
-                if (old_page) {
-                        ll_truncate_complete_page(old_page);
-                        unlock_page(old_page);
-                        page_cache_release(old_page);
-                }
-
-#if 0 /* this should be a /proc tunable someday */
-                /* racing o_directs (no locking ioctl) could race adding
-                 * their pages, so we repeat the page invalidation unless
-                 * we successfully added our new page */
-                rc = add_to_page_cache_unique(new_page, inode->i_mapping,
-                                              new_page->index,
-                                              page_hash(inode->i_mapping,
-                                                        new_page->index));
-                if (rc == 0) {
-                        /* add_to_page_cache clears uptodate|dirty and locks
-                         * the page */
-                        SetPageUptodate(new_page);
-                        unlock_page(new_page);
-                }
-#else
-                rc = 0;
-#endif
-        } while (rc != 0);
-}
-
 void filter_grant_commit(struct obd_export *exp, int niocount,
                          struct niobuf_local *res)
 {
@@ -830,16 +894,17 @@ void filter_grant_commit(struct obd_export *exp, int niocount,
 }
 
 int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
-                    int objcount, struct obd_ioobj *obj, int niocount,
+                    int objcount, struct obd_ioobj *obj,
+                    struct niobuf_remote *nb, int pages,
                     struct niobuf_local *res, struct obd_trans_info *oti,
                     int rc)
 {
         if (cmd == OBD_BRW_WRITE)
-                return filter_commitrw_write(exp, oa, objcount, obj, niocount,
-                                             res, oti, rc);
+                return filter_commitrw_write(exp, oa, objcount, obj,
+                                             nb, pages, res, oti, rc);
         if (cmd == OBD_BRW_READ)
-                return filter_commitrw_read(exp, oa, objcount, obj, niocount,
-                                            res, oti, rc);
+                return filter_commitrw_read(exp, oa, objcount, obj,
+                                            nb, pages, res, oti, rc);
         LBUG();
         return -EPROTO;
 }
@@ -852,7 +917,7 @@ int filter_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
         struct niobuf_local *lnb;
         struct niobuf_remote *rnb;
         obd_count i;
-        int ret = 0;
+        int ret = 0, npages;
         ENTRY;
 
         OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
@@ -870,13 +935,15 @@ int filter_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
         obdo_to_ioobj(oinfo->oi_oa, &ioo);
         ioo.ioo_bufcnt = oa_bufs;
 
+        npages = oa_bufs;
         ret = filter_preprw(cmd, exp, oinfo->oi_oa, 1, &ioo,
-                            oa_bufs, rnb, lnb, oti, oinfo_capa(oinfo));
+                            rnb, &npages, lnb, oti, oinfo_capa(oinfo));
         if (ret != 0)
                 GOTO(out, ret);
+        LASSERTF(oa_bufs == npages, "%u != %u\n", oa_bufs, npages);
 
-        ret = filter_commitrw(cmd, exp, oinfo->oi_oa, 1, &ioo,
-                              oa_bufs, lnb, oti, ret);
+        ret = filter_commitrw(cmd, exp, oinfo->oi_oa, 1, &ioo, rnb,
+                              npages, lnb, oti, ret);
 
 out:
         if (lnb)
index e96513c..c37c585 100644 (file)
@@ -65,7 +65,6 @@ struct filter_iobuf {
         int               dr_error;
         struct page     **dr_pages;
         unsigned long    *dr_blocks;
-        spinlock_t        dr_lock;              /* IRQ lock */
         unsigned int      dr_ignore_quota:1;
         struct filter_obd *dr_filter;
 };
@@ -117,12 +116,8 @@ static void record_finish_io(struct filter_iobuf *iobuf, int rw, int rc)
 static int dio_complete_routine(struct bio *bio, unsigned int done, int error)
 {
         struct filter_iobuf *iobuf = bio->bi_private;
-        unsigned long        flags;
-
-#ifdef HAVE_PAGE_CONSTANT
         struct bio_vec *bvl;
         int i;
-#endif
 
         /* CAVEAT EMPTOR: possibly in IRQ context 
          * DO NOT record procfs stats here!!! */
@@ -130,7 +125,7 @@ static int dio_complete_routine(struct bio *bio, unsigned int done, int error)
         if (bio->bi_size)                       /* Not complete */
                 return 1;
 
-        if (iobuf == NULL) {
+        if (unlikely(iobuf == NULL)) {
                 CERROR("***** bio->bi_private is NULL!  This should never "
                        "happen.  Normally, I would crash here, but instead I "
                        "will dump the bio contents to the console.  Please "
@@ -148,18 +143,31 @@ static int dio_complete_routine(struct bio *bio, unsigned int done, int error)
                 return 0;
         }
 
+        /* the check is outside of the loop for performance reasons -bzzz */
+        if (!test_bit(BIO_RW, &bio->bi_rw)) {
+                bio_for_each_segment(bvl, bio, i) {
+                        if (likely(error == 0))
+                                SetPageUptodate(bvl->bv_page);
+                        LASSERT(PageLocked(bvl->bv_page));
 #ifdef HAVE_PAGE_CONSTANT
-        bio_for_each_segment(bvl, bio, i)
-                ClearPageConstant(bvl->bv_page);
+                        ClearPageConstant(bvl->bv_page);
 #endif
+                }
+                record_finish_io(iobuf, OBD_BRW_READ, error);
+        } else {
+#ifdef HAVE_PAGE_CONSTANT
+                if (mapping_cap_page_constant_write(iobuf->dr_pages[0]->mapping)){
+                        bio_for_each_segment(bvl, bio, i) {
+                                ClearPageConstant(bvl->bv_page);
+                        }
+                }
+#endif
+                record_finish_io(iobuf, OBD_BRW_WRITE, error);
+        }
 
-        spin_lock_irqsave(&iobuf->dr_lock, flags);
-        if (iobuf->dr_error == 0)
+        /* any real error is good enough -bzzz */
+        if (error != 0 && iobuf->dr_error == 0)
                 iobuf->dr_error = error;
-        spin_unlock_irqrestore(&iobuf->dr_lock, flags);
-
-        record_finish_io(iobuf, test_bit(BIO_RW, &bio->bi_rw) ?
-                         OBD_BRW_WRITE : OBD_BRW_READ, error);
 
         /* Completed bios used to be chained off iobuf->dr_bios and freed in
          * filter_clear_dreq().  It was then possible to exhaust the biovec-256
@@ -204,7 +212,6 @@ struct filter_iobuf *filter_alloc_iobuf(struct filter_obd *filter,
         iobuf->dr_filter = filter;
         init_waitqueue_head(&iobuf->dr_wait);
         atomic_set(&iobuf->dr_numreqs, 0);
-        spin_lock_init(&iobuf->dr_lock);
         iobuf->dr_max_pages = num_pages;
         iobuf->dr_npages = 0;
         iobuf->dr_error = 0;
@@ -436,109 +443,6 @@ int filter_do_bio(struct obd_export *exp, struct inode *inode,
         RETURN(rc);
 }
 
-/* These are our hacks to keep our directio/bh IO coherent with ext3's
- * page cache use.  Most notably ext3 reads file data into the page
- * cache when it is zeroing the tail of partial-block truncates and
- * leaves it there, sometimes generating io from it at later truncates.
- * This removes the partial page and its buffers from the page cache,
- * so it should only ever cause a wait in rare cases, as otherwise we
- * always do full-page IO to the OST.
- *
- * The call to truncate_complete_page() will call journal_invalidatepage()
- * to free the buffers and drop the page from cache.  The buffers should
- * not be dirty, because we already called fdatasync/fdatawait on them.
- */
-static int filter_sync_inode_data(struct inode *inode, int locked)
-{
-        int rc = 0;
-
-        /* This is nearly do_fsync(), without the waiting on the inode */
-        /* XXX: in 2.6.16 (at least) we don't need to hold i_mutex over
-         * filemap_fdatawrite() and filemap_fdatawait(), so we may no longer
-         * need this lock here at all. */
-        if (!locked)
-                LOCK_INODE_MUTEX(inode);
-        if (inode->i_mapping->nrpages) {
-#ifdef PF_SYNCWRITE
-                current->flags |= PF_SYNCWRITE;
-#endif
-                rc = filemap_fdatawrite(inode->i_mapping);
-                if (rc == 0)
-                        rc = filemap_fdatawait(inode->i_mapping);
-#ifdef PF_SYNCWRITE
-                current->flags &= ~PF_SYNCWRITE;
-#endif
-        }
-        if (!locked)
-                UNLOCK_INODE_MUTEX(inode);
-
-        return rc;
-}
-/* Clear pages from the mapping before we do direct IO to that offset.
- * Now that the only source of such pages in the truncate path flushes
- * these pages to disk and then discards them, this is error condition.
- * If add back read cache this will happen again.  This could be disabled
- * until that time if we never see the below error. */
-static int filter_clear_page_cache(struct inode *inode,
-                                   struct filter_iobuf *iobuf)
-{
-        struct page *page;
-        int i, rc;
-
-        rc = filter_sync_inode_data(inode, 0);
-        if (rc != 0)
-                RETURN(rc);
-
-        /* be careful to call this after fsync_inode_data_buffers has waited
-         * for IO to complete before we evict it from the cache */
-        for (i = 0; i < iobuf->dr_npages; i++) {
-                page = find_lock_page(inode->i_mapping,
-                                      iobuf->dr_pages[i]->index);
-                if (page == NULL)
-                        continue;
-                if (page->mapping != NULL) {
-                        CERROR("page %lu (%d/%d) in page cache during write!\n",
-                               page->index, i, iobuf->dr_npages);
-                        wait_on_page_writeback(page);
-                        ll_truncate_complete_page(page);
-                }
-
-                unlock_page(page);
-                page_cache_release(page);
-        }
-
-        return 0;
-}
-
-int filter_clear_truncated_page(struct inode *inode)
-{
-        struct page *page;
-        int rc;
-
-        /* Truncate on page boundary, so nothing to flush? */
-        if (!(i_size_read(inode) & ~CFS_PAGE_MASK))
-                return 0;
-
-        rc = filter_sync_inode_data(inode, 1);
-        if (rc != 0)
-                RETURN(rc);
-
-        /* be careful to call this after fsync_inode_data_buffers has waited
-         * for IO to complete before we evict it from the cache */
-        page = find_lock_page(inode->i_mapping,
-                              i_size_read(inode) >> CFS_PAGE_SHIFT);
-        if (page) {
-                if (page->mapping != NULL) {
-                        wait_on_page_writeback(page);
-                        ll_truncate_complete_page(page);
-                }
-                unlock_page(page);
-                page_cache_release(page);
-        }
-
-        return 0;
-}
-
 /* Must be called with i_mutex taken for writes; this will drop it */
 int filter_direct_io(int rw, struct dentry *dchild, struct filter_iobuf *iobuf,
                      struct obd_export *exp, struct iattr *attr,
@@ -604,10 +508,6 @@ int filter_direct_io(int rw, struct dentry *dchild, struct filter_iobuf *iobuf,
                              iobuf->dr_blocks, blocks_per_page, 0);
         }
 
-        rc = filter_clear_page_cache(inode, iobuf);
-        if (rc != 0)
-                RETURN(rc);
-
         RETURN(filter_do_bio(exp, inode, iobuf, rw));
 }
 
@@ -632,8 +532,20 @@ static int filter_range_is_mapped(struct inode *inode, obd_size offset, int len)
         return 1;
 }
 
+/*
+ * interesting use cases on how it interacts with VM:
+ *
+ * - vm writeout -- shouldn't see our pages as we don't mark them dirty,
+ *   though vm can find a partial page left dirty by truncate. in this
+ *   case usual writeout is used unless our write rewrites that page -
+ *   then we drop PG_dirty with PG_lock held.
+ *
+ * - else?
+ *
+ */
 int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
-                          int objcount, struct obd_ioobj *obj, int niocount,
+                          int objcount, struct obd_ioobj *obj,
+                          struct niobuf_remote *nb, int niocount,
                           struct niobuf_local *res, struct obd_trans_info *oti,
                           int rc)
 {
@@ -646,6 +558,7 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
         unsigned long now = jiffies;
         int i, err, cleanup_phase = 0;
         struct obd_device *obd = exp->exp_obd;
+        struct filter_obd *fo = &obd->u.filter;
         void *wait_handle;
         int   total_size = 0, rc2;
         unsigned int qcids[MAXQUOTAS] = {0, 0};
@@ -684,7 +597,7 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
         inode = res->dentry->d_inode;
 
         iobuf->dr_ignore_quota = 0;
-        for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++) {
+        for (i = 0, lnb = res; i < niocount; i++, lnb++) {
                 loff_t this_size;
 
                 /* If overwriting an existing block, we don't need a grant */
@@ -697,6 +610,14 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
                         continue;
                 }
 
+                LASSERT(PageLocked(lnb->page));
+                LASSERT(!PageWriteback(lnb->page));
+
+                /* truncate might leave tail dirty */
+                clear_page_dirty_for_io(lnb->page);
+
+                SetPageUptodate(lnb->page);
+
                 err = filter_iobuf_add_page(obd, iobuf, inode, lnb->page);
                 LASSERT (err == 0);
 
@@ -826,5 +747,20 @@ cleanup:
         CDEBUG(err ? D_ERROR : D_QUOTA,
                "filter adjust qunit! (rc:%d)\n", err);
 
+        for (i = 0, lnb = res; i < niocount; i++, lnb++) {
+                if (lnb->page == NULL)
+                        continue;
+
+                LASSERT(PageLocked(lnb->page));
+                unlock_page(lnb->page);
+
+                page_cache_release(lnb->page);
+                lnb->page = NULL;
+        }
+
+        if (inode && (fo->fo_writethrough_cache == 0 ||
+                        i_size_read(inode) > fo->fo_readcache_max_filesize))
+                filter_invalidate_cache(obd, obj, nb, inode);
+
         RETURN(rc);
 }
index d5e7a4a..cbe0753 100644 (file)
@@ -242,6 +242,56 @@ static int lprocfs_filter_rd_capa_count(char *page, char **start, off_t off,
                         capa_count[CAPA_SITE_SERVER]);
 }
 
+static int lprocfs_filter_rd_cache(char *page, char **start, off_t off,
+                                   int count, int *eof, void *data)
+{
+        struct obd_device *obd = (struct obd_device *)data;
+        LASSERT(obd != NULL);
+        /* prints fo_read_cache as "%u\n"; start, off and eof are unused */
+        return snprintf(page, count, "%u\n", obd->u.filter.fo_read_cache);
+}
+
+static int lprocfs_filter_wr_cache(struct file *file, const char *buffer,
+                     unsigned long count, void *data)
+{
+        struct obd_device *obd = (struct obd_device *)data;
+        int val, rc;
+        LASSERT(obd != NULL);
+        /* write handler for the "read_cache_enable" lprocfs entry */
+        rc = lprocfs_write_helper(buffer, count, &val);
+        /* a non-zero result from the helper is returned unchanged */
+        if (rc)
+                return rc;
+        /* val is stored as the helper set it, with no range check here */
+        obd->u.filter.fo_read_cache = val;
+        return count;
+}
+
+static int lprocfs_filter_rd_wcache(char *page, char **start, off_t off,
+                                   int count, int *eof, void *data)
+{
+        struct obd_device *obd = (struct obd_device *)data;
+        LASSERT(obd != NULL);
+        /* prints fo_writethrough_cache as "%u\n"; start, off, eof unused */
+        return snprintf(page, count, "%u\n", obd->u.filter.fo_writethrough_cache);
+}
+
+static int lprocfs_filter_wr_wcache(struct file *file, const char *buffer,
+                     unsigned long count, void *data)
+{
+        struct obd_device *obd = (struct obd_device *)data;
+        int val, rc;
+        LASSERT(obd != NULL);
+        /* write handler for the "writethrough_cache_enable" lprocfs entry */
+        rc = lprocfs_write_helper(buffer, count, &val);
+        /* a non-zero result from the helper is returned unchanged */
+        if (rc)
+                return rc;
+        /* val is stored as the helper set it, with no range check here */
+        obd->u.filter.fo_writethrough_cache = val;
+        return count;
+}
+
 static struct lprocfs_vars lprocfs_filter_obd_vars[] = {
         { "uuid",         lprocfs_rd_uuid,          0, 0 },
         { "blocksize",    lprocfs_rd_blksize,       0, 0 },
@@ -281,6 +331,9 @@ static struct lprocfs_vars lprocfs_filter_obd_vars[] = {
         { "capa",         lprocfs_filter_rd_capa,
                           lprocfs_filter_wr_capa, 0 },
         { "capa_count",   lprocfs_filter_rd_capa_count, 0, 0 },
+        { "read_cache_enable", lprocfs_filter_rd_cache, lprocfs_filter_wr_cache, 0},
+        { "writethrough_cache_enable", lprocfs_filter_rd_wcache,
+                          lprocfs_filter_wr_wcache, 0},
         { 0 }
 };
 
index 457110b..a720a1d 100644 (file)
@@ -389,101 +389,6 @@ static int ost_bulk_timeout(void *data)
         RETURN(1);
 }
 
-static int get_per_page_niobufs(struct obd_ioobj *ioo, int nioo,
-                                struct niobuf_remote *rnb, int nrnb,
-                                struct niobuf_remote **pp_rnbp)
-{
-        /* Copy a remote niobuf, splitting it into page-sized chunks
-         * and setting ioo[i].ioo_bufcnt accordingly */
-        struct niobuf_remote *pp_rnb;
-        int   i;
-        int   j;
-        int   page;
-        int   rnbidx = 0;
-        int   npages = 0;
-
-        /*
-         * array of sufficient size already preallocated by caller
-         */
-        LASSERT(pp_rnbp != NULL);
-        LASSERT(*pp_rnbp != NULL);
-
-        /* first count and check the number of pages required */
-        for (i = 0; i < nioo; i++)
-                for (j = 0; j < ioo->ioo_bufcnt; j++, rnbidx++) {
-                        obd_off offset = rnb[rnbidx].offset;
-                        obd_off p0 = offset >> CFS_PAGE_SHIFT;
-                        obd_off pn = (offset + rnb[rnbidx].len - 1) >>
-                                     CFS_PAGE_SHIFT;
-
-                        LASSERT(rnbidx < nrnb);
-
-                        npages += (pn + 1 - p0);
-
-                        if (rnb[rnbidx].len == 0) {
-                                CERROR("zero len BRW: obj %d objid "LPX64
-                                       " buf %u\n", i, ioo[i].ioo_id, j);
-                                return -EINVAL;
-                        }
-                        if (j > 0 &&
-                            rnb[rnbidx].offset <= rnb[rnbidx-1].offset) {
-                                CERROR("unordered BRW: obj %d objid "LPX64
-                                       " buf %u offset "LPX64" <= "LPX64"\n",
-                                       i, ioo[i].ioo_id, j, rnb[rnbidx].offset,
-                                       rnb[rnbidx].offset);
-                                return -EINVAL;
-                        }
-                }
-
-        LASSERT(rnbidx == nrnb);
-
-        if (npages == nrnb) {       /* all niobufs are for single pages */
-                *pp_rnbp = rnb;
-                return npages;
-        }
-
-        pp_rnb = *pp_rnbp;
-
-        /* now do the actual split */
-        page = rnbidx = 0;
-        for (i = 0; i < nioo; i++) {
-                int  obj_pages = 0;
-
-                for (j = 0; j < ioo[i].ioo_bufcnt; j++, rnbidx++) {
-                        obd_off off = rnb[rnbidx].offset;
-                        int     nob = rnb[rnbidx].len;
-
-                        LASSERT(rnbidx < nrnb);
-                        do {
-                                obd_off  poff = off & ~CFS_PAGE_MASK;
-                                int      pnob = (poff + nob > CFS_PAGE_SIZE) ?
-                                                CFS_PAGE_SIZE - poff : nob;
-
-                                LASSERT(page < npages);
-                                pp_rnb[page].len = pnob;
-                                pp_rnb[page].offset = off;
-                                pp_rnb[page].flags = rnb[rnbidx].flags;
-
-                                CDEBUG(0, "   obj %d id "LPX64
-                                       "page %d(%d) "LPX64" for %d, flg %x\n",
-                                       i, ioo[i].ioo_id, obj_pages, page,
-                                       pp_rnb[page].offset, pp_rnb[page].len,
-                                       pp_rnb[page].flags);
-                                page++;
-                                obj_pages++;
-
-                                off += pnob;
-                                nob -= pnob;
-                        } while (nob > 0);
-                        LASSERT(nob == 0);
-                }
-                ioo[i].ioo_bufcnt = obj_pages;
-        }
-        LASSERT(page == npages);
-
-        return npages;
-}
-
 static __u32 ost_checksum_bulk(struct ptlrpc_bulk_desc *desc, int opc,
                                cksum_type_t cksum_type)
 {
@@ -506,62 +411,17 @@ static __u32 ost_checksum_bulk(struct ptlrpc_bulk_desc *desc, int opc,
                 /* corrupt the data after we compute the checksum, to
                  * simulate an OST->client data error */
                 if (i == 0 && opc == OST_READ &&
-                    OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_SEND))
+                    OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_SEND)) {
                         memcpy(ptr, "bad4", min(4, len));
+                        /* nobody should use corrupted page again */
+                        ClearPageUptodate(page);
+                }
                 kunmap(page);
         }
 
         return cksum;
 }
 
-/*
- * populate @nio by @nrpages pages from per-thread page pool
- */
-static void ost_nio_pages_get(struct ptlrpc_request *req,
-                              struct niobuf_local *nio, int nrpages)
-{
-        int i;
-        struct ost_thread_local_cache *tls;
-
-        ENTRY;
-
-        LASSERT(nrpages <= OST_THREAD_POOL_SIZE);
-        LASSERT(req != NULL);
-        LASSERT(req->rq_svc_thread != NULL);
-
-        tls = ost_tls(req);
-        LASSERT(tls != NULL);
-
-        memset(nio, 0, nrpages * sizeof *nio);
-        for (i = 0; i < nrpages; ++ i) {
-                struct page *page;
-
-                page = tls->page[i];
-                LASSERT(page != NULL);
-                POISON_PAGE(page, 0xf1);
-                nio[i].page = page;
-                LL_CDEBUG_PAGE(D_INFO, page, "%d\n", i);
-        }
-        EXIT;
-}
-
-/*
- * Dual for ost_nio_pages_get(). Poison pages in pool for debugging
- */
-static void ost_nio_pages_put(struct ptlrpc_request *req,
-                              struct niobuf_local *nio, int nrpages)
-{
-        int i;
-
-        ENTRY;
-
-        LASSERT(nrpages <= OST_THREAD_POOL_SIZE);
-
-        for (i = 0; i < nrpages; ++ i)
-                POISON_PAGE(nio[i].page, 0xf2);
-        EXIT;
-}
-
 static int ost_brw_lock_get(int mode, struct obd_export *exp,
                             struct obd_ioobj *obj, struct niobuf_remote *nb,
                             struct lustre_handle *lh)
@@ -694,10 +554,9 @@ static void ost_prolong_locks(struct obd_export *exp, struct obd_ioobj *obj,
 
 static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
 {
-        struct ptlrpc_bulk_desc *desc;
+        struct ptlrpc_bulk_desc *desc = NULL;
         struct obd_export *exp = req->rq_export;
         struct niobuf_remote *remote_nb;
-        struct niobuf_remote *pp_rnb = NULL;
         struct niobuf_local *local_nb;
         struct obd_ioobj *ioo;
         struct ost_body *body, *repbody;
@@ -705,7 +564,7 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
         struct l_wait_info lwi;
         struct lustre_handle lockh = { 0 };
         __u32  size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
-        int niocount, npages, nob = 0, rc, i;
+        int objcount, niocount, npages, nob = 0, rc, i;
         int no_reply = 0;
         ENTRY;
 
@@ -734,6 +593,17 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
                 GOTO(out, rc = -EFAULT);
         }
 
+        objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
+                   sizeof(*ioo);
+        if (objcount == 0) {
+                CERROR("Missing/short ioobj\n");
+                GOTO(out, rc = -EFAULT);
+        }
+        if (objcount > 1) {
+                CERROR("too many ioobjs (%d)\n", objcount);
+                GOTO(out, rc = -EFAULT);
+        }
+
         ioo = lustre_swab_reqbuf(req, REQ_REC_OFF + 1, sizeof(*ioo),
                                  lustre_swab_obd_ioobj);
         if (ioo == NULL) {
@@ -772,24 +642,8 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
          * ost_thread_init().
          */
         local_nb = ost_tls(req)->local;
-        pp_rnb   = ost_tls(req)->remote;
 
-        /* FIXME all niobuf splitting should be done in obdfilter if needed */
-        /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
-        npages = get_per_page_niobufs(ioo, 1, remote_nb, niocount, &pp_rnb);
-        if (npages < 0)
-                GOTO(out, rc = npages);
-
-        LASSERT(npages <= OST_THREAD_POOL_SIZE);
-
-        ost_nio_pages_get(req, local_nb, npages);
-
-        desc = ptlrpc_prep_bulk_exp(req, npages,
-                                     BULK_PUT_SOURCE, OST_BULK_PORTAL);
-        if (desc == NULL)
-                GOTO(out, rc = -ENOMEM);
-
-        rc = ost_brw_lock_get(LCK_PR, exp, ioo, pp_rnb, &lockh);
+        rc = ost_brw_lock_get(LCK_PR, exp, ioo, remote_nb, &lockh);
         if (rc != 0)
                 GOTO(out_bulk, rc);
 
@@ -808,12 +662,18 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
                 GOTO(out_lock, rc = -ETIMEDOUT);
         }
 
-        rc = obd_preprw(OBD_BRW_READ, exp, &body->oa, 1,
-                        ioo, npages, pp_rnb, local_nb, oti, capa);
+        npages = OST_THREAD_POOL_SIZE;
+        rc = obd_preprw(OBD_BRW_READ, exp, &body->oa, 1, ioo,
+                        remote_nb, &npages, local_nb, oti, capa);
         if (rc != 0)
                 GOTO(out_lock, rc);
 
-        ost_prolong_locks(exp, ioo, pp_rnb, &body->oa, LCK_PW | LCK_PR);
+        desc = ptlrpc_prep_bulk_exp(req, npages,
+                                     BULK_PUT_SOURCE, OST_BULK_PORTAL);
+        if (desc == NULL) /* XXX: check all cleanup stuff */
+                GOTO(out, rc = -ENOMEM);
+
+        ost_prolong_locks(exp, ioo, remote_nb, &body->oa, LCK_PW | LCK_PR);
 
         nob = 0;
         for (i = 0; i < npages; i++) {
@@ -824,26 +684,18 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
                         break;
                 }
 
-                LASSERTF(page_rc <= pp_rnb[i].len, "page_rc (%d) > "
-                         "pp_rnb[%d].len (%d)\n", page_rc, i, pp_rnb[i].len);
                 nob += page_rc;
                 if (page_rc != 0) {             /* some data! */
                         LASSERT (local_nb[i].page != NULL);
                         ptlrpc_prep_bulk_page(desc, local_nb[i].page,
-                                              pp_rnb[i].offset & ~CFS_PAGE_MASK,
+                                              local_nb[i].offset & ~CFS_PAGE_MASK,
                                               page_rc);
                 }
 
-                if (page_rc != pp_rnb[i].len) { /* short read */
-                        int j = i;
-
+                if (page_rc != local_nb[i].len) { /* short read */
                         /* All subsequent pages should be 0 */
                         while(++i < npages)
-                                LASSERTF(local_nb[i].rc == 0,
-                                         "page_rc %d, pp_rnb[%u].len=%d, "
-                                         "local_nb[%u/%u].rc=%d\n",
-                                         page_rc, j, pp_rnb[j].len,
-                                         i, npages, local_nb[i].rc);
+                                LASSERT(local_nb[i].rc == 0);
                         break;
                 }
         }
@@ -931,10 +783,8 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
         }
 
         /* Must commit after prep above in all cases */
-        rc = obd_commitrw(OBD_BRW_READ, exp, &body->oa, 1,
-                          ioo, npages, local_nb, oti, rc);
-
-        ost_nio_pages_put(req, local_nb, npages);
+        rc = obd_commitrw(OBD_BRW_READ, exp, &body->oa, 1, ioo,
+                          remote_nb, npages, local_nb, oti, rc);
 
         if (rc == 0) {
                 repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
@@ -943,9 +793,10 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
         }
 
 out_lock:
-        ost_brw_lock_put(LCK_PR, ioo, pp_rnb, &lockh);
+        ost_brw_lock_put(LCK_PR, ioo, remote_nb, &lockh);
 out_bulk:
-        ptlrpc_free_bulk(desc);
+        if (desc)
+                ptlrpc_free_bulk(desc);
 out:
         LASSERT(rc <= 0);
         if (rc == 0) {
@@ -974,10 +825,9 @@ out:
 
 static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
 {
-        struct ptlrpc_bulk_desc *desc;
+        struct ptlrpc_bulk_desc *desc = NULL;
         struct obd_export       *exp = req->rq_export;
         struct niobuf_remote    *remote_nb;
-        struct niobuf_remote    *pp_rnb;
         struct niobuf_local     *local_nb;
         struct obd_ioobj        *ioo;
         struct ost_body         *body, *repbody;
@@ -1081,24 +931,8 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
          * ost_thread_init().
          */
         local_nb = ost_tls(req)->local;
-        pp_rnb   = ost_tls(req)->remote;
-
-        /* FIXME all niobuf splitting should be done in obdfilter if needed */
-        /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
-        npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
-        if (npages < 0)
-                GOTO(out, rc = npages);
-
-        LASSERT(npages <= OST_THREAD_POOL_SIZE);
-
-        ost_nio_pages_get(req, local_nb, npages);
-
-        desc = ptlrpc_prep_bulk_exp(req, npages,
-                                     BULK_GET_SINK, OST_BULK_PORTAL);
-        if (desc == NULL)
-                GOTO(out, rc = -ENOMEM);
 
-        rc = ost_brw_lock_get(LCK_PW, exp, ioo, pp_rnb, &lockh);
+        rc = ost_brw_lock_get(LCK_PW, exp, ioo, remote_nb, &lockh);
         if (rc != 0)
                 GOTO(out_bulk, rc);
 
@@ -1117,7 +951,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
                 GOTO(out_lock, rc = -ETIMEDOUT);
         }
 
-        ost_prolong_locks(exp, ioo, pp_rnb, &body->oa, LCK_PW);
+        ost_prolong_locks(exp, ioo, remote_nb,&body->oa,  LCK_PW);
 
         /* obd_preprw clobbers oa->valid, so save what we need */
         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
@@ -1134,17 +968,23 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
                 body->oa.o_valid &= ~OBD_MD_FLGRANT;
         }
 
+        npages = OST_THREAD_POOL_SIZE;
         rc = obd_preprw(OBD_BRW_WRITE, exp, &body->oa, objcount,
-                        ioo, npages, pp_rnb, local_nb, oti, capa);
+                        ioo, remote_nb, &npages, local_nb, oti, capa);
         if (rc != 0)
                 GOTO(out_lock, rc);
 
+        desc = ptlrpc_prep_bulk_exp(req, npages,
+                                     BULK_GET_SINK, OST_BULK_PORTAL);
+        if (desc == NULL)
+                GOTO(out, rc = -ENOMEM);
+
         /* NB Having prepped, we must commit... */
 
         for (i = 0; i < npages; i++)
                 ptlrpc_prep_bulk_page(desc, local_nb[i].page,
-                                      pp_rnb[i].offset & ~CFS_PAGE_MASK,
-                                      pp_rnb[i].len);
+                                      local_nb[i].offset & ~CFS_PAGE_MASK,
+                                      local_nb[i].len);
 
         /* Check if client was evicted while we were doing i/o before touching
            network */
@@ -1221,8 +1061,8 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         }
 
         /* Must commit after prep above in all cases */
-        rc = obd_commitrw(OBD_BRW_WRITE, exp, &repbody->oa,
-                           objcount, ioo, npages, local_nb, oti, rc);
+        rc = obd_commitrw(OBD_BRW_WRITE, exp, &repbody->oa, objcount, ioo,
+                          remote_nb, npages, local_nb, oti, rc);
 
         if (unlikely(client_cksum != server_cksum && rc == 0)) {
                 int  new_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type);
@@ -1257,16 +1097,14 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
                                    body->oa.o_id,
                                    body->oa.o_valid & OBD_MD_FLGROUP ?
                                                 body->oa.o_gr : (__u64)0,
-                                   pp_rnb[0].offset,
-                                   pp_rnb[npages-1].offset+pp_rnb[npages-1].len
-                                   - 1 );
+                                   local_nb[0].offset,
+                                   local_nb[npages-1].offset +
+                                   local_nb[npages-1].len - 1 );
                 CERROR("client csum %x, original server csum %x, "
                        "server csum now %x\n",
                        client_cksum, server_cksum, new_cksum);
         }
 
-        ost_nio_pages_put(req, local_nb, npages);
-
         if (rc == 0) {
                 int nob = 0;
 
@@ -1280,7 +1118,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
                                 LASSERT(j < npages);
                                 if (local_nb[j].rc < 0)
                                         rcs[i] = local_nb[j].rc;
-                                len -= pp_rnb[j].len;
+                                len -= local_nb[j].len;
                                 j++;
                         } while (len > 0);
                         LASSERT(len == 0);
@@ -1290,9 +1128,10 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         }
 
 out_lock:
-        ost_brw_lock_put(LCK_PW, ioo, pp_rnb, &lockh);
+        ost_brw_lock_put(LCK_PW, ioo, remote_nb, &lockh);
 out_bulk:
-        ptlrpc_free_bulk(desc);
+        if (desc)
+                ptlrpc_free_bulk(desc);
 out:
         if (rc == 0) {
                 oti_to_request(oti, req);
@@ -1854,7 +1693,6 @@ EXPORT_SYMBOL(ost_handle);
  */
 static void ost_thread_done(struct ptlrpc_thread *thread)
 {
-        int i;
         struct ost_thread_local_cache *tls; /* TLS stands for Thread-Local
                                              * Storage */
 
@@ -1868,10 +1706,6 @@ static void ost_thread_done(struct ptlrpc_thread *thread)
          */
         tls = thread->t_data;
         if (tls != NULL) {
-                for (i = 0; i < OST_THREAD_POOL_SIZE; ++ i) {
-                        if (tls->page[i] != NULL)
-                                OBD_PAGE_FREE(tls->page[i]);
-                }
                 OBD_FREE_PTR(tls);
                 thread->t_data = NULL;
         }
@@ -1883,8 +1717,6 @@ static void ost_thread_done(struct ptlrpc_thread *thread)
  */
 static int ost_thread_init(struct ptlrpc_thread *thread)
 {
-        int result;
-        int i;
         struct ost_thread_local_cache *tls;
 
         ENTRY;
@@ -1894,23 +1726,10 @@ static int ost_thread_init(struct ptlrpc_thread *thread)
         LASSERTF(thread->t_id <= OSS_THREADS_MAX, "%u\n", thread->t_id);
 
         OBD_ALLOC_PTR(tls);
-        if (tls != NULL) {
-                result = 0;
-                thread->t_data = tls;
-                /*
-                 * populate pool
-                 */
-                for (i = 0; i < OST_THREAD_POOL_SIZE; ++ i) {
-                        OBD_PAGE_ALLOC(tls->page[i], OST_THREAD_POOL_GFP);
-                        if (tls->page[i] == NULL) {
-                                ost_thread_done(thread);
-                                result = -ENOMEM;
-                                break;
-                        }
-                }
-        } else
-                result = -ENOMEM;
-        RETURN(result);
+        if (tls == NULL)
+                RETURN(-ENOMEM);
+        thread->t_data = tls;
+        RETURN(0);
 }
 
 #define OST_WATCHDOG_TIMEOUT (obd_timeout * 1000)
index d227799..4db4a3c 100644 (file)
@@ -5598,6 +5598,119 @@ test_130e() {
 }
 run_test 130e "FIEMAP (test continuation FIEMAP calls)"
 
+# test_150: a file copied into $DIR must stay identical to its local
+# reference copy across a client remount, a truncate and two appends.
+test_150() {
+        local TF="$TMP/$tfile"
+        local round
+
+        dd if=/dev/urandom of=$TF bs=6096 count=1 || error "dd failed"
+        cp $TF $DIR/$tfile
+
+        # compare once after discarding the client's cache, then once more
+        # after remounting the client
+        cancel_lru_locks osc
+        cmp $TF $DIR/$tfile || error "$TMP/$tfile $DIR/$tfile differ"
+        remount_client $MOUNT
+        cmp $TF $DIR/$tfile || error "$TF $DIR/$tfile differ (remount)"
+
+        # cut both copies down to the same size
+        $TRUNCATE $TF 6000
+        $TRUNCATE $DIR/$tfile 6000
+        cancel_lru_locks osc
+        cmp $TF $DIR/$tfile || error "$TF $DIR/$tfile differ (truncate1)"
+
+        # append the same line to both copies, twice, comparing after each
+        for round in 1 2; do
+                echo "12345" >>$TF
+                echo "12345" >>$DIR/$tfile
+                cancel_lru_locks osc
+                cmp $TF $DIR/$tfile ||
+                        error "$TF $DIR/$tfile differ (append$round)"
+        done
+
+        rm -f $TF
+        return 0
+}
+run_test 150 "truncate/append tests"
+
+# Print the sum, over every obdfilter.*.stats output, of the second field of
+# the lines matching 'cache_access'.  The result is also left in ACCNUM.
+function roc_access() {
+       # the first awk emits a single column, so the summing awk has to read
+       # $1; reading $3 there always added an empty field and printed 0
+       ACCNUM=`$LCTL get_param -n obdfilter.*.stats | \
+               grep 'cache_access'| awk '{print $2}' | \
+               awk '{sum=sum+$1} END{print sum}'`
+       echo $ACCNUM
+}
+
+# Print the sum, over every obdfilter.*.stats output, of the second field of
+# the lines matching 'cache_hit'.  The result is also left in ACCNUM.
+function roc_hit() {
+       # one awk program selects the matching lines and accumulates field 2
+       ACCNUM=$($LCTL get_param -n obdfilter.*.stats |
+               awk '/cache_hit/ {sum = sum + $2} END {print sum}')
+       echo $ACCNUM
+}
+
+# test_151: exercise the OSS read cache and its controls.  Pages that were
+# just written must be served from the cache (one cache hit per page), and
+# once read_cache_enable is switched off a re-read must add no cache hits.
+# The test is skipped (returns 0) when the obdfilter has no read_cache_enable
+# parameter or when the cache is disabled on any obdfilter.
+test_151() {
+       local CPAGES=3
+       local BEFORE AFTER
+
+       # check whether obdfilter is cache capable at all
+       if ! $LCTL get_param -n obdfilter.*.read_cache_enable; then
+               echo "not cache-capable obdfilter"
+               return 0
+       fi
+
+       # check cache is enabled on all obdfilters
+       if $LCTL get_param -n obdfilter.*.read_cache_enable | grep 0 >&/dev/null; then
+               echo "oss cache is disabled"
+               return 0
+       fi
+
+       $LCTL set_param -n obdfilter.*.writethrough_cache_enable 1
+
+       # pages should be in the cache right after write
+       dd if=/dev/urandom of=$DIR/$tfile bs=4k count=$CPAGES || error "dd failed"
+       BEFORE=`roc_hit`
+       cancel_lru_locks osc
+       cat $DIR/$tfile >/dev/null
+       AFTER=`roc_hit`
+       if ! let "AFTER - BEFORE == CPAGES"; then
+               error "NOT IN CACHE: before: $BEFORE, after: $AFTER"
+       fi
+
+       # the following read invalidates the cache
+       cancel_lru_locks osc
+       $LCTL set_param -n obdfilter.*.read_cache_enable 0
+       cat $DIR/$tfile >/dev/null
+
+       # now data shouldn't be found in the cache, so this read must not
+       # add a single hit (any increase means pages were still cached)
+       BEFORE=`roc_hit`
+       cancel_lru_locks osc
+       cat $DIR/$tfile >/dev/null
+       AFTER=`roc_hit`
+       if ! let "AFTER - BEFORE == 0"; then
+               error "IN CACHE: before: $BEFORE, after: $AFTER"
+       fi
+
+       $LCTL set_param -n obdfilter.*.read_cache_enable 1
+       rm -f $DIR/$tfile
+}
+run_test 151 "test cache on oss and controls ==============================="
+
+# test_152: write a file and read it back while fail_loc 0x80000226 is set,
+# and require that the data still compares equal to the local copy.
+# fail_loc is reset to 0 after each phase.
+test_152() {
+        local TF="$TMP/$tfile"
+
+        # simulate ENOMEM during write
+#define OBD_FAIL_OST_NOMEM             0x226
+        $LCTL set_param fail_loc=0x80000226
+        dd if=/dev/urandom of=$TF bs=6096 count=1 || error "dd failed"
+        # report a failed copy here instead of as a later "cmp failed"
+        cp $TF $DIR/$tfile || error "cp failed"
+        sync || error "sync failed"
+        $LCTL set_param fail_loc=0
+
+        # discard client's cache
+        cancel_lru_locks osc
+
+        # simulate ENOMEM during read
+        $LCTL set_param fail_loc=0x80000226
+        cmp $TF $DIR/$tfile || error "cmp failed"
+        $LCTL set_param fail_loc=0
+
+        rm -f $TF
+}
+run_test 152 "test read/write with enomem ============================"
+
 POOL=${POOL:-cea1}
 TGT_COUNT=$OSTCOUNT
 TGTPOOL_FIRST=1