1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/llite/rw.c
37  *
38  * Lustre Lite I/O page cache routines shared by different kernel revs
39  */
40
41 #include <linux/autoconf.h>
42 #include <linux/kernel.h>
43 #include <linux/mm.h>
44 #include <linux/string.h>
45 #include <linux/stat.h>
46 #include <linux/errno.h>
47 #include <linux/smp_lock.h>
48 #include <linux/unistd.h>
49 #include <linux/version.h>
50 #include <asm/system.h>
51 #include <asm/uaccess.h>
52
53 #include <linux/fs.h>
54 #include <linux/stat.h>
55 #include <asm/uaccess.h>
56 #include <linux/mm.h>
57 #include <linux/pagemap.h>
58 #include <linux/smp_lock.h>
59
60 #define DEBUG_SUBSYSTEM S_LLITE
61
62 //#include <lustre_mdc.h>
63 #include <lustre_lite.h>
64 #include <obd_cksum.h>
65 #include "llite_internal.h"
66 #include <linux/lustre_compat25.h>
67
68 #ifndef list_for_each_prev_safe
69 #define list_for_each_prev_safe(pos, n, head) \
70         for (pos = (head)->prev, n = pos->prev; pos != (head); \
71                 pos = n, n = pos->prev )
72 #endif
73
74 cfs_mem_cache_t *ll_async_page_slab = NULL;
75 size_t ll_async_page_slab_size = 0;
76
77 /* SYNCHRONOUS I/O to object storage for an inode */
78 static int ll_brw(int cmd, struct inode *inode, struct obdo *oa,
79                   struct page *page, int flags)
80 {
81         struct ll_inode_info *lli = ll_i2info(inode);
82         struct lov_stripe_md *lsm = lli->lli_smd;
83         struct obd_info oinfo = { { { 0 } } };
84         struct brw_page pg;
85         int opc, rc;
86         ENTRY;
87
88         pg.pg = page;
89         pg.off = ((obd_off)page->index) << CFS_PAGE_SHIFT;
90
91         if ((cmd & OBD_BRW_WRITE) && (pg.off+CFS_PAGE_SIZE>i_size_read(inode)))
92                 pg.count = i_size_read(inode) % CFS_PAGE_SIZE;
93         else
94                 pg.count = CFS_PAGE_SIZE;
95
96         LL_CDEBUG_PAGE(D_PAGE, page, "%s %d bytes ino %lu at "LPU64"/"LPX64"\n",
97                        cmd & OBD_BRW_WRITE ? "write" : "read", pg.count,
98                        inode->i_ino, pg.off, pg.off);
99         if (pg.count == 0) {
100                 CERROR("ZERO COUNT: ino %lu: size %p:%Lu(%p:%Lu) idx %lu off "
101                        LPU64"\n", inode->i_ino, inode, i_size_read(inode),
102                        page->mapping->host, i_size_read(page->mapping->host),
103                        page->index, pg.off);
104         }
105
106         pg.flag = flags;
107
108         if (cmd & OBD_BRW_WRITE)
109                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_BRW_WRITE,
110                                    pg.count);
111         else
112                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_BRW_READ,
113                                    pg.count);
114         oinfo.oi_oa = oa;
115         oinfo.oi_md = lsm;
116         /* NB partial write, so we might not have CAPA_OPC_OSS_READ capa */
117         opc = cmd & OBD_BRW_WRITE ? CAPA_OPC_OSS_WRITE : CAPA_OPC_OSS_RW;
118         oinfo.oi_capa = ll_osscapa_get(inode, opc);
119         rc = obd_brw(cmd, ll_i2dtexp(inode), &oinfo, 1, &pg, NULL);
120         capa_put(oinfo.oi_capa);
121         if (rc == 0)
122                 obdo_to_inode(inode, oa, OBD_MD_FLBLOCKS);
123         else if (rc != -EIO)
124                 CERROR("error from obd_brw: rc = %d\n", rc);
125         RETURN(rc);
126 }
127
128 int ll_file_punch(struct inode * inode, loff_t new_size, int srvlock)
129 {
130         struct ll_inode_info *lli = ll_i2info(inode);
131         struct obd_info oinfo = { { { 0 } } };
132         struct obdo oa;
133         int rc;
134
135         ENTRY;
136         CDEBUG(D_INFO, "calling punch for "LPX64" (new size %Lu=%#Lx)\n",
137                lli->lli_smd->lsm_object_id, i_size_read(inode), i_size_read(inode));
138
139         oinfo.oi_md = lli->lli_smd;
140         oinfo.oi_policy.l_extent.start = new_size;
141         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
142         oinfo.oi_oa = &oa;
143         oa.o_id = lli->lli_smd->lsm_object_id;
144         oa.o_gr = lli->lli_smd->lsm_object_gr;
145         oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
146         if (srvlock) {
147                 /* set OBD_MD_FLFLAGS in o_valid, only if we
148                  * set OBD_FL_TRUNCLOCK, otherwise ost_punch
149                  * and filter_setattr get confused, see the comment
150                  * in ost_punch */
151                 oa.o_flags = OBD_FL_TRUNCLOCK;
152                 oa.o_valid |= OBD_MD_FLFLAGS;
153         }
154         obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE |
155                         OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME |
156                         OBD_MD_FLFID | OBD_MD_FLGENER);
157
158         oinfo.oi_capa = ll_osscapa_get(inode, CAPA_OPC_OSS_TRUNC);
159         rc = obd_punch_rqset(ll_i2dtexp(inode), &oinfo, NULL);
160         ll_truncate_free_capa(oinfo.oi_capa);
161         if (rc)
162                 CERROR("obd_truncate fails (%d) ino %lu\n", rc, inode->i_ino);
163         else
164                 obdo_to_inode(inode, &oa, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
165                               OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME);
166         RETURN(rc);
167 }
168
169 /* This isn't where truncate starts.  Roughly:
170  * sys_truncate->ll_setattr_raw->vmtruncate->ll_truncate. setattr_raw grabs
171  * DLM lock on [size, EOF], i_mutex, ->lli_size_sem, and WRITE_I_ALLOC_SEM to
172  * avoid races.
173  *
174  * Must be called under ->lli_size_sem. */
175 void ll_truncate(struct inode *inode)
176 {
177         struct ll_inode_info *lli = ll_i2info(inode);
178         int srvlock = !!(lli->lli_flags & LLIF_SRVLOCK);
179         loff_t new_size;
180         ENTRY;
181         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) to %Lu=%#Lx\n",inode->i_ino,
182                inode->i_generation, inode, i_size_read(inode),
183                i_size_read(inode));
184
185         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_TRUNC, 1);
186         if (lli->lli_size_sem_owner != current) {
187                 EXIT;
188                 return;
189         }
190
191         if (!lli->lli_smd) {
192                 CDEBUG(D_INODE, "truncate on inode %lu with no objects\n",
193                        inode->i_ino);
194                 GOTO(out_unlock, 0);
195         }
196
197         LASSERT(atomic_read(&lli->lli_size_sem.count) <= 0);
198
199         if (!srvlock) {
200                 struct ost_lvb lvb;
201                 int rc;
202
203                 /* XXX I'm pretty sure this is a hack to paper
204                  * over a more fundamental race condition. */
205                 lov_stripe_lock(lli->lli_smd);
206                 inode_init_lvb(inode, &lvb);
207                 rc = obd_merge_lvb(ll_i2dtexp(inode), lli->lli_smd, &lvb, 0);
208                 if (lvb.lvb_size == i_size_read(inode) && rc == 0) {
209                         CDEBUG(D_VFSTRACE, "skipping punch for obj "LPX64
210                                ",%Lu=%#Lx\n", lli->lli_smd->lsm_object_id,
211                                i_size_read(inode), i_size_read(inode));
212                         lov_stripe_unlock(lli->lli_smd);
213                         GOTO(out_unlock, 0);
214                 }
215                 obd_adjust_kms(ll_i2dtexp(inode), lli->lli_smd,
216                                i_size_read(inode), 1);
217                 lov_stripe_unlock(lli->lli_smd);
218         }
219
220         if (unlikely((ll_i2sbi(inode)->ll_flags & LL_SBI_CHECKSUM) &&
221                      (i_size_read(inode) & ~CFS_PAGE_MASK))) {
222                 /* If the truncate leaves behind a partial page, update its
223                  * checksum. */
224                 struct page *page = find_get_page(inode->i_mapping,
225                                                   i_size_read(inode) >>
226                                                   CFS_PAGE_SHIFT);
227                 if (page != NULL) {
228                         struct ll_async_page *llap = llap_cast_private(page);
229                         if (llap != NULL) {
230                                 char *kaddr = kmap_atomic(page, KM_USER0);
231                                 llap->llap_checksum =
232                                         init_checksum(OSC_DEFAULT_CKSUM);
233                                 llap->llap_checksum =
234                                         compute_checksum(llap->llap_checksum,
235                                                          kaddr, CFS_PAGE_SIZE,
236                                                          OSC_DEFAULT_CKSUM);
237                                 kunmap_atomic(kaddr, KM_USER0);
238                         }
239                         page_cache_release(page);
240                 }
241         }
242
243         new_size = i_size_read(inode);
244         ll_inode_size_unlock(inode, 0);
245         if (!srvlock)
246                 ll_file_punch(inode, new_size, 0);
247         else
248                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LOCKLESS_TRUNC, 1);
249
250         EXIT;
251         return;
252
253  out_unlock:
254         ll_inode_size_unlock(inode, 0);
255 } /* ll_truncate */
256
257 int ll_prepare_write(struct file *file, struct page *page, unsigned from,
258                      unsigned to)
259 {
260         struct inode *inode = page->mapping->host;
261         struct ll_inode_info *lli = ll_i2info(inode);
262         struct lov_stripe_md *lsm = lli->lli_smd;
263         obd_off offset = ((obd_off)page->index) << CFS_PAGE_SHIFT;
264         struct obd_info oinfo = { { { 0 } } };
265         struct brw_page pga;
266         struct obdo oa;
267         struct ost_lvb lvb;
268         int rc = 0;
269         ENTRY;
270
271         LASSERT(PageLocked(page));
272         (void)llap_cast_private(page); /* assertion */
273
274         /* Check to see if we should return -EIO right away */
275         pga.pg = page;
276         pga.off = offset;
277         pga.count = CFS_PAGE_SIZE;
278         pga.flag = 0;
279
280         oa.o_mode = inode->i_mode;
281         oa.o_id = lsm->lsm_object_id;
282         oa.o_gr = lsm->lsm_object_gr;
283         oa.o_valid = OBD_MD_FLID | OBD_MD_FLMODE |
284                      OBD_MD_FLTYPE | OBD_MD_FLGROUP;
285         obdo_from_inode(&oa, inode, OBD_MD_FLFID | OBD_MD_FLGENER);
286
287         oinfo.oi_oa = &oa;
288         oinfo.oi_md = lsm;
289         rc = obd_brw(OBD_BRW_CHECK, ll_i2dtexp(inode), &oinfo, 1, &pga, NULL);
290         if (rc)
291                 RETURN(rc);
292
293         if (PageUptodate(page)) {
294                 LL_CDEBUG_PAGE(D_PAGE, page, "uptodate\n");
295                 RETURN(0);
296         }
297
298         /* We're completely overwriting an existing page, so _don't_ set it up
299          * to date until commit_write */
300         if (from == 0 && to == CFS_PAGE_SIZE) {
301                 LL_CDEBUG_PAGE(D_PAGE, page, "full page write\n");
302                 POISON_PAGE(page, 0x11);
303                 RETURN(0);
304         }
305
306         /* If we are writing to a new page, no need to read old data.  The extent
307          * locking will have updated the KMS, and for our purposes here we can
308          * treat it like i_size. */
309         lov_stripe_lock(lsm);
310         inode_init_lvb(inode, &lvb);
311         obd_merge_lvb(ll_i2dtexp(inode), lsm, &lvb, 1);
312         lov_stripe_unlock(lsm);
313         if (lvb.lvb_size <= offset) {
314                 char *kaddr = kmap_atomic(page, KM_USER0);
315                 LL_CDEBUG_PAGE(D_PAGE, page, "kms "LPU64" <= offset "LPU64"\n",
316                                lvb.lvb_size, offset);
317                 memset(kaddr, 0, CFS_PAGE_SIZE);
318                 kunmap_atomic(kaddr, KM_USER0);
319                 GOTO(prepare_done, rc = 0);
320         }
321
322         /* XXX could be an async ocp read.. read-ahead? */
323         rc = ll_brw(OBD_BRW_READ, inode, &oa, page, 0);
324         if (rc == 0) {
325                 /* bug 1598: don't clobber blksize */
326                 oa.o_valid &= ~(OBD_MD_FLSIZE | OBD_MD_FLBLKSZ);
327                 obdo_refresh_inode(inode, &oa, oa.o_valid);
328         }
329
330         EXIT;
331  prepare_done:
332         if (rc == 0)
333                 SetPageUptodate(page);
334
335         return rc;
336 }
337
338 /**
339  * Make a page ready for an ASYNC write.
340  * \param data - pointer to the llap cookie
341  * \param cmd - one of the OBD_BRW_* flags
342  *
343  * \retval 0 if the page was successfully prepared for sending
344  * \retval -EAGAIN if the page cannot be sent right now (still locked or
345  *         no longer dirty)
346  */
346 static int ll_ap_make_ready(void *data, int cmd)
347 {
348         struct ll_async_page *llap;
349         struct page *page;
350         ENTRY;
351
352         llap = LLAP_FROM_COOKIE(data);
353         page = llap->llap_page;
354
355         /* we're trying to write, but the page is locked.. come back later */
356         if (TryLockPage(page))
357                 RETURN(-EAGAIN);
358
359         LASSERTF(!(cmd & OBD_BRW_READ) || !PageWriteback(page),
360                 "cmd %x page %p ino %lu index %lu fl %lx\n", cmd, page,
361                  page->mapping->host->i_ino, page->index, page->flags);
362
363         /* if we left PageDirty we might get another writepage call
364          * in the future.  list walkers are bright enough
365          * to check page dirty so we can leave it on whatever list
366          * it's on.  XXX also, we're called with the cli lock held, so if
367          * we took the page cache list lock we'd create a lock inversion
368          * with the removepage path, which takes the page lock and then the
369          * cli lock */
370         LASSERTF(!PageWriteback(page),"cmd %x page %p ino %lu index %lu\n", cmd, page,
371                  page->mapping->host->i_ino, page->index);
372         if (!clear_page_dirty_for_io(page)) {
373                 unlock_page(page);
374                 RETURN(-EAGAIN);
375         }
376
377         /* This actually clears the dirty bit in the radix tree.*/
378         set_page_writeback(page);
379
380         LL_CDEBUG_PAGE(D_PAGE, page, "made ready\n");
381         page_cache_get(page);
382
383         RETURN(0);
384 }
385
386 /* We have two reasons for giving llite the opportunity to change the
387  * write length of a given queued page as it builds the RPC containing
388  * the page:
389  *
390  * 1) Further extending writes may have landed in the page cache
391  *    since a partial write first queued this page requiring us
392  *    to write more from the page cache.  (No further races are possible, since
393  *    by the time this is called, the page is locked.)
394  * 2) We might have raced with truncate and want to avoid performing
395  *    write RPCs that are just going to be thrown away by the
396  *    truncate's punch on the storage targets.
397  *
398  * The KMS serves both purposes, since it is updated at both truncate and
399  * extending writes.
400  */
401 static int ll_ap_refresh_count(void *data, int cmd)
402 {
403         struct ll_inode_info *lli;
404         struct ll_async_page *llap;
405         struct lov_stripe_md *lsm;
406         struct page *page;
407         struct inode *inode;
408         struct ost_lvb lvb;
409         __u64 kms;
410         ENTRY;
411
412         /* readpage queues with _COUNT_STABLE, shouldn't get here. */
413         LASSERT(cmd != OBD_BRW_READ);
414
415         llap = LLAP_FROM_COOKIE(data);
416         page = llap->llap_page;
417         inode = page->mapping->host;
418         lli = ll_i2info(inode);
419         lsm = lli->lli_smd;
420
421         lov_stripe_lock(lsm);
422         inode_init_lvb(inode, &lvb);
423         obd_merge_lvb(ll_i2dtexp(inode), lsm, &lvb, 1);
424         kms = lvb.lvb_size;
425         lov_stripe_unlock(lsm);
426
427         /* catch race with truncate */
428         if (((__u64)page->index << CFS_PAGE_SHIFT) >= kms)
429                 return 0;
430
431         /* catch sub-page write at end of file */
432         if (((__u64)page->index << CFS_PAGE_SHIFT) + CFS_PAGE_SIZE > kms)
433                 return kms % CFS_PAGE_SIZE;
434
435         return CFS_PAGE_SIZE;
436 }
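/* An illustrative walk-through of the refresh_count arithmetic above, under
 * the assumption of 4K pages (CFS_PAGE_SHIFT == 12); the numbers are
 * hypothetical, not taken from a real trace:
 *
 *   kms = 10000:
 *     page->index = 3 -> offset 12288 >= kms, the truncate-race case returns
 *                        0 and nothing is written for this page;
 *     page->index = 2 -> offset 8192, and 8192 + 4096 > kms, so the sub-page
 *                        EOF case returns 10000 % 4096 = 1808 bytes;
 *     page->index = 1 -> neither check fires, so the full CFS_PAGE_SIZE
 *                        (4096 bytes) is written.
 */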
437
438 void ll_inode_fill_obdo(struct inode *inode, int cmd, struct obdo *oa)
439 {
440         struct lov_stripe_md *lsm;
441         obd_flag valid_flags;
442
443         lsm = ll_i2info(inode)->lli_smd;
444
445         oa->o_id = lsm->lsm_object_id;
446         oa->o_gr = lsm->lsm_object_gr;
447         oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
448         valid_flags = OBD_MD_FLTYPE | OBD_MD_FLATIME;
449         if (cmd & OBD_BRW_WRITE) {
450                 oa->o_valid |= OBD_MD_FLEPOCH;
451                 oa->o_easize = ll_i2info(inode)->lli_ioepoch;
452
453                 valid_flags |= OBD_MD_FLMTIME | OBD_MD_FLCTIME |
454                         OBD_MD_FLUID | OBD_MD_FLGID |
455                         OBD_MD_FLFID | OBD_MD_FLGENER;
456         }
457
458         obdo_from_inode(oa, inode, valid_flags);
459 }
460
461 static void ll_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
462 {
463         struct ll_async_page *llap;
464         ENTRY;
465
466         llap = LLAP_FROM_COOKIE(data);
467         ll_inode_fill_obdo(llap->llap_page->mapping->host, cmd, oa);
468
469         EXIT;
470 }
471
472 static void ll_ap_update_obdo(void *data, int cmd, struct obdo *oa,
473                               obd_valid valid)
474 {
475         struct ll_async_page *llap;
476         ENTRY;
477
478         llap = LLAP_FROM_COOKIE(data);
479         obdo_from_inode(oa, llap->llap_page->mapping->host, valid);
480
481         EXIT;
482 }
483
484 static struct obd_capa *ll_ap_lookup_capa(void *data, int cmd)
485 {
486         struct ll_async_page *llap = LLAP_FROM_COOKIE(data);
487         int opc = cmd & OBD_BRW_WRITE ? CAPA_OPC_OSS_WRITE : CAPA_OPC_OSS_RW;
488
489         return ll_osscapa_get(llap->llap_page->mapping->host, opc);
490 }
491
492 static struct obd_async_page_ops ll_async_page_ops = {
493         .ap_make_ready =        ll_ap_make_ready,
494         .ap_refresh_count =     ll_ap_refresh_count,
495         .ap_fill_obdo =         ll_ap_fill_obdo,
496         .ap_update_obdo =       ll_ap_update_obdo,
497         .ap_completion =        ll_ap_completion,
498         .ap_lookup_capa =       ll_ap_lookup_capa,
499 };
500
501 struct ll_async_page *llap_cast_private(struct page *page)
502 {
503         struct ll_async_page *llap = (struct ll_async_page *)page_private(page);
504
505         LASSERTF(llap == NULL || llap->llap_magic == LLAP_MAGIC,
506                  "page %p private %lu gave magic %d which != %d\n",
507                  page, page_private(page), llap->llap_magic, LLAP_MAGIC);
508
509         return llap;
510 }
511
512 /* Try to reap @target pages in the specific @cpu's async page list.
513  *
514  * There is an llap attached to every page in lustre, linked off @sbi.
515  * We add a dummy llap to the list so we don't lose our place during the walk.
516  * If llaps in the list are being moved they will only move to the end
517  * of the LRU, and we aren't terribly interested in those pages here (we
518  * start at the beginning of the list, where the least-used llaps are). */
519 static inline int llap_shrink_cache_internal(struct ll_sb_info *sbi, 
520         int cpu, int target)
521 {
522         struct ll_async_page *llap, dummy_llap = { .llap_magic = 0xd11ad11a };
523         struct ll_pglist_data *pd;
524         struct list_head *head;
525         int count = 0;
526
527         pd = ll_pglist_cpu_lock(sbi, cpu);
528         head = &pd->llpd_list;
529         list_add(&dummy_llap.llap_pglist_item, head);
530         while (count < target) {
531                 struct page *page;
532                 int keep;
533
534                 if (unlikely(need_resched())) {
535                         ll_pglist_cpu_unlock(sbi, cpu);
536                         cond_resched();
537                         ll_pglist_cpu_lock(sbi, cpu);
538                 }
539
540                 llap = llite_pglist_next_llap(head, 
541                         &dummy_llap.llap_pglist_item);
542                 list_del_init(&dummy_llap.llap_pglist_item);
543                 if (llap == NULL)
544                         break;
545
546                 page = llap->llap_page;
547                 LASSERT(page != NULL);
548
549                 list_add(&dummy_llap.llap_pglist_item, &llap->llap_pglist_item);
550
551                 /* Page needs/undergoing IO */
552                 if (TryLockPage(page)) {
553                         LL_CDEBUG_PAGE(D_PAGE, page, "can't lock\n");
554                         continue;
555                 }
556
557                 keep = (llap->llap_write_queued || PageDirty(page) ||
558                         PageWriteback(page) || (!PageUptodate(page) &&
559                         llap->llap_origin != LLAP_ORIGIN_READAHEAD));
560
561                 LL_CDEBUG_PAGE(D_PAGE, page,"%s LRU page: %s%s%s%s%s origin %s\n",
562                                keep ? "keep" : "drop",
563                                llap->llap_write_queued ? "wq " : "",
564                                PageDirty(page) ? "pd " : "",
565                                PageUptodate(page) ? "" : "!pu ",
566                                PageWriteback(page) ? "wb" : "",
567                                llap->llap_defer_uptodate ? "" : "!du",
568                                llap_origins[llap->llap_origin]);
569
570                 /* If page is dirty or undergoing IO don't discard it */
571                 if (keep) {
572                         unlock_page(page);
573                         continue;
574                 }
575
576                 page_cache_get(page);
577                 ll_pglist_cpu_unlock(sbi, cpu);
578
579                 if (page->mapping != NULL) {
580                         ll_teardown_mmaps(page->mapping,
581                                          (__u64)page->index << CFS_PAGE_SHIFT,
582                                          ((__u64)page->index << CFS_PAGE_SHIFT)|
583                                           ~CFS_PAGE_MASK);
584                         if (!PageDirty(page) && !page_mapped(page)) {
585                                 ll_ra_accounting(llap, page->mapping);
586                                 ll_truncate_complete_page(page);
587                                 ++count;
588                         } else {
589                                 LL_CDEBUG_PAGE(D_PAGE, page, "Not dropping page"
590                                                              " because it is "
591                                                              "%s\n",
592                                                               PageDirty(page)?
593                                                               "dirty":"mapped");
594                         }
595                 }
596                 unlock_page(page);
597                 page_cache_release(page);
598
599                 ll_pglist_cpu_lock(sbi, cpu);
600         }
601         list_del(&dummy_llap.llap_pglist_item);
602         ll_pglist_cpu_unlock(sbi, cpu);
603
604         CDEBUG(D_CACHE, "shrank %d pages, target was %d\n", count, target);
605         return count;
606 }
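/* A minimal sketch (comments only, not part of the build) of the dummy-llap
 * cursor pattern used by llap_shrink_cache_internal() above: the dummy entry
 * marks our position so the per-cpu lock can be dropped and re-taken without
 * losing our place in the LRU list.
 *
 *   list_add(&dummy_llap.llap_pglist_item, head);
 *   while (more work) {
 *           llap = llite_pglist_next_llap(head, &dummy_llap.llap_pglist_item);
 *           list_del_init(&dummy_llap.llap_pglist_item);
 *           if (llap == NULL)
 *                   break;
 *           list_add(&dummy_llap.llap_pglist_item, &llap->llap_pglist_item);
 *           ... possibly drop and re-take the per-cpu lock ...
 *   }
 *   list_del(&dummy_llap.llap_pglist_item);
 */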
607
608
609 /* Try to shrink the page cache for the @sbi filesystem by 1/@shrink_fraction.
610  *
611  * First, this code calculates the total number of pages wanted from
612  * @shrink_fraction, then it reaps pages from each cpu in proportion to
613  * that cpu's own page count (llpd_count).
614  */
615 int llap_shrink_cache(struct ll_sb_info *sbi, int shrink_fraction)
616 {
617         unsigned long total, want, percpu_want, count = 0;
618         int cpu, nr_cpus;
619
620         total = lcounter_read(&sbi->ll_async_page_count);
621         if (total == 0)
622                 return 0;
623
624 #ifdef HAVE_SHRINKER_CACHE
625         want = shrink_fraction;
626         if (want == 0)
627                 return total;
628 #else
629         /* There can be a large number of llaps (600k or more in a large
630          * memory machine) so the VM 1/6 shrink ratio is likely too much.
631          * Since we are freeing pages also, we don't necessarily want to
632          * shrink so much.  Limit to 40MB of pages + llaps per call. */
633         if (shrink_fraction <= 0)
634                 want = total - sbi->ll_async_page_max + 32*num_online_cpus();
635         else
636                 want = (total + shrink_fraction - 1) / shrink_fraction;
637 #endif
638
639         if (want > 40 << (20 - CFS_PAGE_SHIFT))
640                 want = 40 << (20 - CFS_PAGE_SHIFT);
641
642         CDEBUG(D_CACHE, "shrinking %lu of %lu pages (1/%d)\n",
643                want, total, shrink_fraction);
644
645         nr_cpus = num_possible_cpus();
646         cpu = sbi->ll_async_page_clock_hand;
647         /* we do at most one round */
648         do {
649                 int c;
650
651                 cpu = (cpu + 1) % nr_cpus;
652                 c = LL_PGLIST_DATA_CPU(sbi, cpu)->llpd_count;
653                 if (!cpu_online(cpu))
654                         percpu_want = c;
655                 else
656                         percpu_want = want / ((total / (c + 1)) + 1);
657                 if (percpu_want == 0)
658                         continue;
659
660                 count += llap_shrink_cache_internal(sbi, cpu, percpu_want);
661                 if (count >= want)
662                         sbi->ll_async_page_clock_hand = cpu;
663         } while (cpu != sbi->ll_async_page_clock_hand);
664
665         CDEBUG(D_CACHE, "shrank %lu/%lu and left %lu unscanned\n",
666                count, want, total);
667
668 #ifdef HAVE_SHRINKER_CACHE
669         return lcounter_read(&sbi->ll_async_page_count);
670 #else
671         return count;
672 #endif
673 }
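/* An illustrative walk-through of the per-cpu split above, with hypothetical
 * numbers and 4K pages (CFS_PAGE_SHIFT == 12):
 *
 *   total = 1000 llaps, shrink_fraction = 10  ->  want = (1000 + 9) / 10 = 100
 *   (want is also capped at 40 << (20 - 12) = 10240 pages, i.e. 40MB)
 *
 *   online cpu with llpd_count = 400: percpu_want = 100 / ((1000/401) + 1) = 33
 *   online cpu with llpd_count = 100: percpu_want = 100 / ((1000/101) + 1) = 10
 *   online cpu with llpd_count =   0: percpu_want = 100 / ((1000/1) + 1) = 0,
 *                                     so that cpu is skipped
 *
 * cpus holding more pages are therefore asked to shed proportionally more,
 * and the clock hand records the cpu where the target was reached so the
 * next call resumes after it.
 */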
674
675 /* Rebalance the async page queue length for each cpu. The intent is that a
676  * cpu doing more IO gets a relatively longer queue.
677  * This function should be called with preemption disabled.
678  */
679 static inline int llap_async_cache_rebalance(struct ll_sb_info *sbi)
680 {
681         unsigned long sample = 0, *cpu_sample, bias, slice;
682         struct ll_pglist_data *pd;
683         cpumask_t mask;
684         int cpu, surplus;
685         int w1 = 7, w2 = 3, base = (w1 + w2); /* weight value */
686         atomic_t *pcnt;
687
688         if (!spin_trylock(&sbi->ll_async_page_reblnc_lock)) {
689                 /* someone else is doing the job */
690                 return 1;
691         }
692
693         pcnt = &LL_PGLIST_DATA(sbi)->llpd_sample_count;
694         if (!atomic_read(pcnt)) {
695                 /* rare case, somebody else has gotten this job done */
696                 spin_unlock(&sbi->ll_async_page_reblnc_lock);
697                 return 1;
698         }
699
700         sbi->ll_async_page_reblnc_count++;
701         cpu_sample = sbi->ll_async_page_sample;
702         memset(cpu_sample, 0, num_possible_cpus() * sizeof(unsigned long));
703         for_each_online_cpu(cpu) {
704                 pcnt = &LL_PGLIST_DATA_CPU(sbi, cpu)->llpd_sample_count;
705                 cpu_sample[cpu] = atomic_read(pcnt);
706                 atomic_set(pcnt, 0);
707                 sample += cpu_sample[cpu];
708         }
709
710         cpus_clear(mask);
711         surplus = sbi->ll_async_page_max;
712         slice = surplus / sample + 1;
713         sample /= num_online_cpus();
714         bias = sample >> 4;
715         for_each_online_cpu(cpu) {
716                 pd = LL_PGLIST_DATA_CPU(sbi, cpu);
717                 if (labs((long int)sample - cpu_sample[cpu]) > bias) {
718                         unsigned long budget = pd->llpd_budget;
719                         /* weighted original queue length and expected queue
720                          * length to avoid thrashing. */
721                         pd->llpd_budget = (budget * w1) / base +
722                                         (slice * cpu_sample[cpu]) * w2 / base;
723                         cpu_set(cpu, mask);
724                 }
725                 surplus -= pd->llpd_budget;
726         }
727         surplus /= cpus_weight(mask) ?: 1;
728         for_each_cpu_mask(cpu, mask)
729                 LL_PGLIST_DATA_CPU(sbi, cpu)->llpd_budget += surplus;
730         spin_unlock(&sbi->ll_async_page_reblnc_lock);
731
732         /* TODO: do we really need to call llap_shrink_cache_internal
733          * for every cpu whose page count is greater than its budget?
734          * for_each_cpu_mask(cpu, mask)
735          *      llap_shrink_cache_internal(...)
736          */
737
738         return 0;
739 }
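/* An illustrative walk-through of the weighted rebalance above; the numbers
 * are hypothetical, not taken from a real run:
 *
 *   ll_async_page_max = 10000, two online cpus with samples 900 and 100
 *   sample (total) = 1000, slice = 10000/1000 + 1 = 11
 *   sample (mean)  = 1000/2 = 500, bias = 500 >> 4 = 31
 *
 *   both cpus deviate from the mean by 400 > bias, so both are re-weighted
 *   (old budget 5000 each, w1 = 7, w2 = 3, base = 10):
 *
 *     cpu0: 5000*7/10 + 11*900*3/10 = 3500 + 2970 = 6470
 *     cpu1: 5000*7/10 + 11*100*3/10 = 3500 +  330 = 3830
 *
 *   surplus = 10000 - 6470 - 3830 = -300, spread over the 2 re-weighted cpus
 *   as -150 each, giving final budgets 6320 and 3680, which still sum to
 *   ll_async_page_max.
 */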
740
741 static struct ll_async_page *llap_from_page_with_lockh(struct page *page,
742                                                        unsigned origin,
743                                                        struct lustre_handle *lockh)
744 {
745         struct ll_async_page *llap;
746         struct obd_export *exp;
747         struct inode *inode = page->mapping->host;
748         struct ll_sb_info *sbi;
749         struct ll_pglist_data *pd;
750         int rc, cpu, target;
751         ENTRY;
752
753         if (!inode) {
754                 static int triggered;
755
756                 if (!triggered) {
757                         LL_CDEBUG_PAGE(D_ERROR, page, "Bug 10047. Wrong anon "
758                                        "page received\n");
759                         libcfs_debug_dumpstack(NULL);
760                         triggered = 1;
761                 }
762                 RETURN(ERR_PTR(-EINVAL));
763         }
764         sbi = ll_i2sbi(inode);
765         LASSERT(ll_async_page_slab);
766         LASSERTF(origin < LLAP__ORIGIN_MAX, "%u\n", origin);
767
768         llap = llap_cast_private(page);
769         if (llap != NULL) {
770                 /* move to end of LRU list, except when page is just about to
771                  * die */
772                 if (origin != LLAP_ORIGIN_REMOVEPAGE) {
773                         int old_cpu = llap->llap_pglist_cpu;
774                         struct ll_pglist_data *old_pd;
775
776                         pd = ll_pglist_double_lock(sbi, old_cpu, &old_pd);
777                         pd->llpd_hit++;
778                         while (old_cpu != llap->llap_pglist_cpu) {
779                                 /* rare case: someone else is touching this
780                                  * page too. */
781                                 ll_pglist_double_unlock(sbi, old_cpu);
782                                 old_cpu = llap->llap_pglist_cpu;
783                                 pd=ll_pglist_double_lock(sbi, old_cpu, &old_pd);
784                         }
785
786                         list_move(&llap->llap_pglist_item,
787                                   &pd->llpd_list);
788                         old_pd->llpd_gen++;
789                         if (pd->llpd_cpu != old_cpu) {
790                                 pd->llpd_count++;
791                                 old_pd->llpd_count--;
792                                 old_pd->llpd_gen++;
793                                 llap->llap_pglist_cpu = pd->llpd_cpu;
794                                 pd->llpd_cross++;
795                         }
796                         ll_pglist_double_unlock(sbi, old_cpu);
797                 }
798                 GOTO(out, llap);
799         }
800
801         exp = ll_i2dtexp(page->mapping->host);
802         if (exp == NULL)
803                 RETURN(ERR_PTR(-EINVAL));
804
805         /* limit the number of lustre-cached pages */
806         cpu = get_cpu();
807         pd = LL_PGLIST_DATA(sbi);
808         target = pd->llpd_count - pd->llpd_budget;
809         if (target > 0) {
810                 rc = 0;
811                 atomic_inc(&pd->llpd_sample_count);
812                 if (atomic_read(&pd->llpd_sample_count) > 
813                     sbi->ll_async_page_sample_max) {
814                         pd->llpd_reblnc_count++;
815                         rc = llap_async_cache_rebalance(sbi);
816                         if (rc == 0)
817                                 target = pd->llpd_count - pd->llpd_budget;
818                 }
819                 /* if rc equals 1, another cpu is doing the rebalance job and
820                  * our budget may be modified while we read it.  Furthermore,
821                  * it is most likely being increased because we have already
822                  * reached the rebalance threshold.  In this case, skip
823                  * shrinking the cache here. */
824                 if ((rc == 0) && target > 0)
825                         llap_shrink_cache_internal(sbi, cpu, target + 32);
826         }
827         put_cpu();
828
829         OBD_SLAB_ALLOC(llap, ll_async_page_slab, CFS_ALLOC_STD,
830                        ll_async_page_slab_size);
831         if (llap == NULL)
832                 RETURN(ERR_PTR(-ENOMEM));
833         llap->llap_magic = LLAP_MAGIC;
834         llap->llap_cookie = (void *)llap + size_round(sizeof(*llap));
835
836         /* XXX: for bug 11270 - check for lockless origin here! */
837         if (origin == LLAP_ORIGIN_LOCKLESS_IO)
838                 llap->llap_nocache = 1;
839
840         rc = obd_prep_async_page(exp, ll_i2info(inode)->lli_smd, NULL, page,
841                                  (obd_off)page->index << CFS_PAGE_SHIFT,
842                                  &ll_async_page_ops, llap, &llap->llap_cookie,
843                                  llap->llap_nocache, lockh);
844         if (rc) {
845                 OBD_SLAB_FREE(llap, ll_async_page_slab,
846                               ll_async_page_slab_size);
847                 RETURN(ERR_PTR(rc));
848         }
849
850         CDEBUG(D_CACHE, "llap %p page %p cookie %p obj off "LPU64"\n", llap,
851                page, llap->llap_cookie, (obd_off)page->index << CFS_PAGE_SHIFT);
852         /* also zeroing the PRIVBITS low order bitflags */
853         __set_page_ll_data(page, llap);
854         llap->llap_page = page;
855
856         lcounter_inc(&sbi->ll_async_page_count);
857         pd = ll_pglist_lock(sbi);
858         list_add_tail(&llap->llap_pglist_item, &pd->llpd_list);
859         INIT_LIST_HEAD(&llap->llap_pending_write);
860         pd->llpd_count++;
861         pd->llpd_gen++;
862         pd->llpd_miss++;
863         llap->llap_pglist_cpu = pd->llpd_cpu;
864         ll_pglist_unlock(sbi);
865
866  out:
867         if (unlikely(sbi->ll_flags & LL_SBI_CHECKSUM)) {
868                 __u32 csum;
869                 char *kaddr = kmap_atomic(page, KM_USER0);
870                 csum = init_checksum(OSC_DEFAULT_CKSUM);
871                 csum = compute_checksum(csum, kaddr, CFS_PAGE_SIZE,
872                                         OSC_DEFAULT_CKSUM);
873                 kunmap_atomic(kaddr, KM_USER0);
874                 if (origin == LLAP_ORIGIN_READAHEAD ||
875                     origin == LLAP_ORIGIN_READPAGE ||
876                     origin == LLAP_ORIGIN_LOCKLESS_IO) {
877                         llap->llap_checksum = 0;
878                 } else if (origin == LLAP_ORIGIN_COMMIT_WRITE ||
879                            llap->llap_checksum == 0) {
880                         llap->llap_checksum = csum;
881                         CDEBUG(D_PAGE, "page %p cksum %x\n", page, csum);
882                 } else if (llap->llap_checksum == csum) {
883                         /* origin == LLAP_ORIGIN_WRITEPAGE */
884                         CDEBUG(D_PAGE, "page %p cksum %x confirmed\n",
885                                page, csum);
886                 } else {
887                         /* origin == LLAP_ORIGIN_WRITEPAGE */
888                         LL_CDEBUG_PAGE(D_ERROR, page, "old cksum %x != new "
889                                        "%x!\n", llap->llap_checksum, csum);
890                 }
891         }
892
893         llap->llap_origin = origin;
894         RETURN(llap);
895 }
896
897 struct ll_async_page *llap_from_page(struct page *page,
898                                      unsigned origin)
899 {
900         return llap_from_page_with_lockh(page, origin, NULL);
901 }
902
903 static int queue_or_sync_write(struct obd_export *exp, struct inode *inode,
904                                struct ll_async_page *llap,
905                                unsigned to, obd_flag async_flags)
906 {
907         unsigned long size_index = i_size_read(inode) >> CFS_PAGE_SHIFT;
908         struct obd_io_group *oig;
909         struct ll_sb_info *sbi = ll_i2sbi(inode);
910         int rc, noquot = llap->llap_ignore_quota ? OBD_BRW_NOQUOTA : 0;
911         ENTRY;
912
913         /* _make_ready only sees llap once we've unlocked the page */
914         llap->llap_write_queued = 1;
915         rc = obd_queue_async_io(exp, ll_i2info(inode)->lli_smd, NULL,
916                                 llap->llap_cookie, OBD_BRW_WRITE | noquot,
917                                 0, 0, 0, async_flags);
918         if (rc == 0) {
919                 LL_CDEBUG_PAGE(D_PAGE, llap->llap_page, "write queued\n");
920                 GOTO(out, 0);
921         }
922
923         llap->llap_write_queued = 0;
924         /* Do not pass llap here as this is a sync write. */
925         llap_write_pending(inode, NULL);
926
927         rc = oig_init(&oig);
928         if (rc)
929                 GOTO(out, rc);
930
931         /* make full-page requests if we are not at EOF (bug 4410) */
932         if (to != CFS_PAGE_SIZE && llap->llap_page->index < size_index) {
933                 LL_CDEBUG_PAGE(D_PAGE, llap->llap_page,
934                                "sync write before EOF: size_index %lu, to %d\n",
935                                size_index, to);
936                 to = CFS_PAGE_SIZE;
937         } else if (to != CFS_PAGE_SIZE && llap->llap_page->index == size_index) {
938                 int size_to = i_size_read(inode) & ~CFS_PAGE_MASK;
939                 LL_CDEBUG_PAGE(D_PAGE, llap->llap_page,
940                                "sync write at EOF: size_index %lu, to %d/%d\n",
941                                size_index, to, size_to);
942                 if (to < size_to)
943                         to = size_to;
944         }
945
946         /* compare the checksum once before the page leaves llite */
947         if (unlikely((sbi->ll_flags & LL_SBI_CHECKSUM) &&
948                      llap->llap_checksum != 0)) {
949                 __u32 csum;
950                 struct page *page = llap->llap_page;
951                 char *kaddr = kmap_atomic(page, KM_USER0);
952                 csum = init_checksum(OSC_DEFAULT_CKSUM);
953                 csum = compute_checksum(csum, kaddr, CFS_PAGE_SIZE,
954                                         OSC_DEFAULT_CKSUM);
955                 kunmap_atomic(kaddr, KM_USER0);
956                 if (llap->llap_checksum == csum) {
957                         CDEBUG(D_PAGE, "page %p cksum %x confirmed\n",
958                                page, csum);
959                 } else {
960                         CERROR("page %p old cksum %x != new cksum %x!\n",
961                                page, llap->llap_checksum, csum);
962                 }
963         }
964
965         rc = obd_queue_group_io(exp, ll_i2info(inode)->lli_smd, NULL, oig,
966                                 llap->llap_cookie, OBD_BRW_WRITE | noquot,
967                                 0, to, 0, ASYNC_READY | ASYNC_URGENT |
968                                 ASYNC_COUNT_STABLE | ASYNC_GROUP_SYNC);
969         if (rc)
970                 GOTO(free_oig, rc);
971
972         rc = obd_trigger_group_io(exp, ll_i2info(inode)->lli_smd, NULL, oig);
973         if (rc)
974                 GOTO(free_oig, rc);
975
976         rc = oig_wait(oig);
977
978         if (!rc && async_flags & ASYNC_READY) {
979                 unlock_page(llap->llap_page);
980                 if (PageWriteback(llap->llap_page))
981                         end_page_writeback(llap->llap_page);
982         }
983
984         if (rc == 0 && llap_write_complete(inode, llap))
985                 ll_queue_done_writing(inode, 0);
986
987         LL_CDEBUG_PAGE(D_PAGE, llap->llap_page, "sync write returned %d\n", rc);
988
989 free_oig:
990         oig_release(oig);
991 out:
992         RETURN(rc);
993 }
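/* An illustrative example of the `to' adjustment above, assuming 4K pages;
 * the values are hypothetical:
 *
 *   i_size = 10000, so size_index = 10000 >> 12 = 2:
 *     a 100-byte sync write to page index 1 (< size_index) is widened to a
 *     full CFS_PAGE_SIZE request (bug 4410);
 *     a 100-byte sync write to page index 2 (== size_index) is widened to
 *     size_to = 10000 & ~CFS_PAGE_MASK = 1808 bytes, i.e. up to EOF;
 *     a write within that last page that already ends past EOF keeps its
 *     own `to'.
 */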
994
995 /* update our write count to account for i_size increases that may have
996  * happened since we've queued the page for io. */
997
998 /* be careful not to return success without setting the page Uptodate or
999  * the next pass through prepare_write will read in stale data from disk. */
1000 int ll_commit_write(struct file *file, struct page *page, unsigned from,
1001                     unsigned to)
1002 {
1003         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1004         struct inode *inode = page->mapping->host;
1005         struct ll_inode_info *lli = ll_i2info(inode);
1006         struct lov_stripe_md *lsm = lli->lli_smd;
1007         struct obd_export *exp;
1008         struct ll_async_page *llap;
1009         loff_t size;
1010         struct lustre_handle *lockh = NULL;
1011         int rc = 0;
1012         ENTRY;
1013
1014         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1015         LASSERT(inode == file->f_dentry->d_inode);
1016         LASSERT(PageLocked(page));
1017
1018         CDEBUG(D_INODE, "inode %p is writing page %p from %d to %d at %lu\n",
1019                inode, page, from, to, page->index);
1020
1021         if (fd->fd_flags & LL_FILE_GROUP_LOCKED)
1022                 lockh = &fd->fd_cwlockh;
1023
1024         llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_COMMIT_WRITE, lockh);
1025         if (IS_ERR(llap))
1026                 RETURN(PTR_ERR(llap));
1027
1028         exp = ll_i2dtexp(inode);
1029         if (exp == NULL)
1030                 RETURN(-EINVAL);
1031
1032         llap->llap_ignore_quota = cfs_capable(CFS_CAP_SYS_RESOURCE);
1033
1034         /*
1035          * queue a write for some time in the future the first time we
1036          * dirty the page.
1037          *
1038          * This is different from what other file systems do: they usually
1039          * just mark page (and some of its buffers) dirty and rely on
1040          * balance_dirty_pages() to start a write-back. Lustre wants write-back
1041          * to be started earlier for the following reasons:
1042          *
1043          *     (1) with a large number of clients we need to limit the amount
1044          *     of cached data on the clients a lot;
1045          *
1046          *     (2) large compute jobs generally want compute-only then io-only
1047          *     and the IO should complete as quickly as possible;
1048          *
1049          *     (3) IO is batched up to the RPC size and is async until the
1050          *     client max cache is hit
1051          *     (/proc/fs/lustre/osc/OSC.../max_dirty_mb)
1052          *
1053          */
1054         if (!PageDirty(page)) {
1055                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_DIRTY_MISSES, 1);
1056
1057                 rc = queue_or_sync_write(exp, inode, llap, to, 0);
1058                 if (rc)
1059                         GOTO(out, rc);
1060         } else {
1061                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_DIRTY_HITS, 1);
1062         }
1063
1064         /* put the page in the page cache, from now on ll_removepage is
1065          * responsible for cleaning up the llap.
1066          * only set the page dirty when it is queued to be written out */
1067         if (llap->llap_write_queued)
1068                 set_page_dirty(page);
1069
1070 out:
1071         size = (((obd_off)page->index) << CFS_PAGE_SHIFT) + to;
1072         ll_inode_size_lock(inode, 0);
1073         if (rc == 0) {
1074                 lov_stripe_lock(lsm);
1075                 obd_adjust_kms(exp, lsm, size, 0);
1076                 lov_stripe_unlock(lsm);
1077                 if (size > i_size_read(inode))
1078                         i_size_write(inode, size);
1079                 SetPageUptodate(page);
1080         } else if (size > i_size_read(inode)) {
1081                 /* this page is beyond the pale of i_size, so it can't be
1082                  * truncated in ll_p_r_e during lock revoking; we must
1083                  * tear down our book-keeping here. */
1084                 ll_removepage(page);
1085         }
1086         ll_inode_size_unlock(inode, 0);
1087         RETURN(rc);
1088 }
1089
1090 static void ll_ra_stats_inc_sbi(struct ll_sb_info *sbi, enum ra_stat which);
1091
1092 /* WARNING: This algorithm is used to reduce contention on sbi->ll_lock.
1093  * It should work well when ra_max_pages is much greater than a single
1094  * file's read-ahead window.
1095  *
1096  * TODO: There may be a `global sync problem' in this implementation.
1097  * Suppose the global ra window is 100M and each file's ra window is 10M:
1098  * if more than 10 files try to claim their ra budget and reach
1099  * ll_ra_count_get at exactly the same time, all of them may get a zero ra
1100  * window, even though the global window is 100M. -jay
1101  */
1102 static unsigned long ll_ra_count_get(struct ll_sb_info *sbi, unsigned long len)
1103 {
1104         struct ll_ra_info *ra = &sbi->ll_ra_info;
1105         unsigned long ret;
1106         ENTRY;
1107
1108         ret = min(ra->ra_max_pages - atomic_read(&ra->ra_cur_pages), len);
1109         if ((int)ret < 0)
1110                 GOTO(out, ret = 0);
1111
1112         if (atomic_add_return(ret, &ra->ra_cur_pages) > ra->ra_max_pages) {
1113                 atomic_sub(ret, &ra->ra_cur_pages);
1114                 ret = 0;
1115         }
1116 out:
1117         RETURN(ret);
1118 }
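/* An illustrative example of the accounting above, with hypothetical numbers:
 * with ra_max_pages = 256 and ra_cur_pages = 200, a request for len = 100 is
 * trimmed to min(256 - 200, 100) = 56, and atomic_add_return() yields 256,
 * which is within the limit, so a 56-page window is granted.  If two readers
 * race from the same snapshot, the second atomic_add_return() yields 312,
 * overshoots the limit, is rolled back with atomic_sub(), and that reader
 * gets a zero window; this is the `global sync problem' noted in the TODO
 * above.
 */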
1119
1120 static void ll_ra_count_put(struct ll_sb_info *sbi, unsigned long len)
1121 {
1122         struct ll_ra_info *ra = &sbi->ll_ra_info;
1123         atomic_sub(len, &ra->ra_cur_pages);
1124 }
1125
1126 /* called for each page in a completed rpc.*/
1127 int ll_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
1128 {
1129         struct ll_async_page *llap;
1130         struct page *page;
1131         int ret = 0;
1132         ENTRY;
1133
1134         llap = LLAP_FROM_COOKIE(data);
1135         page = llap->llap_page;
1136         LASSERT(PageLocked(page));
1137         LASSERT(CheckWriteback(page,cmd));
1138
1139         LL_CDEBUG_PAGE(D_PAGE, page, "completing cmd %d with %d\n", cmd, rc);
1140
1141         if (cmd & OBD_BRW_READ && llap->llap_defer_uptodate)
1142                 ll_ra_count_put(ll_i2sbi(page->mapping->host), 1);
1143
1144         if (rc == 0)  {
1145                 if (cmd & OBD_BRW_READ) {
1146                         if (!llap->llap_defer_uptodate)
1147                                 SetPageUptodate(page);
1148                 } else {
1149                         llap->llap_write_queued = 0;
1150                 }
1151                 ClearPageError(page);
1152         } else {
1153                 if (cmd & OBD_BRW_READ) {
1154                         llap->llap_defer_uptodate = 0;
1155                 }
1156                 SetPageError(page);
1157                 if (rc == -ENOSPC)
1158                         set_bit(AS_ENOSPC, &page->mapping->flags);
1159                 else
1160                         set_bit(AS_EIO, &page->mapping->flags);
1161         }
1162
1163         /* be careful about clearing WB.
1164          * if WB is cleared after the page lock is released, parallel IO can
1165          * be started before ap_make_ready has finished, so we would end up
1166          * with a page that has PG_Writeback set from ->writepage() while a
1167          * completed READ clears this flag */
1168         if ((cmd & OBD_BRW_WRITE) && PageWriteback(page))
1169                 end_page_writeback(page);
1170
1171         unlock_page(page);
1172
1173         if (cmd & OBD_BRW_WRITE) {
1174                 /* Only if rc == 0 (the write succeeded) can this page be
1175                  * deleted from the pending write list
1176                  */
1177                 if (rc == 0 && llap_write_complete(page->mapping->host, llap))
1178                         ll_queue_done_writing(page->mapping->host, 0);
1179         }
1180
1181         page_cache_release(page);
1182
1183         RETURN(ret);
1184 }
1185
1186 static void __ll_put_llap(struct page *page)
1187 {
1188         struct inode *inode = page->mapping->host;
1189         struct obd_export *exp;
1190         struct ll_async_page *llap;
1191         struct ll_sb_info *sbi = ll_i2sbi(inode);
1192         struct ll_pglist_data *pd;
1193         int rc, cpu;
1194         ENTRY;
1195
1196         exp = ll_i2dtexp(inode);
1197         if (exp == NULL) {
1198                 CERROR("page %p ind %lu gave null export\n", page, page->index);
1199                 EXIT;
1200                 return;
1201         }
1202
1203         llap = llap_from_page(page, LLAP_ORIGIN_REMOVEPAGE);
1204         if (IS_ERR(llap)) {
1205                 CERROR("page %p ind %lu couldn't find llap: %ld\n", page,
1206                        page->index, PTR_ERR(llap));
1207                 EXIT;
1208                 return;
1209         }
1210
1211         if (llap_write_complete(inode, llap))
1212                 ll_queue_done_writing(inode, 0);
1213
1214         rc = obd_teardown_async_page(exp, ll_i2info(inode)->lli_smd, NULL,
1215                                      llap->llap_cookie);
1216         if (rc != 0)
1217                 CERROR("page %p ind %lu failed: %d\n", page, page->index, rc);
1218
1219         /* this unconditional free is only safe because the page lock
1220          * is providing exclusivity to memory pressure/truncate/writeback..*/
1221         __clear_page_ll_data(page);
1222
1223         lcounter_dec(&sbi->ll_async_page_count);
1224         cpu = llap->llap_pglist_cpu;
1225         pd = ll_pglist_cpu_lock(sbi, cpu);
1226         pd->llpd_gen++;
1227         pd->llpd_count--;
1228         if (!list_empty(&llap->llap_pglist_item))
1229                 list_del_init(&llap->llap_pglist_item);
1230         ll_pglist_cpu_unlock(sbi, cpu);
1231         OBD_SLAB_FREE(llap, ll_async_page_slab, ll_async_page_slab_size);
1232         EXIT;
1233 }
1234
1235 /* the kernel calls us here when a page is unhashed from the page cache.
1236  * the page will be locked and the kernel is holding a spinlock, so
1237  * we need to be careful.  we're just tearing down our book-keeping
1238  * here. */
1239 void ll_removepage(struct page *page)
1240 {
1241         struct ll_async_page *llap = llap_cast_private(page);
1242         ENTRY;
1243
1244         LASSERT(!in_interrupt());
1245
1246         /* sync pages or failed read pages can leave pages in the page
1247          * cache that don't have our data associated with them anymore */
1248         if (page_private(page) == 0) {
1249                 EXIT;
1250                 return;
1251         }
1252
1253         LASSERT(!llap->llap_lockless_io_page);
1254         LASSERT(!llap->llap_nocache);
1255         LL_CDEBUG_PAGE(D_PAGE, page, "being evicted\n");
1256         __ll_put_llap(page);
1257         EXIT;
1258 }
1259
1260 static int ll_issue_page_read(struct obd_export *exp,
1261                               struct ll_async_page *llap,
1262                               struct obd_io_group *oig, int defer)
1263 {
1264         struct page *page = llap->llap_page;
1265         int rc;
1266
1267         page_cache_get(page);
1268         llap->llap_defer_uptodate = defer;
1269         llap->llap_ra_used = 0;
1270         rc = obd_queue_group_io(exp, ll_i2info(page->mapping->host)->lli_smd,
1271                                 NULL, oig, llap->llap_cookie, OBD_BRW_READ, 0,
1272                                 CFS_PAGE_SIZE, 0, ASYNC_COUNT_STABLE |
1273                                                   ASYNC_READY | ASYNC_URGENT);
1274         if (rc) {
1275                 LL_CDEBUG_PAGE(D_ERROR, page, "read queue failed: rc %d\n", rc);
1276                 page_cache_release(page);
1277         }
1278         RETURN(rc);
1279 }
1280
1281 static void ll_ra_stats_inc_sbi(struct ll_sb_info *sbi, enum ra_stat which)
1282 {
1283         LASSERTF(which >= 0 && which < _NR_RA_STAT, "which: %u\n", which);
1284         lprocfs_counter_incr(sbi->ll_ra_stats, which);
1285 }
1286
1287 static void ll_ra_stats_inc(struct address_space *mapping, enum ra_stat which)
1288 {
1289         struct ll_sb_info *sbi = ll_i2sbi(mapping->host);
1290         ll_ra_stats_inc_sbi(sbi, which);
1291 }
1292
1293 void ll_ra_accounting(struct ll_async_page *llap, struct address_space *mapping)
1294 {
1295         if (!llap->llap_defer_uptodate || llap->llap_ra_used)
1296                 return;
1297
1298         ll_ra_stats_inc(mapping, RA_STAT_DISCARDED);
1299 }
1300
1301 #define RAS_CDEBUG(ras) \
1302         CDEBUG(D_READA,                                                      \
1303                "lrp %lu cr %lu cp %lu ws %lu wl %lu nra %lu r %lu ri %lu "   \
1304                "csr %lu sf %lu sp %lu sl %lu\n",                             \
1305                ras->ras_last_readpage, ras->ras_consecutive_requests,        \
1306                ras->ras_consecutive_pages, ras->ras_window_start,            \
1307                ras->ras_window_len, ras->ras_next_readahead,                 \
1308                ras->ras_requests, ras->ras_request_index,                    \
1309                ras->ras_consecutive_stride_requests, ras->ras_stride_offset, \
1310                ras->ras_stride_pages, ras->ras_stride_length)
1311
1312 static int index_in_window(unsigned long index, unsigned long point,
1313                            unsigned long before, unsigned long after)
1314 {
1315         unsigned long start = point - before, end = point + after;
1316
1317         if (start > point)
1318                start = 0;
1319         if (end < point)
1320                end = ~0;
1321
1322         return start <= index && index <= end;
1323 }
1324
1325 static struct ll_readahead_state *ll_ras_get(struct file *f)
1326 {
1327         struct ll_file_data       *fd;
1328
1329         fd = LUSTRE_FPRIVATE(f);
1330         return &fd->fd_ras;
1331 }
1332
1333 void ll_ra_read_in(struct file *f, struct ll_ra_read *rar)
1334 {
1335         struct ll_readahead_state *ras;
1336
1337         ras = ll_ras_get(f);
1338
1339         spin_lock(&ras->ras_lock);
1340         ras->ras_requests++;
1341         ras->ras_request_index = 0;
1342         ras->ras_consecutive_requests++;
1343         rar->lrr_reader = current;
1344
1345         list_add(&rar->lrr_linkage, &ras->ras_read_beads);
1346         spin_unlock(&ras->ras_lock);
1347 }
1348
1349 void ll_ra_read_ex(struct file *f, struct ll_ra_read *rar)
1350 {
1351         struct ll_readahead_state *ras;
1352
1353         ras = ll_ras_get(f);
1354
1355         spin_lock(&ras->ras_lock);
1356         list_del_init(&rar->lrr_linkage);
1357         spin_unlock(&ras->ras_lock);
1358 }
1359
1360 static struct ll_ra_read *ll_ra_read_get_locked(struct ll_readahead_state *ras)
1361 {
1362         struct ll_ra_read *scan;
1363
1364         list_for_each_entry(scan, &ras->ras_read_beads, lrr_linkage) {
1365                 if (scan->lrr_reader == current)
1366                         return scan;
1367         }
1368         return NULL;
1369 }
1370
1371 struct ll_ra_read *ll_ra_read_get(struct file *f)
1372 {
1373         struct ll_readahead_state *ras;
1374         struct ll_ra_read         *bead;
1375
1376         ras = ll_ras_get(f);
1377
1378         spin_lock(&ras->ras_lock);
1379         bead = ll_ra_read_get_locked(ras);
1380         spin_unlock(&ras->ras_lock);
1381         return bead;
1382 }
1383
1384 static int ll_read_ahead_page(struct obd_export *exp, struct obd_io_group *oig,
1385                               int index, struct address_space *mapping)
1386 {
1387         struct ll_async_page *llap;
1388         struct page *page;
1389         unsigned int gfp_mask = 0;
1390         int rc = 0;
1391
1392         gfp_mask = GFP_HIGHUSER & ~__GFP_WAIT;
1393 #ifdef __GFP_NOWARN
1394         gfp_mask |= __GFP_NOWARN;
1395 #endif
1396         page = grab_cache_page_nowait_gfp(mapping, index, gfp_mask);
1397         if (page == NULL) {
1398                 ll_ra_stats_inc(mapping, RA_STAT_FAILED_GRAB_PAGE);
1399                 CDEBUG(D_READA, "g_c_p_n failed\n");
1400                 return 0;
1401         }
1402
1403         /* Check if page was truncated or reclaimed */
1404         if (page->mapping != mapping) {
1405                 ll_ra_stats_inc(mapping, RA_STAT_WRONG_GRAB_PAGE);
1406                 CDEBUG(D_READA, "g_c_p_n returned invalid page\n");
1407                 GOTO(unlock_page, rc = 0);      
1408         }
1409
1410         /* we do this first so that we can see the page in the /proc
1411          * accounting */
1412         llap = llap_from_page(page, LLAP_ORIGIN_READAHEAD);
1413         if (IS_ERR(llap) || llap->llap_defer_uptodate) {
1414                 if (PTR_ERR(llap) == -ENOLCK) {
1415                         ll_ra_stats_inc(mapping, RA_STAT_FAILED_MATCH);
1416                         CDEBUG(D_READA | D_PAGE,
1417                                "Adding page to cache failed index %d\n",
1418                                index);
1419                         CDEBUG(D_READA, "nolock page\n");
1420                         GOTO(unlock_page, rc = -ENOLCK);
1421                 }
1422                 CDEBUG(D_READA, "read-ahead page\n");
1423                 GOTO(unlock_page, rc = 0);      
1424         }
1425
1426         /* skip completed pages */
1427         if (Page_Uptodate(page))
1428                 GOTO(unlock_page, rc = 0);      
1429
1430         /* bail out when we hit the end of the lock. */
1431         rc = ll_issue_page_read(exp, llap, oig, 1);
1432         if (rc == 0) {
1433                 LL_CDEBUG_PAGE(D_READA | D_PAGE, page, "started read-ahead\n");
1434                 rc = 1;
1435         } else {
1436 unlock_page:    
1437                 unlock_page(page);
1438                 LL_CDEBUG_PAGE(D_READA | D_PAGE, page, "skipping read-ahead\n");
1439         }
1440         page_cache_release(page);
1441         return rc;
1442 }
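
/*
 * Note on the return convention of ll_read_ahead_page(), as far as can be
 * read from the code above: 1 means a read-ahead request was queued for the
 * page, 0 means the page was skipped (could not be grabbed, already cached,
 * or already uptodate), and a negative value such as -ENOLCK is an error;
 * ll_read_ahead_pages() below stops early only on -ENOLCK.
 */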
1443
1444 /* ra_io_arg is filled in at the beginning of ll_readahead while holding
1445  * ras_lock; the following ll_read_ahead_pages then reads RA pages
1446  * according to this arg.  All items in this structure are counted in
1447  * units of page index.
1448  */
1449 struct ra_io_arg {
1450         unsigned long ria_start;  /* start offset of read-ahead*/
1451         unsigned long ria_end;    /* end offset of read-ahead*/
1452         /* If a stride read pattern is detected, ria_stoff is the offset at
1453          * which the stride read starts.  Note: for normal read-ahead this
1454          * value is meaningless and is never accessed. */
1455         pgoff_t ria_stoff;
1456         /* ria_length is the stride length and ria_pages is the number of
1457          * pages read in each stride.  They are also used to check whether
1458          * a given index falls within the stride pattern during read-ahead. */
1459         unsigned long ria_length;
1460         unsigned long ria_pages;
1461 };
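
/*
 * A hypothetical example of how the fields above might be filled in: an
 * application that reads 4 pages, skips 12, reads 4, and so on, starting at
 * page 0, would be described by ria_stoff = 0, ria_length = 16 and
 * ria_pages = 4, with ria_start/ria_end bounding the portion of the window
 * being issued in this pass.
 */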
1462
1463 #define RIA_DEBUG(ria)                                                \
1464         CDEBUG(D_READA, "rs %lu re %lu ro %lu rl %lu rp %lu\n",       \
1465         ria->ria_start, ria->ria_end, ria->ria_stoff, ria->ria_length,\
1466         ria->ria_pages)
1467
1468 #define RAS_INCREASE_STEP (1024 * 1024 >> CFS_PAGE_SHIFT)
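
/* RAS_INCREASE_STEP is 1 MB expressed in pages, e.g. 256 pages when
 * CFS_PAGE_SIZE is 4 KB (CFS_PAGE_SHIFT == 12). */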
1469
1470 static inline int stride_io_mode(struct ll_readahead_state *ras)
1471 {
1472         return ras->ras_consecutive_stride_requests > 1;
1473 }
1474
1475 /* The function calculates how many pages will be read in
1476  * [off, off + length] by stride I/O mode, where
1477  * stride_offset = st_off, stride_length = st_len,
1478  * stride_pages = st_pgs
1479  */
1480 static unsigned long
1481 stride_pg_count(pgoff_t st_off, unsigned long st_len, unsigned long st_pgs,
1482                 unsigned long off, unsigned length)
1483 {
1484         unsigned long cont_len = st_off > off ?  st_off - off : 0;
1485         __u64 stride_len = length + off > st_off ?
1486                            length + off + 1 - st_off : 0;
1487         unsigned long left, pg_count;
1488
1489         if (st_len == 0 || length == 0)
1490                 return length;
1491
1492         left = do_div(stride_len, st_len);
1493         left = min(left, st_pgs);
1494
1495         pg_count = left + stride_len * st_pgs + cont_len;
1496
1497         LASSERT(pg_count >= left);
1498
1499         CDEBUG(D_READA, "st_off %lu, st_len %lu st_pgs %lu off %lu length %u"
1500                " pgcount %lu\n", st_off, st_len, st_pgs, off, length, pg_count);
1501
1502         return pg_count;
1503 }
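
/*
 * Worked example with hypothetical values: for st_off = 0, st_len = 16,
 * st_pgs = 4, off = 0 and length = 64, stride_len becomes 65, which do_div()
 * splits into 4 full strides with a remainder of 1, giving
 * pg_count = 1 + 4 * 4 + 0 = 17 -- pages 0-3, 16-19, 32-35, 48-51 and
 * page 64.
 */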
1504
1505 static int ria_page_count(struct ra_io_arg *ria)
1506 {
1507         __u64 length = ria->ria_end >= ria->ria_start ?
1508                        ria->ria_end - ria->ria_start + 1 : 0;
1509
1510         return stride_pg_count(ria->ria_stoff, ria->ria_length,
1511                                ria->ria_pages, ria->ria_start,
1512                                length);
1513 }
1514
1515 /* Check whether the index is in the defined read-ahead window */
1516 static int ras_inside_ra_window(unsigned long idx, struct ra_io_arg *ria)
1517 {
1518         /* If ria_length == ria_pages, this is non-stride I/O mode and
1519          * idx is always inside the read-ahead window in that case.
1520          * For stride I/O mode, just check whether idx falls inside
1521          * the ria_pages part of the stride. */
1522         return ria->ria_length == 0 || ria->ria_length == ria->ria_pages ||
1523                (idx - ria->ria_stoff) % ria->ria_length < ria->ria_pages;
1524 }
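
/*
 * For example (hypothetical values), with ria_stoff = 0, ria_length = 16 and
 * ria_pages = 4, indices 0-3, 16-19, 32-35, ... are inside the window, while
 * an index such as 10 is in the stride gap and is rejected because
 * (10 - 0) % 16 = 10 is not smaller than 4.
 */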
1525
1526 static int ll_read_ahead_pages(struct obd_export *exp,
1527                                struct obd_io_group *oig,
1528                                struct ra_io_arg *ria,   
1529                                unsigned long *reserved_pages,
1530                                struct address_space *mapping,
1531                                unsigned long *ra_end)
1532 {
1533         int rc, count = 0, stride_ria;
1534         unsigned long page_idx;
1535
1536         LASSERT(ria != NULL);
1537         RIA_DEBUG(ria);
1538
1539         stride_ria = ria->ria_length > ria->ria_pages && ria->ria_pages > 0;
1540         for (page_idx = ria->ria_start; page_idx <= ria->ria_end &&
1541                         *reserved_pages > 0; page_idx++) {
1542                 if (ras_inside_ra_window(page_idx, ria)) {
1543                         /* If the page is inside the read-ahead window */
1544                         rc = ll_read_ahead_page(exp, oig, page_idx, mapping);
1545                         if (rc == 1) {
1546                                 (*reserved_pages)--;
1547                                 count++;
1548                         } else if (rc == -ENOLCK)
1549                                 break;
1550                 } else if (stride_ria) {
1551                         /* If the page is not in the read-ahead window and
1552                          * this is stride read-ahead, check whether the
1553                          * stride gap should be skipped */
1554                         pgoff_t offset;
1555                         /* FIXME: This assertion is only valid for forward
1556                          * read-ahead; it will be fixed when backward
1557                          * read-ahead is implemented */
1558                         LASSERTF(page_idx > ria->ria_stoff, "index %lu is in"
1559                                 " the ra window gap, it should be bigger than"
1560                                 " stride offset %lu\n", page_idx, ria->ria_stoff);
1561
1562                         offset = page_idx - ria->ria_stoff;
1563                         offset = offset % (ria->ria_length);
1564                         if (offset > ria->ria_pages) {
1565                                 page_idx += ria->ria_length - offset;
1566                                 CDEBUG(D_READA, "i %lu skip %lu \n", page_idx,
1567                                        ria->ria_length - offset);
1568                                 continue;
1569                         }
1570                 }
1571         }
1572         *ra_end = page_idx;
1573         return count;
1574 }
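
/*
 * Sketch of the stride-gap skip above, with hypothetical values: for
 * ria_stoff = 0, ria_length = 16 and ria_pages = 4, a page_idx of 10 gives
 * offset = 10 % 16 = 10 > 4, so page_idx is advanced by 16 - 10 = 6 toward
 * the start of the next stride chunk instead of testing every page in the
 * gap individually.
 */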
1575
1576 static int ll_readahead(struct ll_readahead_state *ras,
1577                          struct obd_export *exp, struct address_space *mapping,
1578                          struct obd_io_group *oig, int flags)
1579 {
1580         unsigned long start = 0, end = 0, reserved;
1581         unsigned long ra_end, len;
1582         struct inode *inode;
1583         struct lov_stripe_md *lsm;
1584         struct ll_ra_read *bead;
1585         struct ost_lvb lvb;
1586         struct ra_io_arg ria = { 0 };
1587         int ret = 0;
1588         __u64 kms;
1589         ENTRY;
1590
1591         inode = mapping->host;
1592         lsm = ll_i2info(inode)->lli_smd;
1593
1594         lov_stripe_lock(lsm);
1595         inode_init_lvb(inode, &lvb);
1596         obd_merge_lvb(ll_i2dtexp(inode), lsm, &lvb, 1);
1597         kms = lvb.lvb_size;
1598         lov_stripe_unlock(lsm);
1599         if (kms == 0) {
1600                 ll_ra_stats_inc(mapping, RA_STAT_ZERO_LEN);
1601                 RETURN(0);
1602         }
1603
1604         spin_lock(&ras->ras_lock);
1605         bead = ll_ra_read_get_locked(ras);
1606         /* Enlarge the RA window to encompass the full read */
1607         if (bead != NULL && ras->ras_window_start + ras->ras_window_len <
1608             bead->lrr_start + bead->lrr_count) {
1609                 ras->ras_window_len = bead->lrr_start + bead->lrr_count -
1610                                       ras->ras_window_start;
1611         }
1612         /* Reserve a part of the read-ahead window that we'll be issuing */
1613         if (ras->ras_window_len) {
1614                 start = ras->ras_next_readahead;
1615                 end = ras->ras_window_start + ras->ras_window_len - 1;
1616         }
1617         if (end != 0) {
1618                 /* Truncate RA window to end of file */
1619                 end = min(end, (unsigned long)((kms - 1) >> CFS_PAGE_SHIFT));
1620                 ras->ras_next_readahead = max(end, end + 1);
1621                 RAS_CDEBUG(ras);
1622         }
1623         ria.ria_start = start;
1624         ria.ria_end = end;
1625         /* If stride I/O mode is detected, get stride window */
1626         if (stride_io_mode(ras)) {
1627                 ria.ria_stoff = ras->ras_stride_offset;
1628                 ria.ria_length = ras->ras_stride_length;
1629                 ria.ria_pages = ras->ras_stride_pages;
1630         }
1631         spin_unlock(&ras->ras_lock);
1632
1633         if (end == 0) {
1634                 ll_ra_stats_inc(mapping, RA_STAT_ZERO_WINDOW);
1635                 RETURN(0);
1636         }
1637         len = ria_page_count(&ria);
1638         if (len == 0)
1639                 RETURN(0);
1640
1641         reserved = ll_ra_count_get(ll_i2sbi(inode), len);
1642
1643         if (reserved < len)
1644                 ll_ra_stats_inc(mapping, RA_STAT_MAX_IN_FLIGHT);
1645
1646         CDEBUG(D_READA, "reserved pages %lu\n", reserved);
1647
1648         ret = ll_read_ahead_pages(exp, oig, &ria, &reserved, mapping, &ra_end);
1649
1650         LASSERTF(reserved >= 0, "reserved %lu\n", reserved);
1651         if (reserved != 0)
1652                 ll_ra_count_put(ll_i2sbi(inode), reserved);
1653
1654         if (ra_end == end + 1 && ra_end == (kms >> CFS_PAGE_SHIFT))
1655                 ll_ra_stats_inc(mapping, RA_STAT_EOF);
1656
1657         /* if we didn't get to the end of the region we reserved from
1658          * the ras we need to go back and update the ras so that the
1659          * next read-ahead tries from where we left off.  we only do so
1660          * if the region we failed to issue read-ahead on is still ahead
1661          * of the app and behind the next index to start read-ahead from */
1662         CDEBUG(D_READA, "ra_end %lu end %lu stride end %lu \n",
1663                ra_end, end, ria.ria_end);
1664
1665         if (ra_end != (end + 1)) {
1666                 spin_lock(&ras->ras_lock);
1667                 if (ra_end < ras->ras_next_readahead &&
1668                     index_in_window(ra_end, ras->ras_window_start, 0,
1669                                     ras->ras_window_len)) {
1670                         ras->ras_next_readahead = ra_end;
1671                         RAS_CDEBUG(ras);
1672                 }
1673                 spin_unlock(&ras->ras_lock);
1674         }
1675
1676         RETURN(ret);
1677 }
1678
1679 static void ras_set_start(struct ll_readahead_state *ras, unsigned long index)
1680 {
1681         ras->ras_window_start = index & (~(RAS_INCREASE_STEP - 1));
1682 }
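
/* ras_set_start() aligns the window start down to a RAS_INCREASE_STEP
 * boundary; e.g. with 4 KB pages (RAS_INCREASE_STEP == 256), index 300
 * yields ras_window_start = 256. */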
1683
1684 /* called with the ras_lock held or from places where it doesn't matter */
1685 static void ras_reset(struct ll_readahead_state *ras, unsigned long index)
1686 {
1687         ras->ras_last_readpage = index;
1688         ras->ras_consecutive_requests = 0;
1689         ras->ras_consecutive_pages = 0;
1690         ras->ras_window_len = 0;
1691         ras_set_start(ras, index);
1692         ras->ras_next_readahead = max(ras->ras_window_start, index);
1693
1694         RAS_CDEBUG(ras);
1695 }
1696
1697 /* called with the ras_lock held or from places where it doesn't matter */
1698 static void ras_stride_reset(struct ll_readahead_state *ras)
1699 {
1700         ras->ras_consecutive_stride_requests = 0;
1701         RAS_CDEBUG(ras);
1702 }
1703
1704 void ll_readahead_init(struct inode *inode, struct ll_readahead_state *ras)
1705 {
1706         spin_lock_init(&ras->ras_lock);
1707         ras_reset(ras, 0);
1708         ras->ras_requests = 0;
1709         INIT_LIST_HEAD(&ras->ras_read_beads);
1710 }
1711
1712 /* Check whether the read request is in the stride window.
1713  * Return 1 if it is in the stride window, otherwise return 0;
1714  * also update stride_gap and stride_pages.
1715  */
1716 static int index_in_stride_window(unsigned long index,
1717                                   struct ll_readahead_state *ras,
1718                                   struct inode *inode)
1719 {
1720         int stride_gap = index - ras->ras_last_readpage - 1;
1721
1722         LASSERT(stride_gap != 0);
1723
1724         if (ras->ras_consecutive_pages == 0)
1725                 return 0;
1726
1727         /* Otherwise check whether this read matches the stride detected so far */
1728         if ((ras->ras_stride_length - ras->ras_stride_pages) == stride_gap &&
1729             ras->ras_consecutive_pages == ras->ras_stride_pages)
1730                 return 1;
1731
1732         if (stride_gap >= 0) {
1733                 /*
1734                  * only set stride_pages and stride_length if
1735                  * this is forward reading (stride_gap > 0)
1736                  */
1737                 ras->ras_stride_pages = ras->ras_consecutive_pages;
1738                 ras->ras_stride_length = stride_gap + ras->ras_consecutive_pages;
1739         } else {
1740                 /*
1741                  * If stride_gap < 0 (backward reading),
1742                  * reset stride_pages/stride_length.
1743                  * FIXME: backward stride I/O read is not yet
1744                  * supported.
1745                  */
1746                 ras->ras_stride_pages = 0;
1747                 ras->ras_stride_length = 0;
1748         }
1749         RAS_CDEBUG(ras);
1750
1751         return 0;
1752 }
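
/*
 * Illustrative sequence (hypothetical): a reader touches pages 0-3 and then
 * page 16.  On page 16, stride_gap = 16 - 3 - 1 = 12 and
 * ras_consecutive_pages = 4, so ras_stride_pages = 4 and
 * ras_stride_length = 16 are recorded and 0 is returned.  When page 32 later
 * arrives after pages 16-19, the gap again equals
 * ras_stride_length - ras_stride_pages and ras_consecutive_pages equals
 * ras_stride_pages, so the function returns 1 and the stride pattern is
 * considered confirmed.
 */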
1753
1754 static unsigned long
1755 stride_page_count(struct ll_readahead_state *ras, unsigned long len)
1756 {
1757         return stride_pg_count(ras->ras_stride_offset, ras->ras_stride_length,
1758                                ras->ras_stride_pages, ras->ras_stride_offset,
1759                                len);
1760 }
1761
1762 /* The stride read-ahead window will be increased by inc_len pages
1763  * according to the stride I/O pattern */
1764 static void ras_stride_increase_window(struct ll_readahead_state *ras,
1765                                        struct ll_ra_info *ra,
1766                                        unsigned long inc_len)
1767 {
1768         unsigned long left, step, window_len;
1769         unsigned long stride_len;
1770
1771         LASSERT(ras->ras_stride_length > 0);
1772
1773         stride_len = ras->ras_window_start + ras->ras_window_len -
1774                      ras->ras_stride_offset;
1775
1776         LASSERTF(stride_len >= 0, "window_start %lu, window_len %lu"
1777                  " stride_offset %lu\n", ras->ras_window_start,
1778                  ras->ras_window_len, ras->ras_stride_offset);
1779
1780         left = stride_len % ras->ras_stride_length;
1781
1782         window_len = ras->ras_window_len - left;
1783
1784         if (left < ras->ras_stride_pages)
1785                 left += inc_len;
1786         else
1787                 left = ras->ras_stride_pages + inc_len;
1788
1789         LASSERT(ras->ras_stride_pages != 0);
1790
1791         step = left / ras->ras_stride_pages;
1792         left %= ras->ras_stride_pages;
1793
1794         window_len += step * ras->ras_stride_length + left;
1795
1796         if (stride_page_count(ras, window_len) <= ra->ra_max_pages)
1797                 ras->ras_window_len = window_len;
1798
1799         RAS_CDEBUG(ras);
1800 }
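
/*
 * Rough example with hypothetical numbers: for ras_stride_length = 16,
 * ras_stride_pages = 4, a window of 32 pages aligned on a stride boundary
 * and inc_len = 8 (callers actually pass RAS_INCREASE_STEP), left becomes
 * 0 + 8, step = 8 / 4 = 2 and the window grows by 2 * 16 = 32 pages, i.e.
 * every ras_stride_pages worth of extra data extends the window by a full
 * stride length, subject to the ra_max_pages check above.
 */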
1801
1802 /* Set stride I/O read-ahead window start offset */
1803 static void ras_set_stride_offset(struct ll_readahead_state *ras)
1804 {
1805         unsigned long window_len = ras->ras_next_readahead -
1806                                    ras->ras_window_start;
1807         unsigned long left;
1808
1809         LASSERT(ras->ras_stride_length != 0);
1810
1811         left = window_len % ras->ras_stride_length;
1812
1813         ras->ras_stride_offset = ras->ras_next_readahead - left;
1814
1815         RAS_CDEBUG(ras);
1816 }
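
/*
 * Example (hypothetical values): with ras_window_start = 0,
 * ras_next_readahead = 36 and ras_stride_length = 16, window_len = 36 and
 * left = 36 % 16 = 4, so ras_stride_offset is set to 36 - 4 = 32, pulling
 * the stride offset back to a stride boundary relative to the window start.
 */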
1817
1818 static void ras_update(struct ll_sb_info *sbi, struct inode *inode,
1819                        struct ll_readahead_state *ras, unsigned long index,
1820                        unsigned hit)
1821 {
1822         struct ll_ra_info *ra = &sbi->ll_ra_info;
1823         int zero = 0, stride_zero = 0, stride_detect = 0, ra_miss = 0;
1824         ENTRY;
1825
1826         spin_lock(&ras->ras_lock);
1827
1828         ll_ra_stats_inc_sbi(sbi, hit ? RA_STAT_HIT : RA_STAT_MISS);
1829
1830         /* reset the read-ahead window in two cases.  First when the app seeks
1831          * or reads to some other part of the file.  Secondly if we get a
1832          * read-ahead miss on a page we think we've previously issued.  This
1833          * can be a symptom of there being so many read-ahead pages that the
1834          * VM is reclaiming them before we get to them. */
1835         if (!index_in_window(index, ras->ras_last_readpage, 8, 8)) {
1836                 zero = 1;
1837                 ll_ra_stats_inc_sbi(sbi, RA_STAT_DISTANT_READPAGE);
1838                 /* check whether it is in stride I/O mode */
1839                 if (!index_in_stride_window(index, ras, inode))
1840                         stride_zero = 1;
1841         } else if (!hit && ras->ras_window_len &&
1842                    index < ras->ras_next_readahead &&
1843                    index_in_window(index, ras->ras_window_start, 0,
1844                                    ras->ras_window_len)) {
1845                 zero = 1;
1846                 ra_miss = 1;
1847                 /* If we hit a read-ahead miss and stride I/O has not yet
1848                  * been detected, reset the stride state so that the whole
1849                  * stride I/O mode is re-detected, to avoid complications */
1850                 if (!stride_io_mode(ras))
1851                         stride_zero = 1;
1852                 ll_ra_stats_inc_sbi(sbi, RA_STAT_MISS_IN_WINDOW);
1853         }
1854
1855         /* On the second access to a file smaller than the tunable
1856          * ra_max_read_ahead_whole_pages trigger RA on all pages in the
1857          * file up to ra_max_pages.  This is simply a best effort and
1858          * only occurs once per open file.  Normal RA behavior is reverted
1859          * to for subsequent IO.  The mmap case does not increment
1860          * ras_requests and thus can never trigger this behavior. */
1861         if (ras->ras_requests == 2 && !ras->ras_request_index) {
1862                 __u64 kms_pages;
1863
1864                 kms_pages = (i_size_read(inode) + CFS_PAGE_SIZE - 1) >>
1865                             CFS_PAGE_SHIFT;
1866
1867                 CDEBUG(D_READA, "kmsp "LPU64" mwp %lu mp %lu\n", kms_pages,
1868                        ra->ra_max_read_ahead_whole_pages, ra->ra_max_pages);
1869
1870                 if (kms_pages &&
1871                     kms_pages <= ra->ra_max_read_ahead_whole_pages) {
1872                         ras->ras_window_start = 0;
1873                         ras->ras_last_readpage = 0;
1874                         ras->ras_next_readahead = 0;
1875                         ras->ras_window_len = min(ra->ra_max_pages,
1876                                 ra->ra_max_read_ahead_whole_pages);
1877                         GOTO(out_unlock, 0);
1878                 }
1879         }
1880
1881         if (zero) {
1882                 /* If this is a discontinuous read, check
1883                  * whether it is in stride I/O mode */
1884                 if (stride_zero) {
1885                         ras_reset(ras, index);
1886                         ras->ras_consecutive_pages++;
1887                         ras_stride_reset(ras);
1888                         RAS_CDEBUG(ras);
1889                         GOTO(out_unlock, 0);
1890                 } else {
1891                         /* The read is still in the stride window, or
1892                          * it hit a read-ahead miss */
1893
1894                         /* If a ra-window miss was hit, it probably means VM
1895                          * pressure reclaimed some read-ahead pages.  So the
1896                          * ra-window length will not be increased, but it is
1897                          * also not reset, to avoid re-detecting stride I/O. */
1898                         ras->ras_consecutive_requests = 0;
1899                         if (!ra_miss) {
1900                                 ras->ras_consecutive_pages = 0;
1901                                 if (++ras->ras_consecutive_stride_requests > 1)
1902                                         stride_detect = 1;
1903                         }
1904                         RAS_CDEBUG(ras);
1905                 }
1906         } else if (ras->ras_consecutive_stride_requests > 1) {
1907                 /* If this is a contiguous read but we are currently in
1908                  * stride I/O mode, check whether the stride step is still
1909                  * valid; if not, reset the stride ra window */
1910                 if (ras->ras_consecutive_pages + 1 > ras->ras_stride_pages)
1911                         ras_stride_reset(ras);
1912         }
1913
1914         ras->ras_last_readpage = index;
1915         ras->ras_consecutive_pages++;
1916         ras_set_start(ras, index);
1917         ras->ras_next_readahead = max(ras->ras_window_start,
1918                                       ras->ras_next_readahead);
1919         RAS_CDEBUG(ras);
1920
1921         /* Trigger RA in the mmap case where ras_consecutive_requests
1922          * is not incremented and thus can't be used to trigger RA */
1923         if (!ras->ras_window_len && ras->ras_consecutive_pages == 4) {
1924                 ras->ras_window_len = RAS_INCREASE_STEP;
1925                 GOTO(out_unlock, 0);
1926         }
1927
1928         /* Initially set the stride window offset to next_readahead */
1929         if (ras->ras_consecutive_stride_requests == 2 && stride_detect)
1930                 ras_set_stride_offset(ras);
1931
1932         /* The initial ras_window_len is set to the request size.  To avoid
1933          * uselessly reading and discarding pages for random IO the window is
1934          * only increased once per consecutive request received. */
1935         if ((ras->ras_consecutive_requests > 1 &&
1936             !ras->ras_request_index) || stride_detect) {
1937                 if (stride_io_mode(ras))
1938                         ras_stride_increase_window(ras, ra, RAS_INCREASE_STEP);
1939                 else
1940                         ras->ras_window_len = min(ras->ras_window_len +
1941                                                   RAS_INCREASE_STEP,
1942                                                   ra->ra_max_pages);
1943         }
1944         EXIT;
1945 out_unlock:
1946         RAS_CDEBUG(ras);
1947         ras->ras_request_index++;
1948         spin_unlock(&ras->ras_lock);
1949         return;
1950 }
1951
1952 int ll_writepage(struct page *page)
1953 {
1954         struct inode *inode = page->mapping->host;
1955         struct ll_inode_info *lli = ll_i2info(inode);
1956         struct obd_export *exp;
1957         struct ll_async_page *llap;
1958         int rc = 0;
1959         ENTRY;
1960
1961         LASSERT(PageLocked(page));
1962
1963         exp = ll_i2dtexp(inode);
1964         if (exp == NULL)
1965                 GOTO(out, rc = -EINVAL);
1966
1967         llap = llap_from_page(page, LLAP_ORIGIN_WRITEPAGE);
1968         if (IS_ERR(llap))
1969                 GOTO(out, rc = PTR_ERR(llap));
1970
1971         LASSERT(!llap->llap_nocache);
1972         LASSERT(!PageWriteback(page));
1973         set_page_writeback(page);
1974
1975         page_cache_get(page);
1976         if (llap->llap_write_queued) {
1977                 LL_CDEBUG_PAGE(D_PAGE, page, "marking urgent\n");
1978                 rc = obd_set_async_flags(exp, lli->lli_smd, NULL,
1979                                          llap->llap_cookie,
1980                                          ASYNC_READY | ASYNC_URGENT);
1981         } else {
1982                 rc = queue_or_sync_write(exp, inode, llap, CFS_PAGE_SIZE,
1983                                          ASYNC_READY | ASYNC_URGENT);
1984         }
1985         if (rc) {
1986                 /* re-dirty page on error so it retries write */
1987                 if (PageWriteback(page))
1988                         end_page_writeback(page);
1989
1990                 /* resend the page only if the IO has not started */
1991                 if (!PageError(page))
1992                         ll_redirty_page(page);
1993
1994                 page_cache_release(page);
1995         }
1996 out:
1997         if (rc) {
1998                 if (!lli->lli_async_rc)
1999                         lli->lli_async_rc = rc;
2001                 unlock_page(page);
2002         }
2003         RETURN(rc);
2004 }
2005
2006 /*
2007  * For now we do our readpage the same on both 2.4 and 2.6.  The kernel's
2008  * read-ahead assumes it is valid to issue readpage all the way up to
2009  * i_size, but our dlm locks make that not the case.  We disable the
2010  * kernel's read-ahead and do our own by walking ahead in the page cache
2011  * checking for dlm lock coverage.  The main difference between 2.4 and
2012  * 2.6 is how read-ahead gets batched and issued, but we're using our own,
2013  * so they look the same.
2014  */
2015 int ll_readpage(struct file *filp, struct page *page)
2016 {
2017         struct ll_file_data *fd = LUSTRE_FPRIVATE(filp);
2018         struct inode *inode = page->mapping->host;
2019         struct obd_export *exp;
2020         struct ll_async_page *llap;
2021         struct obd_io_group *oig = NULL;
2022         struct lustre_handle *lockh = NULL;
2023         int rc;
2024         ENTRY;
2025
2026         LASSERT(PageLocked(page));
2027         LASSERT(!PageUptodate(page));
2028         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),offset=%Lu=%#Lx\n",
2029                inode->i_ino, inode->i_generation, inode,
2030                (((loff_t)page->index) << CFS_PAGE_SHIFT),
2031                (((loff_t)page->index) << CFS_PAGE_SHIFT));
2032         LASSERT(atomic_read(&filp->f_dentry->d_inode->i_count) > 0);
2033
2034         if (!ll_i2info(inode)->lli_smd) {
2035                 /* File with no objects - one big hole */
2036                 /* We use this only because remove_from_page_cache is not
2037                  * exported; afterwards we make the page up to date again. */
2038                 ll_truncate_complete_page(page);
2039                 clear_page(kmap(page));
2040                 kunmap(page);
2041                 SetPageUptodate(page);
2042                 unlock_page(page);
2043                 RETURN(0);
2044         }
2045
2046         rc = oig_init(&oig);
2047         if (rc < 0)
2048                 GOTO(out, rc);
2049
2050         exp = ll_i2dtexp(inode);
2051         if (exp == NULL)
2052                 GOTO(out, rc = -EINVAL);
2053
2054         if (fd->fd_flags & LL_FILE_GROUP_LOCKED)
2055                 lockh = &fd->fd_cwlockh;
2056
2057         llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_READPAGE, lockh);
2058         if (IS_ERR(llap)) {
2059                 if (PTR_ERR(llap) == -ENOLCK) {
2060                         CWARN("ino %lu page %lu (%llu) not covered by "
2061                               "a lock (mmap?).  check debug logs.\n",
2062                               inode->i_ino, page->index,
2063                               (long long)page->index << PAGE_CACHE_SHIFT);
2064                 }
2065                 GOTO(out, rc = PTR_ERR(llap));
2066         }
2067
2068         if (ll_i2sbi(inode)->ll_ra_info.ra_max_pages)
2069                 ras_update(ll_i2sbi(inode), inode, &fd->fd_ras, page->index,
2070                            llap->llap_defer_uptodate);
2071
2072
2073         if (llap->llap_defer_uptodate) {
2074                 /* This is the callpath if we got the page from a readahead */
2075                 llap->llap_ra_used = 1;
2076                 rc = ll_readahead(&fd->fd_ras, exp, page->mapping, oig,
2077                                   fd->fd_flags);
2078                 if (rc > 0)
2079                         obd_trigger_group_io(exp, ll_i2info(inode)->lli_smd,
2080                                              NULL, oig);
2081                 LL_CDEBUG_PAGE(D_PAGE, page, "marking uptodate from defer\n");
2082                 SetPageUptodate(page);
2083                 unlock_page(page);
2084                 GOTO(out_oig, rc = 0);
2085         }
2086
2087         rc = ll_issue_page_read(exp, llap, oig, 0);
2088         if (rc)
2089                 GOTO(out, rc);
2090
2091         LL_CDEBUG_PAGE(D_PAGE, page, "queued readpage\n");
2092         /* We have just requested the actual page we want, see if we can tack
2093          * on some readahead to that page's RPC before it is sent. */
2094         if (ll_i2sbi(inode)->ll_ra_info.ra_max_pages)
2095                 ll_readahead(&fd->fd_ras, exp, page->mapping, oig,
2096                              fd->fd_flags);
2097
2098         rc = obd_trigger_group_io(exp, ll_i2info(inode)->lli_smd, NULL, oig);
2099
2100 out:
2101         if (rc)
2102                 unlock_page(page);
2103 out_oig:
2104         if (oig != NULL)
2105                 oig_release(oig);
2106         RETURN(rc);
2107 }
2108
2109 static void ll_file_put_pages(struct page **pages, int numpages)
2110 {
2111         int i;
2112         struct page **pp;
2113         ENTRY;
2114
2115         for (i = 0, pp = pages; i < numpages; i++, pp++) {
2116                 if (*pp) {
2117                         LL_CDEBUG_PAGE(D_PAGE, (*pp), "free\n");
2118                         __ll_put_llap(*pp);
2119                         if (page_private(*pp))
2120                                 CERROR("the llap wasn't freed\n");
2121                         (*pp)->mapping = NULL;
2122                         if (page_count(*pp) != 1)
2123                                 CERROR("page %p, flags %#lx, count %i, private %p\n",
2124                                 (*pp), (unsigned long)(*pp)->flags, page_count(*pp),
2125                                 (void*)page_private(*pp));
2126                         __free_pages(*pp, 0);
2127                 }
2128         }
2129         OBD_FREE(pages, numpages * sizeof(struct page*));
2130         EXIT;
2131 }
2132
2133 static struct page **ll_file_prepare_pages(int numpages, struct inode *inode,
2134                                            unsigned long first)
2135 {
2136         struct page **pages;
2137         int i;
2138         int rc = 0;
2139         ENTRY;
2140
2141         OBD_ALLOC(pages, sizeof(struct page *) * numpages);
2142         if (pages == NULL)
2143                 RETURN(ERR_PTR(-ENOMEM));
2144         for (i = 0; i < numpages; i++) {
2145                 struct page *page;
2146                 struct ll_async_page *llap;
2147
2148                 page = alloc_pages(GFP_HIGHUSER, 0);
2149                 if (page == NULL)
2150                         GOTO(err, rc = -ENOMEM);
2151                 pages[i] = page;
2152                 /* llap_from_page needs page index and mapping to be set */
2153                 page->index = first++;
2154                 page->mapping = inode->i_mapping;
2155                 llap = llap_from_page(page, LLAP_ORIGIN_LOCKLESS_IO);
2156                 if (IS_ERR(llap))
2157                         GOTO(err, rc = PTR_ERR(llap));
2158                 llap->llap_lockless_io_page = 1;
2159         }
2160         RETURN(pages);
2161 err:
2162         ll_file_put_pages(pages, numpages);
2163         RETURN(ERR_PTR(rc));
2164 }
2165
2166 static ssize_t ll_file_copy_pages(struct page **pages, int numpages,
2167                                   char *buf, loff_t pos, size_t count, int rw)
2168 {
2169         ssize_t amount = 0;
2170         int i;
2171         int updatechecksum = ll_i2sbi(pages[0]->mapping->host)->ll_flags &
2172                              LL_SBI_CHECKSUM;
2173         ENTRY;
2174
2175         for (i = 0; i < numpages; i++) {
2176                 unsigned offset, bytes, left;
2177                 char *vaddr;
2178
2179                 vaddr = kmap(pages[i]);
2180                 offset = pos & (CFS_PAGE_SIZE - 1);
2181                 bytes = min_t(unsigned, CFS_PAGE_SIZE - offset, count);
2182                 LL_CDEBUG_PAGE(D_PAGE, pages[i], "op = %s, addr = %p, "
2183                                "buf = %p, bytes = %u\n",
2184                                (rw == WRITE) ? "CFU" : "CTU",
2185                                vaddr + offset, buf, bytes);
2186                 if (rw == WRITE) {
2187                         left = copy_from_user(vaddr + offset, buf, bytes);
2188                         if (updatechecksum) {
2189                                 struct ll_async_page *llap;
2190
2191                                 llap = llap_cast_private(pages[i]);
2192                                 llap->llap_checksum = crc32_le(0, vaddr,
2193                                                                CFS_PAGE_SIZE);
2194                         }
2195                 } else {
2196                         left = copy_to_user(buf, vaddr + offset, bytes);
2197                 }
2198                 kunmap(pages[i]);
2199                 amount += bytes;
2200                 if (left) {
2201                         amount -= left;
2202                         break;
2203                 }
2204                 buf += bytes;
2205                 count -= bytes;
2206                 pos += bytes;
2207         }
2208         if (amount == 0)
2209                 RETURN(-EFAULT);
2210         RETURN(amount);
2211 }
2212
2213 static int ll_file_oig_pages(struct inode * inode, struct page **pages,
2214                              int numpages, loff_t pos, size_t count, int rw)
2215 {
2216         struct obd_io_group *oig;
2217         struct ll_inode_info *lli = ll_i2info(inode);
2218         struct obd_export *exp;
2219         loff_t org_pos = pos;
2220         obd_flag brw_flags;
2221         int rc;
2222         int i;
2223         ENTRY;
2224
2225         exp = ll_i2dtexp(inode);
2226         if (exp == NULL)
2227                 RETURN(-EINVAL);
2228         rc = oig_init(&oig);
2229         if (rc)
2230                 RETURN(rc);
2231         brw_flags = OBD_BRW_SRVLOCK;
2232         if (cfs_capable(CFS_CAP_SYS_RESOURCE))
2233                 brw_flags |= OBD_BRW_NOQUOTA;
2234
2235         for (i = 0; i < numpages; i++) {
2236                 struct ll_async_page *llap;
2237                 unsigned from, bytes;
2238
2239                 from = pos & (CFS_PAGE_SIZE - 1);
2240                 bytes = min_t(unsigned, CFS_PAGE_SIZE - from,
2241                               count - pos + org_pos);
2242                 llap = llap_cast_private(pages[i]);
2243                 LASSERT(llap);
2244
2245                 lock_page(pages[i]);
2246
2247                 LL_CDEBUG_PAGE(D_PAGE, pages[i], "offset "LPU64","
2248                                " from %u, bytes = %u\n",
2249                                (__u64)pos, from, bytes);
2250                 LASSERTF(pos >> CFS_PAGE_SHIFT == pages[i]->index,
2251                          "wrong page index %lu (%lu)\n",
2252                          pages[i]->index,
2253                          (unsigned long)(pos >> CFS_PAGE_SHIFT));
2254                 rc = obd_queue_group_io(exp, lli->lli_smd, NULL, oig,
2255                                         llap->llap_cookie,
2256                                         (rw == WRITE) ?
2257                                         OBD_BRW_WRITE:OBD_BRW_READ,
2258                                         from, bytes, brw_flags,
2259                                         ASYNC_READY | ASYNC_URGENT |
2260                                         ASYNC_COUNT_STABLE | ASYNC_GROUP_SYNC);
2261                 if (rc) {
2262                         i++;
2263                         GOTO(out, rc);
2264                 }
2265                 pos += bytes;
2266         }
2267         rc = obd_trigger_group_io(exp, lli->lli_smd, NULL, oig);
2268         if (rc)
2269                 GOTO(out, rc);
2270         rc = oig_wait(oig);
2271 out:
2272         while (--i >= 0)
2273                 unlock_page(pages[i]);
2274         oig_release(oig);
2275         RETURN(rc);
2276 }
2277
2278 ssize_t ll_file_lockless_io(struct file *file, char *buf, size_t count,
2279                                    loff_t *ppos, int rw)
2280 {
2281         loff_t pos;
2282         struct inode *inode = file->f_dentry->d_inode;
2283         ssize_t rc = 0;
2284         int max_pages;
2285         size_t amount = 0;
2286         unsigned long first, last;
2287         ENTRY;
2288
2289         if (rw == READ) {
2290                 loff_t isize;
2291
2292                 ll_inode_size_lock(inode, 0);
2293                 isize = i_size_read(inode);
2294                 ll_inode_size_unlock(inode, 0);
2295                 if (*ppos >= isize)
2296                         GOTO(out, rc = 0);
2297                 if (*ppos + count >= isize)
2298                         count -= *ppos + count - isize;
2299                 if (count == 0)
2300                         GOTO(out, rc);
2301         } else {
2302                 rc = generic_write_checks(file, ppos, &count, 0);
2303                 if (rc)
2304                         GOTO(out, rc);
2305                 rc = ll_remove_suid(file->f_dentry, file->f_vfsmnt);
2306                 if (rc)
2307                         GOTO(out, rc);
2308         }
2309         pos = *ppos;
2310         first = pos >> CFS_PAGE_SHIFT;
2311         last = (pos + count - 1) >> CFS_PAGE_SHIFT;
2312         max_pages = PTLRPC_MAX_BRW_PAGES *
2313                 ll_i2info(inode)->lli_smd->lsm_stripe_count;
2314         CDEBUG(D_INFO, "max_pages_per_rpc = %u, stripe_count = %u\n",
2315                PTLRPC_MAX_BRW_PAGES,
2316                ll_i2info(inode)->lli_smd->lsm_stripe_count);
2317
2318         while (first <= last && rc >= 0) {
2319                 int pages_for_io;
2320                 struct page **pages;
2321                 size_t bytes = count - amount;
2322
2323                 pages_for_io = min_t(int, last - first + 1, max_pages);
2324                 pages = ll_file_prepare_pages(pages_for_io, inode, first);
2325                 if (IS_ERR(pages)) {
2326                         rc = PTR_ERR(pages);
2327                         break;
2328                 }
2329                 if (rw == WRITE) {
2330                         rc = ll_file_copy_pages(pages, pages_for_io, buf,
2331                                                 pos + amount, bytes, rw);
2332                         if (rc < 0)
2333                                 GOTO(put_pages, rc);
2334                         bytes = rc;
2335                 }
2336                 rc = ll_file_oig_pages(inode, pages, pages_for_io,
2337                                        pos + amount, bytes, rw);
2338                 if (rc)
2339                         GOTO(put_pages, rc);
2340                 if (rw == READ) {
2341                         rc = ll_file_copy_pages(pages, pages_for_io, buf,
2342                                                 pos + amount, bytes, rw);
2343                         if (rc < 0)
2344                                 GOTO(put_pages, rc);
2345                         bytes = rc;
2346                 }
2347                 amount += bytes;
2348                 buf += bytes;
2349 put_pages:
2350                 ll_file_put_pages(pages, pages_for_io);
2351                 first += pages_for_io;
2352                 /* a short read/write check */
2353                 if (pos + amount < ((loff_t)first << CFS_PAGE_SHIFT))
2354                         break;
2355         }
2356         /* NOTE: don't update i_size and KMS in the absence of LDLM locks,
2357          * even if the write makes the file larger */
2358         file_accessed(file);
2359         if (rw == READ && amount < count && rc == 0) {
2360                 unsigned long not_cleared;
2361
2362                 not_cleared = clear_user(buf, count - amount);
2363                 amount = count - not_cleared;
2364                 if (not_cleared)
2365                         rc = -EFAULT;
2366         }
2367         if (amount > 0) {
2368                 lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
2369                                     (rw == WRITE) ?
2370                                     LPROC_LL_LOCKLESS_WRITE :
2371                                     LPROC_LL_LOCKLESS_READ,
2372                                     (long)amount);
2373                 *ppos += amount;
2374                 RETURN(amount);
2375         }
2376 out:
2377         RETURN(rc);
2378 }