1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * linux/fs/obdfilter/filter_io.c
6 * Copyright (c) 2001-2003 Cluster File Systems, Inc.
7 * Author: Peter Braam <braam@clusterfs.com>
8 * Author: Andreas Dilger <adilger@clusterfs.com>
9 * Author: Phil Schwan <phil@clusterfs.com>
11 * This file is part of Lustre, http://www.lustre.org.
13 * Lustre is free software; you can redistribute it and/or
14 * modify it under the terms of version 2 of the GNU General Public
15 * License as published by the Free Software Foundation.
17 * Lustre is distributed in the hope that it will be useful,
18 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 * GNU General Public License for more details.
22 * You should have received a copy of the GNU General Public License
23 * along with Lustre; if not, write to the Free Software
24 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
27 #include <linux/config.h>
28 #include <linux/module.h>
29 #include <linux/pagemap.h> // XXX kill me soon
30 #include <linux/version.h>
31 #include <linux/buffer_head.h>
33 #define DEBUG_SUBSYSTEM S_FILTER
35 #include <linux/obd_class.h>
36 #include <linux/lustre_fsfilt.h>
37 #include "filter_internal.h"
39 /* 512byte block min */
40 #define MAX_BLOCKS_PER_PAGE (PAGE_SIZE / 512)
/* NOTE(review): the enclosing "struct dio_request {" declaration (and some
 * members, e.g. dr_lock/dr_error/dr_npages/dr_max_pages used below) are on
 * lines missing from this extraction.  These are the visible per-request
 * direct-I/O bookkeeping fields. */
42 atomic_t dr_numreqs; /* number of reqs being processed */
43 struct bio *dr_bios; /* list of completed bios */
44 wait_queue_head_t dr_wait; /* woken by dio_complete_routine() when dr_numreqs hits 0 */
48 struct page **dr_pages; /* pages to submit; dr_max_pages entries */
49 unsigned long *dr_blocks; /* disk block numbers; MAX_BLOCKS_PER_PAGE per page */
/* bio completion callback installed by filter_do_bio().
 *
 * Chains the finished bio onto dreq->dr_bios (reusing bi_private as the
 * list link), latches the FIRST error seen into dreq->dr_error, and wakes
 * the submitting thread once the last outstanding bio completes.
 *
 * NOTE(review): several source lines are missing from this chunk — the
 * function's braces, the condition guarding the CWARN (presumably
 * bio->bi_size != 0, i.e. a partial completion), the NULL-check guarding
 * the CERROR dump, the tail of both CERROR messages, and the return
 * statement(s).  Do not treat the visible text as complete. */
53 static int dio_complete_routine(struct bio *bio, unsigned int done, int error)
55 struct dio_request *dreq = bio->bi_private;
/* presumably reached only when the bio is not fully complete — the guard
 * condition is on a missing line; confirm against the full source */
59 CWARN("gets called against non-complete bio 0x%p: %d/%d/%d\n",
60 bio, bio->bi_size, done, error);
/* defensive diagnostics: bi_private was lost, so there is no dreq to chain
 * onto and no waiter that can be woken — dump the bio state instead */
65 CERROR("***** bio->bi_private is NULL! This should never "
66 "happen. Normally, I would crash here, but instead I "
67 "will dump the bio contents to the console. Please "
68 "report this to CFS, along with any interesting messages "
69 "leading up to this point (like SCSI errors, perhaps). "
70 "Because bi_private is NULL, I can't wake up the thread "
71 "that initiated this I/O -- so you will probably have to "
73 CERROR("bi_next: %p, bi_flags: %lx, bi_rw: %lu, bi_vcnt: %d, "
74 "bi_idx: %d, bi->size: %d, bi_end_io: %p, bi_cnt: %d, "
75 "bi_private: %p\n", bio->bi_next, bio->bi_flags,
76 bio->bi_rw, bio->bi_vcnt, bio->bi_idx, bio->bi_size,
77 bio->bi_end_io, atomic_read(&bio->bi_cnt),
/* normal path: push this bio onto the completed list under dr_lock and
 * record the first error (later errors are deliberately ignored) */
82 spin_lock_irqsave(&dreq->dr_lock, flags);
83 bio->bi_private = dreq->dr_bios;
85 if (dreq->dr_error == 0)
86 dreq->dr_error = error;
87 spin_unlock_irqrestore(&dreq->dr_lock, flags);
/* last completion wakes the thread blocked in wait_event() in filter_do_bio */
89 if (atomic_dec_and_test(&dreq->dr_numreqs))
90 wake_up(&dreq->dr_wait);
/* Return 1 if @sector immediately follows the last sector already covered
 * by @bio (i.e. the new fragment is physically contiguous and may be added
 * to this bio), 0 otherwise.
 * NOTE(review): the opening brace, the declaration of `size`, and any
 * NULL-bio guard are on lines missing from this extraction. */
95 static int can_be_merged(struct bio *bio, sector_t sector)
101 size = bio->bi_size >> 9; /* bio length in 512-byte sectors */
102 return bio->bi_sector + size == sector ? 1 : 0;
/* Allocate and initialise a dio_request sized for @num_pages pages:
 * the dreq itself, its page-pointer array, and its block-number array
 * (MAX_BLOCKS_PER_PAGE entries per page).  The opaque handle is returned
 * through @ret; the matching release is filter_free_iobuf().
 *
 * NOTE(review): the allocation-failure branches (the gotos/returns taken
 * when an OBD_ALLOC yields NULL), the success RETURN, and the error-path
 * labels are on lines missing from this extraction — only the unwind
 * frees at the bottom are visible. */
106 int filter_alloc_iobuf(int rw, int num_pages, void **ret)
108 struct dio_request *dreq;
110 LASSERTF(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ, "%x\n", rw);
112 OBD_ALLOC(dreq, sizeof(*dreq));
116 OBD_ALLOC(dreq->dr_pages, num_pages * sizeof(*dreq->dr_pages));
117 if (dreq->dr_pages == NULL)
120 OBD_ALLOC(dreq->dr_blocks,
121 MAX_BLOCKS_PER_PAGE * num_pages * sizeof(*dreq->dr_blocks));
122 if (dreq->dr_blocks == NULL)
125 dreq->dr_bios = NULL;
126 init_waitqueue_head(&dreq->dr_wait);
127 atomic_set(&dreq->dr_numreqs, 0);
128 spin_lock_init(&dreq->dr_lock);
129 dreq->dr_max_pages = num_pages;
/* error unwind: free what was allocated before the failure */
136 OBD_FREE(dreq->dr_pages,
137 num_pages * sizeof(*dreq->dr_pages));
139 OBD_FREE(dreq, sizeof(*dreq));
/* Release an iobuf obtained from filter_alloc_iobuf(): drop any completed
 * bios still chained on dr_bios (linked through bi_private by
 * dio_complete_routine), then free the block array, the page array, and
 * the dreq itself.
 * NOTE(review): the per-bio release call inside the while loop (presumably
 * bio_put) is on a line missing from this extraction — confirm against the
 * full source. */
144 void filter_free_iobuf(void *iobuf)
146 struct dio_request *dreq = iobuf;
147 int num_pages = dreq->dr_max_pages;
150 while (dreq->dr_bios) {
151 struct bio *bio = dreq->dr_bios;
152 dreq->dr_bios = bio->bi_private; /* bi_private doubles as the list link */
/* sizes must mirror the OBD_ALLOCs in filter_alloc_iobuf() exactly */
156 OBD_FREE(dreq->dr_blocks,
157 MAX_BLOCKS_PER_PAGE * num_pages * sizeof(*dreq->dr_blocks));
158 OBD_FREE(dreq->dr_pages,
159 num_pages * sizeof(*dreq->dr_pages));
160 OBD_FREE(dreq, sizeof(*dreq));
/* Append @page to the iobuf's page array for a later filter_direct_io().
 * Asserts there is still room (caller sized the iobuf via
 * filter_alloc_iobuf's num_pages).
 * NOTE(review): the return statement is on a line missing from this
 * extraction. */
163 int filter_iobuf_add_page(struct obd_device *obd, void *iobuf,
164 struct inode *inode, struct page *page)
166 struct dio_request *dreq = iobuf;
168 LASSERT (dreq->dr_npages < dreq->dr_max_pages);
169 dreq->dr_pages[dreq->dr_npages++] = page;
/* Submit the iobuf's pages to disk.  Walks the per-page disk-block map in
 * dreq->dr_blocks (filled earlier by fsfilt_map_inode_pages), coalescing
 * physically contiguous filesystem blocks into as few bios as possible:
 * a fragment is merged into the current bio when can_be_merged() says it
 * is sector-contiguous and bio_add_page() accepts it; otherwise the
 * current bio is sent via fsfilt_send_bio() and a new one is allocated.
 * Holes (block number 0) are only legal for reads and are zero-filled in
 * the page instead of generating I/O.  Completion is signalled through
 * dio_complete_routine(); this function blocks at the end until every
 * submitted bio has completed.
 *
 * NOTE(review): many lines are missing from this extraction — local
 * declarations (page_idx, block_idx, i, nblocks, sector, rc, page), loop
 * conditions, most braces, the error/return paths after failed
 * bio_alloc/fsfilt_send_bio, and the final RETURN with dreq->dr_error.
 * The comments below annotate only what is visible. */
174 int filter_do_bio(struct obd_device *obd, struct inode *inode,
175 struct dio_request *dreq, int rw)
177 int blocks_per_page = PAGE_SIZE >> inode->i_blkbits;
178 struct page **pages = dreq->dr_pages;
179 int npages = dreq->dr_npages;
180 unsigned long *blocks = dreq->dr_blocks;
181 int total_blocks = npages * blocks_per_page;
182 int sector_bits = inode->i_sb->s_blocksize_bits - 9; /* fs blocks -> 512B sectors */
183 unsigned int blocksize = inode->i_sb->s_blocksize;
184 struct bio *bio = NULL;
186 unsigned int page_offset;
195 LASSERT(dreq->dr_npages == npages);
196 LASSERT(total_blocks <= OBDFILTER_CREATED_SCRATCHPAD_ENTRIES);
/* outer loop: one iteration per page; block_idx indexes this page's run of
 * blocks_per_page entries in blocks[] */
198 for (page_idx = 0, block_idx = 0;
200 page_idx++, block_idx += blocks_per_page) {
202 page = pages[page_idx];
203 LASSERT (block_idx + blocks_per_page <= total_blocks);
/* inner loop: one iteration per contiguous run of nblocks fs blocks
 * within the page */
205 for (i = 0, page_offset = 0;
207 i += nblocks, page_offset += blocksize * nblocks) {
211 if (blocks[block_idx + i] == 0) { /* hole */
212 LASSERT(rw == OBD_BRW_READ); /* writes never map to block 0 */
213 memset(kmap(page) + page_offset, 0, blocksize);
/* NOTE(review): kunmap / continue for the hole case is on missing lines */
218 sector = blocks[block_idx + i] << sector_bits;
220 /* Additional contiguous file blocks? */
221 while (i + nblocks < blocks_per_page &&
222 (sector + nblocks*(blocksize>>9)) ==
223 (blocks[block_idx + i + nblocks] << sector_bits))
/* fast path: extend the bio already in flight-to-be when the new fragment
 * is sector-contiguous and the bio still has room */
227 can_be_merged(bio, sector) &&
228 bio_add_page(bio, page,
229 blocksize * nblocks, page_offset) != 0)
230 continue; /* added this frag OK */
/* slow path: current bio is full or non-contiguous — report why (for
 * debugging 1MB I/O fragmentation) and submit it */
233 request_queue_t *q = bdev_get_queue(bio->bi_bdev);
235 /* Dang! I have to fragment this I/O */
236 CDEBUG(D_INODE, "bio++ sz %d vcnt %d(%d) "
237 "sectors %d(%d) psg %d(%d) hsg %d(%d)\n",
239 bio->bi_vcnt, bio->bi_max_vecs,
240 bio->bi_size >> 9, q->max_sectors,
241 bio_phys_segments(q, bio),
242 q->max_phys_segments,
243 bio_hw_segments(q, bio),
/* dr_numreqs must be bumped BEFORE submission so the completion handler
 * cannot see a zero count while bios are still being issued */
246 atomic_inc(&dreq->dr_numreqs);
247 rc = fsfilt_send_bio(rw, obd, inode, bio);
249 CERROR("Can't send bio: %d\n", rc);
250 /* OK do dec; we do the waiting */
251 atomic_dec(&dreq->dr_numreqs);
259 /* allocate new bio */
/* sized generously: every remaining block could be its own vector entry */
260 bio = bio_alloc(GFP_NOIO,
261 (npages - page_idx) * blocks_per_page);
263 CERROR ("Can't allocate bio\n");
268 bio->bi_bdev = inode->i_sb->s_bdev;
269 bio->bi_sector = sector;
270 bio->bi_end_io = dio_complete_routine;
271 bio->bi_private = dreq;
273 rc = bio_add_page(bio, page,
274 blocksize * nblocks, page_offset);
/* after the loops: submit the final, partially-filled bio (if any) */
280 atomic_inc(&dreq->dr_numreqs);
281 rc = fsfilt_send_bio(rw, obd, inode, bio);
285 CERROR("Can't send bio: %d\n", rc);
286 /* OK do dec; we do the waiting */
287 atomic_dec(&dreq->dr_numreqs);
/* block until dio_complete_routine has retired every submitted bio */
292 wait_event(dreq->dr_wait, atomic_read(&dreq->dr_numreqs) == 0);
299 /* These are our hacks to keep our directio/bh IO coherent with ext3's
300 * page cache use. Most notably ext3 reads file data into the page
301 * cache when it is zeroing the tail of partial-block truncates and
302 * leaves it there, sometimes generating io from it at later truncates.
303 * This removes the partial page and its buffers from the page cache,
304 * so it should only ever cause a wait in rare cases, as otherwise we
305 * always do full-page IO to the OST.
307 * The call to truncate_complete_page() will call journal_invalidatepage()
308 * to free the buffers and drop the page from cache. The buffers should
309 * not be dirty, because we already called fdatasync/fdatawait on them.
/* Flush and evict from the page cache any cached pages overlapping the
 * iobuf's pages, so the subsequent direct/bh I/O does not race with stale
 * ext3 page-cache state (see the block comment above).
 * NOTE(review): local declarations (rc, rc2, i, page), the early-return
 * checks between the filemap calls, the loop's NULL-page handling, and
 * the final return are on lines missing from this extraction. */
311 static int filter_clear_page_cache(struct inode *inode,
312 struct dio_request *iobuf)
317 /* This is nearly generic_osync_inode, without the waiting on the inode
318 rc = generic_osync_inode(inode, inode->i_mapping,
319 OSYNC_DATA|OSYNC_METADATA);
/* write back dirty data and metadata buffers, then wait for completion */
321 rc = filemap_fdatawrite(inode->i_mapping);
322 rc2 = sync_mapping_buffers(inode->i_mapping);
325 rc2 = filemap_fdatawait(inode->i_mapping);
331 /* be careful to call this after fsync_inode_data_buffers has waited
332 * for IO to complete before we evict it from the cache */
333 for (i = 0; i < iobuf->dr_npages; i++) {
334 page = find_lock_page(inode->i_mapping,
335 iobuf->dr_pages[i]->index);
/* mapping may have gone away under us; only truncate still-attached pages */
338 if (page->mapping != NULL) {
339 wait_on_page_writeback(page);
340 ll_truncate_complete_page(page);
/* NOTE(review): the unlock_page before the release is on a missing line */
344 page_cache_release(page);
348 /* Must be called with i_sem taken for writes; this will drop it */
/* Map the iobuf's pages to disk blocks (allocating blocks for writes),
 * update the inode size/timestamps for writes, close the transno, flush
 * conflicting page-cache pages, and finally submit the I/O via
 * filter_do_bio().
 * NOTE(review): local declarations (rc, rc2), the early-RETURN bodies for
 * the npages checks, the error handling after fsfilt_map_inode_pages and
 * fsfilt_setattr, the i_sem drop mentioned above, and intermediate braces
 * are on lines missing from this extraction. */
349 int filter_direct_io(int rw, struct dentry *dchild, void *iobuf,
350 struct obd_export *exp, struct iattr *attr,
351 struct obd_trans_info *oti, void **wait_handle)
353 struct obd_device *obd = exp->exp_obd;
354 struct inode *inode = dchild->d_inode;
355 struct dio_request *dreq = iobuf;
359 LASSERTF(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ, "%x\n", rw);
360 LASSERTF(dreq->dr_npages <= dreq->dr_max_pages, "%d,%d\n",
361 dreq->dr_npages, dreq->dr_max_pages);
363 if (dreq->dr_npages == 0)
366 if (dreq->dr_npages > OBDFILTER_CREATED_SCRATCHPAD_ENTRIES)
/* fill dreq->dr_blocks with the on-disk block number of every fs block
 * covered by dr_pages; for writes this allocates the blocks */
369 rc = fsfilt_map_inode_pages(obd, inode,
370 dreq->dr_pages, dreq->dr_npages,
372 obdfilter_created_scratchpad,
373 rw == OBD_BRW_WRITE, NULL);
375 if (rw == OBD_BRW_WRITE) {
378 filter_tally_write(&obd->u.filter,
/* grow the file if this write extends past the current size */
384 if (attr->ia_size > inode->i_size)
385 attr->ia_valid |= ATTR_SIZE;
386 rc = fsfilt_setattr(obd, dchild,
387 oti->oti_handle, attr, 0);
392 rc2 = filter_finish_transno(exp, oti, 0);
394 CERROR("can't close transaction: %d\n", rc);
/* evict overlapping page-cache pages before doing raw block I/O */
402 rc = filter_clear_page_cache(inode, dreq);
406 RETURN(filter_do_bio(obd, inode, dreq, rw));
409 /* See if there are unallocated parts in given file region */
/* Returns whether every filesystem block in [offset, offset+len] is already
 * mapped to a disk block, probed via the address_space bmap operation.
 * Used by filter_commitrw_write to decide if a write is a pure overwrite
 * (no new allocation, hence no grant needed).
 * NOTE(review): the declaration of j, the no-bmap early return ("can't
 * know if we are overwriting"), and the final return statements are on
 * lines missing from this extraction.  Note the loop bound is `j <= len`
 * — it deliberately(?) probes len+1 blocks to cover a partial tail block;
 * confirm against the full source. */
410 static int filter_range_is_mapped(struct inode *inode, obd_size offset, int len)
412 sector_t (*fs_bmap)(struct address_space *, sector_t) =
413 inode->i_mapping->a_ops->bmap;
416 /* We can't know if we are overwriting or not */
/* convert byte range to fs-block indices */
420 offset >>= inode->i_blkbits;
421 len >>= inode->i_blkbits;
423 for (j = 0; j <= len; j++)
424 if (fs_bmap(inode->i_mapping, offset + j) == 0)
/* Commit phase of a bulk write: gather the per-niobuf pages into an iobuf,
 * start a journal transaction, perform the direct I/O (which also updates
 * size/timestamps), log size changes, commit, and clean up.
 *
 * NOTE(review): this function extends past the end of the visible chunk
 * (its tail, including the final return and remaining cleanup phases, is
 * not shown), and many interior lines — local declarations (rc, total_size,
 * this_size, old_size), GOTO error paths, the brw_start argument tail, and
 * most braces — are missing from this extraction.  Comments below annotate
 * only what is visible; do not infer completeness. */
430 int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
431 int objcount, struct obd_ioobj *obj, int niocount,
432 struct niobuf_local *res, struct obd_trans_info *oti,
435 struct niobuf_local *lnb;
436 struct dio_request *dreq = NULL;
437 struct lvfs_run_ctxt saved;
438 struct fsfilt_objinfo fso;
439 struct iattr iattr = { 0 };
440 struct inode *inode = NULL;
441 unsigned long now = jiffies; /* for fsfilt_check_slow timing below */
442 int i, err, cleanup_phase = 0;
443 struct obd_device *obd = exp->exp_obd;
448 LASSERT(oti != NULL);
449 LASSERT(objcount == 1);
450 LASSERT(current->journal_info == NULL);
/* iobuf sized for the worst case: every niobuf contributes a page */
455 rc = filter_alloc_iobuf(OBD_BRW_WRITE, obj->ioo_bufcnt, (void **)&dreq);
460 fso.fso_dentry = res->dentry;
461 fso.fso_bufcnt = obj->ioo_bufcnt;
462 inode = res->dentry->d_inode;
464 for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++) {
467 /* If overwriting an existing block, we don't need a grant */
468 if (!(lnb->flags & OBD_BRW_GRANTED) && lnb->rc == -ENOSPC &&
469 filter_range_is_mapped(inode, lnb->offset, lnb->len))
472 if (lnb->rc) { /* ENOSPC, network RPC error, etc. */
473 CDEBUG(D_INODE, "Skipping [%d] == %d\n", i, lnb->rc);
477 err = filter_iobuf_add_page(obd, dreq, inode, lnb->page);
480 total_size += lnb->len;
482 /* we expect these pages to be in offset order, but we'll
/* track the highest byte written so iattr.ia_size can extend the file */
484 this_size = lnb->offset + lnb->len;
485 if (this_size > iattr.ia_size)
486 iattr.ia_size = this_size;
489 /* I use this when I'm checking our lovely 1M I/Os reach the disk -eeb */
490 if (total_size != (1<<20))
491 CWARN("total size %d (%d pages)\n",
492 total_size, total_size/PAGE_SIZE);
494 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
/* remember pre-write size to detect growth for the size-change llog below */
498 old_size = inode->i_size;
499 oti->oti_handle = fsfilt_brw_start(obd, objcount, &fso, niocount, res,
501 if (IS_ERR(oti->oti_handle)) {
503 rc = PTR_ERR(oti->oti_handle);
504 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
505 "error starting transaction: rc = %d\n", rc);
506 oti->oti_handle = NULL;
509 /* have to call fsfilt_commit() from this point on */
511 fsfilt_check_slow(now, obd_timeout, "brw_start");
513 iattr_from_obdo(&iattr,oa,OBD_MD_FLATIME|OBD_MD_FLMTIME|OBD_MD_FLCTIME);
514 /* filter_direct_io drops i_sem */
515 rc = filter_direct_io(OBD_BRW_WRITE, res->dentry, dreq, exp, &iattr,
/* if the write grew the file, record the new size in the llog so the MDS
 * can learn about it */
519 if (inode->i_size != old_size) {
520 struct llog_cookie *cookie = obdo_logcookie(oa);
521 struct lustre_id *id = obdo_id(oa);
522 filter_log_sz_change(obd, id, oa->o_easize, cookie, inode);
/* reflect post-I/O inode attributes back into the reply obdo */
527 obdo_from_inode(oa, inode, FILTER_VALID_FLAGS);
529 fsfilt_check_slow(now, obd_timeout, "direct_io");
531 err = fsfilt_commit(obd, obd->u.filter.fo_sb, inode, oti->oti_handle,
536 if (obd_sync_filter && !err)
537 LASSERTF(oti->oti_transno <= obd->obd_last_committed,
538 "oti_transno "LPU64" last_committed "LPU64"\n",
539 oti->oti_transno, obd->obd_last_committed);
541 fsfilt_check_slow(now, obd_timeout, "commitrw commit");
/* I/O is durable (or at least committed): release the grant space */
544 filter_grant_commit(exp, niocount, res);
/* staged cleanup: fall-through switch keyed on how far setup progressed */
546 switch (cleanup_phase) {
548 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
549 LASSERT(current->journal_info == NULL);
551 filter_free_iobuf(dreq);
553 filter_free_dio_pages(objcount, obj, niocount, res);