Whamcloud - gitweb
ea2d3c0dbd4e865fb09bb72e7360aa72111e9606
[fs/lustre-release.git] / lustre / obdfilter / filter_io_26.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdfilter/filter_io.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *   Author: Phil Schwan <phil@clusterfs.com>
10  *
11  *   This file is part of Lustre, http://www.lustre.org.
12  *
13  *   Lustre is free software; you can redistribute it and/or
14  *   modify it under the terms of version 2 of the GNU General Public
15  *   License as published by the Free Software Foundation.
16  *
17  *   Lustre is distributed in the hope that it will be useful,
18  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
19  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20  *   GNU General Public License for more details.
21  *
22  *   You should have received a copy of the GNU General Public License
23  *   along with Lustre; if not, write to the Free Software
24  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25  */
26
27 #include <linux/config.h>
28 #include <linux/module.h>
29 #include <linux/pagemap.h> // XXX kill me soon
30 #include <linux/version.h>
31 #include <linux/buffer_head.h>
32
33 #define DEBUG_SUBSYSTEM S_FILTER
34
35 #include <linux/obd_class.h>
36 #include <linux/lustre_fsfilt.h>
37 #include "filter_internal.h"
38
39 #warning "implement writeback mode -bzzz"
40
41 /* 512byte block min */
42 #define MAX_BLOCKS_PER_PAGE (PAGE_SIZE / 512)
43 struct dio_request {
44         atomic_t          dr_numreqs;  /* number of reqs being processed */
45         struct bio       *dr_bios;     /* list of completed bios */
46         wait_queue_head_t dr_wait;
47         int               dr_max_pages;
48         int               dr_npages;
49         int               dr_error;
50         struct page     **dr_pages;
51         unsigned long    *dr_blocks;
52         spinlock_t        dr_lock;
53 };
54
55 static int dio_complete_routine(struct bio *bio, unsigned int done, int error)
56 {
57         struct dio_request *dreq = bio->bi_private;
58         unsigned long flags;
59
60         spin_lock_irqsave(&dreq->dr_lock, flags);
61         bio->bi_private = dreq->dr_bios;
62         dreq->dr_bios = bio;
63         if (dreq->dr_error == 0)
64                 dreq->dr_error = error;
65         spin_unlock_irqrestore(&dreq->dr_lock, flags);
66
67         if (atomic_dec_and_test(&dreq->dr_numreqs))
68                 wake_up(&dreq->dr_wait);
69
70         return 0;
71 }
72
73 static int can_be_merged(struct bio *bio, sector_t sector)
74 {
75         unsigned int size;
76
77         if (!bio)
78                 return 0;
79
80         size = bio->bi_size >> 9;
81         return bio->bi_sector + size == sector ? 1 : 0;
82 }
83
84 int filter_alloc_iobuf(int rw, int num_pages, void **ret)
85 {
86         struct dio_request *dreq;
87
88         LASSERTF(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ, "%x\n", rw);
89
90         OBD_ALLOC(dreq, sizeof(*dreq));
91         if (dreq == NULL)
92                 goto failed_0;
93         
94         OBD_ALLOC(dreq->dr_pages, num_pages * sizeof(*dreq->dr_pages));
95         if (dreq->dr_pages == NULL)
96                 goto failed_1;
97         
98         OBD_ALLOC(dreq->dr_blocks,
99                   MAX_BLOCKS_PER_PAGE * num_pages * sizeof(*dreq->dr_blocks));
100         if (dreq->dr_blocks == NULL)
101                 goto failed_2;
102
103         dreq->dr_bios = NULL;
104         init_waitqueue_head(&dreq->dr_wait);
105         atomic_set(&dreq->dr_numreqs, 0);
106         spin_lock_init(&dreq->dr_lock);
107         dreq->dr_max_pages = num_pages;
108         dreq->dr_npages = 0;
109
110         *ret = dreq;
111         RETURN(0);
112         
113  failed_2:
114         OBD_FREE(dreq->dr_pages,
115                  num_pages * sizeof(*dreq->dr_pages));
116  failed_1:
117         OBD_FREE(dreq, sizeof(*dreq));
118  failed_0:
119         RETURN(-ENOMEM);
120 }
121
122 void filter_free_iobuf(void *iobuf)
123 {
124         struct dio_request *dreq = iobuf;
125         int                 num_pages = dreq->dr_max_pages;
126
127         /* free all bios */
128         while (dreq->dr_bios) {
129                 struct bio *bio = dreq->dr_bios;
130                 dreq->dr_bios = bio->bi_private;
131                 bio_put(bio);
132         }
133
134         OBD_FREE(dreq->dr_blocks,
135                  MAX_BLOCKS_PER_PAGE * num_pages * sizeof(*dreq->dr_blocks));
136         OBD_FREE(dreq->dr_pages,
137                  num_pages * sizeof(*dreq->dr_pages));
138         OBD_FREE(dreq, sizeof(*dreq));
139 }
140
141 int filter_iobuf_add_page(struct obd_device *obd, void *iobuf,
142                           struct inode *inode, struct page *page)
143 {
144         struct dio_request *dreq = iobuf;
145
146         LASSERT (dreq->dr_npages < dreq->dr_max_pages);
147         dreq->dr_pages[dreq->dr_npages++] = page;
148
149         return 0;
150 }
151                  
152 int filter_do_bio(struct obd_device *obd, struct inode *inode,
153                   struct dio_request *dreq, int rw)
154 {
155         int            blocks_per_page = PAGE_SIZE >> inode->i_blkbits;
156         struct page  **pages = dreq->dr_pages;
157         int            npages = dreq->dr_npages;
158         unsigned long *blocks = dreq->dr_blocks;
159         int            total_blocks = npages * blocks_per_page;
160         int            sector_bits = inode->i_sb->s_blocksize_bits - 9;
161         unsigned int   blocksize = inode->i_sb->s_blocksize;
162         struct bio    *bio = NULL;
163         struct page   *page;
164         unsigned int   page_offset;
165         sector_t       sector;
166         int            nblocks;
167         int            block_idx;
168         int            page_idx;
169         int            i;
170         int            rc = 0;
171         ENTRY;
172
173         LASSERT(dreq->dr_npages == npages);
174         LASSERT(total_blocks <= OBDFILTER_CREATED_SCRATCHPAD_ENTRIES);
175
176         for (page_idx = 0, block_idx = 0; 
177              page_idx < npages; 
178              page_idx++, block_idx += blocks_per_page) {
179                         
180                 page = pages[page_idx];
181                 LASSERT (block_idx + blocks_per_page <= total_blocks);
182
183                 for (i = 0, page_offset = 0; 
184                      i < blocks_per_page;
185                      i += nblocks, page_offset += blocksize * nblocks) {
186
187                         nblocks = 1;
188
189                         if (blocks[block_idx + i] == 0) {  /* hole */
190                                 LASSERT(rw == OBD_BRW_READ);
191                                 memset(kmap(page) + page_offset, 0, blocksize);
192                                 kunmap(page);
193                                 continue;
194                         }
195
196                         sector = blocks[block_idx + i] << sector_bits;
197
198                         /* Additional contiguous file blocks? */
199                         while (i + nblocks < blocks_per_page &&
200                                (sector + nblocks*(blocksize>>9)) ==
201                                (blocks[block_idx + i + nblocks] << sector_bits))
202                                 nblocks++;
203
204                         if (bio != NULL &&
205                             can_be_merged(bio, sector) &&
206                             bio_add_page(bio, page, 
207                                          blocksize * nblocks, page_offset) != 0)
208                                 continue;       /* added this frag OK */
209
210                         if (bio != NULL) {
211                                 request_queue_t *q = bdev_get_queue(bio->bi_bdev);
212
213                                 /* Dang! I have to fragment this I/O */
214                                 CDEBUG(D_INODE, "bio++ sz %d vcnt %d(%d) "
215                                        "sectors %d(%d) psg %d(%d) hsg %d(%d)\n",
216                                        bio->bi_size, 
217                                        bio->bi_vcnt, bio->bi_max_vecs,
218                                        bio->bi_size >> 9, q->max_sectors,
219                                        bio_phys_segments(q, bio), 
220                                        q->max_phys_segments,
221                                        bio_hw_segments(q, bio), 
222                                        q->max_hw_segments);
223
224                                 atomic_inc(&dreq->dr_numreqs);
225                                 rc = fsfilt_send_bio(rw, obd, inode, bio);
226                                 if (rc < 0) {
227                                         CERROR("Can't send bio: %d\n", rc);
228                                         /* OK do dec; we do the waiting */
229                                         atomic_dec(&dreq->dr_numreqs);
230                                         goto out;
231                                 }
232                                 rc = 0;
233                                         
234                                 bio = NULL;
235                         }
236
237                         /* allocate new bio */
238                         bio = bio_alloc(GFP_NOIO, 
239                                         (npages - page_idx) * blocks_per_page);
240                         if (bio == NULL) {
241                                 CERROR ("Can't allocate bio\n");
242                                 rc = -ENOMEM;
243                                 goto out;
244                         }
245
246                         bio->bi_bdev = inode->i_sb->s_bdev;
247                         bio->bi_sector = sector;
248                         bio->bi_end_io = dio_complete_routine;
249                         bio->bi_private = dreq;
250
251                         rc = bio_add_page(bio, page, 
252                                           blocksize * nblocks, page_offset);
253                         LASSERT (rc != 0);
254                 }
255         }
256
257         if (bio != NULL) {
258                 atomic_inc(&dreq->dr_numreqs);
259                 rc = fsfilt_send_bio(rw, obd, inode, bio);
260                 if (rc >= 0) {
261                         rc = 0;
262                 } else {
263                         CERROR("Can't send bio: %d\n", rc);
264                         /* OK do dec; we do the waiting */
265                         atomic_dec(&dreq->dr_numreqs);
266                 }
267         }
268                         
269  out:
270         wait_event(dreq->dr_wait, atomic_read(&dreq->dr_numreqs) == 0);
271
272         if (rc == 0)
273                 rc = dreq->dr_error;
274         RETURN(rc);
275 }
276
/*
 * Hook for evicting the pages of a direct I/O from the inode's page cache.
 *
 * This is currently a no-op.  The implementation that used to sit here was
 * compiled out: for each page of the request it looked the page up (locked)
 * in inode->i_mapping and, if the page was still attached to a mapping,
 * called block_invalidatepage() and truncate_complete_page() on it before
 * unlocking and releasing it.  That code accessed nr_pages/maplist members
 * through 'iobuf'.
 * NOTE(review): those members look like they belong to an older I/O buffer
 * interface rather than to struct bio - confirm before reviving the logic.
 *
 * inode: inode whose page cache would be purged (unused)
 * iobuf: the I/O request (unused)
 */
static void filter_clear_page_cache(struct inode *inode, struct bio *iobuf)
{
        /* intentionally empty, see above */
}
297
298 /* Must be called with i_sem taken for writes; this will drop it */
299 int filter_direct_io(int rw, struct dentry *dchild, void *iobuf,
300                      struct obd_export *exp, struct iattr *attr,
301                      struct obd_trans_info *oti, void **wait_handle)
302 {
303         struct obd_device *obd = exp->exp_obd;
304         struct dio_request *dreq = iobuf;
305         struct inode *inode = dchild->d_inode;
306         int rc;
307         int rc2;
308         ENTRY;
309
310         LASSERTF(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ, "%x\n", rw);
311         LASSERTF(dreq->dr_npages <= dreq->dr_max_pages, "%d,%d\n",
312                  dreq->dr_npages, dreq->dr_max_pages);
313
314         /* XXX FIXME these assertions should be handled properly here or
315          * checked elsewhere */
316         LASSERT(dreq->dr_npages <= OBDFILTER_CREATED_SCRATCHPAD_ENTRIES);
317         if (dreq->dr_npages == 0)
318                 RETURN(0);
319
320         rc = fsfilt_map_inode_pages(obd, inode,
321                                     dreq->dr_pages, dreq->dr_npages,
322                                     dreq->dr_blocks,
323                                     obdfilter_created_scratchpad,
324                                     rw == OBD_BRW_WRITE, NULL);
325
326         if (rw == OBD_BRW_WRITE) {
327                 if (rc == 0) {
328 #if 0
329                         filter_tally_write(&obd->u.filter,
330                                            dreq->dr_pages,
331                                            dreq->dr_page_idx,
332                                            dreq->dr_blocks,
333                                            blocks_per_page);
334 #endif
335                         if (attr->ia_size > inode->i_size)
336                                 attr->ia_valid |= ATTR_SIZE;
337                         rc = fsfilt_setattr(obd, dchild,
338                                             oti->oti_handle, attr, 0);
339                 }
340
341                 up(&inode->i_sem);
342
343                 rc2 = filter_finish_transno(exp, oti, 0);
344                 if (rc2 != 0)
345                         CERROR("can't close transaction: %d\n", rc);
346
347                 if (rc == 0)
348                         rc = rc2;
349                 if (rc != 0)
350                         RETURN(rc);
351
352         }
353
354         /* This is nearly osync_inode, without the waiting
355         rc = generic_osync_inode(inode, inode->i_mapping,
356                                  OSYNC_DATA|OSYNC_METADATA); */
357         rc = filemap_fdatawrite(inode->i_mapping);
358         rc2 = sync_mapping_buffers(inode->i_mapping);
359         if (rc == 0)
360                 rc = rc2;
361         rc2 = filemap_fdatawait(inode->i_mapping);
362         if (rc == 0)
363                 rc = rc2;
364
365         if (rc != 0)
366                 RETURN(rc);
367
368         /* be careful to call this after fsync_inode_data_buffers has waited
369          * for IO to complete before we evict it from the cache */
370         filter_clear_page_cache(inode, iobuf);
371
372         RETURN(filter_do_bio(obd, inode, dreq, rw));
373 }
374
375 /* See if there are unallocated parts in given file region */
376 static int filter_range_is_mapped(struct inode *inode, obd_size offset, int len)
377 {
378         sector_t (*fs_bmap)(struct address_space *, sector_t) =
379                 inode->i_mapping->a_ops->bmap;
380         int j;
381
382         /* We can't know if we are overwriting or not */
383         if (fs_bmap == NULL)
384                 return 0;
385
386         offset >>= inode->i_blkbits;
387         len >>= inode->i_blkbits;
388
389         for (j = 0; j <= len; j++)
390                 if (fs_bmap(inode->i_mapping, offset + j) == 0)
391                         return 0;
392
393         return 1;
394 }
395
396 int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
397                           int objcount, struct obd_ioobj *obj, int niocount,
398                           struct niobuf_local *res, struct obd_trans_info *oti,
399                           int rc)
400 {
401         struct niobuf_local *lnb;
402         struct dio_request *dreq = NULL;
403         struct obd_run_ctxt saved;
404         struct fsfilt_objinfo fso;
405         struct iattr iattr = { 0 };
406         struct inode *inode = NULL;
407         unsigned long now = jiffies;
408         int i, err, cleanup_phase = 0;
409         struct obd_device *obd = exp->exp_obd;
410         int   total_size = 0;
411         ENTRY;
412
413         LASSERT(oti != NULL);
414         LASSERT(objcount == 1);
415         LASSERT(current->journal_info == NULL);
416
417         if (rc != 0)
418                 GOTO(cleanup, rc);
419         
420         rc = filter_alloc_iobuf(OBD_BRW_WRITE, obj->ioo_bufcnt, (void **)&dreq);
421         if (rc)
422                 GOTO(cleanup, rc);
423         cleanup_phase = 1;
424
425         fso.fso_dentry = res->dentry;
426         fso.fso_bufcnt = obj->ioo_bufcnt;
427         inode = res->dentry->d_inode;
428
429         for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++) {
430                 loff_t this_size;
431
432                 /* If overwriting an existing block, we don't need a grant */
433                 if (!(lnb->flags & OBD_BRW_GRANTED) && lnb->rc == -ENOSPC &&
434                     filter_range_is_mapped(inode, lnb->offset, lnb->len))
435                         lnb->rc = 0;
436
437                 if (lnb->rc) { /* ENOSPC, network RPC error, etc. */
438                         CDEBUG(D_INODE, "Skipping [%d] == %d\n", i, lnb->rc);
439                         continue;
440                 }
441
442                 err = filter_iobuf_add_page(obd, dreq, inode, lnb->page);
443                 LASSERT (err == 0);
444
445                 total_size += lnb->len;
446
447                 /* we expect these pages to be in offset order, but we'll
448                  * be forgiving */
449                 this_size = lnb->offset + lnb->len;
450                 if (this_size > iattr.ia_size)
451                         iattr.ia_size = this_size;
452         }
453 #if 0
454         /* I use this when I'm checking our lovely 1M I/Os reach the disk -eeb */
455         if (total_size != (1<<20))
456                 CWARN("total size %d (%d pages)\n", 
457                       total_size, total_size/PAGE_SIZE);
458 #endif
459         push_ctxt(&saved, &obd->obd_ctxt, NULL);
460         cleanup_phase = 2;
461
462         down(&inode->i_sem);
463         oti->oti_handle = fsfilt_brw_start(obd, objcount, &fso, niocount, res,
464                                            oti);
465         if (IS_ERR(oti->oti_handle)) {
466                 up(&inode->i_sem);
467                 rc = PTR_ERR(oti->oti_handle);
468                 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
469                        "error starting transaction: rc = %d\n", rc);
470                 oti->oti_handle = NULL;
471                 GOTO(cleanup, rc);
472         }
473         /* have to call fsfilt_commit() from this point on */
474
475         fsfilt_check_slow(now, obd_timeout, "brw_start");
476
477         iattr_from_obdo(&iattr,oa,OBD_MD_FLATIME|OBD_MD_FLMTIME|OBD_MD_FLCTIME);
478         /* filter_direct_io drops i_sem */
479         rc = filter_direct_io(OBD_BRW_WRITE, res->dentry, dreq, exp, &iattr,
480                               oti, NULL);
481         if (rc == 0)
482                 obdo_from_inode(oa, inode, FILTER_VALID_FLAGS);
483
484         fsfilt_check_slow(now, obd_timeout, "direct_io");
485
486         err = fsfilt_commit(obd, inode, oti->oti_handle, obd_sync_filter);
487         if (err)
488                 rc = err;
489
490         if (obd_sync_filter && !err)
491                 LASSERT(oti->oti_transno <= obd->obd_last_committed);
492
493         fsfilt_check_slow(now, obd_timeout, "commitrw commit");
494
495 cleanup:
496         filter_grant_commit(exp, niocount, res);
497
498         switch (cleanup_phase) {
499         case 2:
500                 pop_ctxt(&saved, &obd->obd_ctxt, NULL);
501                 LASSERT(current->journal_info == NULL);
502         case 1:
503                 filter_free_iobuf(dreq);
504         case 0:
505                 filter_free_dio_pages(objcount, obj, niocount, res);
506                 f_dput(res->dentry);
507         }
508
509         RETURN(rc);
510 }