Whamcloud - gitweb
1ce8825621f24a28775cabcd2cf08919e4b77ea3
[fs/lustre-release.git] / lustre / obdfilter / filter_io.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdfilter/filter_io.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *   Author: Phil Schwan <phil@clusterfs.com>
10  *
11  *   This file is part of Lustre, http://www.lustre.org.
12  *
13  *   Lustre is free software; you can redistribute it and/or
14  *   modify it under the terms of version 2 of the GNU General Public
15  *   License as published by the Free Software Foundation.
16  *
17  *   Lustre is distributed in the hope that it will be useful,
18  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
19  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20  *   GNU General Public License for more details.
21  *
22  *   You should have received a copy of the GNU General Public License
23  *   along with Lustre; if not, write to the Free Software
24  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25  */
26
27 #define DEBUG_SUBSYSTEM S_FILTER
28
29 #include <linux/config.h>
30 #include <linux/module.h>
31 #include <linux/pagemap.h> // XXX kill me soon
32 #include <linux/version.h>
33
34 #include <linux/obd_class.h>
35 #include <linux/lustre_fsfilt.h>
36 #include "filter_internal.h"
37
38 static int filter_start_page_read(struct inode *inode, struct niobuf_local *lnb)
39 {
40         struct address_space *mapping = inode->i_mapping;
41         struct page *page;
42         unsigned long index = lnb->offset >> PAGE_SHIFT;
43         int rc;
44
45         page = grab_cache_page(mapping, index); /* locked page */
46         if (page == NULL)
47                 return lnb->rc = -ENOMEM;
48
49         LASSERT(page->mapping == mapping);
50
51         lnb->page = page;
52
53         if (inode->i_size < lnb->offset + lnb->len - 1)
54                 lnb->rc = inode->i_size - lnb->offset;
55         else
56                 lnb->rc = lnb->len;
57
58         if (PageUptodate(page)) {
59                 unlock_page(page);
60                 return 0;
61         }
62
63         rc = mapping->a_ops->readpage(NULL, page);
64         if (rc < 0) {
65                 CERROR("page index %lu, rc = %d\n", index, rc);
66                 lnb->page = NULL;
67                 page_cache_release(page);
68                 return lnb->rc = rc;
69         }
70
71         return 0;
72 }
73
74 static int filter_finish_page_read(struct niobuf_local *lnb)
75 {
76         if (lnb->page == NULL)
77                 return 0;
78
79         if (PageUptodate(lnb->page))
80                 return 0;
81
82         wait_on_page(lnb->page);
83         if (!PageUptodate(lnb->page)) {
84                 CERROR("page index %lu/offset "LPX64" not uptodate\n",
85                        lnb->page->index, lnb->offset);
86                 GOTO(err_page, lnb->rc = -EIO);
87         }
88         if (PageError(lnb->page)) {
89                 CERROR("page index %lu/offset "LPX64" has error\n",
90                        lnb->page->index, lnb->offset);
91                 GOTO(err_page, lnb->rc = -EIO);
92         }
93
94         return 0;
95
96 err_page:
97         page_cache_release(lnb->page);
98         lnb->page = NULL;
99         return lnb->rc;
100 }
101
102 static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
103                               int objcount, struct obd_ioobj *obj,
104                               int niocount, struct niobuf_remote *nb,
105                               struct niobuf_local *res,
106                               struct obd_trans_info *oti)
107 {
108         struct obd_run_ctxt saved;
109         struct obd_ioobj *o;
110         struct niobuf_remote *rnb;
111         struct niobuf_local *lnb = NULL;
112         struct fsfilt_objinfo *fso;
113         struct dentry *dentry;
114         struct inode *inode;
115         int rc = 0, i, j, tot_bytes = 0, cleanup_phase = 0;
116         unsigned long now = jiffies;
117         ENTRY;
118
119         /* We are currently not supporting multi-obj BRW_READ RPCS at all.
120          * When we do this function's dentry cleanup will need to be fixed */
121         LASSERT(objcount == 1);
122
123         OBD_ALLOC(fso, objcount * sizeof(*fso));
124         if (fso == NULL)
125                 RETURN(-ENOMEM);
126
127         memset(res, 0, niocount * sizeof(*res));
128
129         push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
130         for (i = 0, o = obj; i < objcount; i++, o++) {
131                 LASSERT(o->ioo_bufcnt);
132
133                 dentry = filter_oa2dentry(exp->exp_obd, oa);
134                 if (IS_ERR(dentry))
135                         GOTO(cleanup, rc = PTR_ERR(dentry));
136
137                 if (dentry->d_inode == NULL) {
138                         CERROR("trying to BRW to non-existent file "LPU64"\n",
139                                o->ioo_id);
140                         f_dput(dentry);
141                         GOTO(cleanup, rc = -ENOENT);
142                 }
143
144                 fso[i].fso_dentry = dentry;
145                 fso[i].fso_bufcnt = o->ioo_bufcnt;
146         }
147
148         if (time_after(jiffies, now + 15 * HZ))
149                 CERROR("slow preprw_read setup %lus\n", (jiffies - now) / HZ);
150         else
151                 CDEBUG(D_INFO, "preprw_read setup: %lu jiffies\n",
152                        (jiffies - now));
153
154         for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
155                 dentry = fso[i].fso_dentry;
156                 inode = dentry->d_inode;
157
158                 for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
159                         lnb->dentry = dentry;
160                         lnb->offset = rnb->offset;
161                         lnb->len    = rnb->len;
162                         lnb->flags  = rnb->flags;
163                         lnb->start  = jiffies;
164
165                         if (inode->i_size <= rnb->offset) {
166                                 /* If there's no more data, abort early.
167                                  * lnb->page == NULL and lnb->rc == 0, so it's
168                                  * easy to detect later. */
169                                 break;
170                         } else {
171                                 rc = filter_start_page_read(inode, lnb);
172                         }
173
174                         if (rc) {
175                                 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
176                                        "page err %u@"LPU64" %u/%u %p: rc %d\n",
177                                        lnb->len, lnb->offset, j, o->ioo_bufcnt,
178                                        dentry, rc);
179                                 cleanup_phase = 1;
180                                 GOTO(cleanup, rc);
181                         }
182
183                         tot_bytes += lnb->rc;
184                         if (lnb->rc < lnb->len) {
185                                 /* short read, be sure to wait on it */
186                                 lnb++;
187                                 break;
188                         }
189                 }
190         }
191
192         if (time_after(jiffies, now + 15 * HZ))
193                 CERROR("slow start_page_read %lus\n", (jiffies - now) / HZ);
194         else
195                 CDEBUG(D_INFO, "start_page_read: %lu jiffies\n",
196                        (jiffies - now));
197
198         lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_READ_BYTES,
199                             tot_bytes);
200         while (lnb-- > res) {
201                 rc = filter_finish_page_read(lnb);
202                 if (rc) {
203                         CERROR("error page %u@"LPU64" %u %p: rc %d\n", lnb->len,
204                                lnb->offset, (int)(lnb - res), lnb->dentry, rc);
205                         cleanup_phase = 1;
206                         GOTO(cleanup, rc);
207                 }
208         }
209
210         if (time_after(jiffies, now + 15 * HZ))
211                 CERROR("slow finish_page_read %lus\n", (jiffies - now) / HZ);
212         else
213                 CDEBUG(D_INFO, "finish_page_read: %lu jiffies\n",
214                        (jiffies - now));
215
216         EXIT;
217
218  cleanup:
219         switch (cleanup_phase) {
220         case 1:
221                 for (lnb = res; lnb < (res + niocount); lnb++) {
222                         if (lnb->page)
223                                 page_cache_release(lnb->page);
224                 }
225                 if (res->dentry != NULL)
226                         f_dput(res->dentry);
227                 else
228                         CERROR("NULL dentry in cleanup -- tell CFS\n");
229         case 0:
230                 OBD_FREE(fso, objcount * sizeof(*fso));
231                 pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
232         }
233         return rc;
234 }
235
236 static int filter_start_page_write(struct inode *inode,
237                                    struct niobuf_local *lnb)
238 {
239         struct page *page = alloc_pages(GFP_HIGHUSER, 0);
240         if (page == NULL) {
241                 CERROR("no memory for a temp page\n");
242                 RETURN(lnb->rc = -ENOMEM);
243         }
244         POISON_PAGE(page, 0xf1);
245         page->index = lnb->offset >> PAGE_SHIFT;
246         lnb->page = page;
247
248         return 0;
249 }
250
251 /* If we ever start to support multi-object BRW RPCs, we will need to get locks
252  * on mulitple inodes.  That isn't all, because there still exists the
253  * possibility of a truncate starting a new transaction while holding the ext3
254  * rwsem = write while some writes (which have started their transactions here)
255  * blocking on the ext3 rwsem = read => lock inversion.
256  *
257  * The handling gets very ugly when dealing with locked pages.  It may be easier
258  * to just get rid of the locked page code (which has problems of its own) and
259  * either discover we do not need it anymore (i.e. it was a symptom of another
260  * bug) or ensure we get the page locks in an appropriate order. */
261 static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
262                                int objcount, struct obd_ioobj *obj,
263                                int niocount, struct niobuf_remote *nb,
264                                struct niobuf_local *res,
265                                struct obd_trans_info *oti)
266 {
267         struct obd_run_ctxt saved;
268         struct niobuf_remote *rnb;
269         struct niobuf_local *lnb = NULL;
270         struct fsfilt_objinfo fso;
271         struct dentry *dentry;
272         int rc = 0, i, tot_bytes = 0;
273         unsigned long now = jiffies;
274         ENTRY;
275         LASSERT(objcount == 1);
276         LASSERT(obj->ioo_bufcnt > 0);
277
278         memset(res, 0, niocount * sizeof(*res));
279
280         push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
281         dentry = filter_fid2dentry(exp->exp_obd, NULL, obj->ioo_gr, 
282                                    obj->ioo_id);
283         if (IS_ERR(dentry))
284                 GOTO(cleanup, rc = PTR_ERR(dentry));
285
286         if (dentry->d_inode == NULL) {
287                 CERROR("trying to BRW to non-existent file "LPU64"\n",
288                        obj->ioo_id);
289                 f_dput(dentry);
290                 GOTO(cleanup, rc = -ENOENT);
291         }
292
293         fso.fso_dentry = dentry;
294         fso.fso_bufcnt = obj->ioo_bufcnt;
295
296         if (time_after(jiffies, now + 15 * HZ))
297                 CERROR("slow preprw_write setup %lus\n", (jiffies - now) / HZ);
298         else
299                 CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n",
300                        (jiffies - now));
301
302         for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt;
303              i++, lnb++, rnb++) {
304                 lnb->dentry = dentry;
305                 lnb->offset = rnb->offset;
306                 lnb->len    = rnb->len;
307                 lnb->flags  = rnb->flags;
308                 lnb->start  = jiffies;
309
310                 rc = filter_start_page_write(dentry->d_inode, lnb);
311                 if (rc) {
312                         CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, "page err %u@"
313                                LPU64" %u/%u %p: rc %d\n", lnb->len, lnb->offset,
314                                i, obj->ioo_bufcnt, dentry, rc);
315                         while (lnb-- > res)
316                                 __free_pages(lnb->page, 0);
317                         f_dput(dentry);
318                         GOTO(cleanup, rc);
319                 }
320                 tot_bytes += lnb->len;
321         }
322
323         if (time_after(jiffies, now + 15 * HZ))
324                 CERROR("slow start_page_write %lus\n", (jiffies - now) / HZ);
325         else
326                 CDEBUG(D_INFO, "start_page_write: %lu jiffies\n",
327                        (jiffies - now));
328
329         lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
330                             tot_bytes);
331         EXIT;
332 cleanup:
333         pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
334         return rc;
335 }
336
337 int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
338                   int objcount, struct obd_ioobj *obj, int niocount,
339                   struct niobuf_remote *nb, struct niobuf_local *res,
340                   struct obd_trans_info *oti)
341 {
342         if (cmd == OBD_BRW_WRITE)
343                 return filter_preprw_write(cmd, exp, oa, objcount, obj,
344                                            niocount, nb, res, oti);
345
346         if (cmd == OBD_BRW_READ)
347                 return filter_preprw_read(cmd, exp, oa, objcount, obj,
348                                           niocount, nb, res, oti);
349
350         LBUG();
351         return -EPROTO;
352 }
353
354 static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa,
355                                 int objcount, struct obd_ioobj *obj,
356                                 int niocount, struct niobuf_local *res,
357                                 struct obd_trans_info *oti)
358 {
359         struct obd_ioobj *o;
360         struct niobuf_local *lnb;
361         int i, j;
362         ENTRY;
363
364         for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
365                 for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
366                         if (lnb->page != NULL)
367                                 page_cache_release(lnb->page);
368                 }
369         }
370         if (res->dentry != NULL)
371                 f_dput(res->dentry);
372         RETURN(0);
373 }
374
375 void flip_into_page_cache(struct inode *inode, struct page *new_page)
376 {
377         struct page *old_page;
378         int rc;
379
380         do {
381                 /* the dlm is protecting us from read/write concurrency, so we
382                  * expect this find_lock_page to return quickly.  even if we
383                  * race with another writer it won't be doing much work with
384                  * the page locked.  we do this 'cause t_c_p expects a 
385                  * locked page, and it wants to grab the pagecache lock
386                  * as well. */
387                 old_page = find_lock_page(inode->i_mapping, new_page->index);
388                 if (old_page) {
389 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
390                         truncate_complete_page(old_page);
391 #else
392                         truncate_complete_page(old_page->mapping, old_page);
393 #endif
394                         unlock_page(old_page);
395                         page_cache_release(old_page);
396                 }
397
398 #if 0 /* this should be a /proc tunable someday */
399                 /* racing o_directs (no locking ioctl) could race adding
400                  * their pages, so we repeat the page invalidation unless
401                  * we successfully added our new page */
402                 rc = add_to_page_cache_unique(new_page, inode->i_mapping, 
403                                               new_page->index,
404                                               page_hash(inode->i_mapping, 
405                                                         new_page->index));
406                 if (rc == 0) {
407                         /* add_to_page_cache clears uptodate|dirty and locks
408                          * the page */
409                         SetPageUptodate(new_page);
410                         unlock_page(new_page);
411                 }
412 #else   
413                 rc = 0;
414 #endif
415         } while (rc != 0);
416 }
417
418 /* XXX needs to trickle its oa down */
419 int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
420                     int objcount, struct obd_ioobj *obj, int niocount,
421                     struct niobuf_local *res, struct obd_trans_info *oti)
422 {
423         if (cmd == OBD_BRW_WRITE)
424                 return filter_commitrw_write(exp, oa, objcount, obj, niocount,
425                                              res, oti);
426         if (cmd == OBD_BRW_READ)
427                 return filter_commitrw_read(exp, oa, objcount, obj, niocount,
428                                             res, oti);
429         LBUG();
430         return -EPROTO;
431 }
432
433 int filter_brw(int cmd, struct obd_export *exp, struct obdo *oa,
434                struct lov_stripe_md *lsm, obd_count oa_bufs,
435                struct brw_page *pga, struct obd_trans_info *oti)
436 {
437         struct obd_ioobj ioo;
438         struct niobuf_local *lnb;
439         struct niobuf_remote *rnb;
440         obd_count i;
441         int ret = 0;
442         ENTRY;
443
444         OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
445         OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
446
447         if (lnb == NULL || rnb == NULL)
448                 GOTO(out, ret = -ENOMEM);
449
450         for (i = 0; i < oa_bufs; i++) {
451                 rnb[i].offset = pga[i].off;
452                 rnb[i].len = pga[i].count;
453         }
454
455         obdo_to_ioobj(oa, &ioo);
456         ioo.ioo_bufcnt = oa_bufs;
457
458         ret = filter_preprw(cmd, exp, oa, 1, &ioo, oa_bufs, rnb, lnb, oti);
459         if (ret != 0)
460                 GOTO(out, ret);
461
462         for (i = 0; i < oa_bufs; i++) {
463                 void *virt = kmap(pga[i].pg);
464                 obd_off off = pga[i].off & ~PAGE_MASK;
465                 void *addr = kmap(lnb[i].page);
466
467                 /* 2 kmaps == vanishingly small deadlock opportunity */
468
469                 if (cmd & OBD_BRW_WRITE)
470                         memcpy(addr + off, virt + off, pga[i].count);
471                 else
472                         memcpy(virt + off, addr + off, pga[i].count);
473
474                 kunmap(lnb[i].page);
475                 kunmap(pga[i].pg);
476         }
477
478         ret = filter_commitrw(cmd, exp, oa, 1, &ioo, oa_bufs, lnb, oti);
479
480 out:
481         if (lnb)
482                 OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
483         if (rnb)
484                 OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
485         RETURN(ret);
486 }