Whamcloud - gitweb
f4581bb001b598e9df18b770bc768482fefbc7d9
[fs/lustre-release.git] / lustre / obdfilter / filter_io.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdfilter/filter_io.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *   Author: Phil Schwan <phil@clusterfs.com>
10  *
11  *   This file is part of Lustre, http://www.lustre.org.
12  *
13  *   Lustre is free software; you can redistribute it and/or
14  *   modify it under the terms of version 2 of the GNU General Public
15  *   License as published by the Free Software Foundation.
16  *
17  *   Lustre is distributed in the hope that it will be useful,
18  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
19  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20  *   GNU General Public License for more details.
21  *
22  *   You should have received a copy of the GNU General Public License
23  *   along with Lustre; if not, write to the Free Software
24  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25  */
26
27 #define DEBUG_SUBSYSTEM S_FILTER
28
29 #include <linux/config.h>
30 #include <linux/module.h>
31 #include <linux/pagemap.h> // XXX kill me soon
32 #include <linux/version.h>
33
34 #include <linux/obd_class.h>
35 #include <linux/lustre_fsfilt.h>
36 #include "filter_internal.h"
37
38 static int filter_start_page_read(struct inode *inode, struct niobuf_local *lnb)
39 {
40         struct address_space *mapping = inode->i_mapping;
41         struct page *page;
42         unsigned long index = lnb->offset >> PAGE_SHIFT;
43         int rc;
44
45         page = grab_cache_page(mapping, index); /* locked page */
46         if (page == NULL)
47                 return lnb->rc = -ENOMEM;
48
49         LASSERT(page->mapping == mapping);
50
51         lnb->page = page;
52
53         if (inode->i_size < lnb->offset + lnb->len - 1)
54                 lnb->rc = inode->i_size - lnb->offset;
55         else
56                 lnb->rc = lnb->len;
57
58         if (PageUptodate(page)) {
59                 unlock_page(page);
60                 return 0;
61         }
62
63         rc = mapping->a_ops->readpage(NULL, page);
64         if (rc < 0) {
65                 CERROR("page index %lu, rc = %d\n", index, rc);
66                 lnb->page = NULL;
67                 page_cache_release(page);
68                 return lnb->rc = rc;
69         }
70
71         return 0;
72 }
73
74 static int filter_finish_page_read(struct niobuf_local *lnb)
75 {
76         if (lnb->page == NULL)
77                 return 0;
78
79         if (PageUptodate(lnb->page))
80                 return 0;
81
82         wait_on_page(lnb->page);
83         if (!PageUptodate(lnb->page)) {
84                 CERROR("page index %lu/offset "LPX64" not uptodate\n",
85                        lnb->page->index, lnb->offset);
86                 GOTO(err_page, lnb->rc = -EIO);
87         }
88         if (PageError(lnb->page)) {
89                 CERROR("page index %lu/offset "LPX64" has error\n",
90                        lnb->page->index, lnb->offset);
91                 GOTO(err_page, lnb->rc = -EIO);
92         }
93
94         return 0;
95
96 err_page:
97         page_cache_release(lnb->page);
98         lnb->page = NULL;
99         return lnb->rc;
100 }
101
102 static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
103                               int objcount, struct obd_ioobj *obj,
104                               int niocount, struct niobuf_remote *nb,
105                               struct niobuf_local *res,
106                               struct obd_trans_info *oti)
107 {
108         struct obd_run_ctxt saved;
109         struct obd_ioobj *o;
110         struct niobuf_remote *rnb;
111         struct niobuf_local *lnb = NULL;
112         struct fsfilt_objinfo *fso;
113         struct dentry *dentry;
114         struct inode *inode;
115         int rc = 0, i, j, tot_bytes = 0, cleanup_phase = 0;
116         unsigned long now = jiffies;
117         ENTRY;
118
119         /* We are currently not supporting multi-obj BRW_READ RPCS at all.
120          * When we do this function's dentry cleanup will need to be fixed */
121         LASSERT(objcount == 1);
122
123         OBD_ALLOC(fso, objcount * sizeof(*fso));
124         if (fso == NULL)
125                 RETURN(-ENOMEM);
126
127         memset(res, 0, niocount * sizeof(*res));
128
129         push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
130         for (i = 0, o = obj; i < objcount; i++, o++) {
131                 LASSERT(o->ioo_bufcnt);
132
133                 dentry = filter_oa2dentry(exp->exp_obd, oa);
134                 if (IS_ERR(dentry))
135                         GOTO(cleanup, rc = PTR_ERR(dentry));
136
137                 if (dentry->d_inode == NULL) {
138                         CERROR("trying to BRW to non-existent file "LPU64"\n",
139                                o->ioo_id);
140                         f_dput(dentry);
141                         GOTO(cleanup, rc = -ENOENT);
142                 }
143
144                 fso[i].fso_dentry = dentry;
145                 fso[i].fso_bufcnt = o->ioo_bufcnt;
146         }
147
148         if (time_after(jiffies, now + 15 * HZ))
149                 CERROR("slow preprw_read setup %lus\n", (jiffies - now) / HZ);
150         else
151                 CDEBUG(D_INFO, "preprw_read setup: %lu jiffies\n",
152                        (jiffies - now));
153
154         for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
155                 dentry = fso[i].fso_dentry;
156                 inode = dentry->d_inode;
157
158                 for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
159                         lnb->dentry = dentry;
160                         lnb->offset = rnb->offset;
161                         lnb->len    = rnb->len;
162                         lnb->flags  = rnb->flags;
163                         lnb->start  = jiffies;
164
165                         if (inode->i_size <= rnb->offset) {
166                                 /* If there's no more data, abort early.
167                                  * lnb->page == NULL and lnb->rc == 0, so it's
168                                  * easy to detect later. */
169                                 break;
170                         } else {
171                                 rc = filter_start_page_read(inode, lnb);
172                         }
173
174                         if (rc) {
175                                 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
176                                        "page err %u@"LPU64" %u/%u %p: rc %d\n",
177                                        lnb->len, lnb->offset, j, o->ioo_bufcnt,
178                                        dentry, rc);
179                                 cleanup_phase = 1;
180                                 GOTO(cleanup, rc);
181                         }
182
183                         tot_bytes += lnb->rc;
184                         if (lnb->rc < lnb->len) {
185                                 /* short read, be sure to wait on it */
186                                 lnb++;
187                                 break;
188                         }
189                 }
190         }
191
192         if (time_after(jiffies, now + 15 * HZ))
193                 CERROR("slow start_page_read %lus\n", (jiffies - now) / HZ);
194         else
195                 CDEBUG(D_INFO, "start_page_read: %lu jiffies\n",
196                        (jiffies - now));
197
198         lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_READ_BYTES,
199                             tot_bytes);
200         while (lnb-- > res) {
201                 rc = filter_finish_page_read(lnb);
202                 if (rc) {
203                         CERROR("error page %u@"LPU64" %u %p: rc %d\n", lnb->len,
204                                lnb->offset, (int)(lnb - res), lnb->dentry, rc);
205                         cleanup_phase = 1;
206                         GOTO(cleanup, rc);
207                 }
208         }
209
210         if (time_after(jiffies, now + 15 * HZ))
211                 CERROR("slow finish_page_read %lus\n", (jiffies - now) / HZ);
212         else
213                 CDEBUG(D_INFO, "finish_page_read: %lu jiffies\n",
214                        (jiffies - now));
215
216         filter_tally_read(&exp->exp_obd->u.filter, res, niocount);
217
218         EXIT;
219
220  cleanup:
221         switch (cleanup_phase) {
222         case 1:
223                 for (lnb = res; lnb < (res + niocount); lnb++) {
224                         if (lnb->page)
225                                 page_cache_release(lnb->page);
226                 }
227                 if (res->dentry != NULL)
228                         f_dput(res->dentry);
229                 else
230                         CERROR("NULL dentry in cleanup -- tell CFS\n");
231         case 0:
232                 OBD_FREE(fso, objcount * sizeof(*fso));
233                 pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
234         }
235         return rc;
236 }
237
238 static int filter_start_page_write(struct inode *inode,
239                                    struct niobuf_local *lnb)
240 {
241         struct page *page = alloc_pages(GFP_HIGHUSER, 0);
242         if (page == NULL) {
243                 CERROR("no memory for a temp page\n");
244                 RETURN(lnb->rc = -ENOMEM);
245         }
246         POISON_PAGE(page, 0xf1);
247         if (lnb->len != PAGE_SIZE) {
248                 memset(kmap(page) + lnb->len, 0, PAGE_SIZE - lnb->len);
249                 kunmap(page);
250         }
251         page->index = lnb->offset >> PAGE_SHIFT;
252         lnb->page = page;
253
254         return 0;
255 }
256
257 /* If we ever start to support multi-object BRW RPCs, we will need to get locks
258  * on mulitple inodes.  That isn't all, because there still exists the
259  * possibility of a truncate starting a new transaction while holding the ext3
260  * rwsem = write while some writes (which have started their transactions here)
261  * blocking on the ext3 rwsem = read => lock inversion.
262  *
263  * The handling gets very ugly when dealing with locked pages.  It may be easier
264  * to just get rid of the locked page code (which has problems of its own) and
265  * either discover we do not need it anymore (i.e. it was a symptom of another
266  * bug) or ensure we get the page locks in an appropriate order. */
267 static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
268                                int objcount, struct obd_ioobj *obj,
269                                int niocount, struct niobuf_remote *nb,
270                                struct niobuf_local *res,
271                                struct obd_trans_info *oti)
272 {
273         struct obd_run_ctxt saved;
274         struct niobuf_remote *rnb;
275         struct niobuf_local *lnb = NULL;
276         struct fsfilt_objinfo fso;
277         struct dentry *dentry;
278         int rc = 0, i, tot_bytes = 0;
279         unsigned long now = jiffies;
280         ENTRY;
281         LASSERT(objcount == 1);
282         LASSERT(obj->ioo_bufcnt > 0);
283
284         memset(res, 0, niocount * sizeof(*res));
285
286         push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
287         dentry = filter_fid2dentry(exp->exp_obd, NULL, obj->ioo_gr,
288                                    obj->ioo_id);
289         if (IS_ERR(dentry))
290                 GOTO(cleanup, rc = PTR_ERR(dentry));
291
292         if (dentry->d_inode == NULL) {
293                 CERROR("trying to BRW to non-existent file "LPU64"\n",
294                        obj->ioo_id);
295                 f_dput(dentry);
296                 GOTO(cleanup, rc = -ENOENT);
297         }
298
299         fso.fso_dentry = dentry;
300         fso.fso_bufcnt = obj->ioo_bufcnt;
301
302         if (time_after(jiffies, now + 15 * HZ))
303                 CERROR("slow preprw_write setup %lus\n", (jiffies - now) / HZ);
304         else
305                 CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n",
306                        (jiffies - now));
307
308         for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt;
309              i++, lnb++, rnb++) {
310                 lnb->dentry = dentry;
311                 lnb->offset = rnb->offset;
312                 lnb->len    = rnb->len;
313                 lnb->flags  = rnb->flags;
314                 lnb->start  = jiffies;
315
316                 rc = filter_start_page_write(dentry->d_inode, lnb);
317                 if (rc) {
318                         CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, "page err %u@"
319                                LPU64" %u/%u %p: rc %d\n", lnb->len, lnb->offset,
320                                i, obj->ioo_bufcnt, dentry, rc);
321                         while (lnb-- > res)
322                                 __free_pages(lnb->page, 0);
323                         f_dput(dentry);
324                         GOTO(cleanup, rc);
325                 }
326                 tot_bytes += lnb->len;
327         }
328
329         if (time_after(jiffies, now + 15 * HZ))
330                 CERROR("slow start_page_write %lus\n", (jiffies - now) / HZ);
331         else
332                 CDEBUG(D_INFO, "start_page_write: %lu jiffies\n",
333                        (jiffies - now));
334
335         lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
336                             tot_bytes);
337         EXIT;
338 cleanup:
339         pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
340         return rc;
341 }
342
343 int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
344                   int objcount, struct obd_ioobj *obj, int niocount,
345                   struct niobuf_remote *nb, struct niobuf_local *res,
346                   struct obd_trans_info *oti)
347 {
348         if (cmd == OBD_BRW_WRITE)
349                 return filter_preprw_write(cmd, exp, oa, objcount, obj,
350                                            niocount, nb, res, oti);
351
352         if (cmd == OBD_BRW_READ)
353                 return filter_preprw_read(cmd, exp, oa, objcount, obj,
354                                           niocount, nb, res, oti);
355
356         LBUG();
357         return -EPROTO;
358 }
359
360 static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa,
361                                 int objcount, struct obd_ioobj *obj,
362                                 int niocount, struct niobuf_local *res,
363                                 struct obd_trans_info *oti)
364 {
365         struct obd_ioobj *o;
366         struct niobuf_local *lnb;
367         int i, j, drop = 0;
368         ENTRY;
369
370         if (res->dentry != NULL)
371                 drop = (res->dentry->d_inode->i_size >
372                         exp->exp_obd->u.filter.fo_readcache_max_filesize);
373
374         for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
375                 for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
376                         if (lnb->page == NULL)
377                                 continue;
378                         /* drop from cache like truncate_list_pages() */
379                         if (drop && !TryLockPage(lnb->page)) {
380                                 if (lnb->page->mapping)
381                                         truncate_complete_page(lnb->page);
382                                 unlock_page(lnb->page);
383                         }
384                         page_cache_release(lnb->page);
385                 }
386         }
387         if (res->dentry != NULL)
388                 f_dput(res->dentry);
389         RETURN(0);
390 }
391
392 void flip_into_page_cache(struct inode *inode, struct page *new_page)
393 {
394         struct page *old_page;
395         int rc;
396
397         do {
398                 /* the dlm is protecting us from read/write concurrency, so we
399                  * expect this find_lock_page to return quickly.  even if we
400                  * race with another writer it won't be doing much work with
401                  * the page locked.  we do this 'cause t_c_p expects a
402                  * locked page, and it wants to grab the pagecache lock
403                  * as well. */
404                 old_page = find_lock_page(inode->i_mapping, new_page->index);
405                 if (old_page) {
406 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
407                         truncate_complete_page(old_page);
408 #else
409                         truncate_complete_page(old_page->mapping, old_page);
410 #endif
411                         unlock_page(old_page);
412                         page_cache_release(old_page);
413                 }
414
415 #if 0 /* this should be a /proc tunable someday */
416                 /* racing o_directs (no locking ioctl) could race adding
417                  * their pages, so we repeat the page invalidation unless
418                  * we successfully added our new page */
419                 rc = add_to_page_cache_unique(new_page, inode->i_mapping,
420                                               new_page->index,
421                                               page_hash(inode->i_mapping,
422                                                         new_page->index));
423                 if (rc == 0) {
424                         /* add_to_page_cache clears uptodate|dirty and locks
425                          * the page */
426                         SetPageUptodate(new_page);
427                         unlock_page(new_page);
428                 }
429 #else
430                 rc = 0;
431 #endif
432         } while (rc != 0);
433 }
434
435 /* XXX needs to trickle its oa down */
436 int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
437                     int objcount, struct obd_ioobj *obj, int niocount,
438                     struct niobuf_local *res, struct obd_trans_info *oti)
439 {
440         if (cmd == OBD_BRW_WRITE)
441                 return filter_commitrw_write(exp, oa, objcount, obj, niocount,
442                                              res, oti);
443         if (cmd == OBD_BRW_READ)
444                 return filter_commitrw_read(exp, oa, objcount, obj, niocount,
445                                             res, oti);
446         LBUG();
447         return -EPROTO;
448 }
449
450 int filter_brw(int cmd, struct obd_export *exp, struct obdo *oa,
451                struct lov_stripe_md *lsm, obd_count oa_bufs,
452                struct brw_page *pga, struct obd_trans_info *oti)
453 {
454         struct obd_ioobj ioo;
455         struct niobuf_local *lnb;
456         struct niobuf_remote *rnb;
457         obd_count i;
458         int ret = 0;
459         ENTRY;
460
461         OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
462         OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
463
464         if (lnb == NULL || rnb == NULL)
465                 GOTO(out, ret = -ENOMEM);
466
467         for (i = 0; i < oa_bufs; i++) {
468                 rnb[i].offset = pga[i].off;
469                 rnb[i].len = pga[i].count;
470         }
471
472         obdo_to_ioobj(oa, &ioo);
473         ioo.ioo_bufcnt = oa_bufs;
474
475         ret = filter_preprw(cmd, exp, oa, 1, &ioo, oa_bufs, rnb, lnb, oti);
476         if (ret != 0)
477                 GOTO(out, ret);
478
479         for (i = 0; i < oa_bufs; i++) {
480                 void *virt = kmap(pga[i].pg);
481                 obd_off off = pga[i].off & ~PAGE_MASK;
482                 void *addr = kmap(lnb[i].page);
483
484                 /* 2 kmaps == vanishingly small deadlock opportunity */
485
486                 if (cmd & OBD_BRW_WRITE)
487                         memcpy(addr + off, virt + off, pga[i].count);
488                 else
489                         memcpy(virt + off, addr + off, pga[i].count);
490
491                 kunmap(lnb[i].page);
492                 kunmap(pga[i].pg);
493         }
494
495         ret = filter_commitrw(cmd, exp, oa, 1, &ioo, oa_bufs, lnb, oti);
496
497 out:
498         if (lnb)
499                 OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
500         if (rnb)
501                 OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
502         RETURN(ret);
503 }