Whamcloud - gitweb
b=7049
[fs/lustre-release.git] / lustre / obdfilter / filter_io_26.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdfilter/filter_io.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *   Author: Phil Schwan <phil@clusterfs.com>
10  *
11  *   This file is part of Lustre, http://www.lustre.org.
12  *
13  *   Lustre is free software; you can redistribute it and/or
14  *   modify it under the terms of version 2 of the GNU General Public
15  *   License as published by the Free Software Foundation.
16  *
17  *   Lustre is distributed in the hope that it will be useful,
18  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
19  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20  *   GNU General Public License for more details.
21  *
22  *   You should have received a copy of the GNU General Public License
23  *   along with Lustre; if not, write to the Free Software
24  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25  */
26
27 #include <linux/config.h>
28 #include <linux/module.h>
29 #include <linux/pagemap.h> // XXX kill me soon
30 #include <linux/version.h>
31 #include <linux/buffer_head.h>
32
33 #define DEBUG_SUBSYSTEM S_FILTER
34
35 #include <linux/obd_class.h>
36 #include <linux/lustre_fsfilt.h>
37 #include "filter_internal.h"
38
/* 512byte block min */
#define MAX_BLOCKS_PER_PAGE (PAGE_SIZE / 512)

/* Per-batch descriptor for a set of direct-I/O pages.  Filled by
 * filter_alloc_iobuf()/filter_iobuf_add_page(), consumed by
 * filter_do_bio(); freed by filter_free_iobuf(). */
struct dio_request {
        atomic_t          dr_numreqs;  /* number of reqs being processed */
        struct bio       *dr_bios;     /* list of completed bios */
        wait_queue_head_t dr_wait;     /* woken when dr_numreqs drops to 0 */
        int               dr_max_pages; /* capacity of dr_pages[] */
        int               dr_npages;   /* pages added so far */
        int               dr_error;    /* first error reported by any bio */
        struct page     **dr_pages;    /* pages under I/O */
        unsigned long    *dr_blocks;   /* disk blocks backing the pages */
        spinlock_t        dr_lock;     /* protects dr_bios and dr_error */
};
52
53 static int dio_complete_routine(struct bio *bio, unsigned int done, int error)
54 {
55         struct dio_request *dreq = bio->bi_private;
56         unsigned long flags;
57
58         if (bio->bi_size) {
59                 CWARN("gets called against non-complete bio 0x%p: %d/%d/%d\n",
60                       bio, bio->bi_size, done, error);
61                 return 1;
62         }
63
64         if (dreq == NULL) {
65                 CERROR("***** bio->bi_private is NULL!  This should never "
66                        "happen.  Normally, I would crash here, but instead I "
67                        "will dump the bio contents to the console.  Please "
68                        "report this to CFS, along with any interesting messages "
69                        "leading up to this point (like SCSI errors, perhaps).  "
70                        "Because bi_private is NULL, I can't wake up the thread "
71                        "that initiated this I/O -- so you will probably have to "
72                        "reboot this node.");
73                 CERROR("bi_next: %p, bi_flags: %lx, bi_rw: %lu, bi_vcnt: %d, "
74                        "bi_idx: %d, bi->size: %d, bi_end_io: %p, bi_cnt: %d, "
75                        "bi_private: %p\n", bio->bi_next, bio->bi_flags,
76                        bio->bi_rw, bio->bi_vcnt, bio->bi_idx, bio->bi_size,
77                        bio->bi_end_io, atomic_read(&bio->bi_cnt),
78                        bio->bi_private);
79                 return 0;
80         }
81
82         spin_lock_irqsave(&dreq->dr_lock, flags);
83         bio->bi_private = dreq->dr_bios;
84         dreq->dr_bios = bio;
85         if (dreq->dr_error == 0)
86                 dreq->dr_error = error;
87         spin_unlock_irqrestore(&dreq->dr_lock, flags);
88
89         if (atomic_dec_and_test(&dreq->dr_numreqs))
90                 wake_up(&dreq->dr_wait);
91
92         return 0;
93 }
94
95 static int can_be_merged(struct bio *bio, sector_t sector)
96 {
97         unsigned int size;
98         if (!bio)
99                 return 0;
100
101         size = bio->bi_size >> 9;
102         return bio->bi_sector + size == sector ? 1 : 0;
103 }
104
105
106 int filter_alloc_iobuf(int rw, int num_pages, void **ret)
107 {
108         struct dio_request *dreq;
109
110         LASSERTF(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ, "%x\n", rw);
111
112         OBD_ALLOC(dreq, sizeof(*dreq));
113         if (dreq == NULL)
114                 goto failed_0;
115         
116         OBD_ALLOC(dreq->dr_pages, num_pages * sizeof(*dreq->dr_pages));
117         if (dreq->dr_pages == NULL)
118                 goto failed_1;
119         
120         OBD_ALLOC(dreq->dr_blocks,
121                   MAX_BLOCKS_PER_PAGE * num_pages * sizeof(*dreq->dr_blocks));
122         if (dreq->dr_blocks == NULL)
123                 goto failed_2;
124
125         dreq->dr_bios = NULL;
126         init_waitqueue_head(&dreq->dr_wait);
127         atomic_set(&dreq->dr_numreqs, 0);
128         spin_lock_init(&dreq->dr_lock);
129         dreq->dr_max_pages = num_pages;
130         dreq->dr_npages = 0;
131
132         *ret = dreq;
133         RETURN(0);
134         
135  failed_2:
136         OBD_FREE(dreq->dr_pages,
137                  num_pages * sizeof(*dreq->dr_pages));
138  failed_1:
139         OBD_FREE(dreq, sizeof(*dreq));
140  failed_0:
141         RETURN(-ENOMEM);
142 }
143
144 void filter_free_iobuf(void *iobuf)
145 {
146         struct dio_request *dreq = iobuf;
147         int                 num_pages = dreq->dr_max_pages;
148
149         /* free all bios */
150         while (dreq->dr_bios) {
151                 struct bio *bio = dreq->dr_bios;
152                 dreq->dr_bios = bio->bi_private;
153                 bio_put(bio);
154         }
155
156         OBD_FREE(dreq->dr_blocks,
157                  MAX_BLOCKS_PER_PAGE * num_pages * sizeof(*dreq->dr_blocks));
158         OBD_FREE(dreq->dr_pages,
159                  num_pages * sizeof(*dreq->dr_pages));
160         OBD_FREE(dreq, sizeof(*dreq));
161 }
162
163 int filter_iobuf_add_page(struct obd_device *obd, void *iobuf,
164                           struct inode *inode, struct page *page)
165 {
166         struct dio_request *dreq = iobuf;
167
168         LASSERT (dreq->dr_npages < dreq->dr_max_pages);
169         dreq->dr_pages[dreq->dr_npages++] = page;
170
171         return 0;
172 }
173
/* Build bios for every page in @dreq and submit them with fsfilt_send_bio().
 *
 * dr_pages[]/dr_blocks[] must already be populated (fsfilt_map_inode_pages);
 * each page contributes blocks_per_page entries in dr_blocks.  Contiguous
 * disk blocks are coalesced into as few bio fragments as possible.  On
 * read, a block number of 0 marks a hole and that part of the page is
 * zero-filled instead of generating I/O.
 *
 * Always waits for every bio submitted so far to complete before
 * returning, even on error.  Returns 0, a negative errno from
 * submission, or the first per-bio completion error (dreq->dr_error).
 */
int filter_do_bio(struct obd_device *obd, struct inode *inode,
                  struct dio_request *dreq, int rw)
{
        int            blocks_per_page = PAGE_SIZE >> inode->i_blkbits;
        struct page  **pages = dreq->dr_pages;
        int            npages = dreq->dr_npages;
        unsigned long *blocks = dreq->dr_blocks;
        int            total_blocks = npages * blocks_per_page;
        int            sector_bits = inode->i_sb->s_blocksize_bits - 9;
        unsigned int   blocksize = inode->i_sb->s_blocksize;
        struct bio    *bio = NULL;
        struct page   *page;
        unsigned int   page_offset;
        sector_t       sector;
        int            nblocks;
        int            block_idx;
        int            page_idx;
        int            i;
        int            rc = 0;
        ENTRY;

        LASSERT(dreq->dr_npages == npages);
        LASSERT(total_blocks <= OBDFILTER_CREATED_SCRATCHPAD_ENTRIES);

        for (page_idx = 0, block_idx = 0;
             page_idx < npages;
             page_idx++, block_idx += blocks_per_page) {

                page = pages[page_idx];
                LASSERT (block_idx + blocks_per_page <= total_blocks);

                /* walk this page's blocks, advancing by however many
                 * contiguous blocks were folded into one fragment */
                for (i = 0, page_offset = 0;
                     i < blocks_per_page;
                     i += nblocks, page_offset += blocksize * nblocks) {

                        nblocks = 1;

                        if (blocks[block_idx + i] == 0) {  /* hole */
                                LASSERT(rw == OBD_BRW_READ);
                                memset(kmap(page) + page_offset, 0, blocksize);
                                kunmap(page);
                                continue;
                        }

                        /* filesystem block number -> 512-byte device sector */
                        sector = blocks[block_idx + i] << sector_bits;

                        /* Additional contiguous file blocks? */
                        while (i + nblocks < blocks_per_page &&
                               (sector + nblocks*(blocksize>>9)) ==
                               (blocks[block_idx + i + nblocks] << sector_bits))
                                nblocks++;

                        /* try to extend the open bio; bio_add_page()
                         * returns 0 when the bio cannot take more */
                        if (bio != NULL &&
                            can_be_merged(bio, sector) &&
                            bio_add_page(bio, page,
                                         blocksize * nblocks, page_offset) != 0)
                                continue;       /* added this frag OK */

                        if (bio != NULL) {
                                request_queue_t *q = bdev_get_queue(bio->bi_bdev);

                                /* Dang! I have to fragment this I/O */
                                CDEBUG(D_INODE, "bio++ sz %d vcnt %d(%d) "
                                       "sectors %d(%d) psg %d(%d) hsg %d(%d)\n",
                                       bio->bi_size,
                                       bio->bi_vcnt, bio->bi_max_vecs,
                                       bio->bi_size >> 9, q->max_sectors,
                                       bio_phys_segments(q, bio),
                                       q->max_phys_segments,
                                       bio_hw_segments(q, bio),
                                       q->max_hw_segments);

                                /* inc before submit so completion cannot
                                 * race dr_numreqs to zero early */
                                atomic_inc(&dreq->dr_numreqs);
                                rc = fsfilt_send_bio(rw, obd, inode, bio);
                                if (rc < 0) {
                                        CERROR("Can't send bio: %d\n", rc);
                                        /* OK do dec; we do the waiting */
                                        atomic_dec(&dreq->dr_numreqs);
                                        goto out;
                                }
                                rc = 0;

                                bio = NULL;
                        }

                        /* allocate new bio, sized for the worst case of
                         * every remaining block in its own vector */
                        bio = bio_alloc(GFP_NOIO,
                                        (npages - page_idx) * blocks_per_page);
                        if (bio == NULL) {
                                CERROR ("Can't allocate bio\n");
                                rc = -ENOMEM;
                                goto out;
                        }

                        bio->bi_bdev = inode->i_sb->s_bdev;
                        bio->bi_sector = sector;
                        bio->bi_end_io = dio_complete_routine;
                        bio->bi_private = dreq;

                        /* cannot fail: the fresh bio was sized above to
                         * hold at least this fragment */
                        rc = bio_add_page(bio, page,
                                          blocksize * nblocks, page_offset);
                        LASSERT (rc != 0);
                }
        }

        /* submit the final, still-open bio */
        if (bio != NULL) {
                atomic_inc(&dreq->dr_numreqs);
                rc = fsfilt_send_bio(rw, obd, inode, bio);
                if (rc >= 0) {
                        rc = 0;
                } else {
                        CERROR("Can't send bio: %d\n", rc);
                        /* OK do dec; we do the waiting */
                        atomic_dec(&dreq->dr_numreqs);
                }
        }

 out:
        /* dio_complete_routine() wakes us when the last bio finishes */
        wait_event(dreq->dr_wait, atomic_read(&dreq->dr_numreqs) == 0);

        if (rc == 0)
                rc = dreq->dr_error;
        RETURN(rc);
}
298  
/* Debug-only check (write path): warn via CERROR if the block device's
 * page cache still holds a dirty page or dirty buffer_heads for the page
 * containing @block -- cached data there could later overwrite the
 * direct I/O we are issuing.  Reports only; modifies nothing. */
static void check_metadata(struct super_block *sb, sector_t block)
{
        struct inode *bd_inode = sb->s_bdev->bd_inode;
        struct address_space *bd_mapping = bd_inode->i_mapping;
        pgoff_t index;
        struct buffer_head *bh;
        struct buffer_head *head;
        struct page *page;

        /* page index within the block-device mapping holding @block */
        index = block >> (PAGE_CACHE_SHIFT - bd_inode->i_blkbits);
        page = find_get_page(bd_mapping, index);
        if (!page)
                return;

        if (PageDirty(page))
                CERROR("page 0x%p/%lu in mapping 0x%p is dirty\n",
                       page, page->index, bd_mapping);

        /* private_lock guards the page's buffer_head ring */
        spin_lock(&bd_mapping->private_lock);
        if (!page_has_buffers(page))
                goto out_unlock;
        head = page_buffers(page);
        bh = head;
        do {
                if (buffer_dirty(bh))
                        CERROR("buffer 0x%p in page 0x%p/%lu/%u is dirty (0x%p)\n",
                               bh, page, page->index, (unsigned) block,
                               page->mapping);
                bh = bh->b_this_page;
        } while (bh != head);

out_unlock:
        spin_unlock(&bd_mapping->private_lock);
        /* drop the reference taken by find_get_page() */
        page_cache_release(page);
        return;
}
335
/* These are our hacks to keep our directio/bh IO coherent with ext3's
 * page cache use.  Most notably ext3 reads file data into the page
 * cache when it is zeroing the tail of partial-block truncates and
 * leaves it there, sometimes generating io from it at later truncates.
 * This removes the partial page and its buffers from the page cache,
 * so it should only ever cause a wait in rare cases, as otherwise we
 * always do full-page IO to the OST.
 *
 * The call to truncate_complete_page() will call journal_invalidatepage()
 * to free the buffers and drop the page from cache.  The buffers should
 * not be dirty, because we already called fdatasync/fdatawait on them.
 *
 * Returns 0 on success or the first error from the flush/wait calls.
 */
static int filter_clear_page_cache(struct inode *inode,
                                   struct dio_request *iobuf, int rw)
{
        struct page *page;
        int i, rc, rc2;

        /* This is nearly generic_osync_inode, without the waiting on the inode
        rc = generic_osync_inode(inode, inode->i_mapping,
                                  OSYNC_DATA|OSYNC_METADATA);
        */
        /* flush dirty data and mapping buffers, then wait; keep first error */
        rc = filemap_fdatawrite(inode->i_mapping);
        rc2 = sync_mapping_buffers(inode->i_mapping);
        if (rc == 0)
                rc = rc2;
        rc2 = filemap_fdatawait(inode->i_mapping);
        if (rc == 0)
                rc = rc2;
        if (rc != 0)
                RETURN(rc);

        /* be careful to call this after fsync_inode_data_buffers has waited
         * for IO to complete before we evict it from the cache */
        for (i = 0; i < iobuf->dr_npages; i++) {
                page = find_lock_page(inode->i_mapping,
                                       iobuf->dr_pages[i]->index);
                if (page == NULL)
                       continue;
                if (page->mapping != NULL) {
                       wait_on_page_writeback(page);
                       ll_truncate_complete_page(page);
                }

                unlock_page(page);
                page_cache_release(page);

                /* NOTE(review): dr_blocks[] holds blocks_per_page entries
                 * per page, so dr_blocks[i] is the block of page i only
                 * when blocksize == PAGE_SIZE -- confirm this is intended
                 * (debug check only, no functional impact). */
                if (rw == OBD_BRW_WRITE)
                        check_metadata(inode->i_sb, iobuf->dr_blocks[i]);
        }
        return 0;
}
388 /* Must be called with i_sem taken for writes; this will drop it */
389 int filter_direct_io(int rw, struct dentry *dchild, void *iobuf,
390                      struct obd_export *exp, struct iattr *attr,
391                      struct obd_trans_info *oti, void **wait_handle)
392 {
393         struct obd_device *obd = exp->exp_obd;
394         struct inode *inode = dchild->d_inode;
395         struct dio_request *dreq = iobuf;
396         int rc, rc2;
397         ENTRY;
398
399         LASSERTF(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ, "%x\n", rw);
400         LASSERTF(dreq->dr_npages <= dreq->dr_max_pages, "%d,%d\n",
401                  dreq->dr_npages, dreq->dr_max_pages);
402
403         if (dreq->dr_npages == 0)
404                 RETURN(0);
405
406         if (dreq->dr_npages > OBDFILTER_CREATED_SCRATCHPAD_ENTRIES)
407                 RETURN(-EINVAL);
408         
409         rc = fsfilt_map_inode_pages(obd, inode,
410                                     dreq->dr_pages, dreq->dr_npages,
411                                     dreq->dr_blocks,
412                                     obdfilter_created_scratchpad,
413                                     rw == OBD_BRW_WRITE, NULL);
414
415         if (rw == OBD_BRW_WRITE) {
416                 if (rc == 0) {
417 #if 0
418                         filter_tally_write(&obd->u.filter, 
419                                            dreq->dr_pages,
420                                            dreq->dr_page_idx,
421                                            dreq->dr_blocks,
422                                            blocks_per_page);
423 #endif
424                         if (attr->ia_size > inode->i_size)
425                                 attr->ia_valid |= ATTR_SIZE;
426                         rc = fsfilt_setattr(obd, dchild, 
427                                             oti->oti_handle, attr, 0);
428                 }
429                 
430                 up(&inode->i_sem);
431
432                 rc2 = filter_finish_transno(exp, oti, 0);
433                 if (rc2 != 0)
434                         CERROR("can't close transaction: %d\n", rc);
435                 rc = (rc == 0) ? rc2 : rc;
436
437                 rc2 = fsfilt_commit_async(obd,inode,oti->oti_handle,wait_handle);
438                 rc = (rc == 0) ? rc2 : rc;
439
440                 if (rc != 0)
441                         RETURN(rc);
442         }
443
444         rc = filter_clear_page_cache(inode, dreq, rw);
445         if (rc != 0)
446                 RETURN(rc);
447
448         RETURN(filter_do_bio(obd, inode, dreq, rw));
449 }
450
451 /* See if there are unallocated parts in given file region */
452 static int filter_range_is_mapped(struct inode *inode, obd_size offset, int len)
453 {
454         sector_t (*fs_bmap)(struct address_space *, sector_t) =
455                 inode->i_mapping->a_ops->bmap;
456         int j;
457
458         /* We can't know if we are overwriting or not */
459         if (fs_bmap == NULL)
460                 return 0;
461
462         offset >>= inode->i_blkbits;
463         len >>= inode->i_blkbits;
464
465         for (j = 0; j <= len; j++)
466                 if (fs_bmap(inode->i_mapping, offset + j) == 0)
467                         return 0;
468
469         return 1;
470 }
471
/* Commit phase of a bulk write for a single object (objcount == 1).
 *
 * Collects the prepared pages from @res into an iobuf, opens a journal
 * transaction, performs the direct I/O via filter_direct_io() (which
 * drops i_sem and starts an async commit), waits for the commit, and
 * releases all per-request resources via the phase-based cleanup at the
 * bottom.  @rc carries any error from the prepare phase; on error we go
 * straight to cleanup.  Returns 0 or a negative errno.
 */
int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
                          int objcount, struct obd_ioobj *obj, int niocount,
                          struct niobuf_local *res, struct obd_trans_info *oti,
                          int rc)
{
        struct niobuf_local *lnb;
        struct dio_request *dreq = NULL;
        struct lvfs_run_ctxt saved;
        struct fsfilt_objinfo fso;
        struct iattr iattr = { 0 };
        struct inode *inode = NULL;
        unsigned long now = jiffies;    /* for fsfilt_check_slow() timing */
        int i, err, cleanup_phase = 0;
        struct obd_device *obd = exp->exp_obd;
        void *wait_handle = NULL;
        int   total_size = 0;
        loff_t old_size;                /* only used by the #if 0 size-log code */
        ENTRY;

        LASSERT(oti != NULL);
        LASSERT(objcount == 1);
        LASSERT(current->journal_info == NULL);

        if (rc != 0)
                GOTO(cleanup, rc);

        rc = filter_alloc_iobuf(OBD_BRW_WRITE, obj->ioo_bufcnt, (void **)&dreq);
        if (rc)
                GOTO(cleanup, rc);
        cleanup_phase = 1;

        fso.fso_dentry = res->dentry;
        fso.fso_bufcnt = obj->ioo_bufcnt;
        inode = res->dentry->d_inode;

        /* gather the good pages and track the resulting file size */
        for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++) {
                loff_t this_size;

                /* If overwriting an existing block, we don't need a grant */
                if (!(lnb->flags & OBD_BRW_GRANTED) && lnb->rc == -ENOSPC &&
                    filter_range_is_mapped(inode, lnb->offset, lnb->len))
                        lnb->rc = 0;

                if (lnb->rc) { /* ENOSPC, network RPC error, etc. */
                        CDEBUG(D_INODE, "Skipping [%d] == %d\n", i, lnb->rc);
                        continue;
                }

                err = filter_iobuf_add_page(obd, dreq, inode, lnb->page);
                LASSERT (err == 0);

                total_size += lnb->len;

                /* we expect these pages to be in offset order, but we'll
                 * be forgiving */
                this_size = lnb->offset + lnb->len;
                if (this_size > iattr.ia_size)
                        iattr.ia_size = this_size;
        }
#if 0
        /* I use this when I'm checking our lovely 1M I/Os reach the disk -eeb */
        if (total_size != (1<<20))
                CWARN("total size %d (%d pages)\n",
                      total_size, total_size/PAGE_SIZE);
#endif
        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
        cleanup_phase = 2;

        /* i_sem guards the size/attr update; filter_direct_io() drops it */
        down(&inode->i_sem);
        old_size = inode->i_size;
        oti->oti_handle = fsfilt_brw_start(obd, objcount, &fso, niocount, res,
                                           oti);
        if (IS_ERR(oti->oti_handle)) {
                up(&inode->i_sem);
                rc = PTR_ERR(oti->oti_handle);
                CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
                       "error starting transaction: rc = %d\n", rc);
                oti->oti_handle = NULL;
                GOTO(cleanup, rc);
        }
        /* have to call fsfilt_commit() from this point on */

        fsfilt_check_slow(now, obd_timeout, "brw_start");

        iattr_from_obdo(&iattr,oa,OBD_MD_FLATIME|OBD_MD_FLMTIME|OBD_MD_FLCTIME);
        /* filter_direct_io drops i_sem */
        rc = filter_direct_io(OBD_BRW_WRITE, res->dentry, dreq, exp, &iattr,
                              oti, &wait_handle);

#if 0
        if (inode->i_size != old_size) {
                struct llog_cookie *cookie = obdo_logcookie(oa);
                struct lustre_id *id = obdo_id(oa);
                filter_log_sz_change(obd, id, oa->o_easize, cookie, inode);
        }
#endif

        /* reply with the post-write inode attributes */
        if (rc == 0)
                obdo_from_inode(oa, inode, FILTER_VALID_FLAGS);

        fsfilt_check_slow(now, obd_timeout, "direct_io");

        /* wait for the async commit started inside filter_direct_io() */
        err = fsfilt_commit_wait(obd, inode, wait_handle);
        if (rc == 0)
                rc = err;

        fsfilt_check_slow(now, obd_timeout, "commitrw commit");

cleanup:
        filter_grant_commit(exp, niocount, res);

        /* intentional fallthrough: each phase implies the earlier ones */
        switch (cleanup_phase) {
        case 2:
                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                LASSERT(current->journal_info == NULL);
        case 1:
                filter_free_iobuf(dreq);
        case 0:
                filter_free_dio_pages(objcount, obj, niocount, res);
                f_dput(res->dentry);
        }

        RETURN(rc);
}