Whamcloud - gitweb
LU-10391 lustre: change cfs_match_nid to take large nid.
[fs/lustre-release.git] / lustre / llite / rw26.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/lustre/llite/rw26.c
32  *
33  * Lustre Lite I/O page cache routines for the 2.5/2.6 kernel version
34  */
35
36 #include <linux/buffer_head.h>
37 #include <linux/errno.h>
38 #include <linux/fs.h>
39 #include <linux/kernel.h>
40 #include <linux/mm.h>
41 #include <linux/mpage.h>
42 #include <linux/pagemap.h>
43 #include <linux/string.h>
44 #include <linux/unistd.h>
45 #include <linux/writeback.h>
46 #include <linux/migrate.h>
47
48 #define DEBUG_SUBSYSTEM S_LLITE
49
50 #include "llite_internal.h"
51 #include <lustre_compat.h>
52
53 #ifdef HAVE_INVALIDATE_FOLIO
54 /**
55  * Implements Linux VM address_space::invalidate_folio() method. This method is
56  * called when the folio is truncated from a file, either as a result of
57  * explicit truncate, or when inode is removed from memory (as a result of
58  * final iput(), umount, or memory pressure induced icache shrinking).
59  *
60  * [0, off] bytes of the folio remain valid (this is for a case of non-page
61  * aligned truncate). Lustre leaves partially truncated folios in the cache,
62  * relying on struct inode::i_size to limit further accesses.
63  */
64 static void ll_invalidate_folio(struct folio *folio, size_t offset, size_t len)
65 {
66         struct inode *inode;
67         struct lu_env *env;
68         struct cl_page *page;
69         struct cl_object *obj;
70
71         LASSERT(!folio_test_writeback(folio));
72         LASSERT(folio_test_locked(folio));
73
74         if (!(offset == 0 && len == folio_size(folio)) &&
75             !folio_test_large(folio))
76                 return;
77
78         /* Drop the pages from the folio */
79         env = cl_env_percpu_get();
80         LASSERT(!IS_ERR(env));
81
82         inode = folio_inode(folio);
83         obj = ll_i2info(inode)->lli_clob;
84         if (obj != NULL) {
85                 int n, npgs = folio_nr_pages(folio);
86
87                 for (n = 0; n < npgs; n++) {
88                         struct page *vmpage = folio_page(folio, n);
89
90                         LASSERT(PageLocked(vmpage));
91                         LASSERT(!PageWriteback(vmpage));
92
93                         page = cl_vmpage_page(vmpage, obj);
94                         if (page != NULL) {
95                                 cl_page_delete(env, page);
96                                 cl_page_put(env, page);
97                         }
98                 }
99         } else {
100                 LASSERT(!folio_get_private(folio));
101         }
102         cl_env_percpu_put(env);
103 }
104 #else
105
106 /**
107  * Implements Linux VM address_space::invalidatepage() method. This method is
108  * called when the page is truncate from a file, either as a result of
109  * explicit truncate, or when inode is removed from memory (as a result of
110  * final iput(), umount, or memory pressure induced icache shrinking).
111  *
112  * [0, offset] bytes of the page remain valid (this is for a case of not-page
113  * aligned truncate). Lustre leaves partially truncated page in the cache,
114  * relying on struct inode::i_size to limit further accesses.
115  */
116 static void ll_invalidatepage(struct page *vmpage,
117 #ifdef HAVE_INVALIDATE_RANGE
118                                 unsigned int offset, unsigned int length
119 #else
120                                 unsigned long offset
121 #endif
122                              )
123 {
124         struct inode     *inode;
125         struct lu_env    *env;
126         struct cl_page   *page;
127         struct cl_object *obj;
128
129         LASSERT(PageLocked(vmpage));
130         LASSERT(!PageWriteback(vmpage));
131
132         /*
133          * It is safe to not check anything in invalidatepage/releasepage
134          * below because they are run with page locked and all our io is
135          * happening with locked page too
136          */
137 #ifdef HAVE_INVALIDATE_RANGE
138         if (offset == 0 && length == PAGE_SIZE) {
139 #else
140         if (offset == 0) {
141 #endif
142                 /* See the comment in ll_releasepage() */
143                 env = cl_env_percpu_get();
144                 LASSERT(!IS_ERR(env));
145
146                 inode = vmpage->mapping->host;
147                 obj = ll_i2info(inode)->lli_clob;
148                 if (obj != NULL) {
149                         page = cl_vmpage_page(vmpage, obj);
150                         if (page != NULL) {
151                                 cl_page_delete(env, page);
152                                 cl_page_put(env, page);
153                         }
154                 } else
155                         LASSERT(vmpage->private == 0);
156
157                 cl_env_percpu_put(env);
158         }
159
160         if (OBD_FAIL_PRECHECK(OBD_FAIL_LLITE_PAGE_INVALIDATE_PAUSE)) {
161                 unlock_page(vmpage);
162                 OBD_FAIL_TIMEOUT(OBD_FAIL_LLITE_PAGE_INVALIDATE_PAUSE,
163                                  cfs_fail_val);
164                 lock_page(vmpage);
165         }
166 }
167 #endif
168
169 static bool do_release_page(struct page *vmpage, gfp_t wait)
170 {
171         struct lu_env *env;
172         struct cl_object *obj;
173         struct cl_page *clpage;
174         struct address_space *mapping;
175         int result = 0;
176
177         LASSERT(PageLocked(vmpage));
178         if (PageWriteback(vmpage) || PageDirty(vmpage))
179                 return 0;
180
181         mapping = vmpage->mapping;
182         if (mapping == NULL)
183                 return 1;
184
185         obj = ll_i2info(mapping->host)->lli_clob;
186         if (obj == NULL)
187                 return 1;
188
189         clpage = cl_vmpage_page(vmpage, obj);
190         if (clpage == NULL)
191                 return 1;
192
193         env = cl_env_percpu_get();
194         LASSERT(!IS_ERR(env));
195
196         /* we must not delete the cl_page if the vmpage is in use, otherwise we
197          * disconnect the vmpage from Lustre while it's still alive(!), which
198          * means we won't find it to discard on lock cancellation.
199          *
200          * References here are: caller + cl_page + page cache.
201          * Any other references are potentially transient and must be ignored.
202          */
203         if (!cl_page_in_use(clpage) && !vmpage_in_use(vmpage, 1)) {
204                 result = 1;
205                 cl_page_delete(env, clpage);
206         }
207
208         /* To use percpu env array, the call path can not be rescheduled;
209          * otherwise percpu array will be messed if ll_releaspage() called
210          * again on the same CPU.
211          *
212          * If this page holds the last refc of cl_object, the following
213          * call path may cause reschedule:
214          *   cl_page_put -> cl_page_free -> cl_object_put ->
215          *     lu_object_put -> lu_object_free -> lov_delete_raid0.
216          *
217          * However, the kernel can't get rid of this inode until all pages have
218          * been cleaned up. Now that we hold page lock here, it's pretty safe
219          * that we won't get into object delete path.
220          */
221         LASSERT(cl_object_refc(obj) > 1);
222         cl_page_put(env, clpage);
223
224         cl_env_percpu_put(env);
225         return result;
226 }
227
228 #ifdef HAVE_AOPS_RELEASE_FOLIO
229 static bool ll_release_folio(struct folio *folio, gfp_t wait)
230 {
231         struct page *vmpage = folio_page(folio, 0);
232
233         /* folio_nr_pages(folio) == 1 is fixed with grab_cache_page* */
234         BUG_ON(folio_nr_pages(folio) != 1);
235
236         return do_release_page(vmpage, wait);
237 }
238 #else /* !HAVE_AOPS_RELEASE_FOLIO */
239 #ifdef HAVE_RELEASEPAGE_WITH_INT
240 #define RELEASEPAGE_ARG_TYPE int
241 #else
242 #define RELEASEPAGE_ARG_TYPE gfp_t
243 #endif
244 static int ll_releasepage(struct page *vmpage, RELEASEPAGE_ARG_TYPE gfp_mask)
245 {
246         return do_release_page(vmpage, gfp_mask);
247 }
248 #endif /* HAVE_AOPS_RELEASE_FOLIO */
249
250 static ssize_t ll_get_user_pages(int rw, struct iov_iter *iter,
251                                 struct page ***pages, ssize_t *npages,
252                                 size_t maxsize)
253 {
254 #if defined(HAVE_DIO_ITER)
255         size_t start;
256         size_t result;
257
258         result = iov_iter_get_pages_alloc2(iter, pages, maxsize, &start);
259         if (result > 0)
260                 *npages = DIV_ROUND_UP(result + start, PAGE_SIZE);
261
262         return result;
263 #else
264         unsigned long addr;
265         size_t page_count;
266         size_t size;
267         long result;
268
269         if (!maxsize)
270                 return 0;
271
272         if (!iter->nr_segs)
273                 return 0;
274
275         addr = (unsigned long)iter->iov->iov_base + iter->iov_offset;
276         if (addr & ~PAGE_MASK)
277                 return -EINVAL;
278
279         size = min_t(size_t, maxsize, iter->iov->iov_len);
280         page_count = (size + PAGE_SIZE - 1) >> PAGE_SHIFT;
281         OBD_ALLOC_PTR_ARRAY_LARGE(*pages, page_count);
282         if (*pages == NULL)
283                 return -ENOMEM;
284
285         mmap_read_lock(current->mm);
286         result = get_user_pages(current, current->mm, addr, page_count,
287                                 rw == READ, 0, *pages, NULL);
288         mmap_read_unlock(current->mm);
289
290         if (unlikely(result != page_count)) {
291                 ll_release_user_pages(*pages, page_count);
292                 *pages = NULL;
293
294                 if (result >= 0)
295                         return -EFAULT;
296
297                 return result;
298         }
299         *npages = page_count;
300
301         return size;
302 #endif
303 }
304
305 /* iov_iter_alignment() is introduced in 3.16 similar to HAVE_DIO_ITER */
306 #if defined(HAVE_DIO_ITER)
307 static unsigned long iov_iter_alignment_vfs(const struct iov_iter *i)
308 {
309         return iov_iter_alignment(i);
310 }
311 #else /* copied from alignment_iovec() */
312 static unsigned long iov_iter_alignment_vfs(const struct iov_iter *i)
313 {
314         const struct iovec *iov = i->iov;
315         unsigned long res;
316         size_t size = i->count;
317         size_t n;
318
319         if (!size)
320                 return 0;
321
322         res = (unsigned long)iov->iov_base + i->iov_offset;
323         n = iov->iov_len - i->iov_offset;
324         if (n >= size)
325                 return res | size;
326
327         size -= n;
328         res |= n;
329         while (size > (++iov)->iov_len) {
330                 res |= (unsigned long)iov->iov_base | iov->iov_len;
331                 size -= iov->iov_len;
332         }
333         res |= (unsigned long)iov->iov_base | size;
334
335         return res;
336 }
337 #endif
338
339 /*
340  * Lustre could relax a bit for alignment, io count is not
341  * necessary page alignment.
342  */
343 static unsigned long ll_iov_iter_alignment(struct iov_iter *i)
344 {
345         size_t orig_size = i->count;
346         size_t count = orig_size & ~PAGE_MASK;
347         unsigned long res;
348
349         if (!count)
350                 return iov_iter_alignment_vfs(i);
351
352         if (orig_size > PAGE_SIZE) {
353                 iov_iter_truncate(i, orig_size - count);
354                 res = iov_iter_alignment_vfs(i);
355                 iov_iter_reexpand(i, orig_size);
356
357                 return res;
358         }
359
360         res = iov_iter_alignment_vfs(i);
361         /* start address is page aligned */
362         if ((res & ~PAGE_MASK) == orig_size)
363                 return PAGE_SIZE;
364
365         return res;
366 }
367
368 static int
369 ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, size_t size,
370                    int rw, struct inode *inode, struct cl_sub_dio *sdio)
371 {
372         struct ll_dio_pages *pv = &sdio->csd_dio_pages;
373         struct cl_page    *page;
374         struct cl_2queue  *queue = &io->ci_queue;
375         struct cl_object  *obj = io->ci_obj;
376         struct cl_sync_io *anchor = &sdio->csd_sync;
377         loff_t offset   = pv->ldp_file_offset;
378         int io_pages    = 0;
379         int i;
380         ssize_t rc = 0;
381
382         ENTRY;
383
384         cl_2queue_init(queue);
385         for (i = 0; i < pv->ldp_count; i++) {
386                 LASSERT(!(offset & (PAGE_SIZE - 1)));
387                 page = cl_page_find(env, obj, cl_index(obj, offset),
388                                     pv->ldp_pages[i], CPT_TRANSIENT);
389                 if (IS_ERR(page)) {
390                         rc = PTR_ERR(page);
391                         break;
392                 }
393                 LASSERT(page->cp_type == CPT_TRANSIENT);
394                 rc = cl_page_own(env, io, page);
395                 if (rc) {
396                         cl_page_put(env, page);
397                         break;
398                 }
399
400                 page->cp_sync_io = anchor;
401                 if (inode && IS_ENCRYPTED(inode)) {
402                         /* In case of Direct IO on encrypted file, we need to
403                          * add a reference to the inode on the cl_page.
404                          * This info is required by llcrypt to proceed
405                          * to encryption/decryption.
406                          * This is safe because we know these pages are private
407                          * to the thread doing the Direct IO.
408                          */
409                         page->cp_inode = inode;
410                 }
411                 /* We keep the refcount from cl_page_find, so we don't need
412                  * another one here
413                  */
414                 cl_page_list_add(&queue->c2_qin, page, false);
415                 /*
416                  * Set page clip to tell transfer formation engine
417                  * that page has to be sent even if it is beyond KMS.
418                  */
419                 if (size < PAGE_SIZE)
420                         cl_page_clip(env, page, 0, size);
421                 ++io_pages;
422
423                 offset += PAGE_SIZE;
424                 size -= PAGE_SIZE;
425         }
426         if (rc == 0 && io_pages > 0) {
427                 int iot = rw == READ ? CRT_READ : CRT_WRITE;
428
429                 atomic_add(io_pages, &anchor->csi_sync_nr);
430                 /*
431                  * Avoid out-of-order execution of adding inflight
432                  * modifications count and io submit.
433                  */
434                 smp_mb();
435                 rc = cl_io_submit_rw(env, io, iot, queue);
436                 if (rc == 0) {
437                         cl_page_list_splice(&queue->c2_qout, &sdio->csd_pages);
438                 } else {
439                         atomic_add(-queue->c2_qin.pl_nr,
440                                    &anchor->csi_sync_nr);
441                         cl_page_list_for_each(page, &queue->c2_qin)
442                                 page->cp_sync_io = NULL;
443                 }
444                 /* handle partially submitted reqs */
445                 if (queue->c2_qin.pl_nr > 0) {
446                         CERROR(DFID " failed to submit %d dio pages: %zd\n",
447                                PFID(lu_object_fid(&obj->co_lu)),
448                                queue->c2_qin.pl_nr, rc);
449                         if (rc == 0)
450                                 rc = -EIO;
451                 }
452         }
453
454         cl_2queue_discard(env, io, queue);
455         cl_2queue_disown(env, queue);
456         cl_2queue_fini(env, queue);
457         RETURN(rc);
458 }
459
460 #ifdef KMALLOC_MAX_SIZE
461 #define MAX_MALLOC KMALLOC_MAX_SIZE
462 #else
463 #define MAX_MALLOC (128 * 1024)
464 #endif
465
466 /* This is the maximum size of a single O_DIRECT request, based on the
467  * kmalloc limit.  We need to fit all of the brw_page structs, each one
468  * representing PAGE_SIZE worth of user data, into a single buffer, and
469  * then truncate this to be a full-sized RPC.  For 4kB PAGE_SIZE this is
470  * up to 22MB for 128kB kmalloc and up to 682MB for 4MB kmalloc. */
471 #define MAX_DIO_SIZE ((MAX_MALLOC / sizeof(struct brw_page) * PAGE_SIZE) & \
472                       ~((size_t)DT_MAX_BRW_SIZE - 1))
473
474 static ssize_t
475 ll_direct_IO_impl(struct kiocb *iocb, struct iov_iter *iter, int rw)
476 {
477         struct ll_cl_context *lcc;
478         const struct lu_env *env;
479         struct cl_io *io;
480         struct file *file = iocb->ki_filp;
481         struct inode *inode = file->f_mapping->host;
482         struct cl_dio_aio *ll_dio_aio;
483         struct cl_sub_dio *ldp_aio;
484         size_t count = iov_iter_count(iter);
485         ssize_t tot_bytes = 0, result = 0;
486         loff_t file_offset = iocb->ki_pos;
487         bool sync_submit = false;
488         struct vvp_io *vio;
489         ssize_t rc2;
490
491         /* Check EOF by ourselves */
492         if (rw == READ && file_offset >= i_size_read(inode))
493                 return 0;
494
495         /* FIXME: io smaller than PAGE_SIZE is broken on ia64 ??? */
496         if (file_offset & ~PAGE_MASK)
497                 RETURN(-EINVAL);
498
499         CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p), size=%zd (max %lu), "
500                "offset=%lld=%llx, pages %zd (max %lu)\n",
501                PFID(ll_inode2fid(inode)), inode, count, MAX_DIO_SIZE,
502                file_offset, file_offset, count >> PAGE_SHIFT,
503                MAX_DIO_SIZE >> PAGE_SHIFT);
504
505         /* Check that all user buffers are aligned as well */
506         if (ll_iov_iter_alignment(iter) & ~PAGE_MASK)
507                 RETURN(-EINVAL);
508
509         lcc = ll_cl_find(inode);
510         if (lcc == NULL)
511                 RETURN(-EIO);
512
513         env = lcc->lcc_env;
514         LASSERT(!IS_ERR(env));
515         vio = vvp_env_io(env);
516         io = lcc->lcc_io;
517         LASSERT(io != NULL);
518
519         ll_dio_aio = io->ci_dio_aio;
520         LASSERT(ll_dio_aio);
521         LASSERT(ll_dio_aio->cda_iocb == iocb);
522
523         /* We cannot do parallel submission of sub-I/Os - for AIO or regular
524          * DIO - unless lockless because it causes us to release the lock
525          * early.
526          *
527          * There are also several circumstances in which we must disable
528          * parallel DIO, so we check if it is enabled.
529          *
530          * The check for "is_sync_kiocb" excludes AIO, which does not need to
531          * be disabled in these situations.
532          */
533         if (io->ci_dio_lock || (is_sync_kiocb(iocb) && !io->ci_parallel_dio))
534                 sync_submit = true;
535
536         while (iov_iter_count(iter)) {
537                 struct ll_dio_pages *pvec;
538                 struct page **pages;
539
540                 count = min_t(size_t, iov_iter_count(iter), MAX_DIO_SIZE);
541                 if (rw == READ) {
542                         if (file_offset >= i_size_read(inode))
543                                 break;
544
545                         if (file_offset + count > i_size_read(inode))
546                                 count = i_size_read(inode) - file_offset;
547                 }
548
549                 /* if we are doing sync_submit, then we free this below,
550                  * otherwise it is freed on the final call to cl_sync_io_note
551                  * (either in this function or from a ptlrpcd daemon)
552                  */
553                 ldp_aio = cl_sub_dio_alloc(ll_dio_aio, sync_submit);
554                 if (!ldp_aio)
555                         GOTO(out, result = -ENOMEM);
556
557                 pvec = &ldp_aio->csd_dio_pages;
558
559                 result = ll_get_user_pages(rw, iter, &pages,
560                                            &pvec->ldp_count, count);
561                 if (unlikely(result <= 0)) {
562                         cl_sync_io_note(env, &ldp_aio->csd_sync, result);
563                         if (sync_submit) {
564                                 LASSERT(ldp_aio->csd_creator_free);
565                                 cl_sub_dio_free(ldp_aio);
566                         }
567                         GOTO(out, result);
568                 }
569
570                 count = result;
571                 pvec->ldp_file_offset = file_offset;
572                 pvec->ldp_pages = pages;
573
574                 result = ll_direct_rw_pages(env, io, count,
575                                             rw, inode, ldp_aio);
576                 /* We've submitted pages and can now remove the extra
577                  * reference for that
578                  */
579                 cl_sync_io_note(env, &ldp_aio->csd_sync, result);
580
581                 if (sync_submit) {
582                         rc2 = cl_sync_io_wait(env, &ldp_aio->csd_sync,
583                                              0);
584                         if (result == 0 && rc2)
585                                 result = rc2;
586                         LASSERT(ldp_aio->csd_creator_free);
587                         cl_sub_dio_free(ldp_aio);
588                 }
589                 if (unlikely(result < 0))
590                         GOTO(out, result);
591
592                 iov_iter_advance(iter, count);
593                 tot_bytes += count;
594                 file_offset += count;
595         }
596
597 out:
598         ll_dio_aio->cda_bytes += tot_bytes;
599
600         if (rw == WRITE)
601                 vio->u.readwrite.vui_written += tot_bytes;
602         else
603                 vio->u.readwrite.vui_read += tot_bytes;
604
605         /* AIO is not supported on pipes, so we cannot return EIOCBQEUED like
606          * we normally would for both DIO and AIO here
607          */
608         if (result == 0 && !iov_iter_is_pipe(iter))
609                 result = -EIOCBQUEUED;
610
611         return result;
612 }
613
614 #if defined(HAVE_DIO_ITER)
615 static ssize_t ll_direct_IO(
616 #ifndef HAVE_IOV_ITER_RW
617              int rw,
618 #endif
619              struct kiocb *iocb, struct iov_iter *iter
620 #ifndef HAVE_DIRECTIO_2ARGS
621              , loff_t file_offset
622 #endif
623              )
624 {
625         int nrw;
626
627 #ifndef HAVE_IOV_ITER_RW
628         nrw = rw;
629 #else
630         nrw = iov_iter_rw(iter);
631 #endif
632
633         return ll_direct_IO_impl(iocb, iter, nrw);
634 }
635
636 #else /* !defined(HAVE_DIO_ITER) */
637
638 static ssize_t
639 ll_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov,
640              loff_t file_offset, unsigned long nr_segs)
641 {
642         struct iov_iter iter;
643
644         iov_iter_init(&iter, iov, nr_segs, iov_length(iov, nr_segs), 0);
645         return ll_direct_IO_impl(iocb, &iter, rw);
646 }
647
648 #endif /* !defined(HAVE_DIO_ITER) */
649
650 /**
651  * Prepare partially written-to page for a write.
652  * @pg is owned when passed in and disowned when it returns non-zero result to
653  * the caller.
654  */
655 static int ll_prepare_partial_page(const struct lu_env *env, struct cl_io *io,
656                                    struct cl_page *pg, struct file *file)
657 {
658         struct cl_attr *attr   = vvp_env_thread_attr(env);
659         struct cl_object *obj  = io->ci_obj;
660         loff_t          offset = cl_offset(obj, cl_page_index(pg));
661         int             result;
662         ENTRY;
663
664         cl_object_attr_lock(obj);
665         result = cl_object_attr_get(env, obj, attr);
666         cl_object_attr_unlock(obj);
667         if (result) {
668                 cl_page_disown(env, io, pg);
669                 GOTO(out, result);
670         }
671
672         /*
673          * If are writing to a new page, no need to read old data.
674          * The extent locking will have updated the KMS, and for our
675          * purposes here we can treat it like i_size.
676          */
677         if (attr->cat_kms <= offset) {
678                 char *kaddr = kmap_atomic(pg->cp_vmpage);
679
680                 memset(kaddr, 0, PAGE_SIZE);
681                 kunmap_atomic(kaddr);
682                 GOTO(out, result = 0);
683         }
684
685         if (pg->cp_defer_uptodate) {
686                 pg->cp_ra_used = 1;
687                 GOTO(out, result = 0);
688         }
689
690         result = ll_io_read_page(env, io, pg, file);
691         if (result)
692                 GOTO(out, result);
693
694         /* ll_io_read_page() disowns the page */
695         result = cl_page_own(env, io, pg);
696         if (!result) {
697                 if (!PageUptodate(cl_page_vmpage(pg))) {
698                         cl_page_disown(env, io, pg);
699                         result = -EIO;
700                 }
701         } else if (result == -ENOENT) {
702                 /* page was truncated */
703                 result = -EAGAIN;
704         }
705         EXIT;
706
707 out:
708         return result;
709 }
710
711 static int ll_tiny_write_begin(struct page *vmpage, struct address_space *mapping)
712 {
713         /* Page must be present, up to date, dirty, and not in writeback. */
714         if (!vmpage || !PageUptodate(vmpage) || !PageDirty(vmpage) ||
715             PageWriteback(vmpage) || vmpage->mapping != mapping)
716                 return -ENODATA;
717
718         return 0;
719 }
720
721 static int ll_write_begin(struct file *file, struct address_space *mapping,
722                           loff_t pos, unsigned int len,
723 #ifdef HAVE_GRAB_CACHE_PAGE_WRITE_BEGIN_WITH_FLAGS
724                           unsigned int flags,
725 #endif
726                           struct page **pagep, void **fsdata)
727 {
728         struct ll_cl_context *lcc = NULL;
729         const struct lu_env  *env = NULL;
730         struct cl_io   *io = NULL;
731         struct cl_page *page = NULL;
732         struct inode *inode = file_inode(file);
733         struct cl_object *clob = ll_i2info(mapping->host)->lli_clob;
734         pgoff_t index = pos >> PAGE_SHIFT;
735         struct page *vmpage = NULL;
736         unsigned from = pos & (PAGE_SIZE - 1);
737         unsigned to = from + len;
738         int result = 0;
739         ENTRY;
740
741         CDEBUG(D_VFSTRACE, "Writing %lu of %d to %d bytes\n", index, from, len);
742
743         lcc = ll_cl_find(inode);
744         if (lcc == NULL) {
745                 vmpage = grab_cache_page_nowait(mapping, index);
746                 result = ll_tiny_write_begin(vmpage, mapping);
747                 GOTO(out, result);
748         }
749
750         env = lcc->lcc_env;
751         io  = lcc->lcc_io;
752
753         if (file->f_flags & O_DIRECT) {
754                 /* direct IO failed because it couldn't clean up cached pages,
755                  * this causes a problem for mirror write because the cached
756                  * page may belong to another mirror, which will result in
757                  * problem submitting the I/O. */
758                 if (io->ci_designated_mirror > 0)
759                         GOTO(out, result = -EBUSY);
760
761                 /**
762                  * Direct write can fall back to buffered read, but DIO is done
763                  * with lockless i/o, and buffered requires LDLM locking, so
764                  * in this case we must restart without lockless.
765                  */
766                 if (!io->ci_dio_lock) {
767                         io->ci_dio_lock = 1;
768                         io->ci_need_restart = 1;
769                         GOTO(out, result = -ENOLCK);
770                 }
771         }
772 again:
773         /* To avoid deadlock, try to lock page first. */
774         vmpage = grab_cache_page_nowait(mapping, index);
775
776         if (unlikely(vmpage == NULL ||
777                      PageDirty(vmpage) || PageWriteback(vmpage))) {
778                 struct vvp_io *vio = vvp_env_io(env);
779                 struct cl_page_list *plist = &vio->u.readwrite.vui_queue;
780
781                 /* if the page is already in dirty cache, we have to commit
782                  * the pages right now; otherwise, it may cause deadlock
783                  * because it holds page lock of a dirty page and request for
784                  * more grants. It's okay for the dirty page to be the first
785                  * one in commit page list, though. */
786                 if (vmpage != NULL && plist->pl_nr > 0) {
787                         unlock_page(vmpage);
788                         put_page(vmpage);
789                         vmpage = NULL;
790                 }
791
792                 /* commit pages and then wait for page lock */
793                 result = vvp_io_write_commit(env, io);
794                 if (result < 0)
795                         GOTO(out, result);
796
797                 if (vmpage == NULL) {
798                         vmpage = grab_cache_page_write_begin(mapping, index
799 #ifdef HAVE_GRAB_CACHE_PAGE_WRITE_BEGIN_WITH_FLAGS
800                                                              , flags
801 #endif
802                                                              );
803                         if (vmpage == NULL)
804                                 GOTO(out, result = -ENOMEM);
805                 }
806         }
807
808         /* page was truncated */
809         if (mapping != vmpage->mapping) {
810                 CDEBUG(D_VFSTRACE, "page: %lu was truncated\n", index);
811                 unlock_page(vmpage);
812                 put_page(vmpage);
813                 vmpage = NULL;
814                 goto again;
815         }
816
817         page = cl_page_find(env, clob, vmpage->index, vmpage, CPT_CACHEABLE);
818         if (IS_ERR(page))
819                 GOTO(out, result = PTR_ERR(page));
820
821         lcc->lcc_page = page;
822         lu_ref_add(&page->cp_reference, "cl_io", io);
823
824         cl_page_assume(env, io, page);
825         if (!PageUptodate(vmpage)) {
826                 /*
827                  * We're completely overwriting an existing page,
828                  * so _don't_ set it up to date until commit_write
829                  */
830                 if (from == 0 && to == PAGE_SIZE) {
831                         CL_PAGE_HEADER(D_PAGE, env, page, "full page write\n");
832                         POISON_PAGE(vmpage, 0x11);
833                 } else {
834                         /* TODO: can be optimized at OSC layer to check if it
835                          * is a lockless IO. In that case, it's not necessary
836                          * to read the data. */
837                         result = ll_prepare_partial_page(env, io, page, file);
838                         if (result) {
839                                 /* vmpage should have been unlocked */
840                                 put_page(vmpage);
841                                 vmpage = NULL;
842
843                                 if (result == -EAGAIN)
844                                         goto again;
845                                 GOTO(out, result);
846                         }
847                 }
848         }
849         EXIT;
850 out:
851         if (result < 0) {
852                 if (vmpage != NULL) {
853                         unlock_page(vmpage);
854                         put_page(vmpage);
855                 }
856                 /* On tiny_write failure, page and io are always null. */
857                 if (!IS_ERR_OR_NULL(page)) {
858                         lu_ref_del(&page->cp_reference, "cl_io", io);
859                         cl_page_put(env, page);
860                 }
861                 if (io)
862                         io->ci_result = result;
863         } else {
864                 *pagep = vmpage;
865                 *fsdata = lcc;
866         }
867         RETURN(result);
868 }
869
870 static int ll_tiny_write_end(struct file *file, struct address_space *mapping,
871                              loff_t pos, unsigned int len, unsigned int copied,
872                              struct page *vmpage)
873 {
874         struct cl_page *clpage = (struct cl_page *) vmpage->private;
875         loff_t kms = pos+copied;
876         loff_t to = kms & (PAGE_SIZE-1) ? kms & (PAGE_SIZE-1) : PAGE_SIZE;
877         __u16 refcheck;
878         struct lu_env *env = cl_env_get(&refcheck);
879         int rc = 0;
880
881         ENTRY;
882
883         if (IS_ERR(env)) {
884                 rc = PTR_ERR(env);
885                 goto out;
886         }
887
888         /* This page is dirty in cache, so it should have a cl_page pointer
889          * set in vmpage->private.
890          */
891         LASSERT(clpage != NULL);
892
893         if (copied == 0)
894                 goto out_env;
895
896         /* Update the underlying size information in the OSC/LOV objects this
897          * page is part of.
898          */
899         cl_page_touch(env, clpage, to);
900
901 out_env:
902         cl_env_put(env, &refcheck);
903
904 out:
905         /* Must return page unlocked. */
906         unlock_page(vmpage);
907
908         RETURN(rc);
909 }
910
911 static int ll_write_end(struct file *file, struct address_space *mapping,
912                         loff_t pos, unsigned len, unsigned copied,
913                         struct page *vmpage, void *fsdata)
914 {
915         struct ll_cl_context *lcc = fsdata;
916         const struct lu_env *env;
917         struct cl_io *io;
918         struct vvp_io *vio;
919         struct cl_page *page;
920         unsigned from = pos & (PAGE_SIZE - 1);
921         bool unplug = false;
922         int result = 0;
923         ENTRY;
924
925         put_page(vmpage);
926
927         CDEBUG(D_VFSTRACE, "pos %llu, len %u, copied %u\n", pos, len, copied);
928
929         if (lcc == NULL) {
930                 result = ll_tiny_write_end(file, mapping, pos, len, copied,
931                                            vmpage);
932                 GOTO(out, result);
933         }
934
935         LASSERT(lcc != NULL);
936         env  = lcc->lcc_env;
937         page = lcc->lcc_page;
938         io   = lcc->lcc_io;
939         vio  = vvp_env_io(env);
940
941         LASSERT(cl_page_is_owned(page, io));
942         if (copied > 0) {
943                 struct cl_page_list *plist = &vio->u.readwrite.vui_queue;
944
945                 lcc->lcc_page = NULL; /* page will be queued */
946
947                 /* Add it into write queue */
948                 cl_page_list_add(plist, page, true);
949                 if (plist->pl_nr == 1) /* first page */
950                         vio->u.readwrite.vui_from = from;
951                 else
952                         LASSERT(from == 0);
953                 vio->u.readwrite.vui_to = from + copied;
954
955                 /* To address the deadlock in balance_dirty_pages() where
956                  * this dirty page may be written back in the same thread. */
957                 if (PageDirty(vmpage))
958                         unplug = true;
959
960                 /* We may have one full RPC, commit it soon */
961                 if (plist->pl_nr >= PTLRPC_MAX_BRW_PAGES)
962                         unplug = true;
963
964                 CL_PAGE_DEBUG(D_VFSTRACE, env, page,
965                               "queued page: %d.\n", plist->pl_nr);
966         } else {
967                 cl_page_disown(env, io, page);
968
969                 lcc->lcc_page = NULL;
970                 lu_ref_del(&page->cp_reference, "cl_io", io);
971                 cl_page_put(env, page);
972
973                 /* page list is not contiguous now, commit it now */
974                 unplug = true;
975         }
976         if (unplug || io->u.ci_wr.wr_sync)
977                 result = vvp_io_write_commit(env, io);
978
979         if (result < 0)
980                 io->ci_result = result;
981
982
983 out:
984         RETURN(result >= 0 ? copied : result);
985 }
986
987 #ifdef CONFIG_MIGRATION
988 static int ll_migrate_folio(struct address_space *mapping,
989                             struct folio_migr *newpage, struct folio_migr *page,
990                             enum migrate_mode mode)
991 {
992         /* Always fail page migration until we have a proper implementation */
993         return -EIO;
994 }
995 #endif
996
997 const struct address_space_operations ll_aops = {
998 #ifdef HAVE_DIRTY_FOLIO
999         .dirty_folio            = filemap_dirty_folio,
1000 #else
1001         .set_page_dirty         = __set_page_dirty_nobuffers,
1002 #endif
1003 #ifdef HAVE_INVALIDATE_FOLIO
1004         .invalidate_folio       = ll_invalidate_folio,
1005 #else
1006         .invalidatepage         = ll_invalidatepage,
1007 #endif
1008 #ifdef HAVE_AOPS_READ_FOLIO
1009         .read_folio             = ll_read_folio,
1010 #else
1011         .readpage               = ll_readpage,
1012 #endif
1013 #ifdef HAVE_AOPS_RELEASE_FOLIO
1014         .release_folio          = ll_release_folio,
1015 #else
1016         .releasepage            = (void *)ll_releasepage,
1017 #endif
1018         .direct_IO              = ll_direct_IO,
1019         .writepage              = ll_writepage,
1020         .writepages             = ll_writepages,
1021         .write_begin            = ll_write_begin,
1022         .write_end              = ll_write_end,
1023 #ifdef CONFIG_MIGRATION
1024         .migrate_folio          = ll_migrate_folio,
1025 #endif
1026 };