Whamcloud - gitweb
Branch b1_4_mountconf
[fs/lustre-release.git] / lustre / obdfilter / filter_io_24.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdfilter/filter_io.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *   Author: Phil Schwan <phil@clusterfs.com>
10  *
11  *   This file is part of the Lustre file system, http://www.lustre.org
12  *   Lustre is a trademark of Cluster File Systems, Inc.
13  *
14  *   You may have signed or agreed to another license before downloading
15  *   this software.  If so, you are bound by the terms and conditions
16  *   of that agreement, and the following does not apply to you.  See the
17  *   LICENSE file included with this distribution for more information.
18  *
19  *   If you did not agree to a different license, then this copy of Lustre
20  *   is open source software; you can redistribute it and/or modify it
21  *   under the terms of version 2 of the GNU General Public License as
22  *   published by the Free Software Foundation.
23  *
24  *   In either case, Lustre is distributed in the hope that it will be
25  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
26  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
27  *   license text for more details.
28  */
29
30 #include <linux/config.h>
31 #include <linux/module.h>
32 #include <linux/pagemap.h> // XXX kill me soon
33 #include <linux/version.h>
34
35 #define DEBUG_SUBSYSTEM S_FILTER
36
37 #include <linux/iobuf.h>
38 #include <linux/locks.h>
39
40 #include <linux/obd_class.h>
41 #include <linux/lustre_fsfilt.h>
42 #include "filter_internal.h"
43
/* Bug 2254 -- this is better done in ext3_map_inode_page, but this
 * workaround will suffice until everyone has upgraded their kernels */
static void check_pending_bhs(unsigned long *blocks, int nr_pages, dev_t dev,
                              int size)
{
#if (LUSTRE_KERNEL_VERSION < 32)
        struct buffer_head *bh;
        int i;

        /* For each block we are about to write directly, look up any
         * aliasing buffer_head in the buffer cache; a dirty one could
         * later write stale buffered data over our direct I/O.
         * NOTE(review): only the first nr_pages entries of blocks[] are
         * checked, although the caller's array holds one entry per
         * block (blocks_per_page per page) -- confirm this is intended
         * when blocksize < PAGE_SIZE. */
        for (i = 0; i < nr_pages; i++) {
                bh = get_hash_table(dev, blocks[i], size);
                if (bh == NULL)
                        continue;
                if (!buffer_dirty(bh)) {
                        /* a clean buffer cannot generate stale writeback */
                        put_bh(bh);
                        continue;
                }
                /* drop the dirty bit, wait out any I/O already in flight,
                 * clear the request state, then release our reference */
                mark_buffer_clean(bh);
                wait_on_buffer(bh);
                clear_bit(BH_Req, &bh->b_state);
                __brelse(bh);
        }
#endif
}
68
69 /* when brw_kiovec() is asked to read from block -1UL it just zeros
70  * the page.  this gives us a chance to verify the write mappings
71  * as well */
72 static int filter_cleanup_mappings(int rw, struct kiobuf *iobuf,
73                                    struct inode *inode)
74 {
75         int i, blocks_per_page_bits = PAGE_SHIFT - inode->i_blkbits;
76         ENTRY;
77
78         for (i = 0 ; i < iobuf->nr_pages << blocks_per_page_bits; i++) {
79                 if (KIOBUF_GET_BLOCKS(iobuf)[i] > 0)
80                         continue;
81
82                 if (rw == OBD_BRW_WRITE)
83                         RETURN(-EINVAL);
84
85                 KIOBUF_GET_BLOCKS(iobuf)[i] = -1UL;
86         }
87         RETURN(0);
88 }
89
#if 0
/* Debug helper (currently compiled out): dump the first four bytes of
 * a page for the given rw direction and block number.  Kept around for
 * ad-hoc data-corruption debugging; enable by flipping the "#if 0". */
static void dump_page(int rw, unsigned long block, struct page *page)
{
        char *blah = kmap(page);
        CDEBUG(D_PAGE, "rw %d block %lu: %02x %02x %02x %02x\n", rw, block,
                       blah[0], blah[1], blah[2], blah[3]);
        kunmap(page);
}
#endif
99
100 /* These are our hacks to keep our directio/bh IO coherent with ext3's
101  * page cache use.  Most notably ext3 reads file data into the page
102  * cache when it is zeroing the tail of partial-block truncates and
103  * leaves it there, sometimes generating io from it at later truncates.
104  * This removes the partial page and its buffers from the page cache,
105  * so it should only ever cause a wait in rare cases, as otherwise we
106  * always do full-page IO to the OST.
107  *
108  * The call to truncate_complete_page() will call journal_flushpage() to
109  * free the buffers and drop the page from cache.  The buffers should not
110  * be dirty, because we already called fdatasync/fdatawait on them.
111  */
112 static int filter_clear_page_cache(struct inode *inode, struct kiobuf *iobuf)
113 {
114         struct page *page;
115         int i, rc, rc2;
116
117         check_pending_bhs(KIOBUF_GET_BLOCKS(iobuf), iobuf->nr_pages,
118                           inode->i_dev, 1 << inode->i_blkbits);
119
120         /* This is nearly generic_osync_inode, without the waiting on the inode
121         rc = generic_osync_inode(inode, inode->i_mapping,
122                                  OSYNC_DATA|OSYNC_METADATA);
123          */
124         rc = filemap_fdatasync(inode->i_mapping);
125         rc2 = fsync_inode_data_buffers(inode);
126         if (rc == 0)
127                 rc = rc2;
128         rc2 = filemap_fdatawait(inode->i_mapping);
129         if (rc == 0)
130                 rc = rc2;
131         if (rc != 0)
132                 RETURN(rc);
133
134         /* be careful to call this after fsync_inode_data_buffers has waited
135          * for IO to complete before we evict it from the cache */
136         for (i = 0; i < iobuf->nr_pages ; i++) {
137                 page = find_lock_page(inode->i_mapping,
138                                       iobuf->maplist[i]->index);
139                 if (page == NULL)
140                         continue;
141                 if (page->mapping != NULL)
142                         ll_truncate_complete_page(page);
143
144                 unlock_page(page);
145                 page_cache_release(page);
146         }
147
148         return 0;
149 }
150
/* Must be called with i_sem taken for writes; this will drop it.
 *
 * Maps the pages in *buf to disk blocks and performs the direct I/O.
 * For writes this also updates the inode attributes, records the
 * transaction number, and starts an async journal commit (via
 * *wait_handle) before the data I/O is issued, so the commit can
 * overlap with the block-device write.
 *
 * cleanup_phase values:
 *   0/1 - nothing locked beyond the caller's i_sem (writes only)
 *   2   - the kiovec pages are locked
 *   3   - i_sem has already been dropped (writes only)
 */
int filter_direct_io(int rw, struct dentry *dchild, struct filter_iobuf *buf,
                     struct obd_export *exp, struct iattr *attr,
                     struct obd_trans_info *oti, void **wait_handle)
{
        struct obd_device *obd = exp->exp_obd;
        struct inode *inode = dchild->d_inode;
        struct kiobuf *iobuf = (void *)buf;
        int rc, create = (rw == OBD_BRW_WRITE), committed = 0;
        int blocks_per_page = PAGE_SIZE >> inode->i_blkbits, cleanup_phase = 0;
        struct semaphore *sem = NULL;
        ENTRY;

        LASSERTF(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ, "%x\n", rw);

        if (iobuf->nr_pages == 0)
                GOTO(cleanup, rc = 0);

        /* both the kiovec layer and our block scratchpad bound the number
         * of blocks one call can handle */
        if (iobuf->nr_pages * blocks_per_page > KIO_MAX_SECTORS)
                GOTO(cleanup, rc = -EINVAL);

        if (iobuf->nr_pages * blocks_per_page >
            OBDFILTER_CREATED_SCRATCHPAD_ENTRIES)
                GOTO(cleanup, rc = -EINVAL);

        cleanup_phase = 1;

        rc = lock_kiovec(1, &iobuf, 1);
        if (rc < 0)
                GOTO(cleanup, rc);
        cleanup_phase = 2;

        if (rw == OBD_BRW_WRITE) {
                create = 1;
                /* serialize block allocation against concurrent writers */
                sem = &obd->u.filter.fo_alloc_lock;
        }
        rc = fsfilt_map_inode_pages(obd, inode, iobuf->maplist,
                                    iobuf->nr_pages, KIOBUF_GET_BLOCKS(iobuf),
                                    obdfilter_created_scratchpad, create, sem);
        if (rc)
                GOTO(cleanup, rc);

        /* reads: remap unallocated blocks to -1UL for zero-fill;
         * writes: any unallocated block left at this point is an error */
        rc = filter_cleanup_mappings(rw, iobuf, inode);
        if (rc)
                GOTO(cleanup, rc);

        if (rw == OBD_BRW_WRITE) {
                filter_tally_write(&obd->u.filter, iobuf->maplist,
                                   iobuf->nr_pages, KIOBUF_GET_BLOCKS(iobuf),
                                   blocks_per_page);

                if (attr->ia_size > inode->i_size)
                        attr->ia_valid |= ATTR_SIZE;
                rc = fsfilt_setattr(obd, dchild, oti->oti_handle, attr, 0);
                if (rc)
                        GOTO(cleanup, rc);

                /* drop i_sem per this function's contract; phase 3 tells
                 * the cleanup path not to drop it a second time */
                up(&inode->i_sem);
                cleanup_phase = 3;

                rc = filter_finish_transno(exp, oti, 0);
                if (rc)
                        GOTO(cleanup, rc);

                /* start the journal commit now so it overlaps the data
                 * write issued below */
                rc = fsfilt_commit_async(obd,inode,oti->oti_handle,wait_handle);
                committed = 1;
                if (rc)
                        GOTO(cleanup, rc);
        }

        rc = filter_clear_page_cache(inode, iobuf);
        if (rc < 0)
                GOTO(cleanup, rc);

        rc = fsfilt_send_bio(rw, obd, inode, iobuf);

        CDEBUG(D_INFO, "tried to write %d pages, rc = %d\n",
               iobuf->nr_pages, rc);

        /* a positive return from fsfilt_send_bio means success here;
         * normalize to 0 for the caller */
        if (rc > 0)
                rc = 0;

        EXIT;
cleanup:
        if (!committed && (rw == OBD_BRW_WRITE)) {
                /* the transaction must be closed even on the error path */
                int err = fsfilt_commit_async(obd, inode,
                                              oti->oti_handle, wait_handle);
                if (err)
                        CERROR("can't close transaction: %d\n", err);
                /*
                 * this is error path, so we prefer to return
                 * original error, not this one
                 */
        }

        switch(cleanup_phase) {
        case 3:
                /* fallthrough -- kiovec is still locked */
        case 2:
                unlock_kiovec(1, &iobuf);
                /* fallthrough */
        case 1:
        case 0:
                /* for writes, i_sem is still held unless we reached
                 * phase 3 (where it was already dropped above) */
                if (cleanup_phase != 3 && rw == OBD_BRW_WRITE)
                        up(&inode->i_sem);
                break;
        default:
                CERROR("corrupt cleanup_phase (%d)?\n", cleanup_phase);
                LBUG();
                break;
        }
        return rc;
}
262
263 /* See if there are unallocated parts in given file region */
264 int filter_range_is_mapped(struct inode *inode, obd_size offset, int len)
265 {
266         int (*fs_bmap)(struct address_space *, long) =
267                 inode->i_mapping->a_ops->bmap;
268         int j;
269
270         /* We can't know if the range is mapped already or not */
271         if (fs_bmap == NULL)
272                 return 0;
273
274         offset >>= inode->i_blkbits;
275         len >>= inode->i_blkbits;
276
277         for (j = 0; j < len; j++)
278                 if (fs_bmap(inode->i_mapping, offset + j) == 0)
279                         return 0;
280
281         return 1;
282 }
283
284 /* some kernels require alloc_kiovec callers to zero members through the use of
285  * map_user_kiobuf and unmap_.. we don't use those, so we have a little helper
286  * that makes sure we don't break the rules. */
287 static void clear_kiobuf(struct kiobuf *iobuf)
288 {
289         int i;
290
291         for (i = 0; i < iobuf->array_len; i++)
292                 iobuf->maplist[i] = NULL;
293
294         iobuf->nr_pages = 0;
295         iobuf->offset = 0;
296         iobuf->length = 0;
297 }
298
299 struct filter_iobuf *filter_alloc_iobuf(struct filter_obd *filter,
300                                         int rw, int num_pages)
301 {
302         struct kiobuf *iobuf;
303         int rc;
304         ENTRY;
305
306         LASSERTF(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ, "%x\n", rw);
307
308         rc = alloc_kiovec(1, &iobuf);
309         if (rc)
310                 RETURN(ERR_PTR(rc));
311
312         rc = expand_kiobuf(iobuf, num_pages);
313         if (rc) {
314                 free_kiovec(1, &iobuf);
315                 RETURN(ERR_PTR(rc));
316         }
317
318 #ifdef HAVE_KIOBUF_DOVARY
319         iobuf->dovary = 0; /* this prevents corruption, not present in 2.4.20 */
320 #endif
321         clear_kiobuf(iobuf);
322         RETURN((void *)iobuf);
323 }
324
/* Release an iobuf obtained from filter_alloc_iobuf(). */
void filter_free_iobuf(struct filter_iobuf *buf)
{
        struct kiobuf *kio = (void *)buf;

        clear_kiobuf(kio);
        free_kiovec(1, &kio);
}
332
333 void filter_iobuf_put(struct filter_obd *filter, struct filter_iobuf *iobuf,
334                       struct obd_trans_info *oti)
335 {
336         int thread_id = oti ? oti->oti_thread_id : -1;
337
338         if (unlikely(thread_id < 0)) {
339                 filter_free_iobuf(iobuf);
340                 return;
341         }
342
343         LASSERTF(filter->fo_iobuf_pool[thread_id] == iobuf,
344                  "iobuf mismatch for thread %d: pool %p iobuf %p\n",
345                  thread_id, filter->fo_iobuf_pool[thread_id], iobuf);
346         clear_kiobuf((void *)iobuf);
347 }
348
349 int filter_iobuf_add_page(struct obd_device *obd, struct filter_iobuf *buf,
350                            struct inode *inode, struct page *page)
351 {
352         struct kiobuf *iobuf = (void *)buf;
353
354         iobuf->maplist[iobuf->nr_pages++] = page;
355         iobuf->length += PAGE_SIZE;
356
357         return 0;
358 }
359
/* Commit phase of a bulk write: map the prepared pages to disk, start
 * a journal transaction, apply attribute updates (size, and one-shot
 * UID/GID initialization), perform the direct I/O via
 * filter_direct_io(), and wait for the async commit to complete.
 *
 * rc carries the preparation status from the caller; a non-zero value
 * skips straight to cleanup.
 *
 * cleanup_phase values:
 *   1 - iobuf taken from the pool (must be put back)
 *   2 - lvfs run context pushed (must be popped)
 */
int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount,
                          struct obd_ioobj *obj, int niocount,
                          struct niobuf_local *res, struct obd_trans_info *oti,
                          int rc)
{
        struct obd_device *obd = exp->exp_obd;
        struct lvfs_run_ctxt saved;
        struct niobuf_local *lnb;
        struct fsfilt_objinfo fso;
        struct iattr iattr = { 0 };
        void *iobuf = NULL;
        struct inode *inode = NULL;
        int i, n, cleanup_phase = 0, err;
        unsigned long now = jiffies; /* DEBUGGING OST TIMEOUTS */
        void *wait_handle;
        ENTRY;
        LASSERT(oti != NULL);
        LASSERT(objcount == 1);
        LASSERT(current->journal_info == NULL);

        if (rc != 0)
                GOTO(cleanup, rc);

        iobuf = filter_iobuf_get(&obd->u.filter, oti);
        if (iobuf == NULL)
                GOTO(cleanup, rc = -ENOMEM);
        cleanup_phase = 1;

        fso.fso_dentry = res->dentry;
        fso.fso_bufcnt = obj->ioo_bufcnt;
        inode = res->dentry->d_inode;

        /* collect the pages that are actually going to disk and track
         * the highest write end offset as the candidate new file size */
        for (i = 0, lnb = res, n = 0; i < obj->ioo_bufcnt; i++, lnb++) {
                loff_t this_size;

                /* If overwriting an existing block, we don't need a grant */
                if (!(lnb->flags & OBD_BRW_GRANTED) && lnb->rc == -ENOSPC &&
                    filter_range_is_mapped(inode, lnb->offset, lnb->len))
                        lnb->rc = 0;

                if (lnb->rc) /* ENOSPC, network RPC error */
                        continue;

                filter_iobuf_add_page(obd, iobuf, inode, lnb->page);

                /* We expect these pages to be in offset order, but we'll
                 * be forgiving */
                this_size = lnb->offset + lnb->len;
                if (this_size > iattr.ia_size)
                        iattr.ia_size = this_size;
        }

        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
        cleanup_phase = 2;

        /* i_sem is handed off to filter_direct_io(), which drops it */
        down(&inode->i_sem);
        oti->oti_handle = fsfilt_brw_start(obd, objcount, &fso, niocount, res,
                                           oti);
        if (IS_ERR(oti->oti_handle)) {
                up(&inode->i_sem);
                rc = PTR_ERR(oti->oti_handle);
                CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
                       "error starting transaction: rc = %d\n", rc);
                oti->oti_handle = NULL;
                GOTO(cleanup, rc);
        }

        fsfilt_check_slow(now, obd_timeout, "brw_start");

        /* NB: i is reused below as the OBD_MD valid-flags mask */
        i = OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME;

        /* If the inode still has SUID+SGID bits set (see filter_precreate())
         * then we will accept the UID+GID if sent by the client for
         * initializing the ownership of this inode.  We only allow this to
         * happen once (so clear these bits) and later only allow setattr. */
        if (inode->i_mode & S_ISUID)
                i |= OBD_MD_FLUID;
        if (inode->i_mode & S_ISGID)
                i |= OBD_MD_FLGID;

        iattr_from_obdo(&iattr, oa, i);
        if (iattr.ia_valid & (ATTR_UID | ATTR_GID)) {
                CDEBUG(D_INODE, "update UID/GID to %lu/%lu\n",
                       (unsigned long)oa->o_uid, (unsigned long)oa->o_gid);

                /* needed so the chown-like update is not rejected */
                cap_raise(current->cap_effective, CAP_SYS_RESOURCE);

                /* clear the one-shot SUID/SGID marker bits (see above) */
                iattr.ia_valid |= ATTR_MODE;
                iattr.ia_mode = inode->i_mode;
                if (iattr.ia_valid & ATTR_UID)
                        iattr.ia_mode &= ~S_ISUID;
                if (iattr.ia_valid & ATTR_GID)
                        iattr.ia_mode &= ~S_ISGID;

                rc = filter_update_fidea(exp, inode, oti->oti_handle, oa);
        }

        /* filter_direct_io drops i_sem */
        rc = filter_direct_io(OBD_BRW_WRITE, res->dentry, iobuf, exp, &iattr,
                              oti, &wait_handle);
        if (rc == 0)
                obdo_from_inode(oa, inode, FILTER_VALID_FLAGS);

        fsfilt_check_slow(now, obd_timeout, "direct_io");

        /* wait for the commit filter_direct_io started asynchronously */
        err = fsfilt_commit_wait(obd, inode, wait_handle);
        if (err) {
                CERROR("Failure to commit OST transaction (%d)?\n", err);
                rc = err;
        }
        if (obd_sync_filter && !err)
                LASSERTF(oti->oti_transno <= obd->obd_last_committed,
                         "oti_transno "LPU64" last_committed "LPU64"\n",
                         oti->oti_transno, obd->obd_last_committed);
        fsfilt_check_slow(now, obd_timeout, "commitrw commit");

cleanup:
        filter_grant_commit(exp, niocount, res);

        switch (cleanup_phase) {
        case 2:
                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                LASSERT(current->journal_info == NULL);
                /* fallthrough */
        case 1:
                filter_iobuf_put(&obd->u.filter, iobuf, oti);
                /* fallthrough */
        case 0:
                /*
                 * lnb->page automatically returns back into per-thread page
                 * pool (bug 5137)
                 */
                f_dput(res->dentry);
        }

        RETURN(rc);
}