X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fobdfilter%2Ffilter_io.c;h=ea2721e9469302867be1a0894083a731d5365410;hp=8b09fc74c52b375bad74672216653d374d1f9021;hb=3dcf18d3;hpb=c39489126f88bb5b30643ebb11c72fbe9f9d2241 diff --git a/lustre/obdfilter/filter_io.c b/lustre/obdfilter/filter_io.c index 8b09fc7..ea2721e 100644 --- a/lustre/obdfilter/filter_io.c +++ b/lustre/obdfilter/filter_io.c @@ -1,95 +1,68 @@ /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * - * linux/fs/obdfilter/filter_io.c + * GPL HEADER START * - * Copyright (c) 2001-2003 Cluster File Systems, Inc. - * Author: Peter Braam - * Author: Andreas Dilger - * Author: Phil Schwan + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * - * This file is part of Lustre, http://www.lustre.org. + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. * - * Lustre is free software; you can redistribute it and/or - * modify it under the terms of version 2 of the GNU General Public - * License as published by the Free Software Foundation. + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). * - * Lustre is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf * - * You should have received a copy of the GNU General Public License - * along with Lustre; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + * + * GPL HEADER END + */ +/* + * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Use is subject to license terms. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. + * + * lustre/obdfilter/filter_io.c + * + * Author: Peter Braam + * Author: Andreas Dilger + * Author: Phil Schwan */ #define DEBUG_SUBSYSTEM S_FILTER +#ifndef AUTOCONF_INCLUDED #include +#endif #include #include // XXX kill me soon #include -#include -#include -#include -#include +#include +#include +#include #include "filter_internal.h" -static int filter_alloc_dio_page(struct obd_device *obd, struct inode *inode, - struct niobuf_local *lnb) - -{ - struct page *page; - ENTRY; - - page = alloc_pages(GFP_HIGHUSER, 0); - if (page == NULL) { - CERROR("no memory for a temp page\n"); - lnb->rc = -ENOMEM; - RETURN(-ENOMEM); - } - -#if 0 - POISON_PAGE(page, 0xf1); - if (lnb->len != PAGE_SIZE) { - memset(kmap(page) + lnb->len, 0, PAGE_SIZE - lnb->len); - kunmap(page); - } -#endif - page->index = lnb->offset >> PAGE_SHIFT; - - lnb->page = page; - - RETURN(0); -} - -void filter_free_dio_pages(int objcount, struct obd_ioobj *obj, - int niocount, struct niobuf_local *res) -{ - int i, j; - - for (i = 0; i < objcount; i++, obj++) { - for (j = 0 ; j < obj->ioo_bufcnt ; j++, res++) { - if (res->page != NULL) { - __free_page(res->page); - res->page = NULL; - } - } - } -} +int *obdfilter_created_scratchpad; /* Grab the dirty and seen grant announcements from the incoming obdo. * We will later calculate the clients new grant and return it. * Caller must hold osfs lock */ -static void filter_grant_incoming(struct obd_export *exp, struct obdo *oa) +void filter_grant_incoming(struct obd_export *exp, struct obdo *oa) { struct filter_export_data *fed; struct obd_device *obd = exp->exp_obd; - static unsigned long last_msg; - static int last_count; - int mask = D_CACHE; ENTRY; LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock); @@ -103,20 +76,10 @@ static void filter_grant_incoming(struct obd_export *exp, struct obdo *oa) fed = &exp->exp_filter_data; - /* Don't print this to the console the first time it happens, since - * it can happen legitimately on occasion, but only rarely. */ - if (time_after(jiffies, last_msg + 60 * HZ)) { - last_count = 0; - last_msg = jiffies; - } - if ((last_count & (-last_count)) == last_count) - mask = D_WARNING; - last_count++; - /* Add some margin, since there is a small race if other RPCs arrive * out-or-order and have already consumed some grant. We want to * leave this here in case there is a large error in accounting. */ - CDEBUG(oa->o_grant > fed->fed_grant + FILTER_GRANT_CHUNK ? mask:D_CACHE, + CDEBUG(D_CACHE, "%s: cli %s/%p reports grant: "LPU64" dropped: %u, local: %lu\n", obd->obd_name, exp->exp_client_uuid.uuid, exp, oa->o_grant, oa->o_dropped, fed->fed_grant); @@ -124,10 +87,14 @@ static void filter_grant_incoming(struct obd_export *exp, struct obdo *oa) /* Update our accounting now so that statfs takes it into account. * Note that fed_dirty is only approximate and can become incorrect * if RPCs arrive out-of-order. No important calculations depend - * on fed_dirty however. */ + * on fed_dirty however, but we must check sanity to not assert. */ + if ((long long)oa->o_dirty < 0) + oa->o_dirty = 0; + else if (oa->o_dirty > fed->fed_grant + 4 * FILTER_GRANT_CHUNK) + oa->o_dirty = fed->fed_grant + 4 * FILTER_GRANT_CHUNK; obd->u.filter.fo_tot_dirty += oa->o_dirty - fed->fed_dirty; if (fed->fed_grant < oa->o_dropped) { - CERROR("%s: cli %s/%p reports %u dropped > fed_grant %lu\n", + CDEBUG(D_CACHE,"%s: cli %s/%p reports %u dropped > grant %lu\n", obd->obd_name, exp->exp_client_uuid.uuid, exp, oa->o_dropped, fed->fed_grant); oa->o_dropped = 0; @@ -141,11 +108,36 @@ static void filter_grant_incoming(struct obd_export *exp, struct obdo *oa) obd->u.filter.fo_tot_granted -= oa->o_dropped; fed->fed_grant -= oa->o_dropped; fed->fed_dirty = oa->o_dirty; + + if (oa->o_flags & OBD_FL_SHRINK_GRANT) { + obd_size left_space = filter_grant_space_left(exp); + struct filter_obd *filter = &exp->exp_obd->u.filter; + + /*Only if left_space < fo_tot_clients * 32M, + *then the grant space could be shrinked */ + if (left_space < filter->fo_tot_granted_clients * + FILTER_GRANT_SHRINK_LIMIT) { + fed->fed_grant -= oa->o_grant; + filter->fo_tot_granted -= oa->o_grant; + CDEBUG(D_CACHE, "%s: cli %s/%p shrink "LPU64 + "fed_grant %ld total "LPU64"\n", + obd->obd_name, exp->exp_client_uuid.uuid, + exp, oa->o_grant, fed->fed_grant, + filter->fo_tot_granted); + oa->o_grant = 0; + } + } + + if (fed->fed_dirty < 0 || fed->fed_grant < 0 || fed->fed_pending < 0) { + CERROR("%s: cli %s/%p dirty %ld pend %ld grant %ld\n", + obd->obd_name, exp->exp_client_uuid.uuid, exp, + fed->fed_dirty, fed->fed_pending, fed->fed_grant); + spin_unlock(&obd->obd_osfs_lock); + LBUG(); + } EXIT; } -#define GRANT_FOR_LLOG(obd) 16 - /* Figure out how much space is available between what we've granted * and what remains in the filesystem. Compensate for ext3 indirect * block overhead when computing how much free space is left ungranted. @@ -154,15 +146,16 @@ static void filter_grant_incoming(struct obd_export *exp, struct obdo *oa) obd_size filter_grant_space_left(struct obd_export *exp) { struct obd_device *obd = exp->exp_obd; - int blockbits = obd->u.filter.fo_sb->s_blocksize_bits; + int blockbits = obd->u.obt.obt_sb->s_blocksize_bits; obd_size tot_granted = obd->u.filter.fo_tot_granted, avail, left = 0; int rc, statfs_done = 0; LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock); - if (time_before(obd->obd_osfs_age, jiffies - HZ)) { + if (cfs_time_before_64(obd->obd_osfs_age, cfs_time_current_64() - HZ)) { restat: - rc = fsfilt_statfs(obd, obd->u.filter.fo_sb, jiffies + 1); + rc = fsfilt_statfs(obd, obd->u.obt.obt_sb, + cfs_time_current_64() + HZ); if (rc) /* N.B. statfs can't really fail */ RETURN(0); statfs_done = 1; @@ -184,18 +177,11 @@ restat: if (left >= tot_granted) { left -= tot_granted; } else { - static unsigned long next; - if (left < tot_granted - obd->u.filter.fo_tot_pending && - time_after(jiffies, next)) { - spin_unlock(&obd->obd_osfs_lock); + if (left < tot_granted - obd->u.filter.fo_tot_pending) { CERROR("%s: cli %s/%p grant "LPU64" > available " LPU64" and pending "LPU64"\n", obd->obd_name, exp->exp_client_uuid.uuid, exp, tot_granted, left, obd->u.filter.fo_tot_pending); - if (next == 0) - portals_debug_dumplog(); - next = jiffies + 20 * HZ; - spin_lock(&obd->obd_osfs_lock); } left = 0; } @@ -218,7 +204,7 @@ long filter_grant(struct obd_export *exp, obd_size current_grant, { struct obd_device *obd = exp->exp_obd; struct filter_export_data *fed = &exp->exp_filter_data; - int blockbits = obd->u.filter.fo_sb->s_blocksize_bits; + int blockbits = obd->u.obt.obt_sb->s_blocksize_bits; __u64 grant = 0; LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock); @@ -232,23 +218,40 @@ long filter_grant(struct obd_export *exp, obd_size current_grant, * has and what we think it has, don't grant very much and let the * client consume its grant first. Either it just has lots of RPCs * in flight, or it was evicted and its grants will soon be used up. */ - if (current_grant < want && - current_grant < fed->fed_grant + FILTER_GRANT_CHUNK) { - grant = min((want >> blockbits) / 2, + if (want > 0x7fffffff) { + CERROR("%s: client %s/%p requesting > 2GB grant "LPU64"\n", + obd->obd_name, exp->exp_client_uuid.uuid, exp, want); + } else if (current_grant < want && + current_grant < fed->fed_grant + FILTER_GRANT_CHUNK) { + grant = min((want >> blockbits), (fs_space_left >> blockbits) / 8); grant <<= blockbits; if (grant) { - if (grant > FILTER_GRANT_CHUNK) + /* Allow >FILTER_GRANT_CHUNK size when clients + * reconnect due to a server reboot. + */ + if ((grant > FILTER_GRANT_CHUNK) && + (!obd->obd_recovering)) grant = FILTER_GRANT_CHUNK; obd->u.filter.fo_tot_granted += grant; fed->fed_grant += grant; + if (fed->fed_grant < 0) { + CERROR("%s: cli %s/%p grant %ld want "LPU64 + "current"LPU64"\n", + obd->obd_name, exp->exp_client_uuid.uuid, + exp, fed->fed_grant, want,current_grant); + spin_unlock(&obd->obd_osfs_lock); + LBUG(); + } } } - CDEBUG(D_CACHE,"%s: cli %s/%p wants: "LPU64" granting: "LPU64"\n", - obd->obd_name, exp->exp_client_uuid.uuid, exp, want, grant); + CDEBUG(D_CACHE, + "%s: cli %s/%p wants: "LPU64" current grant "LPU64 + " granting: "LPU64"\n", obd->obd_name, exp->exp_client_uuid.uuid, + exp, want, current_grant, grant); CDEBUG(D_CACHE, "%s: cli %s/%p tot cached:"LPU64" granted:"LPU64 " num_exports: %d\n", obd->obd_name, exp->exp_client_uuid.uuid, @@ -258,128 +261,279 @@ long filter_grant(struct obd_export *exp, obd_size current_grant, return grant; } +/* + * the routine is used to request pages from pagecache + * + * use GFP_NOFS for requests from a local client not allowing to enter FS + * as we might end up waiting on a page he sent in the request we're serving. + * use __GFP_HIGHMEM so that the pages can use all of the available memory + * on 32-bit machines + * use more agressive GFP_HIGHUSER flags from non-local clients to be able to + * generate more memory pressure, but at the same time use __GFP_NOMEMALLOC + * in order not to exhaust emergency reserves. + * + * See Bug 19529 and Bug 19917 for details. + */ +static struct page *filter_get_page(struct obd_device *obd, + struct inode *inode, + obd_off offset, + int localreq) +{ + struct page *page; + + page = find_or_create_page(inode->i_mapping, offset >> CFS_PAGE_SHIFT, + (localreq ? (GFP_NOFS | __GFP_HIGHMEM) + : (GFP_HIGHUSER | __GFP_NOMEMALLOC))); + if (unlikely(page == NULL)) + lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_NO_PAGE, 1); + + return page; +} + +/* + * the routine initializes array of local_niobuf from remote_niobuf + */ +static int filter_map_remote_to_local(int objcount, struct obd_ioobj *obj, + struct niobuf_remote *nb, + int *nrpages, struct niobuf_local *res) +{ + struct niobuf_remote *rnb; + struct niobuf_local *lnb; + int i, max; + ENTRY; + + /* we don't support multiobject RPC yet + * ost_brw_read() and ost_brw_write() check this */ + LASSERT(objcount == 1); + + max = *nrpages; + *nrpages = 0; + for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt; i++, rnb++) { + obd_off offset = rnb->offset; + unsigned int len = rnb->len; + + while (len > 0) { + int poff = offset & (CFS_PAGE_SIZE - 1); + int plen = CFS_PAGE_SIZE - poff; + + if (*nrpages >= max) { + CERROR("small array of local bufs: %d\n", max); + RETURN(-EINVAL); + } + + if (plen > len) + plen = len; + lnb->offset = offset; + lnb->len = plen; + lnb->flags = rnb->flags; + lnb->page = NULL; + lnb->rc = 0; + lnb->lnb_grant_used = 0; + + LASSERTF(plen <= len, "plen %u, len %u\n", plen, len); + offset += plen; + len -= plen; + lnb++; + (*nrpages)++; + } + } + RETURN(0); +} + +/* + * the function is used to free all pages used for request + * just to mimic cacheless OSS which don't occupy much memory + */ +void filter_invalidate_cache(struct obd_device *obd, struct obd_ioobj *obj, + struct niobuf_remote *nb, struct inode *inode) +{ + struct niobuf_remote *rnb; + int i; + + LASSERT(inode != NULL); + + for (i = 0, rnb = nb; i < obj->ioo_bufcnt; i++, rnb++) { + obd_off start; + obd_off end; + + start = rnb->offset >> CFS_PAGE_SHIFT; + end = (rnb->offset + rnb->len) >> CFS_PAGE_SHIFT; + invalidate_mapping_pages(inode->i_mapping, start, end); + /* just to avoid warnings */ + start = 0; + end = 0; + } +} + +/* + * the invalidate above doesn't work during read because lnet pins pages. + * The truncate is used here instead to drop pages from cache + */ +void filter_truncate_cache(struct obd_device *obd, struct obd_ioobj *obj, + struct niobuf_remote *nb, int pages, + struct niobuf_local *res, struct inode *inode) +{ + struct niobuf_remote *rnb; + int i; + + LASSERT(inode != NULL); +#ifdef HAVE_TRUNCATE_RANGE + for (i = 0, rnb = nb; i < obj->ioo_bufcnt; i++, rnb++) { + /* remove pages in which range is fit */ + truncate_inode_pages_range(inode->i_mapping, + rnb->offset & CFS_PAGE_MASK, + (rnb->offset + rnb->len - 1) | + ~CFS_PAGE_MASK); + } +#elif (defined HAVE_TRUNCATE_COMPLETE) + for (i = 0, lnb = res; i < pages; i++, lnb++) + truncate_complete_page(inode->i_mapping, lnb->page); +#else +#error "Nor truncate_inode_pages_range or truncate_complete_page are supported" +#endif +} static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, - int niocount, struct niobuf_remote *nb, - struct niobuf_local *res, - struct obd_trans_info *oti) + struct niobuf_remote *nb, + int *npages, struct niobuf_local *res, + struct obd_trans_info *oti, + struct lustre_capa *capa) { struct obd_device *obd = exp->exp_obd; + struct timeval start, end; struct lvfs_run_ctxt saved; - struct niobuf_remote *rnb; struct niobuf_local *lnb; struct dentry *dentry = NULL; - struct inode *inode; - void *iobuf = NULL; + struct inode *inode = NULL; + void *iobuf = NULL; int rc = 0, i, tot_bytes = 0; unsigned long now = jiffies; + long timediff; ENTRY; /* We are currently not supporting multi-obj BRW_READ RPCS at all. - * When we do this function's dentry cleanup will need to be fixed */ + * When we do this function's dentry cleanup will need to be fixed. + * These values are verified in ost_brw_write() from the wire. */ LASSERTF(objcount == 1, "%d\n", objcount); LASSERTF(obj->ioo_bufcnt > 0, "%d\n", obj->ioo_bufcnt); + rc = filter_auth_capa(exp, NULL, obdo_mdsno(oa), capa, + CAPA_OPC_OSS_READ); + if (rc) + RETURN(rc); + if (oa && oa->o_valid & OBD_MD_FLGRANT) { spin_lock(&obd->obd_osfs_lock); filter_grant_incoming(exp, oa); - oa->o_grant = 0; - + if (!(oa->o_flags & OBD_FL_SHRINK_GRANT)) + oa->o_grant = 0; spin_unlock(&obd->obd_osfs_lock); } - memset(res, 0, niocount * sizeof(*res)); + iobuf = filter_iobuf_get(&obd->u.filter, oti); + if (IS_ERR(iobuf)) + RETURN(PTR_ERR(iobuf)); - push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); - rc = filter_alloc_iobuf(OBD_BRW_READ, obj->ioo_bufcnt, &iobuf); - if (rc) + push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + dentry = filter_oa2dentry(obd, oa); + if (IS_ERR(dentry)) { + rc = PTR_ERR(dentry); + dentry = NULL; GOTO(cleanup, rc); + } - dentry = filter_oa2dentry(obd, oa); - if (IS_ERR(dentry)) - GOTO(cleanup, rc = PTR_ERR(dentry)); + inode = dentry->d_inode; - if (dentry->d_inode == NULL) { - CERROR("trying to BRW to non-existent file "LPU64"\n", - obj->ioo_id); - GOTO(cleanup, rc = -ENOENT); - } + obdo_to_inode(inode, oa, OBD_MD_FLATIME); - inode = dentry->d_inode; + rc = filter_map_remote_to_local(objcount, obj, nb, npages, res); + if (rc) + GOTO(cleanup, rc); - if (time_after(jiffies, now + 15 * HZ)) - CERROR("slow preprw_read setup %lus\n", (jiffies - now) / HZ); - else - CDEBUG(D_INFO, "preprw_read setup: %lu jiffies\n", - (jiffies - now)); + fsfilt_check_slow(obd, now, "preprw_read setup"); + + /* find pages for all segments, fill array with them */ + do_gettimeofday(&start); + for (i = 0, lnb = res; i < *npages; i++, lnb++) { - for (i = 0, lnb = res, rnb = nb; i < obj->ioo_bufcnt; - i++, rnb++, lnb++) { lnb->dentry = dentry; - lnb->offset = rnb->offset; - lnb->len = rnb->len; - lnb->flags = rnb->flags; - - if (inode->i_size <= rnb->offset) - /* If there's no more data, abort early. - * lnb->page == NULL and lnb->rc == 0, so it's - * easy to detect later. */ + + if (i_size_read(inode) <= lnb->offset) + /* If there's no more data, abort early. lnb->rc == 0, + * so it's easy to detect later. */ break; - else - rc = filter_alloc_dio_page(obd, inode, lnb); - if (rc) { - CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, - "page err %u@"LPU64" %u/%u %p: rc %d\n", - lnb->len, lnb->offset, i, obj->ioo_bufcnt, - dentry, rc); - GOTO(cleanup, rc); - } - if (inode->i_size < lnb->offset + lnb->len - 1) - lnb->rc = inode->i_size - lnb->offset; + lnb->page = filter_get_page(obd, inode, lnb->offset, 0); + if (lnb->page == NULL) + GOTO(cleanup, rc = -ENOMEM); + + lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_CACHE_ACCESS, 1); + + if (i_size_read(inode) < lnb->offset + lnb->len - 1) + lnb->rc = i_size_read(inode) - lnb->offset; else lnb->rc = lnb->len; tot_bytes += lnb->rc; + if (PageUptodate(lnb->page)) { + lprocfs_counter_add(obd->obd_stats, + LPROC_FILTER_CACHE_HIT, 1); + continue; + } + + lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_CACHE_MISS, 1); filter_iobuf_add_page(obd, iobuf, inode, lnb->page); } + do_gettimeofday(&end); + timediff = cfs_timeval_sub(&end, &start, NULL); + lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_GET_PAGE, timediff); - if (time_after(jiffies, now + 15 * HZ)) - CERROR("slow start_page_read %lus\n", (jiffies - now) / HZ); - else - CDEBUG(D_INFO, "start_page_read: %lu jiffies\n", - (jiffies - now)); + if (OBD_FAIL_CHECK(OBD_FAIL_OST_NOMEM)) + GOTO(cleanup, rc = -ENOMEM); - rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf, exp, - NULL, NULL, NULL); + fsfilt_check_slow(obd, now, "start_page_read"); + + rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf, + exp, NULL, NULL, NULL); if (rc) GOTO(cleanup, rc); lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES, tot_bytes); - filter_tally_read(&exp->exp_obd->u.filter, res, niocount); + if (exp->exp_nid_stats && exp->exp_nid_stats->nid_stats) + lprocfs_counter_add(exp->exp_nid_stats->nid_stats, + LPROC_FILTER_READ_BYTES, tot_bytes); EXIT; -cleanup: - if (rc != 0) { - filter_free_dio_pages(objcount, obj, niocount, res); + cleanup: + /* unlock pages to allow access from concurrent OST_READ */ + for (i = 0, lnb = res; i < *npages; i++, lnb++) { + if (lnb->page) { + LASSERT(PageLocked(lnb->page)); + unlock_page(lnb->page); + + if (rc) { + page_cache_release(lnb->page); + lnb->page = NULL; + } + } + } + if (rc != 0) { if (dentry != NULL) f_dput(dentry); - else - CERROR("NULL dentry in cleanup -- tell CFS\n"); } - if (iobuf != NULL) - filter_free_iobuf(iobuf); + filter_iobuf_put(&obd->u.filter, iobuf, oti); - pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); if (rc) CERROR("io error %d\n", rc); + return rc; } @@ -391,16 +545,15 @@ cleanup: * right on through. * * Caller must hold obd_osfs_lock. */ -static int filter_grant_check(struct obd_export *exp, int objcount, - struct fsfilt_objinfo *fso, int niocount, - struct niobuf_remote *rnb, - struct niobuf_local *lnb, obd_size *left, - struct inode *inode) +static int filter_grant_check(struct obd_export *exp, struct obdo *oa, + int objcount, struct fsfilt_objinfo *fso, + int niocount, struct niobuf_local *lnb, + obd_size *left, struct inode *inode) { struct filter_export_data *fed = &exp->exp_filter_data; - int blocksize = exp->exp_obd->u.filter.fo_sb->s_blocksize; + int blocksize = exp->exp_obd->u.obt.obt_sb->s_blocksize; unsigned long used = 0, ungranted = 0, using; - int i, rc = -ENOSPC, obj, n = 0, mask = D_CACHE; + int i, rc = -ENOSPC, obj, n = 0; LASSERT_SPIN_LOCKED(&exp->exp_obd->obd_osfs_lock); @@ -408,14 +561,15 @@ static int filter_grant_check(struct obd_export *exp, int objcount, for (i = 0; i < fso[obj].fso_bufcnt; i++, n++) { int tmp, bytes; - /* FIXME: this is calculated with PAGE_SIZE on client */ - bytes = rnb[n].len; - bytes += rnb[n].offset & (blocksize - 1); - tmp = (rnb[n].offset + rnb[n].len) & (blocksize - 1); + /* should match the code in osc_exit_cache */ + bytes = lnb[n].len; + bytes += lnb[n].offset & (blocksize - 1); + tmp = (lnb[n].offset + lnb[n].len) & (blocksize - 1); if (tmp) bytes += blocksize - tmp; - if (rnb[n].flags & OBD_BRW_FROM_GRANT) { + if ((lnb[n].flags & OBD_BRW_FROM_GRANT) && + (oa->o_valid & OBD_MD_FLGRANT)) { if (fed->fed_grant < used + bytes) { CDEBUG(D_CACHE, "%s: cli %s/%p claims %ld+%d " @@ -423,20 +577,20 @@ static int filter_grant_check(struct obd_export *exp, int objcount, exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp, used, bytes, fed->fed_grant, n); - mask = D_ERROR; } else { used += bytes; - rnb[n].flags |= OBD_BRW_GRANTED; + lnb[n].flags |= OBD_BRW_GRANTED; lnb[n].lnb_grant_used = bytes; CDEBUG(0, "idx %d used=%lu\n", n, used); rc = 0; continue; } } - if (*left > ungranted) { + if (*left > ungranted + bytes) { /* if enough space, pretend it was granted */ ungranted += bytes; - rnb[n].flags |= OBD_BRW_GRANTED; + lnb[n].flags |= OBD_BRW_GRANTED; + lnb[n].lnb_grant_used = bytes; CDEBUG(0, "idx %d ungranted=%lu\n",n,ungranted); rc = 0; continue; @@ -449,7 +603,7 @@ static int filter_grant_check(struct obd_export *exp, int objcount, * marked BRW_GRANTED are already mapped and we can * ignore this error. */ lnb[n].rc = -ENOSPC; - rnb[n].flags &= OBD_BRW_GRANTED; + lnb[n].flags &= ~OBD_BRW_GRANTED; CDEBUG(D_CACHE,"%s: cli %s/%p idx %d no space for %d\n", exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp, n, bytes); @@ -462,17 +616,18 @@ static int filter_grant_check(struct obd_export *exp, int objcount, * happens in filter_grant_commit() after the writes are done. */ *left -= ungranted; fed->fed_grant -= used; - fed->fed_pending += used; - exp->exp_obd->u.filter.fo_tot_pending += used; + fed->fed_pending += used + ungranted; + exp->exp_obd->u.filter.fo_tot_granted += ungranted; + exp->exp_obd->u.filter.fo_tot_pending += used + ungranted; - CDEBUG(mask, + CDEBUG(D_CACHE, "%s: cli %s/%p used: %lu ungranted: %lu grant: %lu dirty: %lu\n", exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp, used, ungranted, fed->fed_grant, fed->fed_dirty); /* Rough calc in case we don't refresh cached statfs data */ using = (used + ungranted + 1 ) >> - exp->exp_obd->u.filter.fo_sb->s_blocksize_bits; + exp->exp_obd->u.obt.obt_sb->s_blocksize_bits; if (exp->exp_obd->obd_osfs.os_bavail > using) exp->exp_obd->obd_osfs.os_bavail -= using; else @@ -487,6 +642,13 @@ static int filter_grant_check(struct obd_export *exp, int objcount, exp->exp_obd->u.filter.fo_tot_dirty -= used; fed->fed_dirty -= used; + if (fed->fed_dirty < 0 || fed->fed_grant < 0 || fed->fed_pending < 0) { + CERROR("%s: cli %s/%p dirty %ld pend %ld grant %ld\n", + exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp, + fed->fed_dirty, fed->fed_pending, fed->fed_grant); + spin_unlock(&exp->exp_obd->obd_osfs_lock); + LBUG(); + } return rc; } @@ -502,90 +664,135 @@ static int filter_grant_check(struct obd_export *exp, int objcount, * bug) or ensure we get the page locks in an appropriate order. */ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, - int niocount, struct niobuf_remote *nb, + struct niobuf_remote *nb, int *npages, struct niobuf_local *res, - struct obd_trans_info *oti) + struct obd_trans_info *oti, + struct lustre_capa *capa) { + struct obd_device *obd = exp->exp_obd; + struct timeval start, end; struct lvfs_run_ctxt saved; - struct niobuf_remote *rnb; struct niobuf_local *lnb = res; struct fsfilt_objinfo fso; + struct filter_mod_data *fmd; struct dentry *dentry = NULL; - void *iobuf; + void *iobuf; obd_size left; - unsigned long now = jiffies; - int rc = 0, i, tot_bytes = 0, cleanup_phase = 0; + unsigned long now = jiffies, timediff; + int rc = 0, i, tot_bytes = 0, cleanup_phase = 0, localreq = 0; ENTRY; LASSERT(objcount == 1); LASSERT(obj->ioo_bufcnt > 0); - memset(res, 0, niocount * sizeof(*res)); - - rc = filter_alloc_iobuf(OBD_BRW_READ, obj->ioo_bufcnt, &iobuf); + rc = filter_auth_capa(exp, NULL, obdo_mdsno(oa), capa, + CAPA_OPC_OSS_WRITE); if (rc) - GOTO(cleanup, rc); + RETURN(rc); + + if (exp->exp_connection->c_peer.nid == exp->exp_connection->c_self) + localreq = 1; + + push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + iobuf = filter_iobuf_get(&obd->u.filter, oti); + if (IS_ERR(iobuf)) + GOTO(cleanup, rc = PTR_ERR(iobuf)); cleanup_phase = 1; - push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); - dentry = filter_fid2dentry(exp->exp_obd, NULL, obj->ioo_gr, + dentry = filter_fid2dentry(obd, NULL, obj->ioo_gr, obj->ioo_id); if (IS_ERR(dentry)) GOTO(cleanup, rc = PTR_ERR(dentry)); - cleanup_phase = 2; - + if (dentry->d_inode == NULL) { - CERROR("trying to BRW to non-existent file "LPU64"\n", - obj->ioo_id); - f_dput(dentry); + CERROR("%s: trying to BRW to non-existent file "LPU64"\n", + obd->obd_name, obj->ioo_id); GOTO(cleanup, rc = -ENOENT); } - fso.fso_dentry = dentry; - fso.fso_bufcnt = obj->ioo_bufcnt; + if (oa->o_valid & (OBD_MD_FLUID | OBD_MD_FLGID) && + dentry->d_inode->i_mode & (S_ISUID | S_ISGID)) { + rc = filter_capa_fixoa(exp, oa, obdo_mdsno(oa), capa); + if (rc) + GOTO(cleanup, rc); + } - if (time_after(jiffies, now + 15 * HZ)) - CERROR("slow preprw_write setup %lus\n", (jiffies - now) / HZ); - else - CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n", - (jiffies - now)); + rc = filter_map_remote_to_local(objcount, obj, nb, npages, res); + if (rc) + GOTO(cleanup, rc); - spin_lock(&exp->exp_obd->obd_osfs_lock); - if (oa) - filter_grant_incoming(exp, oa); - + fsfilt_check_slow(obd, now, "preprw_write setup"); + + /* Don't update inode timestamps if this write is older than a + * setattr which modifies the timestamps. b=10150 */ + /* XXX when we start having persistent reservations this needs to + * be changed to filter_fmd_get() to create the fmd if it doesn't + * already exist so we can store the reservation handle there. */ + fmd = filter_fmd_find(exp, obj->ioo_id, obj->ioo_gr); + + LASSERT(oa != NULL); + spin_lock(&obd->obd_osfs_lock); + filter_grant_incoming(exp, oa); + if (fmd && fmd->fmd_mactime_xid > oti->oti_xid) + oa->o_valid &= ~(OBD_MD_FLMTIME | OBD_MD_FLCTIME | + OBD_MD_FLATIME); + else + obdo_to_inode(dentry->d_inode, oa, OBD_MD_FLATIME | + OBD_MD_FLMTIME | OBD_MD_FLCTIME); cleanup_phase = 3; left = filter_grant_space_left(exp); - rc = filter_grant_check(exp, objcount, &fso, niocount, nb, res, + fso.fso_dentry = dentry; + fso.fso_bufcnt = *npages; + + rc = filter_grant_check(exp, oa, objcount, &fso, *npages, res, &left, dentry->d_inode); - if (oa && oa->o_valid & OBD_MD_FLGRANT) + + /* do not zero out oa->o_valid as it is used in filter_commitrw_write() + * for setting UID/GID and fid EA in first write time. */ + if (oa->o_valid & OBD_MD_FLGRANT) oa->o_grant = filter_grant(exp,oa->o_grant,oa->o_undirty,left); - spin_unlock(&exp->exp_obd->obd_osfs_lock); + spin_unlock(&obd->obd_osfs_lock); + filter_fmd_put(exp, fmd); - if (rc) + if (rc) GOTO(cleanup, rc); + cleanup_phase = 4; + + /* Filter truncate first locks i_mutex then partally truncated + * page, filter write code first locks pages then take + * i_mutex. To avoid a deadlock in case of concurrent + * punch/write requests from one client, filter writes and + * filter truncates are serialized by i_alloc_sem, allowing + * multiple writes or single truncate. */ + down_read(&dentry->d_inode->i_alloc_sem); + + do_gettimeofday(&start); + for (i = 0, lnb = res; i < *npages; i++, lnb++) { - for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt; - i++, lnb++, rnb++) { /* We still set up for ungranted pages so that granted pages * can be written to disk as they were promised, and portals * needs to keep the pages all aligned properly. */ lnb->dentry = dentry; - lnb->offset = rnb->offset; - lnb->len = rnb->len; - lnb->flags = rnb->flags; - rc = filter_alloc_dio_page(exp->exp_obd, dentry->d_inode,lnb); - if (rc) { - CERROR("page err %u@"LPU64" %u/%u %p: rc %d\n", - lnb->len, lnb->offset, - i, obj->ioo_bufcnt, dentry, rc); - GOTO(cleanup, rc); - } - cleanup_phase = 4; + lnb->page = filter_get_page(obd, dentry->d_inode, lnb->offset, + localreq); + if (lnb->page == NULL) + GOTO(cleanup, rc = -ENOMEM); + + /* DLM locking protects us from write and truncate competing + * for same region, but truncate can leave dirty page in the + * cache. it's possible the writeout on a such a page is in + * progress when we access it. it's also possible that during + * this writeout we put new (partial) data, but then won't + * be able to proceed in filter_commitrw_write(). thus let's + * just wait for writeout completion, should be rare enough. + * -bzzz */ + if (obd->u.filter.fo_writethrough_cache) + wait_on_page_writeback(lnb->page); + BUG_ON(PageWriteback(lnb->page)); /* If the filter writes a partial page, then has the file * extended, the client will read in the whole page. the @@ -593,151 +800,152 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa, * page on disk. we do it by hand for partial extending * writes, send_bio() is responsible for zeroing pages when * asked to read unmapped blocks -- brw_kiovec() does this. */ - if (lnb->len != PAGE_SIZE) { - if (lnb->offset + lnb->len < dentry->d_inode->i_size) { - filter_iobuf_add_page(exp->exp_obd, iobuf, + if (lnb->len != CFS_PAGE_SIZE) { + __s64 maxidx; + + maxidx = ((i_size_read(dentry->d_inode) + + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT) - 1; + if (maxidx >= lnb->page->index) { + LL_CDEBUG_PAGE(D_PAGE, lnb->page, "write %u @ " + LPU64" flg %x before EOF %llu\n", + lnb->len, lnb->offset,lnb->flags, + i_size_read(dentry->d_inode)); + filter_iobuf_add_page(obd, iobuf, dentry->d_inode, lnb->page); } else { - memset(kmap(lnb->page) + lnb->len, 0, - PAGE_SIZE - lnb->len); + long off; + char *p = kmap(lnb->page); + + off = lnb->offset & ~CFS_PAGE_MASK; + if (off) + memset(p, 0, off); + off = (lnb->offset + lnb->len) & ~CFS_PAGE_MASK; + if (off) + memset(p + off, 0, CFS_PAGE_SIZE - off); kunmap(lnb->page); } } if (lnb->rc == 0) tot_bytes += lnb->len; } + do_gettimeofday(&end); + timediff = cfs_timeval_sub(&end, &start, NULL); + lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_GET_PAGE, timediff); + + if (OBD_FAIL_CHECK(OBD_FAIL_OST_NOMEM)) + GOTO(cleanup, rc = -ENOMEM); + /* don't unlock pages to prevent any access */ rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf, exp, NULL, NULL, NULL); - - if (time_after(jiffies, now + 15 * HZ)) - CERROR("slow start_page_write %lus\n", (jiffies - now) / HZ); - else - CDEBUG(D_INFO, "start_page_write: %lu jiffies\n", - (jiffies - now)); - lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_WRITE_BYTES, - tot_bytes); + fsfilt_check_slow(obd, now, "start_page_write"); + + if (exp->exp_nid_stats && exp->exp_nid_stats->nid_stats) + lprocfs_counter_add(exp->exp_nid_stats->nid_stats, + LPROC_FILTER_WRITE_BYTES, tot_bytes); EXIT; cleanup: switch(cleanup_phase) { case 4: - if (rc) - filter_free_dio_pages(objcount, obj, niocount, res); + if (rc) { + for (i = 0, lnb = res; i < *npages; i++, lnb++) { + if (lnb->page != NULL) { + unlock_page(lnb->page); + page_cache_release(lnb->page); + lnb->page = NULL; + } + } + up_read(&dentry->d_inode->i_alloc_sem); + } case 3: - pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); - filter_free_iobuf(iobuf); + filter_iobuf_put(&obd->u.filter, iobuf, oti); case 2: + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); if (rc) f_dput(dentry); break; case 1: - spin_lock(&exp->exp_obd->obd_osfs_lock); + filter_iobuf_put(&obd->u.filter, iobuf, oti); + case 0: + spin_lock(&obd->obd_osfs_lock); if (oa) filter_grant_incoming(exp, oa); - spin_unlock(&exp->exp_obd->obd_osfs_lock); - pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); - filter_free_iobuf(iobuf); + spin_unlock(&obd->obd_osfs_lock); + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); break; default:; - } - RETURN(rc); + return rc; } int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa, - int objcount, struct obd_ioobj *obj, int niocount, - struct niobuf_remote *nb, struct niobuf_local *res, - struct obd_trans_info *oti) + int objcount, struct obd_ioobj *obj, + struct niobuf_remote *nb, int *npages, + struct niobuf_local *res, struct obd_trans_info *oti, + struct lustre_capa *capa) { if (cmd == OBD_BRW_WRITE) return filter_preprw_write(cmd, exp, oa, objcount, obj, - niocount, nb, res, oti); - + nb, npages, res, oti, capa); if (cmd == OBD_BRW_READ) return filter_preprw_read(cmd, exp, oa, objcount, obj, - niocount, nb, res, oti); - + nb, npages, res, oti, capa); LBUG(); return -EPROTO; } -void filter_release_read_page(struct filter_obd *filter, struct inode *inode, - struct page *page) -{ - int drop = 0; - - if (inode != NULL && - (inode->i_size > filter->fo_readcache_max_filesize)) - drop = 1; - - /* drop from cache like truncate_list_pages() */ - if (drop && !TryLockPage(page)) { - if (page->mapping) - ll_truncate_complete_page(page); - unlock_page(page); - } - page_cache_release(page); -} - static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, - int niocount, struct niobuf_local *res, + struct niobuf_remote *rnb, + int npages, struct niobuf_local *res, struct obd_trans_info *oti, int rc) { + struct filter_obd *fo = &exp->exp_obd->u.filter; struct inode *inode = NULL; + struct ldlm_res_id res_id; + struct ldlm_resource *resource = NULL; + struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; + struct niobuf_local *lnb; + int i; ENTRY; + osc_build_res_name(obj->ioo_id, obj->ioo_gr, &res_id); + /* If oa != NULL then filter_preprw_read updated the inode atime + * and we should update the lvb so that other glimpses will also + * get the updated value. bug 5972 */ + if (oa && ns && ns->ns_lvbo && ns->ns_lvbo->lvbo_update) { + resource = ldlm_resource_get(ns, NULL, &res_id, LDLM_EXTENT, 0); + + if (resource != NULL) { + LDLM_RESOURCE_ADDREF(resource); + ns->ns_lvbo->lvbo_update(resource, NULL, 0, 1); + LDLM_RESOURCE_DELREF(resource); + ldlm_resource_putref(resource); + } + } + if (res->dentry != NULL) inode = res->dentry->d_inode; - filter_free_dio_pages(objcount, obj, niocount, res); - + for (i = 0, lnb = res; i < npages; i++, lnb++) + if (lnb->page != NULL) + page_cache_release(lnb->page); + + if (inode && (fo->fo_read_cache == 0 || + i_size_read(inode) > fo->fo_readcache_max_filesize)) + filter_truncate_cache(exp->exp_obd, obj, rnb, npages, res, + inode); + + for (i = 0, lnb = res; i < npages; i++, lnb++) + lnb->page = NULL; + if (res->dentry != NULL) f_dput(res->dentry); RETURN(rc); } -void flip_into_page_cache(struct inode *inode, struct page *new_page) -{ - struct page *old_page; - int rc; - - do { - /* the dlm is protecting us from read/write concurrency, so we - * expect this find_lock_page to return quickly. even if we - * race with another writer it won't be doing much work with - * the page locked. we do this 'cause t_c_p expects a - * locked page, and it wants to grab the pagecache lock - * as well. */ - old_page = find_lock_page(inode->i_mapping, new_page->index); - if (old_page) { - ll_truncate_complete_page(old_page); - unlock_page(old_page); - page_cache_release(old_page); - } - -#if 0 /* this should be a /proc tunable someday */ - /* racing o_directs (no locking ioctl) could race adding - * their pages, so we repeat the page invalidation unless - * we successfully added our new page */ - rc = add_to_page_cache_unique(new_page, inode->i_mapping, - new_page->index, - page_hash(inode->i_mapping, - new_page->index)); - if (rc == 0) { - /* add_to_page_cache clears uptodate|dirty and locks - * the page */ - SetPageUptodate(new_page); - unlock_page(new_page); - } -#else - rc = 0; -#endif - } while (rc != 0); -} - void filter_grant_commit(struct obd_export *exp, int niocount, struct niobuf_local *res) { @@ -768,155 +976,32 @@ void filter_grant_commit(struct obd_export *exp, int niocount, spin_unlock(&exp->exp_obd->obd_osfs_lock); } -int filter_do_cow(struct obd_export *exp, struct obd_ioobj *obj, - int nioo, struct niobuf_remote *rnb) -{ - struct dentry *dentry; - struct lvfs_run_ctxt saved; - struct write_extents *extents = NULL; - int j, rc = 0, numexts = 0, flags = 0; - - ENTRY; - - LASSERT(nioo == 1); - - push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); - - dentry = filter_fid2dentry(exp->exp_obd, NULL, obj->ioo_gr, - obj->ioo_id); - if (IS_ERR(dentry)) { - pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); - RETURN (PTR_ERR(dentry)); - } - - if (dentry->d_inode == NULL) { - CERROR("trying to write extents to non-existent file "LPU64"\n", - obj->ioo_id); - GOTO(cleanup, rc = -ENOENT); - } - - flags = fsfilt_get_fs_flags(exp->exp_obd, dentry); - if (!(flags & SM_DO_COW)) { - GOTO(cleanup, rc); - } - OBD_ALLOC(extents, obj->ioo_bufcnt * sizeof(struct write_extents)); - if (!extents) { - CERROR("No Memory\n"); - GOTO(cleanup, rc = -ENOMEM); - } - for (j = 0; j < obj->ioo_bufcnt; j++) { - if (rnb[j].len != 0) { - extents[numexts].w_count = rnb[j].len; - extents[numexts].w_pos = rnb[j].offset; - numexts++; - } - } - rc = fsfilt_do_write_cow(exp->exp_obd, dentry, extents, numexts); - if (rc) { - CERROR("Do cow error id "LPU64" rc:%d \n", - obj->ioo_id, rc); - GOTO(cleanup, rc); - } - -cleanup: - if (extents) { - OBD_FREE(extents, obj->ioo_bufcnt * sizeof(struct write_extents)); - } - f_dput(dentry); - pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); - RETURN(rc); - -} -int filter_write_extents(struct obd_export *exp, struct obd_ioobj *obj, int nobj, - int niocount, struct niobuf_local *local, int rc) -{ - struct lvfs_run_ctxt saved; - struct dentry *dentry; - struct niobuf_local *lnb; - __u64 offset = 0; - __u32 len = 0; - int i, flags; - - ENTRY; - - LASSERT(nobj == 1); - - push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); - - dentry = filter_fid2dentry(exp->exp_obd, NULL, obj->ioo_gr, - obj->ioo_id); - if (IS_ERR(dentry)) { - pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); - RETURN (PTR_ERR(dentry)); - } - - if (dentry->d_inode == NULL) { - CERROR("trying to write extents to non-existent file "LPU64"\n", - obj->ioo_id); - GOTO(cleanup, rc = -ENOENT); - } - - flags = fsfilt_get_fs_flags(exp->exp_obd, dentry); - if (!(flags & SM_DO_REC)) { - GOTO(cleanup, rc); - } - - for (i = 0, lnb = local; i < obj->ioo_bufcnt; i++, lnb++) { - if (len == 0) { - offset = lnb->offset; - len = lnb->len; - } else if (lnb->offset == (offset + len)) { - len += lnb->len; - } else { - rc = fsfilt_write_extents(exp->exp_obd, dentry, - offset, len); - if (rc) { - CERROR("write exts off "LPU64" num %u rc:%d\n", - offset, len, rc); - GOTO(cleanup, rc); - } - offset = lnb->offset; - len = lnb->len; - } - } - if (len > 0) { - rc = fsfilt_write_extents(exp->exp_obd, dentry, - offset, len); - if (rc) { - CERROR("write exts off "LPU64" num %u rc:%d\n", - offset, len, rc); - GOTO(cleanup, rc); - } - } -cleanup: - f_dput(dentry); - pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); - RETURN(rc); -} int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa, - int objcount, struct obd_ioobj *obj, int niocount, - struct niobuf_local *res, struct obd_trans_info *oti,int rc) + int objcount, struct obd_ioobj *obj, + struct niobuf_remote *nb, int npages, + struct niobuf_local *res, struct obd_trans_info *oti, + int rc) { if (cmd == OBD_BRW_WRITE) - return filter_commitrw_write(exp, oa, objcount, obj, niocount, - res, oti, rc); + return filter_commitrw_write(exp, oa, objcount, obj, + nb, npages, res, oti, rc); if (cmd == OBD_BRW_READ) - return filter_commitrw_read(exp, oa, objcount, obj, niocount, - res, oti, rc); + return filter_commitrw_read(exp, oa, objcount, obj, + nb, npages, res, oti, rc); LBUG(); return -EPROTO; } -int filter_brw(int cmd, struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md *lsm, obd_count oa_bufs, - struct brw_page *pga, struct obd_trans_info *oti) +int filter_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo, + obd_count oa_bufs, struct brw_page *pga, + struct obd_trans_info *oti) { struct obd_ioobj ioo; struct niobuf_local *lnb; struct niobuf_remote *rnb; obd_count i; - int ret = 0; + int ret = 0, npages; ENTRY; OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local)); @@ -926,41 +1011,23 @@ int filter_brw(int cmd, struct obd_export *exp, struct obdo *oa, GOTO(out, ret = -ENOMEM); for (i = 0; i < oa_bufs; i++) { - rnb[i].offset = pga[i].disk_offset; + lnb[i].page = pga[i].pg; + rnb[i].offset = pga[i].off; rnb[i].len = pga[i].count; } - obdo_to_ioobj(oa, &ioo); + obdo_to_ioobj(oinfo->oi_oa, &ioo); ioo.ioo_bufcnt = oa_bufs; - ret = filter_preprw(cmd, exp, oa, 1, &ioo, oa_bufs, rnb, lnb, oti); + npages = oa_bufs; + ret = filter_preprw(cmd, exp, oinfo->oi_oa, 1, &ioo, + rnb, &npages, lnb, oti, oinfo_capa(oinfo)); if (ret != 0) GOTO(out, ret); + LASSERTF(oa_bufs == npages, "%u != %u\n", oa_bufs, npages); - for (i = 0; i < oa_bufs; i++) { - void *virt; - obd_off off; - void *addr; - - if (lnb[i].page == NULL) - break; - - off = pga[i].disk_offset & ~PAGE_MASK; - virt = kmap(pga[i].pg); - addr = kmap(lnb[i].page); - - /* 2 kmaps == vanishingly small deadlock opportunity */ - - if (cmd & OBD_BRW_WRITE) - memcpy(addr + off, virt + off, pga[i].count); - else - memcpy(virt + off, addr + off, pga[i].count); - - kunmap(lnb[i].page); - kunmap(pga[i].pg); - } - - ret = filter_commitrw(cmd, exp, oa, 1, &ioo, oa_bufs, lnb, oti, ret); + ret = filter_commitrw(cmd, exp, oinfo->oi_oa, 1, &ioo, rnb, + npages, lnb, oti, ret); out: if (lnb)