Whamcloud - gitweb
- landing of b_hd_cleanup_merge to HEAD.
[fs/lustre-release.git] / lustre / obdfilter / filter_io_24.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdfilter/filter_io.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *   Author: Phil Schwan <phil@clusterfs.com>
10  *
11  *   This file is part of Lustre, http://www.lustre.org.
12  *
13  *   Lustre is free software; you can redistribute it and/or
14  *   modify it under the terms of version 2 of the GNU General Public
15  *   License as published by the Free Software Foundation.
16  *
17  *   Lustre is distributed in the hope that it will be useful,
18  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
19  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20  *   GNU General Public License for more details.
21  *
22  *   You should have received a copy of the GNU General Public License
23  *   along with Lustre; if not, write to the Free Software
24  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25  */
26
27 #include <linux/config.h>
28 #include <linux/module.h>
29 #include <linux/pagemap.h> // XXX kill me soon
30 #include <linux/version.h>
31
32 #define DEBUG_SUBSYSTEM S_FILTER
33
34 #include <linux/iobuf.h>
35 #include <linux/locks.h>
36
37 #include <linux/obd_class.h>
38 #include <linux/lustre_fsfilt.h>
39 #include "filter_internal.h"
40
41
42 /* We should only change the file mtime (and not the ctime, like
43  * update_inode_times() in generic_file_write()) when we only change data. */
44 void inode_update_time(struct inode *inode, int ctime_too)
45 {
46         time_t now = CURRENT_TIME;
47         if (inode->i_mtime == now && (!ctime_too || inode->i_ctime == now))
48                 return;
49         inode->i_mtime = now;
50         if (ctime_too)
51                 inode->i_ctime = now;
52         mark_inode_dirty_sync(inode);
53 }
54
55 /* Bug 2254 -- this is better done in ext3_map_inode_page, but this
56  * workaround will suffice until everyone has upgraded their kernels */
57 static void check_pending_bhs(unsigned long *blocks, int nr_pages, dev_t dev,
58                               int size)
59 {
60 #if (LUSTRE_KERNEL_VERSION < 32)
61         struct buffer_head *bh;
62         int i;
63
64         for (i = 0; i < nr_pages; i++) {
65                 bh = get_hash_table(dev, blocks[i], size);
66                 if (bh == NULL)
67                         continue;
68                 if (!buffer_dirty(bh)) {
69                         put_bh(bh);
70                         continue;
71                 }
72                 mark_buffer_clean(bh);
73                 wait_on_buffer(bh);
74                 clear_bit(BH_Req, &bh->b_state);
75                 __brelse(bh);
76         }
77 #endif
78 }
79
80 /* when brw_kiovec() is asked to read from block -1UL it just zeros
81  * the page.  this gives us a chance to verify the write mappings
82  * as well */
83 static int filter_cleanup_mappings(int rw, struct kiobuf *iobuf,
84                                    struct inode *inode)
85 {
86         int i, blocks_per_page_bits = PAGE_SHIFT - inode->i_blkbits;
87         ENTRY;
88
89         for (i = 0 ; i < iobuf->nr_pages << blocks_per_page_bits; i++) {
90                 if (iobuf->blocks[i] > 0)
91                         continue;
92
93                 if (rw == OBD_BRW_WRITE)
94                         RETURN(-EINVAL);
95
96                 iobuf->blocks[i] = -1UL;
97         }
98         RETURN(0);
99 }
100
#if 0
/* Debugging aid (compiled out): log the first four bytes of @page together
 * with the IO direction @rw and the @block number it maps to. */
static void dump_page(int rw, unsigned long block, struct page *page)
{
        char *data = kmap(page);

        CDEBUG(D_PAGE, "rw %d block %lu: %02x %02x %02x %02x\n", rw, block,
               data[0], data[1], data[2], data[3]);
        kunmap(page);
}
#endif
110
111 static void filter_clear_page_cache(struct inode *inode, struct kiobuf *iobuf)
112 {
113         struct page *page;
114         int i;
115
116         for (i = 0; i < iobuf->nr_pages ; i++) {
117                 page = find_lock_page(inode->i_mapping,
118                                       iobuf->maplist[i]->index);
119                 if (page == NULL)
120                         continue;
121                 if (page->mapping != NULL) {
122                         block_flushpage(page, 0);
123                         truncate_complete_page(page);
124                 }
125                 unlock_page(page);
126                 page_cache_release(page);
127         }
128 }
129
130 /* Must be called with i_sem taken for writes; this will drop it */
131 int filter_direct_io(int rw, struct dentry *dchild, void *buf,
132                      struct obd_export *exp, struct iattr *attr,
133                      struct obd_trans_info *oti, void **wait_handle)
134 {
135         struct obd_device *obd = exp->exp_obd;
136         struct inode *inode = dchild->d_inode;
137          struct kiobuf *iobuf = buf;
138         int rc, create = (rw == OBD_BRW_WRITE), *created = NULL, committed = 0;
139         int blocks_per_page = PAGE_SIZE >> inode->i_blkbits, cleanup_phase = 0;
140         struct semaphore *sem = NULL;
141         ENTRY;
142
143         LASSERTF(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ, "%x\n", rw);
144
145         if (iobuf->nr_pages == 0)
146                 GOTO(cleanup, rc = 0);
147
148         if (iobuf->nr_pages * blocks_per_page > KIO_MAX_SECTORS)
149                 GOTO(cleanup, rc = -EINVAL);
150
151         OBD_ALLOC(created, sizeof(*created) * iobuf->nr_pages*blocks_per_page);
152         if (created == NULL)
153                 GOTO(cleanup, rc = -ENOMEM);
154         cleanup_phase = 1;
155
156         rc = lock_kiovec(1, &iobuf, 1);
157         if (rc < 0)
158                 GOTO(cleanup, rc);
159         cleanup_phase = 2;
160
161         if (rw == OBD_BRW_WRITE) {
162                 create = 1;
163                 sem = &obd->u.filter.fo_alloc_lock;
164         }
165         
166         rc = fsfilt_map_inode_pages(obd, inode, iobuf->maplist,
167                                     iobuf->nr_pages, iobuf->blocks, created,
168                                     create, sem);
169         if (rc)
170                 GOTO(cleanup, rc);
171
172         rc = filter_cleanup_mappings(rw, iobuf, inode);
173         if (rc)
174                 GOTO(cleanup, rc);
175
176         if (rw == OBD_BRW_WRITE) {
177                 filter_tally_write(&obd->u.filter, iobuf->maplist,
178                                    iobuf->nr_pages, iobuf->blocks,
179                                    blocks_per_page);
180
181                 if (attr->ia_size > inode->i_size)
182                         attr->ia_valid |= ATTR_SIZE;
183                 rc = fsfilt_setattr(obd, dchild, oti->oti_handle, attr, 0);
184                 if (rc)
185                         GOTO(cleanup, rc);
186                 up(&inode->i_sem);
187                 cleanup_phase = 3;
188                 rc = filter_finish_transno(exp, oti, 0);
189                 if (rc)
190                         GOTO(cleanup, rc);
191
192                 rc = fsfilt_commit_async(obd,inode,oti->oti_handle,wait_handle);
193                 committed = 1;
194                 if (rc)
195                         GOTO(cleanup, rc);
196         }
197
198         /* these are our hacks to keep our directio/bh IO coherent with ext3's
199          * page cache use.  Most notably ext3 reads file data into the page
200          * cache when it is zeroing the tail of partial-block truncates and
201          * leaves it there, sometimes generating io from it at later truncates.
202          * Someday very soon we'll be performing our brw_kiovec() IO to and
203          * from the page cache. */
204
205         check_pending_bhs(iobuf->blocks, iobuf->nr_pages, inode->i_dev,
206                           1 << inode->i_blkbits);
207
208         rc = filemap_fdatasync(inode->i_mapping);
209         if (rc == 0)
210                 rc = fsync_inode_data_buffers(inode);
211         if (rc == 0)
212                 rc = filemap_fdatawait(inode->i_mapping);
213         if (rc < 0)
214                 GOTO(cleanup, rc);
215
216         /* be careful to call this after fsync_inode_data_buffers has waited
217          * for IO to complete before we evict it from the cache */
218         filter_clear_page_cache(inode, iobuf);
219
220         rc = fsfilt_send_bio(rw, obd, inode, iobuf);
221
222         CDEBUG(D_INFO, "tried to write %d pages, rc = %d\n",
223                iobuf->nr_pages, rc);
224
225         if (rc > 0)
226                 rc = 0;
227
228         EXIT;
229 cleanup:
230         if (!committed && (rw == OBD_BRW_WRITE)) {                
231                 int err = fsfilt_commit_async(obd, inode,
232                                               oti->oti_handle, wait_handle);
233                 oti->oti_handle = NULL;
234                 if (err)
235                         CERROR("can't close transaction: %d\n", err);
236                 /*
237                  * this is error path, so we prefer to return
238                  * original error, not this one
239                  */
240         }
241
242         switch(cleanup_phase) {
243         case 3:
244         case 2:
245                 unlock_kiovec(1, &iobuf);
246         case 1:
247                 OBD_FREE(created, sizeof(*created) *
248                          iobuf->nr_pages*blocks_per_page);
249         case 0:
250                 if (cleanup_phase != 3 && rw == OBD_BRW_WRITE)            
251                         up(&inode->i_sem);
252                 break;
253         default:
254                 CERROR("corrupt cleanup_phase (%d)?\n", cleanup_phase);
255                 LBUG();
256                 break;
257         }
258         return rc;
259 }
260
261 /* See if there are unallocated parts in given file region */
262 int filter_range_is_mapped(struct inode *inode, obd_size offset, int len)
263 {
264         int (*fs_bmap)(struct address_space *, long) =
265                 inode->i_mapping->a_ops->bmap;
266         int j;
267
268         /* We can't know if the range is mapped already or not */
269         if (fs_bmap == NULL)
270                 return 0;
271
272         offset >>= inode->i_blkbits;
273         len >>= inode->i_blkbits;
274
275         for (j = 0; j < len; j++)
276                 if (fs_bmap(inode->i_mapping, offset + j) == 0)
277                         return 0;
278
279         return 1;
280 }
281
282
283 /* some kernels require alloc_kiovec callers to zero members through the use of
284  * map_user_kiobuf and unmap_.. we don't use those, so we have a little helper
285  * that makes sure we don't break the rules. */
286 static void clear_kiobuf(struct kiobuf *iobuf)
287 {
288         int i;
289
290         for (i = 0; i < iobuf->array_len; i++)
291                 iobuf->maplist[i] = NULL;
292
293         iobuf->nr_pages = 0;
294         iobuf->offset = 0;
295         iobuf->length = 0;
296 }
297
298 int filter_alloc_iobuf(int rw, int num_pages, void **ret)
299 {
300         int rc;
301         struct kiobuf *iobuf;
302         ENTRY;
303
304         LASSERTF(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ, "%x\n", rw);
305
306         rc = alloc_kiovec(1, &iobuf);
307         if (rc)
308                 RETURN(rc);
309
310         rc = expand_kiobuf(iobuf, num_pages);
311         if (rc) {
312                 free_kiovec(1, &iobuf);
313                 RETURN(rc);
314         }
315
316 #ifdef HAVE_KIOBUF_DOVARY
317         iobuf->dovary = 0; /* this prevents corruption, not present in 2.4.20 */
318 #endif
319         clear_kiobuf(iobuf);
320         *ret = iobuf;
321         RETURN(0);
322 }
323
/* Release a kiobuf obtained from filter_alloc_iobuf().  The buffer is
 * cleared first so that it is returned with no pages attached. */
void filter_free_iobuf(void *buf)
{
        struct kiobuf *iobuf;

        iobuf = buf;
        clear_kiobuf(iobuf);
        free_kiovec(1, &iobuf);
}
331
332 int filter_iobuf_add_page(struct obd_device *obd, void *buf,
333                            struct inode *inode, struct page *page)
334 {
335         struct kiobuf *iobuf = buf;
336
337         iobuf->maplist[iobuf->nr_pages++] = page;
338         iobuf->length += PAGE_SIZE;
339
340         return 0;
341 }
342
343 int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount,
344                           struct obd_ioobj *obj, int niocount,
345                           struct niobuf_local *res, struct obd_trans_info *oti,
346                           int rc)
347 {
348         struct obd_device *obd = exp->exp_obd;
349         struct lvfs_run_ctxt saved;
350         struct niobuf_local *lnb;
351         struct fsfilt_objinfo fso;
352         struct iattr iattr = { 0 };
353         void *iobuf = NULL;
354         struct inode *inode = NULL;
355         int i, n, cleanup_phase = 0, err;
356         unsigned long now = jiffies; /* DEBUGGING OST TIMEOUTS */
357         void *wait_handle;
358         ENTRY;
359         LASSERT(oti != NULL);
360         LASSERT(objcount == 1);
361         LASSERT(current->journal_info == NULL);
362
363         if (rc != 0)
364                 GOTO(cleanup, rc);
365
366         rc = filter_alloc_iobuf(OBD_BRW_WRITE, obj->ioo_bufcnt, &iobuf);
367         if (rc)
368                 GOTO(cleanup, rc);
369         cleanup_phase = 1;
370
371         fso.fso_dentry = res->dentry;
372         fso.fso_bufcnt = obj->ioo_bufcnt;
373         inode = res->dentry->d_inode;
374
375         for (i = 0, lnb = res, n = 0; i < obj->ioo_bufcnt; i++, lnb++) {
376                 loff_t this_size;
377
378                 /* If overwriting an existing block, we don't need a grant */
379                 if (!(lnb->flags & OBD_BRW_GRANTED) && lnb->rc == -ENOSPC &&
380                      filter_range_is_mapped(inode, lnb->offset, lnb->len))    
381                         lnb->rc = 0;
382
383                 if (lnb->rc) /* ENOSPC, network RPC error */
384                         continue;
385
386                 filter_iobuf_add_page(obd, iobuf, inode, lnb->page);
387                 /* We expect these pages to be in offset order, but we'll
388                  * be forgiving */
389                 this_size = lnb->offset + lnb->len;
390                 if (this_size > iattr.ia_size)
391                         iattr.ia_size = this_size;
392         }
393
394         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
395         cleanup_phase = 2;
396
397         down(&inode->i_sem);
398         oti->oti_handle = fsfilt_brw_start(obd, objcount, &fso, niocount, res,
399                                            oti);
400         if (IS_ERR(oti->oti_handle)) {
401                 up(&inode->i_sem);
402                 rc = PTR_ERR(oti->oti_handle);
403                 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
404                        "error starting transaction: rc = %d\n", rc);
405                 oti->oti_handle = NULL;
406                 GOTO(cleanup, rc);
407         }
408
409         if (time_after(jiffies, now + 15 * HZ))
410                 CERROR("slow brw_start %lus\n", (jiffies - now) / HZ);
411
412         iattr_from_obdo(&iattr,oa,OBD_MD_FLATIME|OBD_MD_FLMTIME|OBD_MD_FLCTIME);
413         /* filter_direct_io drops i_sem */
414         rc = filter_direct_io(OBD_BRW_WRITE, res->dentry, iobuf, exp, &iattr,
415                               oti, &wait_handle);
416         if (rc == 0)
417                 obdo_from_inode(oa, inode, FILTER_VALID_FLAGS);
418
419         if (time_after(jiffies, now + 15 * HZ))
420                 CERROR("slow direct_io %lus\n", (jiffies - now) / HZ);
421
422         err = fsfilt_commit_wait(obd, inode, wait_handle);
423         if (err)
424                 rc = err;
425         if (obd_sync_filter)
426                 LASSERT(oti->oti_transno <= obd->obd_last_committed);
427         if (time_after(jiffies, now + 15 * HZ))
428                 CERROR("slow commitrw commit %lus\n", (jiffies - now) / HZ);
429 cleanup:
430         filter_grant_commit(exp, niocount, res);
431
432         switch (cleanup_phase) {
433         case 2:
434                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
435                 LASSERT(current->journal_info == NULL);
436         case 1:
437                 filter_free_iobuf(iobuf);
438         case 0:
439                 filter_free_dio_pages(objcount, obj, niocount, res);
440                 f_dput(res->dentry);
441         }
442
443         RETURN(rc);
444 }
445