From: alex Date: Mon, 13 Oct 2008 11:35:15 +0000 (+0000) Subject: b=12182 X-Git-Tag: v1_9_90~107 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=8d191d9c741ea5a87b3f11a4620f8b5d0c827f95 b=12182 i=adilger i=green i=nikita - oss read-only cache feature --- diff --git a/lustre/autoconf/lustre-core.m4 b/lustre/autoconf/lustre-core.m4 index 8ddecff..09567c3 100644 --- a/lustre/autoconf/lustre-core.m4 +++ b/lustre/autoconf/lustre-core.m4 @@ -1249,6 +1249,19 @@ AC_DEFINE(HAVE___D_MOVE, 1, ]) ]) + +AC_DEFUN([LC_EXPORT_INVALIDATE_MAPPING_PAGES], + [LB_CHECK_SYMBOL_EXPORT([invalidate_mapping_pages], [mm/truncate.c], [ + AC_DEFINE(HAVE_INVALIDATE_MAPPING_PAGES, 1, + [exported invalidate_mapping_pages])], + [LB_CHECK_SYMBOL_EXPORT([invalidate_inode_pages], [mm/truncate.c], [ + AC_DEFINE(HAVE_INVALIDATE_INODE_PAGES, 1, + [exported invalidate_inode_pages])], [ + AC_MSG_ERROR([no way to invalidate pages]) + ]) + ],[]) +]) + # The actual symbol exported varies among architectures, so we need # to check many symbols (but only in the current architecture.) 
No # matter what symbol is exported, the kernel #defines node_to_cpumask @@ -1572,6 +1585,9 @@ AC_DEFUN([LC_PROG_LINUX], LC_VFS_KERN_MOUNT LC_INVALIDATEPAGE_RETURN_INT LC_UMOUNTBEGIN_HAS_VFSMOUNT + if test x$enable_server = xyes ; then + LC_EXPORT_INVALIDATE_MAPPING_PAGES + fi #2.6.18 + RHEL5 (fc6) LC_PG_FS_MISC diff --git a/lustre/include/linux/lustre_compat25.h b/lustre/include/linux/lustre_compat25.h index b6c1496..9a57ebd 100644 --- a/lustre/include/linux/lustre_compat25.h +++ b/lustre/include/linux/lustre_compat25.h @@ -627,5 +627,9 @@ static inline long labs(long x) #define __fls fls #endif +#ifdef HAVE_INVALIDATE_INODE_PAGES +#define invalidate_mapping_pages(mapping,s,e) invalidate_inode_pages(mapping) +#endif + #endif /* __KERNEL__ */ #endif /* _COMPAT25_H */ diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 72b61d7..8b2507e 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -356,6 +356,8 @@ struct filter_obd { obd_size fo_tot_pending; obd_size fo_readcache_max_filesize; + int fo_read_cache; + int fo_writethrough_cache; struct obd_import *fo_mdc_imp; struct obd_uuid fo_mdc_uuid; @@ -1329,12 +1331,14 @@ struct obd_ops { obd_id *startid, obd_gr group, void *data); int (*o_preprw)(int cmd, struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, - int niocount, struct niobuf_remote *remote, - struct niobuf_local *local, struct obd_trans_info *oti, + struct niobuf_remote *remote, int *nr_pages, + struct niobuf_local *local, + struct obd_trans_info *oti, struct lustre_capa *capa); int (*o_commitrw)(int cmd, struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, - int niocount, struct niobuf_local *local, + struct niobuf_remote *remote, int pages, + struct niobuf_local *local, struct obd_trans_info *oti, int rc); int (*o_enqueue)(struct obd_export *, struct obd_info *oinfo, struct ldlm_enqueue_info *einfo, diff --git a/lustre/include/obd_class.h b/lustre/include/obd_class.h index 
fa44819..45e544e 100644 --- a/lustre/include/obd_class.h +++ b/lustre/include/obd_class.h @@ -1443,7 +1443,7 @@ static inline int obd_teardown_async_page(struct obd_export *exp, static inline int obd_preprw(int cmd, struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, - int niocount, struct niobuf_remote *remote, + struct niobuf_remote *remote, int *pages, struct niobuf_local *local, struct obd_trans_info *oti, struct lustre_capa *capa) @@ -1454,14 +1454,15 @@ static inline int obd_preprw(int cmd, struct obd_export *exp, struct obdo *oa, EXP_CHECK_DT_OP(exp, preprw); EXP_COUNTER_INCREMENT(exp, preprw); - rc = OBP(exp->exp_obd, preprw)(cmd, exp, oa, objcount, obj, niocount, - remote, local, oti, capa); + rc = OBP(exp->exp_obd, preprw)(cmd, exp, oa, objcount, obj, remote, + pages, local, oti, capa); RETURN(rc); } static inline int obd_commitrw(int cmd, struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, - int niocount, struct niobuf_local *local, + struct niobuf_remote *rnb, int pages, + struct niobuf_local *local, struct obd_trans_info *oti, int rc) { ENTRY; @@ -1469,8 +1470,8 @@ static inline int obd_commitrw(int cmd, struct obd_export *exp, struct obdo *oa, EXP_CHECK_DT_OP(exp, commitrw); EXP_COUNTER_INCREMENT(exp, commitrw); - rc = OBP(exp->exp_obd, commitrw)(cmd, exp, oa, objcount, obj, niocount, - local, oti, rc); + rc = OBP(exp->exp_obd, commitrw)(cmd, exp, oa, objcount, obj, + rnb, pages, local, oti, rc); RETURN(rc); } diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h index 4a9a82f..7b5d777 100644 --- a/lustre/include/obd_support.h +++ b/lustre/include/obd_support.h @@ -229,6 +229,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type, #define OBD_FAIL_OST_PAUSE_CREATE 0x223 #define OBD_FAIL_OST_BRW_PAUSE_PACK 0x224 #define OBD_FAIL_OST_CONNECT_NET2 0x225 +#define OBD_FAIL_OST_NOMEM 0x226 #define OBD_FAIL_LDLM 0x300 #define OBD_FAIL_LDLM_NAMESPACE_NEW 0x301 diff 
--git a/lustre/obdecho/echo.c b/lustre/obdecho/echo.c index f90afb1..5c77003 100644 --- a/lustre/obdecho/echo.c +++ b/lustre/obdecho/echo.c @@ -289,16 +289,79 @@ echo_page_debug_check(cfs_page_t *page, obd_id id, /* This allows us to verify that desc_private is passed unmolested */ #define DESC_PRIV 0x10293847 +static int echo_map_nb_to_lb(struct obdo *oa, struct obd_ioobj *obj, + struct niobuf_remote *nb, int *pages, + struct niobuf_local *lb, int cmd, int *left) +{ + int gfp_mask = (obj->ioo_id & 1) ? CFS_ALLOC_HIGHUSER : CFS_ALLOC_STD; + int ispersistent = obj->ioo_id == ECHO_PERSISTENT_OBJID; + int debug_setup = (!ispersistent && + (oa->o_valid & OBD_MD_FLFLAGS) != 0 && + (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0); + struct niobuf_local *res = lb; + obd_off offset = nb->offset; + int len = nb->len; + + while (len > 0) { + int plen = CFS_PAGE_SIZE - (offset & (CFS_PAGE_SIZE-1)); + if (len < plen) + plen = len; + + /* check for local buf overflow */ + if (*left == 0) + return -EINVAL; + + res->offset = offset; + res->len = plen; + LASSERT((res->offset & ~CFS_PAGE_MASK) + res->len <= CFS_PAGE_SIZE); + + + if (ispersistent && + (res->offset >> CFS_PAGE_SHIFT) < ECHO_PERSISTENT_PAGES) { + res->page = echo_persistent_pages[res->offset >> + CFS_PAGE_SHIFT]; + /* Take extra ref so __free_pages() can be called OK */ + cfs_get_page (res->page); + } else { + OBD_PAGE_ALLOC(res->page, gfp_mask); + if (res->page == NULL) { + CERROR("can't get page for id " LPU64"\n", + obj->ioo_id); + return -ENOMEM; + } + } + + CDEBUG(D_PAGE, "$$$$ get page %p @ "LPU64" for %d\n", + res->page, res->offset, res->len); + + if (cmd & OBD_BRW_READ) + res->rc = res->len; + + if (debug_setup) + echo_page_debug_setup(res->page, cmd, obj->ioo_id, + res->offset, res->len); + + offset += plen; + len -= plen; + res++; + + (*left)--; + (*pages)++; + } + + return 0; +} + int echo_preprw(int cmd, struct obd_export *export, struct obdo *oa, - int objcount, struct obd_ioobj *obj, int niocount, - struct 
niobuf_remote *nb, struct niobuf_local *res, + int objcount, struct obd_ioobj *obj, struct niobuf_remote *nb, + int *pages, struct niobuf_local *res, struct obd_trans_info *oti, struct lustre_capa *unused) { struct obd_device *obd; struct niobuf_local *r = res; int tot_bytes = 0; int rc = 0; - int i; + int i, left; ENTRY; obd = export->exp_obd; @@ -308,59 +371,33 @@ int echo_preprw(int cmd, struct obd_export *export, struct obdo *oa, /* Temp fix to stop falling foul of osc_announce_cached() */ oa->o_valid &= ~(OBD_MD_FLBLOCKS | OBD_MD_FLGRANT); - memset(res, 0, sizeof(*res) * niocount); + memset(res, 0, sizeof(*res) * *pages); CDEBUG(D_PAGE, "%s %d obdos with %d IOs\n", - cmd == OBD_BRW_READ ? "reading" : "writing", objcount, niocount); + cmd == OBD_BRW_READ ? "reading" : "writing", objcount, *pages); if (oti) oti->oti_handle = (void *)DESC_PRIV; + left = *pages; + *pages = 0; + for (i = 0; i < objcount; i++, obj++) { - int gfp_mask = (obj->ioo_id & 1) ? CFS_ALLOC_HIGHUSER : CFS_ALLOC_STD; - int ispersistent = obj->ioo_id == ECHO_PERSISTENT_OBJID; - int debug_setup = (!ispersistent && - (oa->o_valid & OBD_MD_FLFLAGS) != 0 && - (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0); int j; for (j = 0 ; j < obj->ioo_bufcnt ; j++, nb++, r++) { - if (ispersistent && - (nb->offset >> CFS_PAGE_SHIFT) < ECHO_PERSISTENT_PAGES) { - r->page = echo_persistent_pages[nb->offset >> - CFS_PAGE_SHIFT]; - /* Take extra ref so __free_pages() can be called OK */ - cfs_get_page (r->page); - } else { - OBD_PAGE_ALLOC(r->page, gfp_mask); - if (r->page == NULL) { - CERROR("can't get page %u/%u for id " - LPU64"\n", - j, obj->ioo_bufcnt, obj->ioo_id); - GOTO(preprw_cleanup, rc = -ENOMEM); - } - } + rc = echo_map_nb_to_lb(oa, obj, nb, pages, + res + *pages, cmd, &left); + if (rc) + GOTO(preprw_cleanup, rc); tot_bytes += nb->len; - - atomic_inc(&obd->u.echo.eo_prep); - - r->offset = nb->offset; - r->len = nb->len; - LASSERT((r->offset & ~CFS_PAGE_MASK) + r->len <= CFS_PAGE_SIZE); - - CDEBUG(D_PAGE, "$$$$ 
get page %p @ "LPU64" for %d\n", - r->page, r->offset, r->len); - - if (cmd & OBD_BRW_READ) - r->rc = r->len; - - if (debug_setup) - echo_page_debug_setup(r->page, cmd, obj->ioo_id, - r->offset, r->len); } } + + atomic_add(*pages, &obd->u.echo.eo_prep); + if (cmd & OBD_BRW_READ) lprocfs_counter_add(obd->obd_stats, LPROC_ECHO_READ_BYTES, tot_bytes); @@ -379,21 +416,22 @@ preprw_cleanup: * all down again. I believe that this is what the in-kernel * prep/commit operations do. */ - CERROR("cleaning up %ld pages (%d obdos)\n", (long)(r - res), objcount); - while (r-- > res) { - cfs_kunmap(r->page); + CERROR("cleaning up %u pages (%d obdos)\n", *pages, objcount); + for (i = 0; i < *pages; i++) { + cfs_kunmap(res[i].page); /* NB if this is a persistent page, __free_pages will just * lose the extra ref gained above */ - OBD_PAGE_FREE(r->page); + OBD_PAGE_FREE(res[i].page); + res[i].page = NULL; atomic_dec(&obd->u.echo.eo_prep); } - memset(res, 0, sizeof(*res) * niocount); return rc; } int echo_commitrw(int cmd, struct obd_export *export, struct obdo *oa, - int objcount, struct obd_ioobj *obj, int niocount, + int objcount, struct obd_ioobj *obj, + struct niobuf_remote *rb, int niocount, struct niobuf_local *res, struct obd_trans_info *oti, int rc) { struct obd_device *obd; diff --git a/lustre/obdecho/echo_client.c b/lustre/obdecho/echo_client.c index 72297c3..1be3ce0 100644 --- a/lustre/obdecho/echo_client.c +++ b/lustre/obdecho/echo_client.c @@ -656,8 +656,8 @@ static void ec_ap_fill_obdo(void *data, int cmd, struct obdo *oa) static int ec_ap_completion(void *data, int cmd, struct obdo *oa, int rc) { - struct echo_async_state *eas; struct echo_async_page *eap = eap_from_cookie(data); + struct echo_async_state *eas; eas = eap->eap_eas; @@ -878,6 +878,8 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw, off = offset; for(; tot_pages; tot_pages -= npages) { + int lpages; + if (tot_pages < npages) npages = tot_pages; @@ -889,12 +891,14 @@ static int 
echo_client_prep_commit(struct obd_export *exp, int rw, ioo.ioo_bufcnt = npages; oti->oti_transno = 0; - ret = obd_preprw(rw, exp, oa, 1, &ioo, npages, rnb, lnb, oti, + lpages = npages; + ret = obd_preprw(rw, exp, oa, 1, &ioo, rnb, &lpages, lnb, oti, NULL); if (ret != 0) GOTO(out, ret); + LASSERT(lpages == npages); - for (i = 0; i < npages; i++) { + for (i = 0; i < lpages; i++) { cfs_page_t *page = lnb[i].page; /* read past eof? */ @@ -918,7 +922,7 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw, rnb[i].len); } - ret = obd_commitrw(rw, exp, oa, 1, &ioo, npages, lnb, oti, ret); + ret = obd_commitrw(rw, exp, oa, 1,&ioo,rnb,npages,lnb,oti,ret); if (ret != 0) GOTO(out, ret); } diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 55989ca..343b777 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -2013,6 +2013,8 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg, CFS_INIT_LIST_HEAD(&filter->fo_export_list); sema_init(&filter->fo_alloc_lock, 1); init_brw_stats(&filter->fo_filter_stats); + filter->fo_read_cache = 1; /* enable read-only cache by default */ + filter->fo_writethrough_cache = 0; /* disable writethrough cache */ filter->fo_readcache_max_filesize = FILTER_MAX_CACHE_SIZE; filter->fo_fmd_max_num = FILTER_FMD_MAX_NUM_DEFAULT; filter->fo_fmd_max_age = FILTER_FMD_MAX_AGE_DEFAULT; @@ -2136,6 +2138,21 @@ static int filter_setup(struct obd_device *obd, struct lustre_cfg* lcfg) lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_WRITE_BYTES, LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes"); + lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_GET_PAGE, + LPROCFS_CNTR_AVGMINMAX|LPROCFS_CNTR_STDDEV, + "get_page", "usec"); + lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_NO_PAGE, + LPROCFS_CNTR_AVGMINMAX, + "get_page_failures", "num"); + lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_CACHE_ACCESS, + LPROCFS_CNTR_AVGMINMAX, + "cache_access", "pages"); + 
lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_CACHE_HIT, + LPROCFS_CNTR_AVGMINMAX, + "cache_hit", "pages"); + lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_CACHE_MISS, + LPROCFS_CNTR_AVGMINMAX, + "cache_miss", "pages"); lproc_filter_attach_seqstat(obd); obd->obd_proc_exports_entry = lprocfs_register("exports", @@ -3207,13 +3224,8 @@ int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry, } if (locked) { - /* Let's flush truncated page on disk immediately, then we can - * avoid need to search for page aliases before directio writes - * and this sort of stuff at expense of somewhat slower - * truncates not on a page boundary. I believe this is the only - * place in filter code that can lead to pages getting to - * pagecache so far. */ - filter_clear_truncated_page(inode); + /* truncate can leave dirty pages in the cache. + * we'll take care of them in write path -bzzz */ UNLOCK_INODE_MUTEX(inode); locked = 0; } diff --git a/lustre/obdfilter/filter_internal.h b/lustre/obdfilter/filter_internal.h index cbd0cc0..38a941c 100644 --- a/lustre/obdfilter/filter_internal.h +++ b/lustre/obdfilter/filter_internal.h @@ -103,6 +103,11 @@ void filter_fmd_expire(struct obd_export *exp); enum { LPROC_FILTER_READ_BYTES = 0, LPROC_FILTER_WRITE_BYTES = 1, + LPROC_FILTER_GET_PAGE = 2, + LPROC_FILTER_NO_PAGE = 3, + LPROC_FILTER_CACHE_ACCESS = 4, + LPROC_FILTER_CACHE_HIT = 5, + LPROC_FILTER_CACHE_MISS = 6, LPROC_FILTER_LAST, }; @@ -155,20 +160,21 @@ extern struct ldlm_valblock_ops filter_lvbo; /* filter_io.c */ int filter_preprw(int cmd, struct obd_export *, struct obdo *, int objcount, - struct obd_ioobj *, int niocount, struct niobuf_remote *, - struct niobuf_local *, struct obd_trans_info *, + struct obd_ioobj *, struct niobuf_remote *, + int *, struct niobuf_local *, struct obd_trans_info *, struct lustre_capa *); int filter_commitrw(int cmd, struct obd_export *, struct obdo *, int objcount, - struct obd_ioobj *, int niocount, struct niobuf_local *, - struct 
obd_trans_info *, int rc); + struct obd_ioobj *, struct niobuf_remote *, int, + struct niobuf_local *, struct obd_trans_info *, int rc); int filter_brw(int cmd, struct obd_export *, struct obd_info *oinfo, obd_count oa_bufs, struct brw_page *pga, struct obd_trans_info *); -void flip_into_page_cache(struct inode *inode, struct page *new_page); +void filter_invalidate_cache(struct obd_device *, struct obd_ioobj *, + struct niobuf_remote *, struct inode *); /* filter_io_*.c */ struct filter_iobuf; int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount, - struct obd_ioobj *obj, int niocount, + struct obd_ioobj *obj, struct niobuf_remote *, int, struct niobuf_local *res, struct obd_trans_info *oti, int rc); obd_size filter_grant_space_left(struct obd_export *exp); diff --git a/lustre/obdfilter/filter_io.c b/lustre/obdfilter/filter_io.c index fd449de..05e158e 100644 --- a/lustre/obdfilter/filter_io.c +++ b/lustre/obdfilter/filter_io.c @@ -56,37 +56,6 @@ int *obdfilter_created_scratchpad; -static int filter_alloc_dio_page(struct obd_device *obd, struct inode *inode, - struct niobuf_local *lnb) -{ - struct page *page; - - LASSERT(lnb->page != NULL); - - page = lnb->page; -#if 0 - POISON_PAGE(page, 0xf1); - if (lnb->len != CFS_PAGE_SIZE) { - memset(kmap(page) + lnb->len, 0, CFS_PAGE_SIZE - lnb->len); - kunmap(page); - } -#endif - page->index = lnb->offset >> CFS_PAGE_SHIFT; - - RETURN(0); -} - -static void filter_free_dio_pages(int objcount, struct obd_ioobj *obj, - int niocount, struct niobuf_local *res) -{ - int i, j; - - for (i = 0; i < objcount; i++, obj++) { - for (j = 0 ; j < obj->ioo_bufcnt ; j++, res++) - res->page = NULL; - } -} - /* Grab the dirty and seen grant announcements from the incoming obdo. * We will later calculate the clients new grant and return it. 
* Caller must hold osfs lock */ @@ -272,22 +241,118 @@ long filter_grant(struct obd_export *exp, obd_size current_grant, return grant; } +/* + * the routine is used to request pages from pagecache + * + * use GFP_NOFS not allowing to enter FS as the client can run on this node + * and we might end waiting on a page he sent in the request we're serving. + * + * use NORETRY so that the allocator doesn't go crazy: chance to more lucky + * thread have enough memory to complete his request. for our request client + * will do resend hopefully -bzzz + */ +static struct page * filter_get_page(struct obd_device *obd, + struct inode *inode, + obd_off offset) +{ + struct page *page; + + page = find_or_create_page(inode->i_mapping, offset >> CFS_PAGE_SHIFT, + GFP_NOFS | __GFP_NORETRY); + if (unlikely(page == NULL)) + lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_NO_PAGE, 1); + + return page; +} + +/* + * the routine initializes array of local_niobuf from remote_niobuf + */ +static int filter_map_remote_to_local(int objcount, struct obd_ioobj *obj, + struct niobuf_remote *nb, + int *nrpages, struct niobuf_local *res) +{ + struct niobuf_remote *rnb; + struct niobuf_local *lnb; + int i, max; + ENTRY; + + /* we don't support multiobject RPC yet + * ost_brw_read() and ost_brw_write() check this */ + LASSERT(objcount == 1); + + max = *nrpages; + *nrpages = 0; + for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt; i++, rnb++) { + obd_off offset = rnb->offset; + unsigned int len = rnb->len; + + while (len > 0) { + int poff = offset & (CFS_PAGE_SIZE - 1); + int plen = CFS_PAGE_SIZE - poff; + + if (*nrpages >= max) { + CERROR("small array of local bufs: %d\n", max); + RETURN(-EINVAL); + } + + if (plen > len) + plen = len; + lnb->offset = offset; + lnb->len = plen; + lnb->flags = rnb->flags; + lnb->page = NULL; + lnb->rc = 0; + lnb->lnb_grant_used = 0; + + LASSERTF(plen <= len, "plen %u, len %u\n", plen, len); + offset += plen; + len -= plen; + lnb++; + (*nrpages)++; + } + } + 
RETURN(0); +} + +/* + * the function is used to free all pages used for request + * just to mimic cacheless OSS which don't occupy much memory + */ +void filter_invalidate_cache(struct obd_device *obd, struct obd_ioobj *obj, + struct niobuf_remote *nb, struct inode *inode) +{ + struct niobuf_remote *rnb; + int i; + + LASSERT(inode != NULL); + + for (i = 0, rnb = nb; i < obj->ioo_bufcnt; i++, rnb++) { + obd_off start = rnb->offset >> CFS_PAGE_SHIFT; + obd_off end = (rnb->offset + rnb->len) >> CFS_PAGE_SHIFT; + invalidate_mapping_pages(inode->i_mapping, start, end); + } + +} + static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, - int niocount, struct niobuf_remote *nb, - struct niobuf_local *res, + struct niobuf_remote *nb, + int *pages, struct niobuf_local *res, struct obd_trans_info *oti, struct lustre_capa *capa) { struct obd_device *obd = exp->exp_obd; + struct filter_obd *fo = &obd->u.filter; + struct timeval start, end; struct lvfs_run_ctxt saved; - struct niobuf_remote *rnb; struct niobuf_local *lnb; struct dentry *dentry = NULL; - struct inode *inode; + struct inode *inode = NULL; void *iobuf = NULL; int rc = 0, i, tot_bytes = 0; unsigned long now = jiffies; + long timediff; ENTRY; /* We are currently not supporting multi-obj BRW_READ RPCS at all. 
@@ -324,28 +389,29 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, inode = dentry->d_inode; obdo_to_inode(inode, oa, OBD_MD_FLATIME); + + rc = filter_map_remote_to_local(objcount, obj, nb, pages, res); + if (rc) + GOTO(cleanup, rc); + fsfilt_check_slow(obd, now, "preprw_read setup"); - for (i = 0, lnb = res, rnb = nb; i < obj->ioo_bufcnt; - i++, rnb++, lnb++) { + /* find pages for all segments, fill array with them */ + do_gettimeofday(&start); + for (i = 0, lnb = res; i < *pages; i++, lnb++) { + lnb->dentry = dentry; - lnb->offset = rnb->offset; - lnb->len = rnb->len; - lnb->flags = rnb->flags; - - /* - * ost_brw_write()->ost_nio_pages_get() already initialized - * lnb->page to point to the page from the per-thread page - * pool (bug 5137), initialize page. - */ - LASSERT(lnb->page != NULL); - - if (i_size_read(inode) <= rnb->offset) + + if (i_size_read(inode) <= lnb->offset) /* If there's no more data, abort early. lnb->rc == 0, * so it's easy to detect later. 
*/ break; - else - filter_alloc_dio_page(obd, inode, lnb); + + lnb->page = filter_get_page(obd, inode, lnb->offset); + if (lnb->page == NULL) + GOTO(cleanup, rc = -ENOMEM); + + lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_CACHE_ACCESS, 1); if (i_size_read(inode) < lnb->offset + lnb->len - 1) lnb->rc = i_size_read(inode) - lnb->offset; @@ -354,8 +420,21 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, tot_bytes += lnb->rc; + if (PageUptodate(lnb->page)) { + lprocfs_counter_add(obd->obd_stats, + LPROC_FILTER_CACHE_HIT, 1); + continue; + } + + lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_CACHE_MISS, 1); filter_iobuf_add_page(obd, iobuf, inode, lnb->page); } + do_gettimeofday(&end); + timediff = cfs_timeval_sub(&end, &start, NULL); + lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_GET_PAGE, timediff); + + if (OBD_FAIL_CHECK(OBD_FAIL_OST_NOMEM)) + GOTO(cleanup, rc = -ENOMEM); fsfilt_check_slow(obd, now, "start_page_read"); @@ -373,9 +452,24 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, EXIT; cleanup: - if (rc != 0) { - filter_free_dio_pages(objcount, obj, niocount, res); + /* unlock pages to allow access from concurrent OST_READ */ + for (i = 0, lnb = res; i < *pages; i++, lnb++) { + if (lnb->page) { + LASSERT(PageLocked(lnb->page)); + unlock_page(lnb->page); + + if (rc) { + page_cache_release(lnb->page); + lnb->page = NULL; + } + } + } + if (inode && (fo->fo_read_cache == 0 || + i_size_read(inode) > fo->fo_readcache_max_filesize)) + filter_invalidate_cache(obd, obj, nb, inode); + + if (rc != 0) { if (dentry != NULL) f_dput(dentry); } @@ -399,9 +493,8 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, * Caller must hold obd_osfs_lock. 
*/ static int filter_grant_check(struct obd_export *exp, struct obdo *oa, int objcount, struct fsfilt_objinfo *fso, - int niocount, struct niobuf_remote *rnb, - struct niobuf_local *lnb, obd_size *left, - struct inode *inode) + int niocount, struct niobuf_local *lnb, + obd_size *left, struct inode *inode) { struct filter_export_data *fed = &exp->exp_filter_data; int blocksize = exp->exp_obd->u.obt.obt_sb->s_blocksize; @@ -415,13 +508,13 @@ static int filter_grant_check(struct obd_export *exp, struct obdo *oa, int tmp, bytes; /* should match the code in osc_exit_cache */ - bytes = rnb[n].len; - bytes += rnb[n].offset & (blocksize - 1); - tmp = (rnb[n].offset + rnb[n].len) & (blocksize - 1); + bytes = lnb[n].len; + bytes += lnb[n].offset & (blocksize - 1); + tmp = (lnb[n].offset + lnb[n].len) & (blocksize - 1); if (tmp) bytes += blocksize - tmp; - if ((rnb[n].flags & OBD_BRW_FROM_GRANT) && + if ((lnb[n].flags & OBD_BRW_FROM_GRANT) && (oa->o_valid & OBD_MD_FLGRANT)) { if (fed->fed_grant < used + bytes) { CDEBUG(D_CACHE, @@ -432,7 +525,7 @@ static int filter_grant_check(struct obd_export *exp, struct obdo *oa, used, bytes, fed->fed_grant, n); } else { used += bytes; - rnb[n].flags |= OBD_BRW_GRANTED; + lnb[n].flags |= OBD_BRW_GRANTED; lnb[n].lnb_grant_used = bytes; CDEBUG(0, "idx %d used=%lu\n", n, used); rc = 0; @@ -442,7 +535,7 @@ static int filter_grant_check(struct obd_export *exp, struct obdo *oa, if (*left > ungranted + bytes) { /* if enough space, pretend it was granted */ ungranted += bytes; - rnb[n].flags |= OBD_BRW_GRANTED; + lnb[n].flags |= OBD_BRW_GRANTED; lnb[n].lnb_grant_used = bytes; CDEBUG(0, "idx %d ungranted=%lu\n",n,ungranted); rc = 0; @@ -456,7 +549,7 @@ static int filter_grant_check(struct obd_export *exp, struct obdo *oa, * marked BRW_GRANTED are already mapped and we can * ignore this error. 
*/ lnb[n].rc = -ENOSPC; - rnb[n].flags &= ~OBD_BRW_GRANTED; + lnb[n].flags &= ~OBD_BRW_GRANTED; CDEBUG(D_CACHE,"%s: cli %s/%p idx %d no space for %d\n", exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp, n, bytes); @@ -517,20 +610,21 @@ static int filter_grant_check(struct obd_export *exp, struct obdo *oa, * bug) or ensure we get the page locks in an appropriate order. */ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, - int niocount, struct niobuf_remote *nb, + struct niobuf_remote *nb, int *pages, struct niobuf_local *res, struct obd_trans_info *oti, struct lustre_capa *capa) { + struct obd_device *obd = exp->exp_obd; + struct timeval start, end; struct lvfs_run_ctxt saved; - struct niobuf_remote *rnb; struct niobuf_local *lnb = res; struct fsfilt_objinfo fso; struct filter_mod_data *fmd; struct dentry *dentry = NULL; void *iobuf; obd_size left; - unsigned long now = jiffies; + unsigned long now = jiffies, timediff; int rc = 0, i, tot_bytes = 0, cleanup_phase = 0; ENTRY; LASSERT(objcount == 1); @@ -559,8 +653,9 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa, GOTO(cleanup, rc = -ENOENT); } - fso.fso_dentry = dentry; - fso.fso_bufcnt = obj->ioo_bufcnt; + rc = filter_map_remote_to_local(objcount, obj, nb, pages, res); + if (rc) + GOTO(cleanup, rc); fsfilt_check_slow(exp->exp_obd, now, "preprw_write setup"); @@ -584,7 +679,10 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa, left = filter_grant_space_left(exp); - rc = filter_grant_check(exp, oa, objcount, &fso, niocount, nb, res, + fso.fso_dentry = dentry; + fso.fso_bufcnt = *pages; + + rc = filter_grant_check(exp, oa, objcount, &fso, *pages, res, &left, dentry->d_inode); /* do not zero out oa->o_valid as it is used in filter_commitrw_write() @@ -598,31 +696,29 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa, if (rc) GOTO(cleanup, rc); - 
for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt; - i++, lnb++, rnb++) { + do_gettimeofday(&start); + for (i = 0, lnb = res; i < *pages; i++, lnb++) { + /* We still set up for ungranted pages so that granted pages * can be written to disk as they were promised, and portals * needs to keep the pages all aligned properly. */ lnb->dentry = dentry; - lnb->offset = rnb->offset; - lnb->len = rnb->len; - lnb->flags = rnb->flags; - - /* - * ost_brw_write()->ost_nio_pages_get() already initialized - * lnb->page to point to the page from the per-thread page - * pool (bug 5137), initialize page. - */ - LASSERT(lnb->page != NULL); - if (lnb->len != CFS_PAGE_SIZE) { - memset(kmap(lnb->page) + lnb->len, - 0, CFS_PAGE_SIZE - lnb->len); - kunmap(lnb->page); - } - lnb->page->index = lnb->offset >> CFS_PAGE_SHIFT; + lnb->page = filter_get_page(obd, dentry->d_inode, lnb->offset); + if (lnb->page == NULL) + GOTO(cleanup, rc = -ENOMEM); cleanup_phase = 4; + /* DLM locking protects us from write and truncate competing + * for same region, but truncate can leave dirty page in the + * cache. it's possible the writeout on a such a page is in + * progress when we access it. it's also possible that during + * this writeout we put new (partial) data, but then won't + * be able to proceed in filter_commitrw_write(). thus let's + * just wait for writeout completion, should be rare enough. + * -bzzz */ + wait_on_page_writeback(lnb->page); + /* If the filter writes a partial page, then has the file * extended, the client will read in the whole page. 
the * filter has to be careful to zero the rest of the partial @@ -658,7 +754,14 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa, if (lnb->rc == 0) tot_bytes += lnb->len; } + do_gettimeofday(&end); + timediff = cfs_timeval_sub(&end, &start, NULL); + lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_GET_PAGE, timediff); + + if (OBD_FAIL_CHECK(OBD_FAIL_OST_NOMEM)) + GOTO(cleanup, rc = -ENOMEM); + /* don't unlock pages to prevent any access */ rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf, exp, NULL, NULL, NULL); @@ -671,6 +774,15 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa, cleanup: switch(cleanup_phase) { case 4: + if (rc) { + for (i = 0, lnb = res; i < *pages; i++, lnb++) { + if (lnb->page != NULL) { + unlock_page(lnb->page); + page_cache_release(lnb->page); + lnb->page = NULL; + } + } + } case 3: filter_iobuf_put(&exp->exp_obd->u.filter, iobuf, oti); case 2: @@ -693,47 +805,33 @@ cleanup: } int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa, - int objcount, struct obd_ioobj *obj, int niocount, - struct niobuf_remote *nb, struct niobuf_local *res, - struct obd_trans_info *oti, struct lustre_capa *capa) + int objcount, struct obd_ioobj *obj, + struct niobuf_remote *nb, int *pages, + struct niobuf_local *res, struct obd_trans_info *oti, + struct lustre_capa *capa) { if (cmd == OBD_BRW_WRITE) return filter_preprw_write(cmd, exp, oa, objcount, obj, - niocount, nb, res, oti, capa); + nb, pages, res, oti, capa); if (cmd == OBD_BRW_READ) return filter_preprw_read(cmd, exp, oa, objcount, obj, - niocount, nb, res, oti, capa); + nb, pages, res, oti, capa); LBUG(); return -EPROTO; } -void filter_release_read_page(struct filter_obd *filter, struct inode *inode, - struct page *page) -{ - int drop = 0; - - if (inode != NULL && - (i_size_read(inode) > filter->fo_readcache_max_filesize)) - drop = 1; - - /* drop from cache like truncate_list_pages() */ - if (drop && !TryLockPage(page)) { - 
if (page->mapping) - ll_truncate_complete_page(page); - unlock_page(page); - } - page_cache_release(page); -} - static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, - int niocount, struct niobuf_local *res, + struct niobuf_remote *rnb, + int pages, struct niobuf_local *res, struct obd_trans_info *oti, int rc) { struct inode *inode = NULL; struct ldlm_res_id res_id; struct ldlm_resource *resource = NULL; struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; + struct niobuf_local *lnb; + int i; ENTRY; osc_build_res_name(obj->ioo_id, obj->ioo_gr, &res_id); @@ -752,52 +850,18 @@ static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa, if (res->dentry != NULL) inode = res->dentry->d_inode; - filter_free_dio_pages(objcount, obj, niocount, res); + for (i = 0, lnb = res; i < pages; i++, lnb++) { + if (lnb->page != NULL) { + page_cache_release(lnb->page); + lnb->page = NULL; + } + } if (res->dentry != NULL) f_dput(res->dentry); RETURN(rc); } -void flip_into_page_cache(struct inode *inode, struct page *new_page) -{ - struct page *old_page; - int rc; - - do { - /* the dlm is protecting us from read/write concurrency, so we - * expect this find_lock_page to return quickly. even if we - * race with another writer it won't be doing much work with - * the page locked. we do this 'cause t_c_p expects a - * locked page, and it wants to grab the pagecache lock - * as well. 
*/ - old_page = find_lock_page(inode->i_mapping, new_page->index); - if (old_page) { - ll_truncate_complete_page(old_page); - unlock_page(old_page); - page_cache_release(old_page); - } - -#if 0 /* this should be a /proc tunable someday */ - /* racing o_directs (no locking ioctl) could race adding - * their pages, so we repeat the page invalidation unless - * we successfully added our new page */ - rc = add_to_page_cache_unique(new_page, inode->i_mapping, - new_page->index, - page_hash(inode->i_mapping, - new_page->index)); - if (rc == 0) { - /* add_to_page_cache clears uptodate|dirty and locks - * the page */ - SetPageUptodate(new_page); - unlock_page(new_page); - } -#else - rc = 0; -#endif - } while (rc != 0); -} - void filter_grant_commit(struct obd_export *exp, int niocount, struct niobuf_local *res) { @@ -830,16 +894,17 @@ void filter_grant_commit(struct obd_export *exp, int niocount, } int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa, - int objcount, struct obd_ioobj *obj, int niocount, + int objcount, struct obd_ioobj *obj, + struct niobuf_remote *nb, int pages, struct niobuf_local *res, struct obd_trans_info *oti, int rc) { if (cmd == OBD_BRW_WRITE) - return filter_commitrw_write(exp, oa, objcount, obj, niocount, - res, oti, rc); + return filter_commitrw_write(exp, oa, objcount, obj, + nb, pages, res, oti, rc); if (cmd == OBD_BRW_READ) - return filter_commitrw_read(exp, oa, objcount, obj, niocount, - res, oti, rc); + return filter_commitrw_read(exp, oa, objcount, obj, + nb, pages, res, oti, rc); LBUG(); return -EPROTO; } @@ -852,7 +917,7 @@ int filter_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo, struct niobuf_local *lnb; struct niobuf_remote *rnb; obd_count i; - int ret = 0; + int ret = 0, npages; ENTRY; OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local)); @@ -870,13 +935,15 @@ int filter_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo, obdo_to_ioobj(oinfo->oi_oa, &ioo); ioo.ioo_bufcnt = oa_bufs; + npages = 
oa_bufs; ret = filter_preprw(cmd, exp, oinfo->oi_oa, 1, &ioo, - oa_bufs, rnb, lnb, oti, oinfo_capa(oinfo)); + rnb, &npages, lnb, oti, oinfo_capa(oinfo)); if (ret != 0) GOTO(out, ret); + LASSERTF(oa_bufs == npages, "%u != %u\n", oa_bufs, npages); - ret = filter_commitrw(cmd, exp, oinfo->oi_oa, 1, &ioo, - oa_bufs, lnb, oti, ret); + ret = filter_commitrw(cmd, exp, oinfo->oi_oa, 1, &ioo, rnb, + npages, lnb, oti, ret); out: if (lnb) diff --git a/lustre/obdfilter/filter_io_26.c b/lustre/obdfilter/filter_io_26.c index e96513c..c37c585 100644 --- a/lustre/obdfilter/filter_io_26.c +++ b/lustre/obdfilter/filter_io_26.c @@ -65,7 +65,6 @@ struct filter_iobuf { int dr_error; struct page **dr_pages; unsigned long *dr_blocks; - spinlock_t dr_lock; /* IRQ lock */ unsigned int dr_ignore_quota:1; struct filter_obd *dr_filter; }; @@ -117,12 +116,8 @@ static void record_finish_io(struct filter_iobuf *iobuf, int rw, int rc) static int dio_complete_routine(struct bio *bio, unsigned int done, int error) { struct filter_iobuf *iobuf = bio->bi_private; - unsigned long flags; - -#ifdef HAVE_PAGE_CONSTANT struct bio_vec *bvl; int i; -#endif /* CAVEAT EMPTOR: possibly in IRQ context * DO NOT record procfs stats here!!! */ @@ -130,7 +125,7 @@ static int dio_complete_routine(struct bio *bio, unsigned int done, int error) if (bio->bi_size) /* Not complete */ return 1; - if (iobuf == NULL) { + if (unlikely(iobuf == NULL)) { CERROR("***** bio->bi_private is NULL! This should never " "happen. Normally, I would crash here, but instead I " "will dump the bio contents to the console. 
Please " @@ -148,18 +143,31 @@ static int dio_complete_routine(struct bio *bio, unsigned int done, int error) return 0; } + /* the check is outside of the cycle for performance reason -bzzz */ + if (!test_bit(BIO_RW, &bio->bi_rw)) { + bio_for_each_segment(bvl, bio, i) { + if (likely(error == 0)) + SetPageUptodate(bvl->bv_page); + LASSERT(PageLocked(bvl->bv_page)); #ifdef HAVE_PAGE_CONSTANT - bio_for_each_segment(bvl, bio, i) - ClearPageConstant(bvl->bv_page); + ClearPageConstant(bvl->bv_page); #endif + } + record_finish_io(iobuf, OBD_BRW_READ, error); + } else { +#ifdef HAVE_PAGE_CONSTANT + if (mapping_cap_page_constant_write(iobuf->dr_pages[0]->mapping)){ + bio_for_each_segment(bvl, bio, i) { + ClearPageConstant(bvl->bv_page); + } + } +#endif + record_finish_io(iobuf, OBD_BRW_WRITE, error); + } - spin_lock_irqsave(&iobuf->dr_lock, flags); - if (iobuf->dr_error == 0) + /* any real error is good enough -bzzz */ + if (error != 0 && iobuf->dr_error == 0) iobuf->dr_error = error; - spin_unlock_irqrestore(&iobuf->dr_lock, flags); - - record_finish_io(iobuf, test_bit(BIO_RW, &bio->bi_rw) ? - OBD_BRW_WRITE : OBD_BRW_READ, error); /* Completed bios used to be chained off iobuf->dr_bios and freed in * filter_clear_dreq(). It was then possible to exhaust the biovec-256 @@ -204,7 +212,6 @@ struct filter_iobuf *filter_alloc_iobuf(struct filter_obd *filter, iobuf->dr_filter = filter; init_waitqueue_head(&iobuf->dr_wait); atomic_set(&iobuf->dr_numreqs, 0); - spin_lock_init(&iobuf->dr_lock); iobuf->dr_max_pages = num_pages; iobuf->dr_npages = 0; iobuf->dr_error = 0; @@ -436,109 +443,6 @@ int filter_do_bio(struct obd_export *exp, struct inode *inode, RETURN(rc); } -/* These are our hacks to keep our directio/bh IO coherent with ext3's - * page cache use. Most notably ext3 reads file data into the page - * cache when it is zeroing the tail of partial-block truncates and - * leaves it there, sometimes generating io from it at later truncates. 
- * This removes the partial page and its buffers from the page cache, - * so it should only ever cause a wait in rare cases, as otherwise we - * always do full-page IO to the OST. - * - * The call to truncate_complete_page() will call journal_invalidatepage() - * to free the buffers and drop the page from cache. The buffers should - * not be dirty, because we already called fdatasync/fdatawait on them. - */ -static int filter_sync_inode_data(struct inode *inode, int locked) -{ - int rc = 0; - - /* This is nearly do_fsync(), without the waiting on the inode */ - /* XXX: in 2.6.16 (at least) we don't need to hold i_mutex over - * filemap_fdatawrite() and filemap_fdatawait(), so we may no longer - * need this lock here at all. */ - if (!locked) - LOCK_INODE_MUTEX(inode); - if (inode->i_mapping->nrpages) { -#ifdef PF_SYNCWRITE - current->flags |= PF_SYNCWRITE; -#endif - rc = filemap_fdatawrite(inode->i_mapping); - if (rc == 0) - rc = filemap_fdatawait(inode->i_mapping); -#ifdef PF_SYNCWRITE - current->flags &= ~PF_SYNCWRITE; -#endif - } - if (!locked) - UNLOCK_INODE_MUTEX(inode); - - return rc; -} -/* Clear pages from the mapping before we do direct IO to that offset. - * Now that the only source of such pages in the truncate path flushes - * these pages to disk and then discards them, this is error condition. - * If add back read cache this will happen again. This could be disabled - * until that time if we never see the below error. 
*/ -static int filter_clear_page_cache(struct inode *inode, - struct filter_iobuf *iobuf) -{ - struct page *page; - int i, rc; - - rc = filter_sync_inode_data(inode, 0); - if (rc != 0) - RETURN(rc); - - /* be careful to call this after fsync_inode_data_buffers has waited - * for IO to complete before we evict it from the cache */ - for (i = 0; i < iobuf->dr_npages; i++) { - page = find_lock_page(inode->i_mapping, - iobuf->dr_pages[i]->index); - if (page == NULL) - continue; - if (page->mapping != NULL) { - CERROR("page %lu (%d/%d) in page cache during write!\n", - page->index, i, iobuf->dr_npages); - wait_on_page_writeback(page); - ll_truncate_complete_page(page); - } - - unlock_page(page); - page_cache_release(page); - } - - return 0; -} - -int filter_clear_truncated_page(struct inode *inode) -{ - struct page *page; - int rc; - - /* Truncate on page boundary, so nothing to flush? */ - if (!(i_size_read(inode) & ~CFS_PAGE_MASK)) - return 0; - - rc = filter_sync_inode_data(inode, 1); - if (rc != 0) - RETURN(rc); - - /* be careful to call this after fsync_inode_data_buffers has waited - * for IO to complete before we evict it from the cache */ - page = find_lock_page(inode->i_mapping, - i_size_read(inode) >> CFS_PAGE_SHIFT); - if (page) { - if (page->mapping != NULL) { - wait_on_page_writeback(page); - ll_truncate_complete_page(page); - } - unlock_page(page); - page_cache_release(page); - } - - return 0; -} - /* Must be called with i_mutex taken for writes; this will drop it */ int filter_direct_io(int rw, struct dentry *dchild, struct filter_iobuf *iobuf, struct obd_export *exp, struct iattr *attr, @@ -604,10 +508,6 @@ int filter_direct_io(int rw, struct dentry *dchild, struct filter_iobuf *iobuf, iobuf->dr_blocks, blocks_per_page, 0); } - rc = filter_clear_page_cache(inode, iobuf); - if (rc != 0) - RETURN(rc); - RETURN(filter_do_bio(exp, inode, iobuf, rw)); } @@ -632,8 +532,20 @@ static int filter_range_is_mapped(struct inode *inode, obd_size offset, int len) 
return 1; } +/* + * interesting use cases on how it interacts with VM: + * + * - vm writeout -- shouldn't see our pages as we don't mark them dirty + * though vm can find partial page left dirty by truncate. in this + * case usual writeout is used unless our write rewrites that page - then we + * drop PG_dirty with PG_lock held. + * + * - else? + * + */ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, - int objcount, struct obd_ioobj *obj, int niocount, + int objcount, struct obd_ioobj *obj, + struct niobuf_remote *nb, int niocount, struct niobuf_local *res, struct obd_trans_info *oti, int rc) { @@ -646,6 +558,7 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, unsigned long now = jiffies; int i, err, cleanup_phase = 0; struct obd_device *obd = exp->exp_obd; + struct filter_obd *fo = &obd->u.filter; void *wait_handle; int total_size = 0, rc2; unsigned int qcids[MAXQUOTAS] = {0, 0}; @@ -684,7 +597,7 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, inode = res->dentry->d_inode; iobuf->dr_ignore_quota = 0; - for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++) { + for (i = 0, lnb = res; i < niocount; i++, lnb++) { loff_t this_size; /* If overwriting an existing block, we don't need a grant */ @@ -697,6 +610,14 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, continue; } + LASSERT(PageLocked(lnb->page)); + LASSERT(!PageWriteback(lnb->page)); + + /* truncate might leave tail dirty */ + clear_page_dirty_for_io(lnb->page); + + SetPageUptodate(lnb->page); + err = filter_iobuf_add_page(obd, iobuf, inode, lnb->page); LASSERT (err == 0); @@ -826,5 +747,20 @@ cleanup: CDEBUG(err ? D_ERROR : D_QUOTA, "filter adjust qunit!
(rc:%d)\n", err); + for (i = 0, lnb = res; i < niocount; i++, lnb++) { + if (lnb->page == NULL) + continue; + + LASSERT(PageLocked(lnb->page)); + unlock_page(lnb->page); + + page_cache_release(lnb->page); + lnb->page = NULL; + } + + if (inode && (fo->fo_writethrough_cache == 0 || + i_size_read(inode) > fo->fo_readcache_max_filesize)) + filter_invalidate_cache(obd, obj, nb, inode); + RETURN(rc); } diff --git a/lustre/obdfilter/lproc_obdfilter.c b/lustre/obdfilter/lproc_obdfilter.c index d5e7a4a..cbe0753 100644 --- a/lustre/obdfilter/lproc_obdfilter.c +++ b/lustre/obdfilter/lproc_obdfilter.c @@ -242,6 +242,56 @@ static int lprocfs_filter_rd_capa_count(char *page, char **start, off_t off, capa_count[CAPA_SITE_SERVER]); } +static int lprocfs_filter_rd_cache(char *page, char **start, off_t off, + int count, int *eof, void *data) +{ + struct obd_device *obd = (struct obd_device *)data; + LASSERT(obd != NULL); + + return snprintf(page, count, "%u\n", obd->u.filter.fo_read_cache); +} + +static int lprocfs_filter_wr_cache(struct file *file, const char *buffer, + unsigned long count, void *data) +{ + struct obd_device *obd = (struct obd_device *)data; + int val, rc; + LASSERT(obd != NULL); + + rc = lprocfs_write_helper(buffer, count, &val); + + if (rc) + return rc; + + obd->u.filter.fo_read_cache = val; + return count; +} + +static int lprocfs_filter_rd_wcache(char *page, char **start, off_t off, + int count, int *eof, void *data) +{ + struct obd_device *obd = (struct obd_device *)data; + LASSERT(obd != NULL); + + return snprintf(page, count, "%u\n", obd->u.filter.fo_writethrough_cache); +} + +static int lprocfs_filter_wr_wcache(struct file *file, const char *buffer, + unsigned long count, void *data) +{ + struct obd_device *obd = (struct obd_device *)data; + int val, rc; + LASSERT(obd != NULL); + + rc = lprocfs_write_helper(buffer, count, &val); + + if (rc) + return rc; + + obd->u.filter.fo_writethrough_cache = val; + return count; +} + static struct lprocfs_vars 
lprocfs_filter_obd_vars[] = { { "uuid", lprocfs_rd_uuid, 0, 0 }, { "blocksize", lprocfs_rd_blksize, 0, 0 }, @@ -281,6 +331,9 @@ static struct lprocfs_vars lprocfs_filter_obd_vars[] = { { "capa", lprocfs_filter_rd_capa, lprocfs_filter_wr_capa, 0 }, { "capa_count", lprocfs_filter_rd_capa_count, 0, 0 }, + { "read_cache_enable", lprocfs_filter_rd_cache, lprocfs_filter_wr_cache, 0}, + { "writethrough_cache_enable", lprocfs_filter_rd_wcache, + lprocfs_filter_wr_wcache, 0}, { 0 } }; diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index 457110b..a720a1d 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -389,101 +389,6 @@ static int ost_bulk_timeout(void *data) RETURN(1); } -static int get_per_page_niobufs(struct obd_ioobj *ioo, int nioo, - struct niobuf_remote *rnb, int nrnb, - struct niobuf_remote **pp_rnbp) -{ - /* Copy a remote niobuf, splitting it into page-sized chunks - * and setting ioo[i].ioo_bufcnt accordingly */ - struct niobuf_remote *pp_rnb; - int i; - int j; - int page; - int rnbidx = 0; - int npages = 0; - - /* - * array of sufficient size already preallocated by caller - */ - LASSERT(pp_rnbp != NULL); - LASSERT(*pp_rnbp != NULL); - - /* first count and check the number of pages required */ - for (i = 0; i < nioo; i++) - for (j = 0; j < ioo->ioo_bufcnt; j++, rnbidx++) { - obd_off offset = rnb[rnbidx].offset; - obd_off p0 = offset >> CFS_PAGE_SHIFT; - obd_off pn = (offset + rnb[rnbidx].len - 1) >> - CFS_PAGE_SHIFT; - - LASSERT(rnbidx < nrnb); - - npages += (pn + 1 - p0); - - if (rnb[rnbidx].len == 0) { - CERROR("zero len BRW: obj %d objid "LPX64 - " buf %u\n", i, ioo[i].ioo_id, j); - return -EINVAL; - } - if (j > 0 && - rnb[rnbidx].offset <= rnb[rnbidx-1].offset) { - CERROR("unordered BRW: obj %d objid "LPX64 - " buf %u offset "LPX64" <= "LPX64"\n", - i, ioo[i].ioo_id, j, rnb[rnbidx].offset, - rnb[rnbidx].offset); - return -EINVAL; - } - } - - LASSERT(rnbidx == nrnb); - - if (npages == nrnb) { /* all niobufs are for 
single pages */ - *pp_rnbp = rnb; - return npages; - } - - pp_rnb = *pp_rnbp; - - /* now do the actual split */ - page = rnbidx = 0; - for (i = 0; i < nioo; i++) { - int obj_pages = 0; - - for (j = 0; j < ioo[i].ioo_bufcnt; j++, rnbidx++) { - obd_off off = rnb[rnbidx].offset; - int nob = rnb[rnbidx].len; - - LASSERT(rnbidx < nrnb); - do { - obd_off poff = off & ~CFS_PAGE_MASK; - int pnob = (poff + nob > CFS_PAGE_SIZE) ? - CFS_PAGE_SIZE - poff : nob; - - LASSERT(page < npages); - pp_rnb[page].len = pnob; - pp_rnb[page].offset = off; - pp_rnb[page].flags = rnb[rnbidx].flags; - - CDEBUG(0, " obj %d id "LPX64 - "page %d(%d) "LPX64" for %d, flg %x\n", - i, ioo[i].ioo_id, obj_pages, page, - pp_rnb[page].offset, pp_rnb[page].len, - pp_rnb[page].flags); - page++; - obj_pages++; - - off += pnob; - nob -= pnob; - } while (nob > 0); - LASSERT(nob == 0); - } - ioo[i].ioo_bufcnt = obj_pages; - } - LASSERT(page == npages); - - return npages; -} - static __u32 ost_checksum_bulk(struct ptlrpc_bulk_desc *desc, int opc, cksum_type_t cksum_type) { @@ -506,62 +411,17 @@ static __u32 ost_checksum_bulk(struct ptlrpc_bulk_desc *desc, int opc, /* corrupt the data after we compute the checksum, to * simulate an OST->client data error */ if (i == 0 && opc == OST_READ && - OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_SEND)) + OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_SEND)) { memcpy(ptr, "bad4", min(4, len)); + /* nobody should use corrupted page again */ + ClearPageUptodate(page); + } kunmap(page); } return cksum; } -/* - * populate @nio by @nrpages pages from per-thread page pool - */ -static void ost_nio_pages_get(struct ptlrpc_request *req, - struct niobuf_local *nio, int nrpages) -{ - int i; - struct ost_thread_local_cache *tls; - - ENTRY; - - LASSERT(nrpages <= OST_THREAD_POOL_SIZE); - LASSERT(req != NULL); - LASSERT(req->rq_svc_thread != NULL); - - tls = ost_tls(req); - LASSERT(tls != NULL); - - memset(nio, 0, nrpages * sizeof *nio); - for (i = 0; i < nrpages; ++ i) { - struct page *page; - - page 
= tls->page[i]; - LASSERT(page != NULL); - POISON_PAGE(page, 0xf1); - nio[i].page = page; - LL_CDEBUG_PAGE(D_INFO, page, "%d\n", i); - } - EXIT; -} - -/* - * Dual for ost_nio_pages_get(). Poison pages in pool for debugging - */ -static void ost_nio_pages_put(struct ptlrpc_request *req, - struct niobuf_local *nio, int nrpages) -{ - int i; - - ENTRY; - - LASSERT(nrpages <= OST_THREAD_POOL_SIZE); - - for (i = 0; i < nrpages; ++ i) - POISON_PAGE(nio[i].page, 0xf2); - EXIT; -} - static int ost_brw_lock_get(int mode, struct obd_export *exp, struct obd_ioobj *obj, struct niobuf_remote *nb, struct lustre_handle *lh) @@ -694,10 +554,9 @@ static void ost_prolong_locks(struct obd_export *exp, struct obd_ioobj *obj, static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti) { - struct ptlrpc_bulk_desc *desc; + struct ptlrpc_bulk_desc *desc = NULL; struct obd_export *exp = req->rq_export; struct niobuf_remote *remote_nb; - struct niobuf_remote *pp_rnb = NULL; struct niobuf_local *local_nb; struct obd_ioobj *ioo; struct ost_body *body, *repbody; @@ -705,7 +564,7 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti) struct l_wait_info lwi; struct lustre_handle lockh = { 0 }; __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) }; - int niocount, npages, nob = 0, rc, i; + int objcount, niocount, npages, nob = 0, rc, i; int no_reply = 0; ENTRY; @@ -734,6 +593,17 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti) GOTO(out, rc = -EFAULT); } + objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) / + sizeof(*ioo); + if (objcount == 0) { + CERROR("Missing/short ioobj\n"); + GOTO(out, rc = -EFAULT); + } + if (objcount > 1) { + CERROR("too many ioobjs (%d)\n", objcount); + GOTO(out, rc = -EFAULT); + } + ioo = lustre_swab_reqbuf(req, REQ_REC_OFF + 1, sizeof(*ioo), lustre_swab_obd_ioobj); if (ioo == NULL) { @@ -772,24 +642,8 @@ static int ost_brw_read(struct ptlrpc_request *req, struct 
obd_trans_info *oti) * ost_thread_init(). */ local_nb = ost_tls(req)->local; - pp_rnb = ost_tls(req)->remote; - /* FIXME all niobuf splitting should be done in obdfilter if needed */ - /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */ - npages = get_per_page_niobufs(ioo, 1, remote_nb, niocount, &pp_rnb); - if (npages < 0) - GOTO(out, rc = npages); - - LASSERT(npages <= OST_THREAD_POOL_SIZE); - - ost_nio_pages_get(req, local_nb, npages); - - desc = ptlrpc_prep_bulk_exp(req, npages, - BULK_PUT_SOURCE, OST_BULK_PORTAL); - if (desc == NULL) - GOTO(out, rc = -ENOMEM); - - rc = ost_brw_lock_get(LCK_PR, exp, ioo, pp_rnb, &lockh); + rc = ost_brw_lock_get(LCK_PR, exp, ioo, remote_nb, &lockh); if (rc != 0) GOTO(out_bulk, rc); @@ -808,12 +662,18 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti) GOTO(out_lock, rc = -ETIMEDOUT); } - rc = obd_preprw(OBD_BRW_READ, exp, &body->oa, 1, - ioo, npages, pp_rnb, local_nb, oti, capa); + npages = OST_THREAD_POOL_SIZE; + rc = obd_preprw(OBD_BRW_READ, exp, &body->oa, 1, ioo, + remote_nb, &npages, local_nb, oti, capa); if (rc != 0) GOTO(out_lock, rc); - ost_prolong_locks(exp, ioo, pp_rnb, &body->oa, LCK_PW | LCK_PR); + desc = ptlrpc_prep_bulk_exp(req, npages, + BULK_PUT_SOURCE, OST_BULK_PORTAL); + if (desc == NULL) /* XXX: check all cleanup stuff */ + GOTO(out, rc = -ENOMEM); + + ost_prolong_locks(exp, ioo, remote_nb, &body->oa, LCK_PW | LCK_PR); nob = 0; for (i = 0; i < npages; i++) { @@ -824,26 +684,18 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti) break; } - LASSERTF(page_rc <= pp_rnb[i].len, "page_rc (%d) > " - "pp_rnb[%d].len (%d)\n", page_rc, i, pp_rnb[i].len); nob += page_rc; if (page_rc != 0) { /* some data! 
*/ LASSERT (local_nb[i].page != NULL); ptlrpc_prep_bulk_page(desc, local_nb[i].page, - pp_rnb[i].offset & ~CFS_PAGE_MASK, + local_nb[i].offset & ~CFS_PAGE_MASK, page_rc); } - if (page_rc != pp_rnb[i].len) { /* short read */ - int j = i; - + if (page_rc != local_nb[i].len) { /* short read */ /* All subsequent pages should be 0 */ while(++i < npages) - LASSERTF(local_nb[i].rc == 0, - "page_rc %d, pp_rnb[%u].len=%d, " - "local_nb[%u/%u].rc=%d\n", - page_rc, j, pp_rnb[j].len, - i, npages, local_nb[i].rc); + LASSERT(local_nb[i].rc == 0); break; } } @@ -931,10 +783,8 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti) } /* Must commit after prep above in all cases */ - rc = obd_commitrw(OBD_BRW_READ, exp, &body->oa, 1, - ioo, npages, local_nb, oti, rc); - - ost_nio_pages_put(req, local_nb, npages); + rc = obd_commitrw(OBD_BRW_READ, exp, &body->oa, 1, ioo, + remote_nb, npages, local_nb, oti, rc); if (rc == 0) { repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, @@ -943,9 +793,10 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti) } out_lock: - ost_brw_lock_put(LCK_PR, ioo, pp_rnb, &lockh); + ost_brw_lock_put(LCK_PR, ioo, remote_nb, &lockh); out_bulk: - ptlrpc_free_bulk(desc); + if (desc) + ptlrpc_free_bulk(desc); out: LASSERT(rc <= 0); if (rc == 0) { @@ -974,10 +825,9 @@ out: static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) { - struct ptlrpc_bulk_desc *desc; + struct ptlrpc_bulk_desc *desc = NULL; struct obd_export *exp = req->rq_export; struct niobuf_remote *remote_nb; - struct niobuf_remote *pp_rnb; struct niobuf_local *local_nb; struct obd_ioobj *ioo; struct ost_body *body, *repbody; @@ -1081,24 +931,8 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) * ost_thread_init(). 
*/ local_nb = ost_tls(req)->local; - pp_rnb = ost_tls(req)->remote; - - /* FIXME all niobuf splitting should be done in obdfilter if needed */ - /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */ - npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb); - if (npages < 0) - GOTO(out, rc = npages); - - LASSERT(npages <= OST_THREAD_POOL_SIZE); - - ost_nio_pages_get(req, local_nb, npages); - - desc = ptlrpc_prep_bulk_exp(req, npages, - BULK_GET_SINK, OST_BULK_PORTAL); - if (desc == NULL) - GOTO(out, rc = -ENOMEM); - rc = ost_brw_lock_get(LCK_PW, exp, ioo, pp_rnb, &lockh); + rc = ost_brw_lock_get(LCK_PW, exp, ioo, remote_nb, &lockh); if (rc != 0) GOTO(out_bulk, rc); @@ -1117,7 +951,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) GOTO(out_lock, rc = -ETIMEDOUT); } - ost_prolong_locks(exp, ioo, pp_rnb, &body->oa, LCK_PW); + ost_prolong_locks(exp, ioo, remote_nb,&body->oa, LCK_PW); /* obd_preprw clobbers oa->valid, so save what we need */ if (body->oa.o_valid & OBD_MD_FLCKSUM) { @@ -1134,17 +968,23 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) body->oa.o_valid &= ~OBD_MD_FLGRANT; } + npages = OST_THREAD_POOL_SIZE; rc = obd_preprw(OBD_BRW_WRITE, exp, &body->oa, objcount, - ioo, npages, pp_rnb, local_nb, oti, capa); + ioo, remote_nb, &npages, local_nb, oti, capa); if (rc != 0) GOTO(out_lock, rc); + desc = ptlrpc_prep_bulk_exp(req, npages, + BULK_GET_SINK, OST_BULK_PORTAL); + if (desc == NULL) + GOTO(out, rc = -ENOMEM); + /* NB Having prepped, we must commit... 
*/ for (i = 0; i < npages; i++) ptlrpc_prep_bulk_page(desc, local_nb[i].page, - pp_rnb[i].offset & ~CFS_PAGE_MASK, - pp_rnb[i].len); + local_nb[i].offset & ~CFS_PAGE_MASK, + local_nb[i].len); /* Check if client was evicted while we were doing i/o before touching network */ @@ -1221,8 +1061,8 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) } /* Must commit after prep above in all cases */ - rc = obd_commitrw(OBD_BRW_WRITE, exp, &repbody->oa, - objcount, ioo, npages, local_nb, oti, rc); + rc = obd_commitrw(OBD_BRW_WRITE, exp, &repbody->oa, objcount, ioo, + remote_nb, npages, local_nb, oti, rc); if (unlikely(client_cksum != server_cksum && rc == 0)) { int new_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type); @@ -1257,16 +1097,14 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) body->oa.o_id, body->oa.o_valid & OBD_MD_FLGROUP ? body->oa.o_gr : (__u64)0, - pp_rnb[0].offset, - pp_rnb[npages-1].offset+pp_rnb[npages-1].len - - 1 ); + local_nb[0].offset, + local_nb[npages-1].offset + + local_nb[npages-1].len - 1 ); CERROR("client csum %x, original server csum %x, " "server csum now %x\n", client_cksum, server_cksum, new_cksum); } - ost_nio_pages_put(req, local_nb, npages); - if (rc == 0) { int nob = 0; @@ -1280,7 +1118,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) LASSERT(j < npages); if (local_nb[j].rc < 0) rcs[i] = local_nb[j].rc; - len -= pp_rnb[j].len; + len -= local_nb[j].len; j++; } while (len > 0); LASSERT(len == 0); @@ -1290,9 +1128,10 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) } out_lock: - ost_brw_lock_put(LCK_PW, ioo, pp_rnb, &lockh); + ost_brw_lock_put(LCK_PW, ioo, remote_nb, &lockh); out_bulk: - ptlrpc_free_bulk(desc); + if (desc) + ptlrpc_free_bulk(desc); out: if (rc == 0) { oti_to_request(oti, req); @@ -1854,7 +1693,6 @@ EXPORT_SYMBOL(ost_handle); */ static void ost_thread_done(struct ptlrpc_thread 
*thread) { - int i; struct ost_thread_local_cache *tls; /* TLS stands for Thread-Local * Storage */ @@ -1868,10 +1706,6 @@ static void ost_thread_done(struct ptlrpc_thread *thread) */ tls = thread->t_data; if (tls != NULL) { - for (i = 0; i < OST_THREAD_POOL_SIZE; ++ i) { - if (tls->page[i] != NULL) - OBD_PAGE_FREE(tls->page[i]); - } OBD_FREE_PTR(tls); thread->t_data = NULL; } @@ -1883,8 +1717,6 @@ static void ost_thread_done(struct ptlrpc_thread *thread) */ static int ost_thread_init(struct ptlrpc_thread *thread) { - int result; - int i; struct ost_thread_local_cache *tls; ENTRY; @@ -1894,23 +1726,10 @@ static int ost_thread_init(struct ptlrpc_thread *thread) LASSERTF(thread->t_id <= OSS_THREADS_MAX, "%u\n", thread->t_id); OBD_ALLOC_PTR(tls); - if (tls != NULL) { - result = 0; - thread->t_data = tls; - /* - * populate pool - */ - for (i = 0; i < OST_THREAD_POOL_SIZE; ++ i) { - OBD_PAGE_ALLOC(tls->page[i], OST_THREAD_POOL_GFP); - if (tls->page[i] == NULL) { - ost_thread_done(thread); - result = -ENOMEM; - break; - } - } - } else - result = -ENOMEM; - RETURN(result); + if (tls == NULL) + RETURN(-ENOMEM); + thread->t_data = tls; + RETURN(0); } #define OST_WATCHDOG_TIMEOUT (obd_timeout * 1000) diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index d227799..4db4a3c 100644 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -5598,6 +5598,119 @@ test_130e() { } run_test 130e "FIEMAP (test continuation FIEMAP calls)" +test_150() { + local TF="$TMP/$tfile" + + dd if=/dev/urandom of=$TF bs=6096 count=1 || error "dd failed" + cp $TF $DIR/$tfile + cancel_lru_locks osc + cmp $TF $DIR/$tfile || error "$TMP/$tfile $DIR/$tfile differ" + remount_client $MOUNT + cmp $TF $DIR/$tfile || error "$TF $DIR/$tfile differ (remount)" + + $TRUNCATE $TF 6000 + $TRUNCATE $DIR/$tfile 6000 + cancel_lru_locks osc + cmp $TF $DIR/$tfile || error "$TF $DIR/$tfile differ (truncate1)" + + echo "12345" >>$TF + echo "12345" >>$DIR/$tfile + cancel_lru_locks osc + cmp $TF $DIR/$tfile 
|| error "$TF $DIR/$tfile differ (append1)" + + echo "12345" >>$TF + echo "12345" >>$DIR/$tfile + cancel_lru_locks osc + cmp $TF $DIR/$tfile || error "$TF $DIR/$tfile differ (append2)" + + rm -f $TF + true +} +run_test 150 "truncate/append tests" + +function roc_access() { + ACCNUM=`$LCTL get_param -n obdfilter.*.stats | \ + grep 'cache_access'| awk '{print $2}' | \ + awk '{sum=sum+$1} END{print sum}'` + echo $ACCNUM +} + +function roc_hit() { + ACCNUM=`$LCTL get_param -n obdfilter.*.stats | \ + grep 'cache_hit'|awk '{print $2}' | \ + awk '{sum=sum+$1} END{print sum}'` + echo $ACCNUM +} + +test_151() { + local CPAGES=3 + + # check whether obdfilter is cache capable at all + if ! $LCTL get_param -n obdfilter.*.read_cache_enable; then + echo "not cache-capable obdfilter" + return 0 + fi + + # check cache is enabled on all obdfilters + if $LCTL get_param -n obdfilter.*.read_cache_enable | grep 0 >&/dev/null; then + echo "oss cache is disabled" + return 0 + fi + + $LCTL set_param -n obdfilter.*.writethrough_cache_enable 1 + + # pages should be in the cache right after write + dd if=/dev/urandom of=$DIR/$tfile bs=4k count=$CPAGES || error "dd failed" + BEFORE=`roc_hit` + cancel_lru_locks osc + cat $DIR/$tfile >/dev/null + AFTER=`roc_hit` + if ! let "AFTER - BEFORE == CPAGES"; then + error "NOT IN CACHE: before: $BEFORE, after: $AFTER" + fi + + # the following read invalidates the cache + cancel_lru_locks osc + $LCTL set_param -n obdfilter.*.read_cache_enable 0 + cat $DIR/$tfile >/dev/null + + # now data shouldn't be found in the cache + BEFORE=`roc_hit` + cancel_lru_locks osc + cat $DIR/$tfile >/dev/null + AFTER=`roc_hit` + if !
let "AFTER - BEFORE == 0"; then + error "IN CACHE: before: $BEFORE, after: $AFTER" + fi + + $LCTL set_param -n obdfilter.*.read_cache_enable 1 + rm -f $DIR/$tfile +} +run_test 151 "test cache on oss and controls ===============================" + +test_152() { + local TF="$TMP/$tfile" + + # simulate ENOMEM during write +#define OBD_FAIL_OST_NOMEM 0x226 + lctl set_param fail_loc=0x80000226 + dd if=/dev/urandom of=$TF bs=6096 count=1 || error "dd failed" + cp $TF $DIR/$tfile + sync || error "sync failed" + lctl set_param fail_loc=0 + + # discard client's cache + cancel_lru_locks osc + + # simulate ENOMEM during read + lctl set_param fail_loc=0x80000226 + cmp $TF $DIR/$tfile || error "cmp failed" + lctl set_param fail_loc=0 + + rm -f $TF +} +run_test 152 "test read/write with enomem ============================" + POOL=${POOL:-cea1} TGT_COUNT=$OSTCOUNT TGTPOOL_FIRST=1