Whamcloud - gitweb
b=3026
[fs/lustre-release.git] / lustre / obdfilter / filter_io_26.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdfilter/filter_io.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *   Author: Phil Schwan <phil@clusterfs.com>
10  *
11  *   This file is part of Lustre, http://www.lustre.org.
12  *
13  *   Lustre is free software; you can redistribute it and/or
14  *   modify it under the terms of version 2 of the GNU General Public
15  *   License as published by the Free Software Foundation.
16  *
17  *   Lustre is distributed in the hope that it will be useful,
18  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
19  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20  *   GNU General Public License for more details.
21  *
22  *   You should have received a copy of the GNU General Public License
23  *   along with Lustre; if not, write to the Free Software
24  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25  */
26
27 #include <linux/config.h>
28 #include <linux/module.h>
29 #include <linux/pagemap.h> // XXX kill me soon
30 #include <linux/version.h>
31 #include <linux/buffer_head.h>
32
33 #define DEBUG_SUBSYSTEM S_FILTER
34
35 #include <linux/obd_class.h>
36 #include <linux/lustre_fsfilt.h>
37 #include "filter_internal.h"
38
#warning "implement writeback mode -bzzz"

/* 512byte block min */
/* Upper bound on filesystem blocks per page; used to size dr_blocks. */
#define MAX_BLOCKS_PER_PAGE (PAGE_SIZE / 512)
/*
 * State for one direct-I/O request: the pages to transfer, the disk block
 * backing each block-sized piece of those pages, and the bookkeeping needed
 * to wait for all submitted bios to complete.
 */
struct dio_request {
        atomic_t          dr_numreqs;  /* number of reqs being processed */
        struct bio       *dr_bios;     /* list of completed bios */
                                       /* (chained through bio->bi_private) */
        wait_queue_head_t dr_wait;     /* woken when dr_numreqs drops to 0 */
        int               dr_max_pages; /* capacity of dr_pages */
        int               dr_npages;   /* pages added so far */
        int               dr_error;    /* first non-zero completion error */
        struct page     **dr_pages;    /* dr_max_pages entries */
        unsigned long    *dr_blocks;   /* MAX_BLOCKS_PER_PAGE entries per */
                                       /* page; 0 is treated as a hole */
        spinlock_t        dr_lock;     /* taken around dr_bios/dr_error */
                                       /* updates in the completion routine */
        unsigned long     dr_start_time; /* jiffies */
        struct filter_obd *dr_filter;  /* owner of the I/O statistics */
};
56
57 static void record_start_io(struct dio_request *dreq, int rw, int size)
58 {
59         struct filter_obd *filter = dreq->dr_filter;
60         unsigned long flags;
61
62         atomic_inc(&dreq->dr_numreqs);
63
64         if (rw == OBD_BRW_READ) {
65                 lprocfs_oh_tally(&filter->fo_read_rpc_hist,
66                                  filter->fo_r_in_flight);
67                 lprocfs_oh_tally_log2(&filter->fo_r_disk_iosize, size);
68         } else {
69                 lprocfs_oh_tally(&filter->fo_write_rpc_hist,
70                                  filter->fo_w_in_flight);
71                 lprocfs_oh_tally_log2(&filter->fo_w_disk_iosize, size);
72         }
73         spin_lock_irqsave(&filter->fo_stats_lock, flags);
74         if (rw == OBD_BRW_READ)
75                 filter->fo_r_in_flight++;
76         else
77                 filter->fo_w_in_flight++;
78         spin_unlock_irqrestore(&filter->fo_stats_lock, flags);
79         dreq->dr_start_time = jiffies;
80 }
81
82 static void record_finish_io(struct dio_request *dreq, int rw, int rc)
83 {
84         struct filter_obd *filter = dreq->dr_filter;
85         unsigned long flags, stop_time = jiffies;
86
87         spin_lock_irqsave(&filter->fo_stats_lock, flags);
88         if (rw == OBD_BRW_READ)
89                 filter->fo_r_in_flight--;
90         else
91                 filter->fo_w_in_flight--;
92         spin_unlock_irqrestore(&filter->fo_stats_lock, flags);
93
94         if (atomic_dec_and_test(&dreq->dr_numreqs))
95                 wake_up(&dreq->dr_wait);
96
97         if (rc != 0)
98                 return;
99
100         if (rw == OBD_BRW_READ) {
101                 lprocfs_oh_tally_log2(&filter->fo_r_io_time,
102                                       stop_time - dreq->dr_start_time);
103         } else {
104                 lprocfs_oh_tally_log2(&filter->fo_w_io_time,
105                                       stop_time - dreq->dr_start_time);
106         }
107 }
108
109 static int dio_complete_routine(struct bio *bio, unsigned int done, int error)
110 {
111         struct dio_request *dreq = bio->bi_private;
112         unsigned long flags;
113
114         if (dreq == NULL) {
115                 CERROR("***** bio->bi_private is NULL!  This should never "
116                        "happen.  Normally, I would crash here, but instead I "
117                        "will dump the bio contents to the console.  Please "
118                        "report this to CFS, along with any interesting messages "
119                        "leading up to this point (like SCSI errors, perhaps).  "
120                        "Because bi_private is NULL, I can't wake up the thread "
121                        "that initiated this I/O -- so you will probably have to "
122                        "reboot this node.");
123                 CERROR("bi_next: %p, bi_flags: %lx, bi_rw: %lu, bi_vcnt: %d, "
124                        "bi_idx: %d, bi->size: %d, bi_end_io: %p, bi_cnt: %d, "
125                        "bi_private: %p\n", bio->bi_next, bio->bi_flags,
126                        bio->bi_rw, bio->bi_vcnt, bio->bi_idx, bio->bi_size,
127                        bio->bi_end_io, atomic_read(&bio->bi_cnt),
128                        bio->bi_private);
129                 return 0;
130         }
131
132         spin_lock_irqsave(&dreq->dr_lock, flags);
133         bio->bi_private = dreq->dr_bios;
134         dreq->dr_bios = bio;
135         if (dreq->dr_error == 0)
136                 dreq->dr_error = error;
137         spin_unlock_irqrestore(&dreq->dr_lock, flags);
138
139         record_finish_io(dreq, test_bit(BIO_RW, &bio->bi_rw) ?
140                          OBD_BRW_WRITE : OBD_BRW_READ, error);
141
142         return 0;
143 }
144
145 static int can_be_merged(struct bio *bio, sector_t sector)
146 {
147         unsigned int size;
148
149         if (!bio)
150                 return 0;
151
152         size = bio->bi_size >> 9;
153         return bio->bi_sector + size == sector ? 1 : 0;
154 }
155
156 int filter_alloc_iobuf(struct filter_obd *filter, int rw, int num_pages,
157                        void **ret)
158 {
159         struct dio_request *dreq;
160
161         LASSERTF(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ, "%x\n", rw);
162
163         OBD_ALLOC(dreq, sizeof(*dreq));
164         if (dreq == NULL)
165                 goto failed_0;
166         
167         OBD_ALLOC(dreq->dr_pages, num_pages * sizeof(*dreq->dr_pages));
168         if (dreq->dr_pages == NULL)
169                 goto failed_1;
170         
171         OBD_ALLOC(dreq->dr_blocks,
172                   MAX_BLOCKS_PER_PAGE * num_pages * sizeof(*dreq->dr_blocks));
173         if (dreq->dr_blocks == NULL)
174                 goto failed_2;
175
176         dreq->dr_filter = filter;
177         dreq->dr_bios = NULL;
178         init_waitqueue_head(&dreq->dr_wait);
179         atomic_set(&dreq->dr_numreqs, 0);
180         spin_lock_init(&dreq->dr_lock);
181         dreq->dr_max_pages = num_pages;
182         dreq->dr_npages = 0;
183
184         *ret = dreq;
185         RETURN(0);
186         
187  failed_2:
188         OBD_FREE(dreq->dr_pages,
189                  num_pages * sizeof(*dreq->dr_pages));
190  failed_1:
191         OBD_FREE(dreq, sizeof(*dreq));
192  failed_0:
193         RETURN(-ENOMEM);
194 }
195
196 void filter_free_iobuf(void *iobuf)
197 {
198         struct dio_request *dreq = iobuf;
199         int                 num_pages = dreq->dr_max_pages;
200
201         /* free all bios */
202         while (dreq->dr_bios) {
203                 struct bio *bio = dreq->dr_bios;
204                 dreq->dr_bios = bio->bi_private;
205                 bio_put(bio);
206         }
207
208         OBD_FREE(dreq->dr_blocks,
209                  MAX_BLOCKS_PER_PAGE * num_pages * sizeof(*dreq->dr_blocks));
210         OBD_FREE(dreq->dr_pages,
211                  num_pages * sizeof(*dreq->dr_pages));
212         OBD_FREE(dreq, sizeof(*dreq));
213 }
214
215 int filter_iobuf_add_page(struct obd_device *obd, void *iobuf,
216                           struct inode *inode, struct page *page)
217 {
218         struct dio_request *dreq = iobuf;
219
220         LASSERT (dreq->dr_npages < dreq->dr_max_pages);
221         dreq->dr_pages[dreq->dr_npages++] = page;
222
223         return 0;
224 }
225
226 int filter_do_bio(struct obd_device *obd, struct inode *inode,
227                   struct dio_request *dreq, int rw)
228 {
229         int            blocks_per_page = PAGE_SIZE >> inode->i_blkbits;
230         struct page  **pages = dreq->dr_pages;
231         int            npages = dreq->dr_npages;
232         unsigned long *blocks = dreq->dr_blocks;
233         int            total_blocks = npages * blocks_per_page;
234         int            sector_bits = inode->i_sb->s_blocksize_bits - 9;
235         unsigned int   blocksize = inode->i_sb->s_blocksize;
236         struct bio    *bio = NULL;
237         struct page   *page;
238         unsigned int   page_offset;
239         sector_t       sector;
240         int            nblocks;
241         int            block_idx;
242         int            page_idx;
243         int            i;
244         int            rc = 0;
245         ENTRY;
246
247         LASSERT(dreq->dr_npages == npages);
248         LASSERT(total_blocks <= OBDFILTER_CREATED_SCRATCHPAD_ENTRIES);
249
250         for (page_idx = 0, block_idx = 0; 
251              page_idx < npages; 
252              page_idx++, block_idx += blocks_per_page) {
253                         
254                 page = pages[page_idx];
255                 LASSERT (block_idx + blocks_per_page <= total_blocks);
256
257                 for (i = 0, page_offset = 0; 
258                      i < blocks_per_page;
259                      i += nblocks, page_offset += blocksize * nblocks) {
260
261                         nblocks = 1;
262
263                         if (blocks[block_idx + i] == 0) {  /* hole */
264                                 LASSERT(rw == OBD_BRW_READ);
265                                 memset(kmap(page) + page_offset, 0, blocksize);
266                                 kunmap(page);
267                                 continue;
268                         }
269
270                         sector = blocks[block_idx + i] << sector_bits;
271
272                         /* Additional contiguous file blocks? */
273                         while (i + nblocks < blocks_per_page &&
274                                (sector + nblocks*(blocksize>>9)) ==
275                                (blocks[block_idx + i + nblocks] << sector_bits))
276                                 nblocks++;
277
278                         if (bio != NULL &&
279                             can_be_merged(bio, sector) &&
280                             bio_add_page(bio, page, 
281                                          blocksize * nblocks, page_offset) != 0)
282                                 continue;       /* added this frag OK */
283
284                         if (bio != NULL) {
285                                 request_queue_t *q =
286                                         bdev_get_queue(bio->bi_bdev);
287
288                                 /* Dang! I have to fragment this I/O */
289                                 CDEBUG(D_INODE, "bio++ sz %d vcnt %d(%d) "
290                                        "sectors %d(%d) psg %d(%d) hsg %d(%d)\n",
291                                        bio->bi_size, 
292                                        bio->bi_vcnt, bio->bi_max_vecs,
293                                        bio->bi_size >> 9, q->max_sectors,
294                                        bio_phys_segments(q, bio), 
295                                        q->max_phys_segments,
296                                        bio_hw_segments(q, bio), 
297                                        q->max_hw_segments);
298
299                                 record_start_io(dreq, rw, bio->bi_size);
300                                 rc = fsfilt_send_bio(rw, obd, inode, bio);
301                                 if (rc < 0) {
302                                         CERROR("Can't send bio: %d\n", rc);
303                                         record_finish_io(dreq, rw, rc);
304                                         goto out;
305                                 }
306                         }
307
308                         /* allocate new bio */
309                         bio = bio_alloc(GFP_NOIO, 
310                                         (npages - page_idx) * blocks_per_page);
311                         if (bio == NULL) {
312                                 CERROR ("Can't allocate bio\n");
313                                 rc = -ENOMEM;
314                                 goto out;
315                         }
316
317                         bio->bi_bdev = inode->i_sb->s_bdev;
318                         bio->bi_sector = sector;
319                         bio->bi_end_io = dio_complete_routine;
320                         bio->bi_private = dreq;
321
322                         rc = bio_add_page(bio, page, 
323                                           blocksize * nblocks, page_offset);
324                         LASSERT (rc != 0);
325                 }
326         }
327
328         if (bio != NULL) {
329                 record_start_io(dreq, rw, bio->bi_size);
330                 rc = fsfilt_send_bio(rw, obd, inode, bio);
331                 if (rc >= 0) {
332                         rc = 0;
333                 } else {
334                         CERROR("Can't send bio: %d\n", rc);
335                         record_finish_io(dreq, rw, rc);
336                 }
337         }
338                         
339  out:
340         wait_event(dreq->dr_wait, atomic_read(&dreq->dr_numreqs) == 0);
341
342         if (rc == 0)
343                 rc = dreq->dr_error;
344         RETURN(rc);
345 }
346
/*
 * Hook for evicting the pages involved in a direct I/O from the inode's
 * page cache.  It currently does nothing.
 *
 * The former body was compiled out: it looked up each page of the I/O in
 * inode->i_mapping, invalidated and truncated it if it was still attached
 * to a mapping, then unlocked and released it.  That code addressed the
 * pages through nr_pages/maplist members of @iobuf.
 *
 * @inode: inode whose cache would be cleaned (unused)
 * @iobuf: I/O descriptor (unused)
 */
static void filter_clear_page_cache(struct inode *inode, struct bio *iobuf)
{
        /* intentionally empty */
}
367
368 /* Must be called with i_sem taken for writes; this will drop it */
369 int filter_direct_io(int rw, struct dentry *dchild, void *iobuf,
370                      struct obd_export *exp, struct iattr *attr,
371                      struct obd_trans_info *oti, void **wait_handle)
372 {
373         struct obd_device *obd = exp->exp_obd;
374         struct dio_request *dreq = iobuf;
375         struct inode *inode = dchild->d_inode;
376         int blocks_per_page = PAGE_SIZE >> inode->i_blkbits;
377         int rc, rc2;
378         ENTRY;
379
380         LASSERTF(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ, "%x\n", rw);
381         LASSERTF(dreq->dr_npages <= dreq->dr_max_pages, "%d,%d\n",
382                  dreq->dr_npages, dreq->dr_max_pages);
383         LASSERT(dreq->dr_npages <= OBDFILTER_CREATED_SCRATCHPAD_ENTRIES);
384         LASSERT(dreq->dr_npages > 0 || rw != OBD_BRW_WRITE);
385
386         if (dreq->dr_npages == 0)
387                 RETURN(0);
388
389         rc = fsfilt_map_inode_pages(obd, inode,
390                                     dreq->dr_pages, dreq->dr_npages,
391                                     dreq->dr_blocks,
392                                     obdfilter_created_scratchpad,
393                                     rw == OBD_BRW_WRITE, NULL);
394
395         if (rw == OBD_BRW_WRITE) {
396                 if (rc == 0) {
397                         filter_tally_write(&obd->u.filter,
398                                            dreq->dr_pages,
399                                            dreq->dr_npages,
400                                            dreq->dr_blocks,
401                                            blocks_per_page);
402                         if (attr->ia_size > inode->i_size)
403                                 attr->ia_valid |= ATTR_SIZE;
404                         rc = fsfilt_setattr(obd, dchild,
405                                             oti->oti_handle, attr, 0);
406                 }
407
408                 up(&inode->i_sem);
409
410                 rc2 = filter_finish_transno(exp, oti, 0);
411                 if (rc2 != 0)
412                         CERROR("can't close transaction: %d\n", rc);
413
414                 if (rc == 0)
415                         rc = rc2;
416                 if (rc != 0)
417                         RETURN(rc);
418
419         }
420
421         /* This is nearly osync_inode, without the waiting
422         rc = generic_osync_inode(inode, inode->i_mapping,
423                                  OSYNC_DATA|OSYNC_METADATA); */
424         rc = filemap_fdatawrite(inode->i_mapping);
425         rc2 = sync_mapping_buffers(inode->i_mapping);
426         if (rc == 0)
427                 rc = rc2;
428         rc2 = filemap_fdatawait(inode->i_mapping);
429         if (rc == 0)
430                 rc = rc2;
431
432         if (rc != 0)
433                 RETURN(rc);
434
435         /* be careful to call this after fsync_inode_data_buffers has waited
436          * for IO to complete before we evict it from the cache */
437         filter_clear_page_cache(inode, iobuf);
438
439         RETURN(filter_do_bio(obd, inode, dreq, rw));
440 }
441
442 /* See if there are unallocated parts in given file region */
443 static int filter_range_is_mapped(struct inode *inode, obd_size offset, int len)
444 {
445         sector_t (*fs_bmap)(struct address_space *, sector_t) =
446                 inode->i_mapping->a_ops->bmap;
447         int j;
448
449         /* We can't know if we are overwriting or not */
450         if (fs_bmap == NULL)
451                 return 0;
452
453         offset >>= inode->i_blkbits;
454         len >>= inode->i_blkbits;
455
456         for (j = 0; j <= len; j++)
457                 if (fs_bmap(inode->i_mapping, offset + j) == 0)
458                         return 0;
459
460         return 1;
461 }
462
463 int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
464                           int objcount, struct obd_ioobj *obj, int niocount,
465                           struct niobuf_local *res, struct obd_trans_info *oti,
466                           int rc)
467 {
468         struct niobuf_local *lnb;
469         struct dio_request *dreq = NULL;
470         struct obd_run_ctxt saved;
471         struct fsfilt_objinfo fso;
472         struct iattr iattr = { 0 };
473         struct inode *inode = NULL;
474         unsigned long now = jiffies;
475         int i, err, cleanup_phase = 0;
476         struct obd_device *obd = exp->exp_obd;
477         int   total_size = 0;
478         ENTRY;
479
480         LASSERT(oti != NULL);
481         LASSERT(objcount == 1);
482         LASSERT(current->journal_info == NULL);
483
484         if (rc != 0)
485                 GOTO(cleanup, rc);
486         
487         rc = filter_alloc_iobuf(&obd->u.filter, OBD_BRW_WRITE, obj->ioo_bufcnt,
488                                 (void **)&dreq);
489         if (rc)
490                 GOTO(cleanup, rc);
491         cleanup_phase = 1;
492
493         fso.fso_dentry = res->dentry;
494         fso.fso_bufcnt = obj->ioo_bufcnt;
495         inode = res->dentry->d_inode;
496
497         for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++) {
498                 loff_t this_size;
499
500                 /* If overwriting an existing block, we don't need a grant */
501                 if (!(lnb->flags & OBD_BRW_GRANTED) && lnb->rc == -ENOSPC &&
502                     filter_range_is_mapped(inode, lnb->offset, lnb->len))
503                         lnb->rc = 0;
504
505                 if (lnb->rc) { /* ENOSPC, network RPC error, etc. */
506                         CDEBUG(D_INODE, "Skipping [%d] == %d\n", i, lnb->rc);
507                         continue;
508                 }
509
510                 err = filter_iobuf_add_page(obd, dreq, inode, lnb->page);
511                 LASSERT (err == 0);
512
513                 total_size += lnb->len;
514
515                 /* we expect these pages to be in offset order, but we'll
516                  * be forgiving */
517                 this_size = lnb->offset + lnb->len;
518                 if (this_size > iattr.ia_size)
519                         iattr.ia_size = this_size;
520         }
521
522         push_ctxt(&saved, &obd->obd_ctxt, NULL);
523         cleanup_phase = 2;
524
525         down(&inode->i_sem);
526         oti->oti_handle = fsfilt_brw_start(obd, objcount, &fso, niocount, res,
527                                            oti);
528         if (IS_ERR(oti->oti_handle)) {
529                 up(&inode->i_sem);
530                 rc = PTR_ERR(oti->oti_handle);
531                 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
532                        "error starting transaction: rc = %d\n", rc);
533                 oti->oti_handle = NULL;
534                 GOTO(cleanup, rc);
535         }
536         /* have to call fsfilt_commit() from this point on */
537
538         fsfilt_check_slow(now, obd_timeout, "brw_start");
539
540         iattr_from_obdo(&iattr,oa,OBD_MD_FLATIME|OBD_MD_FLMTIME|OBD_MD_FLCTIME);
541         /* filter_direct_io drops i_sem */
542         rc = filter_direct_io(OBD_BRW_WRITE, res->dentry, dreq, exp, &iattr,
543                               oti, NULL);
544         if (rc == 0)
545                 obdo_from_inode(oa, inode, FILTER_VALID_FLAGS);
546
547         fsfilt_check_slow(now, obd_timeout, "direct_io");
548
549         err = fsfilt_commit(obd, inode, oti->oti_handle, obd_sync_filter);
550         if (err)
551                 rc = err;
552
553         if (obd_sync_filter && !err)
554                 LASSERT(oti->oti_transno <= obd->obd_last_committed);
555
556         fsfilt_check_slow(now, obd_timeout, "commitrw commit");
557
558 cleanup:
559         filter_grant_commit(exp, niocount, res);
560
561         switch (cleanup_phase) {
562         case 2:
563                 pop_ctxt(&saved, &obd->obd_ctxt, NULL);
564                 LASSERT(current->journal_info == NULL);
565         case 1:
566                 filter_free_iobuf(dreq);
567         case 0:
568                 filter_free_dio_pages(objcount, obj, niocount, res);
569                 f_dput(res->dentry);
570         }
571
572         RETURN(rc);
573 }