Whamcloud - gitweb
Land from b_hd_pid to HEAD
[fs/lustre-release.git] / lustre / obdfilter / filter_io_26.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdfilter/filter_io.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *   Author: Phil Schwan <phil@clusterfs.com>
10  *
11  *   This file is part of Lustre, http://www.lustre.org.
12  *
13  *   Lustre is free software; you can redistribute it and/or
14  *   modify it under the terms of version 2 of the GNU General Public
15  *   License as published by the Free Software Foundation.
16  *
17  *   Lustre is distributed in the hope that it will be useful,
18  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
19  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20  *   GNU General Public License for more details.
21  *
22  *   You should have received a copy of the GNU General Public License
23  *   along with Lustre; if not, write to the Free Software
24  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25  */
26
27 #include <linux/config.h>
28 #include <linux/module.h>
29 #include <linux/pagemap.h> // XXX kill me soon
30 #include <linux/version.h>
31
32 #define DEBUG_SUBSYSTEM S_FILTER
33
34 #include <linux/obd_class.h>
35 #include <linux/lustre_fsfilt.h>
36 #include "filter_internal.h"
37
38 #warning "implement writeback mode -bzzz"
39
40 /* 512byte block min */
41 #define MAX_BLOCKS_PER_PAGE (PAGE_SIZE / 512)
42 struct dio_request {
43         atomic_t numreqs;       /* number of reqs being processed */
44         struct bio *bio_list;   /* list of completed bios */
45         wait_queue_head_t wait;
46         int created[MAX_BLOCKS_PER_PAGE];
47         unsigned long blocks[MAX_BLOCKS_PER_PAGE];
48         spinlock_t lock;
49 };
50
51 static int dio_complete_routine(struct bio *bio, unsigned int done, int error)
52 {
53         struct dio_request *dreq = bio->bi_private;
54         unsigned long flags;
55
56         spin_lock_irqsave(&dreq->lock, flags);
57         bio->bi_private = dreq->bio_list;
58         dreq->bio_list = bio;
59         spin_unlock_irqrestore(&dreq->lock, flags);
60         if (atomic_dec_and_test(&dreq->numreqs))
61                 wake_up(&dreq->wait);
62
63         return 0;
64 }
65
66 static int can_be_merged(struct bio *bio, sector_t sector)
67 {
68         int size;
69
70         if (!bio)
71                 return 0;
72
73         size = bio->bi_size >> 9;
74         return bio->bi_sector + size == sector ? 1 : 0;
75 }
76
77 /* See if there are unallocated parts in given file region */
78 static int filter_range_is_mapped(struct inode *inode, obd_size offset, int len)
79 {
80         sector_t (*fs_bmap)(struct address_space *, sector_t) =
81                 inode->i_mapping->a_ops->bmap;
82         int j;
83
84         /* We can't know if we are overwriting or not */
85         if (fs_bmap == NULL)
86                 return 0;
87
88         offset >>= inode->i_blkbits;
89         len >>= inode->i_blkbits;
90
91         for (j = 0; j <= len; j++)
92                 if (fs_bmap(inode->i_mapping, offset + j) == 0)
93                         return 0;
94
95         return 1;
96 }
97
98 int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
99                           int objcount, struct obd_ioobj *obj, int niocount,
100                           struct niobuf_local *res, struct obd_trans_info *oti,
101                           int rc)
102 {
103         struct bio *bio = NULL;
104         int blocks_per_page, err;
105         struct niobuf_local *lnb;
106         struct lvfs_run_ctxt saved;
107         struct fsfilt_objinfo fso;
108         struct iattr iattr = { 0 };
109         struct inode *inode = NULL;
110         unsigned long now = jiffies;
111         int i, k, cleanup_phase = 0;
112
113         struct dio_request *dreq = NULL;
114         struct obd_device *obd = exp->exp_obd;
115
116         ENTRY;
117
118         LASSERT(oti != NULL);
119         LASSERT(objcount == 1);
120         LASSERT(current->journal_info == NULL);
121
122         if (rc != 0)
123                 GOTO(cleanup, rc);
124
125         inode = res->dentry->d_inode;
126         blocks_per_page = PAGE_SIZE >> inode->i_blkbits;
127         LASSERT(blocks_per_page <= MAX_BLOCKS_PER_PAGE);
128
129         OBD_ALLOC(dreq, sizeof(*dreq));
130
131         if (dreq == NULL)
132                 RETURN(-ENOMEM);
133
134         dreq->bio_list = NULL;
135         init_waitqueue_head(&dreq->wait);
136         atomic_set(&dreq->numreqs, 0);
137         spin_lock_init(&dreq->lock);
138
139         cleanup_phase = 1;
140         fso.fso_dentry = res->dentry;
141         fso.fso_bufcnt = obj->ioo_bufcnt;
142
143         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
144         cleanup_phase = 2;
145
146         generic_osync_inode(inode, inode->i_mapping, OSYNC_DATA|OSYNC_METADATA);
147
148         oti->oti_handle = fsfilt_brw_start(obd, objcount, &fso,
149                                            niocount, res, oti);
150         
151         if (IS_ERR(oti->oti_handle)) {
152                 rc = PTR_ERR(oti->oti_handle);
153                 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
154                        "error starting transaction: rc = %d\n", rc);
155                 oti->oti_handle = NULL;
156                 GOTO(cleanup, rc);
157         }
158
159         if (time_after(jiffies, now + 15 * HZ))
160                 CERROR("slow brw_start %lus\n", (jiffies - now) / HZ);
161
162         iattr_from_obdo(&iattr,oa,OBD_MD_FLATIME|OBD_MD_FLMTIME|OBD_MD_FLCTIME);
163         for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++) {
164                 loff_t this_size;
165                 sector_t sector;
166                 struct page *pages[1];
167                 int offs;
168
169                 /* If overwriting an existing block, we don't need a grant */
170                 if (!(lnb->flags & OBD_BRW_GRANTED) && lnb->rc == -ENOSPC &&
171                     filter_range_is_mapped(inode, lnb->offset, lnb->len))
172                         lnb->rc = 0;
173
174                 if (lnb->rc) /* ENOSPC, network RPC error, etc. */ 
175                         continue;
176
177                 /* get block number for next page */
178                 pages[0] = lnb->page;
179                 rc = fsfilt_map_inode_pages(obd, inode, pages, 1, 
180                                             dreq->blocks, dreq->created, 1,
181                                             NULL);
182                 if (rc != 0)
183                         GOTO(cleanup, rc);
184
185                 for (k = 0; k < blocks_per_page; k++) {
186                         sector = dreq->blocks[k] *(inode->i_sb->s_blocksize>>9);
187                         offs = k * inode->i_sb->s_blocksize;
188
189                         if (!bio || !can_be_merged(bio, sector) ||
190                             !bio_add_page(bio, lnb->page, PAGE_SIZE, offs)) {
191                                 if (bio) {
192                                         atomic_inc(&dreq->numreqs);
193                                         submit_bio(WRITE, bio);
194                                         bio = NULL;
195                                 }
196                                 /* allocate new bio */
197                                 bio = bio_alloc(GFP_NOIO, obj->ioo_bufcnt);
198                                 bio->bi_bdev = inode->i_sb->s_bdev;
199                                 bio->bi_sector = sector;
200                                 bio->bi_end_io = dio_complete_routine;
201                                 bio->bi_private = dreq;
202
203                                 if (!bio_add_page(bio, lnb->page, PAGE_SIZE, 
204                                                   offs))
205                                         LBUG();
206                         }
207                 }
208
209                 /* we expect these pages to be in offset order, but we'll
210                  * be forgiving */
211                 this_size = lnb->offset + lnb->len;
212                 if (this_size > iattr.ia_size)
213                         iattr.ia_size = this_size;
214         }
215
216         if (bio) {
217                 atomic_inc(&dreq->numreqs);
218                 fsfilt_send_bio(obd, inode, bio);
219         }
220
221         /* time to wait for I/O completion */
222         wait_event(dreq->wait, atomic_read(&dreq->numreqs) == 0);
223
224         /* free all bios */
225         while (dreq->bio_list) {
226                 bio = dreq->bio_list;
227                 dreq->bio_list = bio->bi_private;
228                 bio_put(bio);
229         }
230
231         down(&inode->i_sem);
232         if (iattr.ia_size > inode->i_size) {
233                 CDEBUG(D_INFO, "setting i_size to "LPU64"\n",
234                        iattr.ia_size);
235                         
236                 iattr.ia_valid |= ATTR_SIZE;
237                         
238                 fsfilt_setattr(obd, res->dentry, oti->oti_handle,
239                                &iattr, 0);
240         }
241         up(&inode->i_sem);
242
243         if (time_after(jiffies, now + 15 * HZ))
244                 CERROR("slow direct_io %lus\n", (jiffies - now) / HZ);
245
246         rc = filter_finish_transno(exp, oti, rc);
247
248         err = fsfilt_commit(obd, inode, oti->oti_handle, obd_sync_filter);
249         if (err)
250                 rc = err;
251
252         if (obd_sync_filter)
253                 LASSERT(oti->oti_transno <= obd->obd_last_committed);
254
255         if (time_after(jiffies, now + 15 * HZ))
256                 CERROR("slow commitrw commit %lus\n", (jiffies - now) / HZ);
257
258 cleanup:
259         filter_grant_commit(exp, niocount, res);
260
261         switch (cleanup_phase) {
262         case 2:
263                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
264                 LASSERT(current->journal_info == NULL);
265         case 1:
266                 OBD_FREE(dreq, sizeof(*dreq));
267         case 0:
268                 for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++) {
269                         filter_release_write_page(&obd->u.filter,
270                                                   res->dentry->d_inode, lnb,
271                                                   rc);
272                 }
273
274                 f_dput(res->dentry);
275         }
276
277         RETURN(rc);
278 }