X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fobdfilter%2Ffilter_io.c;h=368842e30a8575a25138cad23c9e806f0c454fa9;hp=3900ad15bc8b668f92f4490b98c97d30b1393099;hb=9149bb4051b7d33e7e416d3178fc4fc18fef0826;hpb=3de901fceee79de12a31428bcc6ba3a00f10d1fe diff --git a/lustre/obdfilter/filter_io.c b/lustre/obdfilter/filter_io.c index 3900ad1..368842e 100644 --- a/lustre/obdfilter/filter_io.c +++ b/lustre/obdfilter/filter_io.c @@ -1,108 +1,65 @@ /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * - * linux/fs/obdfilter/filter_io.c + * GPL HEADER START * - * Copyright (c) 2001-2003 Cluster File Systems, Inc. - * Author: Peter Braam - * Author: Andreas Dilger - * Author: Phil Schwan + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * - * This file is part of Lustre, http://www.lustre.org. + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. * - * Lustre is free software; you can redistribute it and/or - * modify it under the terms of version 2 of the GNU General Public - * License as published by the Free Software Foundation. + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). * - * Lustre is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf * - * You should have received a copy of the GNU General Public License - * along with Lustre; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + * + * GPL HEADER END + */ +/* + * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Use is subject to license terms. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. + * + * lustre/obdfilter/filter_io.c + * + * Author: Peter Braam + * Author: Andreas Dilger + * Author: Phil Schwan */ #define DEBUG_SUBSYSTEM S_FILTER +#ifndef AUTOCONF_INCLUDED #include +#endif #include #include // XXX kill me soon #include -#include -#include +#include +#include +#include #include "filter_internal.h" -static int filter_start_page_read(struct inode *inode, struct niobuf_local *lnb) -{ - struct address_space *mapping = inode->i_mapping; - struct page *page; - unsigned long index = lnb->offset >> PAGE_SHIFT; - int rc; - - page = grab_cache_page(mapping, index); /* locked page */ - if (page == NULL) - return lnb->rc = -ENOMEM; - - LASSERT(page->mapping == mapping); - - lnb->page = page; - - if (inode->i_size < lnb->offset + lnb->len - 1) - lnb->rc = inode->i_size - lnb->offset; - else - lnb->rc = lnb->len; - - if (PageUptodate(page)) { - unlock_page(page); - return 0; - } - - rc = mapping->a_ops->readpage(NULL, page); - if (rc < 0) { - CERROR("page index %lu, rc = %d\n", index, rc); - lnb->page = NULL; - page_cache_release(page); - return lnb->rc = rc; - } - - return 0; -} - -static int filter_finish_page_read(struct niobuf_local *lnb) -{ - if (lnb->page == NULL) - return 0; - - if (PageUptodate(lnb->page)) - return 0; - - wait_on_page(lnb->page); - if (!PageUptodate(lnb->page)) { - CERROR("page index %lu/offset "LPX64" not uptodate\n", - lnb->page->index, lnb->offset); - GOTO(err_page, lnb->rc = -EIO); - } - if (PageError(lnb->page)) { - CERROR("page index %lu/offset "LPX64" has error\n", - lnb->page->index, lnb->offset); - GOTO(err_page, lnb->rc = -EIO); - } - - return 0; - -err_page: - page_cache_release(lnb->page); - lnb->page = NULL; - return lnb->rc; -} +int *obdfilter_created_scratchpad; /* Grab the dirty and seen grant announcements from the incoming obdo. * We will later calculate the clients new grant and return it. * Caller must hold osfs lock */ -static void filter_grant_incoming(struct obd_export *exp, struct obdo *oa) +void filter_grant_incoming(struct obd_export *exp, struct obdo *oa) { struct filter_export_data *fed; struct obd_device *obd = exp->exp_obd; @@ -122,8 +79,7 @@ static void filter_grant_incoming(struct obd_export *exp, struct obdo *oa) /* Add some margin, since there is a small race if other RPCs arrive * out-or-order and have already consumed some grant. We want to * leave this here in case there is a large error in accounting. */ - CDEBUG(oa->o_grant > fed->fed_grant + FILTER_GRANT_CHUNK ? - D_WARNING : D_CACHE, + CDEBUG(D_CACHE, "%s: cli %s/%p reports grant: "LPU64" dropped: %u, local: %lu\n", obd->obd_name, exp->exp_client_uuid.uuid, exp, oa->o_grant, oa->o_dropped, fed->fed_grant); @@ -131,10 +87,14 @@ static void filter_grant_incoming(struct obd_export *exp, struct obdo *oa) /* Update our accounting now so that statfs takes it into account. * Note that fed_dirty is only approximate and can become incorrect * if RPCs arrive out-of-order. No important calculations depend - * on fed_dirty however. */ + * on fed_dirty however, but we must check sanity to not assert. */ + if ((long long)oa->o_dirty < 0) + oa->o_dirty = 0; + else if (oa->o_dirty > fed->fed_grant + 4 * FILTER_GRANT_CHUNK) + oa->o_dirty = fed->fed_grant + 4 * FILTER_GRANT_CHUNK; obd->u.filter.fo_tot_dirty += oa->o_dirty - fed->fed_dirty; if (fed->fed_grant < oa->o_dropped) { - CERROR("%s: cli %s/%p reports %u dropped > fed_grant %lu\n", + CDEBUG(D_CACHE,"%s: cli %s/%p reports %u dropped > grant %lu\n", obd->obd_name, exp->exp_client_uuid.uuid, exp, oa->o_dropped, fed->fed_grant); oa->o_dropped = 0; @@ -148,11 +108,36 @@ static void filter_grant_incoming(struct obd_export *exp, struct obdo *oa) obd->u.filter.fo_tot_granted -= oa->o_dropped; fed->fed_grant -= oa->o_dropped; fed->fed_dirty = oa->o_dirty; + + if (oa->o_flags & OBD_FL_SHRINK_GRANT) { + obd_size left_space = filter_grant_space_left(exp); + struct filter_obd *filter = &exp->exp_obd->u.filter; + + /*Only if left_space < fo_tot_clients * 32M, + *then the grant space could be shrinked */ + if (left_space < filter->fo_tot_granted_clients * + FILTER_GRANT_SHRINK_LIMIT) { + fed->fed_grant -= oa->o_grant; + filter->fo_tot_granted -= oa->o_grant; + CDEBUG(D_CACHE, "%s: cli %s/%p shrink "LPU64 + "fed_grant %ld total "LPU64"\n", + obd->obd_name, exp->exp_client_uuid.uuid, + exp, oa->o_grant, fed->fed_grant, + filter->fo_tot_granted); + oa->o_grant = 0; + } + } + + if (fed->fed_dirty < 0 || fed->fed_grant < 0 || fed->fed_pending < 0) { + CERROR("%s: cli %s/%p dirty %ld pend %ld grant %ld\n", + obd->obd_name, exp->exp_client_uuid.uuid, exp, + fed->fed_dirty, fed->fed_pending, fed->fed_grant); + spin_unlock(&obd->obd_osfs_lock); + LBUG(); + } EXIT; } -#define GRANT_FOR_LLOG(obd) 16 - /* Figure out how much space is available between what we've granted * and what remains in the filesystem. Compensate for ext3 indirect * block overhead when computing how much free space is left ungranted. @@ -161,15 +146,16 @@ static void filter_grant_incoming(struct obd_export *exp, struct obdo *oa) obd_size filter_grant_space_left(struct obd_export *exp) { struct obd_device *obd = exp->exp_obd; - int blockbits = obd->u.filter.fo_sb->s_blocksize_bits; + int blockbits = obd->u.obt.obt_sb->s_blocksize_bits; obd_size tot_granted = obd->u.filter.fo_tot_granted, avail, left = 0; int rc, statfs_done = 0; LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock); - if (time_before(obd->obd_osfs_age, jiffies - HZ)) { + if (cfs_time_before_64(obd->obd_osfs_age, cfs_time_current_64() - HZ)) { restat: - rc = fsfilt_statfs(obd, obd->u.filter.fo_sb, jiffies + 1); + rc = fsfilt_statfs(obd, obd->u.obt.obt_sb, + cfs_time_current_64() + HZ); if (rc) /* N.B. statfs can't really fail */ RETURN(0); statfs_done = 1; @@ -191,18 +177,11 @@ restat: if (left >= tot_granted) { left -= tot_granted; } else { - static unsigned long next; - if (left < tot_granted - obd->u.filter.fo_tot_pending && - time_after(jiffies, next)) { - spin_unlock(&obd->obd_osfs_lock); + if (left < tot_granted - obd->u.filter.fo_tot_pending) { CERROR("%s: cli %s/%p grant "LPU64" > available " LPU64" and pending "LPU64"\n", obd->obd_name, exp->exp_client_uuid.uuid, exp, tot_granted, left, obd->u.filter.fo_tot_pending); - if (next == 0) - portals_debug_dumplog(); - next = jiffies + 20 * HZ; - spin_lock(&obd->obd_osfs_lock); } left = 0; } @@ -219,13 +198,17 @@ restat: /* Calculate how much grant space to allocate to this client, based on how * much space is currently free and how much of that is already granted. * + * if @conservative != 0, we limit the maximum grant to FILTER_GRANT_CHUNK; + * otherwise we'll satisfy the requested amount as possible as we can, this + * is usually due to client reconnect. + * * Caller must hold obd_osfs_lock. */ long filter_grant(struct obd_export *exp, obd_size current_grant, - obd_size want, obd_size fs_space_left) + obd_size want, obd_size fs_space_left, int conservative) { struct obd_device *obd = exp->exp_obd; struct filter_export_data *fed = &exp->exp_filter_data; - int blockbits = obd->u.filter.fo_sb->s_blocksize_bits; + int blockbits = obd->u.obt.obt_sb->s_blocksize_bits; __u64 grant = 0; LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock); @@ -239,23 +222,35 @@ long filter_grant(struct obd_export *exp, obd_size current_grant, * has and what we think it has, don't grant very much and let the * client consume its grant first. Either it just has lots of RPCs * in flight, or it was evicted and its grants will soon be used up. */ - if (current_grant < want && - current_grant < fed->fed_grant + FILTER_GRANT_CHUNK) { - grant = min((want >> blockbits) / 2, - (fs_space_left >> blockbits) / 8); - grant <<= blockbits; + if (want > 0x7fffffff) { + CERROR("%s: client %s/%p requesting > 2GB grant "LPU64"\n", + obd->obd_name, exp->exp_client_uuid.uuid, exp, want); + } else if (current_grant < want && + current_grant < fed->fed_grant + FILTER_GRANT_CHUNK) { + grant = min(want + (1 << blockbits) - 1, fs_space_left / 8); + grant &= ~((1ULL << blockbits) - 1); if (grant) { - if (grant > FILTER_GRANT_CHUNK) + if (grant > FILTER_GRANT_CHUNK && conservative) grant = FILTER_GRANT_CHUNK; obd->u.filter.fo_tot_granted += grant; fed->fed_grant += grant; + if (fed->fed_grant < 0) { + CERROR("%s: cli %s/%p grant %ld want "LPU64 + "current"LPU64"\n", + obd->obd_name, exp->exp_client_uuid.uuid, + exp, fed->fed_grant, want,current_grant); + spin_unlock(&obd->obd_osfs_lock); + LBUG(); + } } } - CDEBUG(D_CACHE,"%s: cli %s/%p wants: "LPU64" granting: "LPU64"\n", - obd->obd_name, exp->exp_client_uuid.uuid, exp, want, grant); + CDEBUG(D_CACHE, + "%s: cli %s/%p wants: "LPU64" current grant "LPU64 + " granting: "LPU64"\n", obd->obd_name, exp->exp_client_uuid.uuid, + exp, want, current_grant, grant); CDEBUG(D_CACHE, "%s: cli %s/%p tot cached:"LPU64" granted:"LPU64 " num_exports: %d\n", obd->obd_name, exp->exp_client_uuid.uuid, @@ -265,153 +260,256 @@ long filter_grant(struct obd_export *exp, obd_size current_grant, return grant; } +/* + * the routine is used to request pages from pagecache + * + * use GFP_NOFS for requests from a local client not allowing to enter FS + * as we might end up waiting on a page he sent in the request we're serving. + * use __GFP_HIGHMEM so that the pages can use all of the available memory + * on 32-bit machines + * use more agressive GFP_HIGHUSER flags from non-local clients to be able to + * generate more memory pressure, but at the same time use __GFP_NOMEMALLOC + * in order not to exhaust emergency reserves. + * + * See Bug 19529 and Bug 19917 for details. + */ +static struct page *filter_get_page(struct obd_device *obd, + struct inode *inode, + obd_off offset, + int localreq) +{ + struct page *page; + + page = find_or_create_page(inode->i_mapping, offset >> CFS_PAGE_SHIFT, + (localreq ? (GFP_NOFS | __GFP_HIGHMEM) + : GFP_HIGHUSER)); + if (unlikely(page == NULL)) + lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_NO_PAGE, 1); + + return page; +} + +/* + * the routine initializes array of local_niobuf from remote_niobuf + */ +static int filter_map_remote_to_local(int objcount, struct obd_ioobj *obj, + struct niobuf_remote *nb, + int *nrpages, struct niobuf_local *res) +{ + struct niobuf_remote *rnb; + struct niobuf_local *lnb; + int i, max; + ENTRY; + + /* we don't support multiobject RPC yet + * ost_brw_read() and ost_brw_write() check this */ + LASSERT(objcount == 1); + + max = *nrpages; + *nrpages = 0; + for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt; i++, rnb++) { + obd_off offset = rnb->offset; + unsigned int len = rnb->len; + + while (len > 0) { + int poff = offset & (CFS_PAGE_SIZE - 1); + int plen = CFS_PAGE_SIZE - poff; + + if (*nrpages >= max) { + CERROR("small array of local bufs: %d\n", max); + RETURN(-EINVAL); + } + + if (plen > len) + plen = len; + lnb->offset = offset; + lnb->len = plen; + lnb->flags = rnb->flags; + lnb->page = NULL; + lnb->rc = 0; + lnb->lnb_grant_used = 0; + + LASSERTF(plen <= len, "plen %u, len %u\n", plen, len); + offset += plen; + len -= plen; + lnb++; + (*nrpages)++; + } + } + RETURN(0); +} + +/* + * the invalidate above doesn't work during read because lnet pins pages. + * The truncate is used here instead to drop pages from cache + */ +void filter_release_cache(struct obd_device *obd, struct obd_ioobj *obj, + struct niobuf_remote *rnb, struct inode *inode) +{ + int i; + + LASSERT(inode != NULL); + for (i = 0; i < obj->ioo_bufcnt; i++, rnb++) { +#ifdef HAVE_TRUNCATE_RANGE + /* remove pages in which range is fit */ + truncate_inode_pages_range(inode->i_mapping, + rnb->offset & CFS_PAGE_MASK, + (rnb->offset + rnb->len - 1) | + ~CFS_PAGE_MASK); +#else + /* use invalidate for old kernels */ + invalidate_mapping_pages(inode->i_mapping, + rnb->offset >> CFS_PAGE_SHIFT, + (rnb->offset + rnb->len) >> + CFS_PAGE_SHIFT); +#endif + } +} + static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, - int niocount, struct niobuf_remote *nb, - struct niobuf_local *res, - struct obd_trans_info *oti) + struct niobuf_remote *nb, + int *npages, struct niobuf_local *res, + struct obd_trans_info *oti, + struct lustre_capa *capa) { struct obd_device *obd = exp->exp_obd; - struct obd_run_ctxt saved; - struct obd_ioobj *o; - struct niobuf_remote *rnb; - struct niobuf_local *lnb = NULL; - struct fsfilt_objinfo *fso; - struct dentry *dentry; - struct inode *inode; - int rc = 0, i, j, tot_bytes = 0, cleanup_phase = 0; + struct timeval start, end; + struct lvfs_run_ctxt saved; + struct niobuf_local *lnb; + struct dentry *dentry = NULL; + struct inode *inode = NULL; + void *iobuf = NULL; + int rc = 0, i, tot_bytes = 0; unsigned long now = jiffies; + long timediff; + loff_t isize; ENTRY; /* We are currently not supporting multi-obj BRW_READ RPCS at all. - * When we do this function's dentry cleanup will need to be fixed */ - LASSERT(objcount == 1); - LASSERT(obj->ioo_bufcnt > 0); + * When we do this function's dentry cleanup will need to be fixed. + * These values are verified in ost_brw_write() from the wire. */ + LASSERTF(objcount == 1, "%d\n", objcount); + LASSERTF(obj->ioo_bufcnt > 0, "%d\n", obj->ioo_bufcnt); + + LASSERT(oa->o_valid & OBD_MD_FLGROUP); + rc = filter_auth_capa(exp, NULL, oa->o_gr, capa, + CAPA_OPC_OSS_READ); + if (rc) + RETURN(rc); if (oa && oa->o_valid & OBD_MD_FLGRANT) { spin_lock(&obd->obd_osfs_lock); filter_grant_incoming(exp, oa); -#if 0 - /* Reads do not increase grants */ - oa->o_grant = filter_grant(exp, oa->o_grant, oa->o_undirty, - filter_grant_space_left(exp)); -#else - oa->o_grant = 0; -#endif + if (!(oa->o_flags & OBD_FL_SHRINK_GRANT)) + oa->o_grant = 0; spin_unlock(&obd->obd_osfs_lock); } - OBD_ALLOC(fso, objcount * sizeof(*fso)); - if (fso == NULL) - RETURN(-ENOMEM); + iobuf = filter_iobuf_get(&obd->u.filter, oti); + if (IS_ERR(iobuf)) + RETURN(PTR_ERR(iobuf)); - memset(res, 0, niocount * sizeof(*res)); + push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + dentry = filter_oa2dentry(obd, oa); + if (IS_ERR(dentry)) { + rc = PTR_ERR(dentry); + dentry = NULL; + GOTO(cleanup, rc); + } - push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL); - for (i = 0, o = obj; i < objcount; i++, o++) { - LASSERT(o->ioo_bufcnt); + inode = dentry->d_inode; + isize = i_size_read(inode); - dentry = filter_oa2dentry(obd, oa); - if (IS_ERR(dentry)) - GOTO(cleanup, rc = PTR_ERR(dentry)); + obdo_to_inode(inode, oa, OBD_MD_FLATIME); - if (dentry->d_inode == NULL) { - CERROR("trying to BRW to non-existent file "LPU64"\n", - o->ioo_id); - f_dput(dentry); - GOTO(cleanup, rc = -ENOENT); - } + rc = filter_map_remote_to_local(objcount, obj, nb, npages, res); + if (rc) + GOTO(cleanup, rc); - fso[i].fso_dentry = dentry; - fso[i].fso_bufcnt = o->ioo_bufcnt; - } + fsfilt_check_slow(obd, now, "preprw_read setup"); - if (time_after(jiffies, now + 15 * HZ)) - CERROR("slow preprw_read setup %lus\n", (jiffies - now) / HZ); - else - CDEBUG(D_INFO, "preprw_read setup: %lu jiffies\n", - (jiffies - now)); - - for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) { - dentry = fso[i].fso_dentry; - inode = dentry->d_inode; - - for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) { - lnb->dentry = dentry; - lnb->offset = rnb->offset; - lnb->len = rnb->len; - lnb->flags = rnb->flags; - - if (inode->i_size <= rnb->offset) { - /* If there's no more data, abort early. - * lnb->page == NULL and lnb->rc == 0, so it's - * easy to detect later. */ - break; - } else { - rc = filter_start_page_read(inode, lnb); - } + /* find pages for all segments, fill array with them */ + do_gettimeofday(&start); + for (i = 0, lnb = res; i < *npages; i++, lnb++) { - if (rc) { - CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, - "page err %u@"LPU64" %u/%u %p: rc %d\n", - lnb->len, lnb->offset, j, o->ioo_bufcnt, - dentry, rc); - cleanup_phase = 1; - GOTO(cleanup, rc); - } + lnb->dentry = dentry; - tot_bytes += lnb->rc; - if (lnb->rc < lnb->len) { - /* short read, be sure to wait on it */ - lnb++; - break; - } - } - } + if (isize <= lnb->offset) + /* If there's no more data, abort early. lnb->rc == 0, + * so it's easy to detect later. */ + break; - if (time_after(jiffies, now + 15 * HZ)) - CERROR("slow start_page_read %lus\n", (jiffies - now) / HZ); - else - CDEBUG(D_INFO, "start_page_read: %lu jiffies\n", - (jiffies - now)); + lnb->page = filter_get_page(obd, inode, lnb->offset, 0); + if (lnb->page == NULL) + GOTO(cleanup, rc = -ENOMEM); - lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES, tot_bytes); - while (lnb-- > res) { - rc = filter_finish_page_read(lnb); - if (rc) { - CERROR("error page %u@"LPU64" %u %p: rc %d\n", lnb->len, - lnb->offset, (int)(lnb - res), lnb->dentry, rc); - cleanup_phase = 1; - GOTO(cleanup, rc); + lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_CACHE_ACCESS, 1); + + if (isize < lnb->offset + lnb->len - 1) + lnb->rc = isize - lnb->offset; + else + lnb->rc = lnb->len; + + tot_bytes += lnb->rc; + + if (PageUptodate(lnb->page)) { + lprocfs_counter_add(obd->obd_stats, + LPROC_FILTER_CACHE_HIT, 1); + continue; } + + lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_CACHE_MISS, 1); + filter_iobuf_add_page(obd, iobuf, inode, lnb->page); } + do_gettimeofday(&end); + timediff = cfs_timeval_sub(&end, &start, NULL); + lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_GET_PAGE, timediff); - if (time_after(jiffies, now + 15 * HZ)) - CERROR("slow finish_page_read %lus\n", (jiffies - now) / HZ); - else - CDEBUG(D_INFO, "finish_page_read: %lu jiffies\n", - (jiffies - now)); + if (OBD_FAIL_CHECK(OBD_FAIL_OST_NOMEM)) + GOTO(cleanup, rc = -ENOMEM); + + fsfilt_check_slow(obd, now, "start_page_read"); + + rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf, + exp, NULL, NULL, NULL); + if (rc) + GOTO(cleanup, rc); + + lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES, tot_bytes); - filter_tally_read(&exp->exp_obd->u.filter, res, niocount); + if (exp->exp_nid_stats && exp->exp_nid_stats->nid_stats) + lprocfs_counter_add(exp->exp_nid_stats->nid_stats, + LPROC_FILTER_READ_BYTES, tot_bytes); EXIT; cleanup: - switch (cleanup_phase) { - case 1: - for (lnb = res; lnb < (res + niocount); lnb++) { - if (lnb->page) + /* unlock pages to allow access from concurrent OST_READ */ + for (i = 0, lnb = res; i < *npages; i++, lnb++) { + if (lnb->page) { + LASSERT(PageLocked(lnb->page)); + unlock_page(lnb->page); + + if (rc) { page_cache_release(lnb->page); + lnb->page = NULL; + } } - if (res->dentry != NULL) - f_dput(res->dentry); - else - CERROR("NULL dentry in cleanup -- tell CFS\n"); - case 0: - OBD_FREE(fso, objcount * sizeof(*fso)); - pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL); } + + if (rc != 0) { + if (dentry != NULL) + f_dput(dentry); + } + + filter_iobuf_put(&obd->u.filter, iobuf, oti); + + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + if (rc) + CERROR("io error %d\n", rc); + return rc; } @@ -423,16 +521,15 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, * right on through. * * Caller must hold obd_osfs_lock. */ -static int filter_grant_check(struct obd_export *exp, int objcount, - struct fsfilt_objinfo *fso, int niocount, - struct niobuf_remote *rnb, - struct niobuf_local *lnb, obd_size *left, - struct inode *inode) +static int filter_grant_check(struct obd_export *exp, struct obdo *oa, + int objcount, struct fsfilt_objinfo *fso, + int niocount, struct niobuf_local *lnb, + obd_size *left, struct inode *inode) { struct filter_export_data *fed = &exp->exp_filter_data; - int blocksize = exp->exp_obd->u.filter.fo_sb->s_blocksize; + int blocksize = exp->exp_obd->u.obt.obt_sb->s_blocksize; unsigned long used = 0, ungranted = 0, using; - int i, rc = -ENOSPC, obj, n = 0, mask = D_CACHE; + int i, rc = -ENOSPC, obj, n = 0; LASSERT_SPIN_LOCKED(&exp->exp_obd->obd_osfs_lock); @@ -440,14 +537,15 @@ static int filter_grant_check(struct obd_export *exp, int objcount, for (i = 0; i < fso[obj].fso_bufcnt; i++, n++) { int tmp, bytes; - /* FIXME: this is calculated with PAGE_SIZE on client */ - bytes = rnb[n].len; - bytes += rnb[n].offset & (blocksize - 1); - tmp = (rnb[n].offset + rnb[n].len) & (blocksize - 1); + /* should match the code in osc_exit_cache */ + bytes = lnb[n].len; + bytes += lnb[n].offset & (blocksize - 1); + tmp = (lnb[n].offset + lnb[n].len) & (blocksize - 1); if (tmp) bytes += blocksize - tmp; - if (rnb[n].flags & OBD_BRW_FROM_GRANT) { + if ((lnb[n].flags & OBD_BRW_FROM_GRANT) && + (oa->o_valid & OBD_MD_FLGRANT)) { if (fed->fed_grant < used + bytes) { CDEBUG(D_CACHE, "%s: cli %s/%p claims %ld+%d " @@ -455,20 +553,20 @@ static int filter_grant_check(struct obd_export *exp, int objcount, exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp, used, bytes, fed->fed_grant, n); - mask = D_ERROR; } else { used += bytes; - rnb[n].flags |= OBD_BRW_GRANTED; + lnb[n].flags |= OBD_BRW_GRANTED; lnb[n].lnb_grant_used = bytes; CDEBUG(0, "idx %d used=%lu\n", n, used); rc = 0; continue; } } - if (*left > ungranted) { + if (*left > ungranted + bytes) { /* if enough space, pretend it was granted */ ungranted += bytes; - rnb[n].flags |= OBD_BRW_GRANTED; + lnb[n].flags |= OBD_BRW_GRANTED; + lnb[n].lnb_grant_used = bytes; CDEBUG(0, "idx %d ungranted=%lu\n",n,ungranted); rc = 0; continue; @@ -481,7 +579,7 @@ static int filter_grant_check(struct obd_export *exp, int objcount, * marked BRW_GRANTED are already mapped and we can * ignore this error. */ lnb[n].rc = -ENOSPC; - rnb[n].flags &= OBD_BRW_GRANTED; + lnb[n].flags &= ~OBD_BRW_GRANTED; CDEBUG(D_CACHE,"%s: cli %s/%p idx %d no space for %d\n", exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp, n, bytes); @@ -494,17 +592,18 @@ static int filter_grant_check(struct obd_export *exp, int objcount, * happens in filter_grant_commit() after the writes are done. */ *left -= ungranted; fed->fed_grant -= used; - fed->fed_pending += used; - exp->exp_obd->u.filter.fo_tot_pending += used; + fed->fed_pending += used + ungranted; + exp->exp_obd->u.filter.fo_tot_granted += ungranted; + exp->exp_obd->u.filter.fo_tot_pending += used + ungranted; - CDEBUG(mask, + CDEBUG(D_CACHE, "%s: cli %s/%p used: %lu ungranted: %lu grant: %lu dirty: %lu\n", exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp, used, ungranted, fed->fed_grant, fed->fed_dirty); /* Rough calc in case we don't refresh cached statfs data */ using = (used + ungranted + 1 ) >> - exp->exp_obd->u.filter.fo_sb->s_blocksize_bits; + exp->exp_obd->u.obt.obt_sb->s_blocksize_bits; if (exp->exp_obd->obd_osfs.os_bavail > using) exp->exp_obd->obd_osfs.os_bavail -= using; else @@ -519,26 +618,14 @@ static int filter_grant_check(struct obd_export *exp, int objcount, exp->exp_obd->u.filter.fo_tot_dirty -= used; fed->fed_dirty -= used; - return rc; -} - -static int filter_start_page_write(struct inode *inode, - struct niobuf_local *lnb) -{ - struct page *page = alloc_pages(GFP_HIGHUSER, 0); - if (page == NULL) { - CERROR("no memory for a temp page\n"); - RETURN(lnb->rc = -ENOMEM); - } - POISON_PAGE(page, 0xf1); - if (lnb->len != PAGE_SIZE) { - memset(kmap(page) + lnb->len, 0, PAGE_SIZE - lnb->len); - kunmap(page); + if (fed->fed_dirty < 0 || fed->fed_grant < 0 || fed->fed_pending < 0) { + CERROR("%s: cli %s/%p dirty %ld pend %ld grant %ld\n", + exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp, + fed->fed_dirty, fed->fed_pending, fed->fed_grant); + spin_unlock(&exp->exp_obd->obd_osfs_lock); + LBUG(); } - page->index = lnb->offset >> PAGE_SHIFT; - lnb->page = page; - - return 0; + return rc; } /* If we ever start to support multi-object BRW RPCs, we will need to get locks @@ -553,200 +640,288 @@ static int filter_start_page_write(struct inode *inode, * bug) or ensure we get the page locks in an appropriate order. */ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, - int niocount, struct niobuf_remote *nb, + struct niobuf_remote *nb, int *npages, struct niobuf_local *res, - struct obd_trans_info *oti) + struct obd_trans_info *oti, + struct lustre_capa *capa) { - struct obd_run_ctxt saved; - struct niobuf_remote *rnb; - struct niobuf_local *lnb; + struct obd_device *obd = exp->exp_obd; + struct timeval start, end; + struct lvfs_run_ctxt saved; + struct niobuf_local *lnb = res; struct fsfilt_objinfo fso; - struct dentry *dentry; + struct filter_mod_data *fmd; + struct dentry *dentry = NULL; + void *iobuf; obd_size left; - unsigned long now = jiffies; - int rc = 0, i, tot_bytes = 0, cleanup_phase = 1; + unsigned long now = jiffies, timediff; + int rc = 0, i, tot_bytes = 0, cleanup_phase = 0, localreq = 0; ENTRY; LASSERT(objcount == 1); LASSERT(obj->ioo_bufcnt > 0); - memset(res, 0, niocount * sizeof(*res)); + LASSERT(oa->o_valid & OBD_MD_FLGROUP); + rc = filter_auth_capa(exp, NULL, oa->o_gr, capa, + CAPA_OPC_OSS_WRITE); + if (rc) + RETURN(rc); - push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL); - dentry = filter_fid2dentry(exp->exp_obd, NULL, obj->ioo_gr, + if (exp->exp_connection && + exp->exp_connection->c_peer.nid == exp->exp_connection->c_self) + localreq = 1; + + push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + iobuf = filter_iobuf_get(&obd->u.filter, oti); + if (IS_ERR(iobuf)) + GOTO(cleanup, rc = PTR_ERR(iobuf)); + cleanup_phase = 1; + + dentry = filter_fid2dentry(obd, NULL, obj->ioo_gr, obj->ioo_id); if (IS_ERR(dentry)) GOTO(cleanup, rc = PTR_ERR(dentry)); + cleanup_phase = 2; if (dentry->d_inode == NULL) { - CERROR("trying to BRW to non-existent file "LPU64"\n", - obj->ioo_id); - f_dput(dentry); + CERROR("%s: trying to BRW to non-existent file "LPU64"\n", + obd->obd_name, obj->ioo_id); GOTO(cleanup, rc = -ENOENT); } - fso.fso_dentry = dentry; - fso.fso_bufcnt = obj->ioo_bufcnt; + if (oa->o_valid & (OBD_MD_FLUID | OBD_MD_FLGID) && + dentry->d_inode->i_mode & (S_ISUID | S_ISGID)) { + rc = filter_capa_fixoa(exp, oa, oa->o_gr, capa); + if (rc) + GOTO(cleanup, rc); + } - if (time_after(jiffies, now + 15 * HZ)) - CERROR("slow preprw_write setup %lus\n", (jiffies - now) / HZ); - else - CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n", - (jiffies - now)); + rc = filter_map_remote_to_local(objcount, obj, nb, npages, res); + if (rc) + GOTO(cleanup, rc); - spin_lock(&exp->exp_obd->obd_osfs_lock); - if (oa) - filter_grant_incoming(exp, oa); - cleanup_phase = 0; + fsfilt_check_slow(obd, now, "preprw_write setup"); + + /* Don't update inode timestamps if this write is older than a + * setattr which modifies the timestamps. b=10150 */ + /* XXX when we start having persistent reservations this needs to + * be changed to filter_fmd_get() to create the fmd if it doesn't + * already exist so we can store the reservation handle there. */ + fmd = filter_fmd_find(exp, obj->ioo_id, obj->ioo_gr); + + LASSERT(oa != NULL); + spin_lock(&obd->obd_osfs_lock); + filter_grant_incoming(exp, oa); + if (fmd && fmd->fmd_mactime_xid > oti->oti_xid) + oa->o_valid &= ~(OBD_MD_FLMTIME | OBD_MD_FLCTIME | + OBD_MD_FLATIME); + else + obdo_to_inode(dentry->d_inode, oa, OBD_MD_FLATIME | + OBD_MD_FLMTIME | OBD_MD_FLCTIME); + cleanup_phase = 3; left = filter_grant_space_left(exp); - rc = filter_grant_check(exp, objcount, &fso, niocount, nb, res, + fso.fso_dentry = dentry; + fso.fso_bufcnt = *npages; + + rc = filter_grant_check(exp, oa, objcount, &fso, *npages, res, &left, dentry->d_inode); - if (oa && oa->o_valid & OBD_MD_FLGRANT) - oa->o_grant = filter_grant(exp,oa->o_grant,oa->o_undirty,left); - spin_unlock(&exp->exp_obd->obd_osfs_lock); + /* do not zero out oa->o_valid as it is used in filter_commitrw_write() + * for setting UID/GID and fid EA in first write time. */ + if (oa->o_valid & OBD_MD_FLGRANT) + oa->o_grant = filter_grant(exp, oa->o_grant, oa->o_undirty, + left, 1); + + spin_unlock(&obd->obd_osfs_lock); + filter_fmd_put(exp, fmd); - if (rc) { - f_dput(dentry); + if (rc) GOTO(cleanup, rc); - } + cleanup_phase = 4; + + /* Filter truncate first locks i_mutex then partally truncated + * page, filter write code first locks pages then take + * i_mutex. To avoid a deadlock in case of concurrent + * punch/write requests from one client, filter writes and + * filter truncates are serialized by i_alloc_sem, allowing + * multiple writes or single truncate. */ + down_read(&dentry->d_inode->i_alloc_sem); + + do_gettimeofday(&start); + for (i = 0, lnb = res; i < *npages; i++, lnb++) { - for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt; - i++, lnb++, rnb++) { /* We still set up for ungranted pages so that granted pages * can be written to disk as they were promised, and portals - * needs to keep the pages all aligned properly. */ + * needs to keep the pages all aligned properly. */ lnb->dentry = dentry; - lnb->offset = rnb->offset; - lnb->len = rnb->len; - lnb->flags = rnb->flags; - rc = filter_start_page_write(dentry->d_inode, lnb); - if (rc) { - CERROR("page err %u@"LPU64" %u/%u %p: rc %d\n", - lnb->len, lnb->offset, - i, obj->ioo_bufcnt, dentry, rc); - while (lnb-- > res) - __free_pages(lnb->page, 0); - f_dput(dentry); - GOTO(cleanup, rc); + lnb->page = filter_get_page(obd, dentry->d_inode, lnb->offset, + localreq); + if (lnb->page == NULL) + GOTO(cleanup, rc = -ENOMEM); + + /* DLM locking protects us from write and truncate competing + * for same region, but truncate can leave dirty page in the + * cache. it's possible the writeout on a such a page is in + * progress when we access it. it's also possible that during + * this writeout we put new (partial) data, but then won't + * be able to proceed in filter_commitrw_write(). thus let's + * just wait for writeout completion, should be rare enough. + * -bzzz */ + wait_on_page_writeback(lnb->page); + BUG_ON(PageWriteback(lnb->page)); + + /* If the filter writes a partial page, then has the file + * extended, the client will read in the whole page. the + * filter has to be careful to zero the rest of the partial + * page on disk. we do it by hand for partial extending + * writes, send_bio() is responsible for zeroing pages when + * asked to read unmapped blocks -- brw_kiovec() does this. */ + if (lnb->len != CFS_PAGE_SIZE) { + __s64 maxidx; + + maxidx = ((i_size_read(dentry->d_inode) + + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT) - 1; + if (maxidx >= lnb->page->index) { + LL_CDEBUG_PAGE(D_PAGE, lnb->page, "write %u @ " + LPU64" flg %x before EOF %llu\n", + lnb->len, lnb->offset,lnb->flags, + i_size_read(dentry->d_inode)); + filter_iobuf_add_page(obd, iobuf, + dentry->d_inode, + lnb->page); + } else { + long off; + char *p = kmap(lnb->page); + + off = lnb->offset & ~CFS_PAGE_MASK; + if (off) + memset(p, 0, off); + off = (lnb->offset + lnb->len) & ~CFS_PAGE_MASK; + if (off) + memset(p + off, 0, CFS_PAGE_SIZE - off); + kunmap(lnb->page); + } } if (lnb->rc == 0) tot_bytes += lnb->len; } + do_gettimeofday(&end); + timediff = cfs_timeval_sub(&end, &start, NULL); + lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_GET_PAGE, timediff); - if (time_after(jiffies, now + 15 * HZ)) - CERROR("slow start_page_write %lus\n", (jiffies - now) / HZ); - else - CDEBUG(D_INFO, "start_page_write: %lu jiffies\n", - (jiffies - now)); + if (OBD_FAIL_CHECK(OBD_FAIL_OST_NOMEM)) + GOTO(cleanup, rc = -ENOMEM); + + /* don't unlock pages to prevent any access */ + rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf, exp, + NULL, NULL, NULL); - lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_WRITE_BYTES, - tot_bytes); + fsfilt_check_slow(obd, now, "start_page_write"); + + if (exp->exp_nid_stats && exp->exp_nid_stats->nid_stats) + lprocfs_counter_add(exp->exp_nid_stats->nid_stats, + LPROC_FILTER_WRITE_BYTES, tot_bytes); EXIT; cleanup: switch(cleanup_phase) { + case 4: + if (rc) { + for (i = 0, lnb = res; i < *npages; i++, lnb++) { + if (lnb->page != NULL) { + unlock_page(lnb->page); + page_cache_release(lnb->page); + lnb->page = NULL; + } + } + up_read(&dentry->d_inode->i_alloc_sem); + } + case 3: + filter_iobuf_put(&obd->u.filter, iobuf, oti); + case 2: + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + if (rc) + f_dput(dentry); + break; case 1: - spin_lock(&exp->exp_obd->obd_osfs_lock); + filter_iobuf_put(&obd->u.filter, iobuf, oti); + case 0: + spin_lock(&obd->obd_osfs_lock); if (oa) filter_grant_incoming(exp, oa); - spin_unlock(&exp->exp_obd->obd_osfs_lock); - default: ; + spin_unlock(&obd->obd_osfs_lock); + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + break; + default:; } - pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL); return rc; } int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa, - int objcount, struct obd_ioobj *obj, int niocount, - struct niobuf_remote *nb, struct niobuf_local *res, - struct obd_trans_info *oti) + int objcount, struct obd_ioobj *obj, + struct niobuf_remote *nb, int *npages, + struct niobuf_local *res, struct obd_trans_info *oti, + struct lustre_capa *capa) { if (cmd == OBD_BRW_WRITE) return filter_preprw_write(cmd, exp, oa, objcount, obj, - niocount, nb, res, oti); - + nb, npages, res, oti, capa); if (cmd == OBD_BRW_READ) return filter_preprw_read(cmd, exp, oa, objcount, obj, - niocount, nb, res, oti); - + nb, npages, res, oti, capa); LBUG(); return -EPROTO; } static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, - int niocount, struct niobuf_local *res, + struct niobuf_remote *rnb, + int npages, struct niobuf_local *res, struct obd_trans_info *oti, int rc) { - struct obd_ioobj *o; + struct filter_obd *fo = &exp->exp_obd->u.filter; + struct inode *inode = NULL; + struct ldlm_res_id res_id; + struct ldlm_resource *resource = NULL; + struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; struct niobuf_local *lnb; - int i, j, drop = 0; + int i; ENTRY; + osc_build_res_name(obj->ioo_id, obj->ioo_gr, &res_id); + /* If oa != NULL then filter_preprw_read updated the inode atime + * and we should update the lvb so that other glimpses will also + * get the updated value. bug 5972 */ + if (oa && ns && ns->ns_lvbo && ns->ns_lvbo->lvbo_update) { + resource = ldlm_resource_get(ns, NULL, &res_id, LDLM_EXTENT, 0); + + if (resource != NULL) { + LDLM_RESOURCE_ADDREF(resource); + ns->ns_lvbo->lvbo_update(resource, NULL, 1); + LDLM_RESOURCE_DELREF(resource); + ldlm_resource_putref(resource); + } + } + if (res->dentry != NULL) - drop = (res->dentry->d_inode->i_size > - exp->exp_obd->u.filter.fo_readcache_max_filesize); + inode = res->dentry->d_inode; - for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) { - for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) { - if (lnb->page == NULL) - continue; - /* drop from cache like truncate_list_pages() */ - if (drop && !TryLockPage(lnb->page)) { - if (lnb->page->mapping) - ll_truncate_complete_page(lnb->page); - unlock_page(lnb->page); - } + for (i = 0, lnb = res; i < npages; i++, lnb++) { + if (lnb->page != NULL) { page_cache_release(lnb->page); + lnb->page = NULL; } } + if (inode && (fo->fo_read_cache == 0 || + i_size_read(inode) > fo->fo_readcache_max_filesize)) + filter_release_cache(exp->exp_obd, obj, rnb, inode); if (res->dentry != NULL) f_dput(res->dentry); RETURN(rc); } -void flip_into_page_cache(struct inode *inode, struct page *new_page) -{ - struct page *old_page; - int rc; - - do { - /* the dlm is protecting us from read/write concurrency, so we - * expect this find_lock_page to return quickly. even if we - * race with another writer it won't be doing much work with - * the page locked. we do this 'cause t_c_p expects a - * locked page, and it wants to grab the pagecache lock - * as well. */ - old_page = find_lock_page(inode->i_mapping, new_page->index); - if (old_page) { - ll_truncate_complete_page(old_page); - unlock_page(old_page); - page_cache_release(old_page); - } - -#if 0 /* this should be a /proc tunable someday */ - /* racing o_directs (no locking ioctl) could race adding - * their pages, so we repeat the page invalidation unless - * we successfully added our new page */ - rc = add_to_page_cache_unique(new_page, inode->i_mapping, - new_page->index, - page_hash(inode->i_mapping, - new_page->index)); - if (rc == 0) { - /* add_to_page_cache clears uptodate|dirty and locks - * the page */ - SetPageUptodate(new_page); - unlock_page(new_page); - } -#else - rc = 0; -#endif - } while (rc != 0); -} - void filter_grant_commit(struct obd_export *exp, int niocount, struct niobuf_local *res) { @@ -779,28 +954,30 @@ void filter_grant_commit(struct obd_export *exp, int niocount, } int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa, - int objcount, struct obd_ioobj *obj, int niocount, - struct niobuf_local *res, struct obd_trans_info *oti,int rc) + int objcount, struct obd_ioobj *obj, + struct niobuf_remote *nb, int npages, + struct niobuf_local *res, struct obd_trans_info *oti, + int rc) { if (cmd == OBD_BRW_WRITE) - return filter_commitrw_write(exp, oa, objcount, obj, niocount, - res, oti, rc); + return filter_commitrw_write(exp, oa, objcount, obj, + nb, npages, res, oti, rc); if (cmd == OBD_BRW_READ) - return filter_commitrw_read(exp, oa, objcount, obj, niocount, - res, oti, rc); + return filter_commitrw_read(exp, oa, objcount, obj, + nb, npages, res, oti, rc); LBUG(); return -EPROTO; } -int filter_brw(int cmd, struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md *lsm, obd_count oa_bufs, - struct brw_page *pga, struct obd_trans_info *oti) +int filter_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo, + obd_count oa_bufs, struct brw_page *pga, + struct obd_trans_info *oti) { struct obd_ioobj ioo; struct niobuf_local *lnb; struct niobuf_remote *rnb; obd_count i; - int ret = 0; + int ret = 0, npages; ENTRY; OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local)); @@ -810,34 +987,23 @@ int filter_brw(int cmd, struct obd_export *exp, struct obdo *oa, GOTO(out, ret = -ENOMEM); for (i = 0; i < oa_bufs; i++) { + lnb[i].page = pga[i].pg; rnb[i].offset = pga[i].off; rnb[i].len = pga[i].count; } - obdo_to_ioobj(oa, &ioo); + obdo_to_ioobj(oinfo->oi_oa, &ioo); ioo.ioo_bufcnt = oa_bufs; - ret = filter_preprw(cmd, exp, oa, 1, &ioo, oa_bufs, rnb, lnb, oti); + npages = oa_bufs; + ret = filter_preprw(cmd, exp, oinfo->oi_oa, 1, &ioo, + rnb, &npages, lnb, oti, oinfo_capa(oinfo)); if (ret != 0) GOTO(out, ret); + LASSERTF(oa_bufs == npages, "%u != %u\n", oa_bufs, npages); - for (i = 0; i < oa_bufs; i++) { - void *virt = kmap(pga[i].pg); - obd_off off = pga[i].off & ~PAGE_MASK; - void *addr = kmap(lnb[i].page); - - /* 2 kmaps == vanishingly small deadlock opportunity */ - - if (cmd & OBD_BRW_WRITE) - memcpy(addr + off, virt + off, pga[i].count); - else - memcpy(virt + off, addr + off, pga[i].count); - - kunmap(lnb[i].page); - kunmap(pga[i].pg); - } - - ret = filter_commitrw(cmd, exp, oa, 1, &ioo, oa_bufs, lnb, oti, ret); + ret = filter_commitrw(cmd, exp, oinfo->oi_oa, 1, &ioo, rnb, + npages, lnb, oti, ret); out: if (lnb)