Whamcloud - gitweb
cc0007fce4fa76f725fbcc2639dc16cc7cb516b3
[fs/lustre-release.git] / lustre / obdfilter / filter_io_26.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdfilter/filter_io.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *   Author: Phil Schwan <phil@clusterfs.com>
10  *
11  *   This file is part of Lustre, http://www.lustre.org.
12  *
13  *   Lustre is free software; you can redistribute it and/or
14  *   modify it under the terms of version 2 of the GNU General Public
15  *   License as published by the Free Software Foundation.
16  *
17  *   Lustre is distributed in the hope that it will be useful,
18  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
19  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20  *   GNU General Public License for more details.
21  *
22  *   You should have received a copy of the GNU General Public License
23  *   along with Lustre; if not, write to the Free Software
24  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25  */
26
27 #include <linux/config.h>
28 #include <linux/module.h>
29 #include <linux/pagemap.h> // XXX kill me soon
30 #include <linux/version.h>
31
32 #define DEBUG_SUBSYSTEM S_FILTER
33
34 #include <linux/obd_class.h>
35 #include <linux/lustre_fsfilt.h>
36 #include "filter_internal.h"
37
38 #warning "implement writeback mode -bzzz"
39
/*
 * Block-mapping helper implemented outside this file.
 *
 * The caller in this file hands it one page together with two arrays of
 * MAX_BLOCKS_PER_PAGE entries (@blocks and @created), passes create = 1 on
 * the write path, and treats any non-zero return value as a failure.
 * NOTE(review): presumably @blocks receives one filesystem block number per
 * block of @page and @created flags newly allocated blocks -- confirm against
 * the ext3 implementation.
 */
extern int ext3_map_inode_page(struct inode *inode, struct page *page,
                               unsigned long *blocks, int *created,
                               int create);
42
43 /* 512byte block min */
44 #define MAX_BLOCKS_PER_PAGE (PAGE_SIZE / 512)
45 struct dio_request {
46         atomic_t numreqs;       /* number of reqs being processed */
47         struct bio *bio_list;   /* list of completed bios */
48         wait_queue_head_t wait;
49         int created[MAX_BLOCKS_PER_PAGE];
50         unsigned long blocks[MAX_BLOCKS_PER_PAGE];
51         spinlock_t lock;
52 };
53
54 static int dio_complete_routine(struct bio *bio, unsigned int done, int error)
55 {
56         struct dio_request *dreq = bio->bi_private;
57         unsigned long flags;
58
59         spin_lock_irqsave(&dreq->lock, flags);
60         bio->bi_private = dreq->bio_list;
61         dreq->bio_list = bio;
62         spin_unlock_irqrestore(&dreq->lock, flags);
63         if (atomic_dec_and_test(&dreq->numreqs))
64                 wake_up(&dreq->wait);
65
66         return 0;
67 }
68
69 static int can_be_merged(struct bio *bio, sector_t sector)
70 {
71         int size;
72         
73         if (!bio)
74                 return 0;
75         
76         size = bio->bi_size >> 9;
77         return bio->bi_sector + size == sector ? 1 : 0;
78 }
79
80 int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount,
81                           struct obd_ioobj *obj, int niocount,
82                           struct niobuf_local *res, struct obd_trans_info *oti)
83 {
84         struct obd_device *obd = exp->exp_obd;
85         struct obd_run_ctxt saved;
86         struct niobuf_local *lnb;
87         struct fsfilt_objinfo fso;
88         struct iattr iattr = { .ia_valid = ATTR_SIZE, .ia_size = 0, };
89         struct inode *inode = NULL;
90         int rc = 0, i, k, cleanup_phase = 0, err;
91         unsigned long now = jiffies; /* DEBUGGING OST TIMEOUTS */
92         int blocks_per_page;
93         struct dio_request *dreq;
94         struct bio *bio = NULL;
95         ENTRY;
96         LASSERT(oti != NULL);
97         LASSERT(objcount == 1);
98         LASSERT(current->journal_info == NULL);
99
100         inode = res->dentry->d_inode;
101         blocks_per_page = PAGE_SIZE >> inode->i_blkbits;
102         LASSERT(blocks_per_page <= MAX_BLOCKS_PER_PAGE);
103
104         OBD_ALLOC(dreq, sizeof(*dreq));
105         if (dreq == NULL)
106                 RETURN(-ENOMEM);
107         dreq->bio_list = NULL;
108         init_waitqueue_head(&dreq->wait);
109         atomic_set(&dreq->numreqs, 0);
110         spin_lock_init(&dreq->lock);
111
112         cleanup_phase = 1;
113         fso.fso_dentry = res->dentry;
114         fso.fso_bufcnt = obj->ioo_bufcnt;
115
116         push_ctxt(&saved, &obd->obd_ctxt, NULL);
117         cleanup_phase = 2; 
118
119         oti->oti_handle = fsfilt_brw_start(obd, objcount, &fso, niocount, oti);
120         if (IS_ERR(oti->oti_handle)) {
121                 rc = PTR_ERR(oti->oti_handle);
122                 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
123                        "error starting transaction: rc = %d\n", rc);
124                 oti->oti_handle = NULL;
125                 GOTO(cleanup, rc);
126         }
127
128         if (time_after(jiffies, now + 15 * HZ))
129                 CERROR("slow brw_start %lus\n", (jiffies - now) / HZ);
130
131         for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++) {
132                 loff_t this_size;
133                 sector_t sector;
134                 int offs;
135
136                 /* get block number for next page */
137                 rc = ext3_map_inode_page(inode, lnb->page, dreq->blocks,
138                                                 dreq->created, 1);
139                 if (rc)
140                         GOTO(cleanup, rc);
141
142                 for (k = 0; k < blocks_per_page; k++) {
143                         sector = dreq->blocks[k] * (inode->i_sb->s_blocksize >> 9);
144                         offs = k * inode->i_sb->s_blocksize;
145
146                         if (!bio || !can_be_merged(bio, sector) ||
147                                 !bio_add_page(bio, lnb->page, lnb->len, offs)) {
148                                 if (bio) {
149                                         atomic_inc(&dreq->numreqs);
150                                         submit_bio(WRITE, bio);
151                                         bio = NULL;
152                                 }
153                                 /* allocate new bio */
154                                 bio = bio_alloc(GFP_NOIO, obj->ioo_bufcnt);
155                                 bio->bi_bdev = inode->i_sb->s_bdev;
156                                 bio->bi_sector = sector;
157                                 bio->bi_end_io = dio_complete_routine; 
158                                 bio->bi_private = dreq;
159
160                                 if (!bio_add_page(bio, lnb->page, lnb->len, 0))
161                                         LBUG();
162                         }
163                 }
164
165                 /* We expect these pages to be in offset order, but we'll
166                  * be forgiving */
167                 this_size = lnb->offset + lnb->len;
168                 if (this_size > iattr.ia_size)
169                         iattr.ia_size = this_size;
170         }
171         if (bio) {
172                 atomic_inc(&dreq->numreqs);
173                 submit_bio(WRITE, bio);
174         }
175
176         /* time to wait for I/O completion */
177         wait_event(dreq->wait, atomic_read(&dreq->numreqs) == 0);
178
179         /* free all bios */
180         while (dreq->bio_list) {
181                 bio = dreq->bio_list;
182                 dreq->bio_list = bio->bi_private;
183                 bio_put(bio);
184         }
185
186         if (rc == 0) {
187                 down(&inode->i_sem);
188                 inode_update_time(inode, 1);
189                 if (iattr.ia_size > inode->i_size) {
190                         CDEBUG(D_INFO, "setting i_size to "LPU64"\n",
191                                iattr.ia_size);
192                         fsfilt_setattr(obd, res->dentry, oti->oti_handle,
193                                        &iattr, 0);
194                 }
195                 up(&inode->i_sem);
196         }
197
198         if (time_after(jiffies, now + 15 * HZ))
199                 CERROR("slow direct_io %lus\n", (jiffies - now) / HZ);
200
201         rc = filter_finish_transno(exp, oti, rc);
202         err = fsfilt_commit(obd, inode, oti->oti_handle, obd_sync_filter);
203         if (err)
204                 rc = err;
205         if (obd_sync_filter)
206                 LASSERT(oti->oti_transno <= obd->obd_last_committed);
207         if (time_after(jiffies, now + 15 * HZ))
208                 CERROR("slow commitrw commit %lus\n", (jiffies - now) / HZ);
209
210 cleanup:
211         switch (cleanup_phase) {
212         case 2:
213                 pop_ctxt(&saved, &obd->obd_ctxt, NULL);
214                 LASSERT(current->journal_info == NULL);
215         case 1:
216                 OBD_FREE(dreq, sizeof(*dreq));
217         case 0:
218                 for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++) {
219                         /* flip_.. gets a ref, while free_page only frees
220                          * when it decrefs to 0 */
221                         if (rc == 0)
222                                 flip_into_page_cache(inode, lnb->page);
223                         __free_page(lnb->page);
224                 }
225                 f_dput(res->dentry);
226         }
227
228         RETURN(rc);
229 }