1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/llite/rw.c
37  *
38  * Lustre Lite I/O page cache routines shared by different kernel revs
39  */
40
41 #include <linux/autoconf.h>
42 #include <linux/kernel.h>
43 #include <linux/mm.h>
44 #include <linux/string.h>
45 #include <linux/stat.h>
46 #include <linux/errno.h>
47 #include <linux/smp_lock.h>
48 #include <linux/unistd.h>
49 #include <linux/version.h>
50 #include <asm/system.h>
51 #include <asm/uaccess.h>
52
53 #include <linux/fs.h>
54 #include <linux/stat.h>
55 #include <asm/uaccess.h>
56 #include <linux/mm.h>
57 #include <linux/pagemap.h>
58 #include <linux/smp_lock.h>
59
60 #define DEBUG_SUBSYSTEM S_LLITE
61
62 //#include <lustre_mdc.h>
63 #include <lustre_lite.h>
64 #include <obd_cksum.h>
65 #include "llite_internal.h"
66 #include <linux/lustre_compat25.h>
67
68 #ifndef list_for_each_prev_safe
69 #define list_for_each_prev_safe(pos, n, head) \
70         for (pos = (head)->prev, n = pos->prev; pos != (head); \
71                 pos = n, n = pos->prev )
72 #endif
73
74 cfs_mem_cache_t *ll_async_page_slab = NULL;
75 size_t ll_async_page_slab_size = 0;
76
77 /* SYNCHRONOUS I/O to object storage for an inode */
78 static int ll_brw(int cmd, struct inode *inode, struct obdo *oa,
79                   struct page *page, int flags)
80 {
81         struct ll_inode_info *lli = ll_i2info(inode);
82         struct lov_stripe_md *lsm = lli->lli_smd;
83         struct obd_info oinfo = { { { 0 } } };
84         struct brw_page pg;
85         int opc, rc;
86         ENTRY;
87
88         pg.pg = page;
89         pg.off = ((obd_off)page->index) << CFS_PAGE_SHIFT;
90
91         if ((cmd & OBD_BRW_WRITE) && (pg.off+CFS_PAGE_SIZE>i_size_read(inode)))
92                 pg.count = i_size_read(inode) % CFS_PAGE_SIZE;
93         else
94                 pg.count = CFS_PAGE_SIZE;
95
96         LL_CDEBUG_PAGE(D_PAGE, page, "%s %d bytes ino %lu at "LPU64"/"LPX64"\n",
97                        cmd & OBD_BRW_WRITE ? "write" : "read", pg.count,
98                        inode->i_ino, pg.off, pg.off);
99         if (pg.count == 0) {
100                 CERROR("ZERO COUNT: ino %lu: size %p:%Lu(%p:%Lu) idx %lu off "
101                        LPU64"\n", inode->i_ino, inode, i_size_read(inode),
102                        page->mapping->host, i_size_read(page->mapping->host),
103                        page->index, pg.off);
104         }
105
106         pg.flag = flags;
107
108         if (cmd & OBD_BRW_WRITE)
109                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_BRW_WRITE,
110                                    pg.count);
111         else
112                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_BRW_READ,
113                                    pg.count);
114         oinfo.oi_oa = oa;
115         oinfo.oi_md = lsm;
116         /* NB partial write, so we might not have CAPA_OPC_OSS_READ capa */
117         opc = cmd & OBD_BRW_WRITE ? CAPA_OPC_OSS_WRITE : CAPA_OPC_OSS_RW;
118         oinfo.oi_capa = ll_osscapa_get(inode, opc);
119         rc = obd_brw(cmd, ll_i2dtexp(inode), &oinfo, 1, &pg, NULL);
120         capa_put(oinfo.oi_capa);
121         if (rc == 0)
122                 obdo_to_inode(inode, oa, OBD_MD_FLBLOCKS);
123         else if (rc != -EIO)
124                 CERROR("error from obd_brw: rc = %d\n", rc);
125         RETURN(rc);
126 }
127
128 int ll_file_punch(struct inode * inode, loff_t new_size, int srvlock)
129 {
130         struct ll_inode_info *lli = ll_i2info(inode);
131         struct obd_info oinfo = { { { 0 } } };
132         struct obdo oa;
133         int rc;
134
135         ENTRY;
136         CDEBUG(D_INFO, "calling punch for "LPX64" (new size %Lu=%#Lx)\n",
137                lli->lli_smd->lsm_object_id, i_size_read(inode), i_size_read(inode));
138
139         oinfo.oi_md = lli->lli_smd;
140         oinfo.oi_policy.l_extent.start = new_size;
141         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
142         oinfo.oi_oa = &oa;
143         oa.o_id = lli->lli_smd->lsm_object_id;
144         oa.o_gr = lli->lli_smd->lsm_object_gr;
145         oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
146         if (srvlock) {
147                 /* set OBD_MD_FLFLAGS in o_valid, only if we
148                  * set OBD_FL_TRUNCLOCK, otherwise ost_punch
149                  * and filter_setattr get confused, see the comment
150                  * in ost_punch */
151                 oa.o_flags = OBD_FL_TRUNCLOCK;
152                 oa.o_valid |= OBD_MD_FLFLAGS;
153         }
154         obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE |
155                         OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME |
156                         OBD_MD_FLFID | OBD_MD_FLGENER);
157
158         oinfo.oi_capa = ll_osscapa_get(inode, CAPA_OPC_OSS_TRUNC);
159         rc = obd_punch_rqset(ll_i2dtexp(inode), &oinfo, NULL);
160         ll_truncate_free_capa(oinfo.oi_capa);
161         if (rc)
162                 CERROR("obd_truncate fails (%d) ino %lu\n", rc, inode->i_ino);
163         else
164                 obdo_to_inode(inode, &oa, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
165                               OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME);
166         RETURN(rc);
167 }
168
169 /* this isn't where truncate starts.   roughly:
170  * sys_truncate->ll_setattr_raw->vmtruncate->ll_truncate. setattr_raw grabs
171  * DLM lock on [size, EOF], i_mutex, ->lli_size_sem, and WRITE_I_ALLOC_SEM to
172  * avoid races.
173  *
174  * must be called under ->lli_size_sem */
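/*
 * A minimal sketch of the locking convention described above (assumed, not
 * copied from the setattr path): the caller takes the inode size semaphore
 * before dropping into vmtruncate(), and ll_truncate() releases it on every
 * path once it finds itself to be the owner.
 *
 *      ll_inode_size_lock(inode, 0);            // records current as owner
 *      ...
 *      vmtruncate(inode, new_size);             // ends up in ll_truncate()
 *      // ll_truncate() calls ll_inode_size_unlock(inode, 0) before returning
 */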
175 void ll_truncate(struct inode *inode)
176 {
177         struct ll_inode_info *lli = ll_i2info(inode);
178         int srvlock = !!(lli->lli_flags & LLIF_SRVLOCK);
179         loff_t new_size;
180         ENTRY;
181         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) to %Lu=%#Lx\n",inode->i_ino,
182                inode->i_generation, inode, i_size_read(inode),
183                i_size_read(inode));
184
185         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_TRUNC, 1);
186         if (lli->lli_size_sem_owner != current) {
187                 EXIT;
188                 return;
189         }
190
191         if (!lli->lli_smd) {
192                 CDEBUG(D_INODE, "truncate on inode %lu with no objects\n",
193                        inode->i_ino);
194                 GOTO(out_unlock, 0);
195         }
196
197         LASSERT(atomic_read(&lli->lli_size_sem.count) <= 0);
198
199         if (!srvlock) {
200                 struct ost_lvb lvb;
201                 int rc;
202
203                 /* XXX I'm pretty sure this is a hack to paper
204                  * over a more fundamental race condition. */
205                 lov_stripe_lock(lli->lli_smd);
206                 inode_init_lvb(inode, &lvb);
207                 rc = obd_merge_lvb(ll_i2dtexp(inode), lli->lli_smd, &lvb, 0);
208                 if (lvb.lvb_size == i_size_read(inode) && rc == 0) {
209                         CDEBUG(D_VFSTRACE, "skipping punch for obj "LPX64
210                                ",%Lu=%#Lx\n", lli->lli_smd->lsm_object_id,
211                                i_size_read(inode), i_size_read(inode));
212                         lov_stripe_unlock(lli->lli_smd);
213                         GOTO(out_unlock, 0);
214                 }
215                 obd_adjust_kms(ll_i2dtexp(inode), lli->lli_smd,
216                                i_size_read(inode), 1);
217                 lov_stripe_unlock(lli->lli_smd);
218         }
219
220         if (unlikely((ll_i2sbi(inode)->ll_flags & LL_SBI_CHECKSUM) &&
221                      (i_size_read(inode) & ~CFS_PAGE_MASK))) {
222                 /* If the truncate leaves a partial page, update its checksum */
223                 struct page *page = find_get_page(inode->i_mapping,
224                                                   i_size_read(inode) >>
225                                                   CFS_PAGE_SHIFT);
226                 if (page != NULL) {
227                         struct ll_async_page *llap = llap_cast_private(page);
228                         if (llap != NULL) {
229                                 char *kaddr = kmap_atomic(page, KM_USER0);
230                                 llap->llap_checksum =
231                                         init_checksum(OSC_DEFAULT_CKSUM);
232                                 llap->llap_checksum =
233                                         compute_checksum(llap->llap_checksum,
234                                                          kaddr, CFS_PAGE_SIZE,
235                                                          OSC_DEFAULT_CKSUM);
236                                 kunmap_atomic(kaddr, KM_USER0);
237                         }
238                         page_cache_release(page);
239                 }
240         }
241
242         new_size = i_size_read(inode);
243         ll_inode_size_unlock(inode, 0);
244         if (!srvlock)
245                 ll_file_punch(inode, new_size, 0);
246         else
247                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LOCKLESS_TRUNC, 1);
248
249         EXIT;
250         return;
251
252  out_unlock:
253         ll_inode_size_unlock(inode, 0);
254 } /* ll_truncate */
255
256 int ll_prepare_write(struct file *file, struct page *page, unsigned from,
257                      unsigned to)
258 {
259         struct inode *inode = page->mapping->host;
260         struct ll_inode_info *lli = ll_i2info(inode);
261         struct lov_stripe_md *lsm = lli->lli_smd;
262         obd_off offset = ((obd_off)page->index) << CFS_PAGE_SHIFT;
263         struct obd_info oinfo = { { { 0 } } };
264         struct brw_page pga;
265         struct obdo oa;
266         struct ost_lvb lvb;
267         int rc = 0;
268         ENTRY;
269
270         LASSERT(PageLocked(page));
271         (void)llap_cast_private(page); /* assertion */
272
273         /* Check to see if we should return -EIO right away */
274         pga.pg = page;
275         pga.off = offset;
276         pga.count = CFS_PAGE_SIZE;
277         pga.flag = 0;
278
279         oa.o_mode = inode->i_mode;
280         oa.o_id = lsm->lsm_object_id;
281         oa.o_gr = lsm->lsm_object_gr;
282         oa.o_valid = OBD_MD_FLID | OBD_MD_FLMODE |
283                      OBD_MD_FLTYPE | OBD_MD_FLGROUP;
284         obdo_from_inode(&oa, inode, OBD_MD_FLFID | OBD_MD_FLGENER);
285
286         oinfo.oi_oa = &oa;
287         oinfo.oi_md = lsm;
288         rc = obd_brw(OBD_BRW_CHECK, ll_i2dtexp(inode), &oinfo, 1, &pga, NULL);
289         if (rc)
290                 RETURN(rc);
291
292         if (PageUptodate(page)) {
293                 LL_CDEBUG_PAGE(D_PAGE, page, "uptodate\n");
294                 RETURN(0);
295         }
296
297         /* We're completely overwriting an existing page, so _don't_ set it up
298          * to date until commit_write */
299         if (from == 0 && to == CFS_PAGE_SIZE) {
300                 LL_CDEBUG_PAGE(D_PAGE, page, "full page write\n");
301                 POISON_PAGE(page, 0x11);
302                 RETURN(0);
303         }
304
305         /* If we are writing to a new page, no need to read old data.  The extent
306          * locking will have updated the KMS, and for our purposes here we can
307          * treat it like i_size. */
308         lov_stripe_lock(lsm);
309         inode_init_lvb(inode, &lvb);
310         obd_merge_lvb(ll_i2dtexp(inode), lsm, &lvb, 1);
311         lov_stripe_unlock(lsm);
312         if (lvb.lvb_size <= offset) {
313                 char *kaddr = kmap_atomic(page, KM_USER0);
314                 LL_CDEBUG_PAGE(D_PAGE, page, "kms "LPU64" <= offset "LPU64"\n",
315                                lvb.lvb_size, offset);
316                 memset(kaddr, 0, CFS_PAGE_SIZE);
317                 kunmap_atomic(kaddr, KM_USER0);
318                 GOTO(prepare_done, rc = 0);
319         }
320
321         /* XXX could be an async ocp read.. read-ahead? */
322         rc = ll_brw(OBD_BRW_READ, inode, &oa, page, 0);
323         if (rc == 0) {
324                 /* bug 1598: don't clobber blksize */
325                 oa.o_valid &= ~(OBD_MD_FLSIZE | OBD_MD_FLBLKSZ);
326                 obdo_refresh_inode(inode, &oa, oa.o_valid);
327         }
328
329         EXIT;
330  prepare_done:
331         if (rc == 0)
332                 SetPageUptodate(page);
333
334         return rc;
335 }
336
337 /**
338  * make page ready for ASYNC write
339  * \param data - pointer to llap cookie
340  * \param cmd - OBD_BRW_* command flags
341  *
342  * \retval 0 page was successfully prepared for sending
343  * \retval -EAGAIN page should not be sent now
344  */
345 static int ll_ap_make_ready(void *data, int cmd)
346 {
347         struct ll_async_page *llap;
348         struct page *page;
349         ENTRY;
350
351         llap = llap_from_cookie(data);
352         page = llap->llap_page;
353
354         /* we're trying to write, but the page is locked.. come back later */
355         if (TryLockPage(page))
356                 RETURN(-EAGAIN);
357
358         LASSERTF(!(cmd & OBD_BRW_READ) || !PageWriteback(page),
359                 "cmd %x page %p ino %lu index %lu fl %lx\n", cmd, page,
360                  page->mapping->host->i_ino, page->index, page->flags);
361
362         /* if we left PageDirty set we might get another writepage call
363          * in the future.  list walkers are bright enough to check the
364          * dirty bit, so we can leave the page on whatever list it's on.
365          * XXX also, we're called holding the cli lock, so if we took
366          * the page cache lock we'd create a lock inversion with the
367          * removepage path, which takes the page lock and then the
368          * cli lock */
369         LASSERTF(!PageWriteback(page),"cmd %x page %p ino %lu index %lu\n", cmd, page,
370                  page->mapping->host->i_ino, page->index);
371         if(!clear_page_dirty_for_io(page)) {
372                 unlock_page(page);
373                 RETURN(-EAGAIN);
374         }
375
376         /* This actually clears the dirty bit in the radix tree.*/
377         set_page_writeback(page);
378
379         LL_CDEBUG_PAGE(D_PAGE, page, "made ready\n");
380         page_cache_get(page);
381
382         RETURN(0);
383 }
384
385 /* We have two reasons for giving llite the opportunity to change the
386  * write length of a given queued page as it builds the RPC containing
387  * the page:
388  *
389  * 1) Further extending writes may have landed in the page cache
390  *    since a partial write first queued this page requiring us
391  *    to write more from the page cache.  (No further races are possible, since
392  *    by the time this is called, the page is locked.)
393  * 2) We might have raced with truncate and want to avoid performing
394  *    write RPCs that are just going to be thrown away by the
395  *    truncate's punch on the storage targets.
396  *
397  * The kms serves these purposes as it is set at both truncate and extending
398  * writes.
399  */
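/*
 * A worked example of the kms-based count computed below (illustrative
 * numbers, assuming CFS_PAGE_SIZE == 4096): with kms == 10000,
 *   page index 0 (bytes     0- 4095) lies fully below kms -> write 4096;
 *   page index 2 (bytes  8192-12287) straddles kms        -> write 10000 % 4096 = 1808;
 *   page index 3 (bytes 12288-16383) is at or past kms    -> write 0 (page is skipped).
 */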
400 static int ll_ap_refresh_count(void *data, int cmd)
401 {
402         struct ll_inode_info *lli;
403         struct ll_async_page *llap;
404         struct lov_stripe_md *lsm;
405         struct page *page;
406         struct inode *inode;
407         struct ost_lvb lvb;
408         __u64 kms;
409         ENTRY;
410
411         /* readpage queues with _COUNT_STABLE, shouldn't get here. */
412         LASSERT(cmd != OBD_BRW_READ);
413
414         llap = llap_from_cookie(data);
415         page = llap->llap_page;
416         inode = page->mapping->host;
417         lli = ll_i2info(inode);
418         lsm = lli->lli_smd;
419
420         lov_stripe_lock(lsm);
421         inode_init_lvb(inode, &lvb);
422         obd_merge_lvb(ll_i2dtexp(inode), lsm, &lvb, 1);
423         kms = lvb.lvb_size;
424         lov_stripe_unlock(lsm);
425
426         /* catch race with truncate */
427         if (((__u64)page->index << CFS_PAGE_SHIFT) >= kms)
428                 return 0;
429
430         /* catch sub-page write at end of file */
431         if (((__u64)page->index << CFS_PAGE_SHIFT) + CFS_PAGE_SIZE > kms)
432                 return kms % CFS_PAGE_SIZE;
433
434         return CFS_PAGE_SIZE;
435 }
436
437 void ll_inode_fill_obdo(struct inode *inode, int cmd, struct obdo *oa)
438 {
439         struct lov_stripe_md *lsm;
440         obd_flag valid_flags;
441
442         lsm = ll_i2info(inode)->lli_smd;
443
444         oa->o_id = lsm->lsm_object_id;
445         oa->o_gr = lsm->lsm_object_gr;
446         oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
447         valid_flags = OBD_MD_FLTYPE | OBD_MD_FLATIME;
448         if (cmd & OBD_BRW_WRITE) {
449                 oa->o_valid |= OBD_MD_FLEPOCH;
450                 oa->o_easize = ll_i2info(inode)->lli_ioepoch;
451
452                 valid_flags |= OBD_MD_FLMTIME | OBD_MD_FLCTIME |
453                         OBD_MD_FLUID | OBD_MD_FLGID |
454                         OBD_MD_FLFID | OBD_MD_FLGENER;
455         }
456
457         obdo_from_inode(oa, inode, valid_flags);
458 }
459
460 static void ll_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
461 {
462         struct ll_async_page *llap;
463         ENTRY;
464
465         llap = llap_from_cookie(data);
466         ll_inode_fill_obdo(llap->llap_page->mapping->host, cmd, oa);
467
468         EXIT;
469 }
470
471 static void ll_ap_update_obdo(void *data, int cmd, struct obdo *oa,
472                               obd_valid valid)
473 {
474         struct ll_async_page *llap;
475         ENTRY;
476
477         llap = llap_from_cookie(data);
478         obdo_from_inode(oa, llap->llap_page->mapping->host, valid);
479
480         EXIT;
481 }
482
483 static struct obd_capa *ll_ap_lookup_capa(void *data, int cmd)
484 {
485         int opc = cmd & OBD_BRW_WRITE ? CAPA_OPC_OSS_WRITE : CAPA_OPC_OSS_RW;
486         struct ll_async_page *llap = llap_from_cookie(data);
487
488         return ll_osscapa_get(llap->llap_page->mapping->host, opc);
489 }
490
491 static struct obd_async_page_ops ll_async_page_ops = {
492         .ap_make_ready =        ll_ap_make_ready,
493         .ap_refresh_count =     ll_ap_refresh_count,
494         .ap_fill_obdo =         ll_ap_fill_obdo,
495         .ap_update_obdo =       ll_ap_update_obdo,
496         .ap_completion =        ll_ap_completion,
497         .ap_lookup_capa =       ll_ap_lookup_capa,
498 };
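/*
 * Rough order in which the async IO machinery is expected to invoke the
 * callbacks above while it assembles and completes a write RPC (a sketch
 * inferred from the callbacks themselves, not a guaranteed contract):
 *
 *      if (ops->ap_make_ready(cookie, OBD_BRW_WRITE) == 0) {
 *              count = ops->ap_refresh_count(cookie, OBD_BRW_WRITE);
 *              ops->ap_fill_obdo(cookie, OBD_BRW_WRITE, oa);
 *              // ... the page goes out as part of the RPC ...
 *              ops->ap_completion(cookie, OBD_BRW_WRITE, oa, rc);
 *      }
 */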
499
500 struct ll_async_page *llap_cast_private(struct page *page)
501 {
502         struct ll_async_page *llap = (struct ll_async_page *)page_private(page);
503
504         LASSERTF(llap == NULL || llap->llap_magic == LLAP_MAGIC,
505                  "page %p private %lu gave magic %d which != %d\n",
506                  page, page_private(page), llap->llap_magic, LLAP_MAGIC);
507
508         return llap;
509 }
510
511 /* Try to reap @target pages in the specific @cpu's async page list.
512  *
513  * There is an llap attached onto every page in lustre, linked off @sbi.
514  * We add an llap to the list so we don't lose our place during list walking.
515  * If llaps in the list are being moved they will only move to the end
516  * of the LRU, and we aren't terribly interested in those pages here (we
517  * start at the beginning of the list, where the least-used llaps are). */
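/*
 * A condensed sketch of the cursor-based walk used below (names taken from
 * the function that follows, loop body elided): a dummy llap with an
 * impossible magic acts as a place holder, so the per-cpu list lock can be
 * dropped and retaken without losing our position in the list.
 *
 *      struct ll_async_page dummy = { .llap_magic = 0xd11ad11a };
 *
 *      list_add(&dummy.llap_pglist_item, head);
 *      while (!done) {
 *              llap = llite_pglist_next_llap(head, &dummy.llap_pglist_item);
 *              list_del_init(&dummy.llap_pglist_item);
 *              if (llap == NULL)
 *                      break;
 *              // park the cursor right after the llap being processed
 *              list_add(&dummy.llap_pglist_item, &llap->llap_pglist_item);
 *              // ... the lock may be dropped/retaken while handling llap ...
 *      }
 *      list_del(&dummy.llap_pglist_item);
 */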
518 static inline int llap_shrink_cache_internal(struct ll_sb_info *sbi, 
519         int cpu, int target)
520 {
521         struct ll_async_page *llap, dummy_llap = { .llap_magic = 0xd11ad11a };
522         struct ll_pglist_data *pd;
523         struct list_head *head;
524         int count = 0;
525
526         pd = ll_pglist_cpu_lock(sbi, cpu);
527         head = &pd->llpd_list;
528         list_add(&dummy_llap.llap_pglist_item, head);
529         while (count < target) {
530                 struct page *page;
531                 int keep;
532
533                 if (unlikely(need_resched())) {
534                         ll_pglist_cpu_unlock(sbi, cpu);
535                         cond_resched();
536                         ll_pglist_cpu_lock(sbi, cpu);
537                 }
538
539                 llap = llite_pglist_next_llap(head, 
540                         &dummy_llap.llap_pglist_item);
541                 list_del_init(&dummy_llap.llap_pglist_item);
542                 if (llap == NULL)
543                         break;
544
545                 page = llap->llap_page;
546                 LASSERT(page != NULL);
547
548                 list_add(&dummy_llap.llap_pglist_item, &llap->llap_pglist_item);
549
550                 /* Page needs/undergoing IO */
551                 if (TryLockPage(page)) {
552                         LL_CDEBUG_PAGE(D_PAGE, page, "can't lock\n");
553                         continue;
554                 }
555
556                 keep = (llap->llap_write_queued || PageDirty(page) ||
557                         PageWriteback(page) || (!PageUptodate(page) &&
558                         llap->llap_origin != LLAP_ORIGIN_READAHEAD));
559
560                 LL_CDEBUG_PAGE(D_PAGE, page,"%s LRU page: %s%s%s%s%s origin %s\n",
561                                keep ? "keep" : "drop",
562                                llap->llap_write_queued ? "wq " : "",
563                                PageDirty(page) ? "pd " : "",
564                                PageUptodate(page) ? "" : "!pu ",
565                                PageWriteback(page) ? "wb" : "",
566                                llap->llap_defer_uptodate ? "" : "!du",
567                                llap_origins[llap->llap_origin]);
568
569                 /* If page is dirty or undergoing IO don't discard it */
570                 if (keep) {
571                         unlock_page(page);
572                         continue;
573                 }
574
575                 page_cache_get(page);
576                 ll_pglist_cpu_unlock(sbi, cpu);
577
578                 if (page->mapping != NULL) {
579                         ll_teardown_mmaps(page->mapping,
580                                          (__u64)page->index << CFS_PAGE_SHIFT,
581                                          ((__u64)page->index << CFS_PAGE_SHIFT)|
582                                           ~CFS_PAGE_MASK);
583                         if (!PageDirty(page) && !page_mapped(page)) {
584                                 ll_ra_accounting(llap, page->mapping);
585                                 ll_truncate_complete_page(page);
586                                 ++count;
587                         } else {
588                                 LL_CDEBUG_PAGE(D_PAGE, page, "Not dropping page"
589                                                              " because it is "
590                                                              "%s\n",
591                                                               PageDirty(page)?
592                                                               "dirty":"mapped");
593                         }
594                 }
595                 unlock_page(page);
596                 page_cache_release(page);
597
598                 ll_pglist_cpu_lock(sbi, cpu);
599         }
600         list_del(&dummy_llap.llap_pglist_item);
601         ll_pglist_cpu_unlock(sbi, cpu);
602
603         CDEBUG(D_CACHE, "shrank %d pages, target was %d\n", count, target);
604         return count;
605 }
606
607
608 /* Try to shrink the page cache for the @sbi filesystem by 1/@shrink_fraction.
609  *
610  * First, this code computes the total number of pages wanted from
611  * @shrink_fraction, then it decides how many pages to reap from each cpu
612  * in proportion to that cpu's own page count (llpd_count).
613  */
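/*
 * A worked example of the per-cpu split below (illustrative numbers): with
 * total = 1000 cached llaps and want = 100 pages to reclaim,
 *   a cpu holding c = 500 gets want / ((1000 / 501) + 1) = 100 / 2  = 50,
 *   a cpu holding c = 100 gets want / ((1000 / 101) + 1) = 100 / 10 = 10,
 *   a cpu holding c = 0   gets want / ((1000 / 1)   + 1) = 0 and is skipped.
 * The integer divisions make the split approximate rather than exact.
 */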
614 int llap_shrink_cache(struct ll_sb_info *sbi, int shrink_fraction)
615 {
616         unsigned long total, want, percpu_want, count = 0;
617         int cpu, nr_cpus;
618
619         total = lcounter_read(&sbi->ll_async_page_count);
620         if (total == 0)
621                 return 0;
622
623 #ifdef HAVE_SHRINKER_CACHE
624         want = shrink_fraction;
625         if (want == 0)
626                 return total;
627 #else
628         /* There can be a large number of llaps (600k or more in a large
629          * memory machine) so the VM 1/6 shrink ratio is likely too much.
630          * Since we are freeing pages also, we don't necessarily want to
631          * shrink so much.  Limit to 40MB of pages + llaps per call. */
632         if (shrink_fraction <= 0)
633                 want = total - sbi->ll_async_page_max + 32*num_online_cpus();
634         else
635                 want = (total + shrink_fraction - 1) / shrink_fraction;
636 #endif
637
638         if (want > 40 << (20 - CFS_PAGE_SHIFT))
639                 want = 40 << (20 - CFS_PAGE_SHIFT);
640
641         CDEBUG(D_CACHE, "shrinking %lu of %lu pages (1/%d)\n",
642                want, total, shrink_fraction);
643
644         nr_cpus = num_possible_cpus();
645         cpu = sbi->ll_async_page_clock_hand;
646         /* do at most one round */
647         do {
648                 int c;
649
650                 cpu = (cpu + 1) % nr_cpus;
651                 c = LL_PGLIST_DATA_CPU(sbi, cpu)->llpd_count;
652                 if (!cpu_online(cpu))
653                         percpu_want = c;
654                 else
655                         percpu_want = want / ((total / (c + 1)) + 1);
656                 if (percpu_want == 0)
657                         continue;
658
659                 count += llap_shrink_cache_internal(sbi, cpu, percpu_want);
660                 if (count >= want)
661                         sbi->ll_async_page_clock_hand = cpu;
662         } while (cpu != sbi->ll_async_page_clock_hand);
663
664         CDEBUG(D_CACHE, "shrank %lu/%lu and left %lu unscanned\n",
665                count, want, total);
666
667 #ifdef HAVE_SHRINKER_CACHE
668         return lcounter_read(&sbi->ll_async_page_count);
669 #else
670         return count;
671 #endif
672 }
673
674 /* Rebalance the async page queue length for each cpu.  The intent is that
675  * a cpu doing more IO gets a relatively longer queue.
676  * This function should be called with preemption disabled.
677  */
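/*
 * A worked example of the weighted rebalance below (illustrative numbers):
 * two online cpus, ll_async_page_max = 1000, both budgets currently 500,
 * and IO samples of 150 (cpu0) and 50 (cpu1).  Then
 *   slice = 1000 / 200 + 1 = 6,  mean sample = 100,  bias = 100 >> 4 = 6,
 * and both cpus deviate from the mean by more than the bias, so with
 * w1 = 7, w2 = 3, base = 10:
 *   cpu0: 500 * 7 / 10 + (6 * 150) * 3 / 10 = 350 + 270 = 620
 *   cpu1: 500 * 7 / 10 + (6 *  50) * 3 / 10 = 350 +  90 = 440
 * The remaining surplus (1000 - 620 - 440 = -60) is spread evenly, giving
 * final budgets of 590 and 410, which still sum to the configured max.
 */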
678 static inline int llap_async_cache_rebalance(struct ll_sb_info *sbi)
679 {
680         unsigned long sample = 0, *cpu_sample, bias, slice;
681         struct ll_pglist_data *pd;
682         cpumask_t mask;
683         int cpu, surplus;
684         int w1 = 7, w2 = 3, base = (w1 + w2); /* weight value */
685         atomic_t *pcnt;
686
687         if (!spin_trylock(&sbi->ll_async_page_reblnc_lock)) {
688                 /* someone else is doing the job */
689                 return 1;
690         }
691
692         pcnt = &LL_PGLIST_DATA(sbi)->llpd_sample_count;
693         if (!atomic_read(pcnt)) {
694                 /* rare case, somebody else has gotten this job done */
695                 spin_unlock(&sbi->ll_async_page_reblnc_lock);
696                 return 1;
697         }
698
699         sbi->ll_async_page_reblnc_count++;
700         cpu_sample = sbi->ll_async_page_sample;
701         memset(cpu_sample, 0, num_possible_cpus() * sizeof(unsigned long));
702         for_each_online_cpu(cpu) {
703                 pcnt = &LL_PGLIST_DATA_CPU(sbi, cpu)->llpd_sample_count;
704                 cpu_sample[cpu] = atomic_read(pcnt);
705                 atomic_set(pcnt, 0);
706                 sample += cpu_sample[cpu];
707         }
708
709         cpus_clear(mask);
710         surplus = sbi->ll_async_page_max;
711         slice = surplus / sample + 1;
712         sample /= num_online_cpus();
713         bias = sample >> 4;
714         for_each_online_cpu(cpu) {
715                 pd = LL_PGLIST_DATA_CPU(sbi, cpu);
716                 if (labs((long int)sample - cpu_sample[cpu]) > bias) {
717                         unsigned long budget = pd->llpd_budget;
718                         /* weighted original queue length and expected queue
719                          * length to avoid thrashing. */
720                         pd->llpd_budget = (budget * w1) / base +
721                                         (slice * cpu_sample[cpu]) * w2 / base;
722                         cpu_set(cpu, mask);
723                 }
724                 surplus -= pd->llpd_budget;
725         }
726         surplus /= cpus_weight(mask) ?: 1;
727         for_each_cpu_mask(cpu, mask)
728                 LL_PGLIST_DATA_CPU(sbi, cpu)->llpd_budget += surplus;
729         spin_unlock(&sbi->ll_async_page_reblnc_lock);
730
731         /* TODO: do we really need to call llap_shrink_cache_internal 
732          * for every cpu whose page_count is greater than its budget?
733          * for_each_cpu_mask(cpu, mask) 
734          *      ll_shrink_cache_internal(...) 
735          */
736
737         return 0;
738 }
739
740 static struct ll_async_page *llap_from_page_with_lockh(struct page *page,
741                                                        unsigned origin,
742                                                        struct lustre_handle *lockh)
743 {
744         struct ll_async_page *llap;
745         struct obd_export *exp;
746         struct inode *inode = page->mapping->host;
747         struct ll_sb_info *sbi;
748         struct ll_pglist_data *pd;
749         int rc, cpu, target;
750         ENTRY;
751
752         if (!inode) {
753                 static int triggered;
754
755                 if (!triggered) {
756                         LL_CDEBUG_PAGE(D_ERROR, page, "Bug 10047. Wrong anon "
757                                        "page received\n");
758                         libcfs_debug_dumpstack(NULL);
759                         triggered = 1;
760                 }
761                 RETURN(ERR_PTR(-EINVAL));
762         }
763         sbi = ll_i2sbi(inode);
764         LASSERT(ll_async_page_slab);
765         LASSERTF(origin < LLAP__ORIGIN_MAX, "%u\n", origin);
766
767         llap = llap_cast_private(page);
768         if (llap != NULL) {
769                 /* move to end of LRU list, except when page is just about to
770                  * die */
771                 if (origin != LLAP_ORIGIN_REMOVEPAGE) {
772                         int old_cpu = llap->llap_pglist_cpu;
773                         struct ll_pglist_data *old_pd;
774
775                         pd = ll_pglist_double_lock(sbi, old_cpu, &old_pd);
776                         pd->llpd_hit++;
777                         while (old_cpu != llap->llap_pglist_cpu) {
778                                 /* rare case: someone else is touching this
779                                  * page too. */
780                                 ll_pglist_double_unlock(sbi, old_cpu);
781                                 old_cpu = llap->llap_pglist_cpu;
782                                 pd=ll_pglist_double_lock(sbi, old_cpu, &old_pd);
783                         }
784
785                         list_move(&llap->llap_pglist_item,
786                                   &pd->llpd_list);
787                         old_pd->llpd_gen++;
788                         if (pd->llpd_cpu != old_cpu) {
789                                 pd->llpd_count++;
790                                 old_pd->llpd_count--;
791                                 old_pd->llpd_gen++;
792                                 llap->llap_pglist_cpu = pd->llpd_cpu;
793                                 pd->llpd_cross++;
794                         }
795                         ll_pglist_double_unlock(sbi, old_cpu);
796                 }
797                 GOTO(out, llap);
798         }
799
800         exp = ll_i2dtexp(page->mapping->host);
801         if (exp == NULL)
802                 RETURN(ERR_PTR(-EINVAL));
803
804         /* limit the number of lustre-cached pages */
805         cpu = get_cpu();
806         pd = LL_PGLIST_DATA(sbi);
807         target = pd->llpd_count - pd->llpd_budget;
808         if (target > 0) {
809                 rc = 0;
810                 atomic_inc(&pd->llpd_sample_count);
811                 if (atomic_read(&pd->llpd_sample_count) > 
812                     sbi->ll_async_page_sample_max) {
813                         pd->llpd_reblnc_count++;
814                         rc = llap_async_cache_rebalance(sbi);
815                         if (rc == 0)
816                                 target = pd->llpd_count - pd->llpd_budget;
817                 }
818                 /* if rc equals 1, another cpu is doing the rebalance job
819                  * and our budget may change while we read it.
820                  * Furthermore, the budget is most likely being increased,
821                  * because we have already reached the rebalance threshold.
822                  * In that case, skip shrinking the cache here. */
823                 if ((rc == 0) && target > 0)
824                         llap_shrink_cache_internal(sbi, cpu, target + 32);
825         }
826         put_cpu();
827
828         OBD_SLAB_ALLOC(llap, ll_async_page_slab, CFS_ALLOC_STD,
829                        ll_async_page_slab_size);
830         if (llap == NULL)
831                 RETURN(ERR_PTR(-ENOMEM));
832         llap->llap_magic = LLAP_MAGIC;
833         llap->llap_cookie = (void *)llap + size_round(sizeof(*llap));
834
835         /* XXX: for bug 11270 - check for lockless origin here! */
836         if (origin == LLAP_ORIGIN_LOCKLESS_IO)
837                 llap->llap_nocache = 1;
838
839         rc = obd_prep_async_page(exp, ll_i2info(inode)->lli_smd, NULL, page,
840                                  (obd_off)page->index << CFS_PAGE_SHIFT,
841                                  &ll_async_page_ops, llap, &llap->llap_cookie,
842                                  llap->llap_nocache, lockh);
843         if (rc) {
844                 OBD_SLAB_FREE(llap, ll_async_page_slab,
845                               ll_async_page_slab_size);
846                 RETURN(ERR_PTR(rc));
847         }
848
849         CDEBUG(D_CACHE, "llap %p page %p cookie %p obj off "LPU64"\n", llap,
850                page, llap->llap_cookie, (obd_off)page->index << CFS_PAGE_SHIFT);
851         /* also zeroing the PRIVBITS low order bitflags */
852         __set_page_ll_data(page, llap);
853         llap->llap_page = page;
854
855         lcounter_inc(&sbi->ll_async_page_count);
856         pd = ll_pglist_lock(sbi);
857         list_add_tail(&llap->llap_pglist_item, &pd->llpd_list);
858         INIT_LIST_HEAD(&llap->llap_pending_write);
859         pd->llpd_count++;
860         pd->llpd_gen++;
861         pd->llpd_miss++;
862         llap->llap_pglist_cpu = pd->llpd_cpu;
863         ll_pglist_unlock(sbi);
864
865  out:
866         if (unlikely(sbi->ll_flags & LL_SBI_CHECKSUM)) {
867                 __u32 csum;
868                 char *kaddr = kmap_atomic(page, KM_USER0);
869                 csum = init_checksum(OSC_DEFAULT_CKSUM);
870                 csum = compute_checksum(csum, kaddr, CFS_PAGE_SIZE,
871                                         OSC_DEFAULT_CKSUM);
872                 kunmap_atomic(kaddr, KM_USER0);
873                 if (origin == LLAP_ORIGIN_READAHEAD ||
874                     origin == LLAP_ORIGIN_READPAGE ||
875                     origin == LLAP_ORIGIN_LOCKLESS_IO) {
876                         llap->llap_checksum = 0;
877                 } else if (origin == LLAP_ORIGIN_COMMIT_WRITE ||
878                            llap->llap_checksum == 0) {
879                         llap->llap_checksum = csum;
880                         CDEBUG(D_PAGE, "page %p cksum %x\n", page, csum);
881                 } else if (llap->llap_checksum == csum) {
882                         /* origin == LLAP_ORIGIN_WRITEPAGE */
883                         CDEBUG(D_PAGE, "page %p cksum %x confirmed\n",
884                                page, csum);
885                 } else {
886                         /* origin == LLAP_ORIGIN_WRITEPAGE */
887                         LL_CDEBUG_PAGE(D_ERROR, page, "old cksum %x != new "
888                                        "%x!\n", llap->llap_checksum, csum);
889                 }
890         }
891
892         llap->llap_origin = origin;
893         RETURN(llap);
894 }
895
896 struct ll_async_page *llap_from_page(struct page *page,
897                                      unsigned origin)
898 {
899         return llap_from_page_with_lockh(page, origin, NULL);
900 }
901
902 static int queue_or_sync_write(struct obd_export *exp, struct inode *inode,
903                                struct ll_async_page *llap,
904                                unsigned to, obd_flag async_flags)
905 {
906         unsigned long size_index = i_size_read(inode) >> CFS_PAGE_SHIFT;
907         struct obd_io_group *oig;
908         struct ll_sb_info *sbi = ll_i2sbi(inode);
909         int rc, noquot = llap->llap_ignore_quota ? OBD_BRW_NOQUOTA : 0;
910         ENTRY;
911
912         /* _make_ready only sees llap once we've unlocked the page */
913         llap->llap_write_queued = 1;
914         rc = obd_queue_async_io(exp, ll_i2info(inode)->lli_smd, NULL,
915                                 llap->llap_cookie, OBD_BRW_WRITE | noquot,
916                                 0, 0, 0, async_flags);
917         if (rc == 0) {
918                 LL_CDEBUG_PAGE(D_PAGE, llap->llap_page, "write queued\n");
919                 GOTO(out, 0);
920         }
921
922         llap->llap_write_queued = 0;
923         /* Do not pass llap here as it is sync write. */
924         llap_write_pending(inode, NULL);
925
926         rc = oig_init(&oig);
927         if (rc)
928                 GOTO(out, rc);
929
930         /* make full-page requests if we are not at EOF (bug 4410) */
931         if (to != CFS_PAGE_SIZE && llap->llap_page->index < size_index) {
932                 LL_CDEBUG_PAGE(D_PAGE, llap->llap_page,
933                                "sync write before EOF: size_index %lu, to %d\n",
934                                size_index, to);
935                 to = CFS_PAGE_SIZE;
936         } else if (to != CFS_PAGE_SIZE && llap->llap_page->index == size_index){
937                 int size_to = i_size_read(inode) & ~CFS_PAGE_MASK;
938                 LL_CDEBUG_PAGE(D_PAGE, llap->llap_page,
939                                "sync write at EOF: size_index %lu, to %d/%d\n",
940                                size_index, to, size_to);
941                 if (to < size_to)
942                         to = size_to;
943         }
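        /*
         * A worked example of the widening above (illustrative numbers,
         * assuming CFS_PAGE_SIZE == 4096): with i_size = 10000, size_index
         * is 2.  A 100-byte write on page index 1 (below EOF) is widened to
         * a full 4096-byte request, while the same write on page index 2
         * (the EOF page) is widened only to i_size & ~CFS_PAGE_MASK = 1808
         * bytes, so the sync write stops at EOF.
         */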
944
945         /* compare the checksum once before the page leaves llite */
946         if (unlikely((sbi->ll_flags & LL_SBI_CHECKSUM) &&
947                      llap->llap_checksum != 0)) {
948                 __u32 csum;
949                 struct page *page = llap->llap_page;
950                 char *kaddr = kmap_atomic(page, KM_USER0);
951                 csum = init_checksum(OSC_DEFAULT_CKSUM);
952                 csum = compute_checksum(csum, kaddr, CFS_PAGE_SIZE,
953                                         OSC_DEFAULT_CKSUM);
954                 kunmap_atomic(kaddr, KM_USER0);
955                 if (llap->llap_checksum == csum) {
956                         CDEBUG(D_PAGE, "page %p cksum %x confirmed\n",
957                                page, csum);
958                 } else {
959                         CERROR("page %p old cksum %x != new cksum %x!\n",
960                                page, llap->llap_checksum, csum);
961                 }
962         }
963
964         rc = obd_queue_group_io(exp, ll_i2info(inode)->lli_smd, NULL, oig,
965                                 llap->llap_cookie, OBD_BRW_WRITE | noquot,
966                                 0, to, 0, ASYNC_READY | ASYNC_URGENT |
967                                 ASYNC_COUNT_STABLE | ASYNC_GROUP_SYNC);
968         if (rc)
969                 GOTO(free_oig, rc);
970
971         rc = obd_trigger_group_io(exp, ll_i2info(inode)->lli_smd, NULL, oig);
972         if (rc)
973                 GOTO(free_oig, rc);
974
975         rc = oig_wait(oig);
976
977         if (!rc && async_flags & ASYNC_READY) {
978                 unlock_page(llap->llap_page);
979                 if (PageWriteback(llap->llap_page))
980                         end_page_writeback(llap->llap_page);
981         }
982
983         if (rc == 0 && llap_write_complete(inode, llap))
984                 ll_queue_done_writing(inode, 0);
985
986         LL_CDEBUG_PAGE(D_PAGE, llap->llap_page, "sync write returned %d\n", rc);
987
988 free_oig:
989         oig_release(oig);
990 out:
991         RETURN(rc);
992 }
993
994 /* update our write count to account for i_size increases that may have
995  * happened since we've queued the page for io. */
996
997 /* be careful not to return success without setting the page Uptodate or
998  * the next pass through prepare_write will read in stale data from disk. */
999 int ll_commit_write(struct file *file, struct page *page, unsigned from,
1000                     unsigned to)
1001 {
1002         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1003         struct inode *inode = page->mapping->host;
1004         struct ll_inode_info *lli = ll_i2info(inode);
1005         struct lov_stripe_md *lsm = lli->lli_smd;
1006         struct obd_export *exp;
1007         struct ll_async_page *llap;
1008         loff_t size;
1009         struct lustre_handle *lockh = NULL;
1010         int rc = 0;
1011         ENTRY;
1012
1013         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1014         LASSERT(inode == file->f_dentry->d_inode);
1015         LASSERT(PageLocked(page));
1016
1017         CDEBUG(D_INODE, "inode %p is writing page %p from %d to %d at %lu\n",
1018                inode, page, from, to, page->index);
1019
1020         if (fd->fd_flags & LL_FILE_GROUP_LOCKED)
1021                 lockh = &fd->fd_cwlockh;
1022
1023         llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_COMMIT_WRITE, lockh);
1024         if (IS_ERR(llap))
1025                 RETURN(PTR_ERR(llap));
1026
1027         exp = ll_i2dtexp(inode);
1028         if (exp == NULL)
1029                 RETURN(-EINVAL);
1030
1031         llap->llap_ignore_quota = cfs_capable(CFS_CAP_SYS_RESOURCE);
1032
1033         /*
1034          * queue a write for some time in the future the first time we
1035          * dirty the page.
1036          *
1037          * This is different from what other file systems do: they usually
1038          * just mark page (and some of its buffers) dirty and rely on
1039          * balance_dirty_pages() to start a write-back. Lustre wants write-back
1040          * to be started earlier for the following reasons:
1041          *
1042          *     (1) with a large number of clients we need to limit the amount
1043          *     of cached data on the clients a lot;
1044          *
1045          *     (2) large compute jobs generally want compute-only then io-only
1046          *     and the IO should complete as quickly as possible;
1047          *
1048          *     (3) IO is batched up to the RPC size and is async until the
1049          *     client max cache is hit
1050          *     (/proc/fs/lustre/osc/OSC.../max_dirty_mb)
1051          *
1052          */
1053         if (!PageDirty(page)) {
1054                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_DIRTY_MISSES, 1);
1055
1056                 rc = queue_or_sync_write(exp, inode, llap, to, 0);
1057                 if (rc)
1058                         GOTO(out, rc);
1059         } else {
1060                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_DIRTY_HITS, 1);
1061         }
1062
1063         /* put the page in the page cache, from now on ll_removepage is
1064          * responsible for cleaning up the llap.
1065          * only set the page dirty when it's queued to be written out */
1066         if (llap->llap_write_queued)
1067                 set_page_dirty(page);
1068
1069 out:
1070         size = (((obd_off)page->index) << CFS_PAGE_SHIFT) + to;
1071         ll_inode_size_lock(inode, 0);
1072         if (rc == 0) {
1073                 lov_stripe_lock(lsm);
1074                 obd_adjust_kms(exp, lsm, size, 0);
1075                 lov_stripe_unlock(lsm);
1076                 if (size > i_size_read(inode))
1077                         i_size_write(inode, size);
1078                 SetPageUptodate(page);
1079         } else if (size > i_size_read(inode)) {
1080                 /* this page is beyond the pale of i_size, so it can't be
1081                  * truncated in ll_p_r_e during lock revocation.  we must
1082                  * tear down our book-keeping here. */
1083                 ll_removepage(page);
1084         }
1085         ll_inode_size_unlock(inode, 0);
1086         RETURN(rc);
1087 }
1088
1089 static void ll_ra_stats_inc_sbi(struct ll_sb_info *sbi, enum ra_stat which);
1090
1091 /* WARNING: This algorithm is used to reduce the contention on 
1092  * sbi->ll_lock. It should work well if the ra_max_pages is much 
1093  * greater than the single file's read-ahead window.
1094  *
1095  * TODO: There may be a `global sync problem' in this implementation.
1096  * Consider a global ra window of 100M and a per-file ra window of 10M:
1097  * if more than 10 files try to claim their ra budget and all reach
1098  * ll_ra_count_get at exactly the same time, every one of them can end up
1099  * with a zero ra window, even though the global window is 100M. -jay
1100  */
1101 static unsigned long ll_ra_count_get(struct ll_sb_info *sbi, unsigned long len)
1102 {
1103         struct ll_ra_info *ra = &sbi->ll_ra_info;
1104         unsigned long ret;
1105         ENTRY;
1106
1107         ret = min(ra->ra_max_pages - atomic_read(&ra->ra_cur_pages), len);
1108         if ((int)ret < 0)
1109                 GOTO(out, ret = 0);
1110
1111         if (atomic_add_return(ret, &ra->ra_cur_pages) > ra->ra_max_pages) {
1112                 atomic_sub(ret, &ra->ra_cur_pages);
1113                 ret = 0;
1114         }
1115 out:
1116         RETURN(ret);
1117 }
1118
1119 static void ll_ra_count_put(struct ll_sb_info *sbi, unsigned long len)
1120 {
1121         struct ll_ra_info *ra = &sbi->ll_ra_info;
1122         atomic_sub(len, &ra->ra_cur_pages);
1123 }
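/*
 * A minimal usage sketch of the get/put pair above (assumed, not copied from
 * the read-ahead path; 'pages_wanted' and 'issued' are illustrative names):
 * reserve a chunk of the global budget up front, start at most that many
 * read-ahead pages, and give back whatever was not used.
 *
 *      unsigned long got = ll_ra_count_get(sbi, pages_wanted);
 *      unsigned long issued = 0;
 *      // ... start read-ahead on up to 'got' pages, counting them in 'issued'
 *      if (got > issued)
 *              ll_ra_count_put(sbi, got - issued);
 */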
1124
1125 /* called for each page in a completed rpc.*/
1126 int ll_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
1127 {
1128         struct ll_async_page *llap;
1129         struct page *page;
1130         int ret = 0;
1131         ENTRY;
1132
1133         llap = llap_from_cookie(data);
1134         page = llap->llap_page;
1135         LASSERT(PageLocked(page));
1136         LASSERT(CheckWriteback(page,cmd));
1137
1138         LL_CDEBUG_PAGE(D_PAGE, page, "completing cmd %d with %d\n", cmd, rc);
1139
1140         if (cmd & OBD_BRW_READ && llap->llap_defer_uptodate)
1141                 ll_ra_count_put(ll_i2sbi(page->mapping->host), 1);
1142
1143         if (rc == 0)  {
1144                 if (cmd & OBD_BRW_READ) {
1145                         if (!llap->llap_defer_uptodate)
1146                                 SetPageUptodate(page);
1147                 } else {
1148                         llap->llap_write_queued = 0;
1149                 }
1150                 ClearPageError(page);
1151         } else {
1152                 if (cmd & OBD_BRW_READ) {
1153                         llap->llap_defer_uptodate = 0;
1154                 }
1155                 SetPageError(page);
1156                 if (rc == -ENOSPC)
1157                         set_bit(AS_ENOSPC, &page->mapping->flags);
1158                 else
1159                         set_bit(AS_EIO, &page->mapping->flags);
1160         }
1161
1162         /* be careful about clearing the writeback bit.
1163          * if it were cleared after the page lock is released, parallel IO
1164          * could start before ap_make_ready has finished, and we could end
1165          * up with a page that has PG_Writeback set from ->writepage() and
1166          * a completed READ that clears this flag */
1167         if ((cmd & OBD_BRW_WRITE) && PageWriteback(page))
1168                 end_page_writeback(page);
1169
1170         unlock_page(page);
1171
1172         if (cmd & OBD_BRW_WRITE) {
1173                 /* Only if rc == 0 (the write succeeded) can this page be
1174                  * removed from the pending write list.
1175                  */
1176                 if (rc == 0 && llap_write_complete(page->mapping->host, llap))
1177                         ll_queue_done_writing(page->mapping->host, 0);
1178         }
1179
1180         page_cache_release(page);
1181
1182         RETURN(ret);
1183 }
1184
1185 static void __ll_put_llap(struct page *page)
1186 {
1187         struct inode *inode = page->mapping->host;
1188         struct obd_export *exp;
1189         struct ll_async_page *llap;
1190         struct ll_sb_info *sbi = ll_i2sbi(inode);
1191         struct ll_pglist_data *pd;
1192         int rc, cpu;
1193         ENTRY;
1194
1195         exp = ll_i2dtexp(inode);
1196         if (exp == NULL) {
1197                 CERROR("page %p ind %lu gave null export\n", page, page->index);
1198                 EXIT;
1199                 return;
1200         }
1201
1202         llap = llap_from_page(page, LLAP_ORIGIN_REMOVEPAGE);
1203         if (IS_ERR(llap)) {
1204                 CERROR("page %p ind %lu couldn't find llap: %ld\n", page,
1205                        page->index, PTR_ERR(llap));
1206                 EXIT;
1207                 return;
1208         }
1209
1210         if (llap_write_complete(inode, llap))
1211                 ll_queue_done_writing(inode, 0);
1212
1213         rc = obd_teardown_async_page(exp, ll_i2info(inode)->lli_smd, NULL,
1214                                      llap->llap_cookie);
1215         if (rc != 0)
1216                 CERROR("page %p ind %lu failed: %d\n", page, page->index, rc);
1217
1218         /* this unconditional free is only safe because the page lock
1219          * is providing exclusivity to memory pressure/truncate/writeback..*/
1220         __clear_page_ll_data(page);
1221
1222         lcounter_dec(&sbi->ll_async_page_count);
1223         cpu = llap->llap_pglist_cpu;
1224         pd = ll_pglist_cpu_lock(sbi, cpu);
1225         pd->llpd_gen++;
1226         pd->llpd_count--;
1227         if (!list_empty(&llap->llap_pglist_item))
1228                 list_del_init(&llap->llap_pglist_item);
1229         ll_pglist_cpu_unlock(sbi, cpu);
1230         OBD_SLAB_FREE(llap, ll_async_page_slab, ll_async_page_slab_size);
1231         EXIT;
1232 }
1233
1234 /* the kernel calls us here when a page is unhashed from the page cache.
1235  * the page will be locked and the kernel is holding a spinlock, so
1236  * we need to be careful.  we're just tearing down our book-keeping
1237  * here. */
1238 void ll_removepage(struct page *page)
1239 {
1240         struct ll_async_page *llap = llap_cast_private(page);
1241         ENTRY;
1242
1243         LASSERT(!in_interrupt());
1244
1245         /* sync pages or failed read pages can leave pages in the page
1246          * cache that don't have our data associated with them anymore */
1247         if (page_private(page) == 0) {
1248                 EXIT;
1249                 return;
1250         }
1251
1252         LASSERT(!llap->llap_lockless_io_page);
1253         LASSERT(!llap->llap_nocache);
1254         LL_CDEBUG_PAGE(D_PAGE, page, "being evicted\n");
1255         __ll_put_llap(page);
1256         EXIT;
1257 }
1258
1259 static int ll_issue_page_read(struct obd_export *exp,
1260                               struct ll_async_page *llap,
1261                               struct obd_io_group *oig, int defer)
1262 {
1263         struct page *page = llap->llap_page;
1264         int rc;
1265
1266         page_cache_get(page);
1267         llap->llap_defer_uptodate = defer;
1268         llap->llap_ra_used = 0;
1269         rc = obd_queue_group_io(exp, ll_i2info(page->mapping->host)->lli_smd,
1270                                 NULL, oig, llap->llap_cookie, OBD_BRW_READ, 0,
1271                                 CFS_PAGE_SIZE, 0, ASYNC_COUNT_STABLE |
1272                                                   ASYNC_READY | ASYNC_URGENT);
1273         if (rc) {
1274                 LL_CDEBUG_PAGE(D_ERROR, page, "read queue failed: rc %d\n", rc);
1275                 page_cache_release(page);
1276         }
1277         RETURN(rc);
1278 }
1279
1280 static void ll_ra_stats_inc_sbi(struct ll_sb_info *sbi, enum ra_stat which)
1281 {
1282         LASSERTF(which >= 0 && which < _NR_RA_STAT, "which: %u\n", which);
1283         lprocfs_counter_incr(sbi->ll_ra_stats, which);
1284 }
1285
1286 static void ll_ra_stats_inc(struct address_space *mapping, enum ra_stat which)
1287 {
1288         struct ll_sb_info *sbi = ll_i2sbi(mapping->host);
1289         ll_ra_stats_inc_sbi(sbi, which);
1290 }
1291
1292 void ll_ra_accounting(struct ll_async_page *llap, struct address_space *mapping)
1293 {
1294         if (!llap->llap_defer_uptodate || llap->llap_ra_used)
1295                 return;
1296
1297         ll_ra_stats_inc(mapping, RA_STAT_DISCARDED);
1298 }
1299
1300 #define RAS_CDEBUG(ras) \
1301         CDEBUG(D_READA,                                                      \
1302                "lrp %lu cr %lu cp %lu ws %lu wl %lu nra %lu r %lu ri %lu"    \
1303                "csr %lu sf %lu sp %lu sl %lu \n",                            \
1304                ras->ras_last_readpage, ras->ras_consecutive_requests,        \
1305                ras->ras_consecutive_pages, ras->ras_window_start,            \
1306                ras->ras_window_len, ras->ras_next_readahead,                 \
1307                ras->ras_requests, ras->ras_request_index,                    \
1308                ras->ras_consecutive_stride_requests, ras->ras_stride_offset, \
1309                ras->ras_stride_pages, ras->ras_stride_length)
1310
1311 static int index_in_window(unsigned long index, unsigned long point,
1312                            unsigned long before, unsigned long after)
1313 {
1314         unsigned long start = point - before, end = point + after;
1315
1316         if (start > point)
1317                start = 0;
1318         if (end < point)
1319                end = ~0;
1320
1321         return start <= index && index <= end;
1322 }
1323
1324 static struct ll_readahead_state *ll_ras_get(struct file *f)
1325 {
1326         struct ll_file_data       *fd;
1327
1328         fd = LUSTRE_FPRIVATE(f);
1329         return &fd->fd_ras;
1330 }
1331
1332 void ll_ra_read_in(struct file *f, struct ll_ra_read *rar)
1333 {
1334         struct ll_readahead_state *ras;
1335
1336         ras = ll_ras_get(f);
1337
1338         spin_lock(&ras->ras_lock);
1339         ras->ras_requests++;
1340         ras->ras_request_index = 0;
1341         ras->ras_consecutive_requests++;
1342         rar->lrr_reader = current;
1343
1344         list_add(&rar->lrr_linkage, &ras->ras_read_beads);
1345         spin_unlock(&ras->ras_lock);
1346 }
1347
1348 void ll_ra_read_ex(struct file *f, struct ll_ra_read *rar)
1349 {
1350         struct ll_readahead_state *ras;
1351
1352         ras = ll_ras_get(f);
1353
1354         spin_lock(&ras->ras_lock);
1355         list_del_init(&rar->lrr_linkage);
1356         spin_unlock(&ras->ras_lock);
1357 }
1358
1359 static struct ll_ra_read *ll_ra_read_get_locked(struct ll_readahead_state *ras)
1360 {
1361         struct ll_ra_read *scan;
1362
1363         list_for_each_entry(scan, &ras->ras_read_beads, lrr_linkage) {
1364                 if (scan->lrr_reader == current)
1365                         return scan;
1366         }
1367         return NULL;
1368 }
1369
1370 struct ll_ra_read *ll_ra_read_get(struct file *f)
1371 {
1372         struct ll_readahead_state *ras;
1373         struct ll_ra_read         *bead;
1374
1375         ras = ll_ras_get(f);
1376
1377         spin_lock(&ras->ras_lock);
1378         bead = ll_ra_read_get_locked(ras);
1379         spin_unlock(&ras->ras_lock);
1380         return bead;
1381 }
1382
1383 static int ll_read_ahead_page(struct obd_export *exp, struct obd_io_group *oig,
1384                               int index, struct address_space *mapping)
1385 {
1386         struct ll_async_page *llap;
1387         struct page *page;
1388         unsigned int gfp_mask = 0;
1389         int rc = 0;
1390
1391         gfp_mask = GFP_HIGHUSER & ~__GFP_WAIT;
1392 #ifdef __GFP_NOWARN
1393         gfp_mask |= __GFP_NOWARN;
1394 #endif
1395         page = grab_cache_page_nowait_gfp(mapping, index, gfp_mask);
1396         if (page == NULL) {
1397                 ll_ra_stats_inc(mapping, RA_STAT_FAILED_GRAB_PAGE);
1398                 CDEBUG(D_READA, "g_c_p_n failed\n");
1399                 return 0;
1400         }
1401
1402         /* Check if page was truncated or reclaimed */
1403         if (page->mapping != mapping) {
1404                 ll_ra_stats_inc(mapping, RA_STAT_WRONG_GRAB_PAGE);
1405                 CDEBUG(D_READA, "g_c_p_n returned invalid page\n");
1406                 GOTO(unlock_page, rc = 0);
1407         }
1408
1409         /* we do this first so that we can see the page in the /proc
1410          * accounting */
1411         llap = llap_from_page(page, LLAP_ORIGIN_READAHEAD);
1412         if (IS_ERR(llap) || llap->llap_defer_uptodate) {
1413                 if (PTR_ERR(llap) == -ENOLCK) {
1414                         ll_ra_stats_inc(mapping, RA_STAT_FAILED_MATCH);
1415                         CDEBUG(D_READA | D_PAGE,
1416                                "Adding page to cache failed index "
1417                                "%d\n", index);
1418                         CDEBUG(D_READA, "nolock page\n");
1419                         GOTO(unlock_page, rc = -ENOLCK);
1420                 }
1421                 CDEBUG(D_READA, "read-ahead page\n");
1422                 GOTO(unlock_page, rc = 0);
1423         }
1424
1425         /* skip completed pages */
1426         if (Page_Uptodate(page))
1427                 GOTO(unlock_page, rc = 0);
1428
1429         /* bail out when we hit the end of the lock. */
1430         rc = ll_issue_page_read(exp, llap, oig, 1);
1431         if (rc == 0) {
1432                 LL_CDEBUG_PAGE(D_READA | D_PAGE, page, "started read-ahead\n");
1433                 rc = 1;
1434         } else {
1435 unlock_page:
1436                 unlock_page(page);
1437                 LL_CDEBUG_PAGE(D_READA | D_PAGE, page, "skipping read-ahead\n");
1438         }
1439         page_cache_release(page);
1440         return rc;
1441 }
1442
1443 /* ra_io_arg is filled at the beginning of ll_readahead while ras_lock is
1444  * held; the following ll_read_ahead_pages then reads RA pages according
1445  * to this arg.  All items in this structure are counted in units of
1446  * page index.
1447  */
1448 struct ra_io_arg {
1449         unsigned long ria_start;  /* start offset of read-ahead*/
1450         unsigned long ria_end;    /* end offset of read-ahead*/
1451         /* If a stride read pattern is detected, ria_stoff is the offset
1452          * where the stride read starts.  Note: for normal read-ahead this
1453          * value is meaningless and is never accessed. */
1454         pgoff_t ria_stoff;
1455         /* ria_length and ria_pages are the stride length and the number of
1456          * pages read per stride in stride I/O mode.  They are also used to
1457          * check whether a page falls inside the stride read-ahead window. */
1458         unsigned long ria_length;
1459         unsigned long ria_pages;
1460 };
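/* Illustrative mapping (hypothetical application pattern, not from the
 * original source): an application that reads 4 pages and then skips 12
 * pages on every iteration is described by ria_length = 16 and
 * ria_pages = 4, with ria_stoff pointing at the first page of a stride
 * chunk, while [ria_start, ria_end] bounds the part of the read-ahead
 * window actually being issued. */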
1461
1462 #define RIA_DEBUG(ria)                                                \
1463         CDEBUG(D_READA, "rs %lu re %lu ro %lu rl %lu rp %lu\n",       \
1464         ria->ria_start, ria->ria_end, ria->ria_stoff, ria->ria_length,\
1465         ria->ria_pages)
1466
1467 #define RAS_INCREASE_STEP (1024 * 1024 >> CFS_PAGE_SHIFT)
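/* RAS_INCREASE_STEP is 1 MiB expressed in pages; with a 4 KiB
 * CFS_PAGE_SIZE this works out to 256 pages per increase step. */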
1468
1469 static inline int stride_io_mode(struct ll_readahead_state *ras)
1470 {
1471         return ras->ras_consecutive_stride_requests > 1;
1472 }
1473
1474 /* The function calculates how many pages will be read in
1475  * [off, off + length] when reading in stride I/O mode with
1476  * stride_offset = st_off, stride_length = st_len and
1477  * stride_pages = st_pgs.
1478  */
1479 static unsigned long
1480 stride_pg_count(pgoff_t st_off, unsigned long st_len, unsigned long st_pgs,
1481                 unsigned long off, unsigned length)
1482 {
1483         unsigned long cont_len = st_off > off ?  st_off - off : 0;
1484         __u64 stride_len = length + off > st_off ?
1485                            length + off + 1 - st_off : 0;
1486         unsigned long left, pg_count;
1487
1488         if (st_len == 0 || length == 0)
1489                 return length;
1490
1491         left = do_div(stride_len, st_len);
1492         left = min(left, st_pgs);
1493
1494         pg_count = left + stride_len * st_pgs + cont_len;
1495
1496         LASSERT(pg_count >= left);
1497
1498         CDEBUG(D_READA, "st_off %lu, st_len %lu st_pgs %lu off %lu length %u"
1499                " pgcount %lu\n", st_off, st_len, st_pgs, off, length, pg_count);
1500
1501         return pg_count;
1502 }
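/* Worked example (illustrative values): stride_pg_count(0, 16, 4, 0, 63)
 * covers pages [0, 63] with a stride of 16 pages, 4 of which are read per
 * stride.  cont_len = 0 and stride_len = 64, so do_div() yields 4 full
 * strides with remainder 0, and pg_count = 0 + 4 * 4 + 0 = 16, i.e.
 * pages 0-3, 16-19, 32-35 and 48-51. */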
1503
1504 static int ria_page_count(struct ra_io_arg *ria)
1505 {
1506         __u64 length = ria->ria_end >= ria->ria_start ?
1507                        ria->ria_end - ria->ria_start + 1 : 0;
1508
1509         return stride_pg_count(ria->ria_stoff, ria->ria_length,
1510                                ria->ria_pages, ria->ria_start,
1511                                length);
1512 }
1513
1514 /* Check whether the index is in the defined ra-window */
1515 static int ras_inside_ra_window(unsigned long idx, struct ra_io_arg *ria)
1516 {
1517         /* If ria_length == ria_pages, this is non-stride I/O mode and
1518          * idx is always inside the read-ahead window in this case.
1519          * For stride I/O mode, just check whether idx falls inside the
1520          * ria_pages part of the current stride. */
1521         return ria->ria_length == 0 || ria->ria_length == ria->ria_pages ||
1522                (idx - ria->ria_stoff) % ria->ria_length < ria->ria_pages;
1523 }
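/* Example (illustrative): with ria_stoff = 0, ria_length = 16 and
 * ria_pages = 4, index 18 gives (18 - 0) % 16 = 2 < 4 and is inside the
 * window, while index 22 gives 6 and falls in the stride gap. */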
1524
1525 static int ll_read_ahead_pages(struct obd_export *exp,
1526                                struct obd_io_group *oig,
1527                                struct ra_io_arg *ria,
1528                                unsigned long *reserved_pages,
1529                                struct address_space *mapping,
1530                                unsigned long *ra_end)
1531 {
1532         int rc, count = 0, stride_ria;
1533         unsigned long page_idx;
1534
1535         LASSERT(ria != NULL);
1536         RIA_DEBUG(ria);
1537
1538         stride_ria = ria->ria_length > ria->ria_pages && ria->ria_pages > 0;
1539         for (page_idx = ria->ria_start; page_idx <= ria->ria_end &&
1540                         *reserved_pages > 0; page_idx++) {
1541                 if (ras_inside_ra_window(page_idx, ria)) {
1542                         /* If the page is inside the read-ahead window */
1543                         rc = ll_read_ahead_page(exp, oig, page_idx, mapping);
1544                         if (rc == 1) {
1545                                 (*reserved_pages)--;
1546                                 count++;
1547                         } else if (rc == -ENOLCK)
1548                                 break;
1549                 } else if (stride_ria) {
1550                         /* The page is not in the read-ahead window, but we
1551                          * are in stride read-ahead mode, so check whether we
1552                          * should skip over the stride gap */
1553                         pgoff_t offset;
1554                         /* FIXME: This assertion is only valid for forward
1555                          * read-ahead; it will be fixed when backward
1556                          * read-ahead is implemented */
1557                         LASSERTF(page_idx > ria->ria_stoff, "index %lu is in"
1558                                  " the gap of the ra window, it should be bigger"
1559                                  " than stride offset %lu\n", page_idx, ria->ria_stoff);
1560
1561                         offset = page_idx - ria->ria_stoff;
1562                         offset = offset % (ria->ria_length);
1563                         if (offset > ria->ria_pages) {
1564                                 page_idx += ria->ria_length - offset;
1565                                 CDEBUG(D_READA, "i %lu skip %lu \n", page_idx,
1566                                        ria->ria_length - offset);
1567                                 continue;
1568                         }
1569                 }
1570         }
1571         *ra_end = page_idx;
1572         return count;
1573 }
1574
1575 static int ll_readahead(struct ll_readahead_state *ras,
1576                          struct obd_export *exp, struct address_space *mapping,
1577                          struct obd_io_group *oig, int flags)
1578 {
1579         unsigned long start = 0, end = 0, reserved;
1580         unsigned long ra_end, len;
1581         struct inode *inode;
1582         struct lov_stripe_md *lsm;
1583         struct ll_ra_read *bead;
1584         struct ost_lvb lvb;
1585         struct ra_io_arg ria = { 0 };
1586         int ret = 0;
1587         __u64 kms;
1588         ENTRY;
1589
1590         inode = mapping->host;
1591         lsm = ll_i2info(inode)->lli_smd;
1592
1593         lov_stripe_lock(lsm);
1594         inode_init_lvb(inode, &lvb);
1595         obd_merge_lvb(ll_i2dtexp(inode), lsm, &lvb, 1);
1596         kms = lvb.lvb_size;
1597         lov_stripe_unlock(lsm);
1598         if (kms == 0) {
1599                 ll_ra_stats_inc(mapping, RA_STAT_ZERO_LEN);
1600                 RETURN(0);
1601         }
1602
1603         spin_lock(&ras->ras_lock);
1604         bead = ll_ra_read_get_locked(ras);
1605         /* Enlarge the RA window to encompass the full read */
1606         if (bead != NULL && ras->ras_window_start + ras->ras_window_len <
1607             bead->lrr_start + bead->lrr_count) {
1608                 ras->ras_window_len = bead->lrr_start + bead->lrr_count -
1609                                       ras->ras_window_start;
1610         }
1611         /* Reserve a part of the read-ahead window that we'll be issuing */
1612         if (ras->ras_window_len) {
1613                 start = ras->ras_next_readahead;
1614                 end = ras->ras_window_start + ras->ras_window_len - 1;
1615         }
1616         if (end != 0) {
1617                 /* Truncate RA window to end of file */
1618                 end = min(end, (unsigned long)((kms - 1) >> CFS_PAGE_SHIFT));
1619                 ras->ras_next_readahead = max(end, end + 1);
1620                 RAS_CDEBUG(ras);
1621         }
1622         ria.ria_start = start;
1623         ria.ria_end = end;
1624         /* If stride I/O mode is detected, get stride window*/
1625         if (stride_io_mode(ras)) {
1626                 ria.ria_stoff = ras->ras_stride_offset;
1627                 ria.ria_length = ras->ras_stride_length;
1628                 ria.ria_pages = ras->ras_stride_pages;
1629         }
1630         spin_unlock(&ras->ras_lock);
1631
1632         if (end == 0) {
1633                 ll_ra_stats_inc(mapping, RA_STAT_ZERO_WINDOW);
1634                 RETURN(0);
1635         }
1636         len = ria_page_count(&ria);
1637         if (len == 0)
1638                 RETURN(0);
1639
1640         reserved = ll_ra_count_get(ll_i2sbi(inode), len);
1641
1642         if (reserved < len)
1643                 ll_ra_stats_inc(mapping, RA_STAT_MAX_IN_FLIGHT);
1644
1645         CDEBUG(D_READA, "reserved pages %lu\n", reserved);
1646
1647         ret = ll_read_ahead_pages(exp, oig, &ria, &reserved, mapping, &ra_end);
1648
1649         LASSERTF(reserved >= 0, "reserved %lu\n", reserved);
1650         if (reserved != 0)
1651                 ll_ra_count_put(ll_i2sbi(inode), reserved);
1652
1653         if (ra_end == end + 1 && ra_end == (kms >> CFS_PAGE_SHIFT))
1654                 ll_ra_stats_inc(mapping, RA_STAT_EOF);
1655
1656         /* If we didn't get to the end of the region we reserved from
1657          * the ras, we need to go back and update the ras so that the
1658          * next read-ahead tries from where we left off.  We only do so
1659          * if the region we failed to issue read-ahead on is still ahead
1660          * of the app and behind the next index to start read-ahead from */
1661         CDEBUG(D_READA, "ra_end %lu end %lu stride end %lu \n",
1662                ra_end, end, ria.ria_end);
1663
1664         if (ra_end != (end + 1)) {
1665                 spin_lock(&ras->ras_lock);
1666                 if (ra_end < ras->ras_next_readahead &&
1667                     index_in_window(ra_end, ras->ras_window_start, 0,
1668                                     ras->ras_window_len)) {
1669                         ras->ras_next_readahead = ra_end;
1670                         RAS_CDEBUG(ras);
1671                 }
1672                 spin_unlock(&ras->ras_lock);
1673         }
1674
1675         RETURN(ret);
1676 }
1677
1678 static void ras_set_start(struct ll_readahead_state *ras, unsigned long index)
1679 {
1680         ras->ras_window_start = index & (~(RAS_INCREASE_STEP - 1));
1681 }
1682
1683 /* called with the ras_lock held or from places where it doesn't matter */
1684 static void ras_reset(struct ll_readahead_state *ras, unsigned long index)
1685 {
1686         ras->ras_last_readpage = index;
1687         ras->ras_consecutive_requests = 0;
1688         ras->ras_consecutive_pages = 0;
1689         ras->ras_window_len = 0;
1690         ras_set_start(ras, index);
1691         ras->ras_next_readahead = max(ras->ras_window_start, index);
1692
1693         RAS_CDEBUG(ras);
1694 }
1695
1696 /* called with the ras_lock held or from places where it doesn't matter */
1697 static void ras_stride_reset(struct ll_readahead_state *ras)
1698 {
1699         ras->ras_consecutive_stride_requests = 0;
1700         RAS_CDEBUG(ras);
1701 }
1702
1703 void ll_readahead_init(struct inode *inode, struct ll_readahead_state *ras)
1704 {
1705         spin_lock_init(&ras->ras_lock);
1706         ras_reset(ras, 0);
1707         ras->ras_requests = 0;
1708         INIT_LIST_HEAD(&ras->ras_read_beads);
1709 }
1710
1711 /* Check whether the read request is in the stride window.
1712  * Return 1 if it is in the stride window, otherwise return 0,
1713  * and also update ras_stride_pages and ras_stride_length.
1714  */
1715 static int index_in_stride_window(unsigned long index,
1716                                   struct ll_readahead_state *ras,
1717                                   struct inode *inode)
1718 {
1719         int stride_gap = index - ras->ras_last_readpage - 1;
1720
1721         LASSERT(stride_gap != 0);
1722
1723         if (ras->ras_consecutive_pages == 0)
1724                 return 0;
1725
1726         /* Otherwise check the stride pattern itself */
1727         if ((ras->ras_stride_length - ras->ras_stride_pages) == stride_gap &&
1728             ras->ras_consecutive_pages == ras->ras_stride_pages)
1729                 return 1;
1730
1731         if (stride_gap >= 0) {
1732                 /*
1733                  * Only set stride_pages and stride_length for
1734                  * forward reading (stride_gap > 0).
1735                  */
1736                 ras->ras_stride_pages = ras->ras_consecutive_pages;
1737                 ras->ras_stride_length = stride_gap + ras->ras_consecutive_pages;
1738         } else {
1739                 /*
1740                  * If stride_gap < 0 (backward reading),
1741                  * reset stride_pages/stride_length.
1742                  * FIXME: backward stride I/O reads are not
1743                  * handled yet.
1744                  */
1745                 ras->ras_stride_pages = 0;
1746                 ras->ras_stride_length = 0;
1747         }
1748         RAS_CDEBUG(ras);
1749
1750         return 0;
1751 }
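/* Example of stride detection (illustrative): an application reads pages
 * 0-3, jumps to 16-19, then jumps to 32.  The first jump records
 * ras_stride_pages = 4 and ras_stride_length = 16 and returns 0; on the
 * second jump stride_gap = 32 - 19 - 1 = 12 equals
 * ras_stride_length - ras_stride_pages and ras_consecutive_pages equals
 * ras_stride_pages, so 1 is returned and the stride pattern is confirmed. */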
1752
1753 static unsigned long
1754 stride_page_count(struct ll_readahead_state *ras, unsigned long len)
1755 {
1756         return stride_pg_count(ras->ras_stride_offset, ras->ras_stride_length,
1757                                ras->ras_stride_pages, ras->ras_stride_offset,
1758                                len);
1759 }
1760
1761 /* The stride read-ahead window will be increased by inc_len according to
1762  * the stride I/O pattern */
1763 static void ras_stride_increase_window(struct ll_readahead_state *ras,
1764                                        struct ll_ra_info *ra,
1765                                        unsigned long inc_len)
1766 {
1767         unsigned long left, step, window_len;
1768         unsigned long stride_len;
1769
1770         LASSERT(ras->ras_stride_length > 0);
1771
1772         stride_len = ras->ras_window_start + ras->ras_window_len -
1773                      ras->ras_stride_offset;
1774
1775         LASSERTF(stride_len >= 0, "window_start %lu, window_len %lu"
1776                  " stride_offset %lu\n", ras->ras_window_start,
1777                  ras->ras_window_len, ras->ras_stride_offset);
1778
1779         left = stride_len % ras->ras_stride_length;
1780
1781         window_len = ras->ras_window_len - left;
1782
1783         if (left < ras->ras_stride_pages)
1784                 left += inc_len;
1785         else
1786                 left = ras->ras_stride_pages + inc_len;
1787
1788         LASSERT(ras->ras_stride_pages != 0);
1789
1790         step = left / ras->ras_stride_pages;
1791         left %= ras->ras_stride_pages;
1792
1793         window_len += step * ras->ras_stride_length + left;
1794
1795         if (stride_page_count(ras, window_len) <= ra->ra_max_pages)
1796                 ras->ras_window_len = window_len;
1797
1798         RAS_CDEBUG(ras);
1799 }
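/* Worked example (illustrative values): with ras_window_start = 0,
 * ras_window_len = 16, ras_stride_offset = 0, ras_stride_length = 16 and
 * ras_stride_pages = 4, a call with inc_len = 16 computes left = 0, then
 * left = 16, step = 4, and proposes window_len = 16 + 4 * 16 = 80; the 16
 * extra pages to read are spread over four more stride periods.  The new
 * length is only applied if stride_page_count() stays within
 * ra->ra_max_pages. */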
1800
1801 /* Set stride I/O read-ahead window start offset */
1802 static void ras_set_stride_offset(struct ll_readahead_state *ras)
1803 {
1804         unsigned long window_len = ras->ras_next_readahead -
1805                                    ras->ras_window_start;
1806         unsigned long left;
1807
1808         LASSERT(ras->ras_stride_length != 0);
1809
1810         left = window_len % ras->ras_stride_length;
1811
1812         ras->ras_stride_offset = ras->ras_next_readahead - left;
1813
1814         RAS_CDEBUG(ras);
1815 }
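/* Example (illustrative): with ras_window_start = 0,
 * ras_next_readahead = 20 and ras_stride_length = 16, window_len = 20 and
 * left = 4, so ras_stride_offset = 20 - 4 = 16, i.e. the offset is rounded
 * down to a stride boundary measured from ras_window_start. */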
1816
1817 static void ras_update(struct ll_sb_info *sbi, struct inode *inode,
1818                        struct ll_readahead_state *ras, unsigned long index,
1819                        unsigned hit)
1820 {
1821         struct ll_ra_info *ra = &sbi->ll_ra_info;
1822         int zero = 0, stride_zero = 0, stride_detect = 0, ra_miss = 0;
1823         ENTRY;
1824
1825         spin_lock(&ras->ras_lock);
1826
1827         ll_ra_stats_inc_sbi(sbi, hit ? RA_STAT_HIT : RA_STAT_MISS);
1828
1829         /* Reset the read-ahead window in two cases.  First, when the app seeks
1830          * or reads to some other part of the file.  Second, if we get a
1831          * read-ahead miss on a page we think we've previously issued.  This can
1832          * be a symptom of there being so many read-ahead pages that the VM is
1833          * reclaiming them before we get to them. */
1834         if (!index_in_window(index, ras->ras_last_readpage, 8, 8)) {
1835                 zero = 1;
1836                 ll_ra_stats_inc_sbi(sbi, RA_STAT_DISTANT_READPAGE);
1837                 /* check whether it is in stride I/O mode */
1838                 if (!index_in_stride_window(index, ras, inode))
1839                         stride_zero = 1;
1840         } else if (!hit && ras->ras_window_len &&
1841                    index < ras->ras_next_readahead &&
1842                    index_in_window(index, ras->ras_window_start, 0,
1843                                    ras->ras_window_len)) {
1844                 zero = 1;
1845                 ra_miss = 1;
1846                 /* On a read-ahead miss while stride I/O has not yet been
1847                  * detected, reset the stride state to re-detect the whole
1848                  * stride I/O mode and avoid complications */
1849                 if (!stride_io_mode(ras))
1850                         stride_zero = 1;
1851                 ll_ra_stats_inc_sbi(sbi, RA_STAT_MISS_IN_WINDOW);
1852         }
1853
1854         /* On the second access to a file smaller than the tunable
1855          * ra_max_read_ahead_whole_pages trigger RA on all pages in the
1856          * file up to ra_max_pages.  This is simply a best effort and
1857          * only occurs once per open file.  Normal RA behavior is reverted
1858          * to for subsequent IO.  The mmap case does not increment
1859          * ras_requests and thus can never trigger this behavior. */
1860         if (ras->ras_requests == 2 && !ras->ras_request_index) {
1861                 __u64 kms_pages;
1862
1863                 kms_pages = (i_size_read(inode) + CFS_PAGE_SIZE - 1) >>
1864                             CFS_PAGE_SHIFT;
1865
1866                 CDEBUG(D_READA, "kmsp "LPU64" mwp %lu mp %lu\n", kms_pages,
1867                        ra->ra_max_read_ahead_whole_pages, ra->ra_max_pages);
1868
1869                 if (kms_pages &&
1870                     kms_pages <= ra->ra_max_read_ahead_whole_pages) {
1871                         ras->ras_window_start = 0;
1872                         ras->ras_last_readpage = 0;
1873                         ras->ras_next_readahead = 0;
1874                         ras->ras_window_len = min(ra->ra_max_pages,
1875                                 ra->ra_max_read_ahead_whole_pages);
1876                         GOTO(out_unlock, 0);
1877                 }
1878         }
1879
1880         if (zero) {
1881                 /* This is a discontinuous read; check
1882                  * whether it matches the stride I/O pattern */
1883                 if (stride_zero) {
1884                         ras_reset(ras, index);
1885                         ras->ras_consecutive_pages++;
1886                         ras_stride_reset(ras);
1887                         RAS_CDEBUG(ras);
1888                         GOTO(out_unlock, 0);
1889                 } else {
1890                         /* The read is still in the stride window, or
1891                          * it hit a read-ahead miss */
1892
1893                         /* A read-ahead miss probably means VM pressure, i.e.
1894                          * some read-ahead pages were reclaimed.  So the length
1895                          * of the ra-window is not increased, but it is also not
1896                          * reset, to avoid re-detecting the stride I/O mode. */
1897                         ras->ras_consecutive_requests = 0;
1898                         if (!ra_miss) {
1899                                 ras->ras_consecutive_pages = 0;
1900                                 if (++ras->ras_consecutive_stride_requests > 1)
1901                                         stride_detect = 1;
1902                         }
1903                         RAS_CDEBUG(ras);
1904                 }
1905         } else if (ras->ras_consecutive_stride_requests > 1) {
1906                 /* This is a contiguous read while currently in stride I/O
1907                  * mode; check whether the stride step is still valid and,
1908                  * if not, reset the stride ra window */
1909                 if (ras->ras_consecutive_pages + 1 > ras->ras_stride_pages)
1910                         ras_stride_reset(ras);
1911         }
1912
1913         ras->ras_last_readpage = index;
1914         ras->ras_consecutive_pages++;
1915         ras_set_start(ras, index);
1916         ras->ras_next_readahead = max(ras->ras_window_start,
1917                                       ras->ras_next_readahead);
1918         RAS_CDEBUG(ras);
1919
1920         /* Trigger RA in the mmap case where ras_consecutive_requests
1921          * is not incremented and thus can't be used to trigger RA */
1922         if (!ras->ras_window_len && ras->ras_consecutive_pages == 4) {
1923                 ras->ras_window_len = RAS_INCREASE_STEP;
1924                 GOTO(out_unlock, 0);
1925         }
1926
1927         /* Initially set the stride window offset based on next_readahead */
1928         if (ras->ras_consecutive_stride_requests == 2 && stride_detect)
1929                 ras_set_stride_offset(ras);
1930
1931         /* The initial ras_window_len is set to the request size.  To avoid
1932          * uselessly reading and discarding pages for random IO the window is
1933          * only increased once per consecutive request received. */
1934         if ((ras->ras_consecutive_requests > 1 &&
1935             !ras->ras_request_index) || stride_detect) {
1936                 if (stride_io_mode(ras))
1937                         ras_stride_increase_window(ras, ra, RAS_INCREASE_STEP);
1938                 else
1939                         ras->ras_window_len = min(ras->ras_window_len +
1940                                                   RAS_INCREASE_STEP,
1941                                                   ra->ra_max_pages);
1942         }
1943         EXIT;
1944 out_unlock:
1945         RAS_CDEBUG(ras);
1946         ras->ras_request_index++;
1947         spin_unlock(&ras->ras_lock);
1948         return;
1949 }
1950
1951 int ll_writepage(struct page *page)
1952 {
1953         struct inode *inode = page->mapping->host;
1954         struct ll_inode_info *lli = ll_i2info(inode);
1955         struct obd_export *exp;
1956         struct ll_async_page *llap;
1957         int rc = 0;
1958         ENTRY;
1959
1960         LASSERT(PageLocked(page));
1961
1962         exp = ll_i2dtexp(inode);
1963         if (exp == NULL)
1964                 GOTO(out, rc = -EINVAL);
1965
1966         llap = llap_from_page(page, LLAP_ORIGIN_WRITEPAGE);
1967         if (IS_ERR(llap))
1968                 GOTO(out, rc = PTR_ERR(llap));
1969
1970         LASSERT(!llap->llap_nocache);
1971         LASSERT(!PageWriteback(page));
1972         set_page_writeback(page);
1973
1974         page_cache_get(page);
1975         if (llap->llap_write_queued) {
1976                 LL_CDEBUG_PAGE(D_PAGE, page, "marking urgent\n");
1977                 rc = obd_set_async_flags(exp, lli->lli_smd, NULL,
1978                                          llap->llap_cookie,
1979                                          ASYNC_READY | ASYNC_URGENT);
1980         } else {
1981                 rc = queue_or_sync_write(exp, inode, llap, CFS_PAGE_SIZE,
1982                                          ASYNC_READY | ASYNC_URGENT);
1983         }
1984         if (rc) {
1985                 /* re-dirty page on error so it retries write */
1986                 if (PageWriteback(page))
1987                         end_page_writeback(page);
1988
1989                 /* resend the page only if its IO has not started */
1990                 if (!PageError(page))
1991                         ll_redirty_page(page);
1992
1993                 page_cache_release(page);
1994         }
1995 out:
1996         if (rc) {
1997                 if (!lli->lli_async_rc)
1998                         lli->lli_async_rc = rc;
1999                 /* resend page only for not started IO*/
2000                 unlock_page(page);
2001         }
2002         RETURN(rc);
2003 }
2004
2005 /*
2006  * for now we do our readpage the same on both 2.4 and 2.5.  The kernel's
2007  * read-ahead assumes it is valid to issue readpage all the way up to
2008  * i_size, but our dlm locks make that not the case.  We disable the
2009  * kernel's read-ahead and do our own by walking ahead in the page cache
2010  * checking for dlm lock coverage.  the main difference between 2.4 and
2011  * 2.6 is how read-ahead gets batched and issued, but we're using our own,
2012  * so they look the same.
2013  */
2014 int ll_readpage(struct file *filp, struct page *page)
2015 {
2016         struct ll_file_data *fd = LUSTRE_FPRIVATE(filp);
2017         struct inode *inode = page->mapping->host;
2018         struct obd_export *exp;
2019         struct ll_async_page *llap;
2020         struct obd_io_group *oig = NULL;
2021         struct lustre_handle *lockh = NULL;
2022         int rc;
2023         ENTRY;
2024
2025         LASSERT(PageLocked(page));
2026         LASSERT(!PageUptodate(page));
2027         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),offset=%Lu=%#Lx\n",
2028                inode->i_ino, inode->i_generation, inode,
2029                (((loff_t)page->index) << CFS_PAGE_SHIFT),
2030                (((loff_t)page->index) << CFS_PAGE_SHIFT));
2031         LASSERT(atomic_read(&filp->f_dentry->d_inode->i_count) > 0);
2032
2033         if (!ll_i2info(inode)->lli_smd) {
2034                 /* File with no objects - one big hole */
2035                 /* We use this only because remove_from_page_cache is not
2036                  * exported; afterwards we make the page uptodate again. */
2037                 ll_truncate_complete_page(page);
2038                 clear_page(kmap(page));
2039                 kunmap(page);
2040                 SetPageUptodate(page);
2041                 unlock_page(page);
2042                 RETURN(0);
2043         }
2044
2045         rc = oig_init(&oig);
2046         if (rc < 0)
2047                 GOTO(out, rc);
2048
2049         exp = ll_i2dtexp(inode);
2050         if (exp == NULL)
2051                 GOTO(out, rc = -EINVAL);
2052
2053         if (fd->fd_flags & LL_FILE_GROUP_LOCKED)
2054                 lockh = &fd->fd_cwlockh;
2055
2056         llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_READPAGE, lockh);
2057         if (IS_ERR(llap)) {
2058                 if (PTR_ERR(llap) == -ENOLCK) {
2059                         CWARN("ino %lu page %lu (%llu) not covered by "
2060                               "a lock (mmap?).  check debug logs.\n",
2061                               inode->i_ino, page->index,
2062                               (long long)page->index << PAGE_CACHE_SHIFT);
2063                 }
2064                 GOTO(out, rc = PTR_ERR(llap));
2065         }
2066
2067         if (ll_i2sbi(inode)->ll_ra_info.ra_max_pages)
2068                 ras_update(ll_i2sbi(inode), inode, &fd->fd_ras, page->index,
2069                            llap->llap_defer_uptodate);
2070
2071
2072         if (llap->llap_defer_uptodate) {
2073                 /* This is the callpath if we got the page from a readahead */
2074                 llap->llap_ra_used = 1;
2075                 rc = ll_readahead(&fd->fd_ras, exp, page->mapping, oig,
2076                                   fd->fd_flags);
2077                 if (rc > 0)
2078                         obd_trigger_group_io(exp, ll_i2info(inode)->lli_smd,
2079                                              NULL, oig);
2080                 LL_CDEBUG_PAGE(D_PAGE, page, "marking uptodate from defer\n");
2081                 SetPageUptodate(page);
2082                 unlock_page(page);
2083                 GOTO(out_oig, rc = 0);
2084         }
2085
2086         rc = ll_issue_page_read(exp, llap, oig, 0);
2087         if (rc)
2088                 GOTO(out, rc);
2089
2090         LL_CDEBUG_PAGE(D_PAGE, page, "queued readpage\n");
2091         /* We have just requested the actual page we want, see if we can tack
2092          * on some readahead to that page's RPC before it is sent. */
2093         if (ll_i2sbi(inode)->ll_ra_info.ra_max_pages)
2094                 ll_readahead(&fd->fd_ras, exp, page->mapping, oig,
2095                              fd->fd_flags);
2096
2097         rc = obd_trigger_group_io(exp, ll_i2info(inode)->lli_smd, NULL, oig);
2098
2099 out:
2100         if (rc)
2101                 unlock_page(page);
2102 out_oig:
2103         if (oig != NULL)
2104                 oig_release(oig);
2105         RETURN(rc);
2106 }
2107
2108 static void ll_file_put_pages(struct page **pages, int numpages)
2109 {
2110         int i;
2111         struct page **pp;
2112         ENTRY;
2113
2114         for (i = 0, pp = pages; i < numpages; i++, pp++) {
2115                 if (*pp) {
2116                         LL_CDEBUG_PAGE(D_PAGE, (*pp), "free\n");
2117                         __ll_put_llap(*pp);
2118                         if (page_private(*pp))
2119                                 CERROR("the llap wasn't freed\n");
2120                         (*pp)->mapping = NULL;
2121                         if (page_count(*pp) != 1)
2122                                 CERROR("page %p, flags %#lx, count %i, private %p\n",
2123                                        (*pp), (unsigned long)(*pp)->flags, page_count(*pp),
2124                                        (void*)page_private(*pp));
2125                         __free_pages(*pp, 0);
2126                 }
2127         }
2128         OBD_FREE(pages, numpages * sizeof(struct page*));
2129         EXIT;
2130 }
2131
2132 static struct page **ll_file_prepare_pages(int numpages, struct inode *inode,
2133                                            unsigned long first)
2134 {
2135         struct page **pages;
2136         int i;
2137         int rc = 0;
2138         ENTRY;
2139
2140         OBD_ALLOC(pages, sizeof(struct page *) * numpages);
2141         if (pages == NULL)
2142                 RETURN(ERR_PTR(-ENOMEM));
2143         for (i = 0; i < numpages; i++) {
2144                 struct page *page;
2145                 struct ll_async_page *llap;
2146
2147                 page = alloc_pages(GFP_HIGHUSER, 0);
2148                 if (page == NULL)
2149                         GOTO(err, rc = -ENOMEM);
2150                 pages[i] = page;
2151                 /* llap_from_page needs page index and mapping to be set */
2152                 page->index = first++;
2153                 page->mapping = inode->i_mapping;
2154                 llap = llap_from_page(page, LLAP_ORIGIN_LOCKLESS_IO);
2155                 if (IS_ERR(llap))
2156                         GOTO(err, rc = PTR_ERR(llap));
2157                 llap->llap_lockless_io_page = 1;
2158         }
2159         RETURN(pages);
2160 err:
2161         ll_file_put_pages(pages, numpages);
2162         RETURN(ERR_PTR(rc));
2163 }
2164
2165 static ssize_t ll_file_copy_pages(struct page **pages, int numpages,
2166                                   char *buf, loff_t pos, size_t count, int rw)
2167 {
2168         ssize_t amount = 0;
2169         int i;
2170         int updatechecksum = ll_i2sbi(pages[0]->mapping->host)->ll_flags &
2171                              LL_SBI_CHECKSUM;
2172         ENTRY;
2173
2174         for (i = 0; i < numpages; i++) {
2175                 unsigned offset, bytes, left;
2176                 char *vaddr;
2177
2178                 vaddr = kmap(pages[i]);
2179                 offset = pos & (CFS_PAGE_SIZE - 1);
2180                 bytes = min_t(unsigned, CFS_PAGE_SIZE - offset, count);
2181                 LL_CDEBUG_PAGE(D_PAGE, pages[i], "op = %s, addr = %p, "
2182                                "buf = %p, bytes = %u\n",
2183                                (rw == WRITE) ? "CFU" : "CTU",
2184                                vaddr + offset, buf, bytes);
2185                 if (rw == WRITE) {
2186                         left = copy_from_user(vaddr + offset, buf, bytes);
2187                         if (updatechecksum) {
2188                                 struct ll_async_page *llap;
2189
2190                                 llap = llap_cast_private(pages[i]);
2191                                 llap->llap_checksum = crc32_le(0, vaddr,
2192                                                                CFS_PAGE_SIZE);
2193                         }
2194                 } else {
2195                         left = copy_to_user(buf, vaddr + offset, bytes);
2196                 }
2197                 kunmap(pages[i]);
2198                 amount += bytes;
2199                 if (left) {
2200                         amount -= left;
2201                         break;
2202                 }
2203                 buf += bytes;
2204                 count -= bytes;
2205                 pos += bytes;
2206         }
2207         if (amount == 0)
2208                 RETURN(-EFAULT);
2209         RETURN(amount);
2210 }
2211
2212 static int ll_file_oig_pages(struct inode * inode, struct page **pages,
2213                              int numpages, loff_t pos, size_t count, int rw)
2214 {
2215         struct obd_io_group *oig;
2216         struct ll_inode_info *lli = ll_i2info(inode);
2217         struct obd_export *exp;
2218         loff_t org_pos = pos;
2219         obd_flag brw_flags;
2220         int rc;
2221         int i;
2222         ENTRY;
2223
2224         exp = ll_i2dtexp(inode);
2225         if (exp == NULL)
2226                 RETURN(-EINVAL);
2227         rc = oig_init(&oig);
2228         if (rc)
2229                 RETURN(rc);
2230         brw_flags = OBD_BRW_SRVLOCK;
2231         if (cfs_capable(CFS_CAP_SYS_RESOURCE))
2232                 brw_flags |= OBD_BRW_NOQUOTA;
2233
2234         for (i = 0; i < numpages; i++) {
2235                 struct ll_async_page *llap;
2236                 unsigned from, bytes;
2237
2238                 from = pos & (CFS_PAGE_SIZE - 1);
2239                 bytes = min_t(unsigned, CFS_PAGE_SIZE - from,
2240                               count - pos + org_pos);
2241                 llap = llap_cast_private(pages[i]);
2242                 LASSERT(llap);
2243
2244                 lock_page(pages[i]);
2245
2246                 LL_CDEBUG_PAGE(D_PAGE, pages[i], "offset "LPU64","
2247                                " from %u, bytes = %u\n",
2248                                (__u64)pos, from, bytes);
2249                 LASSERTF(pos >> CFS_PAGE_SHIFT == pages[i]->index,
2250                          "wrong page index %lu (%lu)\n",
2251                          pages[i]->index,
2252                          (unsigned long)(pos >> CFS_PAGE_SHIFT));
2253                 rc = obd_queue_group_io(exp, lli->lli_smd, NULL, oig,
2254                                         llap->llap_cookie,
2255                                         (rw == WRITE) ?
2256                                         OBD_BRW_WRITE:OBD_BRW_READ,
2257                                         from, bytes, brw_flags,
2258                                         ASYNC_READY | ASYNC_URGENT |
2259                                         ASYNC_COUNT_STABLE | ASYNC_GROUP_SYNC);
2260                 if (rc) {
2261                         i++;
2262                         GOTO(out, rc);
2263                 }
2264                 pos += bytes;
2265         }
2266         rc = obd_trigger_group_io(exp, lli->lli_smd, NULL, oig);
2267         if (rc)
2268                 GOTO(out, rc);
2269         rc = oig_wait(oig);
2270 out:
2271         while (--i >= 0)
2272                 unlock_page(pages[i]);
2273         oig_release(oig);
2274         RETURN(rc);
2275 }
2276
2277 ssize_t ll_file_lockless_io(struct file *file, char *buf, size_t count,
2278                                    loff_t *ppos, int rw)
2279 {
2280         loff_t pos;
2281         struct inode *inode = file->f_dentry->d_inode;
2282         ssize_t rc = 0;
2283         int max_pages;
2284         size_t amount = 0;
2285         unsigned long first, last;
2286         ENTRY;
2287
2288         if (rw == READ) {
2289                 loff_t isize;
2290
2291                 ll_inode_size_lock(inode, 0);
2292                 isize = i_size_read(inode);
2293                 ll_inode_size_unlock(inode, 0);
2294                 if (*ppos >= isize)
2295                         GOTO(out, rc = 0);
2296                 if (*ppos + count >= isize)
2297                         count -= *ppos + count - isize;
2298                 if (count == 0)
2299                         GOTO(out, rc);
2300         } else {
2301                 rc = generic_write_checks(file, ppos, &count, 0);
2302                 if (rc)
2303                         GOTO(out, rc);
2304                 rc = ll_remove_suid(file->f_dentry, file->f_vfsmnt);
2305                 if (rc)
2306                         GOTO(out, rc);
2307         }
2308         pos = *ppos;
2309         first = pos >> CFS_PAGE_SHIFT;
2310         last = (pos + count - 1) >> CFS_PAGE_SHIFT;
2311         max_pages = PTLRPC_MAX_BRW_PAGES *
2312                 ll_i2info(inode)->lli_smd->lsm_stripe_count;
2313         CDEBUG(D_INFO, "%u, stripe_count = %u\n",
2314                PTLRPC_MAX_BRW_PAGES /* max_pages_per_rpc */,
2315                ll_i2info(inode)->lli_smd->lsm_stripe_count);
2316
2317         while (first <= last && rc >= 0) {
2318                 int pages_for_io;
2319                 struct page **pages;
2320                 size_t bytes = count - amount;
2321
2322                 pages_for_io = min_t(int, last - first + 1, max_pages);
2323                 pages = ll_file_prepare_pages(pages_for_io, inode, first);
2324                 if (IS_ERR(pages)) {
2325                         rc = PTR_ERR(pages);
2326                         break;
2327                 }
2328                 if (rw == WRITE) {
2329                         rc = ll_file_copy_pages(pages, pages_for_io, buf,
2330                                                 pos + amount, bytes, rw);
2331                         if (rc < 0)
2332                                 GOTO(put_pages, rc);
2333                         bytes = rc;
2334                 }
2335                 rc = ll_file_oig_pages(inode, pages, pages_for_io,
2336                                        pos + amount, bytes, rw);
2337                 if (rc)
2338                         GOTO(put_pages, rc);
2339                 if (rw == READ) {
2340                         rc = ll_file_copy_pages(pages, pages_for_io, buf,
2341                                                 pos + amount, bytes, rw);
2342                         if (rc < 0)
2343                                 GOTO(put_pages, rc);
2344                         bytes = rc;
2345                 }
2346                 amount += bytes;
2347                 buf += bytes;
2348 put_pages:
2349                 ll_file_put_pages(pages, pages_for_io);
2350                 first += pages_for_io;
2351                 /* a short read/write check */
2352                 if (pos + amount < ((loff_t)first << CFS_PAGE_SHIFT))
2353                         break;
2354         }
2355         /* NOTE: don't update i_size and KMS in the absence of LDLM locks,
2356          * even if the write makes the file larger */
2357         file_accessed(file);
2358         if (rw == READ && amount < count && rc == 0) {
2359                 unsigned long not_cleared;
2360
2361                 not_cleared = clear_user(buf, count - amount);
2362                 amount = count - not_cleared;
2363                 if (not_cleared)
2364                         rc = -EFAULT;
2365         }
2366         if (amount > 0) {
2367                 lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
2368                                     (rw == WRITE) ?
2369                                     LPROC_LL_LOCKLESS_WRITE :
2370                                     LPROC_LL_LOCKLESS_READ,
2371                                     (long)amount);
2372                 *ppos += amount;
2373                 RETURN(amount);
2374         }
2375 out:
2376         RETURN(rc);
2377 }