1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * linux/fs/obdfilter/filter_io.c
6 * Copyright (c) 2001-2003 Cluster File Systems, Inc.
7 * Author: Peter Braam <braam@clusterfs.com>
8 * Author: Andreas Dilger <adilger@clusterfs.com>
9 * Author: Phil Schwan <phil@clusterfs.com>
11 * This file is part of Lustre, http://www.lustre.org.
13 * Lustre is free software; you can redistribute it and/or
14 * modify it under the terms of version 2 of the GNU General Public
15 * License as published by the Free Software Foundation.
17 * Lustre is distributed in the hope that it will be useful,
18 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 * GNU General Public License for more details.
22 * You should have received a copy of the GNU General Public License
23 * along with Lustre; if not, write to the Free Software
24 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
27 #include <linux/config.h>
28 #include <linux/module.h>
29 #include <linux/pagemap.h> // XXX kill me soon
30 #include <linux/version.h>
31 #include <linux/buffer_head.h>
33 #define DEBUG_SUBSYSTEM S_FILTER
35 #include <linux/obd_class.h>
36 #include <linux/lustre_fsfilt.h>
37 #include "filter_internal.h"
39 #warning "implement writeback mode -bzzz"
41 /* Blocks are at least 512 bytes, so a page holds at most PAGE_SIZE / 512 of them; this bounds the per-page dr_created[] and dr_blocks[] arrays. */
42 #define MAX_BLOCKS_PER_PAGE (PAGE_SIZE / 512)
/* Members of the per-request direct-I/O state (struct dio_request) used by
 * the functions in this file.
 * NOTE(review): the struct header and the members dr_lock, dr_error, dr_rw
 * and dr_num_pages, which other functions here reference, are not visible in
 * this listing. */
/* numreqs is incremented just before each fsfilt_send_bio() and decremented
 * by dio_complete_routine(). */
44 atomic_t numreqs; /* number of reqs being processed */
45 struct bio *bio_current;/* bio currently being constructed */
/* Entries of bio_list are chained through bio->bi_private. */
46 struct bio *bio_list; /* list of completed bios */
/* filter_direct_io() sleeps on dr_wait until numreqs reaches zero;
 * dio_complete_routine() wakes it. */
47 wait_queue_head_t dr_wait;
/* Per-block data for the page currently being added by
 * filter_iobuf_add_page(): a block with dr_created[k] == -1 is zero-filled
 * in memory, and dr_blocks[k] is the block number handed to
 * fsfilt_map_inode_pages() and later converted to a sector. */
51 int dr_created[MAX_BLOCKS_PER_PAGE];
52 unsigned long dr_blocks[MAX_BLOCKS_PER_PAGE];
/*
 * Completion callback for bios built by filter_iobuf_add_page(), which
 * installs it as bi_end_io with bi_private pointing at the dio_request.
 *
 * @bio:   the bio being completed; its bi_private is the owning dio_request.
 * @done:  not used in the visible lines.
 * @error: stored into dr_error if no earlier error has been recorded.
 *
 * NOTE(review): dr_error is written after the numreqs decrement and the
 * wake_up().  The waiter in filter_direct_io() only waits for numreqs == 0,
 * so it may already be running (and the request may already be released)
 * when this store happens -- confirm whether the error should be recorded
 * before the decrement, e.g. under dr_lock.
 */
57 static int dio_complete_routine(struct bio *bio, unsigned int done, int error)
59 struct dio_request *dreq = bio->bi_private;
/* Under dr_lock, bi_private is reused to hold the current head of the
 * list of completed bios (the list is walked through bi_private in
 * filter_free_iobuf()). */
62 spin_lock_irqsave(&dreq->dr_lock, flags);
63 bio->bi_private = dreq->bio_list;
65 spin_unlock_irqrestore(&dreq->dr_lock, flags);
/* Last outstanding bio: wake whoever sleeps on dr_wait. */
66 if (atomic_dec_and_test(&dreq->numreqs))
67 wake_up(&dreq->dr_wait);
/* Keep only the first non-zero error. */
68 if (dreq->dr_error == 0)
69 dreq->dr_error = error;
/*
 * Return 1 when @sector is exactly the sector that follows the range already
 * described by @bio (bi_sector plus bi_size expressed in 512-byte units),
 * i.e. the new block can be appended contiguously; return 0 otherwise.
 */
73 static int can_be_merged(struct bio *bio, sector_t sector)
/* bi_size >> 9 converts the bio's size into 512-byte units. */
79 size = bio->bi_size >> 9;
80 return bio->bi_sector + size == sector ? 1 : 0;
/*
 * Allocate and initialise a dio_request for one bulk read or write.
 *
 * @rw:        must be OBD_BRW_WRITE or OBD_BRW_READ (asserted).
 * @num_pages: stored in dr_num_pages, which filter_iobuf_add_page() uses
 *             when sizing bio allocations.
 * @ret:       NOTE(review): presumably receives the new request; the
 *             assignment and the allocation-failure path are not visible in
 *             this listing.
 */
82 int filter_alloc_iobuf(int rw, int num_pages, void **ret)
84 struct dio_request *dreq;
86 LASSERTF(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ, "%x\n", rw);
88 OBD_ALLOC(dreq, sizeof(*dreq));
/* Start with no completed bios, no outstanding requests and an idle
 * wait queue. */
92 dreq->bio_list = NULL;
93 init_waitqueue_head(&dreq->dr_wait);
94 atomic_set(&dreq->numreqs, 0);
95 spin_lock_init(&dreq->dr_lock);
96 dreq->dr_num_pages = num_pages;
/*
 * Release a dio_request: unlink every bio on bio_list (the list is chained
 * through bi_private) and then free the request with OBD_FREE.
 *
 * @iobuf: the dio_request to release, passed as an opaque pointer.
 *
 * NOTE(review): what is done with each unlinked bio is not visible in this
 * listing -- confirm that each one is released.
 */
103 void filter_free_iobuf(void *iobuf)
105 struct dio_request *dreq = iobuf;
108 while (dreq->bio_list) {
109 struct bio *bio = dreq->bio_list;
/* Advance the head to the next bio before this one is disposed of. */
110 dreq->bio_list = bio->bi_private;
114 OBD_FREE(dreq, sizeof(*dreq));
/*
 * Queue one page of a bulk transfer.
 *
 * The page is mapped to block numbers with fsfilt_map_inode_pages(); then,
 * for every block in the page, the block is either zero-filled in memory
 * (dr_created[k] == -1) or appended to the bio under construction.  When
 * there is no current bio, the block is not contiguous with it, or
 * bio_add_page() cannot take it, the current bio is submitted and a new one
 * is started at this block's sector.
 *
 * @obd:   device passed through to the fsfilt helpers.
 * @iobuf: the dio_request being built, passed as an opaque pointer.
 * @inode: supplies block size, block-size bits and the block device.
 * @page:  the page to add.
 *
 * NOTE(review): the listing omits several lines of this function (return
 * statements, error handling, the second argument factor of bio_alloc).
 * NOTE(review): no unmap matching the kmap() below is visible -- confirm.
 * NOTE(review): filter_tally_write() is given dreq->maplist, dreq->nr_pages
 * and dreq->blocks, which are not among the dio_request members visible in
 * this file -- confirm whether that call is live code.
 */
117 int filter_iobuf_add_page(struct obd_device *obd, void *iobuf,
118 struct inode *inode, struct page *page)
120 struct dio_request *dreq = iobuf;
121 int blocks_per_page = PAGE_SIZE >> inode->i_blkbits;
/* len is one filesystem block in bytes; offs is the byte offset of the
 * current block within the page. */
122 unsigned int len = inode->i_sb->s_blocksize, offs;
123 struct bio *bio = dreq->bio_current;
128 /* get block number for next page */
129 rc = fsfilt_map_inode_pages(obd, inode, &page, 1, dreq->dr_blocks,
131 dreq->dr_rw == OBD_BRW_WRITE, NULL);
135 for (k = 0, offs = 0; k < blocks_per_page; k++, offs += len) {
/* A block marked -1 is cleared in the page instead of being sent to
 * disk. */
136 if (dreq->dr_created[k] == -1) {
137 memset(kmap(page) + offs, 0, len);
/* Convert the filesystem block number into a 512-byte sector number. */
142 sector = dreq->dr_blocks[k] <<(inode->i_sb->s_blocksize_bits-9);
/* Need a fresh bio if there is none, the block is not contiguous, or the
 * current bio refuses the page. */
144 if (!bio || !can_be_merged(bio, sector) ||
145 !bio_add_page(bio, page, len, offs)) {
147 atomic_inc(&dreq->numreqs);
149 filter_tally_write(&obd->u.filter,dreq->maplist,
150 dreq->nr_pages,dreq->blocks,
153 fsfilt_send_bio(dreq->dr_rw, obd, inode, bio);
154 dreq->bio_current = bio = NULL;
156 /* allocate new bio */
157 dreq->bio_current = bio =
158 bio_alloc(GFP_NOIO, dreq->dr_num_pages *
/* Completion is routed to dio_complete_routine(), which finds the request
 * through bi_private. */
160 bio->bi_bdev = inode->i_sb->s_bdev;
161 bio->bi_sector = sector;
162 bio->bi_end_io = dio_complete_routine;
163 bio->bi_private = dreq;
165 if (!bio_add_page(bio, page, len, offs))
/* One page fewer remains to be added; this shrinks later bio_alloc()
 * sizes. */
169 dreq->dr_num_pages--;
/*
 * For every page described by @iobuf, look up and lock the page cached at
 * the same index in @inode's mapping; if that page is still attached to a
 * mapping, invalidate its buffers and truncate it, then drop the reference
 * taken by the lookup.
 *
 * NOTE(review): @iobuf is typed as struct kiobuf, while the visible caller
 * filter_direct_io() passes its dio_request pointer, and nr_pages / maplist
 * are not among the dio_request members visible in this file -- confirm that
 * this body is actually compiled and used as shown.
 * NOTE(review): the handling of a failed lookup and the page unlock are not
 * visible in this listing.
 */
174 static void filter_clear_page_cache(struct inode *inode, struct kiobuf *iobuf)
180 for (i = 0; i < iobuf->nr_pages ; i++) {
181 page = find_lock_page(inode->i_mapping,
182 iobuf->maplist[i]->index);
185 if (page->mapping != NULL) {
186 block_invalidatepage(page, 0);
187 truncate_complete_page(page);
190 page_cache_release(page);
/*
 * Run the direct I/O described by @iobuf and wait for it to finish.
 *
 * Visible steps: write out and wait for the inode's cached data, clear the
 * matching page-cache pages, submit the bio still under construction (if
 * any), sleep until numreqs drops to zero, and -- for a successful write
 * whose attr->ia_size exceeds the current i_size -- add ATTR_SIZE to
 * attr->ia_valid and call fsfilt_setattr().
 *
 * @rw:          OBD_BRW_WRITE or OBD_BRW_READ (asserted).
 * @dchild:      dentry of the object; its inode is the I/O target.
 * @iobuf:       the dio_request built by filter_iobuf_add_page().
 * @exp:         export; exp->exp_obd is handed to the fsfilt helpers.
 * @attr:        attributes to apply after a write.
 * @oti:         supplies the transaction handle for fsfilt_setattr().
 * @wait_handle: not used in the visible lines.
 *
 * NOTE(review): the rc checks between the sync calls, the body of the first
 * "if (rw == OBD_BRW_WRITE)" and the return path are not visible in this
 * listing.
 */
195 /* Must be called with i_sem taken for writes; this will drop it */
196 int filter_direct_io(int rw, struct dentry *dchild, void *iobuf,
197 struct obd_export *exp, struct iattr *attr,
198 struct obd_trans_info *oti, void **wait_handle)
200 struct dio_request *dreq = iobuf;
201 struct inode *inode = dchild->d_inode;
205 LASSERTF(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ, "%x\n", rw);
207 /* This is nearly osync_inode, without the waiting
208 rc = generic_osync_inode(inode, inode->i_mapping,
209 OSYNC_DATA|OSYNC_METADATA); */
210 rc = filemap_fdatawrite(inode->i_mapping);
212 rc = sync_mapping_buffers(inode->i_mapping);
214 rc = filemap_fdatawait(inode->i_mapping);
218 if (rw == OBD_BRW_WRITE)
/* NOTE(review): the comment below names fsync_inode_data_buffers, but the
 * visible code waits with filemap_fdatawait() -- the reference looks stale. */
221 /* be careful to call this after fsync_inode_data_buffers has waited
222 * for IO to complete before we evict it from the cache */
223 filter_clear_page_cache(inode, iobuf);
/* Submit the last bio, which filter_iobuf_add_page() left under
 * construction, counting it as outstanding first. */
225 if (dreq->bio_current != NULL) {
226 atomic_inc(&dreq->numreqs);
227 fsfilt_send_bio(rw, exp->exp_obd, inode, dreq->bio_current);
228 dreq->bio_current = NULL;
231 /* time to wait for I/O completion */
232 wait_event(dreq->dr_wait, atomic_read(&dreq->numreqs) == 0);
235 if (rw == OBD_BRW_WRITE && rc == 0) {
237 filter_tally_write(&obd->u.filter, dreq->maplist,
238 dreq->nr_pages, dreq->blocks,
242 if (attr->ia_size > inode->i_size) {
243 CDEBUG(D_INFO, "setting i_size to "LPU64"\n",
246 attr->ia_valid |= ATTR_SIZE;
248 fsfilt_setattr(exp->exp_obd, dchild, oti->oti_handle,
/*
 * @inode:  file to probe.
 * @offset: byte offset of the region; converted to a block index.
 * @len:    byte length of the region; converted to a block count.
 *
 * Each block index from offset to offset + len (inclusive, in block units)
 * is looked up through the mapping's bmap operation; a result of 0 is
 * treated as an unmapped block.  The inclusive bound means one block past
 * len >> i_blkbits is probed as well.
 *
 * NOTE(review): the return statements and the condition guarding the
 * "can't know" case are not visible in this listing.
 */
258 /* See if there are unallocated parts in given file region */
259 static int filter_range_is_mapped(struct inode *inode, obd_size offset, int len)
261 sector_t (*fs_bmap)(struct address_space *, sector_t) =
262 inode->i_mapping->a_ops->bmap;
265 /* We can't know if we are overwriting or not */
/* Switch from bytes to filesystem-block units. */
269 offset >>= inode->i_blkbits;
270 len >>= inode->i_blkbits;
272 for (j = 0; j <= len; j++)
273 if (fs_bmap(inode->i_mapping, offset + j) == 0)
/*
 * Commit a bulk write for a single object (objcount is asserted to be 1).
 *
 * Visible sequence: allocate a dio_request sized for obj->ioo_bufcnt pages,
 * push the obd's lvfs context, sync the inode, start a filesystem
 * transaction with fsfilt_brw_start(), add every local niobuf page whose
 * rc is zero to the request while tracking the furthest byte written in
 * iattr.ia_size, run filter_direct_io(), pass the result through
 * filter_finish_transno(), commit with fsfilt_commit(), update grants, and
 * unwind according to cleanup_phase (pop context, free the request, free the
 * pages).
 *
 * "slow ..." messages are logged when more than 15 * HZ jiffies have passed
 * since entry at each of the three checkpoints.
 *
 * NOTE(review): the listing omits the tail of the parameter list, the error
 * branches (goto/cleanup labels), the statement executed when an ungranted
 * -ENOSPC buffer turns out to overwrite mapped blocks, and the end of the
 * function.
 */
279 int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
280 int objcount, struct obd_ioobj *obj, int niocount,
281 struct niobuf_local *res, struct obd_trans_info *oti,
284 struct niobuf_local *lnb;
285 struct dio_request *dreq = NULL;
286 struct lvfs_run_ctxt saved;
287 struct fsfilt_objinfo fso;
288 struct iattr iattr = { 0 };
289 struct inode *inode = NULL;
/* Entry timestamp used by the "slow ..." checks further down. */
290 unsigned long now = jiffies;
291 int i, err, cleanup_phase = 0;
292 struct obd_device *obd = exp->exp_obd;
296 LASSERT(oti != NULL);
297 LASSERT(objcount == 1);
298 LASSERT(current->journal_info == NULL);
/* All buffers belong to the one object, so the first niobuf's dentry
 * identifies the inode. */
303 inode = res->dentry->d_inode;
305 rc = filter_alloc_iobuf(OBD_BRW_WRITE, obj->ioo_bufcnt, (void **)&dreq);
310 fso.fso_dentry = res->dentry;
311 fso.fso_bufcnt = obj->ioo_bufcnt;
313 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
316 generic_osync_inode(inode, inode->i_mapping, OSYNC_DATA|OSYNC_METADATA);
318 oti->oti_handle = fsfilt_brw_start(obd, objcount, &fso, niocount, res,
/* A failed start leaves no handle behind; -ENOSPC is logged at the quieter
 * D_INODE level. */
320 if (IS_ERR(oti->oti_handle)) {
321 rc = PTR_ERR(oti->oti_handle);
322 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
323 "error starting transaction: rc = %d\n", rc);
324 oti->oti_handle = NULL;
328 /* have to call fsfilt_commit() from this point on */
330 if (time_after(jiffies, now + 15 * HZ))
331 CERROR("slow brw_start %lus\n", (jiffies - now) / HZ);
/* Walk the local niobufs in step with the buffer count. */
334 for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++) {
337 /* If overwriting an existing block, we don't need a grant */
338 if (!(lnb->flags & OBD_BRW_GRANTED) && lnb->rc == -ENOSPC &&
339 filter_range_is_mapped(inode, lnb->offset, lnb->len))
342 if (lnb->rc) /* ENOSPC, network RPC error, etc. */
345 err = filter_iobuf_add_page(obd, dreq, inode, lnb->page);
351 /* we expect these pages to be in offset order, but we'll
353 this_size = lnb->offset + lnb->len;
354 if (this_size > iattr.ia_size)
355 iattr.ia_size = this_size;
358 iattr_from_obdo(&iattr,oa,OBD_MD_FLATIME|OBD_MD_FLMTIME|OBD_MD_FLCTIME);
359 rc = filter_direct_io(OBD_BRW_WRITE, res->dentry, dreq, exp, &iattr,
361 rc = filter_finish_transno(exp, oti, rc);
363 if (time_after(jiffies, now + 15 * HZ))
364 CERROR("slow direct_io %lus\n", (jiffies - now) / HZ);
367 err = fsfilt_commit(obd, obd->u.filter.fo_sb, inode, oti->oti_handle,
373 LASSERT(oti->oti_transno <= obd->obd_last_committed);
375 if (time_after(jiffies, now + 15 * HZ))
376 CERROR("slow commitrw commit %lus\n", (jiffies - now) / HZ);
379 filter_grant_commit(exp, niocount, res);
381 switch (cleanup_phase) {
383 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
384 LASSERT(current->journal_info == NULL);
386 filter_free_iobuf(dreq);
388 filter_free_dio_pages(objcount, obj, niocount, res);