1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdfilter/filter_io.c
38 * Author: Peter Braam <braam@clusterfs.com>
39 * Author: Andreas Dilger <adilger@clusterfs.com>
40 * Author: Phil Schwan <phil@clusterfs.com>
43 #define DEBUG_SUBSYSTEM S_FILTER
45 #ifndef AUTOCONF_INCLUDED
46 #include <linux/config.h>
48 #include <linux/module.h>
49 #include <linux/pagemap.h> // XXX kill me soon
50 #include <linux/version.h>
52 #include <obd_class.h>
54 #include <lustre_fsfilt.h>
55 #include "filter_internal.h"
/* Module-global scratch buffer.  NOTE(review): only the definition is
 * visible in this chunk -- allocation and use happen elsewhere in
 * obdfilter; confirm before documenting its lifetime. */
57 int *obdfilter_created_scratchpad;
/*
 * Prepare lnb->page for direct I/O against @inode: poison the page,
 * zero the tail beyond lnb->len for a partial page, and derive the page
 * index from the file offset.
 *
 * NOTE(review): this view of the file is sampled; lines are missing
 * between the embedded original line numbers (e.g. where local @page is
 * bound to lnb->page, and the RETURN path), so the body below is
 * incomplete.
 */
59 static int filter_alloc_dio_page(struct obd_device *obd, struct inode *inode,
60 struct niobuf_local *lnb)
/* The per-thread page pool must already have supplied the page. */
64 LASSERT(lnb->page != NULL);
68 POISON_PAGE(page, 0xf1);
/* Partial page: zero from the end of valid data to the page boundary so
 * stale pool contents never reach the client. */
69 if (lnb->len != CFS_PAGE_SIZE) {
70 memset(kmap(page) + lnb->len, 0, CFS_PAGE_SIZE - lnb->len);
/* Page index in the file = byte offset / page size. */
74 page->index = lnb->offset >> CFS_PAGE_SHIFT;
/*
 * Walk every niobuf_local of every object in the request and release
 * the direct-I/O pages set up by the preprw paths.
 * NOTE(review): the per-page release statement inside the inner loop is
 * elided in this sampled view -- confirm against the full source.
 */
79 static void filter_free_dio_pages(int objcount, struct obd_ioobj *obj,
80 int niocount, struct niobuf_local *res)
/* res advances in lock-step across all objects' buffers. */
84 for (i = 0; i < objcount; i++, obj++) {
85 for (j = 0 ; j < obj->ioo_bufcnt ; j++, res++)
90 /* Grab the dirty and seen grant announcements from the incoming obdo.
91 * We will later calculate the clients new grant and return it.
92 * Caller must hold osfs lock */
/*
 * Absorb the client's reported dirty/dropped grant figures from the
 * incoming obdo and fold them into the per-export (fed_*) and
 * per-filter (fo_tot_*) accounting.  Caller must hold obd_osfs_lock
 * (asserted below).  If the obdo does not carry both FLBLOCKS and
 * FLGRANT, grant processing is skipped and the FLGRANT bit is cleared.
 */
93 static void filter_grant_incoming(struct obd_export *exp, struct obdo *oa)
95 struct filter_export_data *fed;
96 struct obd_device *obd = exp->exp_obd;
99 LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock);
/* Both blocks and grant info must be valid to proceed. */
101 if ((oa->o_valid & (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) !=
102 (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) {
103 oa->o_valid &= ~OBD_MD_FLGRANT;
108 fed = &exp->exp_filter_data;
110 /* Add some margin, since there is a small race if other RPCs arrive
111 * out-or-order and have already consumed some grant. We want to
112 * leave this here in case there is a large error in accounting. */
114 "%s: cli %s/%p reports grant: "LPU64" dropped: %u, local: %lu\n",
115 obd->obd_name, exp->exp_client_uuid.uuid, exp, oa->o_grant,
116 oa->o_dropped, fed->fed_grant);
118 /* Update our accounting now so that statfs takes it into account.
119 * Note that fed_dirty is only approximate and can become incorrect
120 * if RPCs arrive out-of-order. No important calculations depend
121 * on fed_dirty however, but we must check sanity to not assert. */
/* Clamp o_dirty into a sane range: never negative, never more than
 * the export's grant plus a 4-chunk margin. */
122 if ((long long)oa->o_dirty < 0)
124 else if (oa->o_dirty > fed->fed_grant + 4 * FILTER_GRANT_CHUNK)
125 oa->o_dirty = fed->fed_grant + 4 * FILTER_GRANT_CHUNK;
/* Track the delta of this export's dirty count in the filter total. */
126 obd->u.filter.fo_tot_dirty += oa->o_dirty - fed->fed_dirty;
/* Sanity: a client cannot have dropped more grant than it was given. */
127 if (fed->fed_grant < oa->o_dropped) {
128 CDEBUG(D_CACHE,"%s: cli %s/%p reports %u dropped > grant %lu\n",
129 obd->obd_name, exp->exp_client_uuid.uuid, exp,
130 oa->o_dropped, fed->fed_grant);
133 if (obd->u.filter.fo_tot_granted < oa->o_dropped) {
134 CERROR("%s: cli %s/%p reports %u dropped > tot_grant "LPU64"\n",
135 obd->obd_name, exp->exp_client_uuid.uuid, exp,
136 oa->o_dropped, obd->u.filter.fo_tot_granted);
/* Retire the dropped grant from both filter and export totals. */
139 obd->u.filter.fo_tot_granted -= oa->o_dropped;
140 fed->fed_grant -= oa->o_dropped;
141 fed->fed_dirty = oa->o_dirty;
/* Any negative counter means the accounting is corrupt; report and
 * drop the lock.  NOTE(review): presumably followed by LBUG() in the
 * elided lines -- confirm against the full source. */
142 if (fed->fed_dirty < 0 || fed->fed_grant < 0 || fed->fed_pending < 0) {
143 CERROR("%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
144 obd->obd_name, exp->exp_client_uuid.uuid, exp,
145 fed->fed_dirty, fed->fed_pending, fed->fed_grant);
146 spin_unlock(&obd->obd_osfs_lock);
152 /* Figure out how much space is available between what we've granted
153 * and what remains in the filesystem. Compensate for ext3 indirect
154 * block overhead when computing how much free space is left ungranted.
156 * Caller must hold obd_osfs_lock. */
/*
 * Return how much filesystem space (in bytes) remains available for
 * new grants: cached (or refreshed) statfs free space, minus an
 * estimate for (d)indirect-block overhead and the llog reservation,
 * compared against what has already been granted.
 * Caller must hold obd_osfs_lock (asserted below).
 */
157 obd_size filter_grant_space_left(struct obd_export *exp)
159 struct obd_device *obd = exp->exp_obd;
160 int blockbits = obd->u.obt.obt_sb->s_blocksize_bits;
161 obd_size tot_granted = obd->u.filter.fo_tot_granted, avail, left = 0;
162 int rc, statfs_done = 0;
164 LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock);
/* Refresh cached statfs data if it is older than one second (HZ). */
166 if (cfs_time_before_64(obd->obd_osfs_age, cfs_time_current_64() - HZ)) {
168 rc = fsfilt_statfs(obd, obd->u.obt.obt_sb,
169 cfs_time_current_64() + HZ);
170 if (rc) /* N.B. statfs can't really fail */
175 avail = obd->obd_osfs.os_bavail;
/* Reserve 1/8 of the available blocks for indirect-block overhead. */
176 left = avail - (avail >> (blockbits - 3)); /* (d)indirect */
/* Hold back the llog reservation, then convert blocks to bytes. */
177 if (left > GRANT_FOR_LLOG(obd)) {
178 left = (left - GRANT_FOR_LLOG(obd)) << blockbits;
180 left = 0 /* << blockbits */;
/* If the cached statfs looks stale and space is tight, the elided
 * branch presumably forces a fresh statfs -- TODO confirm. */
183 if (!statfs_done && left < 32 * FILTER_GRANT_CHUNK + tot_granted) {
184 CDEBUG(D_CACHE, "fs has no space left and statfs too old\n");
188 if (left >= tot_granted) {
/* Granted more than exists on disk and it is not all in-flight
 * pending writes: accounting error worth shouting about. */
191 if (left < tot_granted - obd->u.filter.fo_tot_pending) {
192 CERROR("%s: cli %s/%p grant "LPU64" > available "
193 LPU64" and pending "LPU64"\n", obd->obd_name,
194 exp->exp_client_uuid.uuid, exp, tot_granted,
195 left, obd->u.filter.fo_tot_pending);
200 CDEBUG(D_CACHE, "%s: cli %s/%p free: "LPU64" avail: "LPU64" grant "LPU64
201 " left: "LPU64" pending: "LPU64"\n", obd->obd_name,
202 exp->exp_client_uuid.uuid, exp,
203 obd->obd_osfs.os_bfree << blockbits, avail << blockbits,
204 tot_granted, left, obd->u.filter.fo_tot_pending);
209 /* Calculate how much grant space to allocate to this client, based on how
210 * much space is currently free and how much of that is already granted.
212 * Caller must hold obd_osfs_lock. */
/*
 * Decide how much additional grant to hand this client, given how much
 * it currently thinks it has (@current_grant), how much it wants
 * (@want), and how much filesystem space is left (@fs_space_left).
 * Updates fo_tot_granted and fed_grant.  Caller must hold
 * obd_osfs_lock (asserted below).
 * NOTE(review): the local @grant declaration and the final return are
 * in lines elided from this view.
 */
213 long filter_grant(struct obd_export *exp, obd_size current_grant,
214 obd_size want, obd_size fs_space_left)
216 struct obd_device *obd = exp->exp_obd;
217 struct filter_export_data *fed = &exp->exp_filter_data;
218 int blockbits = obd->u.obt.obt_sb->s_blocksize_bits;
221 LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock);
223 /* Grant some fraction of the client's requested grant space so that
224 * they are not always waiting for write credits (not all of it to
225 * avoid overgranting in face of multiple RPCs in flight). This
226 * essentially will be able to control the OSC_MAX_RIF for a client.
228 * If we do have a large disparity between what the client thinks it
229 * has and what we think it has, don't grant very much and let the
230 * client consume its grant first. Either it just has lots of RPCs
231 * in flight, or it was evicted and its grants will soon be used up. */
/* Refuse absurd (>2GB) requests outright; otherwise grant only when
 * the client is below its want and below its server-side grant plus
 * one chunk of headroom. */
232 if (want > 0x7fffffff) {
233 CERROR("%s: client %s/%p requesting > 2GB grant "LPU64"\n",
234 obd->obd_name, exp->exp_client_uuid.uuid, exp, want);
235 } else if (current_grant < want &&
236 current_grant < fed->fed_grant + FILTER_GRANT_CHUNK) {
/* Grant the lesser of what was asked and 1/8 of free space
 * (both in blocks). */
237 grant = min((want >> blockbits),
238 (fs_space_left >> blockbits) / 8);
242 /* Allow >FILTER_GRANT_CHUNK size when clients
243 * reconnect due to a server reboot.
245 if ((grant > FILTER_GRANT_CHUNK) &&
246 (!obd->obd_recovering))
247 grant = FILTER_GRANT_CHUNK;
249 obd->u.filter.fo_tot_granted += grant;
250 fed->fed_grant += grant;
/* fed_grant going negative means an overflow/accounting bug; report
 * and drop the lock.  NOTE(review): presumably followed by LBUG() in
 * elided lines -- confirm. */
251 if (fed->fed_grant < 0) {
252 CERROR("%s: cli %s/%p grant %ld want "LPU64
254 obd->obd_name, exp->exp_client_uuid.uuid,
255 exp, fed->fed_grant, want,current_grant);
256 spin_unlock(&obd->obd_osfs_lock);
263 "%s: cli %s/%p wants: "LPU64" current grant "LPU64
264 " granting: "LPU64"\n", obd->obd_name, exp->exp_client_uuid.uuid,
265 exp, want, current_grant, grant);
267 "%s: cli %s/%p tot cached:"LPU64" granted:"LPU64
268 " num_exports: %d\n", obd->obd_name, exp->exp_client_uuid.uuid,
269 exp, obd->u.filter.fo_tot_dirty,
270 obd->u.filter.fo_tot_granted, obd->obd_num_exports);
/*
 * Prepare a bulk read: authenticate the capability, absorb any grant
 * info from the obdo, resolve the object to a dentry/inode, populate
 * the niobuf_local array from the remote niobufs, and issue the direct
 * I/O to fill the pages.  Single-object RPCs only (asserted below).
 * NOTE(review): error-path labels and several cleanup lines are elided
 * from this sampled view.
 */
275 static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
276 int objcount, struct obd_ioobj *obj,
277 int niocount, struct niobuf_remote *nb,
278 struct niobuf_local *res,
279 struct obd_trans_info *oti,
280 struct lustre_capa *capa)
282 struct obd_device *obd = exp->exp_obd;
283 struct lvfs_run_ctxt saved;
284 struct niobuf_remote *rnb;
285 struct niobuf_local *lnb;
286 struct dentry *dentry = NULL;
289 int rc = 0, i, tot_bytes = 0;
290 unsigned long now = jiffies;
293 /* We are currently not supporting multi-obj BRW_READ RPCS at all.
294 * When we do this function's dentry cleanup will need to be fixed.
295 * These values are verified in ost_brw_write() from the wire. */
296 LASSERTF(objcount == 1, "%d\n", objcount);
297 LASSERTF(obj->ioo_bufcnt > 0, "%d\n", obj->ioo_bufcnt);
299 rc = filter_auth_capa(exp, NULL, obdo_mdsno(oa), capa,
/* Reads can still carry grant bookkeeping from the client. */
304 if (oa && oa->o_valid & OBD_MD_FLGRANT) {
305 spin_lock(&obd->obd_osfs_lock);
306 filter_grant_incoming(exp, oa);
309 spin_unlock(&obd->obd_osfs_lock);
312 iobuf = filter_iobuf_get(&obd->u.filter, oti);
314 RETURN(PTR_ERR(iobuf));
/* Switch to the filter's filesystem context for VFS operations. */
316 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
317 dentry = filter_oa2dentry(obd, oa);
318 if (IS_ERR(dentry)) {
319 rc = PTR_ERR(dentry);
324 inode = dentry->d_inode;
326 obdo_to_inode(inode, oa, OBD_MD_FLATIME);
327 fsfilt_check_slow(obd, now, "preprw_read setup");
/* Copy per-buffer parameters from the wire niobufs into the local
 * ones and register each page with the iobuf. */
329 for (i = 0, lnb = res, rnb = nb; i < obj->ioo_bufcnt;
331 lnb->dentry = dentry;
332 lnb->offset = rnb->offset;
334 lnb->flags = rnb->flags;
337 * ost_brw_write()->ost_nio_pages_get() already initialized
338 * lnb->page to point to the page from the per-thread page
339 * pool (bug 5137), initialize page.
341 LASSERT(lnb->page != NULL);
/* Read starting at/after EOF: nothing to do for this buffer. */
343 if (i_size_read(inode) <= rnb->offset)
344 /* If there's no more data, abort early. lnb->rc == 0,
345 * so it's easy to detect later. */
348 filter_alloc_dio_page(obd, inode, lnb);
/* Short read at EOF: trim lnb->rc to the bytes actually present. */
350 if (i_size_read(inode) < lnb->offset + lnb->len - 1)
351 lnb->rc = i_size_read(inode) - lnb->offset;
355 tot_bytes += lnb->rc;
357 filter_iobuf_add_page(obd, iobuf, inode, lnb->page);
360 fsfilt_check_slow(obd, now, "start_page_read");
362 rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf,
363 exp, NULL, NULL, NULL);
367 lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES, tot_bytes);
369 if (exp->exp_nid_stats && exp->exp_nid_stats->nid_stats)
370 lprocfs_counter_add(exp->exp_nid_stats->nid_stats,
371 LPROC_FILTER_READ_BYTES, tot_bytes);
/* Error-path cleanup (labels elided in this view): free DIO pages,
 * release the iobuf, restore the saved context. */
377 filter_free_dio_pages(objcount, obj, niocount, res);
383 filter_iobuf_put(&obd->u.filter, iobuf, oti);
385 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
387 CERROR("io error %d\n", rc);
392 /* When clients have dirtied as much space as they've been granted they
393 * fall through to sync writes. These sync writes haven't been expressed
394 * in grants and need to error with ENOSPC when there isn't room in the
395 * filesystem for them after grants are taken into account. However,
396 * writeback of the dirty data that was already granted space can write
399 * Caller must hold obd_osfs_lock. */
/*
 * For each remote niobuf in the write, decide whether the client may
 * consume its existing grant (OBD_BRW_FROM_GRANT) or whether the bytes
 * must come out of the remaining filesystem space (*left).  Buffers
 * that fit are marked OBD_BRW_GRANTED; buffers that do not are left
 * unmarked and the function's rc stays -ENOSPC so the caller can later
 * verify whether the blocks were already mapped.  Updates fed_* and
 * fo_tot_* accounting.  Caller must hold obd_osfs_lock (asserted).
 * NOTE(review): declarations of bytes/tmp and several branch lines are
 * elided from this sampled view.
 */
400 static int filter_grant_check(struct obd_export *exp, struct obdo *oa,
401 int objcount, struct fsfilt_objinfo *fso,
402 int niocount, struct niobuf_remote *rnb,
403 struct niobuf_local *lnb, obd_size *left,
406 struct filter_export_data *fed = &exp->exp_filter_data;
407 int blocksize = exp->exp_obd->u.obt.obt_sb->s_blocksize;
408 unsigned long used = 0, ungranted = 0, using;
409 int i, rc = -ENOSPC, obj, n = 0;
411 LASSERT_SPIN_LOCKED(&exp->exp_obd->obd_osfs_lock);
413 for (obj = 0; obj < objcount; obj++) {
414 for (i = 0; i < fso[obj].fso_bufcnt; i++, n++) {
417 /* should match the code in osc_exit_cache */
/* Round the I/O out to full blocks: leading slack before the
 * offset plus trailing slack after the end. */
419 bytes += rnb[n].offset & (blocksize - 1);
420 tmp = (rnb[n].offset + rnb[n].len) & (blocksize - 1);
422 bytes += blocksize - tmp;
/* Client claims this buffer is covered by existing grant. */
424 if ((rnb[n].flags & OBD_BRW_FROM_GRANT) &&
425 (oa->o_valid & OBD_MD_FLGRANT)) {
426 if (fed->fed_grant < used + bytes) {
428 "%s: cli %s/%p claims %ld+%d "
429 "GRANT, real grant %lu idx %d\n",
430 exp->exp_obd->obd_name,
431 exp->exp_client_uuid.uuid, exp,
432 used, bytes, fed->fed_grant, n);
435 rnb[n].flags |= OBD_BRW_GRANTED;
436 lnb[n].lnb_grant_used = bytes;
437 CDEBUG(0, "idx %d used=%lu\n", n, used);
/* Not grant-backed: take the bytes from remaining fs space. */
442 if (*left > ungranted + bytes) {
443 /* if enough space, pretend it was granted */
445 rnb[n].flags |= OBD_BRW_GRANTED;
446 lnb[n].lnb_grant_used = bytes;
447 CDEBUG(0, "idx %d ungranted=%lu\n",n,ungranted);
452 /* We can't check for already-mapped blocks here, as
453 * it requires dropping the osfs lock to do the bmap.
454 * Instead, we return ENOSPC and in that case we need
455 * to go through and verify if all of the blocks not
456 * marked BRW_GRANTED are already mapped and we can
457 * ignore this error. */
459 rnb[n].flags &= ~OBD_BRW_GRANTED;
460 CDEBUG(D_CACHE,"%s: cli %s/%p idx %d no space for %d\n",
461 exp->exp_obd->obd_name,
462 exp->exp_client_uuid.uuid, exp, n, bytes);
466 /* Now substract what client have used already. We don't subtract
467 * this from the tot_granted yet, so that other client's can't grab
468 * that space before we have actually allocated our blocks. That
469 * happens in filter_grant_commit() after the writes are done. */
471 fed->fed_grant -= used;
472 fed->fed_pending += used + ungranted;
473 exp->exp_obd->u.filter.fo_tot_granted += ungranted;
474 exp->exp_obd->u.filter.fo_tot_pending += used + ungranted;
477 "%s: cli %s/%p used: %lu ungranted: %lu grant: %lu dirty: %lu\n",
478 exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp, used,
479 ungranted, fed->fed_grant, fed->fed_dirty);
481 /* Rough calc in case we don't refresh cached statfs data */
/* Convert consumed bytes to blocks and debit cached os_bavail. */
482 using = (used + ungranted + 1 ) >>
483 exp->exp_obd->u.obt.obt_sb->s_blocksize_bits;
484 if (exp->exp_obd->obd_osfs.os_bavail > using)
485 exp->exp_obd->obd_osfs.os_bavail -= using;
487 exp->exp_obd->obd_osfs.os_bavail = 0;
/* fed_dirty can lag reality with out-of-order RPCs: clamp rather
 * than going negative. */
489 if (fed->fed_dirty < used) {
490 CERROR("%s: cli %s/%p claims used %lu > fed_dirty %lu\n",
491 exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
492 used, fed->fed_dirty);
493 used = fed->fed_dirty;
495 exp->exp_obd->u.filter.fo_tot_dirty -= used;
496 fed->fed_dirty -= used;
/* Negative counters mean corrupt accounting; report and unlock.
 * NOTE(review): presumably followed by LBUG() in elided lines. */
498 if (fed->fed_dirty < 0 || fed->fed_grant < 0 || fed->fed_pending < 0) {
499 CERROR("%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
500 exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
501 fed->fed_dirty, fed->fed_pending, fed->fed_grant);
502 spin_unlock(&exp->exp_obd->obd_osfs_lock);
508 /* If we ever start to support multi-object BRW RPCs, we will need to get locks
509 * on mulitple inodes. That isn't all, because there still exists the
510 * possibility of a truncate starting a new transaction while holding the ext3
511 * rwsem = write while some writes (which have started their transactions here)
512 * blocking on the ext3 rwsem = read => lock inversion.
514 * The handling gets very ugly when dealing with locked pages. It may be easier
515 * to just get rid of the locked page code (which has problems of its own) and
516 * either discover we do not need it anymore (i.e. it was a symptom of another
517 * bug) or ensure we get the page locks in an appropriate order. */
/*
 * Prepare a bulk write: authenticate, resolve the object, run grant
 * accounting (filter_grant_incoming / filter_grant_check /
 * filter_grant), populate the niobuf_local array, zero partial-page
 * tails, and pre-read any partial pages that fall inside the current
 * file size so read-modify-write is correct.  Single-object RPCs only
 * (asserted below).
 * NOTE(review): several locals (iobuf, left, maxidx, off), GOTO labels
 * and cleanup-phase assignments are in lines elided from this sampled
 * view.
 */
518 static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
519 int objcount, struct obd_ioobj *obj,
520 int niocount, struct niobuf_remote *nb,
521 struct niobuf_local *res,
522 struct obd_trans_info *oti,
523 struct lustre_capa *capa)
525 struct lvfs_run_ctxt saved;
526 struct niobuf_remote *rnb;
527 struct niobuf_local *lnb = res;
528 struct fsfilt_objinfo fso;
529 struct filter_mod_data *fmd;
530 struct dentry *dentry = NULL;
533 unsigned long now = jiffies;
534 int rc = 0, i, tot_bytes = 0, cleanup_phase = 0;
536 LASSERT(objcount == 1);
537 LASSERT(obj->ioo_bufcnt > 0);
539 rc = filter_auth_capa(exp, NULL, obdo_mdsno(oa), capa,
544 push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
545 iobuf = filter_iobuf_get(&exp->exp_obd->u.filter, oti);
547 GOTO(cleanup, rc = PTR_ERR(iobuf));
550 dentry = filter_fid2dentry(exp->exp_obd, NULL, obj->ioo_gr,
553 GOTO(cleanup, rc = PTR_ERR(dentry));
/* Writes to objects that were never created are a protocol error. */
556 if (dentry->d_inode == NULL) {
557 CERROR("%s: trying to BRW to non-existent file "LPU64"\n",
558 exp->exp_obd->obd_name, obj->ioo_id);
559 GOTO(cleanup, rc = -ENOENT);
562 fso.fso_dentry = dentry;
563 fso.fso_bufcnt = obj->ioo_bufcnt;
565 fsfilt_check_slow(exp->exp_obd, now, "preprw_write setup");
567 /* Don't update inode timestamps if this write is older than a
568 * setattr which modifies the timestamps. b=10150 */
569 /* XXX when we start having persistent reservations this needs to
570 * be changed to filter_fmd_get() to create the fmd if it doesn't
571 * already exist so we can store the reservation handle there. */
572 fmd = filter_fmd_find(exp, obj->ioo_id, obj->ioo_gr);
/* Grant accounting runs under the osfs lock. */
575 spin_lock(&exp->exp_obd->obd_osfs_lock);
576 filter_grant_incoming(exp, oa);
/* A newer setattr already set m/ctime: drop those bits (b=10150). */
577 if (fmd && fmd->fmd_mactime_xid > oti->oti_xid)
578 oa->o_valid &= ~(OBD_MD_FLMTIME | OBD_MD_FLCTIME |
581 obdo_to_inode(dentry->d_inode, oa, OBD_MD_FLATIME |
582 OBD_MD_FLMTIME | OBD_MD_FLCTIME);
585 left = filter_grant_space_left(exp);
587 rc = filter_grant_check(exp, oa, objcount, &fso, niocount, nb, res,
588 &left, dentry->d_inode);
590 /* do not zero out oa->o_valid as it is used in filter_commitrw_write()
591 * for setting UID/GID and fid EA in first write time. */
592 if (oa->o_valid & OBD_MD_FLGRANT)
593 oa->o_grant = filter_grant(exp,oa->o_grant,oa->o_undirty,left);
595 spin_unlock(&exp->exp_obd->obd_osfs_lock);
596 filter_fmd_put(exp, fmd);
/* Set up every local niobuf, even ungranted ones, so portals keeps
 * page alignment and granted pages still reach disk. */
601 for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt;
603 /* We still set up for ungranted pages so that granted pages
604 * can be written to disk as they were promised, and portals
605 * needs to keep the pages all aligned properly. */
606 lnb->dentry = dentry;
607 lnb->offset = rnb->offset;
609 lnb->flags = rnb->flags;
612 * ost_brw_write()->ost_nio_pages_get() already initialized
613 * lnb->page to point to the page from the per-thread page
614 * pool (bug 5137), initialize page.
616 LASSERT(lnb->page != NULL);
/* Zero the tail of a partial page so pool garbage never hits disk. */
617 if (lnb->len != CFS_PAGE_SIZE) {
618 memset(kmap(lnb->page) + lnb->len,
619 0, CFS_PAGE_SIZE - lnb->len);
622 lnb->page->index = lnb->offset >> CFS_PAGE_SHIFT;
626 /* If the filter writes a partial page, then has the file
627 * extended, the client will read in the whole page. the
628 * filter has to be careful to zero the rest of the partial
629 * page on disk. we do it by hand for partial extending
630 * writes, send_bio() is responsible for zeroing pages when
631 * asked to read unmapped blocks -- brw_kiovec() does this. */
632 if (lnb->len != CFS_PAGE_SIZE) {
/* Last page index currently inside the file. */
635 maxidx = ((i_size_read(dentry->d_inode) +
636 CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT) - 1;
/* Partial page within EOF: must pre-read it (RMW). */
637 if (maxidx >= lnb->page->index) {
638 LL_CDEBUG_PAGE(D_PAGE, lnb->page, "write %u @ "
639 LPU64" flg %x before EOF %llu\n",
640 lnb->len, lnb->offset,lnb->flags,
641 i_size_read(dentry->d_inode));
642 filter_iobuf_add_page(exp->exp_obd, iobuf,
/* Partial page beyond EOF: zero the slack around the
 * written range by hand. */
647 char *p = kmap(lnb->page);
649 off = lnb->offset & ~CFS_PAGE_MASK;
652 off = (lnb->offset + lnb->len) & ~CFS_PAGE_MASK;
654 memset(p + off, 0, CFS_PAGE_SIZE - off);
659 tot_bytes += lnb->len;
/* Pre-read the partial pages collected above (RMW read phase). */
662 rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf, exp,
665 fsfilt_check_slow(exp->exp_obd, now, "start_page_write");
667 if (exp->exp_nid_stats && exp->exp_nid_stats->nid_stats)
668 lprocfs_counter_add(exp->exp_nid_stats->nid_stats,
669 LPROC_FILTER_WRITE_BYTES, tot_bytes);
/* Staged error cleanup: later phases release more resources, and the
 * failure path also returns any grant taken above. */
672 switch(cleanup_phase) {
675 filter_iobuf_put(&exp->exp_obd->u.filter, iobuf, oti);
677 pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
682 filter_iobuf_put(&exp->exp_obd->u.filter, iobuf, oti);
684 spin_lock(&exp->exp_obd->obd_osfs_lock);
686 filter_grant_incoming(exp, oa);
687 spin_unlock(&exp->exp_obd->obd_osfs_lock);
688 pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
/*
 * Entry point for bulk-I/O preparation: dispatch to the write or read
 * implementation based on @cmd.
 * NOTE(review): the fallthrough for an unknown cmd (presumably LBUG or
 * an error return) is in lines elided from this view.
 */
695 int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
696 int objcount, struct obd_ioobj *obj, int niocount,
697 struct niobuf_remote *nb, struct niobuf_local *res,
698 struct obd_trans_info *oti, struct lustre_capa *capa)
700 if (cmd == OBD_BRW_WRITE)
701 return filter_preprw_write(cmd, exp, oa, objcount, obj,
702 niocount, nb, res, oti, capa);
703 if (cmd == OBD_BRW_READ)
704 return filter_preprw_read(cmd, exp, oa, objcount, obj,
705 niocount, nb, res, oti, capa);
/*
 * Release a page used for a read, optionally dropping it from the page
 * cache (like truncate_list_pages()) -- e.g. for files larger than the
 * read-cache size limit.
 * NOTE(review): heavily truncated in this view -- the parameter list,
 * the computation of @drop, and intermediate statements are elided.
 */
710 void filter_release_read_page(struct filter_obd *filter, struct inode *inode,
716 (i_size_read(inode) > filter->fo_readcache_max_filesize))
719 /* drop from cache like truncate_list_pages() */
720 if (drop && !TryLockPage(page)) {
722 ll_truncate_complete_page(page);
725 page_cache_release(page);
/*
 * Commit phase for a bulk read: refresh the LVB on the object's DLM
 * resource so other clients glimpse the atime updated in
 * filter_preprw_read (bug 5972), then release the DIO pages.
 * NOTE(review): the dentry release on the final branch is in lines
 * elided from this view.
 */
728 static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa,
729 int objcount, struct obd_ioobj *obj,
730 int niocount, struct niobuf_local *res,
731 struct obd_trans_info *oti, int rc)
733 struct inode *inode = NULL;
734 struct ldlm_res_id res_id;
735 struct ldlm_resource *resource = NULL;
736 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
739 osc_build_res_name(obj->ioo_id, obj->ioo_gr, &res_id);
740 /* If oa != NULL then filter_preprw_read updated the inode atime
741 * and we should update the lvb so that other glimpses will also
742 * get the updated value. bug 5972 */
743 if (oa && ns && ns->ns_lvbo && ns->ns_lvbo->lvbo_update) {
744 resource = ldlm_resource_get(ns, NULL, &res_id, LDLM_EXTENT, 0);
746 if (resource != NULL) {
747 ns->ns_lvbo->lvbo_update(resource, NULL, 0, 1);
748 ldlm_resource_putref(resource);
752 if (res->dentry != NULL)
753 inode = res->dentry->d_inode;
755 filter_free_dio_pages(objcount, obj, niocount, res);
757 if (res->dentry != NULL)
/*
 * Evict any stale cached page at @new_page's index from @inode's page
 * cache.  The DLM serializes read/write access, so the find_lock_page
 * is expected to return quickly (see inline comment).  The re-insertion
 * of @new_page is compiled out (#if 0) pending a /proc tunable.
 */
762 void flip_into_page_cache(struct inode *inode, struct page *new_page)
764 struct page *old_page;
768 /* the dlm is protecting us from read/write concurrency, so we
769 * expect this find_lock_page to return quickly. even if we
770 * race with another writer it won't be doing much work with
771 * the page locked. we do this 'cause t_c_p expects a
772 * locked page, and it wants to grab the pagecache lock
774 old_page = find_lock_page(inode->i_mapping, new_page->index);
776 ll_truncate_complete_page(old_page);
777 unlock_page(old_page);
778 page_cache_release(old_page);
781 #if 0 /* this should be a /proc tunable someday */
782 /* racing o_directs (no locking ioctl) could race adding
783 * their pages, so we repeat the page invalidation unless
784 * we successfully added our new page */
785 rc = add_to_page_cache_unique(new_page, inode->i_mapping,
787 page_hash(inode->i_mapping,
790 /* add_to_page_cache clears uptodate|dirty and locks
792 SetPageUptodate(new_page);
793 unlock_page(new_page);
/*
 * After the blocks for a write have actually been allocated, retire the
 * grant that was moved to "pending" in filter_grant_check(): sum
 * lnb_grant_used over all niobufs and subtract it from fed_pending,
 * fo_tot_granted and fo_tot_pending under obd_osfs_lock.  The LASSERTFs
 * guard against the counters underflowing.
 */
801 void filter_grant_commit(struct obd_export *exp, int niocount,
802 struct niobuf_local *res)
804 struct filter_obd *filter = &exp->exp_obd->u.filter;
805 struct niobuf_local *lnb = res;
806 unsigned long pending = 0;
809 spin_lock(&exp->exp_obd->obd_osfs_lock);
810 for (i = 0, lnb = res; i < niocount; i++, lnb++)
811 pending += lnb->lnb_grant_used;
813 LASSERTF(exp->exp_filter_data.fed_pending >= pending,
814 "%s: cli %s/%p fed_pending: %lu grant_used: %lu\n",
815 exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
816 exp->exp_filter_data.fed_pending, pending);
817 exp->exp_filter_data.fed_pending -= pending;
818 LASSERTF(filter->fo_tot_granted >= pending,
819 "%s: cli %s/%p tot_granted: "LPU64" grant_used: %lu\n",
820 exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
821 exp->exp_obd->u.filter.fo_tot_granted, pending);
822 filter->fo_tot_granted -= pending;
823 LASSERTF(filter->fo_tot_pending >= pending,
824 "%s: cli %s/%p tot_pending: "LPU64" grant_used: %lu\n",
825 exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
826 filter->fo_tot_pending, pending);
827 filter->fo_tot_pending -= pending;
829 spin_unlock(&exp->exp_obd->obd_osfs_lock);
/*
 * Entry point for bulk-I/O commit: dispatch to the write or read
 * commit implementation based on @cmd.
 * NOTE(review): the unknown-cmd fallthrough and trailing call
 * arguments are in lines elided from this view.
 */
832 int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
833 int objcount, struct obd_ioobj *obj, int niocount,
834 struct niobuf_local *res, struct obd_trans_info *oti,
837 if (cmd == OBD_BRW_WRITE)
838 return filter_commitrw_write(exp, oa, objcount, obj, niocount,
840 if (cmd == OBD_BRW_READ)
841 return filter_commitrw_read(exp, oa, objcount, obj, niocount,
/*
 * Server-local bulk I/O (no network client): build a single obd_ioobj
 * plus local/remote niobuf arrays from the brw_page array, then run the
 * normal filter_preprw()/filter_commitrw() pipeline.
 * NOTE(review): this function continues past the end of this view
 * (final return and cleanup ordering not visible here).
 */
847 int filter_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
848 obd_count oa_bufs, struct brw_page *pga,
849 struct obd_trans_info *oti)
851 struct obd_ioobj ioo;
852 struct niobuf_local *lnb;
853 struct niobuf_remote *rnb;
858 OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
859 OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
861 if (lnb == NULL || rnb == NULL)
862 GOTO(out, ret = -ENOMEM);
/* Translate each brw_page into a (page, offset, len) niobuf pair. */
864 for (i = 0; i < oa_bufs; i++) {
865 lnb[i].page = pga[i].pg;
866 rnb[i].offset = pga[i].off;
867 rnb[i].len = pga[i].count;
870 obdo_to_ioobj(oinfo->oi_oa, &ioo);
871 ioo.ioo_bufcnt = oa_bufs;
/* Single object (objcount == 1), matching the preprw LASSERTs. */
873 ret = filter_preprw(cmd, exp, oinfo->oi_oa, 1, &ioo,
874 oa_bufs, rnb, lnb, oti, oinfo_capa(oinfo));
878 ret = filter_commitrw(cmd, exp, oinfo->oi_oa, 1, &ioo,
879 oa_bufs, lnb, oti, ret);
883 OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
885 OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));