1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * linux/fs/obdfilter/filter_io.c
6 * Copyright (c) 2001-2003 Cluster File Systems, Inc.
7 * Author: Peter Braam <braam@clusterfs.com>
8 * Author: Andreas Dilger <adilger@clusterfs.com>
9 * Author: Phil Schwan <phil@clusterfs.com>
11 * This file is part of Lustre, http://www.lustre.org.
13 * Lustre is free software; you can redistribute it and/or
14 * modify it under the terms of version 2 of the GNU General Public
15 * License as published by the Free Software Foundation.
17 * Lustre is distributed in the hope that it will be useful,
18 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 * GNU General Public License for more details.
22 * You should have received a copy of the GNU General Public License
23 * along with Lustre; if not, write to the Free Software
24 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
27 #define DEBUG_SUBSYSTEM S_FILTER
29 #include <linux/config.h>
30 #include <linux/module.h>
31 #include <linux/pagemap.h> // XXX kill me soon
32 #include <linux/version.h>
33 #include <asm/div64.h>
35 #include <linux/obd_class.h>
36 #include <linux/lustre_fsfilt.h>
37 #include "filter_internal.h"
/* Grab (or create) the locked pagecache page that covers lnb->offset and,
 * when it is not already uptodate, start a read into it through the
 * address_space readpage() op.  The result/byte-count is carried in
 * lnb->rc (-ENOMEM if the page cannot be grabbed; clipped short when the
 * request extends past i_size).  Completion is checked later in
 * filter_finish_page_read().
 *
 * NOTE(review): this extract is missing interior lines (declarations,
 * braces, return paths) -- verify against the full source before editing. */
39 static int filter_start_page_read(struct inode *inode, struct niobuf_local *lnb)
41 struct address_space *mapping = inode->i_mapping;
/* Pagecache index that the niobuf's byte offset falls into. */
43 unsigned long index = lnb->offset >> PAGE_SHIFT;
/* grab_cache_page() returns the page locked, or NULL on allocation failure. */
46 page = grab_cache_page(mapping, index); /* locked page */
48 return lnb->rc = -ENOMEM;
50 LASSERT(page->mapping == mapping);
/* Request runs past end-of-file: shrink the returned byte count to what
 * the file actually holds past lnb->offset. */
54 if (inode->i_size < lnb->offset + lnb->len - 1)
55 lnb->rc = inode->i_size - lnb->offset;
/* Already uptodate: no I/O needs to be started (branch body elided here). */
59 if (PageUptodate(page)) {
/* Kick off the (possibly async) read; NULL file pointer is tolerated by
 * the filesystems this filter sits on -- presumably ext3; confirm. */
64 rc = mapping->a_ops->readpage(NULL, page);
66 CERROR("page index %lu, rc = %d\n", index, rc);
/* Drop the reference taken by grab_cache_page() on the error path. */
68 page_cache_release(page);
/* Wait for the read started by filter_start_page_read() on lnb->page and
 * validate the outcome.  A NULL page means the buffer was skipped (read
 * past EOF) and there is nothing to finish.  On I/O failure the page
 * reference is dropped via the err_page path and lnb->rc is set to -EIO.
 *
 * NOTE(review): interior lines (returns, closing braces) are elided in
 * this extract. */
75 static int filter_finish_page_read(struct niobuf_local *lnb)
77 if (lnb->page == NULL)
/* Fast path: I/O already completed successfully. */
80 if (PageUptodate(lnb->page))
/* Block until the in-flight readpage() I/O completes. */
83 wait_on_page(lnb->page);
/* I/O finished but the page never became uptodate -> treat as I/O error. */
84 if (!PageUptodate(lnb->page)) {
85 CERROR("page index %lu/offset "LPX64" not uptodate\n",
86 lnb->page->index, lnb->offset);
87 GOTO(err_page, lnb->rc = -EIO);
/* The block layer flagged an error on this page. */
89 if (PageError(lnb->page)) {
90 CERROR("page index %lu/offset "LPX64" has error\n",
91 lnb->page->index, lnb->offset);
92 GOTO(err_page, lnb->rc = -EIO);
/* err_page: release the reference taken at start_page_read time. */
98 page_cache_release(lnb->page);
103 /* See if there are unallocated parts in given file region */
/* Walks the region block-by-block with the filesystem's bmap() op; a
 * mapping of 0 for any block means a hole (unallocated space).  The
 * bmap prototype differs between 2.4 (int/long) and 2.6 (sector_t)
 * kernels, hence the version-conditional function-pointer type. */
104 static int filter_inode_has_holes(struct inode *inode, obd_size start,
108 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
109 sector_t (*fs_bmap)(struct address_space *,
112 int (*fs_bmap)(struct address_space *, long);
114 fs_bmap = inode->i_mapping->a_ops->bmap;
/* NOTE(review): loop bound is "<= len" -- inclusive; confirm whether len
 * is a count or a last-index against the full source. */
116 for (j = 0; j <= len ; j++) {
/* bmap() returning 0 == no physical block assigned == hole. */
117 if (!fs_bmap(inode->i_mapping, start+j)) {
123 /* Return -1 in case that caller cares about bmap availability.
129 /* Grab the dirty and seen grant announcements from the incoming obdo.
130 * We will later calculate the clients new grant and return it. */
/* Updates per-export (fed_cached/fed_grant*) and server-wide
 * (fo_tot_cached/fo_tot_granted) grant accounting under obd_osfs_lock,
 * based on what the client reports in oa->o_blocks (its cached amount)
 * and oa->o_grant (the last grant value it has seen). */
131 static void filter_grant_incoming(struct obd_export *exp, struct obdo *oa)
133 struct filter_export_data *fed;
134 struct obd_device *obd = exp->exp_obd;
135 obd_size client_cached;
/* Both FLBLOCKS and FLGRANT must be valid for the announcement to be
 * usable; otherwise strip FLGRANT and bail (elided) without accounting. */
138 if (!oa || (oa->o_valid & (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) !=
139 (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) {
141 oa->o_valid &= ~OBD_MD_FLGRANT;
146 client_cached = oa->o_blocks;
147 fed = &exp->exp_filter_data;
/* A client claiming more cached than we granted it is misbehaving; log
 * loudly but continue with its reported number. */
149 if (client_cached > fed->fed_grant)
150 CERROR("client %s claims "LPU64" granted, > "LPU64" granted\n",
151 obd->obd_name, client_cached, fed->fed_grant);
153 spin_lock(&obd->obd_osfs_lock);
154 /* update our accounting now so that statfs takes it into account */
155 obd->u.filter.fo_tot_cached += client_cached - fed->fed_cached;
156 fed->fed_cached = client_cached;
158 /* Acknowledgement that the client has seen our published grant.
159 * If the client has met our shrinking target we can reuse its
160 * difference from the previous grant. It is reasonable to announce
161 * more dirty that cached as it tries to purge its previously granted
162 * dirty data down to its newly received target. */
163 if (fed->fed_grant_waiting && (oa->o_grant <= fed->fed_grant_sent)) {
/* We were waiting on a shrink: the client has acked a grant at or below
 * what we sent, so reclaim the difference into fo_tot_granted. */
164 if (fed->fed_grant_sent < fed->fed_grant) {
165 if (client_cached <= fed->fed_grant_sent) {
166 obd->u.filter.fo_tot_granted -=
167 fed->fed_grant - oa->o_grant;
168 CDEBUG(D_SUPER, "reduced grant from "LPU64" to "
169 LPU64", total grant now "LPU64"\n",
170 fed->fed_grant, oa->o_grant,
171 obd->u.filter.fo_tot_granted;
173 fed->fed_grant_waiting = 0;
176 fed->fed_grant_waiting = 0;
179 spin_unlock(&obd->obd_osfs_lock);
/* Consume the announcement so later code does not re-read stale values. */
180 oa->o_valid &= ~(OBD_MD_FLGRANT|OBD_MD_FLBLOCKS);
184 /* Figure out how much space is available between what we've granted
185 * and what remains in the filesystem. Compensate for ext3 indirect
186 * block overhead when computing how much free space is left ungranted.
188 * Caller must hold obd_osfs_lock. */
189 obd_size filter_grant_space_left(struct obd_export *exp)
192 struct obd_device *obd = exp->exp_obd;
193 int blockbits = obd->u.filter.fo_sb->s_blocksize_bits;
194 /* XXX I disabled statfs caching as it only creates extra problems now.
/* max_age of jiffies+1 forces fsfilt_statfs() to refresh every call
 * (the "- HZ" cache window is commented out above). */
196 unsigned long max_age = jiffies/* - HZ*/+1;
197 struct filter_export_data *fed = &exp->exp_filter_data;
201 rc = fsfilt_statfs(obd, obd->u.filter.fo_sb, max_age);
202 if (rc) /* N.B. statfs can't really fail, just for correctness */
/* Available bytes = available blocks << blocksize bits. */
205 left = obd->obd_osfs.os_bavail << blockbits;
/* Subtract an estimate of ext3 indirect/metadata overhead so we never
 * grant space the filesystem would eat for block pointers. */
206 left -= (left >> (blockbits - 2)) + (left >> (2 * blockbits - 2));
207 /* We cannot afford having absolutely no space, we need some for
/* Reserve a small cushion (10 pages) so the server itself never runs
 * completely dry. */
209 if ( left >= PAGE_SIZE * 10)
210 left -= PAGE_SIZE * 10;
214 /* If fed->fed_grant_waiting is set, this means
215 obd->u.filter.fo_tot_granted does not represent actual granted
216 amount and client is supposedly actively shrinks its cache, so
217 no point in printing this warning */
218 if (left < obd->u.filter.fo_tot_granted && !fed->fed_grant_waiting)
219 CERROR("granted space "LPU64" more than available "LPU64"\n",
220 obd->u.filter.fo_tot_granted, left);
/* Ungranted space = available minus everything already promised out. */
222 left -= min(left, obd->u.filter.fo_tot_granted);
223 if (left < FILTER_GRANT_CHUNK && time_after(jiffies,obd->obd_osfs_age)){
224 CDEBUG(D_SUPER, "fs has no space left and statfs too old\n");
229 CDEBUG(D_SUPER, "free: "LPU64" avail: "LPU64" grant left: "LPU64"\n",
230 obd->obd_osfs.os_bfree << blockbits,
231 obd->obd_osfs.os_bavail << blockbits, left);
236 /* When clients have dirtied as much space as they've been granted they
237 * fall through to sync writes. These sync writes haven't been expressed
238 * in grants and need to error with ENOSPC when there isn't room in the
239 * filesystem for them after grants are taken into account. However,
240 * writeback of the dirty data that was already granted space can write
241 * right on through. We have no need to stop writes that won't allocate
242 * new space, so we bmap to calculate how much this io is going to consume.
244 * Caller must hold obd_osfs_lock. */
245 static int filter_check_space(struct obd_export *exp, int objcount,
246 struct fsfilt_objinfo *fso, int niocount,
247 struct niobuf_remote *rnb,
248 struct niobuf_local *lnb, obd_size *left,
249 obd_size *consumed, struct inode *inode)
251 int blocksize = exp->exp_obd->u.filter.fo_sb->s_blocksize;
252 obd_size bytes, ungranted = 0;
253 int i, rc = -ENOSPC, obj, n = 0;
/* Walk every niobuf of every object, rounding each request up to whole
 * filesystem blocks to estimate the true allocation cost. */
257 for (obj = 0; obj < objcount; obj++) {
258 for (i = 0; i < fso[obj].fso_bufcnt; i++, n++) {
/* Round start down and end up to block boundaries. */
262 tmp = rnb[n].offset & (blocksize - 1);
264 tmp = (rnb[n].offset + rnb[n].len) & (blocksize - 1);
266 bytes += blocksize - tmp;
/* Writeback of already-granted dirty data passes (handling elided). */
268 if (rnb[n].flags & OBD_BRW_FROM_GRANT) {
/* Sync write not covered by grant: admit it only if ungranted space
 * remains, and account it as if it had been granted. */
273 if (*left - *consumed >= bytes) {
274 /* if enough space, pretend it was granted */
275 exp->exp_obd->u.filter.fo_tot_granted += bytes;
276 exp->exp_filter_data.fed_grant += bytes;
/* Out of space: drop the spinlock to do the (sleeping/IO-capable) bmap
 * hole check -- an overwrite of allocated blocks needs no new space. */
282 spin_unlock(&exp->exp_obd->obd_osfs_lock);
283 if (!filter_inode_has_holes(inode,
290 rc = lnb[n].rc = -ENOSPC;
292 spin_lock(&exp->exp_obd->obd_osfs_lock);
/* Granted-writeback mixed with ungranted sync IO in the same RPC is
 * unexpected -- promote the debug message to an error in that case. */
298 CDEBUG((*consumed != 0 && ungranted != 0) ? D_ERROR : D_SUPER,
299 "consumed: "LPU64" ungranted: "LPU64"\n", *consumed, ungranted);
301 if (*consumed > exp->exp_filter_data.fed_grant)
302 CERROR("request sent from cache, but not enough grant ("LPU64
303 ","LPU64")\n", *consumed,
304 exp->exp_filter_data.fed_grant);
309 /* Calculate how much grant space to allocate to this client, based on how
310 * much space is currently free and how much of that is already granted.
312 * Caller must hold obd_osfs_lock. */
313 static void filter_grant(struct obd_export *exp, struct obdo *oa,
314 obd_size left, obd_size from_grant)
316 struct obd_device *obd = exp->exp_obd;
317 struct filter_export_data *fed = &exp->exp_filter_data;
318 obd_size grant, extra;
321 blockbits = obd->u.filter.fo_sb->s_blocksize_bits;
323 /* if things go wrong conservatively try to clamp them from
324 * generating more dirty data until things are better on our end */
/* Baseline: never grant below what the client already has cached. */
325 grant = fed->fed_cached;
/* Headroom on top of cached: a grant chunk, capped at half the free
 * ungranted space. */
327 extra = min(FILTER_GRANT_CHUNK, left / 2);
329 if (grant > fed->fed_grant) {
330 /* If client has screwed up, force basic grant until fixed */
331 CERROR("client %s cached more "LPU64" than granted "LPU64"\n",
332 exp->exp_client_uuid.uuid, fed->fed_cached,
335 } else if (fed->fed_grant_waiting) {
336 /* KISS: only one grant change in flight at a time. We
337 * could move it in the "same direction" easily,
338 * but changing directions (e.g. grow then shrink
339 * before client ACKs) would be bad. */
340 grant = fed->fed_grant_sent;
342 /* grant will shrink or grow as client cache/extra changes */
343 grant = fed->fed_cached + extra;
346 /* If we've granted all we're willing, we have to revoke
347 * the grant covering what the client just wrote. */
349 grant -= min(from_grant, grant);
/* Don't promise more than what is actually left after this RPC's
 * ungranted consumption. */
352 if (!fed->fed_grant_waiting && grant + from_grant > left ) {
353 if (from_grant < left)
354 grant = left - from_grant;
/* Publish a changed grant: mark it in-flight until the client acks. */
359 if (grant != fed->fed_grant) {
360 fed->fed_grant_waiting = 1;
361 fed->fed_grant_sent = grant;
/* Growth is accounted immediately; shrink accounting waits for the
 * client's ack (see filter_grant_incoming). */
362 if (grant > fed->fed_grant) {
363 obd->u.filter.fo_tot_granted += grant - fed->fed_grant;
364 fed->fed_grant = grant;
368 CDEBUG(D_SUPER,"cli %s cache:"LPU64" grant:"LPU64", granting:"LPU64"\n",
369 exp->exp_connection->c_remote_uuid.uuid, oa->o_blocks,
371 CDEBUG(D_SUPER, "fed sent:"LPU64" wt:%d grant:"LPU64"\n",
372 fed->fed_grant_sent, fed->fed_grant_waiting,
374 CDEBUG(D_SUPER, "tot cached:"LPU64" granted:"LPU64" num_exports: %d\n",
375 obd->u.filter.fo_tot_cached,
376 obd->u.filter.fo_tot_granted, obd->obd_num_exports);
/* Tell the client a grant value is present in the reply obdo. */
378 oa->o_valid |= OBD_MD_FLGRANT;
/* Prepare a bulk read: resolve the object dentry, start pagecache reads
 * for every niobuf, then wait for and validate each page.  On success the
 * pages are left referenced in res[] for the bulk transfer; cleanup_phase
 * drives the error unwinding.
 *
 * NOTE(review): many interior lines (RETURNs, braces, cleanup labels)
 * are elided in this extract. */
382 static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
383 int objcount, struct obd_ioobj *obj,
384 int niocount, struct niobuf_remote *nb,
385 struct niobuf_local *res,
386 struct obd_trans_info *oti)
388 struct obd_device *obd = exp->exp_obd;
389 struct obd_run_ctxt saved;
391 struct niobuf_remote *rnb;
392 struct niobuf_local *lnb = NULL;
393 struct fsfilt_objinfo *fso;
394 struct dentry *dentry;
396 int rc = 0, i, j, tot_bytes = 0, cleanup_phase = 0;
397 unsigned long now = jiffies;
400 /* We are currently not supporting multi-obj BRW_READ RPCS at all.
401 * When we do this function's dentry cleanup will need to be fixed */
402 LASSERT(objcount == 1);
404 OBD_ALLOC(fso, objcount * sizeof(*fso));
/* Zero the local niobufs so cleanup can tell used from unused entries. */
408 memset(res, 0, niocount * sizeof(*res));
/* Switch to the filter's filesystem context for the VFS operations. */
410 push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
411 for (i = 0, o = obj; i < objcount; i++, o++) {
412 LASSERT(o->ioo_bufcnt);
414 dentry = filter_oa2dentry(obd, oa);
416 GOTO(cleanup, rc = PTR_ERR(dentry));
418 if (dentry->d_inode == NULL) {
419 CERROR("trying to BRW to non-existent file "LPU64"\n",
422 GOTO(cleanup, rc = -ENOENT);
425 fso[i].fso_dentry = dentry;
426 fso[i].fso_bufcnt = o->ioo_bufcnt;
/* Operational monitoring: complain if setup took more than 15s. */
429 if (time_after(jiffies, now + 15 * HZ))
430 CERROR("slow preprw_read setup %lus\n", (jiffies - now) / HZ);
432 CDEBUG(D_INFO, "preprw_read setup: %lu jiffies\n",
/* Reads consume no space, so from_grant is 0; still recompute the
 * client's grant while we have the chance. */
436 spin_lock(&obd->obd_osfs_lock);
437 filter_grant(exp, oa, filter_grant_space_left(exp), 0);
438 spin_unlock(&obd->obd_osfs_lock);
/* Phase 1: start a pagecache read for every niobuf. */
441 for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
442 dentry = fso[i].fso_dentry;
443 inode = dentry->d_inode;
445 for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
446 lnb->dentry = dentry;
447 lnb->offset = rnb->offset;
449 lnb->flags = rnb->flags;
450 lnb->start = jiffies;
452 if (inode->i_size <= rnb->offset) {
453 /* If there's no more data, abort early.
454 * lnb->page == NULL and lnb->rc == 0, so it's
455 * easy to detect later. */
458 rc = filter_start_page_read(inode, lnb);
462 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
463 "page err %u@"LPU64" %u/%u %p: rc %d\n",
464 lnb->len, lnb->offset, j, o->ioo_bufcnt,
470 tot_bytes += lnb->rc;
471 if (lnb->rc < lnb->len) {
472 /* short read, be sure to wait on it */
479 if (time_after(jiffies, now + 15 * HZ))
480 CERROR("slow start_page_read %lus\n", (jiffies - now) / HZ);
482 CDEBUG(D_INFO, "start_page_read: %lu jiffies\n",
485 lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES, tot_bytes);
/* Phase 2: walk back over the niobufs we started and wait for each. */
486 while (lnb-- > res) {
487 rc = filter_finish_page_read(lnb);
489 CERROR("error page %u@"LPU64" %u %p: rc %d\n", lnb->len,
490 lnb->offset, (int)(lnb - res), lnb->dentry, rc);
496 if (time_after(jiffies, now + 15 * HZ))
497 CERROR("slow finish_page_read %lus\n", (jiffies - now) / HZ);
499 CDEBUG(D_INFO, "finish_page_read: %lu jiffies\n",
502 filter_tally_read(&exp->exp_obd->u.filter, res, niocount);
/* Error unwinding: release any pages already referenced, then the
 * dentry, then the fso array and the pushed context. */
507 switch (cleanup_phase) {
509 for (lnb = res; lnb < (res + niocount); lnb++) {
511 page_cache_release(lnb->page);
513 if (res->dentry != NULL)
516 CERROR("NULL dentry in cleanup -- tell CFS\n");
518 OBD_FREE(fso, objcount * sizeof(*fso));
519 pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
/* Allocate a standalone (non-pagecache) temporary page to receive bulk
 * write data for lnb.  The page index is set so later code can find the
 * matching pagecache slot; -ENOMEM is recorded in lnb->rc on failure. */
524 static int filter_start_page_write(struct inode *inode,
525 struct niobuf_local *lnb)
527 struct page *page = alloc_pages(GFP_HIGHUSER, 0);
529 CERROR("no memory for a temp page\n");
530 RETURN(lnb->rc = -ENOMEM);
/* Debug poisoning so stale/uninitialized data is recognizable. */
532 POISON_PAGE(page, 0xf1);
533 page->index = lnb->offset >> PAGE_SHIFT;
539 /* If we ever start to support multi-object BRW RPCs, we will need to get locks
540 * on mulitple inodes. That isn't all, because there still exists the
541 * possibility of a truncate starting a new transaction while holding the ext3
542 * rwsem = write while some writes (which have started their transactions here)
543 * blocking on the ext3 rwsem = read => lock inversion.
545 * The handling gets very ugly when dealing with locked pages. It may be easier
546 * to just get rid of the locked page code (which has problems of its own) and
547 * either discover we do not need it anymore (i.e. it was a symptom of another
548 * bug) or ensure we get the page locks in an appropriate order. */
/* Prepare a bulk write: absorb the client's grant announcement, resolve
 * the target dentry, enforce space/grant accounting, then allocate a
 * temp page per niobuf to receive the incoming data.
 *
 * NOTE(review): interior lines (RETURNs, braces, error labels) are
 * elided in this extract. */
549 static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
550 int objcount, struct obd_ioobj *obj,
551 int niocount, struct niobuf_remote *nb,
552 struct niobuf_local *res,
553 struct obd_trans_info *oti)
555 struct obd_device *obd = exp->exp_obd;
556 struct obd_run_ctxt saved;
557 struct niobuf_remote *rnb = nb;
558 struct niobuf_local *lnb = res;
559 struct fsfilt_objinfo fso;
560 struct dentry *dentry;
561 int rc = 0, i, tot_bytes = 0;
562 obd_size consumed = 0, left;
563 unsigned long now = jiffies;
565 LASSERT(objcount == 1);
566 LASSERT(obj->ioo_bufcnt > 0);
/* Record the client's cached/seen-grant numbers before any accounting. */
568 filter_grant_incoming(exp, oa);
570 memset(res, 0, niocount * sizeof(*res));
572 push_ctxt(&saved, &obd->obd_ctxt, NULL);
573 dentry = filter_fid2dentry(obd, NULL, obj->ioo_gr, obj->ioo_id);
575 GOTO(cleanup, rc = PTR_ERR(dentry));
577 if (dentry->d_inode == NULL) {
578 CERROR("trying to BRW to non-existent file "LPU64"\n",
581 GOTO(cleanup, rc = -ENOENT);
584 fso.fso_dentry = dentry;
585 fso.fso_bufcnt = obj->ioo_bufcnt;
587 if (time_after(jiffies, now + 15 * HZ))
588 CERROR("slow preprw_write setup %lus\n", (jiffies - now) / HZ);
590 CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n",
/* Space check and fresh grant calculation under one lock hold, so the
 * accounting the client sees is consistent. */
593 spin_lock(&obd->obd_osfs_lock);
594 left = filter_grant_space_left(exp);
596 rc = filter_check_space(exp, objcount, &fso, niocount, rnb, lnb,
597 &left, &consumed, dentry->d_inode);
599 filter_grant(exp, oa, left, consumed);
601 spin_unlock(&obd->obd_osfs_lock);
/* Allocate a temp page for each niobuf that passed the space check. */
608 for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt;
611 /* If there were any granting failures, we should not have
613 LASSERT (lnb->rc == 0);
615 lnb->dentry = dentry;
616 lnb->offset = rnb->offset;
618 lnb->flags = rnb->flags;
619 lnb->start = jiffies;
621 rc = filter_start_page_write(dentry->d_inode, lnb);
623 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, "page err %u@"
624 LPU64" %u/%u %p: rc %d\n", lnb->len, lnb->offset,
625 i, obj->ioo_bufcnt, dentry, rc);
/* Error path: free the temp pages allocated so far. */
627 __free_pages(lnb->page, 0);
631 tot_bytes += lnb->len;
634 if (time_after(jiffies, now + 15 * HZ))
635 CERROR("slow start_page_write %lus\n", (jiffies - now) / HZ);
637 CDEBUG(D_INFO, "start_page_write: %lu jiffies\n",
640 lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_WRITE_BYTES, tot_bytes);
643 pop_ctxt(&saved, &obd->obd_ctxt, NULL);
/* Entry point for bulk-IO preparation: dispatch to the read or write
 * implementation based on cmd.  (Fallthrough handling for other cmds is
 * elided in this extract.) */
647 int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
648 int objcount, struct obd_ioobj *obj, int niocount,
649 struct niobuf_remote *nb, struct niobuf_local *res,
650 struct obd_trans_info *oti)
652 if (cmd == OBD_BRW_WRITE)
653 return filter_preprw_write(cmd, exp, oa, objcount, obj,
654 niocount, nb, res, oti);
656 if (cmd == OBD_BRW_READ)
657 return filter_preprw_read(cmd, exp, oa, objcount, obj,
658 niocount, nb, res, oti);
/* Commit (tear down) a bulk read prepared by filter_preprw_read():
 * release every page reference taken at prep time, then drop the
 * object dentry reference (dput elided in this extract). */
664 static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa,
665 int objcount, struct obd_ioobj *obj,
666 int niocount, struct niobuf_local *res,
667 struct obd_trans_info *oti)
670 struct niobuf_local *lnb;
674 for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
675 for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
/* NULL page means the buffer was skipped (read past EOF) at prep time. */
676 if (lnb->page != NULL)
677 page_cache_release(lnb->page);
680 if (res->dentry != NULL)
/* Evict any stale pagecache page at new_page's index so the freshly
 * written temp page's data is authoritative.  The #if 0 block shows an
 * alternative that would also insert new_page into the pagecache. */
685 void flip_into_page_cache(struct inode *inode, struct page *new_page)
687 struct page *old_page;
691 /* the dlm is protecting us from read/write concurrency, so we
692 * expect this find_lock_page to return quickly. even if we
693 * race with another writer it won't be doing much work with
694 * the page locked. we do this 'cause t_c_p expects a
695 * locked page, and it wants to grab the pagecache lock
697 old_page = find_lock_page(inode->i_mapping, new_page->index);
/* truncate_complete_page() took a different signature before 2.5. */
699 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
700 truncate_complete_page(old_page);
702 truncate_complete_page(old_page->mapping, old_page);
704 unlock_page(old_page);
705 page_cache_release(old_page);
708 #if 0 /* this should be a /proc tunable someday */
709 /* racing o_directs (no locking ioctl) could race adding
710 * their pages, so we repeat the page invalidation unless
711 * we successfully added our new page */
712 rc = add_to_page_cache_unique(new_page, inode->i_mapping,
714 page_hash(inode->i_mapping,
717 /* add_to_page_cache clears uptodate|dirty and locks
719 SetPageUptodate(new_page);
720 unlock_page(new_page);
728 /* XXX needs to trickle its oa down */
/* Commit entry point: dispatch to the write or read commit path based
 * on cmd.  (filter_commitrw_write is defined elsewhere in the file.) */
729 int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
730 int objcount, struct obd_ioobj *obj, int niocount,
731 struct niobuf_local *res, struct obd_trans_info *oti)
733 if (cmd == OBD_BRW_WRITE)
734 return filter_commitrw_write(exp, oa, objcount, obj, niocount,
736 if (cmd == OBD_BRW_READ)
737 return filter_commitrw_read(exp, oa, objcount, obj, niocount,
/* Self-contained bulk read/write helper: builds remote niobufs from the
 * caller's brw_page array, runs the normal preprw/commitrw pipeline, and
 * copies data between the caller's pages and the filter's pages in
 * between.  Used for server-local IO rather than network bulk. */
743 int filter_brw(int cmd, struct obd_export *exp, struct obdo *oa,
744 struct lov_stripe_md *lsm, obd_count oa_bufs,
745 struct brw_page *pga, struct obd_trans_info *oti)
747 struct obd_ioobj ioo;
748 struct niobuf_local *lnb;
749 struct niobuf_remote *rnb;
754 OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
755 OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
757 if (lnb == NULL || rnb == NULL)
758 GOTO(out, ret = -ENOMEM);
/* Translate the caller's brw_page descriptors into remote niobufs. */
760 for (i = 0; i < oa_bufs; i++) {
761 rnb[i].offset = pga[i].off;
762 rnb[i].len = pga[i].count;
765 obdo_to_ioobj(oa, &ioo);
766 ioo.ioo_bufcnt = oa_bufs;
768 ret = filter_preprw(cmd, exp, oa, 1, &ioo, oa_bufs, rnb, lnb, oti);
/* Move data between caller pages and the filter's prepared pages;
 * direction depends on cmd. */
772 for (i = 0; i < oa_bufs; i++) {
773 void *virt = kmap(pga[i].pg);
774 obd_off off = pga[i].off & ~PAGE_MASK;
775 void *addr = kmap(lnb[i].page);
777 /* 2 kmaps == vanishingly small deadlock opportunity */
779 if (cmd & OBD_BRW_WRITE)
780 memcpy(addr + off, virt + off, pga[i].count);
782 memcpy(virt + off, addr + off, pga[i].count);
788 ret = filter_commitrw(cmd, exp, oa, 1, &ioo, oa_bufs, lnb, oti);
792 OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
794 OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));