/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Lustre Lite I/O Page Cache
 *
 *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#include <linux/config.h>
#include <linux/kernel.h>
#include <linux/mm.h>
#include <linux/string.h>
#include <linux/stat.h>
#include <linux/errno.h>
#include <linux/smp_lock.h>
#include <linux/unistd.h>
#include <linux/version.h>
#include <asm/system.h>
#include <asm/uaccess.h>

#include <linux/fs.h>
#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
#include <linux/buffer_head.h>
#include <linux/mpage.h>
#include <linux/writeback.h>
#else
#include <linux/iobuf.h>
#endif
#include <asm/segment.h>
#include <linux/pagemap.h>

#define DEBUG_SUBSYSTEM S_LLITE

#include <linux/lustre_mds.h>
#include <linux/lustre_lite.h>
#include "llite_internal.h"
#include <linux/lustre_compat25.h>

/*
 * Remove page from dirty list
 */
static void __set_page_clean(struct page *page)
{
        struct address_space *mapping = page->mapping;
        struct inode *inode;
        ENTRY;

        if (!mapping) {
                EXIT;
                return;
        }

        PGCACHE_WRLOCK(mapping);

        list_del(&page->list);
        list_add(&page->list, &mapping->clean_pages);

        /* XXX doesn't inode_lock protect i_state ? */
        inode = mapping->host;
        if (list_empty(&mapping->dirty_pages)) {
                CDEBUG(D_INODE, "inode clean\n");
                inode->i_state &= ~I_DIRTY_PAGES;
        }

        PGCACHE_WRUNLOCK(mapping);
        EXIT;
}

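/* clear the page's dirty bit and move it to the mapping's clean list,
 * keeping the 2.4 per-mapping page accounting consistent */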
void set_page_clean(struct page *page)
{
        if (PageDirty(page)) {
                ClearPageDirty(page);
                __set_page_clean(page);
        }
}

/* SYNCHRONOUS I/O to object storage for an inode */
static int ll_brw(int cmd, struct inode *inode, struct obdo *oa,
                  struct page *page, int flags)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct brw_page pg;
        int rc;
        ENTRY;

        pg.pg = page;
        pg.off = ((obd_off)page->index) << PAGE_SHIFT;

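        /* a write extending past EOF only transfers the valid bytes of the
         * last page.  Note that i_size % PAGE_SIZE is 0 when i_size is
         * page-aligned, which is what the ZERO COUNT check below catches */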
        if (cmd == OBD_BRW_WRITE && (pg.off + PAGE_SIZE > inode->i_size))
                pg.count = inode->i_size % PAGE_SIZE;
        else
                pg.count = PAGE_SIZE;

        CDEBUG(D_PAGE, "%s %d bytes ino %lu at "LPU64"/"LPX64"\n",
               cmd & OBD_BRW_WRITE ? "write" : "read", pg.count, inode->i_ino,
               pg.off, pg.off);
        if (pg.count == 0) {
                CERROR("ZERO COUNT: ino %lu: size %p:%Lu(%p:%Lu) idx %lu off "
                       LPU64"\n",
                       inode->i_ino, inode, inode->i_size, page->mapping->host,
                       page->mapping->host->i_size, page->index, pg.off);
        }

        pg.flag = flags;

        if (cmd == OBD_BRW_WRITE)
                lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
                                    LPROC_LL_BRW_WRITE, pg.count);
        else
                lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
                                    LPROC_LL_BRW_READ, pg.count);
        rc = obd_brw(cmd, ll_i2obdconn(inode), oa, lsm, 1, &pg, NULL);
        if (rc != 0 && rc != -EIO)
                CERROR("error from obd_brw: rc = %d\n", rc);

        RETURN(rc);
}

/*
 * we were asked to read a single page but we're going to try and read a batch
 * of pages all at once.  this vaguely simulates 2.5's readpages.
 */
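/*
 * for example (assuming 4K pages and a 64K i_blksize): a readpage at index 3
 * tries to cover pages [3, 19), clipped at the page containing EOF and at
 * the read extent that ll_file_read registered for this task.
 */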
static int ll_readpage(struct file *file, struct page *first_page)
{
        struct inode *inode = first_page->mapping->host;
        struct ll_inode_info *lli = ll_i2info(inode);
        struct page *page = first_page;
        struct list_head *pos;
        struct brw_page *pgs;
        struct obdo *oa;
        unsigned long end_index, extent_end = 0;
        struct ptlrpc_request_set *set;
        int npgs = 0, rc = 0, max_pages;
        ENTRY;

        LASSERT(PageLocked(page));
        LASSERT(!PageUptodate(page));
        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),offset="LPX64"\n",
               inode->i_ino, inode->i_generation, inode,
               (((obd_off)page->index) << PAGE_SHIFT));
        LASSERT(atomic_read(&file->f_dentry->d_inode->i_count) > 0);

        if (inode->i_size <= ((obd_off)page->index) << PAGE_SHIFT) {
                CERROR("reading beyond EOF\n");
                memset(kmap(page), 0, PAGE_SIZE);
                kunmap(page);
                SetPageUptodate(page);
                unlock_page(page);
                RETURN(rc);
        }

        /* try to read the file's preferred block size in a one-er */
        end_index = first_page->index +
                (inode->i_blksize >> PAGE_CACHE_SHIFT);
        if (end_index > (inode->i_size >> PAGE_CACHE_SHIFT))
                end_index = inode->i_size >> PAGE_CACHE_SHIFT;

        max_pages = ((end_index - first_page->index) << PAGE_CACHE_SHIFT) >>
                PAGE_SHIFT;
        OBD_ALLOC_GFP(pgs, max_pages * sizeof(*pgs), GFP_USER);
        if (pgs == NULL)
                RETURN(-ENOMEM);

        /*
         * find how far we're allowed to read under the extent that
         * ll_file_read is passing us..
         */
        spin_lock(&lli->lli_read_extent_lock);
        list_for_each(pos, &lli->lli_read_extents) {
                struct ll_read_extent *rextent;
                rextent = list_entry(pos, struct ll_read_extent, re_lli_item);
                if (rextent->re_extent.end + PAGE_SIZE < rextent->re_extent.end)
                        /* extent wrapping */
                        extent_end = ~0;
                else {
                        extent_end = (rextent->re_extent.end + PAGE_SIZE)
                                                        >> PAGE_CACHE_SHIFT;
                        /* 32bit indexes, 64bit extents.. */
                        if (((u64)extent_end << PAGE_CACHE_SHIFT) <
                                        rextent->re_extent.end)
                                extent_end = ~0;
                }
                if (rextent->re_task != current)
                        continue;
                break;
        }
        spin_unlock(&lli->lli_read_extent_lock);

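        /* extent_end == 0 means no read extent matched this task: we most
         * likely got here via an mmap fault, so read just the one page */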
        if (extent_end == 0) {
                static long next_print;
                if (time_after(jiffies, next_print)) {
                        next_print = jiffies + 30 * HZ;
                        CDEBUG(D_INODE, "mmap readpage - check locks\n");
                }
                end_index = page->index + 1;
        } else if (extent_end < end_index)
                end_index = extent_end;

        CDEBUG(D_INFO, "max_pages: %d, extent_end: %lu, end_index: %lu, "
               "i_size: %llu\n",
               max_pages, extent_end, end_index, inode->i_size);

        /* take a ref on the first page to balance the find_get_page ref
         * that the later pages get; all of these refs are dropped during
         * teardown below */
        page_cache_get(page);
        do {
                unsigned long index;

                pgs[npgs].pg = page;
                pgs[npgs].off = ((obd_off)page->index) << PAGE_CACHE_SHIFT;
                pgs[npgs].flag = 0;
                pgs[npgs].count = PAGE_SIZE;
                /* XXX Workaround for BA OSTs returning short reads at EOF.
                 * The linux OST will return the full page, zero-filled at the
                 * end, which will just overwrite the data we set here.  Bug
                 * 593 relates to fixing this properly.
                 */
                if (inode->i_size < pgs[npgs].off + PAGE_SIZE) {
                        int count = inode->i_size - pgs[npgs].off;
                        void *addr = kmap(page);
                        pgs[npgs].count = count;
                        //POISON(addr, 0x7c, count);
                        memset(addr + count, 0, PAGE_SIZE - count);
                        kunmap(page);
                }

                npgs++;
                if (npgs == max_pages)
                        break;

                /*
                 * find pages ahead of us that we can read in.
                 * grab_cache_page waits on pages that are locked so
                 * we first try find_get_page, which doesn't.  this stops
                 * the worst case behaviour of racing threads waiting on
                 * each other, but doesn't remove it entirely.
                 */
                for (index = page->index + 1, page = NULL;
                     page == NULL && index < end_index; index++) {

                        /* see if the page already exists and needs updating */
                        page = find_get_page(inode->i_mapping, index);
                        if (page) {
                                if (Page_Uptodate(page) || TryLockPage(page))
                                        goto out_release;
                                if (!page->mapping || Page_Uptodate(page))
                                        goto out_unlock;
                        } else {
                                /* ok, we have to create it.. */
                                page = grab_cache_page(inode->i_mapping, index);
                                if (page == NULL)
                                        continue;
                                if (Page_Uptodate(page))
                                        goto out_unlock;
                        }

                        break;

                out_unlock:
                        unlock_page(page);
                out_release:
                        page_cache_release(page);
                        page = NULL;
                }

        } while (page);

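        /* at this point pgs[0..npgs) hold locked, referenced pages covering
         * a contiguous range starting at first_page->index */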
        if ((oa = obdo_alloc()) == NULL) {
                CERROR("ENOMEM allocating obdo\n");
                rc = -ENOMEM;
        } else if ((set = ptlrpc_prep_set()) == NULL) {
                CERROR("ENOMEM allocating request set\n");
                obdo_free(oa);
                rc = -ENOMEM;
        } else {
                struct ll_file_data *fd = file->private_data;

                oa->o_id = lli->lli_smd->lsm_object_id;
                memcpy(obdo_handle(oa), &fd->fd_ost_och.och_fh,
                       sizeof(fd->fd_ost_och.och_fh));
                oa->o_valid = OBD_MD_FLID | OBD_MD_FLHANDLE;
                obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME);

                rc = obd_brw_async(OBD_BRW_READ, ll_i2obdconn(inode), oa,
                                   ll_i2info(inode)->lli_smd, npgs, pgs,
                                   set, NULL);
                if (rc == 0)
                        rc = ptlrpc_set_wait(set);
                ptlrpc_set_destroy(set);
                if (rc == 0) {
                        /* bug 1598: don't clobber blksize */
                        oa->o_valid &= ~(OBD_MD_FLSIZE | OBD_MD_FLBLKSZ);
                        obdo_refresh_inode(inode, oa, oa->o_valid);
                }
                if (rc && rc != -EIO)
                        CERROR("error from obd_brw_async: rc = %d\n", rc);
                obdo_free(oa);
        }

        while (npgs-- > 0) {
                page = pgs[npgs].pg;

                if (rc == 0)
                        SetPageUptodate(page);
                unlock_page(page);
                page_cache_release(page);
        }

        OBD_FREE(pgs, max_pages * sizeof(*pgs));
        RETURN(rc);
} /* ll_readpage */


/* this isn't where truncate starts.  roughly:
 * sys_truncate->ll_setattr_raw->vmtruncate->ll_truncate
 * we grab the lock back in setattr_raw to avoid races. */
void ll_truncate(struct inode *inode)
{
        struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
        struct obdo oa;
        int err;
        ENTRY;
        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
               inode->i_generation, inode);

        /* object not yet allocated */
        if (!lsm) {
                CERROR("truncate on inode %lu with no objects\n", inode->i_ino);
                EXIT;
                return;
        }

        /* vmtruncate will just throw away our dirty pages, make sure
         * we don't think they're still dirty, being careful to round
         * i_size to the first whole page that was tossed */
        err = ll_clear_dirty_pages(ll_i2obdconn(inode), lsm,
                        (inode->i_size + PAGE_CACHE_SIZE-1) >> PAGE_CACHE_SHIFT,
                        ~0);

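        /* e.g. with 4K pages, a truncate to 5000 bytes clears dirty state
         * from page index 2 onward; page 1 still holds live bytes 4096-4999 */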
        oa.o_id = lsm->lsm_object_id;
        oa.o_valid = OBD_MD_FLID;
        obdo_from_inode(&oa, inode, OBD_MD_FLTYPE|OBD_MD_FLMODE|OBD_MD_FLATIME|
                                    OBD_MD_FLMTIME | OBD_MD_FLCTIME);

        CDEBUG(D_INFO, "calling punch for "LPX64" (all bytes after %Lu)\n",
               oa.o_id, inode->i_size);

        /* truncate == punch from new size to absolute end of file */
        err = obd_punch(ll_i2obdconn(inode), &oa, lsm, inode->i_size,
                        OBD_OBJECT_EOF, NULL);
        if (err)
                CERROR("obd_punch fails (%d) ino %lu\n", err, inode->i_ino);
        else
                obdo_to_inode(inode, &oa, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
                                          OBD_MD_FLATIME | OBD_MD_FLMTIME |
                                          OBD_MD_FLCTIME);

        EXIT;
        return;
} /* ll_truncate */

//#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))

static int ll_prepare_write(struct file *file, struct page *page, unsigned from,
                            unsigned to)
{
        struct inode *inode = page->mapping->host;
        struct ll_inode_info *lli = ll_i2info(inode);
        struct ll_file_data *fd = file->private_data;
        struct lov_stripe_md *lsm = lli->lli_smd;
        obd_off offset = ((obd_off)page->index) << PAGE_SHIFT;
        struct brw_page pg;
        struct obdo oa;
        int rc = 0;
        ENTRY;

        if (!PageLocked(page))
                LBUG();

        if (PageUptodate(page))
                RETURN(0);

        //POISON(addr + from, 0xca, to - from);

        /* Check to see if we should return -EIO right away */
        pg.pg = page;
        pg.off = offset;
        pg.count = PAGE_SIZE;
        pg.flag = 0;
        rc = obd_brw(OBD_BRW_CHECK, ll_i2obdconn(inode), NULL, lsm, 1, &pg,
                     NULL);
        if (rc)
                RETURN(rc);

        /* We're completely overwriting an existing page, so _don't_ mark
         * it up to date until commit_write */
        if (from == 0 && to == PAGE_SIZE)
                RETURN(0);

        /* If we are writing to a new page, no need to read old data.
         * The extent locking and getattr procedures in ll_file_write have
         * guaranteed that i_size is stable enough for our zeroing needs */
        if (inode->i_size <= offset) {
                memset(kmap(page), 0, PAGE_SIZE);
                kunmap(page);
                GOTO(prepare_done, rc = 0);
        }

        oa.o_id = lsm->lsm_object_id;
        oa.o_mode = inode->i_mode;
        memcpy(obdo_handle(&oa), &fd->fd_ost_och.och_fh,
               sizeof(fd->fd_ost_och.och_fh));
        oa.o_valid = OBD_MD_FLID | OBD_MD_FLMODE | OBD_MD_FLTYPE |
                     OBD_MD_FLHANDLE;

        rc = ll_brw(OBD_BRW_READ, inode, &oa, page, 0);
        if (rc == 0) {
                /* bug 1598: don't clobber blksize */
                oa.o_valid &= ~(OBD_MD_FLSIZE | OBD_MD_FLBLKSZ);
                obdo_refresh_inode(inode, &oa, oa.o_valid);
        }

        EXIT;
 prepare_done:
        if (rc == 0)
                SetPageUptodate(page);

        return rc;
}

/*
 * background file writeback.  This is called regularly from kupdated to write
 * dirty data, from kswapd when memory is low, and from filemap_fdatasync when
 * super blocks or inodes are synced..
 *
 * obd_brw errors down in _batch_writepage are ignored, so pages are always
 * unlocked.  Also, there is nobody to return an error code to from here - the
 * application may not even be running anymore.
 *
 * this should be async so that things like kswapd can have a chance to
 * free some more pages that our allocating writeback may need, but it isn't
 * yet.
 */
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
static unsigned long ll_local_cache_dirty_pages;
static unsigned long ll_max_dirty_pages = 20 * 1024 * 1024 / PAGE_SIZE;

static spinlock_t ll_local_cache_page_count_lock = SPIN_LOCK_UNLOCKED;

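/* /proc read/write handlers for the dirty-page accounting above; non-static
 * so the llite lprocfs setup code can register them */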
int ll_rd_dirty_pages(char *page, char **start, off_t off, int count, int *eof,
                      void *data)
{
        unsigned long dirty_count;
        spin_lock(&ll_local_cache_page_count_lock);
        dirty_count = ll_local_cache_dirty_pages;
        spin_unlock(&ll_local_cache_page_count_lock);
        return snprintf(page, count, "%lu\n", dirty_count);
}

int ll_rd_max_dirty_pages(char *page, char **start, off_t off, int count,
                          int *eof, void *data)
{
        unsigned long max_dirty;
        spin_lock(&ll_local_cache_page_count_lock);
        max_dirty = ll_max_dirty_pages;
        spin_unlock(&ll_local_cache_page_count_lock);
        return snprintf(page, count, "%lu\n", max_dirty);
}

int ll_wr_max_dirty_pages(struct file *file, const char *buffer,
                          unsigned long count, void *data)
{
        unsigned long max_dirty;
        signed long max_dirty_signed;
        char kernbuf[20], *end;

        if (count > (sizeof(kernbuf) - 1))
                return -EINVAL;

        if (copy_from_user(kernbuf, buffer, count))
                return -EFAULT;

        kernbuf[count] = '\0';

        max_dirty_signed = simple_strtol(kernbuf, &end, 0);
        if (kernbuf == end)
                return -EINVAL;
        max_dirty = (unsigned long)max_dirty_signed;

#if 0
        if (max_dirty < ll_local_cache_dirty_pages)
                flush_to_new_max_dirty();
#endif

        spin_lock(&ll_local_cache_page_count_lock);
        CDEBUG(D_CACHE, "changing max_dirty from %lu to %lu\n",
               ll_max_dirty_pages, max_dirty);
        ll_max_dirty_pages = max_dirty;
        spin_unlock(&ll_local_cache_page_count_lock);
        return count;
}

static int ll_local_cache_full(void)
{
        int full = 0;
        spin_lock(&ll_local_cache_page_count_lock);
        if (ll_max_dirty_pages &&
            ll_local_cache_dirty_pages >= ll_max_dirty_pages) {
                full = 1;
        }
        spin_unlock(&ll_local_cache_page_count_lock);
        /* XXX instrument? */
        /* XXX trigger async writeback when full, or 75% of full? */
        return full;
}

static void ll_local_cache_flushed_pages(unsigned long pgcount)
{
        unsigned long dirty_count;
        spin_lock(&ll_local_cache_page_count_lock);
        dirty_count = ll_local_cache_dirty_pages;
        ll_local_cache_dirty_pages -= pgcount;
        CDEBUG(D_CACHE, "dirty pages: %lu->%lu\n",
               dirty_count, ll_local_cache_dirty_pages);
        spin_unlock(&ll_local_cache_page_count_lock);
        LASSERT(dirty_count >= pgcount);
}

static void ll_local_cache_dirtied_pages(unsigned long pgcount)
{
        unsigned long dirty_count;
        spin_lock(&ll_local_cache_page_count_lock);
        dirty_count = ll_local_cache_dirty_pages;
        ll_local_cache_dirty_pages += pgcount;
        CDEBUG(D_CACHE, "dirty pages: %lu->%lu\n",
               dirty_count, ll_local_cache_dirty_pages);
        spin_unlock(&ll_local_cache_page_count_lock);
        /* XXX track maximum cached, report to lprocfs */
}

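/* wrappers that keep the local dirty-page count in step with the obd
 * layer's per-object dirty bookkeeping */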
int ll_clear_dirty_pages(struct lustre_handle *conn, struct lov_stripe_md *lsm,
                         unsigned long start, unsigned long end)
{
        unsigned long cleared;
        int rc;

        ENTRY;
        rc = obd_clear_dirty_pages(conn, lsm, start, end, &cleared);
        if (!rc)
                ll_local_cache_flushed_pages(cleared);
        RETURN(rc);
}

int ll_mark_dirty_page(struct lustre_handle *conn, struct lov_stripe_md *lsm,
                       unsigned long index)
{
        int rc;

        ENTRY;
        if (ll_local_cache_full())
                RETURN(-EDQUOT);

        rc = obd_mark_page_dirty(conn, lsm, index);
        if (!rc)
                ll_local_cache_dirtied_pages(1);
        RETURN(rc);
}

static int ll_writepage(struct page *page)
{
        struct inode *inode = page->mapping->host;
        struct obdo oa;
        ENTRY;

        CDEBUG(D_CACHE, "page %p [lau %d] inode %p\n", page,
               PageLaunder(page), inode);
        LASSERT(PageLocked(page));

        oa.o_id = ll_i2info(inode)->lli_smd->lsm_object_id;
        oa.o_valid = OBD_MD_FLID;
        obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
                                    OBD_MD_FLMTIME | OBD_MD_FLCTIME);

        RETURN(ll_batch_writepage(inode, &oa, page));
}

/*
 * we really don't want to start writeback here, we want to give callers some
 * time to further dirty the pages before we write them out.
 */
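/*
 * sequence: account the page as dirty (which can fail with -EDQUOT when the
 * local cache or the OST's grant is exhausted), extend i_size, then on
 * -EDQUOT write this one page out synchronously before returning.
 */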
static int ll_commit_write(struct file *file, struct page *page,
                           unsigned from, unsigned to)
{
        struct inode *inode = page->mapping->host;
        loff_t size;
        int rc = 0;
        ENTRY;

        SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
        LASSERT(inode == file->f_dentry->d_inode);
        LASSERT(PageLocked(page));

        CDEBUG(D_INODE, "inode %p is writing page %p from %d to %d at %lu\n",
               inode, page, from, to, page->index);
        if (!PageDirty(page)) {
                lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats,
                                     LPROC_LL_DIRTY_MISSES);
                rc = ll_mark_dirty_page(ll_i2obdconn(inode),
                                        ll_i2info(inode)->lli_smd,
                                        page->index);
                if (rc < 0 && rc != -EDQUOT)
                        RETURN(rc); /* XXX lproc counter here? */
        } else {
                lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats,
                                     LPROC_LL_DIRTY_HITS);
        }

        size = (((obd_off)page->index) << PAGE_SHIFT) + to;
        if (size > inode->i_size)
                inode->i_size = size;

        SetPageUptodate(page);
        set_page_dirty(page);

        /* This means that we've hit either the local cache limit or the limit
         * of the OST's grant. */
        if (rc == -EDQUOT) {
                struct ll_file_data *fd = file->private_data;
                struct obdo oa;

                oa.o_id = ll_i2info(inode)->lli_smd->lsm_object_id;
                memcpy(obdo_handle(&oa), &fd->fd_ost_och.och_fh,
                       sizeof(fd->fd_ost_och.och_fh));
                oa.o_valid = OBD_MD_FLID | OBD_MD_FLHANDLE;
                obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME);

                rc = ll_batch_writepage(inode, &oa, page);
                lock_page(page); /* caller expects to unlock */
                RETURN(rc);
        }

        RETURN(0);
} /* ll_commit_write */
#else
static int ll_writepage(struct page *page,
                        struct writeback_control *wbc)
{
        return 0;
}
static int ll_commit_write(struct file *file, struct page *page,
                           unsigned from, unsigned to)
{
        return 0;
}
#endif


#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
static int ll_direct_IO(int rw, struct inode *inode, struct kiobuf *iobuf,
                        unsigned long blocknr, int blocksize)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct brw_page *pga;
        struct ptlrpc_request_set *set;
        struct obdo oa;
        int length, i, flags, rc = 0;
        loff_t offset;
        ENTRY;

        if (!lsm || !lsm->lsm_object_id)
                RETURN(-EBADF);

        /* FIXME: io smaller than PAGE_SIZE is broken on ia64 */
        if ((iobuf->offset & (PAGE_SIZE - 1)) ||
            (iobuf->length & (PAGE_SIZE - 1)))
                RETURN(-EINVAL);

        set = ptlrpc_prep_set();
        if (set == NULL)
                RETURN(-ENOMEM);

        OBD_ALLOC(pga, sizeof(*pga) * iobuf->nr_pages);
        if (!pga) {
                ptlrpc_set_destroy(set);
                RETURN(-ENOMEM);
        }

        flags = (rw == WRITE ? OBD_BRW_CREATE : 0) /* | OBD_BRW_DIRECTIO */;
        offset = ((obd_off)blocknr << inode->i_blkbits);

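        /* build one brw_page per kiobuf page; each entry runs from the
         * current offset to the end of its page or to the remaining length,
         * whichever comes first */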
        for (i = 0, length = iobuf->length; length > 0;
             length -= pga[i].count, offset += pga[i].count, i++) { /*i last!*/
                pga[i].pg = iobuf->maplist[i];
                pga[i].off = offset;
                /* To the end of the page, or the length, whatever is less */
                pga[i].count = min_t(int, PAGE_SIZE - (offset & ~PAGE_MASK),
                                     length);
                pga[i].flag = flags;
                if (rw == READ) {
                        //POISON(kmap(iobuf->maplist[i]), 0xc5, PAGE_SIZE);
                        //kunmap(iobuf->maplist[i]);
                }
        }

        oa.o_id = lsm->lsm_object_id;
        oa.o_valid = OBD_MD_FLID;
        obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
                                    OBD_MD_FLMTIME | OBD_MD_FLCTIME);

        if (rw == WRITE)
                lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
                                    LPROC_LL_DIRECT_WRITE, iobuf->length);
        else
                lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
                                    LPROC_LL_DIRECT_READ, iobuf->length);
        rc = obd_brw_async(rw == WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
                           ll_i2obdconn(inode), &oa, lsm, iobuf->nr_pages, pga,
                           set, NULL);
        if (rc) {
                CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
                       "error from obd_brw_async: rc = %d\n", rc);
        } else {
                rc = ptlrpc_set_wait(set);
                if (rc)
                        CERROR("error from callback: rc = %d\n", rc);
        }
        ptlrpc_set_destroy(set);
        if (rc == 0)
                rc = iobuf->length;

        OBD_FREE(pga, sizeof(*pga) * iobuf->nr_pages);
        RETURN(rc);
}
#endif

//#endif

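/* address_space methods for llite regular files: the 2.4 build wires in
 * ll_direct_IO; on 2.5 writepage and commit_write are stubs for now */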
struct address_space_operations ll_aops = {
        readpage: ll_readpage,
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        direct_IO: ll_direct_IO,
#endif
        writepage: ll_writepage,
        sync_page: block_sync_page,
        prepare_write: ll_prepare_write,
        commit_write: ll_commit_write,
        bmap: NULL
};