])
])
+
+# LC_EXPORT_INVALIDATE_MAPPING_PAGES
+# Pick whichever pagecache-invalidation symbol this kernel exports:
+# prefer invalidate_mapping_pages(), fall back to the older
+# invalidate_inode_pages(), and abort configure if neither is exported,
+# since the server has no other way to drop cached pages.
+AC_DEFUN([LC_EXPORT_INVALIDATE_MAPPING_PAGES],
+ [LB_CHECK_SYMBOL_EXPORT([invalidate_mapping_pages], [mm/truncate.c], [
+ AC_DEFINE(HAVE_INVALIDATE_MAPPING_PAGES, 1,
+ [exported invalidate_mapping_pages])],
+ [LB_CHECK_SYMBOL_EXPORT([invalidate_inode_pages], [mm/truncate.c], [
+ AC_DEFINE(HAVE_INVALIDATE_INODE_PAGES, 1,
+ [exported invalidate_inode_pages])], [
+ AC_MSG_ERROR([no way to invalidate pages])
+ ])
+ ],[])
+])
+
# The actual symbol exported varies among architectures, so we need
# to check many symbols (but only in the current architecture.) No
# matter what symbol is exported, the kernel #defines node_to_cpumask
LC_VFS_KERN_MOUNT
LC_INVALIDATEPAGE_RETURN_INT
LC_UMOUNTBEGIN_HAS_VFSMOUNT
+ if test x$enable_server = xyes ; then
+ LC_EXPORT_INVALIDATE_MAPPING_PAGES
+ fi
#2.6.18 + RHEL5 (fc6)
LC_PG_FS_MISC
#define __fls fls
#endif
+#ifdef HAVE_INVALIDATE_INODE_PAGES
+#define invalidate_mapping_pages(mapping,s,e) invalidate_inode_pages(mapping)
+#endif
+
#endif /* __KERNEL__ */
#endif /* _COMPAT25_H */
obd_size fo_tot_pending;
obd_size fo_readcache_max_filesize;
+ int fo_read_cache;
+ int fo_writethrough_cache;
struct obd_import *fo_mdc_imp;
struct obd_uuid fo_mdc_uuid;
obd_id *startid, obd_gr group, void *data);
int (*o_preprw)(int cmd, struct obd_export *exp, struct obdo *oa,
int objcount, struct obd_ioobj *obj,
- int niocount, struct niobuf_remote *remote,
- struct niobuf_local *local, struct obd_trans_info *oti,
+ struct niobuf_remote *remote, int *nr_pages,
+ struct niobuf_local *local,
+ struct obd_trans_info *oti,
struct lustre_capa *capa);
int (*o_commitrw)(int cmd, struct obd_export *exp, struct obdo *oa,
int objcount, struct obd_ioobj *obj,
- int niocount, struct niobuf_local *local,
+ struct niobuf_remote *remote, int pages,
+ struct niobuf_local *local,
struct obd_trans_info *oti, int rc);
int (*o_enqueue)(struct obd_export *, struct obd_info *oinfo,
struct ldlm_enqueue_info *einfo,
static inline int obd_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
int objcount, struct obd_ioobj *obj,
- int niocount, struct niobuf_remote *remote,
+ struct niobuf_remote *remote, int *pages,
struct niobuf_local *local,
struct obd_trans_info *oti,
struct lustre_capa *capa)
EXP_CHECK_DT_OP(exp, preprw);
EXP_COUNTER_INCREMENT(exp, preprw);
- rc = OBP(exp->exp_obd, preprw)(cmd, exp, oa, objcount, obj, niocount,
- remote, local, oti, capa);
+ rc = OBP(exp->exp_obd, preprw)(cmd, exp, oa, objcount, obj, remote,
+ pages, local, oti, capa);
RETURN(rc);
}
static inline int obd_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
int objcount, struct obd_ioobj *obj,
- int niocount, struct niobuf_local *local,
+ struct niobuf_remote *rnb, int pages,
+ struct niobuf_local *local,
struct obd_trans_info *oti, int rc)
{
ENTRY;
EXP_CHECK_DT_OP(exp, commitrw);
EXP_COUNTER_INCREMENT(exp, commitrw);
- rc = OBP(exp->exp_obd, commitrw)(cmd, exp, oa, objcount, obj, niocount,
- local, oti, rc);
+ rc = OBP(exp->exp_obd, commitrw)(cmd, exp, oa, objcount, obj,
+ rnb, pages, local, oti, rc);
RETURN(rc);
}
#define OBD_FAIL_OST_PAUSE_CREATE 0x223
#define OBD_FAIL_OST_BRW_PAUSE_PACK 0x224
#define OBD_FAIL_OST_CONNECT_NET2 0x225
+#define OBD_FAIL_OST_NOMEM 0x226
#define OBD_FAIL_LDLM 0x300
#define OBD_FAIL_LDLM_NAMESPACE_NEW 0x301
/* This allows us to verify that desc_private is passed unmolested */
#define DESC_PRIV 0x10293847
+/*
+ * Expand one remote niobuf into per-page local niobufs.
+ *
+ * Walks [nb->offset, nb->offset + nb->len) in fragments no larger than a
+ * page (the first/last fragment may be partial for unaligned I/O), filling
+ * one niobuf_local per fragment starting at @lb.  For the persistent echo
+ * object the static echo_persistent_pages[] are reused (with an extra ref
+ * so the common free path may call __free_pages()); otherwise a fresh page
+ * is allocated.
+ *
+ * @pages is incremented once per fragment produced; @left is the remaining
+ * capacity of the lb array and is decremented in step.  Returns 0 on
+ * success, -EINVAL if the lb array is too small, -ENOMEM on allocation
+ * failure.  On error, fragments already counted in *pages hold page
+ * references the caller must release.
+ */
+static int echo_map_nb_to_lb(struct obdo *oa, struct obd_ioobj *obj,
+ struct niobuf_remote *nb, int *pages,
+ struct niobuf_local *lb, int cmd, int *left)
+{
+ int gfp_mask = (obj->ioo_id & 1) ? CFS_ALLOC_HIGHUSER : CFS_ALLOC_STD;
+ int ispersistent = obj->ioo_id == ECHO_PERSISTENT_OBJID;
+ int debug_setup = (!ispersistent &&
+ (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
+ (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
+ struct niobuf_local *res = lb;
+ obd_off offset = nb->offset;
+ int len = nb->len;
+
+ while (len > 0) {
+ /* bytes up to the next page boundary, capped by what remains */
+ int plen = CFS_PAGE_SIZE - (offset & (CFS_PAGE_SIZE-1));
+ if (len < plen)
+ plen = len;
+
+ /* check for local buf overflow */
+ if (*left == 0)
+ return -EINVAL;
+
+ res->offset = offset;
+ res->len = plen;
+ LASSERT((res->offset & ~CFS_PAGE_MASK) + res->len <= CFS_PAGE_SIZE);
+
+
+ if (ispersistent &&
+ (res->offset >> CFS_PAGE_SHIFT) < ECHO_PERSISTENT_PAGES) {
+ res->page = echo_persistent_pages[res->offset >>
+ CFS_PAGE_SHIFT];
+ /* Take extra ref so __free_pages() can be called OK */
+ cfs_get_page (res->page);
+ } else {
+ OBD_PAGE_ALLOC(res->page, gfp_mask);
+ if (res->page == NULL) {
+ CERROR("can't get page for id " LPU64"\n",
+ obj->ioo_id);
+ return -ENOMEM;
+ }
+ }
+
+ CDEBUG(D_PAGE, "$$$$ get page %p @ "LPU64" for %d\n",
+ res->page, res->offset, res->len);
+
+ /* reads optimistically claim the full fragment; the caller
+ * trims rc at EOF later */
+ if (cmd & OBD_BRW_READ)
+ res->rc = res->len;
+
+ if (debug_setup)
+ echo_page_debug_setup(res->page, cmd, obj->ioo_id,
+ res->offset, res->len);
+
+ offset += plen;
+ len -= plen;
+ res++;
+
+ (*left)--;
+ (*pages)++;
+ }
+
+ return 0;
+}
+
int echo_preprw(int cmd, struct obd_export *export, struct obdo *oa,
- int objcount, struct obd_ioobj *obj, int niocount,
- struct niobuf_remote *nb, struct niobuf_local *res,
+ int objcount, struct obd_ioobj *obj, struct niobuf_remote *nb,
+ int *pages, struct niobuf_local *res,
struct obd_trans_info *oti, struct lustre_capa *unused)
{
struct obd_device *obd;
struct niobuf_local *r = res;
int tot_bytes = 0;
int rc = 0;
- int i;
+ int i, left;
ENTRY;
obd = export->exp_obd;
/* Temp fix to stop falling foul of osc_announce_cached() */
oa->o_valid &= ~(OBD_MD_FLBLOCKS | OBD_MD_FLGRANT);
- memset(res, 0, sizeof(*res) * niocount);
+ memset(res, 0, sizeof(*res) * *pages);
CDEBUG(D_PAGE, "%s %d obdos with %d IOs\n",
- cmd == OBD_BRW_READ ? "reading" : "writing", objcount, niocount);
+ cmd == OBD_BRW_READ ? "reading" : "writing", objcount, *pages);
if (oti)
oti->oti_handle = (void *)DESC_PRIV;
+ left = *pages;
+ *pages = 0;
+
for (i = 0; i < objcount; i++, obj++) {
- int gfp_mask = (obj->ioo_id & 1) ? CFS_ALLOC_HIGHUSER : CFS_ALLOC_STD;
- int ispersistent = obj->ioo_id == ECHO_PERSISTENT_OBJID;
- int debug_setup = (!ispersistent &&
- (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
- (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
int j;
for (j = 0 ; j < obj->ioo_bufcnt ; j++, nb++, r++) {
- if (ispersistent &&
- (nb->offset >> CFS_PAGE_SHIFT) < ECHO_PERSISTENT_PAGES) {
- r->page = echo_persistent_pages[nb->offset >>
- CFS_PAGE_SHIFT];
- /* Take extra ref so __free_pages() can be called OK */
- cfs_get_page (r->page);
- } else {
- OBD_PAGE_ALLOC(r->page, gfp_mask);
- if (r->page == NULL) {
- CERROR("can't get page %u/%u for id "
- LPU64"\n",
- j, obj->ioo_bufcnt, obj->ioo_id);
- GOTO(preprw_cleanup, rc = -ENOMEM);
- }
- }
+ rc = echo_map_nb_to_lb(oa, obj, nb, pages,
+ res + *pages, cmd, &left);
+ if (rc)
+ GOTO(preprw_cleanup, rc);
tot_bytes += nb->len;
-
- atomic_inc(&obd->u.echo.eo_prep);
-
- r->offset = nb->offset;
- r->len = nb->len;
- LASSERT((r->offset & ~CFS_PAGE_MASK) + r->len <= CFS_PAGE_SIZE);
-
- CDEBUG(D_PAGE, "$$$$ get page %p @ "LPU64" for %d\n",
- r->page, r->offset, r->len);
-
- if (cmd & OBD_BRW_READ)
- r->rc = r->len;
-
- if (debug_setup)
- echo_page_debug_setup(r->page, cmd, obj->ioo_id,
- r->offset, r->len);
}
}
+
+ atomic_add(*pages, &obd->u.echo.eo_prep);
+
if (cmd & OBD_BRW_READ)
lprocfs_counter_add(obd->obd_stats, LPROC_ECHO_READ_BYTES,
tot_bytes);
* all down again. I believe that this is what the in-kernel
* prep/commit operations do.
*/
- CERROR("cleaning up %ld pages (%d obdos)\n", (long)(r - res), objcount);
- while (r-- > res) {
- cfs_kunmap(r->page);
+ CERROR("cleaning up %u pages (%d obdos)\n", *pages, objcount);
+ for (i = 0; i < *pages; i++) {
+ cfs_kunmap(res[i].page);
/* NB if this is a persistent page, __free_pages will just
* lose the extra ref gained above */
- OBD_PAGE_FREE(r->page);
+ OBD_PAGE_FREE(res[i].page);
+ res[i].page = NULL;
atomic_dec(&obd->u.echo.eo_prep);
}
- memset(res, 0, sizeof(*res) * niocount);
return rc;
}
int echo_commitrw(int cmd, struct obd_export *export, struct obdo *oa,
- int objcount, struct obd_ioobj *obj, int niocount,
+ int objcount, struct obd_ioobj *obj,
+ struct niobuf_remote *rb, int niocount,
struct niobuf_local *res, struct obd_trans_info *oti, int rc)
{
struct obd_device *obd;
static int ec_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
{
- struct echo_async_state *eas;
struct echo_async_page *eap = eap_from_cookie(data);
+ struct echo_async_state *eas;
eas = eap->eap_eas;
off = offset;
for(; tot_pages; tot_pages -= npages) {
+ int lpages;
+
if (tot_pages < npages)
npages = tot_pages;
ioo.ioo_bufcnt = npages;
oti->oti_transno = 0;
- ret = obd_preprw(rw, exp, oa, 1, &ioo, npages, rnb, lnb, oti,
+ lpages = npages;
+ ret = obd_preprw(rw, exp, oa, 1, &ioo, rnb, &lpages, lnb, oti,
NULL);
if (ret != 0)
GOTO(out, ret);
+ LASSERT(lpages == npages);
- for (i = 0; i < npages; i++) {
+ for (i = 0; i < lpages; i++) {
cfs_page_t *page = lnb[i].page;
/* read past eof? */
rnb[i].len);
}
- ret = obd_commitrw(rw, exp, oa, 1, &ioo, npages, lnb, oti, ret);
+ ret = obd_commitrw(rw, exp, oa, 1,&ioo,rnb,npages,lnb,oti,ret);
if (ret != 0)
GOTO(out, ret);
}
CFS_INIT_LIST_HEAD(&filter->fo_export_list);
sema_init(&filter->fo_alloc_lock, 1);
init_brw_stats(&filter->fo_filter_stats);
+ filter->fo_read_cache = 1; /* enable read-only cache by default */
+ filter->fo_writethrough_cache = 0; /* disable writethrough cache */
filter->fo_readcache_max_filesize = FILTER_MAX_CACHE_SIZE;
filter->fo_fmd_max_num = FILTER_FMD_MAX_NUM_DEFAULT;
filter->fo_fmd_max_age = FILTER_FMD_MAX_AGE_DEFAULT;
lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
LPROCFS_CNTR_AVGMINMAX,
"write_bytes", "bytes");
+ lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_GET_PAGE,
+ LPROCFS_CNTR_AVGMINMAX|LPROCFS_CNTR_STDDEV,
+ "get_page", "usec");
+ lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_NO_PAGE,
+ LPROCFS_CNTR_AVGMINMAX,
+ "get_page_failures", "num");
+ lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_CACHE_ACCESS,
+ LPROCFS_CNTR_AVGMINMAX,
+ "cache_access", "pages");
+ lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_CACHE_HIT,
+ LPROCFS_CNTR_AVGMINMAX,
+ "cache_hit", "pages");
+ lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_CACHE_MISS,
+ LPROCFS_CNTR_AVGMINMAX,
+ "cache_miss", "pages");
lproc_filter_attach_seqstat(obd);
obd->obd_proc_exports_entry = lprocfs_register("exports",
}
if (locked) {
- /* Let's flush truncated page on disk immediately, then we can
- * avoid need to search for page aliases before directio writes
- * and this sort of stuff at expense of somewhat slower
- * truncates not on a page boundary. I believe this is the only
- * place in filter code that can lead to pages getting to
- * pagecache so far. */
- filter_clear_truncated_page(inode);
+ /* truncate can leave dirty pages in the cache.
+ * we'll take care of them in write path -bzzz */
UNLOCK_INODE_MUTEX(inode);
locked = 0;
}
enum {
LPROC_FILTER_READ_BYTES = 0,
LPROC_FILTER_WRITE_BYTES = 1,
+ LPROC_FILTER_GET_PAGE = 2,
+ LPROC_FILTER_NO_PAGE = 3,
+ LPROC_FILTER_CACHE_ACCESS = 4,
+ LPROC_FILTER_CACHE_HIT = 5,
+ LPROC_FILTER_CACHE_MISS = 6,
LPROC_FILTER_LAST,
};
/* filter_io.c */
int filter_preprw(int cmd, struct obd_export *, struct obdo *, int objcount,
- struct obd_ioobj *, int niocount, struct niobuf_remote *,
- struct niobuf_local *, struct obd_trans_info *,
+ struct obd_ioobj *, struct niobuf_remote *,
+ int *, struct niobuf_local *, struct obd_trans_info *,
struct lustre_capa *);
int filter_commitrw(int cmd, struct obd_export *, struct obdo *, int objcount,
- struct obd_ioobj *, int niocount, struct niobuf_local *,
- struct obd_trans_info *, int rc);
+ struct obd_ioobj *, struct niobuf_remote *, int,
+ struct niobuf_local *, struct obd_trans_info *, int rc);
int filter_brw(int cmd, struct obd_export *, struct obd_info *oinfo,
obd_count oa_bufs, struct brw_page *pga, struct obd_trans_info *);
-void flip_into_page_cache(struct inode *inode, struct page *new_page);
+void filter_invalidate_cache(struct obd_device *, struct obd_ioobj *,
+ struct niobuf_remote *, struct inode *);
/* filter_io_*.c */
struct filter_iobuf;
int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount,
- struct obd_ioobj *obj, int niocount,
+ struct obd_ioobj *obj, struct niobuf_remote *, int,
struct niobuf_local *res, struct obd_trans_info *oti,
int rc);
obd_size filter_grant_space_left(struct obd_export *exp);
int *obdfilter_created_scratchpad;
-static int filter_alloc_dio_page(struct obd_device *obd, struct inode *inode,
- struct niobuf_local *lnb)
-{
- struct page *page;
-
- LASSERT(lnb->page != NULL);
-
- page = lnb->page;
-#if 0
- POISON_PAGE(page, 0xf1);
- if (lnb->len != CFS_PAGE_SIZE) {
- memset(kmap(page) + lnb->len, 0, CFS_PAGE_SIZE - lnb->len);
- kunmap(page);
- }
-#endif
- page->index = lnb->offset >> CFS_PAGE_SHIFT;
-
- RETURN(0);
-}
-
-static void filter_free_dio_pages(int objcount, struct obd_ioobj *obj,
- int niocount, struct niobuf_local *res)
-{
- int i, j;
-
- for (i = 0; i < objcount; i++, obj++) {
- for (j = 0 ; j < obj->ioo_bufcnt ; j++, res++)
- res->page = NULL;
- }
-}
-
/* Grab the dirty and seen grant announcements from the incoming obdo.
* We will later calculate the clients new grant and return it.
* Caller must hold osfs lock */
return grant;
}
+/*
+ * Find or create the pagecache page backing @offset within @inode.
+ *
+ * GFP_NOFS: the allocation must not re-enter the filesystem, because a
+ * client can run on this very node and we might end up waiting on a page
+ * it sent in the request we are currently serving.
+ *
+ * __GFP_NORETRY: keep the allocator from retrying aggressively, so a
+ * luckier thread gets a chance to finish its request with the memory that
+ * is available; the client will hopefully just resend ours. -bzzz
+ *
+ * Returns the locked page, or NULL on allocation failure (failure is
+ * recorded in the "get_page_failures" stat).
+ */
+static struct page * filter_get_page(struct obd_device *obd,
+ struct inode *inode,
+ obd_off offset)
+{
+ struct page *page;
+
+ page = find_or_create_page(inode->i_mapping, offset >> CFS_PAGE_SHIFT,
+ GFP_NOFS | __GFP_NORETRY);
+ if (unlikely(page == NULL))
+ lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_NO_PAGE, 1);
+
+ return page;
+}
+
+/*
+ * Initialize the array of niobuf_local from the client's niobuf_remote
+ * array, splitting each remote buffer into page-sized (or smaller, for
+ * unaligned head/tail) local segments.  No pages are assigned here;
+ * lnb->page is cleared for the caller to fill in.
+ *
+ * On entry *nrpages holds the capacity of @res; on success it is
+ * rewritten with the number of local segments actually produced.
+ * Returns 0, or -EINVAL if @res is too small for the request.
+ */
+static int filter_map_remote_to_local(int objcount, struct obd_ioobj *obj,
+ struct niobuf_remote *nb,
+ int *nrpages, struct niobuf_local *res)
+{
+ struct niobuf_remote *rnb;
+ struct niobuf_local *lnb;
+ int i, max;
+ ENTRY;
+
+ /* we don't support multiobject RPC yet
+ * ost_brw_read() and ost_brw_write() check this */
+ LASSERT(objcount == 1);
+
+ max = *nrpages;
+ *nrpages = 0;
+ for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt; i++, rnb++) {
+ obd_off offset = rnb->offset;
+ unsigned int len = rnb->len;
+
+ while (len > 0) {
+ /* split at the next page boundary */
+ int poff = offset & (CFS_PAGE_SIZE - 1);
+ int plen = CFS_PAGE_SIZE - poff;
+
+ if (*nrpages >= max) {
+ CERROR("small array of local bufs: %d\n", max);
+ RETURN(-EINVAL);
+ }
+
+ if (plen > len)
+ plen = len;
+ lnb->offset = offset;
+ lnb->len = plen;
+ lnb->flags = rnb->flags;
+ lnb->page = NULL;
+ lnb->rc = 0;
+ lnb->lnb_grant_used = 0;
+
+ LASSERTF(plen <= len, "plen %u, len %u\n", plen, len);
+ offset += plen;
+ len -= plen;
+ lnb++;
+ (*nrpages)++;
+ }
+ }
+ RETURN(0);
+}
+
+/*
+ * Drop the pagecache pages that backed one request, to mimic a cacheless
+ * OSS which does not pin much memory.  Called when the read cache is
+ * disabled or the file exceeds the configured readcache size limit.
+ */
+void filter_invalidate_cache(struct obd_device *obd, struct obd_ioobj *obj,
+ struct niobuf_remote *nb, struct inode *inode)
+{
+ struct niobuf_remote *rnb;
+ int i;
+
+ LASSERT(inode != NULL);
+
+ for (i = 0, rnb = nb; i < obj->ioo_bufcnt; i++, rnb++) {
+ /* invalidate_mapping_pages() takes an inclusive last-page
+ * index, so use offset + len - 1: a transfer ending exactly
+ * on a page boundary must not drop the following page. */
+ obd_off start = rnb->offset >> CFS_PAGE_SHIFT;
+ obd_off end = (rnb->offset + rnb->len - 1) >> CFS_PAGE_SHIFT;
+ invalidate_mapping_pages(inode->i_mapping, start, end);
+ }
+}
+
static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
int objcount, struct obd_ioobj *obj,
- int niocount, struct niobuf_remote *nb,
- struct niobuf_local *res,
+ struct niobuf_remote *nb,
+ int *pages, struct niobuf_local *res,
struct obd_trans_info *oti,
struct lustre_capa *capa)
{
struct obd_device *obd = exp->exp_obd;
+ struct filter_obd *fo = &obd->u.filter;
+ struct timeval start, end;
struct lvfs_run_ctxt saved;
- struct niobuf_remote *rnb;
struct niobuf_local *lnb;
struct dentry *dentry = NULL;
- struct inode *inode;
+ struct inode *inode = NULL;
void *iobuf = NULL;
int rc = 0, i, tot_bytes = 0;
unsigned long now = jiffies;
+ long timediff;
ENTRY;
/* We are currently not supporting multi-obj BRW_READ RPCS at all.
inode = dentry->d_inode;
obdo_to_inode(inode, oa, OBD_MD_FLATIME);
+
+ rc = filter_map_remote_to_local(objcount, obj, nb, pages, res);
+ if (rc)
+ GOTO(cleanup, rc);
+
fsfilt_check_slow(obd, now, "preprw_read setup");
- for (i = 0, lnb = res, rnb = nb; i < obj->ioo_bufcnt;
- i++, rnb++, lnb++) {
+ /* find pages for all segments, fill array with them */
+ do_gettimeofday(&start);
+ for (i = 0, lnb = res; i < *pages; i++, lnb++) {
+
lnb->dentry = dentry;
- lnb->offset = rnb->offset;
- lnb->len = rnb->len;
- lnb->flags = rnb->flags;
-
- /*
- * ost_brw_write()->ost_nio_pages_get() already initialized
- * lnb->page to point to the page from the per-thread page
- * pool (bug 5137), initialize page.
- */
- LASSERT(lnb->page != NULL);
-
- if (i_size_read(inode) <= rnb->offset)
+
+ if (i_size_read(inode) <= lnb->offset)
/* If there's no more data, abort early. lnb->rc == 0,
* so it's easy to detect later. */
break;
- else
- filter_alloc_dio_page(obd, inode, lnb);
+
+ lnb->page = filter_get_page(obd, inode, lnb->offset);
+ if (lnb->page == NULL)
+ GOTO(cleanup, rc = -ENOMEM);
+
+ lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_CACHE_ACCESS, 1);
if (i_size_read(inode) < lnb->offset + lnb->len - 1)
lnb->rc = i_size_read(inode) - lnb->offset;
tot_bytes += lnb->rc;
+ if (PageUptodate(lnb->page)) {
+ lprocfs_counter_add(obd->obd_stats,
+ LPROC_FILTER_CACHE_HIT, 1);
+ continue;
+ }
+
+ lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_CACHE_MISS, 1);
filter_iobuf_add_page(obd, iobuf, inode, lnb->page);
}
+ do_gettimeofday(&end);
+ timediff = cfs_timeval_sub(&end, &start, NULL);
+ lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_GET_PAGE, timediff);
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_OST_NOMEM))
+ GOTO(cleanup, rc = -ENOMEM);
fsfilt_check_slow(obd, now, "start_page_read");
EXIT;
cleanup:
- if (rc != 0) {
- filter_free_dio_pages(objcount, obj, niocount, res);
+ /* unlock pages to allow access from concurrent OST_READ */
+ for (i = 0, lnb = res; i < *pages; i++, lnb++) {
+ if (lnb->page) {
+ LASSERT(PageLocked(lnb->page));
+ unlock_page(lnb->page);
+
+ if (rc) {
+ page_cache_release(lnb->page);
+ lnb->page = NULL;
+ }
+ }
+ }
+ if (inode && (fo->fo_read_cache == 0 ||
+ i_size_read(inode) > fo->fo_readcache_max_filesize))
+ filter_invalidate_cache(obd, obj, nb, inode);
+
+ if (rc != 0) {
if (dentry != NULL)
f_dput(dentry);
}
* Caller must hold obd_osfs_lock. */
static int filter_grant_check(struct obd_export *exp, struct obdo *oa,
int objcount, struct fsfilt_objinfo *fso,
- int niocount, struct niobuf_remote *rnb,
- struct niobuf_local *lnb, obd_size *left,
- struct inode *inode)
+ int niocount, struct niobuf_local *lnb,
+ obd_size *left, struct inode *inode)
{
struct filter_export_data *fed = &exp->exp_filter_data;
int blocksize = exp->exp_obd->u.obt.obt_sb->s_blocksize;
int tmp, bytes;
/* should match the code in osc_exit_cache */
- bytes = rnb[n].len;
- bytes += rnb[n].offset & (blocksize - 1);
- tmp = (rnb[n].offset + rnb[n].len) & (blocksize - 1);
+ bytes = lnb[n].len;
+ bytes += lnb[n].offset & (blocksize - 1);
+ tmp = (lnb[n].offset + lnb[n].len) & (blocksize - 1);
if (tmp)
bytes += blocksize - tmp;
- if ((rnb[n].flags & OBD_BRW_FROM_GRANT) &&
+ if ((lnb[n].flags & OBD_BRW_FROM_GRANT) &&
(oa->o_valid & OBD_MD_FLGRANT)) {
if (fed->fed_grant < used + bytes) {
CDEBUG(D_CACHE,
used, bytes, fed->fed_grant, n);
} else {
used += bytes;
- rnb[n].flags |= OBD_BRW_GRANTED;
+ lnb[n].flags |= OBD_BRW_GRANTED;
lnb[n].lnb_grant_used = bytes;
CDEBUG(0, "idx %d used=%lu\n", n, used);
rc = 0;
if (*left > ungranted + bytes) {
/* if enough space, pretend it was granted */
ungranted += bytes;
- rnb[n].flags |= OBD_BRW_GRANTED;
+ lnb[n].flags |= OBD_BRW_GRANTED;
lnb[n].lnb_grant_used = bytes;
CDEBUG(0, "idx %d ungranted=%lu\n",n,ungranted);
rc = 0;
* marked BRW_GRANTED are already mapped and we can
* ignore this error. */
lnb[n].rc = -ENOSPC;
- rnb[n].flags &= ~OBD_BRW_GRANTED;
+ lnb[n].flags &= ~OBD_BRW_GRANTED;
CDEBUG(D_CACHE,"%s: cli %s/%p idx %d no space for %d\n",
exp->exp_obd->obd_name,
exp->exp_client_uuid.uuid, exp, n, bytes);
* bug) or ensure we get the page locks in an appropriate order. */
static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
int objcount, struct obd_ioobj *obj,
- int niocount, struct niobuf_remote *nb,
+ struct niobuf_remote *nb, int *pages,
struct niobuf_local *res,
struct obd_trans_info *oti,
struct lustre_capa *capa)
{
+ struct obd_device *obd = exp->exp_obd;
+ struct timeval start, end;
struct lvfs_run_ctxt saved;
- struct niobuf_remote *rnb;
struct niobuf_local *lnb = res;
struct fsfilt_objinfo fso;
struct filter_mod_data *fmd;
struct dentry *dentry = NULL;
void *iobuf;
obd_size left;
- unsigned long now = jiffies;
+ unsigned long now = jiffies, timediff;
int rc = 0, i, tot_bytes = 0, cleanup_phase = 0;
ENTRY;
LASSERT(objcount == 1);
GOTO(cleanup, rc = -ENOENT);
}
- fso.fso_dentry = dentry;
- fso.fso_bufcnt = obj->ioo_bufcnt;
+ rc = filter_map_remote_to_local(objcount, obj, nb, pages, res);
+ if (rc)
+ GOTO(cleanup, rc);
fsfilt_check_slow(exp->exp_obd, now, "preprw_write setup");
left = filter_grant_space_left(exp);
- rc = filter_grant_check(exp, oa, objcount, &fso, niocount, nb, res,
+ fso.fso_dentry = dentry;
+ fso.fso_bufcnt = *pages;
+
+ rc = filter_grant_check(exp, oa, objcount, &fso, *pages, res,
&left, dentry->d_inode);
/* do not zero out oa->o_valid as it is used in filter_commitrw_write()
if (rc)
GOTO(cleanup, rc);
- for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt;
- i++, lnb++, rnb++) {
+ do_gettimeofday(&start);
+ for (i = 0, lnb = res; i < *pages; i++, lnb++) {
+
/* We still set up for ungranted pages so that granted pages
* can be written to disk as they were promised, and portals
* needs to keep the pages all aligned properly. */
lnb->dentry = dentry;
- lnb->offset = rnb->offset;
- lnb->len = rnb->len;
- lnb->flags = rnb->flags;
-
- /*
- * ost_brw_write()->ost_nio_pages_get() already initialized
- * lnb->page to point to the page from the per-thread page
- * pool (bug 5137), initialize page.
- */
- LASSERT(lnb->page != NULL);
- if (lnb->len != CFS_PAGE_SIZE) {
- memset(kmap(lnb->page) + lnb->len,
- 0, CFS_PAGE_SIZE - lnb->len);
- kunmap(lnb->page);
- }
- lnb->page->index = lnb->offset >> CFS_PAGE_SHIFT;
+ lnb->page = filter_get_page(obd, dentry->d_inode, lnb->offset);
+ if (lnb->page == NULL)
+ GOTO(cleanup, rc = -ENOMEM);
cleanup_phase = 4;
+ /* DLM locking protects us from write and truncate competing
+ * for same region, but truncate can leave dirty page in the
+ * cache. it's possible the writeout on a such a page is in
+ * progress when we access it. it's also possible that during
+ * this writeout we put new (partial) data, but then won't
+ * be able to proceed in filter_commitrw_write(). thus let's
+ * just wait for writeout completion, should be rare enough.
+ * -bzzz */
+ wait_on_page_writeback(lnb->page);
+
/* If the filter writes a partial page, then has the file
* extended, the client will read in the whole page. the
* filter has to be careful to zero the rest of the partial
if (lnb->rc == 0)
tot_bytes += lnb->len;
}
+ do_gettimeofday(&end);
+ timediff = cfs_timeval_sub(&end, &start, NULL);
+ lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_GET_PAGE, timediff);
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_OST_NOMEM))
+ GOTO(cleanup, rc = -ENOMEM);
+ /* don't unlock pages to prevent any access */
rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf, exp,
NULL, NULL, NULL);
cleanup:
switch(cleanup_phase) {
case 4:
+ if (rc) {
+ for (i = 0, lnb = res; i < *pages; i++, lnb++) {
+ if (lnb->page != NULL) {
+ unlock_page(lnb->page);
+ page_cache_release(lnb->page);
+ lnb->page = NULL;
+ }
+ }
+ }
case 3:
filter_iobuf_put(&exp->exp_obd->u.filter, iobuf, oti);
case 2:
}
int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
- int objcount, struct obd_ioobj *obj, int niocount,
- struct niobuf_remote *nb, struct niobuf_local *res,
- struct obd_trans_info *oti, struct lustre_capa *capa)
+ int objcount, struct obd_ioobj *obj,
+ struct niobuf_remote *nb, int *pages,
+ struct niobuf_local *res, struct obd_trans_info *oti,
+ struct lustre_capa *capa)
{
if (cmd == OBD_BRW_WRITE)
return filter_preprw_write(cmd, exp, oa, objcount, obj,
- niocount, nb, res, oti, capa);
+ nb, pages, res, oti, capa);
if (cmd == OBD_BRW_READ)
return filter_preprw_read(cmd, exp, oa, objcount, obj,
- niocount, nb, res, oti, capa);
+ nb, pages, res, oti, capa);
LBUG();
return -EPROTO;
}
-void filter_release_read_page(struct filter_obd *filter, struct inode *inode,
- struct page *page)
-{
- int drop = 0;
-
- if (inode != NULL &&
- (i_size_read(inode) > filter->fo_readcache_max_filesize))
- drop = 1;
-
- /* drop from cache like truncate_list_pages() */
- if (drop && !TryLockPage(page)) {
- if (page->mapping)
- ll_truncate_complete_page(page);
- unlock_page(page);
- }
- page_cache_release(page);
-}
-
static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa,
int objcount, struct obd_ioobj *obj,
- int niocount, struct niobuf_local *res,
+ struct niobuf_remote *rnb,
+ int pages, struct niobuf_local *res,
struct obd_trans_info *oti, int rc)
{
struct inode *inode = NULL;
struct ldlm_res_id res_id;
struct ldlm_resource *resource = NULL;
struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
+ struct niobuf_local *lnb;
+ int i;
ENTRY;
osc_build_res_name(obj->ioo_id, obj->ioo_gr, &res_id);
if (res->dentry != NULL)
inode = res->dentry->d_inode;
- filter_free_dio_pages(objcount, obj, niocount, res);
+ for (i = 0, lnb = res; i < pages; i++, lnb++) {
+ if (lnb->page != NULL) {
+ page_cache_release(lnb->page);
+ lnb->page = NULL;
+ }
+ }
if (res->dentry != NULL)
f_dput(res->dentry);
RETURN(rc);
}
-void flip_into_page_cache(struct inode *inode, struct page *new_page)
-{
- struct page *old_page;
- int rc;
-
- do {
- /* the dlm is protecting us from read/write concurrency, so we
- * expect this find_lock_page to return quickly. even if we
- * race with another writer it won't be doing much work with
- * the page locked. we do this 'cause t_c_p expects a
- * locked page, and it wants to grab the pagecache lock
- * as well. */
- old_page = find_lock_page(inode->i_mapping, new_page->index);
- if (old_page) {
- ll_truncate_complete_page(old_page);
- unlock_page(old_page);
- page_cache_release(old_page);
- }
-
-#if 0 /* this should be a /proc tunable someday */
- /* racing o_directs (no locking ioctl) could race adding
- * their pages, so we repeat the page invalidation unless
- * we successfully added our new page */
- rc = add_to_page_cache_unique(new_page, inode->i_mapping,
- new_page->index,
- page_hash(inode->i_mapping,
- new_page->index));
- if (rc == 0) {
- /* add_to_page_cache clears uptodate|dirty and locks
- * the page */
- SetPageUptodate(new_page);
- unlock_page(new_page);
- }
-#else
- rc = 0;
-#endif
- } while (rc != 0);
-}
-
void filter_grant_commit(struct obd_export *exp, int niocount,
struct niobuf_local *res)
{
}
int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
- int objcount, struct obd_ioobj *obj, int niocount,
+ int objcount, struct obd_ioobj *obj,
+ struct niobuf_remote *nb, int pages,
struct niobuf_local *res, struct obd_trans_info *oti,
int rc)
{
if (cmd == OBD_BRW_WRITE)
- return filter_commitrw_write(exp, oa, objcount, obj, niocount,
- res, oti, rc);
+ return filter_commitrw_write(exp, oa, objcount, obj,
+ nb, pages, res, oti, rc);
if (cmd == OBD_BRW_READ)
- return filter_commitrw_read(exp, oa, objcount, obj, niocount,
- res, oti, rc);
+ return filter_commitrw_read(exp, oa, objcount, obj,
+ nb, pages, res, oti, rc);
LBUG();
return -EPROTO;
}
struct niobuf_local *lnb;
struct niobuf_remote *rnb;
obd_count i;
- int ret = 0;
+ int ret = 0, npages;
ENTRY;
OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
obdo_to_ioobj(oinfo->oi_oa, &ioo);
ioo.ioo_bufcnt = oa_bufs;
+ npages = oa_bufs;
ret = filter_preprw(cmd, exp, oinfo->oi_oa, 1, &ioo,
- oa_bufs, rnb, lnb, oti, oinfo_capa(oinfo));
+ rnb, &npages, lnb, oti, oinfo_capa(oinfo));
if (ret != 0)
GOTO(out, ret);
+ LASSERTF(oa_bufs == npages, "%u != %u\n", oa_bufs, npages);
- ret = filter_commitrw(cmd, exp, oinfo->oi_oa, 1, &ioo,
- oa_bufs, lnb, oti, ret);
+ ret = filter_commitrw(cmd, exp, oinfo->oi_oa, 1, &ioo, rnb,
+ npages, lnb, oti, ret);
out:
if (lnb)
int dr_error;
struct page **dr_pages;
unsigned long *dr_blocks;
- spinlock_t dr_lock; /* IRQ lock */
unsigned int dr_ignore_quota:1;
struct filter_obd *dr_filter;
};
static int dio_complete_routine(struct bio *bio, unsigned int done, int error)
{
struct filter_iobuf *iobuf = bio->bi_private;
- unsigned long flags;
-
-#ifdef HAVE_PAGE_CONSTANT
struct bio_vec *bvl;
int i;
-#endif
/* CAVEAT EMPTOR: possibly in IRQ context
* DO NOT record procfs stats here!!! */
if (bio->bi_size) /* Not complete */
return 1;
- if (iobuf == NULL) {
+ if (unlikely(iobuf == NULL)) {
CERROR("***** bio->bi_private is NULL! This should never "
"happen. Normally, I would crash here, but instead I "
"will dump the bio contents to the console. Please "
return 0;
}
+ /* the check is outside of the cycle for performance reason -bzzz */
+ if (!test_bit(BIO_RW, &bio->bi_rw)) {
+ bio_for_each_segment(bvl, bio, i) {
+ if (likely(error == 0))
+ SetPageUptodate(bvl->bv_page);
+ LASSERT(PageLocked(bvl->bv_page));
#ifdef HAVE_PAGE_CONSTANT
- bio_for_each_segment(bvl, bio, i)
- ClearPageConstant(bvl->bv_page);
+ ClearPageConstant(bvl->bv_page);
#endif
+ }
+ record_finish_io(iobuf, OBD_BRW_READ, error);
+ } else {
+#ifdef HAVE_PAGE_CONSTANT
+ if (mapping_cap_page_constant_write(iobuf->dr_pages[0]->mapping)){
+ bio_for_each_segment(bvl, bio, i) {
+ ClearPageConstant(bvl->bv_page);
+ }
+ }
+#endif
+ record_finish_io(iobuf, OBD_BRW_WRITE, error);
+ }
- spin_lock_irqsave(&iobuf->dr_lock, flags);
- if (iobuf->dr_error == 0)
+ /* any real error is good enough -bzzz */
+ if (error != 0 && iobuf->dr_error == 0)
iobuf->dr_error = error;
- spin_unlock_irqrestore(&iobuf->dr_lock, flags);
-
- record_finish_io(iobuf, test_bit(BIO_RW, &bio->bi_rw) ?
- OBD_BRW_WRITE : OBD_BRW_READ, error);
/* Completed bios used to be chained off iobuf->dr_bios and freed in
* filter_clear_dreq(). It was then possible to exhaust the biovec-256
iobuf->dr_filter = filter;
init_waitqueue_head(&iobuf->dr_wait);
atomic_set(&iobuf->dr_numreqs, 0);
- spin_lock_init(&iobuf->dr_lock);
iobuf->dr_max_pages = num_pages;
iobuf->dr_npages = 0;
iobuf->dr_error = 0;
RETURN(rc);
}
-/* These are our hacks to keep our directio/bh IO coherent with ext3's
- * page cache use. Most notably ext3 reads file data into the page
- * cache when it is zeroing the tail of partial-block truncates and
- * leaves it there, sometimes generating io from it at later truncates.
- * This removes the partial page and its buffers from the page cache,
- * so it should only ever cause a wait in rare cases, as otherwise we
- * always do full-page IO to the OST.
- *
- * The call to truncate_complete_page() will call journal_invalidatepage()
- * to free the buffers and drop the page from cache. The buffers should
- * not be dirty, because we already called fdatasync/fdatawait on them.
- */
-static int filter_sync_inode_data(struct inode *inode, int locked)
-{
- int rc = 0;
-
- /* This is nearly do_fsync(), without the waiting on the inode */
- /* XXX: in 2.6.16 (at least) we don't need to hold i_mutex over
- * filemap_fdatawrite() and filemap_fdatawait(), so we may no longer
- * need this lock here at all. */
- if (!locked)
- LOCK_INODE_MUTEX(inode);
- if (inode->i_mapping->nrpages) {
-#ifdef PF_SYNCWRITE
- current->flags |= PF_SYNCWRITE;
-#endif
- rc = filemap_fdatawrite(inode->i_mapping);
- if (rc == 0)
- rc = filemap_fdatawait(inode->i_mapping);
-#ifdef PF_SYNCWRITE
- current->flags &= ~PF_SYNCWRITE;
-#endif
- }
- if (!locked)
- UNLOCK_INODE_MUTEX(inode);
-
- return rc;
-}
-/* Clear pages from the mapping before we do direct IO to that offset.
- * Now that the only source of such pages in the truncate path flushes
- * these pages to disk and then discards them, this is error condition.
- * If add back read cache this will happen again. This could be disabled
- * until that time if we never see the below error. */
-static int filter_clear_page_cache(struct inode *inode,
- struct filter_iobuf *iobuf)
-{
- struct page *page;
- int i, rc;
-
- rc = filter_sync_inode_data(inode, 0);
- if (rc != 0)
- RETURN(rc);
-
- /* be careful to call this after fsync_inode_data_buffers has waited
- * for IO to complete before we evict it from the cache */
- for (i = 0; i < iobuf->dr_npages; i++) {
- page = find_lock_page(inode->i_mapping,
- iobuf->dr_pages[i]->index);
- if (page == NULL)
- continue;
- if (page->mapping != NULL) {
- CERROR("page %lu (%d/%d) in page cache during write!\n",
- page->index, i, iobuf->dr_npages);
- wait_on_page_writeback(page);
- ll_truncate_complete_page(page);
- }
-
- unlock_page(page);
- page_cache_release(page);
- }
-
- return 0;
-}
-
-int filter_clear_truncated_page(struct inode *inode)
-{
- struct page *page;
- int rc;
-
- /* Truncate on page boundary, so nothing to flush? */
- if (!(i_size_read(inode) & ~CFS_PAGE_MASK))
- return 0;
-
- rc = filter_sync_inode_data(inode, 1);
- if (rc != 0)
- RETURN(rc);
-
- /* be careful to call this after fsync_inode_data_buffers has waited
- * for IO to complete before we evict it from the cache */
- page = find_lock_page(inode->i_mapping,
- i_size_read(inode) >> CFS_PAGE_SHIFT);
- if (page) {
- if (page->mapping != NULL) {
- wait_on_page_writeback(page);
- ll_truncate_complete_page(page);
- }
- unlock_page(page);
- page_cache_release(page);
- }
-
- return 0;
-}
-
/* Must be called with i_mutex taken for writes; this will drop it */
int filter_direct_io(int rw, struct dentry *dchild, struct filter_iobuf *iobuf,
struct obd_export *exp, struct iattr *attr,
iobuf->dr_blocks, blocks_per_page, 0);
}
- rc = filter_clear_page_cache(inode, iobuf);
- if (rc != 0)
- RETURN(rc);
-
RETURN(filter_do_bio(exp, inode, iobuf, rw));
}
return 1;
}
+/*
+ * Interesting use cases for how this interacts with the VM:
+ *
+ * - VM writeout -- it shouldn't see our pages, as we never mark them
+ *   dirty, though the VM can find a partial page left dirty by truncate.
+ *   In that case the usual writeout path is used, unless our write
+ *   rewrites that page first - then we drop PG_dirty with PG_lock held.
+ *
+ * - anything else?
+ *
+ */
int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
- int objcount, struct obd_ioobj *obj, int niocount,
+ int objcount, struct obd_ioobj *obj,
+ struct niobuf_remote *nb, int niocount,
struct niobuf_local *res, struct obd_trans_info *oti,
int rc)
{
unsigned long now = jiffies;
int i, err, cleanup_phase = 0;
struct obd_device *obd = exp->exp_obd;
+ struct filter_obd *fo = &obd->u.filter;
void *wait_handle;
int total_size = 0, rc2;
unsigned int qcids[MAXQUOTAS] = {0, 0};
inode = res->dentry->d_inode;
iobuf->dr_ignore_quota = 0;
- for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++) {
+ for (i = 0, lnb = res; i < niocount; i++, lnb++) {
loff_t this_size;
/* If overwriting an existing block, we don't need a grant */
continue;
}
+ LASSERT(PageLocked(lnb->page));
+ LASSERT(!PageWriteback(lnb->page));
+
+ /* truncate might leave tail dirty */
+ clear_page_dirty_for_io(lnb->page);
+
+ SetPageUptodate(lnb->page);
+
err = filter_iobuf_add_page(obd, iobuf, inode, lnb->page);
LASSERT (err == 0);
CDEBUG(err ? D_ERROR : D_QUOTA,
"filter adjust qunit! (rc:%d)\n", err);
+ for (i = 0, lnb = res; i < niocount; i++, lnb++) {
+ if (lnb->page == NULL)
+ continue;
+
+ LASSERT(PageLocked(lnb->page));
+ unlock_page(lnb->page);
+
+ page_cache_release(lnb->page);
+ lnb->page = NULL;
+ }
+
+ if (inode && (fo->fo_writethrough_cache == 0 ||
+ i_size_read(inode) > fo->fo_readcache_max_filesize))
+ filter_invalidate_cache(obd, obj, nb, inode);
+
RETURN(rc);
}
capa_count[CAPA_SITE_SERVER]);
}
+static int lprocfs_filter_rd_cache(char *page, char **start, off_t off,
+ int count, int *eof, void *data)
+{
+ struct obd_device *obd = (struct obd_device *)data;
+ LASSERT(obd != NULL);
+
+ return snprintf(page, count, "%u\n", obd->u.filter.fo_read_cache);
+}
+
+static int lprocfs_filter_wr_cache(struct file *file, const char *buffer,
+ unsigned long count, void *data)
+{
+ struct obd_device *obd = (struct obd_device *)data;
+ int val, rc;
+ LASSERT(obd != NULL);
+
+ rc = lprocfs_write_helper(buffer, count, &val);
+
+ if (rc)
+ return rc;
+
+ obd->u.filter.fo_read_cache = val;
+ return count;
+}
+
+static int lprocfs_filter_rd_wcache(char *page, char **start, off_t off,
+ int count, int *eof, void *data)
+{
+ struct obd_device *obd = (struct obd_device *)data;
+ LASSERT(obd != NULL);
+
+ return snprintf(page, count, "%u\n", obd->u.filter.fo_writethrough_cache);
+}
+
+static int lprocfs_filter_wr_wcache(struct file *file, const char *buffer,
+ unsigned long count, void *data)
+{
+ struct obd_device *obd = (struct obd_device *)data;
+ int val, rc;
+ LASSERT(obd != NULL);
+
+ rc = lprocfs_write_helper(buffer, count, &val);
+
+ if (rc)
+ return rc;
+
+ obd->u.filter.fo_writethrough_cache = val;
+ return count;
+}
+
static struct lprocfs_vars lprocfs_filter_obd_vars[] = {
{ "uuid", lprocfs_rd_uuid, 0, 0 },
{ "blocksize", lprocfs_rd_blksize, 0, 0 },
{ "capa", lprocfs_filter_rd_capa,
lprocfs_filter_wr_capa, 0 },
{ "capa_count", lprocfs_filter_rd_capa_count, 0, 0 },
+ { "read_cache_enable", lprocfs_filter_rd_cache, lprocfs_filter_wr_cache, 0},
+ { "writethrough_cache_enable", lprocfs_filter_rd_wcache,
+ lprocfs_filter_wr_wcache, 0},
{ 0 }
};
RETURN(1);
}
-static int get_per_page_niobufs(struct obd_ioobj *ioo, int nioo,
- struct niobuf_remote *rnb, int nrnb,
- struct niobuf_remote **pp_rnbp)
-{
- /* Copy a remote niobuf, splitting it into page-sized chunks
- * and setting ioo[i].ioo_bufcnt accordingly */
- struct niobuf_remote *pp_rnb;
- int i;
- int j;
- int page;
- int rnbidx = 0;
- int npages = 0;
-
- /*
- * array of sufficient size already preallocated by caller
- */
- LASSERT(pp_rnbp != NULL);
- LASSERT(*pp_rnbp != NULL);
-
- /* first count and check the number of pages required */
- for (i = 0; i < nioo; i++)
- for (j = 0; j < ioo->ioo_bufcnt; j++, rnbidx++) {
- obd_off offset = rnb[rnbidx].offset;
- obd_off p0 = offset >> CFS_PAGE_SHIFT;
- obd_off pn = (offset + rnb[rnbidx].len - 1) >>
- CFS_PAGE_SHIFT;
-
- LASSERT(rnbidx < nrnb);
-
- npages += (pn + 1 - p0);
-
- if (rnb[rnbidx].len == 0) {
- CERROR("zero len BRW: obj %d objid "LPX64
- " buf %u\n", i, ioo[i].ioo_id, j);
- return -EINVAL;
- }
- if (j > 0 &&
- rnb[rnbidx].offset <= rnb[rnbidx-1].offset) {
- CERROR("unordered BRW: obj %d objid "LPX64
- " buf %u offset "LPX64" <= "LPX64"\n",
- i, ioo[i].ioo_id, j, rnb[rnbidx].offset,
- rnb[rnbidx].offset);
- return -EINVAL;
- }
- }
-
- LASSERT(rnbidx == nrnb);
-
- if (npages == nrnb) { /* all niobufs are for single pages */
- *pp_rnbp = rnb;
- return npages;
- }
-
- pp_rnb = *pp_rnbp;
-
- /* now do the actual split */
- page = rnbidx = 0;
- for (i = 0; i < nioo; i++) {
- int obj_pages = 0;
-
- for (j = 0; j < ioo[i].ioo_bufcnt; j++, rnbidx++) {
- obd_off off = rnb[rnbidx].offset;
- int nob = rnb[rnbidx].len;
-
- LASSERT(rnbidx < nrnb);
- do {
- obd_off poff = off & ~CFS_PAGE_MASK;
- int pnob = (poff + nob > CFS_PAGE_SIZE) ?
- CFS_PAGE_SIZE - poff : nob;
-
- LASSERT(page < npages);
- pp_rnb[page].len = pnob;
- pp_rnb[page].offset = off;
- pp_rnb[page].flags = rnb[rnbidx].flags;
-
- CDEBUG(0, " obj %d id "LPX64
- "page %d(%d) "LPX64" for %d, flg %x\n",
- i, ioo[i].ioo_id, obj_pages, page,
- pp_rnb[page].offset, pp_rnb[page].len,
- pp_rnb[page].flags);
- page++;
- obj_pages++;
-
- off += pnob;
- nob -= pnob;
- } while (nob > 0);
- LASSERT(nob == 0);
- }
- ioo[i].ioo_bufcnt = obj_pages;
- }
- LASSERT(page == npages);
-
- return npages;
-}
-
static __u32 ost_checksum_bulk(struct ptlrpc_bulk_desc *desc, int opc,
cksum_type_t cksum_type)
{
/* corrupt the data after we compute the checksum, to
* simulate an OST->client data error */
if (i == 0 && opc == OST_READ &&
- OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_SEND))
+ OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_SEND)) {
memcpy(ptr, "bad4", min(4, len));
+ /* nobody should use corrupted page again */
+ ClearPageUptodate(page);
+ }
kunmap(page);
}
return cksum;
}
-/*
- * populate @nio by @nrpages pages from per-thread page pool
- */
-static void ost_nio_pages_get(struct ptlrpc_request *req,
- struct niobuf_local *nio, int nrpages)
-{
- int i;
- struct ost_thread_local_cache *tls;
-
- ENTRY;
-
- LASSERT(nrpages <= OST_THREAD_POOL_SIZE);
- LASSERT(req != NULL);
- LASSERT(req->rq_svc_thread != NULL);
-
- tls = ost_tls(req);
- LASSERT(tls != NULL);
-
- memset(nio, 0, nrpages * sizeof *nio);
- for (i = 0; i < nrpages; ++ i) {
- struct page *page;
-
- page = tls->page[i];
- LASSERT(page != NULL);
- POISON_PAGE(page, 0xf1);
- nio[i].page = page;
- LL_CDEBUG_PAGE(D_INFO, page, "%d\n", i);
- }
- EXIT;
-}
-
-/*
- * Dual for ost_nio_pages_get(). Poison pages in pool for debugging
- */
-static void ost_nio_pages_put(struct ptlrpc_request *req,
- struct niobuf_local *nio, int nrpages)
-{
- int i;
-
- ENTRY;
-
- LASSERT(nrpages <= OST_THREAD_POOL_SIZE);
-
- for (i = 0; i < nrpages; ++ i)
- POISON_PAGE(nio[i].page, 0xf2);
- EXIT;
-}
-
static int ost_brw_lock_get(int mode, struct obd_export *exp,
struct obd_ioobj *obj, struct niobuf_remote *nb,
struct lustre_handle *lh)
static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
{
- struct ptlrpc_bulk_desc *desc;
+ struct ptlrpc_bulk_desc *desc = NULL;
struct obd_export *exp = req->rq_export;
struct niobuf_remote *remote_nb;
- struct niobuf_remote *pp_rnb = NULL;
struct niobuf_local *local_nb;
struct obd_ioobj *ioo;
struct ost_body *body, *repbody;
struct l_wait_info lwi;
struct lustre_handle lockh = { 0 };
__u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
- int niocount, npages, nob = 0, rc, i;
+ int objcount, niocount, npages, nob = 0, rc, i;
int no_reply = 0;
ENTRY;
GOTO(out, rc = -EFAULT);
}
+ objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
+ sizeof(*ioo);
+ if (objcount == 0) {
+ CERROR("Missing/short ioobj\n");
+ GOTO(out, rc = -EFAULT);
+ }
+ if (objcount > 1) {
+ CERROR("too many ioobjs (%d)\n", objcount);
+ GOTO(out, rc = -EFAULT);
+ }
+
ioo = lustre_swab_reqbuf(req, REQ_REC_OFF + 1, sizeof(*ioo),
lustre_swab_obd_ioobj);
if (ioo == NULL) {
* ost_thread_init().
*/
local_nb = ost_tls(req)->local;
- pp_rnb = ost_tls(req)->remote;
- /* FIXME all niobuf splitting should be done in obdfilter if needed */
- /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
- npages = get_per_page_niobufs(ioo, 1, remote_nb, niocount, &pp_rnb);
- if (npages < 0)
- GOTO(out, rc = npages);
-
- LASSERT(npages <= OST_THREAD_POOL_SIZE);
-
- ost_nio_pages_get(req, local_nb, npages);
-
- desc = ptlrpc_prep_bulk_exp(req, npages,
- BULK_PUT_SOURCE, OST_BULK_PORTAL);
- if (desc == NULL)
- GOTO(out, rc = -ENOMEM);
-
- rc = ost_brw_lock_get(LCK_PR, exp, ioo, pp_rnb, &lockh);
+ rc = ost_brw_lock_get(LCK_PR, exp, ioo, remote_nb, &lockh);
if (rc != 0)
GOTO(out_bulk, rc);
GOTO(out_lock, rc = -ETIMEDOUT);
}
- rc = obd_preprw(OBD_BRW_READ, exp, &body->oa, 1,
- ioo, npages, pp_rnb, local_nb, oti, capa);
+ npages = OST_THREAD_POOL_SIZE;
+ rc = obd_preprw(OBD_BRW_READ, exp, &body->oa, 1, ioo,
+ remote_nb, &npages, local_nb, oti, capa);
if (rc != 0)
GOTO(out_lock, rc);
- ost_prolong_locks(exp, ioo, pp_rnb, &body->oa, LCK_PW | LCK_PR);
+ desc = ptlrpc_prep_bulk_exp(req, npages,
+ BULK_PUT_SOURCE, OST_BULK_PORTAL);
+ if (desc == NULL) /* XXX: GOTO(out) skips ost_brw_lock_put() and commitrw cleanup */
+ GOTO(out, rc = -ENOMEM);
+
+ ost_prolong_locks(exp, ioo, remote_nb, &body->oa, LCK_PW | LCK_PR);
nob = 0;
for (i = 0; i < npages; i++) {
break;
}
- LASSERTF(page_rc <= pp_rnb[i].len, "page_rc (%d) > "
- "pp_rnb[%d].len (%d)\n", page_rc, i, pp_rnb[i].len);
nob += page_rc;
if (page_rc != 0) { /* some data! */
LASSERT (local_nb[i].page != NULL);
ptlrpc_prep_bulk_page(desc, local_nb[i].page,
- pp_rnb[i].offset & ~CFS_PAGE_MASK,
+ local_nb[i].offset & ~CFS_PAGE_MASK,
page_rc);
}
- if (page_rc != pp_rnb[i].len) { /* short read */
- int j = i;
-
+ if (page_rc != local_nb[i].len) { /* short read */
/* All subsequent pages should be 0 */
while(++i < npages)
- LASSERTF(local_nb[i].rc == 0,
- "page_rc %d, pp_rnb[%u].len=%d, "
- "local_nb[%u/%u].rc=%d\n",
- page_rc, j, pp_rnb[j].len,
- i, npages, local_nb[i].rc);
+ LASSERT(local_nb[i].rc == 0);
break;
}
}
}
/* Must commit after prep above in all cases */
- rc = obd_commitrw(OBD_BRW_READ, exp, &body->oa, 1,
- ioo, npages, local_nb, oti, rc);
-
- ost_nio_pages_put(req, local_nb, npages);
+ rc = obd_commitrw(OBD_BRW_READ, exp, &body->oa, 1, ioo,
+ remote_nb, npages, local_nb, oti, rc);
if (rc == 0) {
repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
}
out_lock:
- ost_brw_lock_put(LCK_PR, ioo, pp_rnb, &lockh);
+ ost_brw_lock_put(LCK_PR, ioo, remote_nb, &lockh);
out_bulk:
- ptlrpc_free_bulk(desc);
+ if (desc)
+ ptlrpc_free_bulk(desc);
out:
LASSERT(rc <= 0);
if (rc == 0) {
static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
{
- struct ptlrpc_bulk_desc *desc;
+ struct ptlrpc_bulk_desc *desc = NULL;
struct obd_export *exp = req->rq_export;
struct niobuf_remote *remote_nb;
- struct niobuf_remote *pp_rnb;
struct niobuf_local *local_nb;
struct obd_ioobj *ioo;
struct ost_body *body, *repbody;
* ost_thread_init().
*/
local_nb = ost_tls(req)->local;
- pp_rnb = ost_tls(req)->remote;
-
- /* FIXME all niobuf splitting should be done in obdfilter if needed */
- /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
- npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
- if (npages < 0)
- GOTO(out, rc = npages);
-
- LASSERT(npages <= OST_THREAD_POOL_SIZE);
-
- ost_nio_pages_get(req, local_nb, npages);
-
- desc = ptlrpc_prep_bulk_exp(req, npages,
- BULK_GET_SINK, OST_BULK_PORTAL);
- if (desc == NULL)
- GOTO(out, rc = -ENOMEM);
- rc = ost_brw_lock_get(LCK_PW, exp, ioo, pp_rnb, &lockh);
+ rc = ost_brw_lock_get(LCK_PW, exp, ioo, remote_nb, &lockh);
if (rc != 0)
GOTO(out_bulk, rc);
GOTO(out_lock, rc = -ETIMEDOUT);
}
- ost_prolong_locks(exp, ioo, pp_rnb, &body->oa, LCK_PW);
+ ost_prolong_locks(exp, ioo, remote_nb,&body->oa, LCK_PW);
/* obd_preprw clobbers oa->valid, so save what we need */
if (body->oa.o_valid & OBD_MD_FLCKSUM) {
body->oa.o_valid &= ~OBD_MD_FLGRANT;
}
+ npages = OST_THREAD_POOL_SIZE;
rc = obd_preprw(OBD_BRW_WRITE, exp, &body->oa, objcount,
- ioo, npages, pp_rnb, local_nb, oti, capa);
+ ioo, remote_nb, &npages, local_nb, oti, capa);
if (rc != 0)
GOTO(out_lock, rc);
+ desc = ptlrpc_prep_bulk_exp(req, npages,
+ BULK_GET_SINK, OST_BULK_PORTAL);
+ if (desc == NULL)
+ GOTO(out, rc = -ENOMEM); /* XXX: skips out_lock; PW lock not released */
+
/* NB Having prepped, we must commit... */
for (i = 0; i < npages; i++)
ptlrpc_prep_bulk_page(desc, local_nb[i].page,
- pp_rnb[i].offset & ~CFS_PAGE_MASK,
- pp_rnb[i].len);
+ local_nb[i].offset & ~CFS_PAGE_MASK,
+ local_nb[i].len);
/* Check if client was evicted while we were doing i/o before touching
network */
}
/* Must commit after prep above in all cases */
- rc = obd_commitrw(OBD_BRW_WRITE, exp, &repbody->oa,
- objcount, ioo, npages, local_nb, oti, rc);
+ rc = obd_commitrw(OBD_BRW_WRITE, exp, &repbody->oa, objcount, ioo,
+ remote_nb, npages, local_nb, oti, rc);
if (unlikely(client_cksum != server_cksum && rc == 0)) {
int new_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type);
body->oa.o_id,
body->oa.o_valid & OBD_MD_FLGROUP ?
body->oa.o_gr : (__u64)0,
- pp_rnb[0].offset,
- pp_rnb[npages-1].offset+pp_rnb[npages-1].len
- - 1 );
+ local_nb[0].offset,
+ local_nb[npages-1].offset +
+ local_nb[npages-1].len - 1 );
CERROR("client csum %x, original server csum %x, "
"server csum now %x\n",
client_cksum, server_cksum, new_cksum);
}
- ost_nio_pages_put(req, local_nb, npages);
-
if (rc == 0) {
int nob = 0;
LASSERT(j < npages);
if (local_nb[j].rc < 0)
rcs[i] = local_nb[j].rc;
- len -= pp_rnb[j].len;
+ len -= local_nb[j].len;
j++;
} while (len > 0);
LASSERT(len == 0);
}
out_lock:
- ost_brw_lock_put(LCK_PW, ioo, pp_rnb, &lockh);
+ ost_brw_lock_put(LCK_PW, ioo, remote_nb, &lockh);
out_bulk:
- ptlrpc_free_bulk(desc);
+ if (desc)
+ ptlrpc_free_bulk(desc);
out:
if (rc == 0) {
oti_to_request(oti, req);
*/
static void ost_thread_done(struct ptlrpc_thread *thread)
{
- int i;
struct ost_thread_local_cache *tls; /* TLS stands for Thread-Local
* Storage */
*/
tls = thread->t_data;
if (tls != NULL) {
- for (i = 0; i < OST_THREAD_POOL_SIZE; ++ i) {
- if (tls->page[i] != NULL)
- OBD_PAGE_FREE(tls->page[i]);
- }
OBD_FREE_PTR(tls);
thread->t_data = NULL;
}
*/
static int ost_thread_init(struct ptlrpc_thread *thread)
{
- int result;
- int i;
struct ost_thread_local_cache *tls;
ENTRY;
LASSERTF(thread->t_id <= OSS_THREADS_MAX, "%u\n", thread->t_id);
OBD_ALLOC_PTR(tls);
- if (tls != NULL) {
- result = 0;
- thread->t_data = tls;
- /*
- * populate pool
- */
- for (i = 0; i < OST_THREAD_POOL_SIZE; ++ i) {
- OBD_PAGE_ALLOC(tls->page[i], OST_THREAD_POOL_GFP);
- if (tls->page[i] == NULL) {
- ost_thread_done(thread);
- result = -ENOMEM;
- break;
- }
- }
- } else
- result = -ENOMEM;
- RETURN(result);
+ if (tls == NULL)
+ RETURN(-ENOMEM);
+ thread->t_data = tls;
+ RETURN(0);
}
#define OST_WATCHDOG_TIMEOUT (obd_timeout * 1000)
}
run_test 130e "FIEMAP (test continuation FIEMAP calls)"
+test_150() {
+ local TF="$TMP/$tfile"
+
+ dd if=/dev/urandom of=$TF bs=6096 count=1 || error "dd failed"
+ cp $TF $DIR/$tfile
+ cancel_lru_locks osc
+ cmp $TF $DIR/$tfile || error "$TF $DIR/$tfile differ"
+ remount_client $MOUNT
+ cmp $TF $DIR/$tfile || error "$TF $DIR/$tfile differ (remount)"
+
+ $TRUNCATE $TF 6000
+ $TRUNCATE $DIR/$tfile 6000
+ cancel_lru_locks osc
+ cmp $TF $DIR/$tfile || error "$TF $DIR/$tfile differ (truncate1)"
+
+ echo "12345" >>$TF
+ echo "12345" >>$DIR/$tfile
+ cancel_lru_locks osc
+ cmp $TF $DIR/$tfile || error "$TF $DIR/$tfile differ (append1)"
+
+ echo "12345" >>$TF
+ echo "12345" >>$DIR/$tfile
+ cancel_lru_locks osc
+ cmp $TF $DIR/$tfile || error "$TF $DIR/$tfile differ (append2)"
+
+ rm -f $TF
+ true
+}
+run_test 150 "truncate/append tests"
+
+function roc_access() {
+ ACCNUM=`$LCTL get_param -n obdfilter.*.stats | \
+ grep 'cache_access'| awk '{print $2}' | \
+ awk '{sum=sum+$1} END{print sum}'`
+ echo $ACCNUM
+}
+
+function roc_hit() {
+ ACCNUM=`$LCTL get_param -n obdfilter.*.stats | \
+ grep 'cache_hit'|awk '{print $2}' | \
+ awk '{sum=sum+$1} END{print sum}'`
+ echo $ACCNUM
+}
+
+test_151() {
+ local CPAGES=3
+
+ # check whether obdfilter is cache capable at all
+ if ! $LCTL get_param -n obdfilter.*.read_cache_enable; then
+ echo "not cache-capable obdfilter"
+ return 0
+ fi
+
+ # check cache is enabled on all obdfilters
+ if $LCTL get_param -n obdfilter.*.read_cache_enable | grep 0 >&/dev/null; then
+ echo "oss cache is disabled"
+ return 0
+ fi
+
+ $LCTL set_param -n obdfilter.*.writethrough_cache_enable 1
+
+ # pages should be in the cache right after write
+ dd if=/dev/urandom of=$DIR/$tfile bs=4k count=$CPAGES || error "dd failed"
+ BEFORE=`roc_hit`
+ cancel_lru_locks osc
+ cat $DIR/$tfile >/dev/null
+ AFTER=`roc_hit`
+ if ! let "AFTER - BEFORE == CPAGES"; then
+ error "NOT IN CACHE: before: $BEFORE, after: $AFTER"
+ fi
+
+ # the following read invalidates the cache
+ cancel_lru_locks osc
+ $LCTL set_param -n obdfilter.*.read_cache_enable 0
+ cat $DIR/$tfile >/dev/null
+
+ # now data shouldn't be found in the cache
+ BEFORE=`roc_hit`
+ cancel_lru_locks osc
+ cat $DIR/$tfile >/dev/null
+ AFTER=`roc_hit`
+ if ! let "AFTER - BEFORE == 0"; then
+ error "IN CACHE: before: $BEFORE, after: $AFTER"
+ fi
+
+ $LCTL set_param -n obdfilter.*.read_cache_enable 1
+ rm -f $DIR/$tfile
+}
+run_test 151 "test cache on oss and controls ==============================="
+
+test_152() {
+ local TF="$TMP/$tfile"
+
+ # simulate ENOMEM during write
+#define OBD_FAIL_OST_NOMEM 0x226
+ $LCTL set_param fail_loc=0x80000226
+ dd if=/dev/urandom of=$TF bs=6096 count=1 || error "dd failed"
+ cp $TF $DIR/$tfile
+ sync || error "sync failed"
+ $LCTL set_param fail_loc=0
+
+ # discard client's cache
+ cancel_lru_locks osc
+
+ # simulate ENOMEM during read
+ $LCTL set_param fail_loc=0x80000226
+ cmp $TF $DIR/$tfile || error "cmp failed"
+ $LCTL set_param fail_loc=0
+
+ rm -f $TF
+}
+run_test 152 "test read/write with enomem ============================"
+
POOL=${POOL:-cea1}
TGT_COUNT=$OSTCOUNT
TGTPOOL_FIRST=1