/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  lustre/obdfilter/filter_io_26.c
 *
 *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
 *   Author: Peter Braam <braam@clusterfs.com>
 *   Author: Andreas Dilger <adilger@clusterfs.com>
 *   Author: Phil Schwan <phil@clusterfs.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#include <linux/config.h>
#include <linux/module.h>
#include <linux/pagemap.h> // XXX kill me soon
#include <linux/version.h>
#include <linux/buffer_head.h>

#define DEBUG_SUBSYSTEM S_FILTER

#include <linux/obd_class.h>
#include <linux/lustre_fsfilt.h>
#include "filter_internal.h"

#warning "implement writeback mode -bzzz"

/* 512byte block min */
#define MAX_BLOCKS_PER_PAGE (PAGE_SIZE / 512)
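
/*
 * Per-I/O state for the 2.6 direct-I/O path.  dr_pages[] holds the pages
 * being transferred and dr_blocks[] the corresponding on-disk block numbers
 * (up to MAX_BLOCKS_PER_PAGE entries per page, indexed as
 * dr_blocks[page_idx * blocks_per_page + i]).  dr_numreqs counts bios in
 * flight; the submitter sleeps on dr_wait until it reaches zero, and
 * dio_complete_routine() chains finished bios onto dr_bios under dr_lock so
 * they can be released later in filter_free_iobuf().
 */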
struct dio_request {
        atomic_t          dr_numreqs;  /* number of reqs being processed */
        struct bio       *dr_bios;     /* list of completed bios */
        wait_queue_head_t dr_wait;
        int               dr_max_pages;
        int               dr_npages;
        int               dr_error;
        struct page     **dr_pages;
        unsigned long    *dr_blocks;
        spinlock_t        dr_lock;
};

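/*
 * bi_end_io callback for every bio submitted by filter_do_bio().  It may run
 * in interrupt context, so it only does bookkeeping: it chains the finished
 * bio onto dr_bios (reusing bi_private as the link), records the first error
 * seen, and wakes the submitting thread once the last outstanding bio has
 * completed.  The callback can also be invoked before the whole bio is done
 * (bi_size != 0); such partial completions are warned about and ignored.
 */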
static int dio_complete_routine(struct bio *bio, unsigned int done, int error)
{
        struct dio_request *dreq = bio->bi_private;
        unsigned long flags;

        if (bio->bi_size) {
                CWARN("called against non-complete bio 0x%p: %d/%d/%d\n",
                      bio, bio->bi_size, done, error);
                return 1;
        }

        if (dreq == NULL) {
                CERROR("***** bio->bi_private is NULL!  This should never "
                       "happen.  Normally, I would crash here, but instead I "
                       "will dump the bio contents to the console.  Please "
                       "report this to CFS, along with any interesting messages "
                       "leading up to this point (like SCSI errors, perhaps).  "
                       "Because bi_private is NULL, I can't wake up the thread "
                       "that initiated this I/O -- so you will probably have to "
                       "reboot this node.");
                CERROR("bi_next: %p, bi_flags: %lx, bi_rw: %lu, bi_vcnt: %d, "
                       "bi_idx: %d, bi_size: %d, bi_end_io: %p, bi_cnt: %d, "
                       "bi_private: %p\n", bio->bi_next, bio->bi_flags,
                       bio->bi_rw, bio->bi_vcnt, bio->bi_idx, bio->bi_size,
                       bio->bi_end_io, atomic_read(&bio->bi_cnt),
                       bio->bi_private);
                return 0;
        }

        spin_lock_irqsave(&dreq->dr_lock, flags);
        bio->bi_private = dreq->dr_bios;
        dreq->dr_bios = bio;
        if (dreq->dr_error == 0)
                dreq->dr_error = error;
        spin_unlock_irqrestore(&dreq->dr_lock, flags);

        if (atomic_dec_and_test(&dreq->dr_numreqs))
                wake_up(&dreq->dr_wait);

        return 0;
}

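/*
 * A new fragment can be merged into an existing bio only if it starts at the
 * sector immediately following the data already in the bio (bi_sector plus
 * bi_size converted to 512-byte sectors).
 */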
static int can_be_merged(struct bio *bio, sector_t sector)
{
        unsigned int size;
        if (!bio)
                return 0;

        size = bio->bi_size >> 9;
        return bio->bi_sector + size == sector ? 1 : 0;
}


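/*
 * Allocate a dio_request large enough for num_pages pages: the page array
 * plus MAX_BLOCKS_PER_PAGE block numbers per page.  The opaque iobuf is
 * returned through *ret and is released with filter_free_iobuf().
 */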
int filter_alloc_iobuf(int rw, int num_pages, void **ret)
{
        struct dio_request *dreq;

        LASSERTF(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ, "%x\n", rw);

        OBD_ALLOC(dreq, sizeof(*dreq));
        if (dreq == NULL)
                goto failed_0;

        OBD_ALLOC(dreq->dr_pages, num_pages * sizeof(*dreq->dr_pages));
        if (dreq->dr_pages == NULL)
                goto failed_1;

        OBD_ALLOC(dreq->dr_blocks,
                  MAX_BLOCKS_PER_PAGE * num_pages * sizeof(*dreq->dr_blocks));
        if (dreq->dr_blocks == NULL)
                goto failed_2;

        dreq->dr_bios = NULL;
        init_waitqueue_head(&dreq->dr_wait);
        atomic_set(&dreq->dr_numreqs, 0);
        spin_lock_init(&dreq->dr_lock);
        dreq->dr_max_pages = num_pages;
        dreq->dr_npages = 0;

        *ret = dreq;
        RETURN(0);

 failed_2:
        OBD_FREE(dreq->dr_pages,
                 num_pages * sizeof(*dreq->dr_pages));
 failed_1:
        OBD_FREE(dreq, sizeof(*dreq));
 failed_0:
        RETURN(-ENOMEM);
}

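/*
 * Release an iobuf allocated above.  Any bios that completed are still
 * chained on dr_bios (see dio_complete_routine()), so drop our reference on
 * each of them before freeing the arrays.
 */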
void filter_free_iobuf(void *iobuf)
{
        struct dio_request *dreq = iobuf;
        int                 num_pages = dreq->dr_max_pages;

        /* free all bios */
        while (dreq->dr_bios) {
                struct bio *bio = dreq->dr_bios;
                dreq->dr_bios = bio->bi_private;
                bio_put(bio);
        }

        OBD_FREE(dreq->dr_blocks,
                 MAX_BLOCKS_PER_PAGE * num_pages * sizeof(*dreq->dr_blocks));
        OBD_FREE(dreq->dr_pages,
                 num_pages * sizeof(*dreq->dr_pages));
        OBD_FREE(dreq, sizeof(*dreq));
}

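/* Append a page to the iobuf; the caller guarantees there is room for it. */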
int filter_iobuf_add_page(struct obd_device *obd, void *iobuf,
                          struct inode *inode, struct page *page)
{
        struct dio_request *dreq = iobuf;

        LASSERT (dreq->dr_npages < dreq->dr_max_pages);
        dreq->dr_pages[dreq->dr_npages++] = page;

        return 0;
}

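/*
 * Build and submit bios for all of the pages in the iobuf.  For each page we
 * walk its block mappings in dr_blocks[]: a zero block is a hole and is only
 * legal on read, where the corresponding part of the page is simply zeroed.
 * Runs of blocks that are contiguous on disk are coalesced into a single
 * fragment, and each fragment is added to the current bio when it continues
 * the bio's last sector and still fits; otherwise the current bio is
 * submitted and a new one is allocated, sized for the pages that remain.
 * After the loop the final bio is submitted and we wait for dr_numreqs to
 * drop to zero, returning the first error recorded by dio_complete_routine().
 */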
int filter_do_bio(struct obd_device *obd, struct inode *inode,
                  struct dio_request *dreq, int rw)
{
        int            blocks_per_page = PAGE_SIZE >> inode->i_blkbits;
        struct page  **pages = dreq->dr_pages;
        int            npages = dreq->dr_npages;
        unsigned long *blocks = dreq->dr_blocks;
        int            total_blocks = npages * blocks_per_page;
        int            sector_bits = inode->i_sb->s_blocksize_bits - 9;
        unsigned int   blocksize = inode->i_sb->s_blocksize;
        struct bio    *bio = NULL;
        struct page   *page;
        unsigned int   page_offset;
        sector_t       sector;
        int            nblocks;
        int            block_idx;
        int            page_idx;
        int            i;
        int            rc = 0;
        ENTRY;

        LASSERT(dreq->dr_npages == npages);
        LASSERT(total_blocks <= OBDFILTER_CREATED_SCRATCHPAD_ENTRIES);

        for (page_idx = 0, block_idx = 0;
             page_idx < npages;
             page_idx++, block_idx += blocks_per_page) {

                page = pages[page_idx];
                LASSERT (block_idx + blocks_per_page <= total_blocks);

                for (i = 0, page_offset = 0;
                     i < blocks_per_page;
                     i += nblocks, page_offset += blocksize * nblocks) {

                        nblocks = 1;

                        if (blocks[block_idx + i] == 0) {  /* hole */
                                LASSERT(rw == OBD_BRW_READ);
                                memset(kmap(page) + page_offset, 0, blocksize);
                                kunmap(page);
                                continue;
                        }

                        sector = blocks[block_idx + i] << sector_bits;

                        /* Additional contiguous file blocks? */
                        while (i + nblocks < blocks_per_page &&
                               (sector + nblocks*(blocksize>>9)) ==
                               (blocks[block_idx + i + nblocks] << sector_bits))
                                nblocks++;

                        if (bio != NULL &&
                            can_be_merged(bio, sector) &&
                            bio_add_page(bio, page,
                                         blocksize * nblocks, page_offset) != 0)
                                continue;       /* added this frag OK */

                        if (bio != NULL) {
                                request_queue_t *q = bdev_get_queue(bio->bi_bdev);

                                /* Dang! I have to fragment this I/O */
                                CDEBUG(D_INODE, "bio++ sz %d vcnt %d(%d) "
                                       "sectors %d(%d) psg %d(%d) hsg %d(%d)\n",
                                       bio->bi_size,
                                       bio->bi_vcnt, bio->bi_max_vecs,
                                       bio->bi_size >> 9, q->max_sectors,
                                       bio_phys_segments(q, bio),
                                       q->max_phys_segments,
                                       bio_hw_segments(q, bio),
                                       q->max_hw_segments);

                                atomic_inc(&dreq->dr_numreqs);
                                rc = fsfilt_send_bio(rw, obd, inode, bio);
                                if (rc < 0) {
                                        CERROR("Can't send bio: %d\n", rc);
                                        /* OK do dec; we do the waiting */
                                        atomic_dec(&dreq->dr_numreqs);
                                        goto out;
                                }
                                rc = 0;

                                bio = NULL;
                        }

                        /* allocate new bio */
                        bio = bio_alloc(GFP_NOIO,
                                        (npages - page_idx) * blocks_per_page);
                        if (bio == NULL) {
                                CERROR ("Can't allocate bio\n");
                                rc = -ENOMEM;
                                goto out;
                        }

                        bio->bi_bdev = inode->i_sb->s_bdev;
                        bio->bi_sector = sector;
                        bio->bi_end_io = dio_complete_routine;
                        bio->bi_private = dreq;

                        rc = bio_add_page(bio, page,
                                          blocksize * nblocks, page_offset);
                        LASSERT (rc != 0);
                }
        }

        if (bio != NULL) {
                atomic_inc(&dreq->dr_numreqs);
                rc = fsfilt_send_bio(rw, obd, inode, bio);
                if (rc >= 0) {
                        rc = 0;
                } else {
                        CERROR("Can't send bio: %d\n", rc);
                        /* OK do dec; we do the waiting */
                        atomic_dec(&dreq->dr_numreqs);
                }
        }

 out:
        wait_event(dreq->dr_wait, atomic_read(&dreq->dr_numreqs) == 0);

        if (rc == 0)
                rc = dreq->dr_error;
        RETURN(rc);
}

/* These are our hacks to keep our directio/bh IO coherent with ext3's
 * page cache use.  Most notably ext3 reads file data into the page
 * cache when it is zeroing the tail of partial-block truncates and
 * leaves it there, sometimes generating io from it at later truncates.
 * This removes the partial page and its buffers from the page cache,
 * so it should only ever cause a wait in rare cases, as otherwise we
 * always do full-page IO to the OST.
 *
 * The call to truncate_complete_page() will call journal_invalidatepage()
 * to free the buffers and drop the page from cache.  The buffers should
 * not be dirty, because we already called fdatasync/fdatawait on them.
 */
static int filter_clear_page_cache(struct inode *inode,
                                   struct dio_request *iobuf)
{
        struct page *page;
        int i, rc, rc2;

        /* This is nearly generic_osync_inode, without the waiting on the inode
        rc = generic_osync_inode(inode, inode->i_mapping,
                                  OSYNC_DATA|OSYNC_METADATA);
        */
        rc = filemap_fdatawrite(inode->i_mapping);
        rc2 = sync_mapping_buffers(inode->i_mapping);
        if (rc == 0)
                rc = rc2;
        rc2 = filemap_fdatawait(inode->i_mapping);
        if (rc == 0)
                rc = rc2;
        if (rc != 0)
                RETURN(rc);

        /* be careful to call this after fsync_inode_data_buffers has waited
         * for IO to complete before we evict it from the cache */
        for (i = 0; i < iobuf->dr_npages; i++) {
                page = find_lock_page(inode->i_mapping,
                                      iobuf->dr_pages[i]->index);
                if (page == NULL)
                        continue;
                if (page->mapping != NULL) {
                        wait_on_page_writeback(page);
                        ll_truncate_complete_page(page);
                }

                unlock_page(page);
                page_cache_release(page);
        }
        return 0;
}

/* Must be called with i_sem taken for writes; this will drop it */
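/*
 * filter_direct_io() maps the iobuf's pages to disk blocks with
 * fsfilt_map_inode_pages() (allocating blocks on write).  For writes it then
 * updates the inode attributes (including i_size when the write extends the
 * file), drops i_sem, and closes the transno under the still-open
 * transaction.  Before the bios are issued, filter_clear_page_cache() pushes
 * out and invalidates any cached pages covering the same range so the direct
 * I/O below stays coherent with the page cache.
 */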
int filter_direct_io(int rw, struct dentry *dchild, void *iobuf,
                     struct obd_export *exp, struct iattr *attr,
                     struct obd_trans_info *oti, void **wait_handle)
{
        struct obd_device *obd = exp->exp_obd;
        struct inode *inode = dchild->d_inode;
        struct dio_request *dreq = iobuf;
        int rc, rc2;
        ENTRY;

        LASSERTF(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ, "%x\n", rw);
        LASSERTF(dreq->dr_npages <= dreq->dr_max_pages, "%d,%d\n",
                 dreq->dr_npages, dreq->dr_max_pages);

        if (dreq->dr_npages == 0)
                RETURN(0);

        if (dreq->dr_npages > OBDFILTER_CREATED_SCRATCHPAD_ENTRIES)
                RETURN(-EINVAL);

        rc = fsfilt_map_inode_pages(obd, inode,
                                    dreq->dr_pages, dreq->dr_npages,
                                    dreq->dr_blocks,
                                    obdfilter_created_scratchpad,
                                    rw == OBD_BRW_WRITE, NULL);

        if (rw == OBD_BRW_WRITE) {
                if (rc == 0) {
#if 0
                        filter_tally_write(&obd->u.filter,
                                           dreq->dr_pages,
                                           dreq->dr_page_idx,
                                           dreq->dr_blocks,
                                           blocks_per_page);
#endif
                        if (attr->ia_size > inode->i_size)
                                attr->ia_valid |= ATTR_SIZE;
                        rc = fsfilt_setattr(obd, dchild,
                                            oti->oti_handle, attr, 0);
                }

                up(&inode->i_sem);

                rc2 = filter_finish_transno(exp, oti, 0);
                if (rc2 != 0)
                        CERROR("can't close transaction: %d\n", rc2);

                if (rc == 0)
                        rc = rc2;
                if (rc != 0)
                        RETURN(rc);
        }

        rc = filter_clear_page_cache(inode, dreq);
        if (rc != 0)
                RETURN(rc);

        RETURN(filter_do_bio(obd, inode, dreq, rw));
}

/* See if there are unallocated parts in given file region */
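/*
 * Walk the [offset, offset + len] range block by block and ask the
 * filesystem's ->bmap whether each block already has a disk mapping.
 * Returns 1 only when every block is mapped; if the filesystem provides no
 * bmap method we cannot tell, so we conservatively return 0.
 */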
static int filter_range_is_mapped(struct inode *inode, obd_size offset, int len)
{
        sector_t (*fs_bmap)(struct address_space *, sector_t) =
                inode->i_mapping->a_ops->bmap;
        int j;

        /* We can't know if we are overwriting or not */
        if (fs_bmap == NULL)
                return 0;

        offset >>= inode->i_blkbits;
        len >>= inode->i_blkbits;

        for (j = 0; j <= len; j++)
                if (fs_bmap(inode->i_mapping, offset + j) == 0)
                        return 0;

        return 1;
}

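/*
 * Write side of the bulk I/O path: rebuild the iobuf from the per-page
 * niobuf_local array (skipping pages that already failed, except that an
 * ENOSPC page may proceed when its blocks are already allocated on disk),
 * start a journal transaction while holding i_sem, hand the pages to
 * filter_direct_io() (which drops i_sem and issues the bios), then commit
 * and unwind according to cleanup_phase.
 */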
int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
                          int objcount, struct obd_ioobj *obj, int niocount,
                          struct niobuf_local *res, struct obd_trans_info *oti,
                          int rc)
{
        struct niobuf_local *lnb;
        struct dio_request *dreq = NULL;
        struct lvfs_run_ctxt saved;
        struct fsfilt_objinfo fso;
        struct iattr iattr = { 0 };
        struct inode *inode = NULL;
        unsigned long now = jiffies;
        int i, err, cleanup_phase = 0;
        struct obd_device *obd = exp->exp_obd;
        int   total_size = 0;
        loff_t old_size;
        ENTRY;

        LASSERT(oti != NULL);
        LASSERT(objcount == 1);
        LASSERT(current->journal_info == NULL);

        if (rc != 0)
                GOTO(cleanup, rc);

        rc = filter_alloc_iobuf(OBD_BRW_WRITE, obj->ioo_bufcnt, (void **)&dreq);
        if (rc)
                GOTO(cleanup, rc);
        cleanup_phase = 1;

        fso.fso_dentry = res->dentry;
        fso.fso_bufcnt = obj->ioo_bufcnt;
        inode = res->dentry->d_inode;

        for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++) {
                loff_t this_size;

                /* If overwriting an existing block, we don't need a grant */
                if (!(lnb->flags & OBD_BRW_GRANTED) && lnb->rc == -ENOSPC &&
                    filter_range_is_mapped(inode, lnb->offset, lnb->len))
                        lnb->rc = 0;

                if (lnb->rc) { /* ENOSPC, network RPC error, etc. */
                        CDEBUG(D_INODE, "Skipping [%d] == %d\n", i, lnb->rc);
                        continue;
                }

                err = filter_iobuf_add_page(obd, dreq, inode, lnb->page);
                LASSERT (err == 0);

                total_size += lnb->len;

                /* we expect these pages to be in offset order, but we'll
                 * be forgiving */
                this_size = lnb->offset + lnb->len;
                if (this_size > iattr.ia_size)
                        iattr.ia_size = this_size;
        }
#if 0
        /* I use this when I'm checking our lovely 1M I/Os reach the disk -eeb */
        if (total_size != (1<<20))
                CWARN("total size %d (%d pages)\n",
                      total_size, total_size/PAGE_SIZE);
#endif
        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
        cleanup_phase = 2;

        down(&inode->i_sem);
        old_size = inode->i_size;
        oti->oti_handle = fsfilt_brw_start(obd, objcount, &fso, niocount, res,
                                           oti);
        if (IS_ERR(oti->oti_handle)) {
                up(&inode->i_sem);
                rc = PTR_ERR(oti->oti_handle);
                CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
                       "error starting transaction: rc = %d\n", rc);
                oti->oti_handle = NULL;
                GOTO(cleanup, rc);
        }
        /* have to call fsfilt_commit() from this point on */

        fsfilt_check_slow(now, obd_timeout, "brw_start");

        iattr_from_obdo(&iattr,oa,OBD_MD_FLATIME|OBD_MD_FLMTIME|OBD_MD_FLCTIME);
        /* filter_direct_io drops i_sem */
        rc = filter_direct_io(OBD_BRW_WRITE, res->dentry, dreq, exp, &iattr,
                              oti, NULL);

#if 0
        if (inode->i_size != old_size) {
                struct llog_cookie *cookie = obdo_logcookie(oa);
                struct lustre_id *id = obdo_id(oa);
                filter_log_sz_change(obd, id, oa->o_easize, cookie, inode);
        }
#endif

        if (rc == 0)
                obdo_from_inode(oa, inode, FILTER_VALID_FLAGS);

        fsfilt_check_slow(now, obd_timeout, "direct_io");

        err = fsfilt_commit(obd, obd->u.filter.fo_sb, inode, oti->oti_handle,
                            obd_sync_filter);
        if (err)
                rc = err;

        if (obd_sync_filter && !err)
                LASSERTF(oti->oti_transno <= obd->obd_last_committed,
                         "oti_transno "LPU64" last_committed "LPU64"\n",
                         oti->oti_transno, obd->obd_last_committed);

        fsfilt_check_slow(now, obd_timeout, "commitrw commit");

cleanup:
        filter_grant_commit(exp, niocount, res);

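        /* each case falls through: a later cleanup phase implies all of the
         * earlier ones as well */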
        switch (cleanup_phase) {
        case 2:
                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                LASSERT(current->journal_info == NULL);
        case 1:
                filter_free_iobuf(dreq);
        case 0:
                filter_free_dio_pages(objcount, obj, niocount, res);
                f_dput(res->dentry);
        }

        RETURN(rc);
}