[fs/lustre-release.git] lustre/obdfilter/filter_io_24.c (land b1_5 onto HEAD)
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  linux/fs/obdfilter/filter_io.c
 *
 *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
 *   Author: Peter Braam <braam@clusterfs.com>
 *   Author: Andreas Dilger <adilger@clusterfs.com>
 *   Author: Phil Schwan <phil@clusterfs.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   You may have signed or agreed to another license before downloading
 *   this software.  If so, you are bound by the terms and conditions
 *   of that agreement, and the following does not apply to you.  See the
 *   LICENSE file included with this distribution for more information.
 *
 *   If you did not agree to a different license, then this copy of Lustre
 *   is open source software; you can redistribute it and/or modify it
 *   under the terms of version 2 of the GNU General Public License as
 *   published by the Free Software Foundation.
 *
 *   In either case, Lustre is distributed in the hope that it will be
 *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   license text for more details.
 */

#ifdef HAVE_KERNEL_CONFIG_H
#include <linux/config.h>
#endif
#include <linux/module.h>
#include <linux/pagemap.h> // XXX kill me soon
#include <linux/version.h>

#define DEBUG_SUBSYSTEM S_FILTER

#include <linux/iobuf.h>
#include <linux/locks.h>

#include <obd_class.h>
#include <lustre_fsfilt.h>
#include "filter_internal.h"

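/* This is the 2.4-kernel variant of the obdfilter direct IO path: pages are
 * carried in a struct kiobuf and submitted via fsfilt_send_bio(), rather
 * than through the bio-based path used on 2.6 kernels. */
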
/* Bug 2254 -- this is better done in ext3_map_inode_page, but this
 * workaround will suffice until everyone has upgraded their kernels */
static void check_pending_bhs(unsigned long *blocks, int nr_pages, dev_t dev,
                              int size)
{
#if (LUSTRE_KERNEL_VERSION < 32)
        struct buffer_head *bh;
        int i;

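        /* A dirty buffer-head left in the buffer cache could be flushed
         * later and overwrite data transferred by direct IO, so mark any
         * such buffer clean and wait out IO already in flight on it. */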
        for (i = 0; i < nr_pages; i++) {
                bh = get_hash_table(dev, blocks[i], size);
                if (bh == NULL)
                        continue;
                if (!buffer_dirty(bh)) {
                        put_bh(bh);
                        continue;
                }
                mark_buffer_clean(bh);
                wait_on_buffer(bh);
                clear_bit(BH_Req, &bh->b_state);
                __brelse(bh);
        }
#endif
}

/* when brw_kiovec() is asked to read from block -1UL it just zeros
 * the page.  this gives us a chance to verify the write mappings
 * as well */
static int filter_cleanup_mappings(int rw, struct kiobuf *iobuf,
                                   struct inode *inode)
{
        int i, blocks_per_page_bits = CFS_PAGE_SHIFT - inode->i_blkbits;
        ENTRY;

        for (i = 0; i < iobuf->nr_pages << blocks_per_page_bits; i++) {
                if (KIOBUF_GET_BLOCKS(iobuf)[i] > 0)
                        continue;

                if (rw == OBD_BRW_WRITE)
                        RETURN(-EINVAL);

                KIOBUF_GET_BLOCKS(iobuf)[i] = -1UL;
        }
        RETURN(0);
}

#if 0
static void dump_page(int rw, unsigned long block, struct page *page)
{
        char *blah = kmap(page);
        CDEBUG(D_PAGE, "rw %d block %lu: %02x %02x %02x %02x\n", rw, block,
                       blah[0], blah[1], blah[2], blah[3]);
        kunmap(page);
}
#endif

/* These are our hacks to keep our directio/bh IO coherent with ext3's
 * page cache use.  Most notably ext3 reads file data into the page
 * cache when it is zeroing the tail of partial-block truncates and
 * leaves it there, sometimes generating io from it at later truncates.
 * This removes the partial page and its buffers from the page cache,
 * so it should only ever cause a wait in rare cases, as otherwise we
 * always do full-page IO to the OST.
 *
 * The call to truncate_complete_page() will call journal_flushpage() to
 * free the buffers and drop the page from cache.  The buffers should not
 * be dirty, because we already called fdatasync/fdatawait on them.
 */
static int filter_sync_inode_data(struct inode *inode)
{
        int rc, rc2;

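        /* Three steps: start writeback of dirty pages, sync the inode's
         * data buffers, then wait for page writeback to finish; the first
         * error encountered is the one returned. */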
        /* This is nearly generic_osync_inode, without the waiting on the inode
        rc = generic_osync_inode(inode, inode->i_mapping,
                                 OSYNC_DATA|OSYNC_METADATA);
         */
        rc = filemap_fdatasync(inode->i_mapping);
        rc2 = fsync_inode_data_buffers(inode);
        if (rc == 0)
                rc = rc2;
        rc2 = filemap_fdatawait(inode->i_mapping);
        if (rc == 0)
                rc = rc2;

        return rc;
}

static int filter_clear_page_cache(struct inode *inode, struct kiobuf *iobuf)
{
        struct page *page;
        int i, rc;

        check_pending_bhs(KIOBUF_GET_BLOCKS(iobuf), iobuf->nr_pages,
                          inode->i_dev, 1 << inode->i_blkbits);

        rc = filter_sync_inode_data(inode);
        if (rc != 0)
                RETURN(rc);

        /* This must only run after fsync_inode_data_buffers() has waited for
         * IO to complete; only then is it safe to evict pages from the cache */
        for (i = 0; i < iobuf->nr_pages; i++) {
                page = find_lock_page(inode->i_mapping,
                                      iobuf->maplist[i]->index);
                if (page == NULL)
                        continue;
                if (page->mapping != NULL) {
                        /* The truncate path, the only source of such pages,
                         * now flushes them to disk and then discards them,
                         * so finding one here is an error condition */
                        CERROR("Data page in page cache during write!\n");
                        ll_truncate_complete_page(page);
                }

                unlock_page(page);
                page_cache_release(page);
        }

        return 0;
}


int filter_clear_truncated_page(struct inode *inode)
{
        struct page *page;
        int rc;

        /* Truncate on a page boundary, so nothing to flush? */
        if (!(inode->i_size & ~CFS_PAGE_MASK))
                return 0;

        rc = filter_sync_inode_data(inode);
        if (rc != 0)
                RETURN(rc);

        /* This must only run after fsync_inode_data_buffers() has waited for
         * IO to complete; only then is it safe to evict the page from cache */
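        /* e.g. with 4096-byte pages, i_size = 10000 leaves a partial page at
         * index 2 (holding bytes 8192..9999); that is the page looked up and
         * evicted here. */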
        page = find_lock_page(inode->i_mapping,
                              inode->i_size >> CFS_PAGE_SHIFT);
        if (page) {
                if (page->mapping != NULL)
                        ll_truncate_complete_page(page);

                unlock_page(page);
                page_cache_release(page);
        }

        return 0;
}

/* Must be called with i_sem taken for writes; this will drop it */
int filter_direct_io(int rw, struct dentry *dchild, struct filter_iobuf *buf,
                     struct obd_export *exp, struct iattr *attr,
                     struct obd_trans_info *oti, void **wait_handle)
{
        struct obd_device *obd = exp->exp_obd;
        struct inode *inode = dchild->d_inode;
        struct kiobuf *iobuf = (void *)buf;
        int rc, create = (rw == OBD_BRW_WRITE), committed = 0;
        int blocks_per_page = CFS_PAGE_SIZE >> inode->i_blkbits;
        int cleanup_phase = 0;
        struct semaphore *sem = NULL;
        ENTRY;

        LASSERTF(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ, "%x\n", rw);

        if (iobuf->nr_pages == 0)
                GOTO(cleanup, rc = 0);

        if (iobuf->nr_pages * blocks_per_page > KIO_MAX_SECTORS)
                GOTO(cleanup, rc = -EINVAL);

        if (iobuf->nr_pages * blocks_per_page >
            OBDFILTER_CREATED_SCRATCHPAD_ENTRIES)
                GOTO(cleanup, rc = -EINVAL);

        cleanup_phase = 1;

        rc = lock_kiovec(1, &iobuf, 1);
        if (rc < 0)
                GOTO(cleanup, rc);
        cleanup_phase = 2;

        if (rw == OBD_BRW_WRITE) {
                create = 1;
                sem = &obd->u.filter.fo_alloc_lock;
        }
        rc = fsfilt_map_inode_pages(obd, inode, iobuf->maplist,
                                    iobuf->nr_pages, KIOBUF_GET_BLOCKS(iobuf),
                                    obdfilter_created_scratchpad, create, sem);
        if (rc)
                GOTO(cleanup, rc);

        rc = filter_cleanup_mappings(rw, iobuf, inode);
        if (rc)
                GOTO(cleanup, rc);

        if (rw == OBD_BRW_WRITE) {
                if (rc == 0) {
                        filter_tally_write(exp, iobuf->maplist, iobuf->nr_pages,
                                           KIOBUF_GET_BLOCKS(iobuf),
                                           blocks_per_page);

                        if (attr->ia_size > inode->i_size)
                                attr->ia_valid |= ATTR_SIZE;
                        rc = fsfilt_setattr(obd, dchild,
                                            oti->oti_handle, attr, 0);
                        if (rc)
                                GOTO(cleanup, rc);
                }

                up(&inode->i_sem);
                cleanup_phase = 3;

                rc = filter_finish_transno(exp, oti, 0);
                if (rc)
                        GOTO(cleanup, rc);

                rc = fsfilt_commit_async(obd, inode, oti->oti_handle,
                                         wait_handle);
                committed = 1;
                if (rc)
                        GOTO(cleanup, rc);
        } else {
                filter_tally_read(exp, iobuf->maplist, iobuf->nr_pages,
                                  KIOBUF_GET_BLOCKS(iobuf), blocks_per_page);
        }

        rc = filter_clear_page_cache(inode, iobuf);
        if (rc < 0)
                GOTO(cleanup, rc);

        rc = fsfilt_send_bio(rw, obd, inode, iobuf);

        CDEBUG(D_INFO, "tried to %s %d pages, rc = %d\n",
               rw & OBD_BRW_WRITE ? "write" : "read", iobuf->nr_pages, rc);

        if (rc > 0)
                rc = 0;

        EXIT;
cleanup:
        if (!committed && (rw == OBD_BRW_WRITE)) {
                int err = fsfilt_commit_async(obd, inode,
                                              oti->oti_handle, wait_handle);
                if (err)
                        CERROR("can't close transaction: %d\n", err);
                /*
                 * this is the error path, so we prefer to return the
                 * original error, not this one
                 */
        }

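        /* cleanup_phase records how far setup got: 2 means the kiovec is
         * still locked, 3 means i_sem has already been dropped.  The cases
         * below fall through deliberately. */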
        switch (cleanup_phase) {
        case 3:
        case 2:
                unlock_kiovec(1, &iobuf);
        case 1:
        case 0:
                if (cleanup_phase != 3 && rw == OBD_BRW_WRITE)
                        up(&inode->i_sem);
                break;
        default:
                CERROR("corrupt cleanup_phase (%d)?\n", cleanup_phase);
                LBUG();
                break;
        }
        return rc;
}

/* See if there are unallocated parts in the given file region */
int filter_range_is_mapped(struct inode *inode, obd_size offset, int len)
{
        int (*fs_bmap)(struct address_space *, long) =
                inode->i_mapping->a_ops->bmap;
        int j;

        /* We can't know if the range is mapped already or not */
        if (fs_bmap == NULL)
                return 0;

        offset >>= inode->i_blkbits;
        len >>= inode->i_blkbits;

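        /* ->bmap() returns the physical block number for a logical block, or
         * 0 for an unallocated hole, so any zero below means part of the
         * range is not mapped.  Note the shifts above assume offset and len
         * are block aligned; a sub-block tail is shifted away entirely. */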
        for (j = 0; j < len; j++)
                if (fs_bmap(inode->i_mapping, offset + j) == 0)
                        return 0;

        return 1;
}

/* some kernels require alloc_kiovec callers to zero members through the use of
 * map_user_kiobuf and unmap_.. we don't use those, so we have a little helper
 * that makes sure we don't break the rules. */
static void clear_kiobuf(struct kiobuf *iobuf)
{
        int i;

        for (i = 0; i < iobuf->array_len; i++)
                iobuf->maplist[i] = NULL;

        iobuf->nr_pages = 0;
        iobuf->offset = 0;
        iobuf->length = 0;
}

struct filter_iobuf *filter_alloc_iobuf(struct filter_obd *filter,
                                        int rw, int num_pages)
{
        struct kiobuf *iobuf;
        int rc;
        ENTRY;

        LASSERTF(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ, "%x\n", rw);

        rc = alloc_kiovec(1, &iobuf);
        if (rc)
                RETURN(ERR_PTR(rc));

        rc = expand_kiobuf(iobuf, num_pages);
        if (rc) {
                free_kiovec(1, &iobuf);
                RETURN(ERR_PTR(rc));
        }

#ifdef HAVE_KIOBUF_DOVARY
        iobuf->dovary = 0; /* this prevents corruption, not present in 2.4.20 */
#endif
        clear_kiobuf(iobuf);
        RETURN((void *)iobuf);
}

void filter_free_iobuf(struct filter_iobuf *buf)
{
        struct kiobuf *iobuf = (void *)buf;

        clear_kiobuf(iobuf);
        free_kiovec(1, &iobuf);
}

void filter_iobuf_put(struct filter_obd *filter, struct filter_iobuf *iobuf,
                      struct obd_trans_info *oti)
{
        int thread_id = oti ? oti->oti_thread_id : -1;

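        /* No thread ID means the iobuf was allocated outside the per-thread
         * pool, so free it outright; a pooled iobuf is only cleared here and
         * kept for reuse by its thread. */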
        if (unlikely(thread_id < 0)) {
                filter_free_iobuf(iobuf);
                return;
        }

        LASSERTF(filter->fo_iobuf_pool[thread_id] == iobuf,
                 "iobuf mismatch for thread %d: pool %p iobuf %p\n",
                 thread_id, filter->fo_iobuf_pool[thread_id], iobuf);
        clear_kiobuf((void *)iobuf);
}

int filter_iobuf_add_page(struct obd_device *obd, struct filter_iobuf *buf,
                          struct inode *inode, struct page *page)
{
        struct kiobuf *iobuf = (void *)buf;

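        /* No bounds check here: the caller must not add more pages than the
         * num_pages this iobuf was expanded to in filter_alloc_iobuf(). */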
        iobuf->maplist[iobuf->nr_pages++] = page;
        iobuf->length += CFS_PAGE_SIZE;

        return 0;
}

int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount,
                          struct obd_ioobj *obj, int niocount,
                          struct niobuf_local *res, struct obd_trans_info *oti,
                          int rc)
{
        struct obd_device *obd = exp->exp_obd;
        struct lvfs_run_ctxt saved;
        struct niobuf_local *lnb;
        struct fsfilt_objinfo fso;
        struct iattr iattr = { 0 };
        void *iobuf = NULL;
        struct inode *inode = NULL;
        int i, n, cleanup_phase = 0, err;
        unsigned long now = jiffies; /* DEBUGGING OST TIMEOUTS */
        void *wait_handle;
        ENTRY;
        LASSERT(oti != NULL);
        LASSERT(objcount == 1);
        LASSERT(current->journal_info == NULL);

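        /* Overall flow: gather the pages that may still be written into the
         * iobuf, open a journal transaction, fold the size and ownership
         * updates into one setattr, issue the direct IO, then wait for the
         * transaction to commit. */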
        if (rc != 0)
                GOTO(cleanup, rc);

        iobuf = filter_iobuf_get(&obd->u.filter, oti);
        if (IS_ERR(iobuf))
                GOTO(cleanup, rc = PTR_ERR(iobuf));
        cleanup_phase = 1;

        fso.fso_dentry = res->dentry;
        fso.fso_bufcnt = obj->ioo_bufcnt;
        inode = res->dentry->d_inode;

        for (i = 0, lnb = res, n = 0; i < obj->ioo_bufcnt; i++, lnb++) {
                loff_t this_size;

                /* If overwriting an existing block, we don't need a grant */
                if (!(lnb->flags & OBD_BRW_GRANTED) && lnb->rc == -ENOSPC &&
                    filter_range_is_mapped(inode, lnb->offset, lnb->len))
                        lnb->rc = 0;

                if (lnb->rc) /* ENOSPC, network RPC error */
                        continue;

                filter_iobuf_add_page(obd, iobuf, inode, lnb->page);

                /* We expect these pages to be in offset order, but we'll
                 * be forgiving */
                this_size = lnb->offset + lnb->len;
                if (this_size > iattr.ia_size)
                        iattr.ia_size = this_size;
        }

        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
        cleanup_phase = 2;

        down(&inode->i_sem);
        oti->oti_handle = fsfilt_brw_start(obd, objcount, &fso, niocount, res,
                                           oti);
        if (IS_ERR(oti->oti_handle)) {
                up(&inode->i_sem);
                rc = PTR_ERR(oti->oti_handle);
                CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
                       "error starting transaction: rc = %d\n", rc);
                oti->oti_handle = NULL;
                GOTO(cleanup, rc);
        }

        fsfilt_check_slow(obd, now, obd_timeout, "brw_start");

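        /* 'i' is reused below as the obdo-to-iattr conversion mask, not as a
         * loop counter. */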
        i = OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME;

        /* If the inode still has SUID+SGID bits set (see filter_precreate())
         * then we will accept the UID+GID if sent by the client for
         * initializing the ownership of this inode.  We only allow this to
         * happen once (so clear these bits) and later only allow setattr. */
        if (inode->i_mode & S_ISUID)
                i |= OBD_MD_FLUID;
        if (inode->i_mode & S_ISGID)
                i |= OBD_MD_FLGID;

        iattr_from_obdo(&iattr, oa, i);
        if (iattr.ia_valid & (ATTR_UID | ATTR_GID)) {
                CDEBUG(D_INODE, "update UID/GID to %lu/%lu\n",
                       (unsigned long)oa->o_uid, (unsigned long)oa->o_gid);

                cap_raise(current->cap_effective, CAP_SYS_RESOURCE);

                iattr.ia_valid |= ATTR_MODE;
                iattr.ia_mode = inode->i_mode;
                if (iattr.ia_valid & ATTR_UID)
                        iattr.ia_mode &= ~S_ISUID;
                if (iattr.ia_valid & ATTR_GID)
                        iattr.ia_mode &= ~S_ISGID;

                rc = filter_update_fidea(exp, inode, oti->oti_handle, oa);
        }

        /* filter_direct_io drops i_sem */
        rc = filter_direct_io(OBD_BRW_WRITE, res->dentry, iobuf, exp, &iattr,
                              oti, &wait_handle);
        if (rc == 0)
                obdo_from_inode(oa, inode, FILTER_VALID_FLAGS);

        fsfilt_check_slow(obd, now, obd_timeout, "direct_io");

        err = fsfilt_commit_wait(obd, inode, wait_handle);
        if (err) {
                CERROR("Failure to commit OST transaction (%d)?\n", err);
                rc = err;
        }
        if (obd->obd_replayable && !rc)
                LASSERTF(oti->oti_transno <= obd->obd_last_committed,
                         "oti_transno "LPU64" last_committed "LPU64"\n",
                         oti->oti_transno, obd->obd_last_committed);
        fsfilt_check_slow(obd, now, obd_timeout, "commitrw commit");

cleanup:
        filter_grant_commit(exp, niocount, res);

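        /* As in filter_direct_io(), the cases below fall through so that
         * each phase undoes its own setup. */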
        switch (cleanup_phase) {
        case 2:
                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                LASSERT(current->journal_info == NULL);
        case 1:
                filter_iobuf_put(&obd->u.filter, iobuf, oti);
        case 0:
                /*
                 * lnb->page is automatically returned to the per-thread page
                 * pool (bug 5137)
                 */
                f_dput(res->dentry);
        }

        RETURN(rc);
}