/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Lustre Lite I/O Page Cache
 *
 *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#include <linux/config.h>
#include <linux/kernel.h>
#include <linux/mm.h>
#include <linux/string.h>
#include <linux/stat.h>
#include <linux/errno.h>
#include <linux/smp_lock.h>
#include <linux/unistd.h>
#include <linux/version.h>
#include <asm/system.h>
#include <asm/uaccess.h>
#include "llite_internal.h"

#include <linux/fs.h>
#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
#include <linux/buffer_head.h>
#else
#include <linux/iobuf.h>
#endif
#include <asm/segment.h>
#include <linux/pagemap.h>

#define DEBUG_SUBSYSTEM S_LLITE

#include <linux/lustre_mds.h>
#include <linux/lustre_lite.h>
#include <linux/lustre_lib.h>
#include <linux/lustre_compat25.h>

/*
 * Move a page from its mapping's dirty list onto the clean list.  The 2.4
 * address_space keeps separate clean/dirty/locked page lists.
 */
static void __set_page_clean(struct page *page)
{
        struct address_space *mapping = page->mapping;
        struct inode *inode;

        if (!mapping)
                return;

        PGCACHE_WRLOCK(mapping);

        list_del(&page->list);
        list_add(&page->list, &mapping->clean_pages);

        /* XXX doesn't inode_lock protect i_state ? */
        inode = mapping->host;
        if (list_empty(&mapping->dirty_pages)) {
                CDEBUG(D_INODE, "inode clean\n");
                inode->i_state &= ~I_DIRTY_PAGES;
        }

        PGCACHE_WRUNLOCK(mapping);
        EXIT;
}

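/* Clear the dirty bit and, if it was set, move the page onto the clean list */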
void set_page_clean(struct page *page)
{
        if (PageDirty(page)) {
                ClearPageDirty(page);
                __set_page_clean(page);
        }
}

/* SYNCHRONOUS I/O to object storage for an inode */
static int ll_brw(int cmd, struct inode *inode, struct page *page, int flags)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct brw_page pg;
        int rc;
        ENTRY;

        pg.pg = page;
        pg.off = ((obd_off)page->index) << PAGE_SHIFT;

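        /* A write that covers the last, partial page of the file transfers
         * only the bytes up to i_size.  Note i_size % PAGE_SIZE is 0 when
         * i_size is page aligned; the ZERO COUNT check below catches that. */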
        if (cmd == OBD_BRW_WRITE && (pg.off + PAGE_SIZE > inode->i_size))
                pg.count = inode->i_size % PAGE_SIZE;
        else
                pg.count = PAGE_SIZE;

        CDEBUG(D_PAGE, "%s %d bytes ino %lu at "LPU64"/"LPX64"\n",
               cmd & OBD_BRW_WRITE ? "write" : "read", pg.count, inode->i_ino,
               pg.off, pg.off);
        if (pg.count == 0) {
                CERROR("ZERO COUNT: ino %lu: size %p:%Lu(%p:%Lu) idx %lu off "
                       LPU64"\n",
                       inode->i_ino, inode, inode->i_size, page->mapping->host,
                       page->mapping->host->i_size, page->index, pg.off);
        }

        pg.flag = flags;

        if (cmd == OBD_BRW_WRITE)
                lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
                                    LPROC_LL_BRW_WRITE, pg.count);
        else
                lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
                                    LPROC_LL_BRW_READ, pg.count);
        rc = obd_brw(cmd, ll_i2obdconn(inode), lsm, 1, &pg, NULL);
        if (rc)
                CERROR("error from obd_brw: rc = %d\n", rc);

        RETURN(rc);
}

/*
 * We were asked to read a single page, but we are going to try to read a
 * batch of pages all at once.  This roughly simulates 2.5's readpages.
 */
static int ll_readpage(struct file *file, struct page *first_page)
{
        struct inode *inode = first_page->mapping->host;
        struct ll_inode_info *lli = ll_i2info(inode);
        struct page *page = first_page;
        struct list_head *pos;
        struct brw_page *pgs;
        unsigned long end_index, extent_end = 0;
        struct ptlrpc_request_set *set;
        int npgs = 0, rc = 0, max_pages;
        ENTRY;

        LASSERT(PageLocked(page));
        LASSERT(!PageUptodate(page));
        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),offset="LPX64"\n",
               inode->i_ino, inode->i_generation, inode,
               (((obd_off)page->index) << PAGE_SHIFT));
        LASSERT(atomic_read(&file->f_dentry->d_inode->i_count) > 0);

        if (inode->i_size <= ((obd_off)page->index) << PAGE_SHIFT) {
                CERROR("reading beyond EOF\n");
                memset(kmap(page), 0, PAGE_SIZE);
                kunmap(page);
                SetPageUptodate(page);
                unlock_page(page);
                RETURN(rc);
        }

        /* Try to read the file's preferred I/O size (i_blksize) in one go */
        end_index = first_page->index +
                (inode->i_blksize >> PAGE_CACHE_SHIFT);
        if (end_index > (inode->i_size >> PAGE_CACHE_SHIFT))
                end_index = inode->i_size >> PAGE_CACHE_SHIFT;

        max_pages = ((end_index - first_page->index) << PAGE_CACHE_SHIFT) >>
                PAGE_SHIFT;
        pgs = kmalloc(max_pages * sizeof(*pgs), GFP_USER);
        if (pgs == NULL)
                RETURN(-ENOMEM);

        /*
         * Find how far we're allowed to read under the extent that
         * ll_file_read registered for us.
         */
        spin_lock(&lli->lli_read_extent_lock);
        list_for_each(pos, &lli->lli_read_extents) {
                struct ll_read_extent *rextent;
                rextent = list_entry(pos, struct ll_read_extent, re_lli_item);
                if (rextent->re_task != current)
                        continue;

                if (rextent->re_extent.end + PAGE_SIZE < rextent->re_extent.end)
                        /* extent wrapping */
                        extent_end = ~0;
                else {
                        extent_end = (rextent->re_extent.end + PAGE_SIZE)
                                                        >> PAGE_CACHE_SHIFT;
                        /* 32bit indexes, 64bit extents.. */
                        if (((u64)extent_end << PAGE_CACHE_SHIFT) <
                                        rextent->re_extent.end)
                                extent_end = ~0;
                }
                break;
        }
        spin_unlock(&lli->lli_read_extent_lock);

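        /* extent_end is still 0 if no read extent is registered for this
         * task, which is the mmap fault path rather than ll_file_read;
         * in that case read just the one page we were asked for. */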
        if (extent_end == 0) {
                static long next_print;
                if (time_after(jiffies, next_print)) {
                        next_print = jiffies + 30 * HZ;
                        CDEBUG(D_INODE, "mmap readpage - check locks\n");
                }
                end_index = page->index + 1;
        } else if (extent_end < end_index)
                end_index = extent_end;

        /* Take a reference on the first page to match the find_get_page
         * reference the later pages get; all are dropped in the teardown
         * loop at the end. */
        page_cache_get(page);
        do {
                unsigned long index;

                pgs[npgs].pg = page;
                pgs[npgs].off = ((obd_off)page->index) << PAGE_CACHE_SHIFT;
                pgs[npgs].flag = 0;
                pgs[npgs].count = PAGE_SIZE;
                /* XXX Workaround for BA OSTs returning short reads at EOF.
                 * The linux OST will return the full page, zero-filled at the
                 * end, which will just overwrite the data we set here.  Bug
                 * 593 relates to fixing this properly.
                 */
                if (inode->i_size < pgs[npgs].off + PAGE_SIZE) {
                        int count = inode->i_size - pgs[npgs].off;
                        void *addr = kmap(page);
                        pgs[npgs].count = count;
                        //POISON(addr, 0x7c, count);
                        memset(addr + count, 0, PAGE_SIZE - count);
                        kunmap(page);
                }

                npgs++;
                if (npgs == max_pages)
                        break;

                /*
                 * Find pages ahead of us that we can read in.
                 * grab_cache_page waits on pages that are locked, so
                 * we first try find_get_page, which doesn't.  This stops
                 * the worst case behaviour of racing threads waiting on
                 * each other, but doesn't remove it entirely.
                 */
                for (index = page->index + 1, page = NULL;
                     page == NULL && index < end_index; index++) {

                        /* see if the page already exists and needs updating */
                        page = find_get_page(inode->i_mapping, index);
                        if (page) {
                                if (Page_Uptodate(page) || TryLockPage(page))
                                        goto out_release;
                                if (!page->mapping || Page_Uptodate(page))
                                        goto out_unlock;
                        } else {
                                /* ok, we have to create it.. */
                                page = grab_cache_page(inode->i_mapping, index);
                                if (page == NULL)
                                        continue;
                                if (Page_Uptodate(page))
                                        goto out_unlock;
                        }

                        break;

                out_unlock:
                        unlock_page(page);
                out_release:
                        page_cache_release(page);
                        page = NULL;
                }

        } while (page);

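        /* Issue all the reads as one asynchronous batch in a request set and
         * wait for the whole set to complete. */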
        set = ptlrpc_prep_set();
        if (set == NULL) {
                CERROR("ENOMEM allocating request set\n");
                rc = -ENOMEM;
        } else {
                rc = obd_brw_async(OBD_BRW_READ, ll_i2obdconn(inode),
                                   ll_i2info(inode)->lli_smd, npgs, pgs,
                                   set, NULL);
                if (rc == 0)
                        rc = ptlrpc_set_wait(set);
                ptlrpc_set_destroy(set);
                if (rc && rc != -EIO)
                        CERROR("error from obd_brw_async: rc = %d\n", rc);
        }

        while (npgs-- > 0) {
                page = pgs[npgs].pg;

                if (rc == 0)
                        SetPageUptodate(page);
                unlock_page(page);
                page_cache_release(page);
        }

        kfree(pgs);
        RETURN(rc);
} /* ll_readpage */

/* This isn't where truncate starts.  Roughly:
 * sys_truncate->ll_setattr_raw->vmtruncate->ll_truncate.
 * We grab the lock back in setattr_raw to avoid races. */
void ll_truncate(struct inode *inode)
{
        struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
        struct obdo oa = {0};
        int err;
        ENTRY;
        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
               inode->i_generation, inode);

        if (!lsm) {
                /* object not yet allocated */
                inode->i_mtime = inode->i_ctime = CURRENT_TIME;
                EXIT;
                return;
        }

        /* vmtruncate will just throw away our dirty pages, make sure
         * we don't think they're still dirty, being careful to round
         * i_size to the first whole page that was tossed */
        err = ll_clear_dirty_pages(ll_i2obdconn(inode), lsm,
                        (inode->i_size + PAGE_CACHE_SIZE-1) >> PAGE_CACHE_SHIFT,
                        ~0);

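        /* Pack the minimal obdo the OST needs to identify the object being
         * punched: object id, mode and type. */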
        oa.o_id = lsm->lsm_object_id;
        oa.o_mode = inode->i_mode;
        oa.o_valid = OBD_MD_FLID | OBD_MD_FLMODE | OBD_MD_FLTYPE;

        CDEBUG(D_INFO, "calling punch for "LPX64" (all bytes after %Lu)\n",
               oa.o_id, inode->i_size);

        /* truncate == punch from new size to absolute end of file */
        err = obd_punch(ll_i2obdconn(inode), &oa, lsm, inode->i_size,
                        OBD_OBJECT_EOF, NULL);
        if (err)
                CERROR("obd_punch fails (%d) ino %lu\n", err, inode->i_ino);
        else
                obdo_to_inode(inode, &oa, oa.o_valid);

        EXIT;
        return;
} /* ll_truncate */
static int ll_prepare_write(struct file *file, struct page *page, unsigned from,
                            unsigned to)
{
        struct inode *inode = page->mapping->host;
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        obd_off offset = ((obd_off)page->index) << PAGE_SHIFT;
        struct brw_page pg;
        int rc = 0;
        ENTRY;

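        /* prepare_write must leave [from, to) of the page ready to receive
         * the caller's data; the bytes outside that range must hold valid
         * file contents, which may require reading the old page in. */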
        if (!PageLocked(page))
                LBUG();

        if (PageUptodate(page))
                RETURN(0);

        //POISON(addr + from, 0xca, to - from);

        /* Check to see if we should return -EIO right away */
        pg.pg = page;
        pg.off = offset;
        pg.count = PAGE_SIZE;
        pg.flag = 0;
        rc = obd_brw(OBD_BRW_CHECK, ll_i2obdconn(inode), lsm, 1, &pg, NULL);
        if (rc)
                RETURN(rc);

        /* We're completely overwriting an existing page, so _don't_ set it up
         * to date until commit_write */
        if (from == 0 && to == PAGE_SIZE)
                RETURN(0);

        /* If we are writing to a new page, there is no need to read old data.
         * The extent locking and getattr procedures in ll_file_write have
         * guaranteed that i_size is stable enough for our zeroing needs */
        if (inode->i_size <= offset) {
                memset(kmap(page), 0, PAGE_SIZE);
                kunmap(page);
                GOTO(prepare_done, rc = 0);
        }

        rc = ll_brw(OBD_BRW_READ, inode, page, 0);

        EXIT;
 prepare_done:
        if (rc == 0)
                SetPageUptodate(page);

        return rc;
}

/*
 * Background file writeback.  This is called regularly from kupdated to write
 * dirty data, from kswapd when memory is low, and from filemap_fdatasync when
 * super blocks or inodes are synced.
 *
 * obd_brw errors down in _batch_writepage are ignored, so pages are always
 * unlocked.  Also, there is nobody to return an error code to from here - the
 * application may not even be running anymore.
 *
 * This should be async so that things like kswapd can have a chance to
 * free some more pages that our allocating writeback may need, but it isn't
 * yet.
 */
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
static unsigned long ll_local_cache_dirty_pages;
static unsigned long ll_max_dirty_pages = 20 * 1024 * 1024 / PAGE_SIZE;

static spinlock_t ll_local_cache_page_count_lock = SPIN_LOCK_UNLOCKED;

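/* /proc read handlers: report the current and maximum counts of locally
 * dirtied pages. */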
int ll_rd_dirty_pages(char *page, char **start, off_t off, int count, int *eof,
                      void *data)
{
        unsigned long dirty_count;
        spin_lock(&ll_local_cache_page_count_lock);
        dirty_count = ll_local_cache_dirty_pages;
        spin_unlock(&ll_local_cache_page_count_lock);
        return snprintf(page, count, "%lu\n", dirty_count);
}

int ll_rd_max_dirty_pages(char *page, char **start, off_t off, int count,
                          int *eof, void *data)
{
        unsigned long max_dirty;
        spin_lock(&ll_local_cache_page_count_lock);
        max_dirty = ll_max_dirty_pages;
        spin_unlock(&ll_local_cache_page_count_lock);
        return snprintf(page, count, "%lu\n", max_dirty);
}

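/* /proc write handler: parse a new page-count limit from userspace and
 * install it as ll_max_dirty_pages. */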
int ll_wr_max_dirty_pages(struct file *file, const char *buffer,
                          unsigned long count, void *data)
{
        unsigned long max_dirty;
        signed long max_dirty_signed;
        char kernbuf[20], *end;

        if (count > (sizeof(kernbuf) - 1))
                return -EINVAL;

        if (copy_from_user(kernbuf, buffer, count))
                return -EFAULT;

        kernbuf[count] = '\0';

        max_dirty_signed = simple_strtol(kernbuf, &end, 0);
        if (kernbuf == end)
                return -EINVAL;
        max_dirty = (unsigned long)max_dirty_signed;

#if 0
        if (max_dirty < ll_local_cache_dirty_pages)
                flush_to_new_max_dirty();
#endif

        spin_lock(&ll_local_cache_page_count_lock);
        CDEBUG(D_CACHE, "changing max_dirty from %lu to %lu\n",
               ll_max_dirty_pages, max_dirty);
        ll_max_dirty_pages = max_dirty;
        spin_unlock(&ll_local_cache_page_count_lock);
        return count;
}

static int ll_local_cache_full(void)
{
        int full = 0;
        spin_lock(&ll_local_cache_page_count_lock);
        if (ll_max_dirty_pages &&
            ll_local_cache_dirty_pages >= ll_max_dirty_pages) {
                full = 1;
        }
        spin_unlock(&ll_local_cache_page_count_lock);
        /* XXX instrument? */
        /* XXX trigger async writeback when full, or 75% of full? */
        return full;
}

static void ll_local_cache_flushed_pages(unsigned long pgcount)
{
        unsigned long dirty_count;
        spin_lock(&ll_local_cache_page_count_lock);
        dirty_count = ll_local_cache_dirty_pages;
        ll_local_cache_dirty_pages -= pgcount;
        CDEBUG(D_CACHE, "dirty pages: %lu->%lu\n",
               dirty_count, ll_local_cache_dirty_pages);
        spin_unlock(&ll_local_cache_page_count_lock);
        LASSERT(dirty_count >= pgcount);
}

static void ll_local_cache_dirtied_pages(unsigned long pgcount)
{
        unsigned long dirty_count;
        spin_lock(&ll_local_cache_page_count_lock);
        dirty_count = ll_local_cache_dirty_pages;
        ll_local_cache_dirty_pages += pgcount;
        CDEBUG(D_CACHE, "dirty pages: %lu->%lu\n",
               dirty_count, ll_local_cache_dirty_pages);
        spin_unlock(&ll_local_cache_page_count_lock);
        /* XXX track maximum cached, report to lprocfs */
}

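/* These wrappers keep the local dirty-page accounting above in sync with the
 * page state that the obd layer maintains. */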
int ll_clear_dirty_pages(struct lustre_handle *conn, struct lov_stripe_md *lsm,
                         unsigned long start, unsigned long end)
{
        unsigned long cleared;
        int rc;

        ENTRY;
        rc = obd_clear_dirty_pages(conn, lsm, start, end, &cleared);
        if (!rc)
                ll_local_cache_flushed_pages(cleared);
        RETURN(rc);
}

int ll_mark_dirty_page(struct lustre_handle *conn, struct lov_stripe_md *lsm,
                       unsigned long index)
{
        int rc;

        ENTRY;
        if (ll_local_cache_full())
                RETURN(-EDQUOT);

        rc = obd_mark_page_dirty(conn, lsm, index);
        if (!rc)
                ll_local_cache_dirtied_pages(1);
        RETURN(rc);
}

static int ll_writepage(struct page *page)
{
        struct inode *inode = page->mapping->host;
        ENTRY;

        CDEBUG(D_CACHE, "page %p [lau %d] inode %p\n", page,
                        PageLaunder(page), inode);
        LASSERT(PageLocked(page));

        /* XXX should obd_brw errors trickle up? */
        ll_batch_writepage(inode, page);
        RETURN(0);
}

/*
 * We really don't want to start writeback here; we want to give callers some
 * time to further dirty the pages before we write them out.
 */
static int ll_commit_write(struct file *file, struct page *page,
                           unsigned from, unsigned to)
{
        struct inode *inode = page->mapping->host;
        loff_t size;
        int rc = 0;
        ENTRY;

        LASSERT(inode == file->f_dentry->d_inode);
        LASSERT(PageLocked(page));

        CDEBUG(D_INODE, "inode %p is writing page %p from %d to %d at %lu\n",
               inode, page, from, to, page->index);
        if (!PageDirty(page)) {
                lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats,
                                     LPROC_LL_DIRTY_MISSES);
                rc = ll_mark_dirty_page(ll_i2obdconn(inode),
                                        ll_i2info(inode)->lli_smd,
                                        page->index);
                if (rc < 0 && rc != -EDQUOT)
                        RETURN(rc); /* XXX lproc counter here? */
        } else {
                lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats,
                                     LPROC_LL_DIRTY_HITS);
        }

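        /* A write past the current end of file extends i_size right away;
         * the extent locking done in ll_file_write keeps this stable. */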
        size = (((obd_off)page->index) << PAGE_SHIFT) + to;
        if (size > inode->i_size)
                inode->i_size = size;

        SetPageUptodate(page);
        set_page_dirty(page);

        /* This means that we've hit either the local cache limit or the limit
         * of the OST's grant. */
        if (rc == -EDQUOT) {
                int rc2 = ll_batch_writepage(inode, page);
                lock_page(page); /* caller expects to unlock */
                RETURN(rc2);
        }

        RETURN(0);
} /* ll_commit_write */
#else
static int ll_writepage(struct page *page,
                        struct writeback_control *wbc)
{
        return 0;
}
static int ll_commit_write(struct file *file, struct page *page,
                           unsigned from, unsigned to)
{
        return 0;
}
#endif

#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
static int ll_direct_IO(int rw, struct inode *inode, struct kiobuf *iobuf,
                        unsigned long blocknr, int blocksize)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct brw_page *pga;
        struct ptlrpc_request_set *set;
        int length, i, flags, rc = 0;
        loff_t offset;
        ENTRY;

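        /* Direct I/O requires an allocated object and a user buffer whose
         * offset and length are both blocksize aligned. */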
        if (!lsm || !lsm->lsm_object_id)
                RETURN(-ENOMEM);

        if ((iobuf->offset & (blocksize - 1)) ||
            (iobuf->length & (blocksize - 1)))
                RETURN(-EINVAL);

        set = ptlrpc_prep_set();
        if (set == NULL)
                RETURN(-ENOMEM);

        OBD_ALLOC(pga, sizeof(*pga) * iobuf->nr_pages);
        if (!pga) {
                ptlrpc_set_destroy(set);
                RETURN(-ENOMEM);
        }

        flags = (rw == WRITE ? OBD_BRW_CREATE : 0) /* | OBD_BRW_DIRECTIO */;
        offset = ((obd_off)blocknr << inode->i_blkbits);

        for (i = 0, length = iobuf->length; length > 0;
             length -= pga[i].count, offset += pga[i].count, i++) { /*i last!*/
                pga[i].pg = iobuf->maplist[i];
                pga[i].off = offset;
                /* To the end of the page, or the length, whichever is less */
                pga[i].count = min_t(int, PAGE_SIZE - (offset & ~PAGE_MASK),
                                     length);
                pga[i].flag = flags;
                if (rw == READ) {
                        //POISON(kmap(iobuf->maplist[i]), 0xc5, PAGE_SIZE);
                        //kunmap(iobuf->maplist[i]);
                }
        }

        if (rw == WRITE)
                lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
                                    LPROC_LL_DIRECT_WRITE, iobuf->length);
        else
                lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
                                    LPROC_LL_DIRECT_READ, iobuf->length);
        rc = obd_brw_async(rw == WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
                           ll_i2obdconn(inode), lsm, iobuf->nr_pages, pga, set,
                           NULL);
        if (rc) {
                CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
                       "error from obd_brw_async: rc = %d\n", rc);
        } else {
                rc = ptlrpc_set_wait(set);
                if (rc)
                        CERROR("error from callback: rc = %d\n", rc);
        }
        ptlrpc_set_destroy(set);
        if (rc == 0)
                rc = iobuf->length;

        OBD_FREE(pga, sizeof(*pga) * iobuf->nr_pages);
        RETURN(rc);
}
#endif
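/* The address_space methods wired into every llite inode; the direct_IO
 * method is only hooked up on 2.4 kernels, matching its definition above. */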
struct address_space_operations ll_aops = {
        readpage: ll_readpage,
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        direct_IO: ll_direct_IO,
#endif
        writepage: ll_writepage,
        sync_page: block_sync_page,
        prepare_write: ll_prepare_write,
        commit_write: ll_commit_write,
        bmap: NULL
};