/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Lustre Lite I/O page cache routines shared by different kernel revs
 *
 *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */
#ifndef AUTOCONF_INCLUDED
#include <linux/config.h>
#endif
#include <linux/kernel.h>
#include <linux/mm.h>
#include <linux/string.h>
#include <linux/stat.h>
#include <linux/errno.h>
#include <linux/smp_lock.h>
#include <linux/unistd.h>
#include <linux/version.h>
#include <asm/system.h>
#include <asm/uaccess.h>

#include <linux/fs.h>
#include <linux/stat.h>
#include <asm/uaccess.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/smp_lock.h>

#define DEBUG_SUBSYSTEM S_LLITE

#include <lustre_lite.h>
#include "llite_internal.h"
#include <linux/lustre_compat25.h>

#ifndef list_for_each_prev_safe
#define list_for_each_prev_safe(pos, n, head) \
        for (pos = (head)->prev, n = pos->prev; pos != (head); \
                pos = n, n = pos->prev )
#endif

cfs_mem_cache_t *ll_async_page_slab = NULL;
size_t ll_async_page_slab_size = 0;

/* SYNCHRONOUS I/O to object storage for an inode */
static int ll_brw(int cmd, struct inode *inode, struct obdo *oa,
                  struct page *page, int flags)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct obd_info oinfo = { { { 0 } } };
        struct brw_page pg;
        int rc;
        ENTRY;

        pg.pg = page;
        pg.off = ((obd_off)page->index) << CFS_PAGE_SHIFT;

        if ((cmd & OBD_BRW_WRITE) && (pg.off+CFS_PAGE_SIZE>i_size_read(inode)))
                pg.count = i_size_read(inode) % CFS_PAGE_SIZE;
        else
                pg.count = CFS_PAGE_SIZE;

        LL_CDEBUG_PAGE(D_PAGE, page, "%s %d bytes ino %lu at "LPU64"/"LPX64"\n",
                       cmd & OBD_BRW_WRITE ? "write" : "read", pg.count,
                       inode->i_ino, pg.off, pg.off);
        if (pg.count == 0) {
                CERROR("ZERO COUNT: ino %lu: size %p:%Lu(%p:%Lu) idx %lu off "
                       LPU64"\n", inode->i_ino, inode, i_size_read(inode),
                       page->mapping->host, i_size_read(page->mapping->host),
                       page->index, pg.off);
        }

        pg.flag = flags;

        if (cmd & OBD_BRW_WRITE)
                ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_BRW_WRITE,
                                   pg.count);
        else
                ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_BRW_READ,
                                   pg.count);
        oinfo.oi_oa = oa;
        oinfo.oi_md = lsm;
        rc = obd_brw(cmd, ll_i2obdexp(inode), &oinfo, 1, &pg, NULL);
        if (rc == 0)
                obdo_to_inode(inode, oa, OBD_MD_FLBLOCKS);
        else if (rc != -EIO)
                CERROR("error from obd_brw: rc = %d\n", rc);
        RETURN(rc);
}

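/* Truncate the file's objects on the OSTs to new_size by sending a punch
 * RPC covering [new_size, EOF].  If srvlock is set the server does the
 * extent locking itself (lockless truncate), which is signalled to the OST
 * via OBD_FL_TRUNCLOCK. */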
int ll_file_punch(struct inode *inode, loff_t new_size, int srvlock)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        struct obd_info oinfo = { { { 0 } } };
        struct obdo oa;
        int rc;

        ENTRY;
        CDEBUG(D_INFO, "calling punch for "LPX64" (new size %Lu=%#Lx)\n",
               lli->lli_smd->lsm_object_id, new_size, new_size);

        oinfo.oi_md = lli->lli_smd;
        oinfo.oi_policy.l_extent.start = new_size;
        oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
        oinfo.oi_oa = &oa;
        oa.o_id = lli->lli_smd->lsm_object_id;
        oa.o_valid = OBD_MD_FLID;
        if (srvlock) {
                /* set OBD_MD_FLFLAGS in o_valid, only if we
                 * set OBD_FL_TRUNCLOCK, otherwise ost_punch
                 * and filter_setattr get confused, see the comment
                 * in ost_punch */
                oa.o_flags = OBD_FL_TRUNCLOCK;
                oa.o_valid |= OBD_MD_FLFLAGS;
        }
        obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE |
                        OBD_MD_FLFID | OBD_MD_FLATIME | OBD_MD_FLMTIME |
                        OBD_MD_FLCTIME | OBD_MD_FLUID | OBD_MD_FLGID |
                        OBD_MD_FLGENER | OBD_MD_FLBLOCKS);
        rc = obd_punch_rqset(ll_i2obdexp(inode), &oinfo, NULL);
        if (rc) {
                CERROR("obd_truncate fails (%d) ino %lu\n", rc, inode->i_ino);
                RETURN(rc);
        }
        obdo_to_inode(inode, &oa, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
                      OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME);
        RETURN(0);
}
/* this isn't where truncate starts.  roughly:
 * sys_truncate->ll_setattr_raw->vmtruncate->ll_truncate. setattr_raw grabs
 * DLM lock on [size, EOF], i_mutex, ->lli_size_sem, and WRITE_I_ALLOC_SEM to
 * avoid races.
 *
 * must be called under ->lli_size_sem */
void ll_truncate(struct inode *inode)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        int srvlock = test_bit(LLI_F_SRVLOCK, &lli->lli_flags);
        loff_t new_size;
        ENTRY;
        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) to %Lu=%#Lx\n",inode->i_ino,
               inode->i_generation, inode, i_size_read(inode), i_size_read(inode));

        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_TRUNC, 1);
        if (lli->lli_size_sem_owner != current) {
                EXIT;
                return;
        }

        if (!lli->lli_smd) {
                CDEBUG(D_INODE, "truncate on inode %lu with no objects\n",
                       inode->i_ino);
                GOTO(out_unlock, 0);
        }

        LASSERT(atomic_read(&lli->lli_size_sem.count) <= 0);

        if (!srvlock) {
                struct ost_lvb lvb;
                int rc;

                /* XXX I'm pretty sure this is a hack to paper over a more
                 * fundamental race condition. */
                lov_stripe_lock(lli->lli_smd);
                inode_init_lvb(inode, &lvb);
                rc = obd_merge_lvb(ll_i2obdexp(inode), lli->lli_smd, &lvb, 0);
                inode->i_blocks = lvb.lvb_blocks;
                if (lvb.lvb_size == i_size_read(inode) && rc == 0) {
                        CDEBUG(D_VFSTRACE, "skipping punch for obj "LPX64
                               ", %Lu=%#Lx\n",
                               lli->lli_smd->lsm_object_id, i_size_read(inode),
                               i_size_read(inode));
                        lov_stripe_unlock(lli->lli_smd);
                        GOTO(out_unlock, 0);
                }

                obd_adjust_kms(ll_i2obdexp(inode), lli->lli_smd,
                               i_size_read(inode), 1);
                lov_stripe_unlock(lli->lli_smd);
        }

        if (unlikely((ll_i2sbi(inode)->ll_flags & LL_SBI_LLITE_CHECKSUM) &&
                     (i_size_read(inode) & ~CFS_PAGE_MASK))) {
                /* If the truncate leaves a partial page, update its checksum */
                struct page *page = find_get_page(inode->i_mapping,
                                                  i_size_read(inode) >>
                                                  CFS_PAGE_SHIFT);
                if (page != NULL) {
                        struct ll_async_page *llap = llap_cast_private(page);
                        if (llap != NULL) {
                                char *kaddr = kmap_atomic(page, KM_USER0);
                                llap->llap_checksum =
                                        init_checksum(OSC_DEFAULT_CKSUM);
                                llap->llap_checksum =
                                        compute_checksum(llap->llap_checksum,
                                                         kaddr, CFS_PAGE_SIZE,
                                                         OSC_DEFAULT_CKSUM);
                                kunmap_atomic(kaddr, KM_USER0);
                        }
                        page_cache_release(page);
                }
        }

        new_size = i_size_read(inode);
        ll_inode_size_unlock(inode, 0);
        if (!srvlock)
                ll_file_punch(inode, new_size, 0);
        else
                ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LOCKLESS_TRUNC, 1);

        EXIT;
        return;

 out_unlock:
        ll_inode_size_unlock(inode, 0);
} /* ll_truncate */

int ll_prepare_write(struct file *file, struct page *page, unsigned from,
                     unsigned to)
{
        struct inode *inode = page->mapping->host;
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        obd_off offset = ((obd_off)page->index) << CFS_PAGE_SHIFT;
        struct obd_info oinfo = { { { 0 } } };
        struct brw_page pga;
        struct obdo oa;
        struct ost_lvb lvb;
        int rc = 0;
        ENTRY;

        LASSERT(PageLocked(page));
        (void)llap_cast_private(page); /* assertion */

        /* Check to see if we should return -EIO right away */
        pga.pg = page;
        pga.off = offset;
        pga.count = CFS_PAGE_SIZE;
        pga.flag = 0;

        oa.o_mode = inode->i_mode;
        oa.o_id = lsm->lsm_object_id;
        oa.o_valid = OBD_MD_FLID | OBD_MD_FLMODE | OBD_MD_FLTYPE;
        obdo_from_inode(&oa, inode, OBD_MD_FLFID | OBD_MD_FLGENER);

        oinfo.oi_oa = &oa;
        oinfo.oi_md = lsm;
        rc = obd_brw(OBD_BRW_CHECK, ll_i2obdexp(inode), &oinfo, 1, &pga, NULL);
        if (rc)
                RETURN(rc);

        if (PageUptodate(page)) {
                LL_CDEBUG_PAGE(D_PAGE, page, "uptodate\n");
                RETURN(0);
        }

        /* We're completely overwriting an existing page, so _don't_ set it up
         * to date until commit_write */
        if (from == 0 && to == CFS_PAGE_SIZE) {
                LL_CDEBUG_PAGE(D_PAGE, page, "full page write\n");
                POISON_PAGE(page, 0x11);
                RETURN(0);
        }

        /* If we are writing to a new page, there is no need to read old data.
         * The extent locking will have updated the KMS, and for our purposes
         * here we can treat it like i_size. */
        lov_stripe_lock(lsm);
        inode_init_lvb(inode, &lvb);
        obd_merge_lvb(ll_i2obdexp(inode), lsm, &lvb, 1);
        lov_stripe_unlock(lsm);
        if (lvb.lvb_size <= offset) {
                char *kaddr = kmap_atomic(page, KM_USER0);
                LL_CDEBUG_PAGE(D_PAGE, page, "kms "LPU64" <= offset "LPU64"\n",
                               lvb.lvb_size, offset);
                memset(kaddr, 0, CFS_PAGE_SIZE);
                kunmap_atomic(kaddr, KM_USER0);
                GOTO(prepare_done, rc = 0);
        }

        /* XXX could be an async ocp read.. read-ahead? */
        rc = ll_brw(OBD_BRW_READ, inode, &oa, page, 0);
        if (rc == 0) {
                /* bug 1598: don't clobber blksize */
                oa.o_valid &= ~(OBD_MD_FLSIZE | OBD_MD_FLBLKSZ);
                obdo_refresh_inode(inode, &oa, oa.o_valid);
        }

        EXIT;
 prepare_done:
        if (rc == 0)
                SetPageUptodate(page);

        return rc;
}

/**
 * make page ready for ASYNC write
 * \param data - pointer to llap cookie
 * \param cmd - one of the OBD_BRW_* flags
 *
 * \retval 0 page was successfully prepared for sending
 * \retval -EAGAIN page does not need to be sent
 */
static int ll_ap_make_ready(void *data, int cmd)
{
        struct ll_async_page *llap;
        struct page *page;
        ENTRY;

        llap = LLAP_FROM_COOKIE(data);
        page = llap->llap_page;

        /* we're trying to write, but the page is locked.. come back later */
        if (TryLockPage(page))
                RETURN(-EAGAIN);

        LASSERTF(!(cmd & OBD_BRW_READ) || !PageWriteback(page),
                 "cmd %x page %p ino %lu index %lu fl %lx\n", cmd, page,
                 page->mapping->host->i_ino, page->index, page->flags);

        /* if we left PageDirty we might get another writepage call
         * in the future.  list walkers are bright enough
         * to check page dirty so we can leave it on whatever list
         * it's on.  XXX also, we're called with the cli list lock held,
         * so if we took the page cache list lock we'd create a lock
         * inversion with the removepage path, which takes the page lock
         * and then the cli lock */
        if (!clear_page_dirty_for_io(page)) {
                unlock_page(page);
                RETURN(-EAGAIN);
        }

        /* This actually clears the dirty bit in the radix tree. */
        set_page_writeback(page);

        LL_CDEBUG_PAGE(D_PAGE, page, "made ready\n");
        page_cache_get(page);

        RETURN(0);
}

/* We have two reasons for giving llite the opportunity to change the
 * write length of a given queued page as it builds the RPC containing
 * the page:
 *
 * 1) Further extending writes may have landed in the page cache
 *    since a partial write first queued this page, requiring us
 *    to write more from the page cache.  (No further races are possible,
 *    since by the time this is called, the page is locked.)
 * 2) We might have raced with truncate and want to avoid performing
 *    write RPCs that are just going to be thrown away by the
 *    truncate's punch on the storage targets.
 *
 * The kms serves these purposes as it is set at both truncate and extending
 * writes.
 */
static int ll_ap_refresh_count(void *data, int cmd)
{
        struct ll_inode_info *lli;
        struct ll_async_page *llap;
        struct lov_stripe_md *lsm;
        struct page *page;
        struct inode *inode;
        struct ost_lvb lvb;
        __u64 kms;
        ENTRY;

        /* readpage queues with _COUNT_STABLE, shouldn't get here. */
        LASSERT(cmd != OBD_BRW_READ);

        llap = LLAP_FROM_COOKIE(data);
        page = llap->llap_page;
        inode = page->mapping->host;
        lli = ll_i2info(inode);
        lsm = lli->lli_smd;

        lov_stripe_lock(lsm);
        inode_init_lvb(inode, &lvb);
        obd_merge_lvb(ll_i2obdexp(inode), lsm, &lvb, 1);
        kms = lvb.lvb_size;
        lov_stripe_unlock(lsm);

        /* catch race with truncate */
        if (((__u64)page->index << CFS_PAGE_SHIFT) >= kms)
                return 0;

        /* catch sub-page write at end of file */
        if (((__u64)page->index << CFS_PAGE_SHIFT) + CFS_PAGE_SIZE > kms)
                return kms % CFS_PAGE_SIZE;

        return CFS_PAGE_SIZE;
}

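/* Fill in the obdo with the object id and the inode attributes valid for
 * this command; writes additionally carry the IO epoch and the ownership,
 * fid and generation attributes so the OST can apply them. */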
void ll_inode_fill_obdo(struct inode *inode, int cmd, struct obdo *oa)
{
        struct lov_stripe_md *lsm;
        obd_flag valid_flags;

        lsm = ll_i2info(inode)->lli_smd;

        oa->o_id = lsm->lsm_object_id;
        oa->o_valid = OBD_MD_FLID;
        valid_flags = OBD_MD_FLTYPE | OBD_MD_FLATIME;
        if (cmd & OBD_BRW_WRITE) {
                oa->o_valid |= OBD_MD_FLEPOCH;
                oa->o_easize = ll_i2info(inode)->lli_io_epoch;

                valid_flags |= OBD_MD_FLMTIME | OBD_MD_FLCTIME |
                        OBD_MD_FLUID | OBD_MD_FLGID |
                        OBD_MD_FLFID | OBD_MD_FLGENER;
        }

        obdo_from_inode(oa, inode, valid_flags);
}

static void ll_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
{
        struct ll_async_page *llap;
        ENTRY;

        llap = LLAP_FROM_COOKIE(data);
        ll_inode_fill_obdo(llap->llap_page->mapping->host, cmd, oa);

        EXIT;
}

static void ll_ap_update_obdo(void *data, int cmd, struct obdo *oa,
                              obd_valid valid)
{
        struct ll_async_page *llap;
        ENTRY;

        llap = LLAP_FROM_COOKIE(data);
        obdo_from_inode(oa, llap->llap_page->mapping->host, valid);

        EXIT;
}

static struct obd_async_page_ops ll_async_page_ops = {
        .ap_make_ready =        ll_ap_make_ready,
        .ap_refresh_count =     ll_ap_refresh_count,
        .ap_fill_obdo =         ll_ap_fill_obdo,
        .ap_update_obdo =       ll_ap_update_obdo,
        .ap_completion =        ll_ap_completion,
};

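/* Return the ll_async_page attached to the page's private data, or NULL if
 * there is none; assert that whatever is attached really is an llap. */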
struct ll_async_page *llap_cast_private(struct page *page)
{
        struct ll_async_page *llap = (struct ll_async_page *)page_private(page);

        LASSERTF(llap == NULL || llap->llap_magic == LLAP_MAGIC,
                 "page %p private %lu gave magic %d which != %d\n",
                 page, page_private(page), llap->llap_magic, LLAP_MAGIC);

        return llap;
}

/* Try to shrink the page cache for the @sbi filesystem by 1/@shrink_fraction.
 *
 * There is an llap attached onto every page in lustre, linked off @sbi.
 * We add an llap to the list so we don't lose our place during list walking.
 * If llaps in the list are being moved they will only move to the end
 * of the LRU, and we aren't terribly interested in those pages here (we
 * start at the beginning of the list where the least-used llaps are).
 */
int llap_shrink_cache(struct ll_sb_info *sbi, int shrink_fraction)
{
        struct ll_async_page *llap, dummy_llap = { .llap_magic = 0xd11ad11a };
        unsigned long total, want, count = 0;

        total = sbi->ll_async_page_count;

        /* There can be a large number of llaps (600k or more in a large
         * memory machine) so the VM 1/6 shrink ratio is likely too much.
         * Since we are freeing pages also, we don't necessarily want to
         * shrink so much.  Limit to 40MB of pages + llaps per call. */
        if (shrink_fraction == 0)
                want = sbi->ll_async_page_count - sbi->ll_async_page_max + 32;
        else
                want = (total + shrink_fraction - 1) / shrink_fraction;

        if (want > 40 << (20 - CFS_PAGE_SHIFT))
                want = 40 << (20 - CFS_PAGE_SHIFT);

        CDEBUG(D_CACHE, "shrinking %lu of %lu pages (1/%d)\n",
               want, total, shrink_fraction);

        spin_lock(&sbi->ll_lock);
        list_add(&dummy_llap.llap_pglist_item, &sbi->ll_pglist);

        while (--total >= 0 && count < want) {
                struct page *page;
                int keep;

                if (unlikely(need_resched())) {
                        spin_unlock(&sbi->ll_lock);
                        cond_resched();
                        spin_lock(&sbi->ll_lock);
                }

                llap = llite_pglist_next_llap(sbi, &dummy_llap.llap_pglist_item);
                list_del_init(&dummy_llap.llap_pglist_item);
                if (llap == NULL)
                        break;

                page = llap->llap_page;
                LASSERT(page != NULL);

                list_add(&dummy_llap.llap_pglist_item, &llap->llap_pglist_item);

                /* Page needs/undergoing IO */
                if (TryLockPage(page)) {
                        LL_CDEBUG_PAGE(D_PAGE, page, "can't lock\n");
                        continue;
                }

                keep = (llap->llap_write_queued || PageDirty(page) ||
                        PageWriteback(page) || (!PageUptodate(page) &&
                        llap->llap_origin != LLAP_ORIGIN_READAHEAD));

                LL_CDEBUG_PAGE(D_PAGE, page,
                               "%s LRU page: %s%s%s%s%s origin %s\n",
                               keep ? "keep" : "drop",
                               llap->llap_write_queued ? "wq " : "",
                               PageDirty(page) ? "pd " : "",
                               PageUptodate(page) ? "" : "!pu ",
                               PageWriteback(page) ? "wb" : "",
                               llap->llap_defer_uptodate ? "" : "!du",
                               llap_origins[llap->llap_origin]);

                /* If page is dirty or undergoing IO don't discard it */
                if (keep) {
                        unlock_page(page);
                        continue;
                }

                page_cache_get(page);
                spin_unlock(&sbi->ll_lock);

                if (page->mapping != NULL) {
                        ll_teardown_mmaps(page->mapping,
                                          (__u64)page->index << CFS_PAGE_SHIFT,
                                          ((__u64)page->index << CFS_PAGE_SHIFT) |
                                          ~CFS_PAGE_MASK);
                        if (!PageDirty(page) && !page_mapped(page)) {
                                ll_ra_accounting(llap, page->mapping);
                                ll_truncate_complete_page(page);
                                ++count;
                        } else {
                                LL_CDEBUG_PAGE(D_PAGE, page,
                                               "Not dropping page because it "
                                               "is %s\n", PageDirty(page) ?
                                               "dirty" : "mapped");
                        }
                }
                unlock_page(page);
                page_cache_release(page);

                spin_lock(&sbi->ll_lock);
        }
        list_del(&dummy_llap.llap_pglist_item);
        spin_unlock(&sbi->ll_lock);

        CDEBUG(D_CACHE, "shrank %lu/%lu and left %lu unscanned\n",
               count, want, total);

        return count;
}

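/* Find the llap attached to @page, creating one (and registering it with the
 * OSC via obd_prep_async_page) if the page has none yet.  An existing llap is
 * moved to the tail of the per-superblock LRU unless the page is about to be
 * removed. */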
static struct ll_async_page *llap_from_page_with_lockh(struct page *page,
                                                       unsigned origin,
                                                       struct lustre_handle *lockh)
{
        struct ll_async_page *llap;
        struct obd_export *exp;
        struct inode *inode = page->mapping->host;
        struct ll_sb_info *sbi;
        int rc;
        ENTRY;

        if (!inode) {
                static int triggered;

                if (!triggered) {
                        LL_CDEBUG_PAGE(D_ERROR, page, "Bug 10047. Wrong anon "
                                       "page received\n");
                        libcfs_debug_dumpstack(NULL);
                        triggered = 1;
                }
                RETURN(ERR_PTR(-EINVAL));
        }
        sbi = ll_i2sbi(inode);
        LASSERT(ll_async_page_slab);
        LASSERTF(origin < LLAP__ORIGIN_MAX, "%u\n", origin);

        llap = llap_cast_private(page);
        if (llap != NULL) {
                /* move to end of LRU list, except when page is just about to
                 * die */
                if (origin != LLAP_ORIGIN_REMOVEPAGE) {
                        spin_lock(&sbi->ll_lock);
                        sbi->ll_pglist_gen++;
                        list_del_init(&llap->llap_pglist_item);
                        list_add_tail(&llap->llap_pglist_item, &sbi->ll_pglist);
                        spin_unlock(&sbi->ll_lock);
                }
                GOTO(out, llap);
        }

        exp = ll_i2obdexp(page->mapping->host);
        if (exp == NULL)
                RETURN(ERR_PTR(-EINVAL));

        /* limit the number of lustre-cached pages */
        if (sbi->ll_async_page_count >= sbi->ll_async_page_max)
                llap_shrink_cache(sbi, 0);

        OBD_SLAB_ALLOC(llap, ll_async_page_slab, CFS_ALLOC_STD,
                       ll_async_page_slab_size);
        if (llap == NULL)
                RETURN(ERR_PTR(-ENOMEM));
        llap->llap_magic = LLAP_MAGIC;
        llap->llap_cookie = (void *)llap + size_round(sizeof(*llap));

        /* XXX: for bug 11270 - check for lockless origin here! */
        if (origin == LLAP_ORIGIN_LOCKLESS_IO)
                llap->llap_nocache = 1;

        rc = obd_prep_async_page(exp, ll_i2info(inode)->lli_smd, NULL, page,
                                 (obd_off)page->index << CFS_PAGE_SHIFT,
                                 &ll_async_page_ops, llap, &llap->llap_cookie,
                                 llap->llap_nocache, lockh);
        if (rc) {
                OBD_SLAB_FREE(llap, ll_async_page_slab,
                              ll_async_page_slab_size);
                RETURN(ERR_PTR(rc));
        }

        CDEBUG(D_CACHE, "llap %p page %p cookie %p obj off "LPU64"\n", llap,
               page, llap->llap_cookie, (obd_off)page->index << CFS_PAGE_SHIFT);
        /* also zeroing the PRIVBITS low order bitflags */
        __set_page_ll_data(page, llap);
        llap->llap_page = page;

        spin_lock(&sbi->ll_lock);
        sbi->ll_pglist_gen++;
        sbi->ll_async_page_count++;
        list_add_tail(&llap->llap_pglist_item, &sbi->ll_pglist);
        spin_unlock(&sbi->ll_lock);

 out:
        if (unlikely(sbi->ll_flags & LL_SBI_LLITE_CHECKSUM)) {
                __u32 csum;
                char *kaddr = kmap_atomic(page, KM_USER0);
                csum = init_checksum(OSC_DEFAULT_CKSUM);
                csum = compute_checksum(csum, kaddr, CFS_PAGE_SIZE,
                                        OSC_DEFAULT_CKSUM);
                kunmap_atomic(kaddr, KM_USER0);
                if (origin == LLAP_ORIGIN_READAHEAD ||
                    origin == LLAP_ORIGIN_READPAGE ||
                    origin == LLAP_ORIGIN_LOCKLESS_IO) {
                        llap->llap_checksum = 0;
                } else if (origin == LLAP_ORIGIN_COMMIT_WRITE ||
                           llap->llap_checksum == 0) {
                        llap->llap_checksum = csum;
                        CDEBUG(D_PAGE, "page %p cksum %x\n", page, csum);
                } else if (llap->llap_checksum == csum) {
                        /* origin == LLAP_ORIGIN_WRITEPAGE */
                        CDEBUG(D_PAGE, "page %p cksum %x confirmed\n",
                               page, csum);
                } else {
                        /* origin == LLAP_ORIGIN_WRITEPAGE */
                        LL_CDEBUG_PAGE(D_ERROR, page, "old cksum %x != new "
                                       "%x!\n", llap->llap_checksum, csum);
                }
        }

        llap->llap_origin = origin;
        RETURN(llap);
}

static inline struct ll_async_page *llap_from_page(struct page *page,
                                                   unsigned origin)
{
        return llap_from_page_with_lockh(page, origin, NULL);
}

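/* Queue the page for asynchronous write-out.  If the OSC refuses to queue it
 * (for example when the dirty cache is full), fall back to a synchronous
 * group write of this page, extending the write to a full page when we are
 * not at EOF (bug 4410). */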
static int queue_or_sync_write(struct obd_export *exp, struct inode *inode,
                               struct ll_async_page *llap,
                               unsigned to, obd_flag async_flags)
{
        unsigned long size_index = i_size_read(inode) >> CFS_PAGE_SHIFT;
        struct obd_io_group *oig;
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        int rc, noquot = llap->llap_ignore_quota ? OBD_BRW_NOQUOTA : 0;
        ENTRY;

        /* _make_ready only sees llap once we've unlocked the page */
        llap->llap_write_queued = 1;
        rc = obd_queue_async_io(exp, ll_i2info(inode)->lli_smd, NULL,
                                llap->llap_cookie, OBD_BRW_WRITE | noquot,
                                0, 0, 0, async_flags);
        if (rc == 0) {
                LL_CDEBUG_PAGE(D_PAGE, llap->llap_page, "write queued\n");
                llap_write_pending(inode, llap);
                GOTO(out, 0);
        }

        llap->llap_write_queued = 0;

        rc = oig_init(&oig);
        if (rc)
                GOTO(out, rc);

        /* make full-page requests if we are not at EOF (bug 4410) */
        if (to != CFS_PAGE_SIZE && llap->llap_page->index < size_index) {
                LL_CDEBUG_PAGE(D_PAGE, llap->llap_page,
                               "sync write before EOF: size_index %lu, to %d\n",
                               size_index, to);
                to = CFS_PAGE_SIZE;
        } else if (to != CFS_PAGE_SIZE && llap->llap_page->index == size_index) {
                int size_to = i_size_read(inode) & ~CFS_PAGE_MASK;
                LL_CDEBUG_PAGE(D_PAGE, llap->llap_page,
                               "sync write at EOF: size_index %lu, to %d/%d\n",
                               size_index, to, size_to);
                if (to < size_to)
                        to = size_to;
        }

        /* compare the checksum once before the page leaves llite */
        if (unlikely((sbi->ll_flags & LL_SBI_LLITE_CHECKSUM) &&
                     llap->llap_checksum != 0)) {
                __u32 csum;
                struct page *page = llap->llap_page;
                char *kaddr = kmap_atomic(page, KM_USER0);
                csum = init_checksum(OSC_DEFAULT_CKSUM);
                csum = compute_checksum(csum, kaddr, CFS_PAGE_SIZE,
                                        OSC_DEFAULT_CKSUM);
                kunmap_atomic(kaddr, KM_USER0);
                if (llap->llap_checksum == csum) {
                        CDEBUG(D_PAGE, "page %p cksum %x confirmed\n",
                               page, csum);
                } else {
                        CERROR("page %p old cksum %x != new cksum %x!\n",
                               page, llap->llap_checksum, csum);
                }
        }

        rc = obd_queue_group_io(exp, ll_i2info(inode)->lli_smd, NULL, oig,
                                llap->llap_cookie, OBD_BRW_WRITE | noquot,
                                0, to, 0, ASYNC_READY | ASYNC_URGENT |
                                ASYNC_COUNT_STABLE | ASYNC_GROUP_SYNC);
        if (rc)
                GOTO(free_oig, rc);

        rc = obd_trigger_group_io(exp, ll_i2info(inode)->lli_smd, NULL, oig);
        if (rc)
                GOTO(free_oig, rc);

        rc = oig_wait(oig);

        if (!rc && async_flags & ASYNC_READY) {
                unlock_page(llap->llap_page);
                if (PageWriteback(llap->llap_page))
                        end_page_writeback(llap->llap_page);
        }

        LL_CDEBUG_PAGE(D_PAGE, llap->llap_page, "sync write returned %d\n", rc);

free_oig:
        oig_release(oig);
out:
        RETURN(rc);
}

/* update our write count to account for i_size increases that may have
 * happened since we've queued the page for io. */

/* be careful not to return success without setting the page Uptodate or
 * the next pass through prepare_write will read in stale data from disk. */
int ll_commit_write(struct file *file, struct page *page, unsigned from,
                    unsigned to)
{
        struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
        struct inode *inode = page->mapping->host;
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct obd_export *exp;
        struct ll_async_page *llap;
        loff_t size;
        struct lustre_handle *lockh = NULL;
        int rc = 0;
        ENTRY;

        SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
        LASSERT(inode == file->f_dentry->d_inode);
        LASSERT(PageLocked(page));

        CDEBUG(D_INODE, "inode %p is writing page %p from %d to %d at %lu\n",
               inode, page, from, to, page->index);

        if (fd->fd_flags & LL_FILE_GROUP_LOCKED)
                lockh = &fd->fd_cwlockh;

        llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_COMMIT_WRITE, lockh);
        if (IS_ERR(llap))
                RETURN(PTR_ERR(llap));

        exp = ll_i2obdexp(inode);
        if (exp == NULL)
                RETURN(-EINVAL);

        llap->llap_ignore_quota = capable(CAP_SYS_RESOURCE);

        /* queue a write for some time in the future the first time we
         * dirty the page */
        if (!PageDirty(page)) {
                ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_DIRTY_MISSES, 1);

                rc = queue_or_sync_write(exp, inode, llap, to, 0);
                if (rc)
                        GOTO(out, rc);
        } else {
                ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_DIRTY_HITS, 1);
        }

        /* put the page in the page cache; from now on ll_removepage is
         * responsible for cleaning up the llap.
         * only set the page dirty when it's queued to be written out */
        if (llap->llap_write_queued)
                set_page_dirty(page);

out:
        size = (((obd_off)page->index) << CFS_PAGE_SHIFT) + to;
        ll_inode_size_lock(inode, 0);
        if (rc == 0) {
                lov_stripe_lock(lsm);
                obd_adjust_kms(exp, lsm, size, 0);
                lov_stripe_unlock(lsm);
                if (size > i_size_read(inode))
                        i_size_write(inode, size);
                SetPageUptodate(page);
        } else if (size > i_size_read(inode)) {
                /* this page is beyond the pale of i_size, so it can't be
                 * truncated in ll_p_r_e during lock revoking.  we must
                 * teardown our book-keeping here. */
                ll_removepage(page);
        }
        ll_inode_size_unlock(inode, 0);
        RETURN(rc);
}

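/* Reserve up to @len read-ahead pages against the per-superblock limit;
 * returns the number of pages actually reserved. */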
static unsigned long ll_ra_count_get(struct ll_sb_info *sbi, unsigned long len)
{
        struct ll_ra_info *ra = &sbi->ll_ra_info;
        unsigned long ret;
        ENTRY;

        spin_lock(&sbi->ll_lock);
        ret = min(ra->ra_max_pages - ra->ra_cur_pages, len);
        ra->ra_cur_pages += ret;
        spin_unlock(&sbi->ll_lock);

        RETURN(ret);
}

static void ll_ra_count_put(struct ll_sb_info *sbi, unsigned long len)
{
        struct ll_ra_info *ra = &sbi->ll_ra_info;
        spin_lock(&sbi->ll_lock);
        LASSERTF(ra->ra_cur_pages >= len, "r_c_p %lu len %lu\n",
                 ra->ra_cur_pages, len);
        ra->ra_cur_pages -= len;
        spin_unlock(&sbi->ll_lock);
}

/* called for each page in a completed rpc. */
int ll_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
{
        struct ll_async_page *llap;
        struct page *page;
        int ret = 0;
        ENTRY;

        llap = LLAP_FROM_COOKIE(data);
        page = llap->llap_page;
        LASSERT(PageLocked(page));
        LASSERT(CheckWriteback(page, cmd));

        LL_CDEBUG_PAGE(D_PAGE, page, "completing cmd %d with %d\n", cmd, rc);

        if (cmd & OBD_BRW_READ && llap->llap_defer_uptodate)
                ll_ra_count_put(ll_i2sbi(page->mapping->host), 1);

        if (rc == 0)  {
                if (cmd & OBD_BRW_READ) {
                        if (!llap->llap_defer_uptodate)
                                SetPageUptodate(page);
                } else {
                        llap->llap_write_queued = 0;
                }
                ClearPageError(page);
        } else {
                if (cmd & OBD_BRW_READ) {
                        llap->llap_defer_uptodate = 0;
                }
                SetPageError(page);
#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
                if (rc == -ENOSPC)
                        set_bit(AS_ENOSPC, &page->mapping->flags);
                else
                        set_bit(AS_EIO, &page->mapping->flags);
#else
                page->mapping->gfp_mask |= AS_EIO_MASK;
#endif
        }

        /* Be careful about clearing WB: if WB is cleared after the page lock
         * is released, parallel IO can start before ap_make_ready has
         * finished, so we could end up with a page that has PG_Writeback set
         * by ->writepage() and a completed READ that clears the flag. */
        if ((cmd & OBD_BRW_WRITE) && PageWriteback(page))
                end_page_writeback(page);

        unlock_page(page);

        if (cmd & OBD_BRW_WRITE) {
                llap_write_complete(page->mapping->host, llap);
                ll_try_done_writing(page->mapping->host);
        }

        page_cache_release(page);

        RETURN(ret);
}

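/* Detach the llap from @page: tear down the async page at the OSC, clear the
 * page's private data, unlink the llap from the per-superblock LRU and free
 * it. */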
static void __ll_put_llap(struct page *page)
{
        struct inode *inode = page->mapping->host;
        struct obd_export *exp;
        struct ll_async_page *llap;
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        int rc;
        ENTRY;

        exp = ll_i2obdexp(inode);
        if (exp == NULL) {
                CERROR("page %p ind %lu gave null export\n", page, page->index);
                EXIT;
                return;
        }

        llap = llap_from_page(page, LLAP_ORIGIN_REMOVEPAGE);
        if (IS_ERR(llap)) {
                CERROR("page %p ind %lu couldn't find llap: %ld\n", page,
                       page->index, PTR_ERR(llap));
                EXIT;
                return;
        }

        //llap_write_complete(inode, llap);
        rc = obd_teardown_async_page(exp, ll_i2info(inode)->lli_smd, NULL,
                                     llap->llap_cookie);
        if (rc != 0)
                CERROR("page %p ind %lu failed: %d\n", page, page->index, rc);

        /* this unconditional free is only safe because the page lock
         * is providing exclusivity to memory pressure/truncate/writeback.. */
        __clear_page_ll_data(page);

        spin_lock(&sbi->ll_lock);
        if (!list_empty(&llap->llap_pglist_item))
                list_del_init(&llap->llap_pglist_item);
        sbi->ll_pglist_gen++;
        sbi->ll_async_page_count--;
        spin_unlock(&sbi->ll_lock);
        OBD_SLAB_FREE(llap, ll_async_page_slab, ll_async_page_slab_size);

        EXIT;
}

/* the kernel calls us here when a page is unhashed from the page cache.
 * the page will be locked and the kernel is holding a spinlock, so
 * we need to be careful.  we're just tearing down our book-keeping
 * here. */
void ll_removepage(struct page *page)
{
        struct ll_async_page *llap = llap_cast_private(page);
        ENTRY;

        LASSERT(!in_interrupt());

        /* sync pages or failed read pages can leave pages in the page
         * cache that don't have our data associated with them anymore */
        if (page_private(page) == 0) {
                EXIT;
                return;
        }

        LASSERT(!llap->llap_lockless_io_page);
        LASSERT(!llap->llap_nocache);

        LL_CDEBUG_PAGE(D_PAGE, page, "being evicted\n");
        __ll_put_llap(page);

        EXIT;
}

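/* Queue a single page for an asynchronous read as part of the IO group @oig;
 * @defer marks the page as read-ahead so completion defers setting it
 * Uptodate until the reader actually uses it. */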
static int ll_issue_page_read(struct obd_export *exp,
                              struct ll_async_page *llap,
                              struct obd_io_group *oig, int defer)
{
        struct page *page = llap->llap_page;
        int rc;

        page_cache_get(page);
        llap->llap_defer_uptodate = defer;
        llap->llap_ra_used = 0;
        rc = obd_queue_group_io(exp, ll_i2info(page->mapping->host)->lli_smd,
                                NULL, oig, llap->llap_cookie, OBD_BRW_READ, 0,
                                CFS_PAGE_SIZE, 0,
                                ASYNC_COUNT_STABLE | ASYNC_READY | ASYNC_URGENT);
        if (rc) {
                LL_CDEBUG_PAGE(D_ERROR, page, "read queue failed: rc %d\n", rc);
                page_cache_release(page);
        }
        RETURN(rc);
}

static void ll_ra_stats_inc_unlocked(struct ll_ra_info *ra, enum ra_stat which)
{
        LASSERTF(which >= 0 && which < _NR_RA_STAT, "which: %u\n", which);
        ra->ra_stats[which]++;
}

static void ll_ra_stats_inc(struct address_space *mapping, enum ra_stat which)
{
        struct ll_sb_info *sbi = ll_i2sbi(mapping->host);
        struct ll_ra_info *ra = &ll_i2sbi(mapping->host)->ll_ra_info;

        spin_lock(&sbi->ll_lock);
        ll_ra_stats_inc_unlocked(ra, which);
        spin_unlock(&sbi->ll_lock);
}

void ll_ra_accounting(struct ll_async_page *llap, struct address_space *mapping)
{
        if (!llap->llap_defer_uptodate || llap->llap_ra_used)
                return;

        ll_ra_stats_inc(mapping, RA_STAT_DISCARDED);
}

#define RAS_CDEBUG(ras) \
        CDEBUG(D_READA,                                                      \
               "lrp %lu cr %lu cp %lu ws %lu wl %lu nra %lu r %lu ri %lu "   \
               "csr %lu sf %lu sp %lu sl %lu \n",                            \
               ras->ras_last_readpage, ras->ras_consecutive_requests,        \
               ras->ras_consecutive_pages, ras->ras_window_start,            \
               ras->ras_window_len, ras->ras_next_readahead,                 \
               ras->ras_requests, ras->ras_request_index,                    \
               ras->ras_consecutive_stride_requests, ras->ras_stride_offset, \
               ras->ras_stride_pages, ras->ras_stride_length)

static int index_in_window(unsigned long index, unsigned long point,
                           unsigned long before, unsigned long after)
{
        unsigned long start = point - before, end = point + after;

        if (start > point)
                start = 0;
        if (end < point)
                end = ~0;

        return start <= index && index <= end;
}

static struct ll_readahead_state *ll_ras_get(struct file *f)
{
        struct ll_file_data       *fd;

        fd = LUSTRE_FPRIVATE(f);
        return &fd->fd_ras;
}

void ll_ra_read_in(struct file *f, struct ll_ra_read *rar)
{
        struct ll_readahead_state *ras;

        ras = ll_ras_get(f);

        spin_lock(&ras->ras_lock);
        ras->ras_requests++;
        ras->ras_request_index = 0;
        ras->ras_consecutive_requests++;
        rar->lrr_reader = current;

        list_add(&rar->lrr_linkage, &ras->ras_read_beads);
        spin_unlock(&ras->ras_lock);
}

void ll_ra_read_ex(struct file *f, struct ll_ra_read *rar)
{
        struct ll_readahead_state *ras;

        ras = ll_ras_get(f);

        spin_lock(&ras->ras_lock);
        list_del_init(&rar->lrr_linkage);
        spin_unlock(&ras->ras_lock);
}

static struct ll_ra_read *ll_ra_read_get_locked(struct ll_readahead_state *ras)
{
        struct ll_ra_read *scan;

        list_for_each_entry(scan, &ras->ras_read_beads, lrr_linkage) {
                if (scan->lrr_reader == current)
                        return scan;
        }
        return NULL;
}

struct ll_ra_read *ll_ra_read_get(struct file *f)
{
        struct ll_readahead_state *ras;
        struct ll_ra_read         *bead;

        ras = ll_ras_get(f);

        spin_lock(&ras->ras_lock);
        bead = ll_ra_read_get_locked(ras);
        spin_unlock(&ras->ras_lock);
        return bead;
}

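/* Try to start read-ahead on a single page index: grab the page without
 * blocking, attach an llap and queue the read.  Returns 1 if a read was
 * issued, 0 if the page was skipped, or -ENOLCK when the index falls outside
 * the covering lock and read-ahead should stop. */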
static int ll_read_ahead_page(struct obd_export *exp, struct obd_io_group *oig,
                              int index, struct address_space *mapping)
{
        struct ll_async_page *llap;
        struct page *page;
        unsigned int gfp_mask = 0;
        int rc = 0;

        gfp_mask = GFP_HIGHUSER & ~__GFP_WAIT;
#ifdef __GFP_NOWARN
        gfp_mask |= __GFP_NOWARN;
#endif
        page = grab_cache_page_nowait_gfp(mapping, index, gfp_mask);
        if (page == NULL) {
                ll_ra_stats_inc(mapping, RA_STAT_FAILED_GRAB_PAGE);
                CDEBUG(D_READA, "g_c_p_n failed\n");
                return 0;
        }

        /* Check if page was truncated or reclaimed */
        if (page->mapping != mapping) {
                ll_ra_stats_inc(mapping, RA_STAT_WRONG_GRAB_PAGE);
                CDEBUG(D_READA, "g_c_p_n returned invalid page\n");
                GOTO(unlock_page, rc = 0);
        }

        /* we do this first so that we can see the page in the /proc
         * accounting */
        llap = llap_from_page(page, LLAP_ORIGIN_READAHEAD);
        if (IS_ERR(llap) || llap->llap_defer_uptodate) {
                if (PTR_ERR(llap) == -ENOLCK) {
                        ll_ra_stats_inc(mapping, RA_STAT_FAILED_MATCH);
                        CDEBUG(D_READA | D_PAGE,
                               "Adding page to cache failed index %d\n", index);
                        CDEBUG(D_READA, "nolock page\n");
                        GOTO(unlock_page, rc = -ENOLCK);
                }
                CDEBUG(D_READA, "read-ahead page\n");
                GOTO(unlock_page, rc = 0);
        }

        /* skip completed pages */
        if (Page_Uptodate(page))
                GOTO(unlock_page, rc = 0);

        /* bail out when we hit the end of the lock. */
        rc = ll_issue_page_read(exp, llap, oig, 1);
        if (rc == 0) {
                LL_CDEBUG_PAGE(D_READA | D_PAGE, page, "started read-ahead\n");
                rc = 1;
        } else {
unlock_page:
                unlock_page(page);
                LL_CDEBUG_PAGE(D_READA | D_PAGE, page, "skipping read-ahead\n");
        }
        page_cache_release(page);
        return rc;
}

/* ra_io_arg is filled at the beginning of ll_readahead under ras_lock, then
 * ll_read_ahead_pages reads RA pages according to this arg.  All items in
 * this structure are counted in units of page index.
 */
struct ra_io_arg {
        unsigned long ria_start;  /* start offset of read-ahead */
        unsigned long ria_end;    /* end offset of read-ahead */
        /* If a stride read pattern is detected, ria_stoff is where the
         * stride read starts.  Note: for normal read-ahead the value here
         * is meaningless and is never accessed. */
        pgoff_t ria_stoff;
        /* ria_length and ria_pages are the stride length and the number of
         * pages per stride chunk in stride I/O mode.  They are also used to
         * check whether read-ahead is operating in stride I/O mode. */
        unsigned long ria_length;
        unsigned long ria_pages;
};

#define RIA_DEBUG(ria)                                                \
        CDEBUG(D_READA, "rs %lu re %lu ro %lu rl %lu rp %lu\n",       \
        ria->ria_start, ria->ria_end, ria->ria_stoff, ria->ria_length,\
        ria->ria_pages)

#define RAS_INCREASE_STEP (1024 * 1024 >> CFS_PAGE_SHIFT)

static inline int stride_io_mode(struct ll_readahead_state *ras)
{
        return ras->ras_consecutive_stride_requests > 1;
}

/* Calculate how many pages will be read in [off, off + length] when reading
 * in stride I/O mode with stride_offset = st_off, stride_length = st_len and
 * stride_pages = st_pgs.
 */
static unsigned long
stride_pg_count(pgoff_t st_off, unsigned long st_len, unsigned long st_pgs,
                unsigned long off, unsigned length)
{
        unsigned long cont_len = st_off > off ?  st_off - off : 0;
        __u64 stride_len = length + off > st_off ?
                           length + off + 1 - st_off : 0;
        unsigned long left, pg_count;

        if (st_len == 0 || length == 0)
                return length;

        left = do_div(stride_len, st_len);
        left = min(left, st_pgs);

        pg_count = left + stride_len * st_pgs + cont_len;

        LASSERT(pg_count >= left);

        CDEBUG(D_READA, "st_off %lu, st_len %lu st_pgs %lu off %lu length %u "
               "pgcount %lu\n", st_off, st_len, st_pgs, off, length, pg_count);

        return pg_count;
}

static int ria_page_count(struct ra_io_arg *ria)
{
        __u64 length = ria->ria_end >= ria->ria_start ?
                       ria->ria_end - ria->ria_start + 1 : 0;

        return stride_pg_count(ria->ria_stoff, ria->ria_length,
                               ria->ria_pages, ria->ria_start,
                               length);
}

/* Check whether the index is inside the defined read-ahead window */
static int ras_inside_ra_window(unsigned long idx, struct ra_io_arg *ria)
{
        /* If ria_length == ria_pages, it means non-stride I/O mode, and
         * idx is always inside the read-ahead window in that case.
         * For stride I/O mode, just check whether the idx falls inside
         * the ria_pages of the current stride chunk. */
        return ria->ria_length == 0 || ria->ria_length == ria->ria_pages ||
               (idx - ria->ria_stoff) % ria->ria_length < ria->ria_pages;
}

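/* Issue read-ahead for the window described by @ria, consuming at most
 * *reserved_pages page reservations.  In stride mode, indices that fall in
 * the gap between stride chunks are skipped.  Returns the number of reads
 * issued and stores the last index examined in *ra_end. */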
static int ll_read_ahead_pages(struct obd_export *exp,
                               struct obd_io_group *oig,
                               struct ra_io_arg *ria,
                               unsigned long *reserved_pages,
                               struct address_space *mapping,
                               unsigned long *ra_end)
{
        int rc, count = 0, stride_ria;
        unsigned long page_idx;

        LASSERT(ria != NULL);
        RIA_DEBUG(ria);

        stride_ria = ria->ria_length > ria->ria_pages && ria->ria_pages > 0;
        for (page_idx = ria->ria_start; page_idx <= ria->ria_end &&
                        *reserved_pages > 0; page_idx++) {
                if (ras_inside_ra_window(page_idx, ria)) {
                        /* If the page is inside the read-ahead window */
                        rc = ll_read_ahead_page(exp, oig, page_idx, mapping);
                        if (rc == 1) {
                                (*reserved_pages)--;
                                count++;
                        } else if (rc == -ENOLCK)
                                break;
                } else if (stride_ria) {
                        /* If it is not in the read-ahead window and we are in
                         * stride read-ahead mode, check whether we should
                         * skip over the stride gap */
                        pgoff_t offset;
                        /* FIXME: This assertion is only valid for forward
                         * read-ahead; it will be fixed when backward
                         * read-ahead is implemented */
                        LASSERTF(page_idx > ria->ria_stoff, "page_idx %lu is "
                                 "in the gap of the ra window, it should be "
                                 "larger than stride offset %lu\n",
                                 page_idx, ria->ria_stoff);

                        offset = page_idx - ria->ria_stoff;
                        offset = offset % (ria->ria_length);
                        if (offset > ria->ria_pages) {
                                page_idx += ria->ria_length - offset;
                                CDEBUG(D_READA, "i %lu skip %lu \n", page_idx,
                                       ria->ria_length - offset);
                                continue;
                        }
                }
        }
        *ra_end = page_idx;
        return count;
}

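/* Compute this reader's read-ahead window, bounded by the known minimum
 * file size (kms) and the per-client read-ahead page reservation, then
 * issue the reads and roll ras_next_readahead back if the whole window
 * could not be covered. */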
1345 static int ll_readahead(struct ll_readahead_state *ras,
1346                          struct obd_export *exp, struct address_space *mapping,
1347                          struct obd_io_group *oig, int flags)
1348 {
1349         unsigned long start = 0, end = 0, reserved;
1350         unsigned long ra_end, len;
1351         struct inode *inode;
1352         struct lov_stripe_md *lsm;
1353         struct ll_ra_read *bead;
1354         struct ost_lvb lvb;
1355         struct ra_io_arg ria = { 0 };
1356         int ret = 0;
1357         __u64 kms;
1358         ENTRY;
1359
1360         inode = mapping->host;
1361         lsm = ll_i2info(inode)->lli_smd;
1362
1363         lov_stripe_lock(lsm);
1364         inode_init_lvb(inode, &lvb);
1365         obd_merge_lvb(ll_i2obdexp(inode), lsm, &lvb, 1);
1366         kms = lvb.lvb_size;
1367         lov_stripe_unlock(lsm);
1368         if (kms == 0) {
1369                 ll_ra_stats_inc(mapping, RA_STAT_ZERO_LEN);
1370                 RETURN(0);
1371         }
1372
1373         spin_lock(&ras->ras_lock);
1374         bead = ll_ra_read_get_locked(ras);
1375         /* Enlarge the RA window to encompass the full read */
1376         if (bead != NULL && ras->ras_window_start + ras->ras_window_len <
1377             bead->lrr_start + bead->lrr_count) {
1378                 ras->ras_window_len = bead->lrr_start + bead->lrr_count -
1379                                       ras->ras_window_start;
1380         }
1381         /* Reserve a part of the read-ahead window that we'll be issuing */
1382         if (ras->ras_window_len) {
1383                 start = ras->ras_next_readahead;
1384                 end = ras->ras_window_start + ras->ras_window_len - 1;
1385         }
1386         if (end != 0) {
1387                 /* Truncate RA window to end of file */
1388                 end = min(end, (unsigned long)((kms - 1) >> CFS_PAGE_SHIFT));
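                     /* Advance next_readahead past the window end; max() leaves
                      * it at 'end' if 'end + 1' would wrap around. */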
1389                 ras->ras_next_readahead = max(end, end + 1);
1390                 RAS_CDEBUG(ras);
1391         }
1392         ria.ria_start = start;
1393         ria.ria_end = end;
1394         /* If stride I/O mode is detected, get stride window*/
1395         if (stride_io_mode(ras)) {
1396                 ria.ria_stoff = ras->ras_stride_offset;
1397                 ria.ria_length = ras->ras_stride_length;
1398                 ria.ria_pages = ras->ras_stride_pages;
1399         }
1400         spin_unlock(&ras->ras_lock);
1401
1402         if (end == 0) {
1403                 ll_ra_stats_inc(mapping, RA_STAT_ZERO_WINDOW);
1404                 RETURN(0);
1405         }
1406
1407         len = ria_page_count(&ria);
1408         if (len == 0)
1409                 RETURN(0);
1410
1411         reserved = ll_ra_count_get(ll_i2sbi(inode), len);
1412         if (reserved < len)
1413                 ll_ra_stats_inc(mapping, RA_STAT_MAX_IN_FLIGHT);
1414
1415         CDEBUG(D_READA, "reserved pages %lu\n", reserved);
1416
1417         ret = ll_read_ahead_pages(exp, oig, &ria, &reserved, mapping, &ra_end);
1418
1419         LASSERTF(reserved >= 0, "reserved %lu\n", reserved);
1420         if (reserved != 0)
1421                 ll_ra_count_put(ll_i2sbi(inode), reserved);
1422
1423         if (ra_end == end + 1 && ra_end == (kms >> CFS_PAGE_SHIFT))
1424                 ll_ra_stats_inc(mapping, RA_STAT_EOF);
1425
1426         /* if we didn't get to the end of the region we reserved from
1427          * the ras we need to go back and update the ras so that the
1428          * next read-ahead tries from where we left off.  we only do so
1429          * if the region we failed to issue read-ahead on is still ahead
1430          * of the app and behind the next index to start read-ahead from */
1431         CDEBUG(D_READA, "ra_end %lu end %lu stride end %lu \n",
1432                ra_end, end, ria.ria_end);
1433
1434         if (ra_end != (end + 1)) {
1435                 spin_lock(&ras->ras_lock);
1436                 if (ra_end < ras->ras_next_readahead &&
1437                     index_in_window(ra_end, ras->ras_window_start, 0,
1438                                     ras->ras_window_len)) {
1439                         ras->ras_next_readahead = ra_end;
1440                         RAS_CDEBUG(ras);
1441                 }
1442                 spin_unlock(&ras->ras_lock);
1443         }
1444
1445         RETURN(ret);
1446 }
1447
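     /* Round the window start down to a RAS_INCREASE_STEP boundary (this
      * assumes RAS_INCREASE_STEP is a power of two). */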
1448 static void ras_set_start(struct ll_readahead_state *ras, unsigned long index)
1449 {
1450         ras->ras_window_start = index & (~(RAS_INCREASE_STEP - 1));
1451 }
1452
1453 /* called with the ras_lock held or from places where it doesn't matter */
1454 static void ras_reset(struct ll_readahead_state *ras, unsigned long index)
1455 {
1456         ras->ras_last_readpage = index;
1457         ras->ras_consecutive_requests = 0;
1458         ras->ras_consecutive_pages = 0;
1459         ras->ras_window_len = 0;
1460         ras_set_start(ras, index);
1461         ras->ras_next_readahead = max(ras->ras_window_start, index);
1462
1463         RAS_CDEBUG(ras);
1464 }
1465
1466 /* called with the ras_lock held or from places where it doesn't matter */
1467 static void ras_stride_reset(struct ll_readahead_state *ras)
1468 {
1469         ras->ras_consecutive_stride_requests = 0;
1470         RAS_CDEBUG(ras);
1471 }
1472
1473 void ll_readahead_init(struct inode *inode, struct ll_readahead_state *ras)
1474 {
1475         spin_lock_init(&ras->ras_lock);
1476         ras_reset(ras, 0);
1477         ras->ras_requests = 0;
1478         INIT_LIST_HEAD(&ras->ras_read_beads);
1479 }
1480
1481 /* Check whether the read request is in the stride window.
1482  * If it is in the stride window, return 1; otherwise return 0.
1483  * Also update stride_gap and stride_pages.
1484  */
1485 static int index_in_stride_window(unsigned long index,
1486                                   struct ll_readahead_state *ras,
1487                                   struct inode *inode)
1488 {
1489         int stride_gap = index - ras->ras_last_readpage - 1;
1490
1491         LASSERT(stride_gap != 0);
1492
1493         if (ras->ras_consecutive_pages == 0)
1494                 return 0;
1495
1496         /* Otherwise check the stride by itself */
1497         if ((ras->ras_stride_length - ras->ras_stride_pages) == stride_gap &&
1498             ras->ras_consecutive_pages == ras->ras_stride_pages)
1499                 return 1;
1500
1501         if (stride_gap >= 0) {
1502                 /*
1503                  * Only set stride_pages/stride_length if
1504                  * this is a forward read (stride_gap > 0)
1505                  */
1506                 ras->ras_stride_pages = ras->ras_consecutive_pages;
1507                 ras->ras_stride_length = stride_gap + ras->ras_consecutive_pages;
1508         } else {
1509                 /*
1510                  * If stride_gap < 0 (backward reading),
1511                  * reset stride_pages/stride_length.
1512                  * FIXME: backward stride I/O read.
1513                  *
1514                  */
1515                 ras->ras_stride_pages = 0;
1516                 ras->ras_stride_length = 0;
1517         }
1518         RAS_CDEBUG(ras);
1519
1520         return 0;
1521 }
1522
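     /* Number of pages actually read (stride data pages only) if the stride
      * read-ahead window is extended to 'len' pages. */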
1523 static unsigned long
1524 stride_page_count(struct ll_readahead_state *ras, unsigned long len)
1525 {
1526         return stride_pg_count(ras->ras_stride_offset, ras->ras_stride_length,
1527                                ras->ras_stride_pages, ras->ras_stride_offset,
1528                                len);
1529 }
1530
1531 /* The stride read-ahead window is increased by inc_len pages according to
1532  * the stride I/O pattern */
1533 static void ras_stride_increase_window(struct ll_readahead_state *ras,
1534                                        struct ll_ra_info *ra,
1535                                        unsigned long inc_len)
1536 {
1537         unsigned long left, step, window_len;
1538         unsigned long stride_len;
1539
1540         LASSERT(ras->ras_stride_length > 0);
1541
1542         stride_len = ras->ras_window_start + ras->ras_window_len -
1543                      ras->ras_stride_offset;
1544
1545         LASSERTF(stride_len >= 0, "window_start %lu, window_len %lu"
1546                  " stride_offset %lu\n", ras->ras_window_start,
1547                  ras->ras_window_len, ras->ras_stride_offset);
1548
1549         left = stride_len % ras->ras_stride_length;
1550
1551         window_len = ras->ras_window_len - left;
1552
1553         if (left < ras->ras_stride_pages)
1554                 left += inc_len;
1555         else
1556                 left = ras->ras_stride_pages + inc_len;
1557
1558         LASSERT(ras->ras_stride_pages != 0);
1559
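             /* Convert the data pages in 'left' into whole strides plus a
              * remainder: each ras_stride_pages of data occupies
              * ras_stride_length pages of window. */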
1560         step = left / ras->ras_stride_pages;
1561         left %= ras->ras_stride_pages;
1562
1563         window_len += step * ras->ras_stride_length + left;
1564
1565         if (stride_page_count(ras, window_len) <= ra->ra_max_pages)
1566                 ras->ras_window_len = window_len;
1567
1568         RAS_CDEBUG(ras);
1569 }
1570
1571 /* Set stride I/O read-ahead window start offset */
1572 static void ras_set_stride_offset(struct ll_readahead_state *ras)
1573 {
1574         unsigned long window_len = ras->ras_next_readahead -
1575                                    ras->ras_window_start;
1576         unsigned long left;
1577
1578         LASSERT(ras->ras_stride_length != 0);
1579
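             /* Place the stride offset a whole number of strides past the
              * window start, i.e. back next_readahead up by the partial
              * stride already consumed. */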
1580         left = window_len % ras->ras_stride_length;
1581
1582         ras->ras_stride_offset = ras->ras_next_readahead - left;
1583
1584         RAS_CDEBUG(ras);
1585 }
1586
1587 static void ras_update(struct ll_sb_info *sbi, struct inode *inode,
1588                        struct ll_readahead_state *ras, unsigned long index,
1589                        unsigned hit)
1590 {
1591         struct ll_ra_info *ra = &sbi->ll_ra_info;
1592         int zero = 0, stride_zero = 0, stride_detect = 0, ra_miss = 0;
1593         ENTRY;
1594
1595         spin_lock(&sbi->ll_lock);
1596         spin_lock(&ras->ras_lock);
1597
1598         ll_ra_stats_inc_unlocked(ra, hit ? RA_STAT_HIT : RA_STAT_MISS);
1599
1600         /* Reset the read-ahead window in two cases.  First, when the app seeks
1601          * or reads to some other part of the file.  Second, if we get a
1602          * read-ahead miss on a page that we think we've previously issued.
1603          * This can be a symptom of there being so many read-ahead pages that
1604          * the VM is reclaiming them before we get to them. */
1605         if (!index_in_window(index, ras->ras_last_readpage, 8, 8)) {
1606                 zero = 1;
1607                 ll_ra_stats_inc_unlocked(ra, RA_STAT_DISTANT_READPAGE);
1608                 /* check whether it is in stride I/O mode*/
1609                 if (!index_in_stride_window(index, ras, inode))
1610                         stride_zero = 1;
1611         } else if (!hit && ras->ras_window_len &&
1612                    index < ras->ras_next_readahead &&
1613                    index_in_window(index, ras->ras_window_start, 0,
1614                                    ras->ras_window_len)) {
1615                 zero = 1;
1616                 ra_miss = 1;
1617                 /* If we hit a read-ahead miss and stride I/O has not yet
1618                  * been detected, reset the stride state so that the whole
1619                  * stride pattern is re-detected, avoiding complications */
1620                 if (!stride_io_mode(ras))
1621                         stride_zero = 1;
1622                 ll_ra_stats_inc_unlocked(ra, RA_STAT_MISS_IN_WINDOW);
1623         }
1624
1625         /* On the second access to a file smaller than the tunable
1626          * ra_max_read_ahead_whole_pages trigger RA on all pages in the
1627          * file up to ra_max_pages.  This is simply a best effort and
1628          * only occurs once per open file.  Normal RA behavior is restored
1629          * for subsequent IO.  The mmap case does not increment
1630          * ras_requests and thus can never trigger this behavior. */
1631         if (ras->ras_requests == 2 && !ras->ras_request_index) {
1632                 __u64 kms_pages;
1633
1634                 kms_pages = (i_size_read(inode) + CFS_PAGE_SIZE - 1) >>
1635                             CFS_PAGE_SHIFT;
1636
1637                 CDEBUG(D_READA, "kmsp "LPU64" mwp %lu mp %lu\n", kms_pages,
1638                        ra->ra_max_read_ahead_whole_pages, ra->ra_max_pages);
1639
1640                 if (kms_pages &&
1641                     kms_pages <= ra->ra_max_read_ahead_whole_pages) {
1642                         ras->ras_window_start = 0;
1643                         ras->ras_last_readpage = 0;
1644                         ras->ras_next_readahead = 0;
1645                         ras->ras_window_len = min(ra->ra_max_pages,
1646                                 ra->ra_max_read_ahead_whole_pages);
1647                         GOTO(out_unlock, 0);
1648                 }
1649         }
1650
1651         if (zero) {
1652                 /* If this is a discontiguous read, check
1653                  * whether it is in stride I/O mode */
1654                 if (stride_zero) {
1655                         ras_reset(ras, index);
1656                         ras->ras_consecutive_pages++;
1657                         ras_stride_reset(ras);
1658                         RAS_CDEBUG(ras);
1659                         GOTO(out_unlock, 0);
1660                 } else {
1661                         /* The read is still inside the stride window, or
1662                          * it hit a read-ahead miss */
1663
1664                         /* If an ra-window miss was hit, which probably means VM
1665                          * pressure reclaimed some read-ahead pages, the ra-window
1666                          * length is neither increased nor reset, to avoid
1667                          * re-detecting the stride I/O mode. */
1668                         ras->ras_consecutive_requests = 0;
1669                         if (!ra_miss) {
1670                                 ras->ras_consecutive_pages = 0;
1671                                 if (++ras->ras_consecutive_stride_requests > 1)
1672                                         stride_detect = 1;
1673                         }
1674                         RAS_CDEBUG(ras);
1675                 }
1676         } else if (ras->ras_consecutive_stride_requests > 1) {
1677                 /* If this is a contiguous read while currently in stride I/O
1678                  * mode, check whether the stride step is still valid;
1679                  * if it is not, reset the stride ra window */
1680                 if (ras->ras_consecutive_pages + 1 > ras->ras_stride_pages)
1681                         ras_stride_reset(ras);
1682         }
1683
1684         ras->ras_last_readpage = index;
1685         ras->ras_consecutive_pages++;
1686         ras_set_start(ras, index);
1687         ras->ras_next_readahead = max(ras->ras_window_start,
1688                                       ras->ras_next_readahead);
1689         RAS_CDEBUG(ras);
1690
1691         /* Trigger RA in the mmap case where ras_consecutive_requests
1692          * is not incremented and thus can't be used to trigger RA */
1693         if (!ras->ras_window_len && ras->ras_consecutive_pages == 4) {
1694                 ras->ras_window_len = RAS_INCREASE_STEP;
1695                 GOTO(out_unlock, 0);
1696         }
1697
1698         /* Initially reset the stride window offset to next_readahead*/
1699         if (ras->ras_consecutive_stride_requests == 2 && stride_detect)
1700                 ras_set_stride_offset(ras);
1701
1702         /* The initial ras_window_len is set to the request size.  To avoid
1703          * uselessly reading and discarding pages for random IO the window is
1704          * only increased once per consecutive request received. */
1705         if ((ras->ras_consecutive_requests > 1 &&
1706             !ras->ras_request_index) || stride_detect) {
1707                 if (stride_io_mode(ras))
1708                         ras_stride_increase_window(ras, ra, RAS_INCREASE_STEP);
1709                 else
1710                         ras->ras_window_len = min(ras->ras_window_len +
1711                                                   RAS_INCREASE_STEP,
1712                                                   ra->ra_max_pages);
1713         }
1714         EXIT;
1715 out_unlock:
1716         RAS_CDEBUG(ras);
1717         ras->ras_request_index++;
1718         spin_unlock(&ras->ras_lock);
1719         spin_unlock(&sbi->ll_lock);
1720         return;
1721 }
1722
1723 int ll_writepage(struct page *page)
1724 {
1725         struct inode *inode = page->mapping->host;
1726         struct ll_inode_info *lli = ll_i2info(inode);
1727         struct obd_export *exp;
1728         struct ll_async_page *llap;
1729         int rc = 0;
1730         ENTRY;
1731
1732         LASSERT(PageLocked(page));
1733
1734         exp = ll_i2obdexp(inode);
1735         if (exp == NULL)
1736                 GOTO(out, rc = -EINVAL);
1737
1738         llap = llap_from_page(page, LLAP_ORIGIN_WRITEPAGE);
1739         if (IS_ERR(llap))
1740                 GOTO(out, rc = PTR_ERR(llap));
1741
1742         LASSERT(!llap->llap_nocache);
1743         LASSERT(!PageWriteback(page));
1744         set_page_writeback(page);
1745
1746         page_cache_get(page);
1747         if (llap->llap_write_queued) {
1748                 LL_CDEBUG_PAGE(D_PAGE, page, "marking urgent\n");
1749                 rc = obd_set_async_flags(exp, lli->lli_smd, NULL,
1750                                          llap->llap_cookie,
1751                                          ASYNC_READY | ASYNC_URGENT);
1752         } else {
1753                 rc = queue_or_sync_write(exp, inode, llap, CFS_PAGE_SIZE,
1754                                          ASYNC_READY | ASYNC_URGENT);
1755         }
1756         if (rc) {
1757                 /* re-dirty page on error so it retries write */
1758                 if (PageWriteback(page))
1759                         end_page_writeback(page);
1760
1761                 /* resend the page only if its IO has not started */
1762                 if (!PageError(page))
1763                         ll_redirty_page(page);
1764
1765                 page_cache_release(page);
1766         }
1767 out:
1768         if (rc) {
1769                 if (!lli->lli_async_rc)
1770                         lli->lli_async_rc = rc;
1772                 unlock_page(page);
1773         }
1774         RETURN(rc);
1775 }
1776
1777 /*
1778  * For now we do our readpage the same on both 2.4 and 2.6.  The kernel's
1779  * read-ahead assumes it is valid to issue readpage all the way up to
1780  * i_size, but our dlm locks make that not the case.  We disable the
1781  * kernel's read-ahead and do our own by walking ahead in the page cache
1782  * checking for dlm lock coverage.  The main difference between 2.4 and
1783  * 2.6 is how read-ahead gets batched and issued, but we're using our own,
1784  * so they look the same.
1785  */
1786 int ll_readpage(struct file *filp, struct page *page)
1787 {
1788         struct ll_file_data *fd = LUSTRE_FPRIVATE(filp);
1789         struct inode *inode = page->mapping->host;
1790         struct obd_export *exp;
1791         struct ll_async_page *llap;
1792         struct obd_io_group *oig = NULL;
1793         struct lustre_handle *lockh = NULL;
1794         int rc;
1795         ENTRY;
1796
1797         LASSERT(PageLocked(page));
1798         LASSERT(!PageUptodate(page));
1799         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),offset=%Lu=%#Lx\n",
1800                inode->i_ino, inode->i_generation, inode,
1801                (((loff_t)page->index) << CFS_PAGE_SHIFT),
1802                (((loff_t)page->index) << CFS_PAGE_SHIFT));
1803         LASSERT(atomic_read(&filp->f_dentry->d_inode->i_count) > 0);
1804
1805         if (!ll_i2info(inode)->lli_smd) {
1806                 /* File with no objects - one big hole */
1807                 /* We use this only because remove_from_page_cache() is not
1808                  * exported; afterwards we make the page up to date again. */
1809                 ll_truncate_complete_page(page);
1810                 clear_page(kmap(page));
1811                 kunmap(page);
1812                 SetPageUptodate(page);
1813                 unlock_page(page);
1814                 RETURN(0);
1815         }
1816
1817         rc = oig_init(&oig);
1818         if (rc < 0)
1819                 GOTO(out, rc);
1820
1821         exp = ll_i2obdexp(inode);
1822         if (exp == NULL)
1823                 GOTO(out, rc = -EINVAL);
1824
1825         if (fd->fd_flags & LL_FILE_GROUP_LOCKED)
1826                 lockh = &fd->fd_cwlockh;
1827
1828         llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_READPAGE, lockh);
1829         if (IS_ERR(llap)) {
1830                 if (PTR_ERR(llap) == -ENOLCK) {
1831                         CWARN("ino %lu page %lu (%llu) not covered by "
1832                               "a lock (mmap?).  check debug logs.\n",
1833                               inode->i_ino, page->index,
1834                               (long long)page->index << PAGE_CACHE_SHIFT);
1835                 }
1836                 GOTO(out, rc = PTR_ERR(llap));
1837         }
1838
1839         if (ll_i2sbi(inode)->ll_ra_info.ra_max_pages)
1840                 ras_update(ll_i2sbi(inode), inode, &fd->fd_ras, page->index,
1841                            llap->llap_defer_uptodate);
1842
1843
1844         if (llap->llap_defer_uptodate) {
1845                 /* This is the callpath if we got the page from a readahead */
1846                 llap->llap_ra_used = 1;
1847                 rc = ll_readahead(&fd->fd_ras, exp, page->mapping, oig,
1848                                   fd->fd_flags);
1849                 if (rc > 0)
1850                         obd_trigger_group_io(exp, ll_i2info(inode)->lli_smd,
1851                                              NULL, oig);
1852                 LL_CDEBUG_PAGE(D_PAGE, page, "marking uptodate from defer\n");
1853                 SetPageUptodate(page);
1854                 unlock_page(page);
1855                 GOTO(out_oig, rc = 0);
1856         }
1857
1858         rc = ll_issue_page_read(exp, llap, oig, 0);
1859         if (rc)
1860                 GOTO(out, rc);
1861
1862         LL_CDEBUG_PAGE(D_PAGE, page, "queued readpage\n");
1863         /* We have just requested the actual page we want, see if we can tack
1864          * on some readahead to that page's RPC before it is sent. */
1865         if (ll_i2sbi(inode)->ll_ra_info.ra_max_pages)
1866                 ll_readahead(&fd->fd_ras, exp, page->mapping, oig,
1867                              fd->fd_flags);
1868
1869         rc = obd_trigger_group_io(exp, ll_i2info(inode)->lli_smd, NULL, oig);
1870
1871 out:
1872         if (rc)
1873                 unlock_page(page);
1874 out_oig:
1875         if (oig != NULL)
1876                 oig_release(oig);
1877         RETURN(rc);
1878 }
1879
1880 static void ll_file_put_pages(struct page **pages, int numpages)
1881 {
1882         int i;
1883         struct page **pp;
1884         ENTRY;
1885
1886         for (i = 0, pp = pages; i < numpages; i++, pp++) {
1887                 if (*pp) {
1888                         LL_CDEBUG_PAGE(D_PAGE, (*pp), "free\n");
1889                         __ll_put_llap(*pp);
1890                         if (page_private(*pp))
1891                                 CERROR("the llap wasn't freed\n");
1892                         (*pp)->mapping = NULL;
1893                         if (page_count(*pp) != 1)
1894                                 CERROR("page %p, flags %#lx, count %i, private %p\n",
1895                                        (*pp), (unsigned long)(*pp)->flags,
1896                                        page_count(*pp), (void*)page_private(*pp));
1897                         __free_pages(*pp, 0);
1898                 }
1899         }
1900         OBD_FREE(pages, numpages * sizeof(struct page*));
1901         EXIT;
1902 }
1903
1904 static struct page **ll_file_prepare_pages(int numpages, struct inode *inode,
1905                                            unsigned long first)
1906 {
1907         struct page **pages;
1908         int i;
1909         int rc = 0;
1910         ENTRY;
1911
1912         OBD_ALLOC(pages, sizeof(struct page *) * numpages);
1913         if (pages == NULL)
1914                 RETURN(ERR_PTR(-ENOMEM));
1915         for (i = 0; i < numpages; i++) {
1916                 struct page *page;
1917                 struct ll_async_page *llap;
1918
1919                 page = alloc_pages(GFP_HIGHUSER, 0);
1920                 if (page == NULL)
1921                         GOTO(err, rc = -ENOMEM);
1922                 pages[i] = page;
1923                 /* llap_from_page needs page index and mapping to be set */
1924                 page->index = first++;
1925                 page->mapping = inode->i_mapping;
1926                 llap = llap_from_page(page, LLAP_ORIGIN_LOCKLESS_IO);
1927                 if (IS_ERR(llap))
1928                         GOTO(err, rc = PTR_ERR(llap));
1929                 llap->llap_lockless_io_page = 1;
1930         }
1931         RETURN(pages);
1932 err:
1933         ll_file_put_pages(pages, numpages);
1934         RETURN(ERR_PTR(rc));
1935 }
1936
1937 static ssize_t ll_file_copy_pages(struct page **pages, int numpages,
1938                                   const struct iovec *iov, unsigned long nsegs,
1939                                   ssize_t iov_offset, loff_t pos, size_t count,
1940                                   int rw)
1941 {
1942         ssize_t amount = 0;
1943         int i;
1944         int updatechecksum = ll_i2sbi(pages[0]->mapping->host)->ll_flags &
1945                              LL_SBI_LLITE_CHECKSUM;
1946         ENTRY;
1947
1948         for (i = 0; i < numpages; i++) {
1949                 unsigned offset, bytes, left = 0;
1950                 char *vaddr;
1951
1952                 vaddr = kmap(pages[i]);
1953                 offset = pos & (CFS_PAGE_SIZE - 1);
1954                 bytes = min_t(unsigned, CFS_PAGE_SIZE - offset, count);
1955                 LL_CDEBUG_PAGE(D_PAGE, pages[i], "op = %s, addr = %p, "
1956                                "bytes = %u\n",
1957                                (rw == WRITE) ? "CFU" : "CTU",
1958                                vaddr + offset, bytes);
1959                 while (bytes > 0 && !left && nsegs) {
1960                         unsigned copy = min_t(ssize_t, bytes,
1961                                                iov->iov_len - iov_offset);
1962                         if (rw == WRITE) {
1963                                 left = copy_from_user(vaddr + offset,
1964                                                       iov->iov_base +iov_offset,
1965                                                       copy);
1966                                 if (updatechecksum) {
1967                                         struct ll_async_page *llap;
1968
1969                                         llap = llap_cast_private(pages[i]);
1970                                         llap->llap_checksum =
1971                                                 init_checksum(OSC_DEFAULT_CKSUM);
1972                                         llap->llap_checksum =
1973                                            compute_checksum(llap->llap_checksum,
1974                                                             vaddr,CFS_PAGE_SIZE,
1975                                                             OSC_DEFAULT_CKSUM);
1976                                 }
1977                         } else {
1978                                 left = copy_to_user(iov->iov_base + iov_offset,
1979                                                     vaddr + offset, copy);
1980                         }
1981
1982                         amount += copy;
1983                         count -= copy;
1984                         pos += copy;
1985                         iov_offset += copy;
1986                         bytes -= copy;
1987                         if (iov_offset == iov->iov_len) {
1988                                 iov_offset = 0;
1989                                 iov++;
1990                                 nsegs--;
1991                         }
1992                 }
1993                 kunmap(pages[i]);
1994                 if (left) {
1995                         amount -= left;
1996                         break;
1997                 }
1998         }
1999         if (amount == 0)
2000                 RETURN(-EFAULT);
2001         RETURN(amount);
2002 }
2003
2004 static int ll_file_oig_pages(struct inode * inode, struct page **pages,
2005                              int numpages, loff_t pos, size_t count, int rw)
2006 {
2007         struct obd_io_group *oig;
2008         struct ll_inode_info *lli = ll_i2info(inode);
2009         struct obd_export *exp;
2010         loff_t org_pos = pos;
2011         obd_flag brw_flags;
2012         int rc;
2013         int i;
2014         ENTRY;
2015
2016         exp = ll_i2obdexp(inode);
2017         if (exp == NULL)
2018                 RETURN(-EINVAL);
2019         rc = oig_init(&oig);
2020         if (rc)
2021                 RETURN(rc);
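             /* Lockless client IO: OBD_BRW_SRVLOCK asks the OST to take the
              * extent lock on the server side instead of using a client DLM
              * lock. */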
2022         brw_flags = OBD_BRW_SRVLOCK;
2023         if (capable(CAP_SYS_RESOURCE))
2024                 brw_flags |= OBD_BRW_NOQUOTA;
2025
2026         for (i = 0; i < numpages; i++) {
2027                 struct ll_async_page *llap;
2028                 unsigned from, bytes;
2029
2030                 from = pos & (CFS_PAGE_SIZE - 1);
2031                 bytes = min_t(unsigned, CFS_PAGE_SIZE - from,
2032                               count - pos + org_pos);
2033                 llap = llap_cast_private(pages[i]);
2034                 LASSERT(llap);
2035
2036                 lock_page(pages[i]);
2037
2038                 LL_CDEBUG_PAGE(D_PAGE, pages[i], "offset "LPU64","
2039                                " from %u, bytes = %u\n",
2040                                pos, from, bytes);
2041                 LASSERTF(pos >> CFS_PAGE_SHIFT == pages[i]->index,
2042                          "wrong page index %lu (%lu)\n",
2043                          pages[i]->index,
2044                          (unsigned long)(pos >> CFS_PAGE_SHIFT));
2045                 rc = obd_queue_group_io(exp, lli->lli_smd, NULL, oig,
2046                                         llap->llap_cookie,
2047                                         (rw == WRITE) ?
2048                                         OBD_BRW_WRITE:OBD_BRW_READ,
2049                                         from, bytes, brw_flags,
2050                                         ASYNC_READY | ASYNC_URGENT |
2051                                         ASYNC_COUNT_STABLE | ASYNC_GROUP_SYNC);
2052                 if (rc) {
2053                         i++;
2054                         GOTO(out, rc);
2055                 }
2056                 pos += bytes;
2057         }
2058         rc = obd_trigger_group_io(exp, lli->lli_smd, NULL, oig);
2059         if (rc)
2060                 GOTO(out, rc);
2061         rc = oig_wait(oig);
2062 out:
2063         while (--i >= 0)
2064                 unlock_page(pages[i]);
2065         oig_release(oig);
2066         RETURN(rc);
2067 }
2068
2069 /* Advance through passed iov, adjust iov pointer as necessary and return
2070  * starting offset in individual entry we are pointing at. Also reduce
2071  * nr_segs as needed */
2072 static ssize_t ll_iov_advance(const struct iovec **iov, unsigned long *nr_segs,
2073                               ssize_t offset)
2074 {
2075         while (*nr_segs > 0) {
2076                 if ((*iov)->iov_len > offset)
2077                         return ((*iov)->iov_len - offset);
2078                 offset -= (*iov)->iov_len;
2079                 (*iov)++;
2080                 (*nr_segs)--;
2081         }
2082         return 0;
2083 }
2084
2085 ssize_t ll_file_lockless_io(struct file *file, const struct iovec *iov,
2086                             unsigned long nr_segs,
2087                             loff_t *ppos, int rw, ssize_t count)
2088 {
2089         loff_t pos;
2090         struct inode *inode = file->f_dentry->d_inode;
2091         ssize_t rc = 0;
2092         int max_pages;
2093         size_t amount = 0;
2094         unsigned long first, last;
2095         const struct iovec *iv = &iov[0];
2096         unsigned long nsegs = nr_segs;
2097         unsigned long offset = 0;
2098         ENTRY;
2099
2100         if (rw == READ) {
2101                 loff_t isize;
2102
2103                 ll_inode_size_lock(inode, 0);
2104                 isize = i_size_read(inode);
2105                 ll_inode_size_unlock(inode, 0);
2106                 if (*ppos >= isize)
2107                         GOTO(out, rc = 0);
2108                 if (*ppos + count >= isize)
2109                         count -= *ppos + count - isize;
2110                 if (count == 0)
2111                         GOTO(out, rc);
2112         } else {
2113                 rc = generic_write_checks(file, ppos, &count, 0);
2114                 if (rc)
2115                         GOTO(out, rc);
2116                 rc = remove_suid(file->f_dentry);
2117                 if (rc)
2118                         GOTO(out, rc);
2119         }
2120
2121         pos = *ppos;
2122         first = pos >> CFS_PAGE_SHIFT;
2123         last = (pos + count - 1) >> CFS_PAGE_SHIFT;
2124         max_pages = PTLRPC_MAX_BRW_PAGES *
2125                 ll_i2info(inode)->lli_smd->lsm_stripe_count;
2126         CDEBUG(D_INFO, "%u, stripe_count = %u\n",
2127                PTLRPC_MAX_BRW_PAGES /* max_pages_per_rpc */,
2128                ll_i2info(inode)->lli_smd->lsm_stripe_count);
2129
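             /* Transfer the data in chunks of at most max_pages pages: for a
              * write, copy from userspace first and then issue the IO; for a
              * read, issue the IO first and then copy out to userspace. */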
2130         while (first <= last && rc >= 0) {
2131                 int pages_for_io;
2132                 struct page **pages;
2133                 size_t bytes = count - amount;
2134
2135                 pages_for_io = min_t(int, last - first + 1, max_pages);
2136                 pages = ll_file_prepare_pages(pages_for_io, inode, first);
2137                 if (IS_ERR(pages)) {
2138                         rc = PTR_ERR(pages);
2139                         break;
2140                 }
2141                 if (rw == WRITE) {
2142                         rc = ll_file_copy_pages(pages, pages_for_io, iv, nsegs,
2143                                                 offset, pos + amount, bytes,
2144                                                 rw);
2145                         if (rc < 0)
2146                                 GOTO(put_pages, rc);
2147                         offset = ll_iov_advance(&iv, &nsegs, offset + rc);
2148                         bytes = rc;
2149                 }
2150                 rc = ll_file_oig_pages(inode, pages, pages_for_io,
2151                                        pos + amount, bytes, rw);
2152                 if (rc)
2153                         GOTO(put_pages, rc);
2154                 if (rw == READ) {
2155                         rc = ll_file_copy_pages(pages, pages_for_io, iv, nsegs,
2156                                                 offset, pos + amount, bytes, rw);
2157                         if (rc < 0)
2158                                 GOTO(put_pages, rc);
2159                         offset = ll_iov_advance(&iv, &nsegs, offset + rc);
2160                         bytes = rc;
2161                 }
2162                 amount += bytes;
2163 put_pages:
2164                 ll_file_put_pages(pages, pages_for_io);
2165                 first += pages_for_io;
2166                 /* a short read/write check */
2167                 if (pos + amount < ((loff_t)first << CFS_PAGE_SHIFT))
2168                         break;
2169                 /* Check if we ran out of userspace buffers (how could that
2170                    happen?) */
2171                 if (nsegs == 0)
2172                         break;
2173         }
2174         /* NOTE: don't update i_size and KMS in the absence of LDLM locks,
2175          * even if the write makes the file larger */
2176         file_accessed(file);
2177         if (rw == READ && amount < count && rc == 0) {
2178                 unsigned long not_cleared;
2179
2180                 while (nsegs > 0) {
2181                         ssize_t to_clear = min_t(ssize_t, count - amount,
2182                                                  iv->iov_len - offset);
2183                         not_cleared = clear_user(iv->iov_base + offset,
2184                                                  to_clear);
2185                         amount += to_clear - not_cleared;
2186                         if (not_cleared) {
2187                                 rc = -EFAULT;
2188                                 break;
2189                         }
2190                         offset = 0;
2191                         iv++;
2192                         nsegs--;
2193                 }
2194         }
2195         if (amount > 0) {
2196                 lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
2197                                     (rw == WRITE) ?
2198                                     LPROC_LL_LOCKLESS_WRITE :
2199                                     LPROC_LL_LOCKLESS_READ,
2200                                     (long)amount);
2201                 *ppos += amount;
2202                 RETURN(amount);
2203         }
2204 out:
2205         RETURN(rc);
2206 }