Whamcloud - gitweb
Allow OST read cache to be disabled (already on b1_0).
[fs/lustre-release.git] / lustre / obdfilter / filter_io.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdfilter/filter_io.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *   Author: Phil Schwan <phil@clusterfs.com>
10  *
11  *   This file is part of Lustre, http://www.lustre.org.
12  *
13  *   Lustre is free software; you can redistribute it and/or
14  *   modify it under the terms of version 2 of the GNU General Public
15  *   License as published by the Free Software Foundation.
16  *
17  *   Lustre is distributed in the hope that it will be useful,
18  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
19  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20  *   GNU General Public License for more details.
21  *
22  *   You should have received a copy of the GNU General Public License
23  *   along with Lustre; if not, write to the Free Software
24  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25  */
26
27 #define DEBUG_SUBSYSTEM S_FILTER
28
29 #include <linux/config.h>
30 #include <linux/module.h>
31 #include <linux/pagemap.h> // XXX kill me soon
32 #include <linux/version.h>
33
34 #include <linux/obd_class.h>
35 #include <linux/lustre_fsfilt.h>
36 #include "filter_internal.h"
37
38 static int filter_start_page_read(struct inode *inode, struct niobuf_local *lnb)
39 {
40         struct address_space *mapping = inode->i_mapping;
41         struct page *page;
42         unsigned long index = lnb->offset >> PAGE_SHIFT;
43         int rc;
44
45         page = grab_cache_page(mapping, index); /* locked page */
46         if (page == NULL)
47                 return lnb->rc = -ENOMEM;
48
49         LASSERT(page->mapping == mapping);
50
51         lnb->page = page;
52
53         if (inode->i_size < lnb->offset + lnb->len - 1)
54                 lnb->rc = inode->i_size - lnb->offset;
55         else
56                 lnb->rc = lnb->len;
57
58         if (PageUptodate(page)) {
59                 unlock_page(page);
60                 return 0;
61         }
62
63         rc = mapping->a_ops->readpage(NULL, page);
64         if (rc < 0) {
65                 CERROR("page index %lu, rc = %d\n", index, rc);
66                 lnb->page = NULL;
67                 page_cache_release(page);
68                 return lnb->rc = rc;
69         }
70
71         return 0;
72 }
73
74 static int filter_finish_page_read(struct niobuf_local *lnb)
75 {
76         if (lnb->page == NULL)
77                 return 0;
78
79         if (PageUptodate(lnb->page))
80                 return 0;
81
82         wait_on_page(lnb->page);
83         if (!PageUptodate(lnb->page)) {
84                 CERROR("page index %lu/offset "LPX64" not uptodate\n",
85                        lnb->page->index, lnb->offset);
86                 GOTO(err_page, lnb->rc = -EIO);
87         }
88         if (PageError(lnb->page)) {
89                 CERROR("page index %lu/offset "LPX64" has error\n",
90                        lnb->page->index, lnb->offset);
91                 GOTO(err_page, lnb->rc = -EIO);
92         }
93
94         return 0;
95
96 err_page:
97         page_cache_release(lnb->page);
98         lnb->page = NULL;
99         return lnb->rc;
100 }
101
102 static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
103                               int objcount, struct obd_ioobj *obj,
104                               int niocount, struct niobuf_remote *nb,
105                               struct niobuf_local *res,
106                               struct obd_trans_info *oti)
107 {
108         struct obd_run_ctxt saved;
109         struct obd_ioobj *o;
110         struct niobuf_remote *rnb;
111         struct niobuf_local *lnb = NULL;
112         struct fsfilt_objinfo *fso;
113         struct dentry *dentry;
114         struct inode *inode;
115         int rc = 0, i, j, tot_bytes = 0, cleanup_phase = 0;
116         unsigned long now = jiffies;
117         ENTRY;
118
119         /* We are currently not supporting multi-obj BRW_READ RPCS at all.
120          * When we do this function's dentry cleanup will need to be fixed */
121         LASSERT(objcount == 1);
122
123         OBD_ALLOC(fso, objcount * sizeof(*fso));
124         if (fso == NULL)
125                 RETURN(-ENOMEM);
126
127         memset(res, 0, niocount * sizeof(*res));
128
129         push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
130         for (i = 0, o = obj; i < objcount; i++, o++) {
131                 LASSERT(o->ioo_bufcnt);
132
133                 dentry = filter_oa2dentry(exp->exp_obd, oa);
134                 if (IS_ERR(dentry))
135                         GOTO(cleanup, rc = PTR_ERR(dentry));
136
137                 if (dentry->d_inode == NULL) {
138                         CERROR("trying to BRW to non-existent file "LPU64"\n",
139                                o->ioo_id);
140                         f_dput(dentry);
141                         GOTO(cleanup, rc = -ENOENT);
142                 }
143
144                 fso[i].fso_dentry = dentry;
145                 fso[i].fso_bufcnt = o->ioo_bufcnt;
146         }
147
148         if (time_after(jiffies, now + 15 * HZ))
149                 CERROR("slow preprw_read setup %lus\n", (jiffies - now) / HZ);
150         else
151                 CDEBUG(D_INFO, "preprw_read setup: %lu jiffies\n",
152                        (jiffies - now));
153
154         for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
155                 dentry = fso[i].fso_dentry;
156                 inode = dentry->d_inode;
157
158                 for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
159                         lnb->dentry = dentry;
160                         lnb->offset = rnb->offset;
161                         lnb->len    = rnb->len;
162                         lnb->flags  = rnb->flags;
163                         lnb->start  = jiffies;
164
165                         if (inode->i_size <= rnb->offset) {
166                                 /* If there's no more data, abort early.
167                                  * lnb->page == NULL and lnb->rc == 0, so it's
168                                  * easy to detect later. */
169                                 break;
170                         } else {
171                                 rc = filter_start_page_read(inode, lnb);
172                         }
173
174                         if (rc) {
175                                 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
176                                        "page err %u@"LPU64" %u/%u %p: rc %d\n",
177                                        lnb->len, lnb->offset, j, o->ioo_bufcnt,
178                                        dentry, rc);
179                                 cleanup_phase = 1;
180                                 GOTO(cleanup, rc);
181                         }
182
183                         tot_bytes += lnb->rc;
184                         if (lnb->rc < lnb->len) {
185                                 /* short read, be sure to wait on it */
186                                 lnb++;
187                                 break;
188                         }
189                 }
190         }
191
192         if (time_after(jiffies, now + 15 * HZ))
193                 CERROR("slow start_page_read %lus\n", (jiffies - now) / HZ);
194         else
195                 CDEBUG(D_INFO, "start_page_read: %lu jiffies\n",
196                        (jiffies - now));
197
198         lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_READ_BYTES,
199                             tot_bytes);
200         while (lnb-- > res) {
201                 rc = filter_finish_page_read(lnb);
202                 if (rc) {
203                         CERROR("error page %u@"LPU64" %u %p: rc %d\n", lnb->len,
204                                lnb->offset, (int)(lnb - res), lnb->dentry, rc);
205                         cleanup_phase = 1;
206                         GOTO(cleanup, rc);
207                 }
208         }
209
210         if (time_after(jiffies, now + 15 * HZ))
211                 CERROR("slow finish_page_read %lus\n", (jiffies - now) / HZ);
212         else
213                 CDEBUG(D_INFO, "finish_page_read: %lu jiffies\n",
214                        (jiffies - now));
215
216         filter_tally_read(&exp->exp_obd->u.filter, res, niocount);
217
218         EXIT;
219
220  cleanup:
221         switch (cleanup_phase) {
222         case 1:
223                 for (lnb = res; lnb < (res + niocount); lnb++) {
224                         if (lnb->page)
225                                 page_cache_release(lnb->page);
226                 }
227                 if (res->dentry != NULL)
228                         f_dput(res->dentry);
229                 else
230                         CERROR("NULL dentry in cleanup -- tell CFS\n");
231         case 0:
232                 OBD_FREE(fso, objcount * sizeof(*fso));
233                 pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
234         }
235         return rc;
236 }
237
238 static int filter_start_page_write(struct inode *inode,
239                                    struct niobuf_local *lnb)
240 {
241         struct page *page = alloc_pages(GFP_HIGHUSER, 0);
242         if (page == NULL) {
243                 CERROR("no memory for a temp page\n");
244                 RETURN(lnb->rc = -ENOMEM);
245         }
246         POISON_PAGE(page, 0xf1);
247         page->index = lnb->offset >> PAGE_SHIFT;
248         lnb->page = page;
249
250         return 0;
251 }
252
253 /* If we ever start to support multi-object BRW RPCs, we will need to get locks
254  * on mulitple inodes.  That isn't all, because there still exists the
255  * possibility of a truncate starting a new transaction while holding the ext3
256  * rwsem = write while some writes (which have started their transactions here)
257  * blocking on the ext3 rwsem = read => lock inversion.
258  *
259  * The handling gets very ugly when dealing with locked pages.  It may be easier
260  * to just get rid of the locked page code (which has problems of its own) and
261  * either discover we do not need it anymore (i.e. it was a symptom of another
262  * bug) or ensure we get the page locks in an appropriate order. */
263 static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
264                                int objcount, struct obd_ioobj *obj,
265                                int niocount, struct niobuf_remote *nb,
266                                struct niobuf_local *res,
267                                struct obd_trans_info *oti)
268 {
269         struct obd_run_ctxt saved;
270         struct niobuf_remote *rnb;
271         struct niobuf_local *lnb = NULL;
272         struct fsfilt_objinfo fso;
273         struct dentry *dentry;
274         int rc = 0, i, tot_bytes = 0;
275         unsigned long now = jiffies;
276         ENTRY;
277         LASSERT(objcount == 1);
278         LASSERT(obj->ioo_bufcnt > 0);
279
280         memset(res, 0, niocount * sizeof(*res));
281
282         push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
283         dentry = filter_fid2dentry(exp->exp_obd, NULL, obj->ioo_gr, 
284                                    obj->ioo_id);
285         if (IS_ERR(dentry))
286                 GOTO(cleanup, rc = PTR_ERR(dentry));
287
288         if (dentry->d_inode == NULL) {
289                 CERROR("trying to BRW to non-existent file "LPU64"\n",
290                        obj->ioo_id);
291                 f_dput(dentry);
292                 GOTO(cleanup, rc = -ENOENT);
293         }
294
295         fso.fso_dentry = dentry;
296         fso.fso_bufcnt = obj->ioo_bufcnt;
297
298         if (time_after(jiffies, now + 15 * HZ))
299                 CERROR("slow preprw_write setup %lus\n", (jiffies - now) / HZ);
300         else
301                 CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n",
302                        (jiffies - now));
303
304         for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt;
305              i++, lnb++, rnb++) {
306                 lnb->dentry = dentry;
307                 lnb->offset = rnb->offset;
308                 lnb->len    = rnb->len;
309                 lnb->flags  = rnb->flags;
310                 lnb->start  = jiffies;
311
312                 rc = filter_start_page_write(dentry->d_inode, lnb);
313                 if (rc) {
314                         CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, "page err %u@"
315                                LPU64" %u/%u %p: rc %d\n", lnb->len, lnb->offset,
316                                i, obj->ioo_bufcnt, dentry, rc);
317                         while (lnb-- > res)
318                                 __free_pages(lnb->page, 0);
319                         f_dput(dentry);
320                         GOTO(cleanup, rc);
321                 }
322                 tot_bytes += lnb->len;
323         }
324
325         if (time_after(jiffies, now + 15 * HZ))
326                 CERROR("slow start_page_write %lus\n", (jiffies - now) / HZ);
327         else
328                 CDEBUG(D_INFO, "start_page_write: %lu jiffies\n",
329                        (jiffies - now));
330
331         lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
332                             tot_bytes);
333         EXIT;
334 cleanup:
335         pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
336         return rc;
337 }
338
339 int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
340                   int objcount, struct obd_ioobj *obj, int niocount,
341                   struct niobuf_remote *nb, struct niobuf_local *res,
342                   struct obd_trans_info *oti)
343 {
344         if (cmd == OBD_BRW_WRITE)
345                 return filter_preprw_write(cmd, exp, oa, objcount, obj,
346                                            niocount, nb, res, oti);
347
348         if (cmd == OBD_BRW_READ)
349                 return filter_preprw_read(cmd, exp, oa, objcount, obj,
350                                           niocount, nb, res, oti);
351
352         LBUG();
353         return -EPROTO;
354 }
355
356 static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa,
357                                 int objcount, struct obd_ioobj *obj,
358                                 int niocount, struct niobuf_local *res,
359                                 struct obd_trans_info *oti)
360 {
361         struct obd_ioobj *o;
362         struct niobuf_local *lnb;
363         int i, j, drop = 0;
364         ENTRY;
365
366         if (res->dentry != NULL)
367                 drop = (res->dentry->d_inode->i_size >
368                         exp->exp_obd->u.filter.fo_readcache_max_filesize);
369
370         for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
371                 for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
372                         if (lnb->page == NULL)
373                                 continue;
374                         /* drop from cache like truncate_list_pages() */
375                         if (drop && !TryLockPage(lnb->page)) {
376                                 if (lnb->page->mapping)
377                                         truncate_complete_page(lnb->page);
378                                 unlock_page(lnb->page);
379                         }
380                         page_cache_release(lnb->page);
381                 }
382         }
383         if (res->dentry != NULL)
384                 f_dput(res->dentry);
385         RETURN(0);
386 }
387
388 void flip_into_page_cache(struct inode *inode, struct page *new_page)
389 {
390         struct page *old_page;
391         int rc;
392
393         do {
394                 /* the dlm is protecting us from read/write concurrency, so we
395                  * expect this find_lock_page to return quickly.  even if we
396                  * race with another writer it won't be doing much work with
397                  * the page locked.  we do this 'cause t_c_p expects a 
398                  * locked page, and it wants to grab the pagecache lock
399                  * as well. */
400                 old_page = find_lock_page(inode->i_mapping, new_page->index);
401                 if (old_page) {
402 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
403                         truncate_complete_page(old_page);
404 #else
405                         truncate_complete_page(old_page->mapping, old_page);
406 #endif
407                         unlock_page(old_page);
408                         page_cache_release(old_page);
409                 }
410
411 #if 0 /* this should be a /proc tunable someday */
412                 /* racing o_directs (no locking ioctl) could race adding
413                  * their pages, so we repeat the page invalidation unless
414                  * we successfully added our new page */
415                 rc = add_to_page_cache_unique(new_page, inode->i_mapping, 
416                                               new_page->index,
417                                               page_hash(inode->i_mapping, 
418                                                         new_page->index));
419                 if (rc == 0) {
420                         /* add_to_page_cache clears uptodate|dirty and locks
421                          * the page */
422                         SetPageUptodate(new_page);
423                         unlock_page(new_page);
424                 }
425 #else   
426                 rc = 0;
427 #endif
428         } while (rc != 0);
429 }
430
431 /* XXX needs to trickle its oa down */
432 int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
433                     int objcount, struct obd_ioobj *obj, int niocount,
434                     struct niobuf_local *res, struct obd_trans_info *oti)
435 {
436         if (cmd == OBD_BRW_WRITE)
437                 return filter_commitrw_write(exp, oa, objcount, obj, niocount,
438                                              res, oti);
439         if (cmd == OBD_BRW_READ)
440                 return filter_commitrw_read(exp, oa, objcount, obj, niocount,
441                                             res, oti);
442         LBUG();
443         return -EPROTO;
444 }
445
446 int filter_brw(int cmd, struct obd_export *exp, struct obdo *oa,
447                struct lov_stripe_md *lsm, obd_count oa_bufs,
448                struct brw_page *pga, struct obd_trans_info *oti)
449 {
450         struct obd_ioobj ioo;
451         struct niobuf_local *lnb;
452         struct niobuf_remote *rnb;
453         obd_count i;
454         int ret = 0;
455         ENTRY;
456
457         OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
458         OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
459
460         if (lnb == NULL || rnb == NULL)
461                 GOTO(out, ret = -ENOMEM);
462
463         for (i = 0; i < oa_bufs; i++) {
464                 rnb[i].offset = pga[i].off;
465                 rnb[i].len = pga[i].count;
466         }
467
468         obdo_to_ioobj(oa, &ioo);
469         ioo.ioo_bufcnt = oa_bufs;
470
471         ret = filter_preprw(cmd, exp, oa, 1, &ioo, oa_bufs, rnb, lnb, oti);
472         if (ret != 0)
473                 GOTO(out, ret);
474
475         for (i = 0; i < oa_bufs; i++) {
476                 void *virt = kmap(pga[i].pg);
477                 obd_off off = pga[i].off & ~PAGE_MASK;
478                 void *addr = kmap(lnb[i].page);
479
480                 /* 2 kmaps == vanishingly small deadlock opportunity */
481
482                 if (cmd & OBD_BRW_WRITE)
483                         memcpy(addr + off, virt + off, pga[i].count);
484                 else
485                         memcpy(virt + off, addr + off, pga[i].count);
486
487                 kunmap(lnb[i].page);
488                 kunmap(pga[i].pg);
489         }
490
491         ret = filter_commitrw(cmd, exp, oa, 1, &ioo, oa_bufs, lnb, oti);
492
493 out:
494         if (lnb)
495                 OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
496         if (rnb)
497                 OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
498         RETURN(ret);
499 }