Whamcloud - gitweb
Need to replace the nfs-utils-lustre that was installed with CMD2.
[fs/lustre-release.git] / lustre / obdfilter / filter_io_24.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdfilter/filter_io.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *   Author: Phil Schwan <phil@clusterfs.com>
10  *
11  *   This file is part of Lustre, http://www.lustre.org.
12  *
13  *   Lustre is free software; you can redistribute it and/or
14  *   modify it under the terms of version 2 of the GNU General Public
15  *   License as published by the Free Software Foundation.
16  *
17  *   Lustre is distributed in the hope that it will be useful,
18  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
19  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20  *   GNU General Public License for more details.
21  *
22  *   You should have received a copy of the GNU General Public License
23  *   along with Lustre; if not, write to the Free Software
24  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25  */
26
27 #include <linux/config.h>
28 #include <linux/module.h>
29 #include <linux/pagemap.h> // XXX kill me soon
30 #include <linux/version.h>
31
32 #define DEBUG_SUBSYSTEM S_FILTER
33
34 #include <linux/iobuf.h>
35 #include <linux/locks.h>
36
37 #include <linux/obd_class.h>
38 #include <linux/lustre_fsfilt.h>
39 #include "filter_internal.h"
40
41 /* Bug 2254 -- this is better done in ext3_map_inode_page, but this
42  * workaround will suffice until everyone has upgraded their kernels */
43 static void check_pending_bhs(unsigned long *blocks, int nr_pages, dev_t dev,
44                               int size)
45 {
46 #if (LUSTRE_KERNEL_VERSION < 32)
47         struct buffer_head *bh;
48         int i;
49
50         for (i = 0; i < nr_pages; i++) {
51                 bh = get_hash_table(dev, blocks[i], size);
52                 if (bh == NULL)
53                         continue;
54                 if (!buffer_dirty(bh)) {
55                         put_bh(bh);
56                         continue;
57                 }
58                 mark_buffer_clean(bh);
59                 wait_on_buffer(bh);
60                 clear_bit(BH_Req, &bh->b_state);
61                 __brelse(bh);
62         }
63 #endif
64 }
65
66 /* when brw_kiovec() is asked to read from block -1UL it just zeros
67  * the page.  this gives us a chance to verify the write mappings
68  * as well */
69 static int filter_cleanup_mappings(int rw, struct kiobuf *iobuf,
70                                    struct inode *inode)
71 {
72         int i, blocks_per_page_bits = PAGE_SHIFT - inode->i_blkbits;
73         ENTRY;
74
75         for (i = 0 ; i < iobuf->nr_pages << blocks_per_page_bits; i++) {
76                 if (iobuf->blocks[i] > 0)
77                         continue;
78
79                 if (rw == OBD_BRW_WRITE)
80                         RETURN(-EINVAL);
81
82                 iobuf->blocks[i] = -1UL;
83         }
84         RETURN(0);
85 }
86
#if 0
/* Debugging aid (compiled out): log the first four bytes of @page along
 * with the IO direction @rw and the disk block @block it maps to. */
static void dump_page(int rw, unsigned long block, struct page *page)
{
        /* unsigned so that bytes >= 0x80 are not sign-extended when they
         * are promoted for the %02x conversions below */
        unsigned char *data = kmap(page);

        CDEBUG(D_PAGE, "rw %d block %lu: %02x %02x %02x %02x\n", rw, block,
                       data[0], data[1], data[2], data[3]);
        kunmap(page);
}
#endif
96
97 /* These are our hacks to keep our directio/bh IO coherent with ext3's
98  * page cache use.  Most notably ext3 reads file data into the page
99  * cache when it is zeroing the tail of partial-block truncates and
100  * leaves it there, sometimes generating io from it at later truncates.
101  * This removes the partial page and its buffers from the page cache,
102  * so it should only ever cause a wait in rare cases, as otherwise we
103  * always do full-page IO to the OST.
104  *
105  * The call to truncate_complete_page() will call journal_flushpage() to
106  * free the buffers and drop the page from cache.  The buffers should not
107  * be dirty, because we already called fdatasync/fdatawait on them.
108  */
109 static int filter_clear_page_cache(struct inode *inode, struct kiobuf *iobuf)
110 {
111         struct page *page;
112         int i, rc, rc2;
113
114         check_pending_bhs(KIOBUF_GET_BLOCKS(iobuf), iobuf->nr_pages,
115                           inode->i_dev, 1 << inode->i_blkbits);
116
117         /* This is nearly generic_osync_inode, without the waiting on the inode
118         rc = generic_osync_inode(inode, inode->i_mapping,
119                                  OSYNC_DATA|OSYNC_METADATA);
120          */
121         rc = filemap_fdatasync(inode->i_mapping);
122         rc2 = fsync_inode_data_buffers(inode);
123         if (rc == 0)
124                 rc = rc2;
125         rc2 = filemap_fdatawait(inode->i_mapping);
126         if (rc == 0)
127                 rc = rc2;
128         if (rc != 0)
129                 RETURN(rc);
130
131         /* be careful to call this after fsync_inode_data_buffers has waited
132          * for IO to complete before we evict it from the cache */
133         for (i = 0; i < iobuf->nr_pages ; i++) {
134                 page = find_lock_page(inode->i_mapping,
135                                       iobuf->maplist[i]->index);
136                 if (page == NULL)
137                         continue;
138                 if (page->mapping != NULL)
139                         ll_truncate_complete_page(page);
140
141                 unlock_page(page);
142                 page_cache_release(page);
143         }
144
145         return 0;
146 }
147
148 /* Must be called with i_sem taken for writes; this will drop it */
149 int filter_direct_io(int rw, struct dentry *dchild, void *buf,
150                      struct obd_export *exp, struct iattr *attr,
151                      struct obd_trans_info *oti, void **wait_handle)
152 {
153         struct obd_device *obd = exp->exp_obd;
154         struct inode *inode = dchild->d_inode;
155         struct kiobuf *iobuf = buf;
156         int rc, create = (rw == OBD_BRW_WRITE), *created = NULL, committed = 0;
157         int blocks_per_page = PAGE_SIZE >> inode->i_blkbits, cleanup_phase = 0;
158         struct semaphore *sem = NULL;
159         ENTRY;
160
161         LASSERTF(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ, "%x\n", rw);
162
163         if (iobuf->nr_pages == 0)
164                 GOTO(cleanup, rc = 0);
165
166         if (iobuf->nr_pages * blocks_per_page > KIO_MAX_SECTORS)
167                 GOTO(cleanup, rc = -EINVAL);
168
169         if (iobuf->nr_pages * blocks_per_page > 
170             OBDFILTER_CREATED_SCRATCHPAD_ENTRIES)
171                 GOTO(cleanup, rc = -EINVAL);
172
173         cleanup_phase = 1;
174
175         rc = lock_kiovec(1, &iobuf, 1);
176         if (rc < 0)
177                 GOTO(cleanup, rc);
178         cleanup_phase = 2;
179
180         if (rw == OBD_BRW_WRITE) {
181                 create = 1;
182                 sem = &obd->u.filter.fo_alloc_lock;
183         }
184         
185         rc = fsfilt_map_inode_pages(obd, inode, iobuf->maplist,
186                                     iobuf->nr_pages, iobuf->blocks, 
187                                     obdfilter_created_scratchpad, create, sem);
188         if (rc)
189                 GOTO(cleanup, rc);
190
191         rc = filter_cleanup_mappings(rw, iobuf, inode);
192         if (rc)
193                 GOTO(cleanup, rc);
194
195         if (rw == OBD_BRW_WRITE) {
196                 filter_tally_write(&obd->u.filter, iobuf->maplist,
197                                    iobuf->nr_pages, iobuf->blocks,
198                                    blocks_per_page);
199
200                 if (attr->ia_size > inode->i_size)
201                         attr->ia_valid |= ATTR_SIZE;
202                 rc = fsfilt_setattr(obd, dchild, oti->oti_handle, attr, 0);
203                 if (rc)
204                         GOTO(cleanup, rc);
205                 up(&inode->i_sem);
206                 cleanup_phase = 3;
207                 rc = filter_finish_transno(exp, oti, 0);
208                 if (rc)
209                         GOTO(cleanup, rc);
210
211                 rc = fsfilt_commit_async(obd,inode,oti->oti_handle,wait_handle);
212                 committed = 1;
213                 if (rc)
214                         GOTO(cleanup, rc);
215         }
216
217         rc = filter_clear_page_cache(inode, iobuf);
218         if (rc < 0)
219                 GOTO(cleanup, rc);
220
221         rc = fsfilt_send_bio(rw, obd, inode, iobuf);
222
223         CDEBUG(D_INFO, "tried to write %d pages, rc = %d\n",
224                iobuf->nr_pages, rc);
225
226         if (rc > 0)
227                 rc = 0;
228
229         EXIT;
230 cleanup:
231         if (!committed && (rw == OBD_BRW_WRITE)) {                
232                 int err = fsfilt_commit_async(obd, inode,
233                                               oti->oti_handle, wait_handle);
234                 oti->oti_handle = NULL;
235                 if (err)
236                         CERROR("can't close transaction: %d\n", err);
237                 /*
238                  * this is error path, so we prefer to return
239                  * original error, not this one
240                  */
241         }
242
243         switch(cleanup_phase) {
244         case 3:
245         case 2:
246                 unlock_kiovec(1, &iobuf);
247         case 1:
248         case 0:
249                 if (cleanup_phase != 3 && rw == OBD_BRW_WRITE)            
250                         up(&inode->i_sem);
251                 break;
252         default:
253                 CERROR("corrupt cleanup_phase (%d)?\n", cleanup_phase);
254                 LBUG();
255                 break;
256         }
257         return rc;
258 }
259
260 /* See if there are unallocated parts in given file region */
261 int filter_range_is_mapped(struct inode *inode, obd_size offset, int len)
262 {
263         int (*fs_bmap)(struct address_space *, long) =
264                 inode->i_mapping->a_ops->bmap;
265         int j;
266
267         /* We can't know if the range is mapped already or not */
268         if (fs_bmap == NULL)
269                 return 0;
270
271         offset >>= inode->i_blkbits;
272         len >>= inode->i_blkbits;
273
274         for (j = 0; j < len; j++)
275                 if (fs_bmap(inode->i_mapping, offset + j) == 0)
276                         return 0;
277
278         return 1;
279 }
280
281 /* some kernels require alloc_kiovec callers to zero members through the use of
282  * map_user_kiobuf and unmap_.. we don't use those, so we have a little helper
283  * that makes sure we don't break the rules. */
284 static void clear_kiobuf(struct kiobuf *iobuf)
285 {
286         int i;
287
288         for (i = 0; i < iobuf->array_len; i++)
289                 iobuf->maplist[i] = NULL;
290
291         iobuf->nr_pages = 0;
292         iobuf->offset = 0;
293         iobuf->length = 0;
294 }
295
296 int filter_alloc_iobuf(int rw, int num_pages, void **ret)
297 {
298         int rc;
299         struct kiobuf *iobuf;
300         ENTRY;
301
302         LASSERTF(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ, "%x\n", rw);
303
304         rc = alloc_kiovec(1, &iobuf);
305         if (rc)
306                 RETURN(rc);
307
308         rc = expand_kiobuf(iobuf, num_pages);
309         if (rc) {
310                 free_kiovec(1, &iobuf);
311                 RETURN(rc);
312         }
313
314 #ifdef HAVE_KIOBUF_DOVARY
315         iobuf->dovary = 0; /* this prevents corruption, not present in 2.4.20 */
316 #endif
317         clear_kiobuf(iobuf);
318         *ret = iobuf;
319         RETURN(0);
320 }
321
/* Release an iobuf obtained from filter_alloc_iobuf().  The kiobuf is
 * cleared first so that it is handed back with no pages attached. */
void filter_free_iobuf(void *buf)
{
        struct kiobuf *kio;

        kio = buf;
        clear_kiobuf(kio);
        free_kiovec(1, &kio);
}
329
330 int filter_iobuf_add_page(struct obd_device *obd, void *buf,
331                            struct inode *inode, struct page *page)
332 {
333         struct kiobuf *iobuf = buf;
334
335         iobuf->maplist[iobuf->nr_pages++] = page;
336         iobuf->length += PAGE_SIZE;
337
338         return 0;
339 }
340
341 int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount,
342                           struct obd_ioobj *obj, int niocount,
343                           struct niobuf_local *res, struct obd_trans_info *oti,
344                           int rc)
345 {
346         struct obd_device *obd = exp->exp_obd;
347         struct lvfs_run_ctxt saved;
348         struct niobuf_local *lnb;
349         struct fsfilt_objinfo fso;
350         struct iattr iattr = { 0 };
351         void *iobuf = NULL;
352         struct inode *inode = NULL;
353         int i, n, cleanup_phase = 0, err;
354         unsigned long now = jiffies; /* DEBUGGING OST TIMEOUTS */
355         void *wait_handle;
356         ENTRY;
357         LASSERT(oti != NULL);
358         LASSERT(objcount == 1);
359         LASSERT(current->journal_info == NULL);
360
361         if (rc != 0)
362                 GOTO(cleanup, rc);
363
364         rc = filter_alloc_iobuf(OBD_BRW_WRITE, obj->ioo_bufcnt, &iobuf);
365         if (rc)
366                 GOTO(cleanup, rc);
367         cleanup_phase = 1;
368
369         fso.fso_dentry = res->dentry;
370         fso.fso_bufcnt = obj->ioo_bufcnt;
371         inode = res->dentry->d_inode;
372
373         for (i = 0, lnb = res, n = 0; i < obj->ioo_bufcnt; i++, lnb++) {
374                 loff_t this_size;
375
376                 /* If overwriting an existing block, we don't need a grant */
377                 if (!(lnb->flags & OBD_BRW_GRANTED) && lnb->rc == -ENOSPC &&
378                     filter_range_is_mapped(inode, lnb->offset, lnb->len))
379                         lnb->rc = 0;
380
381                 if (lnb->rc) /* ENOSPC, network RPC error */
382                         continue;
383
384                 filter_iobuf_add_page(obd, iobuf, inode, lnb->page);
385                 
386                 /* We expect these pages to be in offset order, but we'll
387                  * be forgiving */
388                 this_size = lnb->offset + lnb->len;
389                 if (this_size > iattr.ia_size)
390                         iattr.ia_size = this_size;
391         }
392
393         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
394         cleanup_phase = 2;
395
396         down(&inode->i_sem);
397         oti->oti_handle = fsfilt_brw_start(obd, objcount, &fso, niocount, res,
398                                            oti);
399         if (IS_ERR(oti->oti_handle)) {
400                 up(&inode->i_sem);
401                 rc = PTR_ERR(oti->oti_handle);
402                 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
403                        "error starting transaction: rc = %d\n", rc);
404                 oti->oti_handle = NULL;
405                 GOTO(cleanup, rc);
406         }
407
408         fsfilt_check_slow(now, obd_timeout, "brw_start");
409
410         iattr_from_obdo(&iattr,oa,OBD_MD_FLATIME|OBD_MD_FLMTIME|OBD_MD_FLCTIME);
411         /* filter_direct_io drops i_sem */
412         rc = filter_direct_io(OBD_BRW_WRITE, res->dentry, iobuf, exp, &iattr,
413                               oti, &wait_handle);
414         if (rc == 0)
415                 obdo_from_inode(oa, inode, FILTER_VALID_FLAGS);
416
417         fsfilt_check_slow(now, obd_timeout, "direct_io");
418
419         err = fsfilt_commit_wait(obd, inode, wait_handle);
420         if (err)
421                 rc = err;
422         if (obd_sync_filter && !err)
423                 LASSERTF(oti->oti_transno <= obd->obd_last_committed,
424                          "oti_transno "LPU64" last_committed "LPU64"\n",
425                          oti->oti_transno, obd->obd_last_committed);
426         fsfilt_check_slow(now, obd_timeout, "commitrw commit");
427 cleanup:
428         filter_grant_commit(exp, niocount, res);
429
430         switch (cleanup_phase) {
431         case 2:
432                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
433                 LASSERT(current->journal_info == NULL);
434         case 1:
435                 filter_free_iobuf(iobuf);
436         case 0:
437                 filter_free_dio_pages(objcount, obj, niocount, res);
438                 f_dput(res->dentry);
439         }
440
441         RETURN(rc);
442 }