Whamcloud - gitweb
LU-8130 ldlm: add a counter to the per-namespace data
[fs/lustre-release.git] / lustre / llite / rw.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/llite/rw.c
33  *
34  * Lustre Lite I/O page cache routines shared by different kernel revs
35  */
36
37 #include <linux/kernel.h>
38 #include <linux/mm.h>
39 #include <linux/string.h>
40 #include <linux/stat.h>
41 #include <linux/errno.h>
42 #include <linux/unistd.h>
43 #include <linux/writeback.h>
44 #include <asm/uaccess.h>
45
46 #include <linux/fs.h>
47 #include <linux/file.h>
48 #include <linux/stat.h>
49 #include <asm/uaccess.h>
50 #include <linux/mm.h>
51 #include <linux/pagemap.h>
52 /* current_is_kswapd() */
53 #include <linux/swap.h>
54 #include <linux/task_io_accounting_ops.h>
55
56 #define DEBUG_SUBSYSTEM S_LLITE
57
58 #include <obd_cksum.h>
59 #include "llite_internal.h"
60 #include <lustre_compat.h>
61
62 static void ll_ra_stats_inc_sbi(struct ll_sb_info *sbi, enum ra_stat which);
63
64 /**
65  * Get readahead pages from the filesystem readahead pool of the client for a
66  * thread.
67  *
68  * /param sbi superblock for filesystem readahead state ll_ra_info
69  * /param ria per-thread readahead state
70  * /param pages number of pages requested for readahead for the thread.
71  *
72  * WARNING: This algorithm is used to reduce contention on sbi->ll_lock.
73  * It should work well if the ra_max_pages is much greater than the single
74  * file's read-ahead window, and not too many threads contending for
75  * these readahead pages.
76  *
77  * TODO: There may be a 'global sync problem' if many threads are trying
78  * to get an ra budget that is larger than the remaining readahead pages
79  * and reach here at exactly the same time. They will compute /a ret to
80  * consume the remaining pages, but will fail at atomic_add_return() and
81  * get a zero ra window, although there is still ra space remaining. - Jay */
82
83 static unsigned long ll_ra_count_get(struct ll_sb_info *sbi,
84                                      struct ra_io_arg *ria,
85                                      unsigned long pages, unsigned long min)
86 {
87         struct ll_ra_info *ra = &sbi->ll_ra_info;
88         long ret;
89         ENTRY;
90
91         /* If read-ahead pages left are less than 1M, do not do read-ahead,
92          * otherwise it will form small read RPC(< 1M), which hurt server
93          * performance a lot. */
94         ret = min(ra->ra_max_pages - atomic_read(&ra->ra_cur_pages),
95                   pages);
96         if (ret < 0 || ret < min_t(long, PTLRPC_MAX_BRW_PAGES, pages))
97                 GOTO(out, ret = 0);
98
99         if (atomic_add_return(ret, &ra->ra_cur_pages) > ra->ra_max_pages) {
100                 atomic_sub(ret, &ra->ra_cur_pages);
101                 ret = 0;
102         }
103
104 out:
105         if (ret < min) {
106                 /* override ra limit for maximum performance */
107                 atomic_add(min - ret, &ra->ra_cur_pages);
108                 ret = min;
109         }
110         RETURN(ret);
111 }
112
113 void ll_ra_count_put(struct ll_sb_info *sbi, unsigned long len)
114 {
115         struct ll_ra_info *ra = &sbi->ll_ra_info;
116         atomic_sub(len, &ra->ra_cur_pages);
117 }
118
119 static void ll_ra_stats_inc_sbi(struct ll_sb_info *sbi, enum ra_stat which)
120 {
121         LASSERTF(which < _NR_RA_STAT, "which: %u\n", which);
122         lprocfs_counter_incr(sbi->ll_ra_stats, which);
123 }
124
125 void ll_ra_stats_inc(struct inode *inode, enum ra_stat which)
126 {
127         struct ll_sb_info *sbi = ll_i2sbi(inode);
128         ll_ra_stats_inc_sbi(sbi, which);
129 }
130
131 #define RAS_CDEBUG(ras) \
132         CDEBUG(D_READA,                                                      \
133                "lre %lu cr %lu cb %lu ws %lu wl %lu nra %lu rpc %lu "        \
134                "r %lu csr %lu sf %lu sb %lu sl %lu lr %lu\n",                \
135                ras->ras_last_read_end, ras->ras_consecutive_requests,        \
136                ras->ras_consecutive_bytes, ras->ras_window_start,            \
137                ras->ras_window_len, ras->ras_next_readahead,                 \
138                ras->ras_rpc_size, ras->ras_requests,                         \
139                ras->ras_consecutive_stride_requests, ras->ras_stride_offset, \
140                ras->ras_stride_bytes, ras->ras_stride_length,                \
141                ras->ras_async_last_readpage)
142
143 static int pos_in_window(unsigned long pos, unsigned long point,
144                          unsigned long before, unsigned long after)
145 {
146         unsigned long start = point - before, end = point + after;
147
148         if (start > point)
149                start = 0;
150         if (end < point)
151                end = ~0;
152
153         return start <= pos && pos <= end;
154 }
155
156 /**
157  * Initiates read-ahead of a page with given index.
158  *
159  * \retval +ve: page was already uptodate so it will be skipped
160  *              from being added;
161  * \retval -ve: page wasn't added to \a queue for error;
162  * \retval   0: page was added into \a queue for read ahead.
163  */
164 static int ll_read_ahead_page(const struct lu_env *env, struct cl_io *io,
165                               struct cl_page_list *queue, pgoff_t index)
166 {
167         struct cl_object *clob  = io->ci_obj;
168         struct inode     *inode = vvp_object_inode(clob);
169         struct page      *vmpage;
170         struct cl_page   *page;
171         struct vvp_page  *vpg;
172         enum ra_stat      which = _NR_RA_STAT; /* keep gcc happy */
173         int               rc    = 0;
174         const char       *msg   = NULL;
175         ENTRY;
176
177         vmpage = grab_cache_page_nowait(inode->i_mapping, index);
178         if (vmpage == NULL) {
179                 which = RA_STAT_FAILED_GRAB_PAGE;
180                 msg   = "g_c_p_n failed";
181                 GOTO(out, rc = -EBUSY);
182         }
183
184         /* Check if vmpage was truncated or reclaimed */
185         if (vmpage->mapping != inode->i_mapping) {
186                 which = RA_STAT_WRONG_GRAB_PAGE;
187                 msg   = "g_c_p_n returned invalid page";
188                 GOTO(out, rc = -EBUSY);
189         }
190
191         page = cl_page_find(env, clob, vmpage->index, vmpage, CPT_CACHEABLE);
192         if (IS_ERR(page)) {
193                 which = RA_STAT_FAILED_GRAB_PAGE;
194                 msg   = "cl_page_find failed";
195                 GOTO(out, rc = PTR_ERR(page));
196         }
197
198         lu_ref_add(&page->cp_reference, "ra", current);
199         cl_page_assume(env, io, page);
200         vpg = cl2vvp_page(cl_object_page_slice(clob, page));
201         if (!vpg->vpg_defer_uptodate && !PageUptodate(vmpage)) {
202                 vpg->vpg_defer_uptodate = 1;
203                 vpg->vpg_ra_used = 0;
204                 cl_page_list_add(queue, page);
205         } else {
206                 /* skip completed pages */
207                 cl_page_unassume(env, io, page);
208                 /* This page is already uptodate, returning a positive number
209                  * to tell the callers about this */
210                 rc = 1;
211         }
212
213         lu_ref_del(&page->cp_reference, "ra", current);
214         cl_page_put(env, page);
215
216 out:
217         if (vmpage != NULL) {
218                 if (rc != 0)
219                         unlock_page(vmpage);
220                 put_page(vmpage);
221         }
222         if (msg != NULL) {
223                 ll_ra_stats_inc(inode, which);
224                 CDEBUG(D_READA, "%s\n", msg);
225
226         }
227
228         RETURN(rc);
229 }
230
231 #define RIA_DEBUG(ria)                                          \
232         CDEBUG(D_READA, "rs %lu re %lu ro %lu rl %lu rb %lu\n", \
233                ria->ria_start, ria->ria_end, ria->ria_stoff,    \
234                ria->ria_length, ria->ria_bytes)
235
236 static inline int stride_io_mode(struct ll_readahead_state *ras)
237 {
238         return ras->ras_consecutive_stride_requests > 1;
239 }
240
241 /* The function calculates how much pages will be read in
242  * [off, off + length], in such stride IO area,
243  * stride_offset = st_off, stride_lengh = st_len,
244  * stride_bytes = st_bytes
245  *
246  *   |------------------|*****|------------------|*****|------------|*****|....
247  * st_off
248  *   |--- st_bytes     ---|
249  *   |-----     st_len   -----|
250  *
251  *              How many bytes it should read in such pattern
252  *              |-------------------------------------------------------------|
253  *              off
254  *              |<------                  length                      ------->|
255  *
256  *          =   |<----->|  +  |-------------------------------------| +   |---|
257  *             start_left                 st_bytes * i                 end_left
258  */
259 static unsigned long
260 stride_byte_count(unsigned long st_off, unsigned long st_len,
261                   unsigned long st_bytes, unsigned long off,
262                   unsigned long length)
263 {
264         __u64 start = off > st_off ? off - st_off : 0;
265         __u64 end = off + length > st_off ? off + length - st_off : 0;
266         unsigned long start_left = 0;
267         unsigned long end_left = 0;
268         unsigned long bytes_count;
269
270         if (st_len == 0 || length == 0 || end == 0)
271                 return length;
272
273         start_left = do_div(start, st_len);
274         if (start_left < st_bytes)
275                 start_left = st_bytes - start_left;
276         else
277                 start_left = 0;
278
279         end_left = do_div(end, st_len);
280         if (end_left > st_bytes)
281                 end_left = st_bytes;
282
283         CDEBUG(D_READA, "start %llu, end %llu start_left %lu end_left %lu\n",
284                start, end, start_left, end_left);
285
286         if (start == end)
287                 bytes_count = end_left - (st_bytes - start_left);
288         else
289                 bytes_count = start_left +
290                         st_bytes * (end - start - 1) + end_left;
291
292         CDEBUG(D_READA,
293                "st_off %lu, st_len %lu st_bytes %lu off %lu length %lu bytescount %lu\n",
294                st_off, st_len, st_bytes, off, length, bytes_count);
295
296         return bytes_count;
297 }
298
299 static int ria_page_count(struct ra_io_arg *ria)
300 {
301         u64 length_bytes = ria->ria_end >= ria->ria_start ?
302                 (ria->ria_end - ria->ria_start + 1) << PAGE_SHIFT : 0;
303         unsigned int bytes_count, pg_count;
304
305         if (ria->ria_length > ria->ria_bytes && ria->ria_bytes &&
306             (ria->ria_length % PAGE_SIZE || ria->ria_bytes % PAGE_SIZE ||
307              ria->ria_stoff % PAGE_SIZE)) {
308                 /* Over-estimate un-aligned page stride read */
309                 pg_count = ((ria->ria_bytes + PAGE_SIZE - 1) >>
310                             PAGE_SHIFT) + 1;
311                 pg_count *= length_bytes / ria->ria_length + 1;
312
313                 return pg_count;
314         }
315         bytes_count = stride_byte_count(ria->ria_stoff, ria->ria_length,
316                                          ria->ria_bytes, ria->ria_start,
317                                          length_bytes);
318         return (bytes_count + PAGE_SIZE - 1) >> PAGE_SHIFT;
319 }
320
321 static unsigned long ras_align(struct ll_readahead_state *ras,
322                                pgoff_t index, unsigned long *remainder)
323 {
324         unsigned long rem = index % ras->ras_rpc_size;
325         if (remainder != NULL)
326                 *remainder = rem;
327         return index - rem;
328 }
329
330 /*Check whether the index is in the defined ra-window */
331 static bool ras_inside_ra_window(unsigned long idx, struct ra_io_arg *ria)
332 {
333         unsigned long offset;
334         unsigned long pos = idx << PAGE_SHIFT;
335
336         /* If ria_length == ria_bytes, it means non-stride I/O mode,
337          * idx should always inside read-ahead window in this case
338          * For stride I/O mode, just check whether the idx is inside
339          * the ria_bytes.
340          */
341         if (ria->ria_length == 0 || ria->ria_length == ria->ria_bytes)
342                 return true;
343
344         if (pos >= ria->ria_stoff) {
345                 offset = (pos - ria->ria_stoff) % ria->ria_length;
346                 if (offset < ria->ria_bytes ||
347                     (ria->ria_length - offset) < PAGE_SIZE)
348                         return true;
349         } else if (pos + PAGE_SIZE > ria->ria_stoff)
350                 return true;
351
352         return false;
353 }
354
355 static unsigned long
356 ll_read_ahead_pages(const struct lu_env *env, struct cl_io *io,
357                     struct cl_page_list *queue, struct ll_readahead_state *ras,
358                     struct ra_io_arg *ria, pgoff_t *ra_end)
359 {
360         struct cl_read_ahead ra = { 0 };
361         int rc = 0, count = 0;
362         pgoff_t page_idx;
363
364         LASSERT(ria != NULL);
365         RIA_DEBUG(ria);
366
367         for (page_idx = ria->ria_start;
368              page_idx <= ria->ria_end && ria->ria_reserved > 0; page_idx++) {
369                 if (ras_inside_ra_window(page_idx, ria)) {
370                         if (ra.cra_end == 0 || ra.cra_end < page_idx) {
371                                 unsigned long end;
372
373                                 cl_read_ahead_release(env, &ra);
374
375                                 rc = cl_io_read_ahead(env, io, page_idx, &ra);
376                                 if (rc < 0)
377                                         break;
378
379                                 /* Do not shrink the ria_end at any case until
380                                  * the minimum end of current read is covered.
381                                  * And only shrink the ria_end if the matched
382                                  * LDLM lock doesn't cover more. */
383                                 if (page_idx > ra.cra_end ||
384                                     (ra.cra_contention &&
385                                      page_idx > ria->ria_end_min)) {
386                                         ria->ria_end = ra.cra_end;
387                                         break;
388                                 }
389
390                                 CDEBUG(D_READA, "idx: %lu, ra: %lu, rpc: %lu\n",
391                                        page_idx, ra.cra_end, ra.cra_rpc_size);
392                                 LASSERTF(ra.cra_end >= page_idx,
393                                          "object: %p, indcies %lu / %lu\n",
394                                          io->ci_obj, ra.cra_end, page_idx);
395                                 /* update read ahead RPC size.
396                                  * NB: it's racy but doesn't matter */
397                                 if (ras->ras_rpc_size != ra.cra_rpc_size &&
398                                     ra.cra_rpc_size > 0)
399                                         ras->ras_rpc_size = ra.cra_rpc_size;
400                                 /* trim it to align with optimal RPC size */
401                                 end = ras_align(ras, ria->ria_end + 1, NULL);
402                                 if (end > 0 && !ria->ria_eof)
403                                         ria->ria_end = end - 1;
404                                 if (ria->ria_end < ria->ria_end_min)
405                                         ria->ria_end = ria->ria_end_min;
406                         }
407                         if (page_idx > ria->ria_end)
408                                 break;
409
410                         /* If the page is inside the read-ahead window */
411                         rc = ll_read_ahead_page(env, io, queue, page_idx);
412                         if (rc < 0)
413                                 break;
414
415                         *ra_end = page_idx;
416                         /* Only subtract from reserve & count the page if we
417                          * really did readahead on that page. */
418                         if (rc == 0) {
419                                 ria->ria_reserved--;
420                                 count++;
421                         }
422                 } else if (stride_io_mode(ras)) {
423                         /* If it is not in the read-ahead window, and it is
424                          * read-ahead mode, then check whether it should skip
425                          * the stride gap.
426                          */
427                         unsigned long offset;
428                         unsigned long pos = page_idx << PAGE_SHIFT;
429
430                         offset = (pos - ria->ria_stoff) % ria->ria_length;
431                         if (offset >= ria->ria_bytes) {
432                                 pos += (ria->ria_length - offset);
433                                 if ((pos >> PAGE_SHIFT) >= page_idx + 1)
434                                         page_idx = (pos >> PAGE_SHIFT) - 1;
435                                 CDEBUG(D_READA,
436                                        "Stride: jump %lu pages to %lu\n",
437                                        ria->ria_length - offset, page_idx);
438                                 continue;
439                         }
440                 }
441         }
442
443         cl_read_ahead_release(env, &ra);
444
445         return count;
446 }
447
448 static void ll_readahead_work_free(struct ll_readahead_work *work)
449 {
450         fput(work->lrw_file);
451         OBD_FREE_PTR(work);
452 }
453
454 static void ll_readahead_handle_work(struct work_struct *wq);
455 static void ll_readahead_work_add(struct inode *inode,
456                                   struct ll_readahead_work *work)
457 {
458         INIT_WORK(&work->lrw_readahead_work, ll_readahead_handle_work);
459         queue_work(ll_i2sbi(inode)->ll_ra_info.ll_readahead_wq,
460                    &work->lrw_readahead_work);
461 }
462
463 static int ll_readahead_file_kms(const struct lu_env *env,
464                                 struct cl_io *io, __u64 *kms)
465 {
466         struct cl_object *clob;
467         struct inode *inode;
468         struct cl_attr *attr = vvp_env_thread_attr(env);
469         int ret;
470
471         clob = io->ci_obj;
472         inode = vvp_object_inode(clob);
473
474         cl_object_attr_lock(clob);
475         ret = cl_object_attr_get(env, clob, attr);
476         cl_object_attr_unlock(clob);
477
478         if (ret != 0)
479                 RETURN(ret);
480
481         *kms = attr->cat_kms;
482         return 0;
483 }
484
485 static void ll_readahead_handle_work(struct work_struct *wq)
486 {
487         struct ll_readahead_work *work;
488         struct lu_env *env;
489         __u16 refcheck;
490         struct ra_io_arg *ria;
491         struct inode *inode;
492         struct ll_file_data *fd;
493         struct ll_readahead_state *ras;
494         struct cl_io *io;
495         struct cl_2queue *queue;
496         pgoff_t ra_end = 0;
497         unsigned long len, mlen = 0;
498         struct file *file;
499         __u64 kms;
500         int rc;
501         unsigned long end_index;
502
503         work = container_of(wq, struct ll_readahead_work,
504                             lrw_readahead_work);
505         fd = LUSTRE_FPRIVATE(work->lrw_file);
506         ras = &fd->fd_ras;
507         file = work->lrw_file;
508         inode = file_inode(file);
509
510         env = cl_env_alloc(&refcheck, LCT_NOREF);
511         if (IS_ERR(env))
512                 GOTO(out_free_work, rc = PTR_ERR(env));
513
514         io = vvp_env_thread_io(env);
515         ll_io_init(io, file, CIT_READ, NULL);
516
517         rc = ll_readahead_file_kms(env, io, &kms);
518         if (rc != 0)
519                 GOTO(out_put_env, rc);
520
521         if (kms == 0) {
522                 ll_ra_stats_inc(inode, RA_STAT_ZERO_LEN);
523                 GOTO(out_put_env, rc = 0);
524         }
525
526         ria = &ll_env_info(env)->lti_ria;
527         memset(ria, 0, sizeof(*ria));
528
529         ria->ria_start = work->lrw_start;
530         /* Truncate RA window to end of file */
531         end_index = (unsigned long)((kms - 1) >> PAGE_SHIFT);
532         if (end_index <= work->lrw_end) {
533                 work->lrw_end = end_index;
534                 ria->ria_eof = true;
535         }
536         if (work->lrw_end <= work->lrw_start)
537                 GOTO(out_put_env, rc = 0);
538
539         ria->ria_end = work->lrw_end;
540         len = ria->ria_end - ria->ria_start + 1;
541         ria->ria_reserved = ll_ra_count_get(ll_i2sbi(inode), ria,
542                                             ria_page_count(ria), mlen);
543
544         CDEBUG(D_READA,
545                "async reserved pages: %lu/%lu/%lu, ra_cur %d, ra_max %lu\n",
546                ria->ria_reserved, len, mlen,
547                atomic_read(&ll_i2sbi(inode)->ll_ra_info.ra_cur_pages),
548                ll_i2sbi(inode)->ll_ra_info.ra_max_pages);
549
550         if (ria->ria_reserved < len) {
551                 ll_ra_stats_inc(inode, RA_STAT_MAX_IN_FLIGHT);
552                 if (PAGES_TO_MiB(ria->ria_reserved) < 1) {
553                         ll_ra_count_put(ll_i2sbi(inode), ria->ria_reserved);
554                         GOTO(out_put_env, rc = 0);
555                 }
556         }
557
558         rc = cl_io_rw_init(env, io, CIT_READ, ria->ria_start, len);
559         if (rc)
560                 GOTO(out_put_env, rc);
561
562         vvp_env_io(env)->vui_io_subtype = IO_NORMAL;
563         vvp_env_io(env)->vui_fd = fd;
564         io->ci_state = CIS_LOCKED;
565         io->ci_async_readahead = true;
566         rc = cl_io_start(env, io);
567         if (rc)
568                 GOTO(out_io_fini, rc);
569
570         queue = &io->ci_queue;
571         cl_2queue_init(queue);
572
573         rc = ll_read_ahead_pages(env, io, &queue->c2_qin, ras, ria, &ra_end);
574         if (ria->ria_reserved != 0)
575                 ll_ra_count_put(ll_i2sbi(inode), ria->ria_reserved);
576         if (queue->c2_qin.pl_nr > 0) {
577                 int count = queue->c2_qin.pl_nr;
578
579                 rc = cl_io_submit_rw(env, io, CRT_READ, queue);
580                 if (rc == 0)
581                         task_io_account_read(PAGE_SIZE * count);
582         }
583         if (ria->ria_end == ra_end && ra_end == (kms >> PAGE_SHIFT))
584                 ll_ra_stats_inc(inode, RA_STAT_EOF);
585
586         if (ra_end != ria->ria_end)
587                 ll_ra_stats_inc(inode, RA_STAT_FAILED_REACH_END);
588
589         /* TODO: discard all pages until page reinit route is implemented */
590         cl_page_list_discard(env, io, &queue->c2_qin);
591
592         /* Unlock unsent read pages in case of error. */
593         cl_page_list_disown(env, io, &queue->c2_qin);
594
595         cl_2queue_fini(env, queue);
596 out_io_fini:
597         cl_io_end(env, io);
598         cl_io_fini(env, io);
599 out_put_env:
600         cl_env_put(env, &refcheck);
601 out_free_work:
602         if (ra_end > 0)
603                 ll_ra_stats_inc_sbi(ll_i2sbi(inode), RA_STAT_ASYNC);
604         ll_readahead_work_free(work);
605 }
606
607 static int ll_readahead(const struct lu_env *env, struct cl_io *io,
608                         struct cl_page_list *queue,
609                         struct ll_readahead_state *ras, bool hit,
610                         struct file *file)
611 {
612         struct vvp_io *vio = vvp_env_io(env);
613         struct ll_thread_info *lti = ll_env_info(env);
614         unsigned long len, mlen = 0;
615         pgoff_t ra_end = 0, start = 0, end = 0;
616         struct inode *inode;
617         struct ra_io_arg *ria = &lti->lti_ria;
618         struct cl_object *clob;
619         int ret = 0;
620         __u64 kms;
621         ENTRY;
622
623         clob = io->ci_obj;
624         inode = vvp_object_inode(clob);
625
626         memset(ria, 0, sizeof *ria);
627         ret = ll_readahead_file_kms(env, io, &kms);
628         if (ret != 0)
629                 RETURN(ret);
630
631         if (kms == 0) {
632                 ll_ra_stats_inc(inode, RA_STAT_ZERO_LEN);
633                 RETURN(0);
634         }
635
636         spin_lock(&ras->ras_lock);
637
638         /**
639          * Note: other thread might rollback the ras_next_readahead,
640          * if it can not get the full size of prepared pages, see the
641          * end of this function. For stride read ahead, it needs to
642          * make sure the offset is no less than ras_stride_offset,
643          * so that stride read ahead can work correctly.
644          */
645         if (stride_io_mode(ras))
646                 start = max(ras->ras_next_readahead,
647                             ras->ras_stride_offset >> PAGE_SHIFT);
648         else
649                 start = ras->ras_next_readahead;
650
651         if (ras->ras_window_len > 0)
652                 end = ras->ras_window_start + ras->ras_window_len - 1;
653
654         /* Enlarge the RA window to encompass the full read */
655         if (vio->vui_ra_valid &&
656             end < vio->vui_ra_start + vio->vui_ra_count - 1)
657                 end = vio->vui_ra_start + vio->vui_ra_count - 1;
658
659         if (end != 0) {
660                 unsigned long end_index;
661
662                 /* Truncate RA window to end of file */
663                 end_index = (unsigned long)((kms - 1) >> PAGE_SHIFT);
664                 if (end_index <= end) {
665                         end = end_index;
666                         ria->ria_eof = true;
667                 }
668         }
669         ria->ria_start = start;
670         ria->ria_end = end;
671         /* If stride I/O mode is detected, get stride window*/
672         if (stride_io_mode(ras)) {
673                 ria->ria_stoff = ras->ras_stride_offset;
674                 ria->ria_length = ras->ras_stride_length;
675                 ria->ria_bytes = ras->ras_stride_bytes;
676         }
677         spin_unlock(&ras->ras_lock);
678
679         if (end == 0) {
680                 ll_ra_stats_inc(inode, RA_STAT_ZERO_WINDOW);
681                 RETURN(0);
682         }
683         len = ria_page_count(ria);
684         if (len == 0) {
685                 ll_ra_stats_inc(inode, RA_STAT_ZERO_WINDOW);
686                 RETURN(0);
687         }
688
689         RAS_CDEBUG(ras);
690         CDEBUG(D_READA, DFID": ria: %lu/%lu, bead: %lu/%lu, hit: %d\n",
691                PFID(lu_object_fid(&clob->co_lu)),
692                ria->ria_start, ria->ria_end,
693                vio->vui_ra_valid ? vio->vui_ra_start : 0,
694                vio->vui_ra_valid ? vio->vui_ra_count : 0,
695                hit);
696
697         /* at least to extend the readahead window to cover current read */
698         if (!hit && vio->vui_ra_valid &&
699             vio->vui_ra_start + vio->vui_ra_count > ria->ria_start)
700                 ria->ria_end_min = vio->vui_ra_start + vio->vui_ra_count - 1;
701
702         ria->ria_reserved = ll_ra_count_get(ll_i2sbi(inode), ria, len, mlen);
703         if (ria->ria_reserved < len)
704                 ll_ra_stats_inc(inode, RA_STAT_MAX_IN_FLIGHT);
705
706         CDEBUG(D_READA, "reserved pages: %lu/%lu/%lu, ra_cur %d, ra_max %lu\n",
707                ria->ria_reserved, len, mlen,
708                atomic_read(&ll_i2sbi(inode)->ll_ra_info.ra_cur_pages),
709                ll_i2sbi(inode)->ll_ra_info.ra_max_pages);
710
711         ret = ll_read_ahead_pages(env, io, queue, ras, ria, &ra_end);
712
713         if (ria->ria_reserved != 0)
714                 ll_ra_count_put(ll_i2sbi(inode), ria->ria_reserved);
715
716         if (ra_end == end && ra_end == (kms >> PAGE_SHIFT))
717                 ll_ra_stats_inc(inode, RA_STAT_EOF);
718
719         CDEBUG(D_READA, "ra_end = %lu end = %lu stride end = %lu pages = %d\n",
720                ra_end, end, ria->ria_end, ret);
721
722         if (ra_end != end)
723                 ll_ra_stats_inc(inode, RA_STAT_FAILED_REACH_END);
724         if (ra_end > 0) {
725                 /* update the ras so that the next read-ahead tries from
726                  * where we left off. */
727                 spin_lock(&ras->ras_lock);
728                 ras->ras_next_readahead = ra_end + 1;
729                 spin_unlock(&ras->ras_lock);
730                 RAS_CDEBUG(ras);
731         }
732
733         RETURN(ret);
734 }
735
736 static void ras_set_start(struct ll_readahead_state *ras, pgoff_t index)
737 {
738         ras->ras_window_start = ras_align(ras, index, NULL);
739 }
740
741 /* called with the ras_lock held or from places where it doesn't matter */
742 static void ras_reset(struct ll_readahead_state *ras, pgoff_t index)
743 {
744         ras->ras_consecutive_requests = 0;
745         ras->ras_consecutive_bytes = 0;
746         ras->ras_window_len = 0;
747         ras_set_start(ras, index);
748         ras->ras_next_readahead = max(ras->ras_window_start, index + 1);
749
750         RAS_CDEBUG(ras);
751 }
752
753 /* called with the ras_lock held or from places where it doesn't matter */
754 static void ras_stride_reset(struct ll_readahead_state *ras)
755 {
756         ras->ras_consecutive_stride_requests = 0;
757         ras->ras_stride_length = 0;
758         ras->ras_stride_bytes = 0;
759         RAS_CDEBUG(ras);
760 }
761
762 void ll_readahead_init(struct inode *inode, struct ll_readahead_state *ras)
763 {
764         spin_lock_init(&ras->ras_lock);
765         ras->ras_rpc_size = PTLRPC_MAX_BRW_PAGES;
766         ras_reset(ras, 0);
767         ras->ras_last_read_end = 0;
768         ras->ras_requests = 0;
769 }
770
771 /*
772  * Check whether the read request is in the stride window.
773  * If it is in the stride window, return true, otherwise return false.
774  */
775 static bool read_in_stride_window(struct ll_readahead_state *ras,
776                                   unsigned long pos, unsigned long count)
777 {
778         unsigned long stride_gap;
779
780         if (ras->ras_stride_length == 0 || ras->ras_stride_bytes == 0 ||
781             ras->ras_stride_bytes == ras->ras_stride_length)
782                 return false;
783
784         stride_gap = pos - ras->ras_last_read_end - 1;
785
786         /* If it is contiguous read */
787         if (stride_gap == 0)
788                 return ras->ras_consecutive_bytes + count <=
789                         ras->ras_stride_bytes;
790
791         /* Otherwise check the stride by itself */
792         return (ras->ras_stride_length - ras->ras_stride_bytes) == stride_gap &&
793                 ras->ras_consecutive_bytes == ras->ras_stride_bytes &&
794                 count <= ras->ras_stride_bytes;
795 }
796
797 static void ras_init_stride_detector(struct ll_readahead_state *ras,
798                                      unsigned long pos, unsigned long count)
799 {
800         unsigned long stride_gap = pos - ras->ras_last_read_end - 1;
801
802         LASSERT(ras->ras_consecutive_stride_requests == 0);
803
804         if (pos <= ras->ras_last_read_end) {
805                 /*Reset stride window for forward read*/
806                 ras_stride_reset(ras);
807                 return;
808         }
809
810         ras->ras_stride_bytes = ras->ras_consecutive_bytes;
811         ras->ras_stride_length = stride_gap + ras->ras_consecutive_bytes;
812         ras->ras_consecutive_stride_requests++;
813         ras->ras_stride_offset = pos;
814
815         RAS_CDEBUG(ras);
816 }
817
818 static unsigned long
819 stride_page_count(struct ll_readahead_state *ras, unsigned long len)
820 {
821         unsigned int bytes_count =
822                 stride_byte_count(ras->ras_stride_offset,
823                                   ras->ras_stride_length, ras->ras_stride_bytes,
824                                    ras->ras_stride_offset, len);
825
826         return (bytes_count + PAGE_SIZE - 1) >> PAGE_SHIFT;
827 }
828
829 /* Stride Read-ahead window will be increased inc_len according to
830  * stride I/O pattern */
831 static void ras_stride_increase_window(struct ll_readahead_state *ras,
832                                        struct ll_ra_info *ra,
833                                        unsigned long inc_len)
834 {
835         unsigned long left, step, window_len;
836         unsigned long stride_len;
837         unsigned long end = ras->ras_window_start + ras->ras_window_len;
838
839         LASSERT(ras->ras_stride_length > 0);
840         LASSERTF(end >= (ras->ras_stride_offset >> PAGE_SHIFT),
841                  "window_start %lu, window_len %lu stride_offset %lu\n",
842                  ras->ras_window_start, ras->ras_window_len,
843                  ras->ras_stride_offset);
844
845         end <<= PAGE_SHIFT;
846         if (end < ras->ras_stride_offset)
847                 stride_len = 0;
848         else
849                 stride_len = end - ras->ras_stride_offset;
850
851         left = stride_len % ras->ras_stride_length;
852         window_len = (ras->ras_window_len << PAGE_SHIFT) - left;
853
854         if (left < ras->ras_stride_bytes)
855                 left += inc_len;
856         else
857                 left = ras->ras_stride_bytes + inc_len;
858
859         LASSERT(ras->ras_stride_bytes != 0);
860
861         step = left / ras->ras_stride_bytes;
862         left %= ras->ras_stride_bytes;
863
864         window_len += step * ras->ras_stride_length + left;
865
866         if (stride_page_count(ras, window_len) <= ra->ra_max_pages_per_file)
867                 ras->ras_window_len = (window_len >> PAGE_SHIFT);
868
869         RAS_CDEBUG(ras);
870 }
871
872 static void ras_increase_window(struct inode *inode,
873                                 struct ll_readahead_state *ras,
874                                 struct ll_ra_info *ra)
875 {
876         /* The stretch of ra-window should be aligned with max rpc_size
877          * but current clio architecture does not support retrieve such
878          * information from lower layer. FIXME later
879          */
880         if (stride_io_mode(ras)) {
881                 ras_stride_increase_window(ras, ra,
882                                 ras->ras_rpc_size << PAGE_SHIFT);
883         } else {
884                 unsigned long wlen;
885
886                 wlen = min(ras->ras_window_len + ras->ras_rpc_size,
887                            ra->ra_max_pages_per_file);
888                 if (wlen < ras->ras_rpc_size)
889                         ras->ras_window_len = wlen;
890                 else
891                         ras->ras_window_len = ras_align(ras, wlen, NULL);
892         }
893 }
894
895 /**
896  * Seek within 8 pages are considered as sequential read for now.
897  */
898 static inline bool is_loose_seq_read(struct ll_readahead_state *ras,
899                                      unsigned long pos)
900 {
901         return pos_in_window(pos, ras->ras_last_read_end,
902                              8 << PAGE_SHIFT, 8 << PAGE_SHIFT);
903 }
904
905 static void ras_detect_read_pattern(struct ll_readahead_state *ras,
906                                     struct ll_sb_info *sbi,
907                                     unsigned long pos, unsigned long count,
908                                     bool mmap)
909 {
910         bool stride_detect = false;
911         unsigned long index = pos >> PAGE_SHIFT;
912
913         /*
914          * Reset the read-ahead window in two cases. First when the app seeks
915          * or reads to some other part of the file. Secondly if we get a
916          * read-ahead miss that we think we've previously issued. This can
917          * be a symptom of there being so many read-ahead pages that the VM
918          * is reclaiming it before we get to it.
919          */
920         if (!is_loose_seq_read(ras, pos)) {
921                 /* Check whether it is in stride I/O mode */
922                 if (!read_in_stride_window(ras, pos, count)) {
923                         if (ras->ras_consecutive_stride_requests == 0)
924                                 ras_init_stride_detector(ras, pos, count);
925                         else
926                                 ras_stride_reset(ras);
927                         ras->ras_consecutive_bytes = 0;
928                         ras_reset(ras, index);
929                 } else {
930                         ras->ras_consecutive_bytes = 0;
931                         ras->ras_consecutive_requests = 0;
932                         if (++ras->ras_consecutive_stride_requests > 1)
933                                 stride_detect = true;
934                         RAS_CDEBUG(ras);
935                 }
936                 ll_ra_stats_inc_sbi(sbi, RA_STAT_DISTANT_READPAGE);
937         } else if (stride_io_mode(ras)) {
938                 /*
939                  * If this is contiguous read but in stride I/O mode
940                  * currently, check whether stride step still is valid,
941                  * if invalid, it will reset the stride ra window to
942                  * be zero.
943                  */
944                 if (!read_in_stride_window(ras, pos, count)) {
945                         ras_stride_reset(ras);
946                         ras->ras_window_len = 0;
947                         ras->ras_next_readahead = index;
948                 }
949         }
950
951         ras->ras_consecutive_bytes += count;
952         if (mmap) {
953                 unsigned int idx = (ras->ras_consecutive_bytes >> PAGE_SHIFT);
954
955                 if ((idx >= 4 && idx % 4 == 0) || stride_detect)
956                         ras->ras_need_increase_window = true;
957         } else if ((ras->ras_consecutive_requests > 1 || stride_detect)) {
958                 ras->ras_need_increase_window = true;
959         }
960
961         ras->ras_last_read_end = pos + count - 1;
962 }
963
964 void ll_ras_enter(struct file *f, unsigned long pos, unsigned long count)
965 {
966         struct ll_file_data *fd = LUSTRE_FPRIVATE(f);
967         struct ll_readahead_state *ras = &fd->fd_ras;
968         struct inode *inode = file_inode(f);
969         unsigned long index = pos >> PAGE_SHIFT;
970         struct ll_sb_info *sbi = ll_i2sbi(inode);
971
972         spin_lock(&ras->ras_lock);
973         ras->ras_requests++;
974         ras->ras_consecutive_requests++;
975         ras->ras_need_increase_window = false;
976         ras->ras_no_miss_check = false;
977         /*
978          * On the second access to a file smaller than the tunable
979          * ra_max_read_ahead_whole_pages trigger RA on all pages in the
980          * file up to ra_max_pages_per_file.  This is simply a best effort
981          * and only occurs once per open file. Normal RA behavior is reverted
982          * to for subsequent IO.
983          */
984         if (ras->ras_requests >= 2) {
985                 __u64 kms_pages;
986                 struct ll_ra_info *ra = &sbi->ll_ra_info;
987
988                 kms_pages = (i_size_read(inode) + PAGE_SIZE - 1) >>
989                             PAGE_SHIFT;
990
991                 CDEBUG(D_READA, "kmsp %llu mwp %lu mp %lu\n", kms_pages,
992                        ra->ra_max_read_ahead_whole_pages, ra->ra_max_pages_per_file);
993
994                 if (kms_pages &&
995                     kms_pages <= ra->ra_max_read_ahead_whole_pages) {
996                         ras->ras_window_start = 0;
997                         ras->ras_next_readahead = index + 1;
998                         ras->ras_window_len = min(ra->ra_max_pages_per_file,
999                                 ra->ra_max_read_ahead_whole_pages);
1000                         ras->ras_no_miss_check = true;
1001                         GOTO(out_unlock, 0);
1002                 }
1003         }
1004         ras_detect_read_pattern(ras, sbi, pos, count, false);
1005 out_unlock:
1006         spin_unlock(&ras->ras_lock);
1007 }
1008
1009 static bool index_in_stride_window(struct ll_readahead_state *ras,
1010                                    unsigned int index)
1011 {
1012         unsigned long pos = index << PAGE_SHIFT;
1013         unsigned long offset;
1014
1015         if (ras->ras_stride_length == 0 || ras->ras_stride_bytes == 0 ||
1016             ras->ras_stride_bytes == ras->ras_stride_length)
1017                 return false;
1018
1019         if (pos >= ras->ras_stride_offset) {
1020                 offset = (pos - ras->ras_stride_offset) %
1021                                         ras->ras_stride_length;
1022                 if (offset < ras->ras_stride_bytes ||
1023                     ras->ras_stride_length - offset < PAGE_SIZE)
1024                         return true;
1025         } else if (ras->ras_stride_offset - pos < PAGE_SIZE) {
1026                 return true;
1027         }
1028
1029         return false;
1030 }
1031
1032 /*
1033  * ll_ras_enter() is used to detect read pattern according to
1034  * pos and count.
1035  *
1036  * ras_update() is used to detect cache miss and
1037  * reset window or increase window accordingly
1038  */
1039 static void ras_update(struct ll_sb_info *sbi, struct inode *inode,
1040                        struct ll_readahead_state *ras, unsigned long index,
1041                        enum ras_update_flags flags)
1042 {
1043         struct ll_ra_info *ra = &sbi->ll_ra_info;
1044         bool hit = flags & LL_RAS_HIT;
1045
1046         ENTRY;
1047         spin_lock(&ras->ras_lock);
1048
1049         if (!hit)
1050                 CDEBUG(D_READA, DFID " pages at %lu miss.\n",
1051                        PFID(ll_inode2fid(inode)), index);
1052         ll_ra_stats_inc_sbi(sbi, hit ? RA_STAT_HIT : RA_STAT_MISS);
1053
1054         /*
1055          * The readahead window has been expanded to cover whole
1056          * file size, we don't care whether ra miss happen or not.
1057          * Because we will read whole file to page cache even if
1058          * some pages missed.
1059          */
1060         if (ras->ras_no_miss_check)
1061                 GOTO(out_unlock, 0);
1062
1063         if (flags & LL_RAS_MMAP)
1064                 ras_detect_read_pattern(ras, sbi, index << PAGE_SHIFT,
1065                                         PAGE_SIZE, true);
1066
1067         if (!hit && ras->ras_window_len &&
1068             index < ras->ras_next_readahead &&
1069             pos_in_window(index, ras->ras_window_start, 0,
1070                           ras->ras_window_len)) {
1071                 ll_ra_stats_inc_sbi(sbi, RA_STAT_MISS_IN_WINDOW);
1072                 ras->ras_need_increase_window = false;
1073
1074                 if (index_in_stride_window(ras, index) &&
1075                     stride_io_mode(ras)) {
1076                         /*
1077                          * if (index != ras->ras_last_readpage + 1)
1078                          *      ras->ras_consecutive_pages = 0;
1079                          */
1080                         ras_reset(ras, index);
1081
1082                         /*
1083                          * If stride-RA hit cache miss, the stride
1084                          * detector will not be reset to avoid the
1085                          * overhead of redetecting read-ahead mode,
1086                          * but on the condition that the stride window
1087                          * is still intersect with normal sequential
1088                          * read-ahead window.
1089                          */
1090                         if (ras->ras_window_start <
1091                             ras->ras_stride_offset)
1092                                 ras_stride_reset(ras);
1093                         RAS_CDEBUG(ras);
1094                 } else {
1095                         /*
1096                          * Reset both stride window and normal RA
1097                          * window.
1098                          */
1099                         ras_reset(ras, index);
1100                         /* ras->ras_consecutive_pages++; */
1101                         ras->ras_consecutive_bytes = 0;
1102                         ras_stride_reset(ras);
1103                         GOTO(out_unlock, 0);
1104                 }
1105         }
1106         ras_set_start(ras, index);
1107
1108         if (stride_io_mode(ras)) {
1109                 /* Since stride readahead is sentivite to the offset
1110                  * of read-ahead, so we use original offset here,
1111                  * instead of ras_window_start, which is RPC aligned */
1112                 ras->ras_next_readahead = max(index + 1,
1113                                               ras->ras_next_readahead);
1114                 ras->ras_window_start =
1115                                 max(ras->ras_stride_offset >> PAGE_SHIFT,
1116                                     ras->ras_window_start);
1117         } else {
1118                 if (ras->ras_next_readahead < ras->ras_window_start)
1119                         ras->ras_next_readahead = ras->ras_window_start;
1120                 if (!hit)
1121                         ras->ras_next_readahead = index + 1;
1122         }
1123
1124         if (ras->ras_need_increase_window) {
1125                 ras_increase_window(inode, ras, ra);
1126                 ras->ras_need_increase_window = false;
1127         }
1128
1129         EXIT;
1130 out_unlock:
1131         spin_unlock(&ras->ras_lock);
1132 }
1133
1134 int ll_writepage(struct page *vmpage, struct writeback_control *wbc)
1135 {
1136         struct inode           *inode = vmpage->mapping->host;
1137         struct ll_inode_info   *lli   = ll_i2info(inode);
1138         struct lu_env          *env;
1139         struct cl_io           *io;
1140         struct cl_page         *page;
1141         struct cl_object       *clob;
1142         bool redirtied = false;
1143         bool unlocked = false;
1144         int result;
1145         __u16 refcheck;
1146         ENTRY;
1147
1148         LASSERT(PageLocked(vmpage));
1149         LASSERT(!PageWriteback(vmpage));
1150
1151         LASSERT(ll_i2dtexp(inode) != NULL);
1152
1153         env = cl_env_get(&refcheck);
1154         if (IS_ERR(env))
1155                 GOTO(out, result = PTR_ERR(env));
1156
1157         clob  = ll_i2info(inode)->lli_clob;
1158         LASSERT(clob != NULL);
1159
1160         io = vvp_env_thread_io(env);
1161         io->ci_obj = clob;
1162         io->ci_ignore_layout = 1;
1163         result = cl_io_init(env, io, CIT_MISC, clob);
1164         if (result == 0) {
1165                 page = cl_page_find(env, clob, vmpage->index,
1166                                     vmpage, CPT_CACHEABLE);
1167                 if (!IS_ERR(page)) {
1168                         lu_ref_add(&page->cp_reference, "writepage",
1169                                    current);
1170                         cl_page_assume(env, io, page);
1171                         result = cl_page_flush(env, io, page);
1172                         if (result != 0) {
1173                                 /*
1174                                  * Re-dirty page on error so it retries write,
1175                                  * but not in case when IO has actually
1176                                  * occurred and completed with an error.
1177                                  */
1178                                 if (!PageError(vmpage)) {
1179                                         redirty_page_for_writepage(wbc, vmpage);
1180                                         result = 0;
1181                                         redirtied = true;
1182                                 }
1183                         }
1184                         cl_page_disown(env, io, page);
1185                         unlocked = true;
1186                         lu_ref_del(&page->cp_reference,
1187                                    "writepage", current);
1188                         cl_page_put(env, page);
1189                 } else {
1190                         result = PTR_ERR(page);
1191                 }
1192         }
1193         cl_io_fini(env, io);
1194
1195         if (redirtied && wbc->sync_mode == WB_SYNC_ALL) {
1196                 loff_t offset = cl_offset(clob, vmpage->index);
1197
1198                 /* Flush page failed because the extent is being written out.
1199                  * Wait for the write of extent to be finished to avoid
1200                  * breaking kernel which assumes ->writepage should mark
1201                  * PageWriteback or clean the page. */
1202                 result = cl_sync_file_range(inode, offset,
1203                                             offset + PAGE_SIZE - 1,
1204                                             CL_FSYNC_LOCAL, 1);
1205                 if (result > 0) {
1206                         /* actually we may have written more than one page.
1207                          * decreasing this page because the caller will count
1208                          * it. */
1209                         wbc->nr_to_write -= result - 1;
1210                         result = 0;
1211                 }
1212         }
1213
1214         cl_env_put(env, &refcheck);
1215         GOTO(out, result);
1216
1217 out:
1218         if (result < 0) {
1219                 if (!lli->lli_async_rc)
1220                         lli->lli_async_rc = result;
1221                 SetPageError(vmpage);
1222                 if (!unlocked)
1223                         unlock_page(vmpage);
1224         }
1225         return result;
1226 }
1227
1228 int ll_writepages(struct address_space *mapping, struct writeback_control *wbc)
1229 {
1230         struct inode *inode = mapping->host;
1231         loff_t start;
1232         loff_t end;
1233         enum cl_fsync_mode mode;
1234         int range_whole = 0;
1235         int result;
1236         ENTRY;
1237
1238         if (wbc->range_cyclic) {
1239                 start = mapping->writeback_index << PAGE_SHIFT;
1240                 end = OBD_OBJECT_EOF;
1241         } else {
1242                 start = wbc->range_start;
1243                 end = wbc->range_end;
1244                 if (end == LLONG_MAX) {
1245                         end = OBD_OBJECT_EOF;
1246                         range_whole = start == 0;
1247                 }
1248         }
1249
1250         mode = CL_FSYNC_NONE;
1251         if (wbc->sync_mode == WB_SYNC_ALL)
1252                 mode = CL_FSYNC_LOCAL;
1253
1254         if (ll_i2info(inode)->lli_clob == NULL)
1255                 RETURN(0);
1256
1257         /* for directio, it would call writepages() to evict cached pages
1258          * inside the IO context of write, which will cause deadlock at
1259          * layout_conf since it waits for active IOs to complete. */
1260         result = cl_sync_file_range(inode, start, end, mode, 1);
1261         if (result > 0) {
1262                 wbc->nr_to_write -= result;
1263                 result = 0;
1264          }
1265
1266         if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0)) {
1267                 if (end == OBD_OBJECT_EOF)
1268                         mapping->writeback_index = 0;
1269                 else
1270                         mapping->writeback_index = (end >> PAGE_SHIFT) + 1;
1271         }
1272         RETURN(result);
1273 }
1274
1275 struct ll_cl_context *ll_cl_find(struct file *file)
1276 {
1277         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1278         struct ll_cl_context *lcc;
1279         struct ll_cl_context *found = NULL;
1280
1281         read_lock(&fd->fd_lock);
1282         list_for_each_entry(lcc, &fd->fd_lccs, lcc_list) {
1283                 if (lcc->lcc_cookie == current) {
1284                         found = lcc;
1285                         break;
1286                 }
1287         }
1288         read_unlock(&fd->fd_lock);
1289
1290         return found;
1291 }
1292
1293 void ll_cl_add(struct file *file, const struct lu_env *env, struct cl_io *io,
1294                enum lcc_type type)
1295 {
1296         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1297         struct ll_cl_context *lcc = &ll_env_info(env)->lti_io_ctx;
1298
1299         memset(lcc, 0, sizeof(*lcc));
1300         INIT_LIST_HEAD(&lcc->lcc_list);
1301         lcc->lcc_cookie = current;
1302         lcc->lcc_env = env;
1303         lcc->lcc_io = io;
1304         lcc->lcc_type = type;
1305
1306         write_lock(&fd->fd_lock);
1307         list_add(&lcc->lcc_list, &fd->fd_lccs);
1308         write_unlock(&fd->fd_lock);
1309 }
1310
1311 void ll_cl_remove(struct file *file, const struct lu_env *env)
1312 {
1313         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1314         struct ll_cl_context *lcc = &ll_env_info(env)->lti_io_ctx;
1315
1316         write_lock(&fd->fd_lock);
1317         list_del_init(&lcc->lcc_list);
1318         write_unlock(&fd->fd_lock);
1319 }
1320
1321 int ll_io_read_page(const struct lu_env *env, struct cl_io *io,
1322                            struct cl_page *page, struct file *file)
1323 {
1324         struct inode              *inode  = vvp_object_inode(page->cp_obj);
1325         struct ll_sb_info         *sbi    = ll_i2sbi(inode);
1326         struct ll_file_data       *fd     = LUSTRE_FPRIVATE(file);
1327         struct ll_readahead_state *ras    = &fd->fd_ras;
1328         struct cl_2queue          *queue  = &io->ci_queue;
1329         struct cl_sync_io         *anchor = NULL;
1330         struct vvp_page           *vpg;
1331         int                        rc = 0;
1332         bool                       uptodate;
1333         ENTRY;
1334
1335         vpg = cl2vvp_page(cl_object_page_slice(page->cp_obj, page));
1336         uptodate = vpg->vpg_defer_uptodate;
1337
1338         if (sbi->ll_ra_info.ra_max_pages_per_file > 0 &&
1339             sbi->ll_ra_info.ra_max_pages > 0 &&
1340             !vpg->vpg_ra_updated) {
1341                 struct vvp_io *vio = vvp_env_io(env);
1342                 enum ras_update_flags flags = 0;
1343
1344                 if (uptodate)
1345                         flags |= LL_RAS_HIT;
1346                 if (!vio->vui_ra_valid)
1347                         flags |= LL_RAS_MMAP;
1348                 ras_update(sbi, inode, ras, vvp_index(vpg), flags);
1349         }
1350
1351         cl_2queue_init(queue);
1352         if (uptodate) {
1353                 vpg->vpg_ra_used = 1;
1354                 cl_page_export(env, page, 1);
1355                 cl_page_disown(env, io, page);
1356         } else {
1357                 anchor = &vvp_env_info(env)->vti_anchor;
1358                 cl_sync_io_init(anchor, 1);
1359                 page->cp_sync_io = anchor;
1360
1361                 cl_2queue_add(queue, page);
1362         }
1363
1364         if (sbi->ll_ra_info.ra_max_pages_per_file > 0 &&
1365             sbi->ll_ra_info.ra_max_pages > 0) {
1366                 int rc2;
1367
1368                 rc2 = ll_readahead(env, io, &queue->c2_qin, ras,
1369                                    uptodate, file);
1370                 CDEBUG(D_READA, DFID "%d pages read ahead at %lu\n",
1371                        PFID(ll_inode2fid(inode)), rc2, vvp_index(vpg));
1372         }
1373
1374         if (queue->c2_qin.pl_nr > 0) {
1375                 int count = queue->c2_qin.pl_nr;
1376                 rc = cl_io_submit_rw(env, io, CRT_READ, queue);
1377                 if (rc == 0)
1378                         task_io_account_read(PAGE_SIZE * count);
1379         }
1380
1381
1382         if (anchor != NULL && !cl_page_is_owned(page, io)) { /* have sent */
1383                 rc = cl_sync_io_wait(env, anchor, 0);
1384
1385                 cl_page_assume(env, io, page);
1386                 cl_page_list_del(env, &queue->c2_qout, page);
1387
1388                 if (!PageUptodate(cl_page_vmpage(page))) {
1389                         /* Failed to read a mirror, discard this page so that
1390                          * new page can be created with new mirror.
1391                          *
1392                          * TODO: this is not needed after page reinit
1393                          * route is implemented */
1394                         cl_page_discard(env, io, page);
1395                 }
1396                 cl_page_disown(env, io, page);
1397         }
1398
1399         /* TODO: discard all pages until page reinit route is implemented */
1400         cl_page_list_discard(env, io, &queue->c2_qin);
1401
1402         /* Unlock unsent read pages in case of error. */
1403         cl_page_list_disown(env, io, &queue->c2_qin);
1404
1405         cl_2queue_fini(env, queue);
1406
1407         RETURN(rc);
1408 }
1409
1410 /*
1411  * Possible return value:
1412  * 0 no async readahead triggered and fast read could not be used.
1413  * 1 no async readahead, but fast read could be used.
1414  * 2 async readahead triggered and fast read could be used too.
1415  * < 0 on error.
1416  */
1417 static int kickoff_async_readahead(struct file *file, unsigned long pages)
1418 {
1419         struct ll_readahead_work *lrw;
1420         struct inode *inode = file_inode(file);
1421         struct ll_sb_info *sbi = ll_i2sbi(inode);
1422         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1423         struct ll_readahead_state *ras = &fd->fd_ras;
1424         struct ll_ra_info *ra = &sbi->ll_ra_info;
1425         unsigned long throttle;
1426         unsigned long start = ras_align(ras, ras->ras_next_readahead, NULL);
1427         unsigned long end = start + pages - 1;
1428
1429         throttle = min(ra->ra_async_pages_per_file_threshold,
1430                        ra->ra_max_pages_per_file);
1431         /*
1432          * If this is strided i/o or the window is smaller than the
1433          * throttle limit, we do not do async readahead. Otherwise,
1434          * we do async readahead, allowing the user thread to do fast i/o.
1435          */
1436         if (stride_io_mode(ras) || !throttle ||
1437             ras->ras_window_len < throttle)
1438                 return 0;
1439
1440         if ((atomic_read(&ra->ra_cur_pages) + pages) > ra->ra_max_pages)
1441                 return 0;
1442
1443         if (ras->ras_async_last_readpage == start)
1444                 return 1;
1445
1446         /* ll_readahead_work_free() free it */
1447         OBD_ALLOC_PTR(lrw);
1448         if (lrw) {
1449                 lrw->lrw_file = get_file(file);
1450                 lrw->lrw_start = start;
1451                 lrw->lrw_end = end;
1452                 spin_lock(&ras->ras_lock);
1453                 ras->ras_next_readahead = end + 1;
1454                 ras->ras_async_last_readpage = start;
1455                 spin_unlock(&ras->ras_lock);
1456                 ll_readahead_work_add(inode, lrw);
1457         } else {
1458                 return -ENOMEM;
1459         }
1460
1461         return 2;
1462 }
1463
1464 int ll_readpage(struct file *file, struct page *vmpage)
1465 {
1466         struct inode *inode = file_inode(file);
1467         struct cl_object *clob = ll_i2info(inode)->lli_clob;
1468         struct ll_cl_context *lcc;
1469         const struct lu_env  *env = NULL;
1470         struct cl_io   *io = NULL;
1471         struct cl_page *page;
1472         struct ll_sb_info *sbi = ll_i2sbi(inode);
1473         int result;
1474         ENTRY;
1475
1476         lcc = ll_cl_find(file);
1477         if (lcc != NULL) {
1478                 env = lcc->lcc_env;
1479                 io  = lcc->lcc_io;
1480         }
1481
1482         if (io == NULL) { /* fast read */
1483                 struct inode *inode = file_inode(file);
1484                 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1485                 struct ll_readahead_state *ras = &fd->fd_ras;
1486                 struct lu_env  *local_env = NULL;
1487                 unsigned long fast_read_pages =
1488                         max(RA_REMAIN_WINDOW_MIN, ras->ras_rpc_size);
1489                 struct vvp_page *vpg;
1490
1491                 result = -ENODATA;
1492
1493                 /* TODO: need to verify the layout version to make sure
1494                  * the page is not invalid due to layout change. */
1495                 page = cl_vmpage_page(vmpage, clob);
1496                 if (page == NULL) {
1497                         unlock_page(vmpage);
1498                         ll_ra_stats_inc_sbi(sbi, RA_STAT_FAILED_FAST_READ);
1499                         RETURN(result);
1500                 }
1501
1502                 vpg = cl2vvp_page(cl_object_page_slice(page->cp_obj, page));
1503                 if (vpg->vpg_defer_uptodate) {
1504                         enum ras_update_flags flags = LL_RAS_HIT;
1505
1506                         if (lcc && lcc->lcc_type == LCC_MMAP)
1507                                 flags |= LL_RAS_MMAP;
1508
1509                         /* For fast read, it updates read ahead state only
1510                          * if the page is hit in cache because non cache page
1511                          * case will be handled by slow read later. */
1512                         ras_update(sbi, inode, ras, vvp_index(vpg), flags);
1513                         /* avoid duplicate ras_update() call */
1514                         vpg->vpg_ra_updated = 1;
1515
1516                         /* Check if we can issue a readahead RPC, if that is
1517                          * the case, we can't do fast IO because we will need
1518                          * a cl_io to issue the RPC. */
1519                         if (ras->ras_window_start + ras->ras_window_len <
1520                             ras->ras_next_readahead + fast_read_pages ||
1521                             kickoff_async_readahead(file, fast_read_pages) > 0)
1522                                 result = 0;
1523                 }
1524
1525                 if (!env) {
1526                         local_env = cl_env_percpu_get();
1527                         env = local_env;
1528                 }
1529
1530                 /* export the page and skip io stack */
1531                 if (result == 0) {
1532                         vpg->vpg_ra_used = 1;
1533                         cl_page_export(env, page, 1);
1534                 } else {
1535                         ll_ra_stats_inc_sbi(sbi, RA_STAT_FAILED_FAST_READ);
1536                 }
1537                 /* release page refcount before unlocking the page to ensure
1538                  * the object won't be destroyed in the calling path of
1539                  * cl_page_put(). Please see comment in ll_releasepage(). */
1540                 cl_page_put(env, page);
1541                 unlock_page(vmpage);
1542                 if (local_env)
1543                         cl_env_percpu_put(local_env);
1544
1545                 RETURN(result);
1546         }
1547
1548         LASSERT(io->ci_state == CIS_IO_GOING);
1549         page = cl_page_find(env, clob, vmpage->index, vmpage, CPT_CACHEABLE);
1550         if (!IS_ERR(page)) {
1551                 LASSERT(page->cp_type == CPT_CACHEABLE);
1552                 if (likely(!PageUptodate(vmpage))) {
1553                         cl_page_assume(env, io, page);
1554
1555                         result = ll_io_read_page(env, io, page, file);
1556                 } else {
1557                         /* Page from a non-object file. */
1558                         unlock_page(vmpage);
1559                         result = 0;
1560                 }
1561                 cl_page_put(env, page);
1562         } else {
1563                 unlock_page(vmpage);
1564                 result = PTR_ERR(page);
1565         }
1566         RETURN(result);
1567 }