1 // SPDX-License-Identifier: GPL-2.0
4 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
5 * Use is subject to license terms.
7 * Copyright (c) 2011, 2017, Intel Corporation.
12 * This file is part of Lustre, http://www.lustre.org/
16 * Author: Nikita Danilov <nikita.danilov@sun.com>
17 * Author: Jinshan Xiong <jinshan.xiong@intel.com>
21 #define DEBUG_SUBSYSTEM S_CLASS
23 #include <linux/sched.h>
24 #include <linux/list.h>
25 #include <linux/list_sort.h>
26 #include <linux/mmu_context.h>
27 #include <obd_class.h>
28 #include <obd_support.h>
29 #include <lustre_fid.h>
30 #include <cl_object.h>
31 #include "cl_internal.h"
37 static inline int cl_io_type_is_valid(enum cl_io_type type)
39 return CIT_READ <= type && type < CIT_OP_NR;
42 static inline int cl_io_is_loopable(const struct cl_io *io)
44 return cl_io_type_is_valid(io->ci_type) && io->ci_type != CIT_MISC;
48 * cl_io invariant that holds at all times when exported cl_io_*() functions
49 * are entered and left.
51 static inline int cl_io_invariant(const struct cl_io *io)
54 * io can own pages only when it is ongoing. Sub-io might
55 * still be in CIS_LOCKED state when top-io is in
58 return ergo(io->ci_owned_nr > 0, io->ci_state == CIS_IO_GOING ||
59 (io->ci_state == CIS_LOCKED && io->ci_parent != NULL));
63 * Finalize \a io, by calling cl_io_operations::cio_fini() bottom-to-top.
/*
 * Finalize \a io. Slices are detached one at a time from the tail of
 * io->ci_layers and, when the layer provides one for this io type, its
 * cio_fini() method is called on the detached slice. The io is then marked
 * CIS_FINI and the restart/layout flags are sanity-checked with LASSERT().
 */
65 void cl_io_fini(const struct lu_env *env, struct cl_io *io)
67 struct cl_io_slice *slice;
69 LINVRNT(cl_io_type_is_valid(io->ci_type));
70 LINVRNT(cl_io_invariant(io));
73 while (!list_empty(&io->ci_layers)) {
74 slice = container_of(io->ci_layers.prev, struct cl_io_slice,
76 list_del_init(&slice->cis_linkage);
77 if (slice->cis_iop->op[io->ci_type].cio_fini != NULL)
78 slice->cis_iop->op[io->ci_type].cio_fini(env, slice);
80 * Invalidate slice to catch use after free. This assumes that
81 * slices are allocated within session and can be touched
82 * after ->cio_fini() returns.
86 io->ci_state = CIS_FINI;
88 /* sanity check for layout change */
92 case CIT_DATA_VERSION:
96 LASSERT(!io->ci_need_restart);
100 /* Check ignore layout change conf */
101 LASSERT(ergo(io->ci_ignore_layout || !io->ci_verify_layout,
102 !io->ci_need_restart));
113 EXPORT_SYMBOL(cl_io_fini);
/*
 * Common body of cl_io_init() and cl_io_sub_init(). The io must be in
 * CIS_ZERO or CIS_FINI state. Resets both lockset lists and the layer list
 * of \a io, then walks the object stack of \a obj calling coo_io_init() on
 * every layer that implements it. The io is moved to CIS_INIT (presumably
 * only when every layer succeeded -- TODO confirm).
 */
115 static int __cl_io_init(const struct lu_env *env, struct cl_io *io,
116 enum cl_io_type iot, struct cl_object *obj)
118 struct cl_object *scan;
121 LINVRNT(io->ci_state == CIS_ZERO || io->ci_state == CIS_FINI);
122 LINVRNT(cl_io_type_is_valid(iot));
123 LINVRNT(cl_io_invariant(io));
127 INIT_LIST_HEAD(&io->ci_lockset.cls_todo);
128 INIT_LIST_HEAD(&io->ci_lockset.cls_done);
129 INIT_LIST_HEAD(&io->ci_layers);
132 cl_object_for_each(scan, obj) {
133 if (scan->co_ops->coo_io_init != NULL) {
134 result = scan->co_ops->coo_io_init(env, scan, io);
140 io->ci_state = CIS_INIT;
145 * Initialize sub-io, by calling cl_io_operations::cio_init() top-to-bottom.
147 * \pre obj != cl_object_top(obj)
149 int cl_io_sub_init(const struct lu_env *env, struct cl_io *io,
150 enum cl_io_type iot, struct cl_object *obj)
152 LASSERT(obj != cl_object_top(obj));
154 return __cl_io_init(env, io, iot, obj);
156 EXPORT_SYMBOL(cl_io_sub_init);
159 * Initialize \a io, by calling cl_io_operations::cio_init() top-to-bottom.
161 * Caller has to call cl_io_fini() after a call to cl_io_init(), no matter
162 * what the latter returned.
164 * \pre obj == cl_object_top(obj)
165 * \pre cl_io_type_is_valid(iot)
166 * \post cl_io_type_is_valid(io->ci_type) && io->ci_type == iot
168 int cl_io_init(const struct lu_env *env, struct cl_io *io,
169 enum cl_io_type iot, struct cl_object *obj)
171 LASSERT(obj == cl_object_top(obj));
173 /* clear I/O restart from previous instance */
174 io->ci_need_restart = 0;
176 return __cl_io_init(env, io, iot, obj);
178 EXPORT_SYMBOL(cl_io_init);
181 * Initialize read or write io.
183 * \pre iot == CIT_READ || iot == CIT_WRITE
/*
 * Set up a read or write io on io->ci_obj, which must already be set.
 * Logs the requested range, records \a pos and \a bytes in io->u.ci_rw and
 * hands over to cl_io_init(), whose result is returned.
 * Only CIT_READ and CIT_WRITE are accepted for \a iot.
 */
185 int cl_io_rw_init(const struct lu_env *env, struct cl_io *io,
186 enum cl_io_type iot, loff_t pos, size_t bytes)
188 LINVRNT(iot == CIT_READ || iot == CIT_WRITE);
189 LINVRNT(io->ci_obj != NULL);
192 LU_OBJECT_HEADER(D_VFSTRACE, env, &io->ci_obj->co_lu,
193 "io range: %u [%llu, %llu) %u %u\n",
194 iot, (__u64)pos, (__u64)pos + bytes,
195 io->u.ci_rw.crw_nonblock, io->u.ci_wr.wr_append);
196 io->u.ci_rw.crw_pos = pos;
197 io->u.ci_rw.crw_bytes = bytes;
198 RETURN(cl_io_init(env, io, iot, io->ci_obj));
200 EXPORT_SYMBOL(cl_io_rw_init);
/*
 * Comparator passed to list_sort() by cl_io_lock(). Orders two
 * cl_io_lock_link entries by the FID of the object their lock descriptors
 * refer to, using lu_fid_cmp(). The two prototypes differ only in the
 * const-ness of the list_head arguments and are selected by
 * HAVE_LIST_CMP_FUNC_T. \a priv is not used by the comparison.
 */
202 #ifdef HAVE_LIST_CMP_FUNC_T
203 static int cl_lock_descr_cmp(void *priv,
204 const struct list_head *a,
205 const struct list_head *b)
206 #else /* !HAVE_LIST_CMP_FUNC_T */
207 static int cl_lock_descr_cmp(void *priv,
208 struct list_head *a, struct list_head *b)
209 #endif /* HAVE_LIST_CMP_FUNC_T */
211 const struct cl_io_lock_link *l0 = list_entry(a, struct cl_io_lock_link,
213 const struct cl_io_lock_link *l1 = list_entry(b, struct cl_io_lock_link,
215 const struct cl_lock_descr *d0 = &l0->cill_descr;
216 const struct cl_lock_descr *d1 = &l1->cill_descr;
218 return lu_fid_cmp(lu_object_fid(&d0->cld_obj->co_lu),
219 lu_object_fid(&d1->cld_obj->co_lu));
222 static void cl_lock_descr_merge(struct cl_lock_descr *d0,
223 const struct cl_lock_descr *d1)
225 d0->cld_start = min(d0->cld_start, d1->cld_start);
226 d0->cld_end = max(d0->cld_end, d1->cld_end);
228 if (d1->cld_mode == CLM_WRITE && d0->cld_mode != CLM_WRITE)
229 d0->cld_mode = CLM_WRITE;
231 if (d1->cld_mode == CLM_GROUP && d0->cld_mode != CLM_GROUP)
232 d0->cld_mode = CLM_GROUP;
/*
 * Look on set->cls_todo for a lock already queued on the same object as
 * \a need. When one is found its descriptor is widened with
 * cl_lock_descr_merge() and the merged mode and extent are logged.
 * cl_io_lock_add() tests the result as a boolean: non-zero means \a need
 * was merged into an existing entry (exact values -- TODO confirm).
 */
235 static int cl_lockset_merge(const struct cl_lockset *set,
236 const struct cl_lock_descr *need)
238 struct cl_io_lock_link *scan;
241 list_for_each_entry(scan, &set->cls_todo, cill_linkage) {
242 if (!cl_object_same(scan->cill_descr.cld_obj, need->cld_obj))
245 /* Merge locks for the same object because ldlm lock server
246 * may expand the lock extent, otherwise there is a deadlock
247 * case if two conflicted locks are queued for the same object
248 * and lock server expands one lock to overlap the other.
249 * The side effect is that it can generate a multi-stripe lock
250 * that may cause cascading problem */
251 cl_lock_descr_merge(&scan->cill_descr, need);
252 CDEBUG(D_VFSTRACE, "lock: %d: [%lu, %lu]\n",
253 scan->cill_descr.cld_mode, scan->cill_descr.cld_start,
254 scan->cill_descr.cld_end);
/*
 * Walk set->cls_todo, requesting each lock with cl_lock_request() and moving
 * its link to set->cls_done (presumably only after a successful request --
 * TODO confirm). The _safe iterator is used because links are moved off the
 * list while it is being walked.
 */
260 static int cl_lockset_lock(const struct lu_env *env, struct cl_io *io,
261 struct cl_lockset *set)
263 struct cl_io_lock_link *link;
264 struct cl_io_lock_link *temp;
269 list_for_each_entry_safe(link, temp, &set->cls_todo, cill_linkage) {
270 result = cl_lock_request(env, io, &link->cill_lock);
274 list_move(&link->cill_linkage, &set->cls_done);
280 * Takes locks necessary for the current iteration of io.
282 * Calls cl_io_operations::cio_lock() top-to-bottom to collect locks required
283 * by layers for the current iteration. Then sort locks (to avoid dead-locks),
/*
 * Take the locks needed for the current iteration of \a io, which must be
 * in CIS_IT_STARTED state. Every layer that implements cio_lock() is asked
 * for its locks in list order, the todo list is sorted with
 * cl_lock_descr_cmp() and then enqueued through cl_lockset_lock().
 * cl_io_unlock() is also called from here (presumably on failure -- TODO
 * confirm); otherwise the io is left in CIS_LOCKED state.
 * NOTE(review): cl_lock_descr_cmp() compares object FIDs only; the start
 * offset mentioned in the sorting comment in this function is not part of
 * the comparison.
 */
286 int cl_io_lock(const struct lu_env *env, struct cl_io *io)
288 const struct cl_io_slice *scan;
291 LINVRNT(cl_io_is_loopable(io));
292 LINVRNT(io->ci_state == CIS_IT_STARTED);
293 LINVRNT(cl_io_invariant(io));
296 list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
297 if (scan->cis_iop->op[io->ci_type].cio_lock == NULL)
299 result = scan->cis_iop->op[io->ci_type].cio_lock(env, scan);
305 * Sort locks in lexicographical order of their (fid,
306 * start-offset) pairs to avoid deadlocks.
308 list_sort(NULL, &io->ci_lockset.cls_todo, cl_lock_descr_cmp);
309 result = cl_lockset_lock(env, io, &io->ci_lockset);
312 cl_io_unlock(env, io);
314 io->ci_state = CIS_LOCKED;
317 EXPORT_SYMBOL(cl_io_lock);
320 * Release locks taken by io.
/*
 * Drop everything held in io->ci_lockset. Links still on cls_todo were never
 * locked, so they are only unlinked and finalized through cill_fini (when
 * set). Links on cls_done are unlinked, released with cl_lock_release() and
 * then finalized the same way. Afterwards cio_unlock() is called on every
 * layer that provides it, in reverse list order, and the io moves to
 * CIS_UNLOCKED. Valid while CIS_IT_STARTED <= state < CIS_UNLOCKED.
 */
322 void cl_io_unlock(const struct lu_env *env, struct cl_io *io)
324 struct cl_lockset *set;
325 struct cl_io_lock_link *link;
326 struct cl_io_lock_link *temp;
327 const struct cl_io_slice *scan;
329 LASSERT(cl_io_is_loopable(io));
330 LASSERT(CIS_IT_STARTED <= io->ci_state && io->ci_state < CIS_UNLOCKED);
331 LINVRNT(cl_io_invariant(io));
334 set = &io->ci_lockset;
336 list_for_each_entry_safe(link, temp, &set->cls_todo, cill_linkage) {
337 list_del_init(&link->cill_linkage);
338 if (link->cill_fini != NULL)
339 link->cill_fini(env, link);
342 list_for_each_entry_safe(link, temp, &set->cls_done, cill_linkage) {
343 list_del_init(&link->cill_linkage);
344 cl_lock_release(env, &link->cill_lock);
345 if (link->cill_fini != NULL)
346 link->cill_fini(env, link);
349 list_for_each_entry_reverse(scan, &io->ci_layers, cis_linkage) {
350 if (scan->cis_iop->op[io->ci_type].cio_unlock != NULL)
351 scan->cis_iop->op[io->ci_type].cio_unlock(env, scan);
353 io->ci_state = CIS_UNLOCKED;
356 EXPORT_SYMBOL(cl_io_unlock);
359 * Prepares next iteration of io.
361 * Calls cl_io_operations::cio_iter_init() top-to-bottom. This exists to give
362 * layers a chance to modify io parameters, e.g., so that lov can restrict io
363 * to a single stripe.
/*
 * Begin a new iteration of \a io. Allowed only from CIS_INIT or
 * CIS_IT_ENDED. Calls cio_iter_init() on each layer that provides it, in
 * list order, and moves the io to CIS_IT_STARTED (handling of a failing
 * layer -- TODO confirm).
 */
365 int cl_io_iter_init(const struct lu_env *env, struct cl_io *io)
367 const struct cl_io_slice *scan;
370 LINVRNT(cl_io_is_loopable(io));
371 LINVRNT(io->ci_state == CIS_INIT || io->ci_state == CIS_IT_ENDED);
372 LINVRNT(cl_io_invariant(io));
376 list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
377 if (scan->cis_iop->op[io->ci_type].cio_iter_init == NULL)
379 result = scan->cis_iop->op[io->ci_type].cio_iter_init(env,
385 io->ci_state = CIS_IT_STARTED;
388 EXPORT_SYMBOL(cl_io_iter_init);
391 * Finalizes io iteration.
393 * Calls cl_io_operations::cio_iter_fini() bottom-to-top.
/*
 * End the current iteration of \a io: cio_iter_fini() is called on each
 * layer that provides it, in reverse list order, and the io moves to
 * CIS_IT_ENDED. The state on entry must be at most CIS_IT_STARTED or beyond
 * CIS_IO_FINISHED.
 */
395 void cl_io_iter_fini(const struct lu_env *env, struct cl_io *io)
397 const struct cl_io_slice *scan;
399 LINVRNT(cl_io_is_loopable(io));
400 LINVRNT(io->ci_state <= CIS_IT_STARTED ||
401 io->ci_state > CIS_IO_FINISHED);
402 LINVRNT(cl_io_invariant(io));
405 list_for_each_entry_reverse(scan, &io->ci_layers, cis_linkage) {
406 if (scan->cis_iop->op[io->ci_type].cio_iter_fini != NULL)
407 scan->cis_iop->op[io->ci_type].cio_iter_fini(env, scan);
409 io->ci_state = CIS_IT_ENDED;
412 EXPORT_SYMBOL(cl_io_iter_fini);
415 * Records that read or write io progressed \a bytes forward.
/*
 * Account for \a bytes of progress on a read/write io: the position in
 * io->u.ci_rw moves forward and the remaining byte count shrinks by the same
 * amount. Layers that implement cio_advance() are then notified in reverse
 * list order.
 */
417 void cl_io_rw_advance(const struct lu_env *env, struct cl_io *io, size_t bytes)
419 const struct cl_io_slice *scan;
423 LINVRNT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE ||
425 LINVRNT(cl_io_is_loopable(io));
426 LINVRNT(cl_io_invariant(io));
428 io->u.ci_rw.crw_pos += bytes;
429 io->u.ci_rw.crw_bytes -= bytes;
431 /* layers have to be notified. */
432 list_for_each_entry_reverse(scan, &io->ci_layers, cis_linkage) {
433 if (scan->cis_iop->op[io->ci_type].cio_advance != NULL)
434 scan->cis_iop->op[io->ci_type].cio_advance(env, scan,
441 * Adds a lock to a lockset.
/*
 * Queue \a link on the todo list of io->ci_lockset, unless
 * cl_lockset_merge() reports that its descriptor was folded into a lock
 * already queued for the same object. The caller cl_io_lock_alloc_add()
 * treats a non-zero result as "lock match" (exact values -- TODO confirm).
 */
443 int cl_io_lock_add(const struct lu_env *env, struct cl_io *io,
444 struct cl_io_lock_link *link)
449 if (cl_lockset_merge(&io->ci_lockset, &link->cill_descr))
452 list_add(&link->cill_linkage, &io->ci_lockset.cls_todo);
457 EXPORT_SYMBOL(cl_io_lock_add);
/*
 * cill_fini callback installed by cl_io_lock_alloc_add() on the links it
 * allocates. NOTE(review): presumably frees \a link -- confirm.
 */
459 static void cl_free_io_lock_link(const struct lu_env *env,
460 struct cl_io_lock_link *link)
466 * Allocates new lock link, and uses it to add a lock to a lockset.
/*
 * Convenience wrapper around cl_io_lock_add() for callers that only have a
 * lock descriptor: a fresh link receives a copy of \a descr and gets
 * cl_free_io_lock_link() as its cill_fini callback. When cl_io_lock_add()
 * returns non-zero (the lock matched one already queued) the link is not
 * kept and is finalized immediately.
 */
468 int cl_io_lock_alloc_add(const struct lu_env *env, struct cl_io *io,
469 struct cl_lock_descr *descr)
471 struct cl_io_lock_link *link;
477 link->cill_descr = *descr;
478 link->cill_fini = cl_free_io_lock_link;
479 result = cl_io_lock_add(env, io, link);
480 if (result) /* lock match */
481 link->cill_fini(env, link);
487 EXPORT_SYMBOL(cl_io_lock_alloc_add);
490 * Starts io by calling cl_io_operations::cio_start() top-to-bottom.
/*
 * Start the io proper. Requires CIS_LOCKED on entry; the state is switched
 * to CIS_IO_GOING before any layer runs, then cio_start() is called on each
 * layer that provides it, in list order.
 */
492 int cl_io_start(const struct lu_env *env, struct cl_io *io)
494 const struct cl_io_slice *scan;
497 LINVRNT(cl_io_is_loopable(io));
498 LINVRNT(io->ci_state == CIS_LOCKED);
499 LINVRNT(cl_io_invariant(io));
502 io->ci_state = CIS_IO_GOING;
503 list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
504 if (scan->cis_iop->op[io->ci_type].cio_start == NULL)
506 result = scan->cis_iop->op[io->ci_type].cio_start(env, scan);
514 EXPORT_SYMBOL(cl_io_start);
517 * Wait until current io iteration is finished by calling
518 * cl_io_operations::cio_end() bottom-to-top.
/*
 * Finish the current io iteration. Requires CIS_IO_GOING on entry; calls
 * cio_end() on each layer that provides it, in reverse list order, and then
 * moves the io to CIS_IO_FINISHED. Layer methods report no status here.
 */
520 void cl_io_end(const struct lu_env *env, struct cl_io *io)
522 const struct cl_io_slice *scan;
524 LINVRNT(cl_io_is_loopable(io));
525 LINVRNT(io->ci_state == CIS_IO_GOING);
526 LINVRNT(cl_io_invariant(io));
529 list_for_each_entry_reverse(scan, &io->ci_layers, cis_linkage) {
530 if (scan->cis_iop->op[io->ci_type].cio_end != NULL)
531 scan->cis_iop->op[io->ci_type].cio_end(env, scan);
532 /* TODO: error handling. */
534 io->ci_state = CIS_IO_FINISHED;
537 EXPORT_SYMBOL(cl_io_end);
540 * Called by read io, to decide the readahead extent
542 * \see cl_io_operations::cio_read_ahead()
/*
 * Ask the layers of \a io for the read-ahead extent starting at page index
 * \a start; layers that implement cio_read_ahead() are called in list order
 * and fill in \a ra. Allowed for read, fault and write io in CIS_IO_GOING or
 * CIS_LOCKED state. A positive layer result is reported to the caller as 0;
 * zero and negative results are returned as they are.
 */
544 int cl_io_read_ahead(const struct lu_env *env, struct cl_io *io,
545 pgoff_t start, struct cl_read_ahead *ra)
547 const struct cl_io_slice *scan;
550 LINVRNT(io->ci_type == CIT_READ ||
551 io->ci_type == CIT_FAULT ||
552 io->ci_type == CIT_WRITE);
553 LINVRNT(io->ci_state == CIS_IO_GOING || io->ci_state == CIS_LOCKED);
554 LINVRNT(cl_io_invariant(io));
557 list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
558 if (scan->cis_iop->cio_read_ahead == NULL)
561 result = scan->cis_iop->cio_read_ahead(env, scan, start, ra);
565 RETURN(result > 0 ? 0 : result);
567 EXPORT_SYMBOL(cl_io_read_ahead);
570 * Called before io start, to reserve enough LRU slots to avoid
573 * \see cl_io_operations::cio_lru_reserve()
/*
 * Forward an LRU reservation request for the range described by \a pos and
 * \a bytes to every layer that implements cio_lru_reserve(), in list order.
 * Only valid for read and write io.
 */
575 int cl_io_lru_reserve(const struct lu_env *env, struct cl_io *io,
576 loff_t pos, size_t bytes)
578 const struct cl_io_slice *scan;
581 LINVRNT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
582 LINVRNT(cl_io_invariant(io));
585 list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
586 if (scan->cis_iop->cio_lru_reserve) {
587 result = scan->cis_iop->cio_lru_reserve(env, scan,
596 EXPORT_SYMBOL(cl_io_lru_reserve);
599 * Commit a list of contiguous pages into writeback cache.
601 * \returns 0 if all pages committed, or errcode if error occurred.
602 * \see cl_io_operations::cio_commit_async()
/*
 * Pass the pages on \a queue to the cio_commit_async() method of each layer
 * that implements it, in list order. \a from and \a to are forwarded
 * unchanged (presumably byte offsets within the first and last page --
 * TODO confirm).
 */
604 int cl_io_commit_async(const struct lu_env *env, struct cl_io *io,
605 struct cl_page_list *queue, int from, int to,
608 const struct cl_io_slice *scan;
612 list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
613 if (scan->cis_iop->cio_commit_async == NULL)
615 result = scan->cis_iop->cio_commit_async(env, scan, queue,
622 EXPORT_SYMBOL(cl_io_commit_async);
/*
 * Call cio_extent_release() on every layer of \a io that implements it, in
 * list order. Layers without the method are skipped.
 */
624 void cl_io_extent_release(const struct lu_env *env, struct cl_io *io)
626 const struct cl_io_slice *scan;
629 list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
630 if (scan->cis_iop->cio_extent_release == NULL)
632 scan->cis_iop->cio_extent_release(env, scan);
636 EXPORT_SYMBOL(cl_io_extent_release);
639 * Submits a list of pages for immediate io.
641 * After the function returns, the submitted pages have been moved to the
642 * queue->c2_qout queue, and queue->c2_qin contains both the pages that did
643 * not need to be submitted and the pages that failed to be submitted.
645 * \returns 0 if at least one page was submitted, error code otherwise.
646 * \see cl_io_operations::cio_submit()
/*
 * Submit the pages on \a queue for transfer of type \a crt through the
 * cio_submit() method of each layer that implements it, in list order.
 * On return it is asserted that a non-zero result left queue->c2_qout
 * empty, i.e. a failed submission sent no pages.
 */
648 int cl_io_submit_rw(const struct lu_env *env, struct cl_io *io,
649 enum cl_req_type crt, struct cl_2queue *queue)
651 const struct cl_io_slice *scan;
655 list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
656 if (scan->cis_iop->cio_submit == NULL)
658 result = scan->cis_iop->cio_submit(env, io, scan, crt, queue);
663 * If ->cio_submit() failed, no pages were sent.
665 LASSERT(ergo(result != 0, list_empty(&queue->c2_qout.pl_pages)));
668 EXPORT_SYMBOL(cl_io_submit_rw);
671 * Submit a sync_io and wait for the IO to be finished, or error happens.
672 * If \a timeout is zero, it means to wait for the IO unconditionally.
674 * This is used for synchronous submission of an async IO, so the waiting is
675 * done here in this function and the IO is done when this function returns.
/*
 * Synchronous wrapper around cl_io_submit_rw(). Every page on
 * queue->c2_qin is attached to the per-environment sync anchor
 * (cl_env_info(env)->clt_anchor), which is initialized for pl_nr pages.
 * After submission, pages still on c2_qin are detached from the anchor and
 * counted as completed with cl_sync_io_note(), then cl_sync_io_wait() waits
 * for the rest and the pages on c2_qout are re-assumed by \a io. Pages must
 * not be CPT_TRANSIENT and must not already have a sync anchor.
 */
677 int cl_io_submit_sync(const struct lu_env *env, struct cl_io *io,
678 enum cl_req_type iot, struct cl_2queue *queue,
681 struct cl_sync_io *anchor = &cl_env_info(env)->clt_anchor;
686 cl_page_list_for_each(pg, &queue->c2_qin) {
687 LASSERT(pg->cp_sync_io == NULL);
688 /* this is for sync submission of async IO, IO that was always
689 * sync (like DIO) is handled differently
691 LASSERT(pg->cp_type != CPT_TRANSIENT);
692 pg->cp_sync_io = anchor;
695 cl_sync_io_init(anchor, queue->c2_qin.pl_nr);
696 rc = cl_io_submit_rw(env, io, iot, queue);
699 * If some pages weren't sent for any reason (e.g.,
700 * read found up-to-date pages in the cache, or write found
701 * clean pages), count them as completed to avoid infinite
704 cl_page_list_for_each(pg, &queue->c2_qin) {
705 pg->cp_sync_io = NULL;
706 cl_sync_io_note(env, anchor, 1);
709 /* wait for the IO to be finished. */
710 rc = cl_sync_io_wait(env, anchor, timeout);
711 cl_page_list_assume(env, io, &queue->c2_qout);
713 LASSERT(list_empty(&queue->c2_qout.pl_pages));
714 cl_page_list_for_each(pg, &queue->c2_qin)
715 pg->cp_sync_io = NULL;
719 EXPORT_SYMBOL(cl_io_submit_sync);
724 * Pumps io through iterations calling
726 * - cl_io_iter_init()
736 * - cl_io_iter_fini()
738 * repeatedly until there is no more io to do.
/*
 * Main io driver. Each pass runs cl_io_iter_init(), cl_io_lock(),
 * cl_io_start(), cl_io_unlock(), cl_io_rw_advance() and cl_io_iter_fini();
 * the advance is by the amount io->ci_bytes grew during the pass. The loop
 * repeats while the result is 0 or -EIOCBQUEUED (together with a further
 * condition). On -EAGAIN with ci_ndelay set and ci_iocb_nowait clear, a
 * restart is requested through ci_need_restart unless all mirrors have
 * already been tried. Returns a negative errno, or 0; positive results are
 * folded to 0.
 */
740 int cl_io_loop(const struct lu_env *env, struct cl_io *io)
745 LINVRNT(cl_io_is_loopable(io));
752 result = cl_io_iter_init(env, io);
754 bytes = io->ci_bytes;
755 result = cl_io_lock(env, io);
758 * Notify layers that locks has been taken,
761 * - llite: kms, short read;
762 * - llite: generic_file_read();
764 result = cl_io_start(env, io);
766 * Send any remaining pending
769 ** - llite: ll_rw_stats_tally.
772 cl_io_unlock(env, io);
773 cl_io_rw_advance(env, io, io->ci_bytes - bytes);
776 cl_io_iter_fini(env, io);
779 } while ((result == 0 || result == -EIOCBQUEUED) &&
785 if (result == -EAGAIN && io->ci_ndelay && !io->ci_iocb_nowait) {
786 if (!io->ci_tried_all_mirrors) {
787 io->ci_need_restart = 1;
795 result = io->ci_result;
796 RETURN(result < 0 ? result : 0);
798 EXPORT_SYMBOL(cl_io_loop);
801 * Adds io slice to the cl_io.
803 * This is called by cl_object_operations::coo_io_init() methods to add a
804 * per-layer state to the io. New state is added at the end of
805 * cl_io::ci_layers list, that is, it is at the bottom of the stack.
807 * \see cl_lock_slice_add(), cl_req_slice_add(), cl_page_slice_add()
/*
 * Attach \a slice to \a io as the new last entry of io->ci_layers and record
 * the owning object \a obj and the operation vector \a ops in it. The slice
 * linkage must be either zero-filled or an empty list head, i.e. the slice
 * must not currently be on any list.
 */
809 void cl_io_slice_add(struct cl_io *io, struct cl_io_slice *slice,
810 struct cl_object *obj,
811 const struct cl_io_operations *ops)
813 struct list_head *linkage = &slice->cis_linkage;
815 LASSERT((linkage->prev == NULL && linkage->next == NULL) ||
816 list_empty(linkage));
819 list_add_tail(linkage, &io->ci_layers);
821 slice->cis_obj = obj;
822 slice->cis_iop = ops;
825 EXPORT_SYMBOL(cl_io_slice_add);
829 * Initializes page list.
/*
 * Make \a plist an empty page list by initializing its pl_pages list head.
 */
831 void cl_page_list_init(struct cl_page_list *plist)
835 INIT_LIST_HEAD(&plist->pl_pages);
838 EXPORT_SYMBOL(cl_page_list_init);
841 * Adds a page to a page list.
/*
 * Append \a page to the tail of \a plist through its cp_batch linkage and
 * record a "queue" lu_ref for the list. The page must have an owner and must
 * not already be linked on a batch list.
 */
843 void cl_page_list_add(struct cl_page_list *plist, struct cl_page *page,
847 /* it would be better to check that page is owned by "current" io, but
848 * it is not passed here. */
849 LASSERT(page->cp_owner != NULL);
851 LASSERT(list_empty(&page->cp_batch));
852 list_add_tail(&page->cp_batch, &plist->pl_pages);
854 lu_ref_add_at(&page->cp_reference, &page->cp_queue_ref, "queue", plist);
859 EXPORT_SYMBOL(cl_page_list_add);
862 * Removes a page from a page list.
/*
 * Unlink \a page from \a plist, which must be non-empty, and drop the
 * "queue" lu_ref taken when the page was queued. cl_page_put() is called on
 * the page (presumably controlled by the trailing boolean argument that
 * callers pass -- TODO confirm).
 */
864 void cl_page_list_del(const struct lu_env *env,
865 struct cl_page_list *plist, struct cl_page *page,
868 LASSERT(plist->pl_nr > 0);
871 list_del_init(&page->cp_batch);
873 lu_ref_del_at(&page->cp_reference, &page->cp_queue_ref, "queue", plist);
875 cl_page_put(env, page);
878 EXPORT_SYMBOL(cl_page_list_del);
881 * Moves a page from one page list to another.
/*
 * Move \a page from \a src, which must be non-empty, to the tail of \a dst
 * and re-tag its "queue" lu_ref.
 */
883 void cl_page_list_move(struct cl_page_list *dst, struct cl_page_list *src,
884 struct cl_page *page)
886 LASSERT(src->pl_nr > 0);
889 list_move_tail(&page->cp_batch, &dst->pl_pages);
892 lu_ref_set_at(&page->cp_reference, &page->cp_queue_ref, "queue",
896 EXPORT_SYMBOL(cl_page_list_move);
899 * Moves a page from one page list to the head of another list.
/*
 * Same as cl_page_list_move(), except that \a page is inserted at the head
 * of \a dst (list_move()) instead of its tail.
 */
901 void cl_page_list_move_head(struct cl_page_list *dst, struct cl_page_list *src,
902 struct cl_page *page)
904 LASSERT(src->pl_nr > 0);
907 list_move(&page->cp_batch, &dst->pl_pages);
910 lu_ref_set_at(&page->cp_reference, &page->cp_queue_ref, "queue",
914 EXPORT_SYMBOL(cl_page_list_move_head);
917 * splice the cl_page_list, just as list head does
/*
 * Append every page of \a src to the tail of \a dst and leave the list head
 * of \a src empty; dst->pl_nr grows by src->pl_nr. Note the argument order:
 * source first, destination second. With CONFIG_LUSTRE_DEBUG_LU_REF the
 * "queue" lu_ref of each page is re-tagged first.
 */
919 void cl_page_list_splice(struct cl_page_list *src, struct cl_page_list *dst)
921 #ifdef CONFIG_LUSTRE_DEBUG_LU_REF
922 struct cl_page *page;
926 cl_page_list_for_each_safe(page, tmp, src)
927 lu_ref_set_at(&page->cp_reference, &page->cp_queue_ref,
932 dst->pl_nr += src->pl_nr;
934 list_splice_tail_init(&src->pl_pages, &dst->pl_pages);
938 EXPORT_SYMBOL(cl_page_list_splice);
941 * Disowns pages in a queue.
/*
 * Empty \a plist, giving up ownership of every page on it: each page is
 * unlinked, disowned with __cl_page_disown(), has its "queue" lu_ref
 * dropped and is then released with cl_page_put().
 */
943 void cl_page_list_disown(const struct lu_env *env, struct cl_page_list *plist)
945 struct cl_page *page;
946 struct cl_page *temp;
949 cl_page_list_for_each_safe(page, temp, plist) {
950 LASSERT(plist->pl_nr > 0);
952 list_del_init(&page->cp_batch);
955 * __cl_page_disown rather than usual cl_page_disown() is used,
956 * because pages are possibly in CPS_FREEING state already due
957 * to the call to cl_page_list_discard().
960 * XXX __cl_page_disown() will fail if page is not locked.
962 __cl_page_disown(env, page);
963 lu_ref_del_at(&page->cp_reference, &page->cp_queue_ref, "queue",
965 cl_page_put(env, page);
969 EXPORT_SYMBOL(cl_page_list_disown);
972 * Releases pages from queue.
/*
 * Remove every page from \a plist with cl_page_list_del(), passing true as
 * its last argument, and assert that the list count has dropped to zero.
 */
974 void cl_page_list_fini(const struct lu_env *env, struct cl_page_list *plist)
976 struct cl_page *page;
977 struct cl_page *temp;
980 cl_page_list_for_each_safe(page, temp, plist)
981 cl_page_list_del(env, plist, page, true);
982 LASSERT(plist->pl_nr == 0);
985 EXPORT_SYMBOL(cl_page_list_fini);
988 * Assumes all pages in a queue.
990 void cl_page_list_assume(const struct lu_env *env,
991 struct cl_io *io, struct cl_page_list *plist)
993 struct cl_page *page;
995 cl_page_list_for_each(page, plist)
996 cl_page_assume(env, io, page);
1000 * Discards all pages in a queue.
/*
 * Call cl_page_discard() for \a io on each page of \a plist, in list order.
 * Pages stay on the list.
 */
1002 void cl_page_list_discard(const struct lu_env *env, struct cl_io *io,
1003 struct cl_page_list *plist)
1005 struct cl_page *page;
1008 cl_page_list_for_each(page, plist)
1009 cl_page_discard(env, io, page);
1012 EXPORT_SYMBOL(cl_page_list_discard);
1015 * Initialize dual page queue.
/*
 * Initialize both page lists of \a queue (c2_qin and c2_qout) as empty
 * lists via cl_page_list_init().
 */
1017 void cl_2queue_init(struct cl_2queue *queue)
1020 cl_page_list_init(&queue->c2_qin);
1021 cl_page_list_init(&queue->c2_qout);
1024 EXPORT_SYMBOL(cl_2queue_init);
1027 * Disown pages in both lists of a 2-queue.
/*
 * Run cl_page_list_disown() on the incoming list of \a queue and then on
 * its outgoing list.
 */
1029 void cl_2queue_disown(const struct lu_env *env, struct cl_2queue *queue)
1032 cl_page_list_disown(env, &queue->c2_qin);
1033 cl_page_list_disown(env, &queue->c2_qout);
1036 EXPORT_SYMBOL(cl_2queue_disown);
1039 * Discard (truncate) pages in both lists of a 2-queue.
/*
 * Run cl_page_list_discard() for \a io on the incoming list of \a queue and
 * then on its outgoing list.
 */
1041 void cl_2queue_discard(const struct lu_env *env,
1042 struct cl_io *io, struct cl_2queue *queue)
1045 cl_page_list_discard(env, io, &queue->c2_qin);
1046 cl_page_list_discard(env, io, &queue->c2_qout);
1049 EXPORT_SYMBOL(cl_2queue_discard);
1052 * Assume to own the pages in cl_2queue
1054 void cl_2queue_assume(const struct lu_env *env,
1055 struct cl_io *io, struct cl_2queue *queue)
1057 cl_page_list_assume(env, io, &queue->c2_qin);
1058 cl_page_list_assume(env, io, &queue->c2_qout);
1062 * Finalize both page lists of a 2-queue.
/*
 * Run cl_page_list_fini() on the outgoing list of \a queue and then on its
 * incoming list.
 */
1064 void cl_2queue_fini(const struct lu_env *env, struct cl_2queue *queue)
1067 cl_page_list_fini(env, &queue->c2_qout);
1068 cl_page_list_fini(env, &queue->c2_qin);
1071 EXPORT_SYMBOL(cl_2queue_fini);
1074 * Initialize a 2-queue to contain \a page in its incoming page list.
/*
 * Initialize \a queue with cl_2queue_init() and queue \a page on its
 * incoming list, passing true as the last argument of cl_page_list_add().
 */
1076 void cl_2queue_init_page(struct cl_2queue *queue, struct cl_page *page)
1079 cl_2queue_init(queue);
1081 * Add a page to the incoming page list of 2-queue.
1083 cl_page_list_add(&queue->c2_qin, page, true);
1086 EXPORT_SYMBOL(cl_2queue_init_page);
1089 * Returns top-level io.
1091 * \see cl_object_top()
/*
 * Follow the ci_parent chain starting at \a io for as long as a parent
 * exists (presumably returning the io that has no parent -- TODO confirm).
 */
1093 struct cl_io *cl_io_top(struct cl_io *io)
1096 while (io->ci_parent != NULL)
1100 EXPORT_SYMBOL(cl_io_top);
1103 * Fills in attributes that are passed to server together with transfer. Only
1104 * attributes from \a flags may be touched. This can be called multiple times
1105 * for the same request.
/*
 * Let every layer of the object stack of \a obj that implements
 * coo_req_attr_set() fill in its part of \a attr.
 */
1107 void cl_req_attr_set(const struct lu_env *env, struct cl_object *obj,
1108 struct cl_req_attr *attr)
1110 struct cl_object *scan;
1113 cl_object_for_each(scan, obj) {
1114 if (scan->co_ops->coo_req_attr_set != NULL)
1115 scan->co_ops->coo_req_attr_set(env, scan, attr);
1119 EXPORT_SYMBOL(cl_req_attr_set);
1122 * Initialize synchronous io wait \a anchor for \a nr pages with optional
1124 * \param anchor owned by caller, initialized here.
1125 * \param nr number of pages initially pending in sync.
1126 * \param end optional callback sync_io completion, can be used to
1127 * trigger erasure coding, integrity, dedupe, or similar operation.
1128 * \q end is called with a spinlock on anchor->csi_waitq.lock
/*
 * Prepare \a anchor for a new synchronous wait. The structure is zeroed, its
 * wait queue is initialized, the pending counter csi_sync_nr is set to
 * \a nr, the completion flag csi_complete and the result csi_sync_rc are
 * cleared, and \a end and \a dio_aio are stored as csi_end_io and
 * csi_dio_aio.
 */
1130 void cl_sync_io_init_notify(struct cl_sync_io *anchor, int nr,
1131 void *dio_aio, cl_sync_io_end_t *end)
1134 memset(anchor, 0, sizeof(*anchor));
1135 init_waitqueue_head(&anchor->csi_waitq);
1136 atomic_set(&anchor->csi_sync_nr, nr);
1137 atomic_set(&anchor->csi_complete, 0);
1138 anchor->csi_sync_rc = 0;
1139 anchor->csi_end_io = end;
1140 anchor->csi_dio_aio = dio_aio;
1143 EXPORT_SYMBOL(cl_sync_io_init_notify);
1146 * Wait until all IO completes. Transfer completion routine has to call
1147 * cl_sync_io_note() for every entity.
/*
 * Wait on \a anchor until its csi_complete flag reads 1. A timed wait of
 * \a timeout seconds is attempted first and a timeout is logged with
 * CERROR(); the function then waits without a timeout. \a timeout must not
 * be negative. The result is taken from anchor->csi_sync_rc. Before
 * returning, the wait-queue lock is taken briefly so that a concurrent
 * cl_sync_io_note() has finished, and the counters are asserted to be in
 * their final state (csi_sync_nr == 0, csi_complete == 1).
 * NOTE(review): the CERROR() text says "remaining entries" but prints
 * csi_complete, which the waits treat as a 0/1 flag; csi_sync_nr looks like
 * the intended value -- confirm.
 */
1149 int cl_sync_io_wait(const struct lu_env *env, struct cl_sync_io *anchor,
1155 LASSERT(timeout >= 0);
1158 wait_event_idle_timeout(anchor->csi_waitq,
1159 atomic_read(&anchor->csi_complete) == 1,
1160 cfs_time_seconds(timeout)) == 0) {
1162 CERROR("IO failed: %d, still wait for %d remaining entries\n",
1163 rc, atomic_read(&anchor->csi_complete));
1166 wait_event_idle(anchor->csi_waitq,
1167 atomic_read(&anchor->csi_complete) == 1);
1169 rc = anchor->csi_sync_rc;
1171 /* We take the lock to ensure that cl_sync_io_note() has finished */
1172 spin_lock(&anchor->csi_waitq.lock);
1173 LASSERT(atomic_read(&anchor->csi_sync_nr) == 0);
1174 LASSERT(atomic_read(&anchor->csi_complete) == 1);
1175 spin_unlock(&anchor->csi_waitq.lock);
1179 EXPORT_SYMBOL(cl_sync_io_wait);
/*
 * Report completion of \a iocb with result \a res through whichever API the
 * kernel offers: aio_complete() when HAVE_AIO_COMPLETE is defined, otherwise
 * the iocb's own ki_complete() callback if one is set, called with two or
 * three arguments depending on HAVE_KIOCB_COMPLETE_2ARGS.
 */
1181 static inline void dio_aio_complete(struct kiocb *iocb, ssize_t res)
1183 #ifdef HAVE_AIO_COMPLETE
1184 aio_complete(iocb, res, 0);
1186 if (iocb->ki_complete)
1187 # ifdef HAVE_KIOCB_COMPLETE_2ARGS
1188 iocb->ki_complete(iocb, res);
1190 iocb->ki_complete(iocb, res, 0);
/*
 * End-of-io callback registered by cl_dio_aio_alloc() for the sync anchor
 * embedded in a cl_dio_aio. Unless cda_no_aio_complete is set, the iocb is
 * completed with the anchor's result code, or with cda_bytes when that code
 * is 0.
 */
1195 static void cl_dio_aio_end(const struct lu_env *env, struct cl_sync_io *anchor)
1197 struct cl_dio_aio *aio = container_of(anchor, typeof(*aio), cda_sync);
1198 ssize_t ret = anchor->csi_sync_rc;
1202 if (!aio->cda_no_aio_complete)
1203 dio_aio_complete(aio->cda_iocb, ret ?: aio->cda_bytes);
/*
 * End-of-io handling for the sync anchor embedded in a cl_sub_dio. All pages
 * still on csd_pages are deleted, removed from the list and put. For
 * unaligned DIO, a read with a positive byte count copies the data out with
 * ll_dio_user_copy() and the internal buffer is freed; aligned DIO releases
 * the user pages with ll_release_user_pages(). Finally the result is
 * reported to the parent cl_dio_aio through cl_sync_io_note().
 * NOTE(review): the debug message labels its value "aio->cda_bytes" but the
 * argument printed is sdio->csd_bytes.
 */
1208 static void cl_sub_dio_end(const struct lu_env *env, struct cl_sync_io *anchor)
1210 struct cl_sub_dio *sdio = container_of(anchor, typeof(*sdio), csd_sync);
1211 ssize_t ret = anchor->csi_sync_rc;
1216 while (sdio->csd_pages.pl_nr > 0) {
1217 struct cl_page *page = cl_page_list_first(&sdio->csd_pages);
1219 cl_page_delete(env, page);
1220 cl_page_list_del(env, &sdio->csd_pages, page, false);
1221 cl_page_put(env, page);
1224 if (sdio->csd_unaligned) {
1225 /* save the iovec pointer before it's modified by
1228 struct iovec *tmp = (struct iovec *) sdio->csd_iter.__iov;
1231 "finishing unaligned dio %s aio->cda_bytes %ld\n",
1232 sdio->csd_write ? "write" : "read", sdio->csd_bytes);
1233 /* read copies *from* the kernel buffer *to* userspace
1234 * here at the end, write copies *to* the kernel
1235 * buffer from userspace at the start
1237 if (!sdio->csd_write && sdio->csd_bytes > 0)
1238 ret = ll_dio_user_copy(sdio, NULL);
1239 ll_free_dio_buffer(&sdio->csd_dio_pages);
1240 /* handle the freeing here rather than in cl_sub_dio_free
1241 * because we have the unmodified iovec pointer
1244 sdio->csd_iter.__iov = NULL;
1246 /* unaligned DIO does not get user pages, so it doesn't have to
1247 * release them, but aligned I/O must
1249 ll_release_user_pages(sdio->csd_dio_pages.ldp_pages,
1250 sdio->csd_dio_pages.ldp_count);
1252 cl_sync_io_note(env, &sdio->csd_ll_aio->cda_sync, ret);
/*
 * Allocate a cl_dio_aio from cl_dio_aio_kmem (GFP_NOFS) for \a iocb. Its
 * sync anchor starts with one pending reference and cl_dio_aio_end() as the
 * completion callback. For non-AIO use (is_aio false) the iocb is not
 * completed from the callback (cda_no_aio_complete) and the creator is
 * responsible for freeing the structure (cda_creator_free). A reference to
 * the current task's mm is taken with get_task_mm() and stored in cda_mm.
 */
1257 struct cl_dio_aio *cl_dio_aio_alloc(struct kiocb *iocb, struct cl_object *obj,
1260 struct cl_dio_aio *aio;
1262 OBD_SLAB_ALLOC_PTR_GFP(aio, cl_dio_aio_kmem, GFP_NOFS);
1265 * Hold one ref so that it won't be released until
1266 * every pages is added.
1268 cl_sync_io_init_notify(&aio->cda_sync, 1, aio, cl_dio_aio_end);
1269 aio->cda_iocb = iocb;
1270 aio->cda_no_aio_complete = !is_aio;
1271 /* if this is true AIO, the memory is freed by the last call
1272 * to cl_sync_io_note (when all the I/O is complete), because
1273 * no one is waiting (in the kernel) for this to complete
1275 * in other cases, the last user is cl_sync_io_wait, and in
1276 * that case, the creator frees the struct after that call
1278 aio->cda_creator_free = !is_aio;
1282 aio->cda_mm = get_task_mm(current);
1286 EXPORT_SYMBOL(cl_dio_aio_alloc);
/*
 * Allocate a cl_sub_dio from cl_sub_dio_kmem (GFP_NOFS) as one chunk of the
 * parent \a ll_aio. Its sync anchor starts with one pending reference, its
 * page list is initialized, and the parent's pending counter is incremented
 * by one for this chunk. A private copy of \a iter and of the iovec it
 * points to is also made (presumably only for unaligned I/O -- TODO
 * confirm); if that iovec allocation fails the sub-dio is freed again.
 * NOTE(review): the parent's csi_sync_nr is incremented before the iovec
 * copy is allocated, and no matching decrement is visible on that failure
 * path -- confirm that the caller compensates.
 */
1288 struct cl_sub_dio *cl_sub_dio_alloc(struct cl_dio_aio *ll_aio,
1289 struct iov_iter *iter, bool write,
1290 bool unaligned, bool sync)
1292 struct cl_sub_dio *sdio;
1294 OBD_SLAB_ALLOC_PTR_GFP(sdio, cl_sub_dio_kmem, GFP_NOFS);
1297 * Hold one ref so that it won't be released until
1298 * every pages is added.
1300 cl_sync_io_init_notify(&sdio->csd_sync, 1, sdio,
1302 cl_page_list_init(&sdio->csd_pages);
1304 sdio->csd_ll_aio = ll_aio;
1305 sdio->csd_creator_free = sync;
1306 sdio->csd_write = write;
1307 sdio->csd_unaligned = unaligned;
1309 atomic_add(1, &ll_aio->cda_sync.csi_sync_nr);
1312 /* we need to make a copy of the user iovec at this
1313 * point in time, in order to:
1315 * A) have the correct state of the iovec for this
1316 * chunk of I/O, ie, the main iovec is altered as we do
1317 * I/O and this chunk needs the current state
1318 * B) have a chunk-local copy; doing the IO later
1319 * modifies the iovec, so to process each chunk from a
1320 * separate thread requires a local copy of the iovec
1322 memcpy(&sdio->csd_iter, iter, sizeof(struct iov_iter));
1323 OBD_ALLOC_PTR(sdio->csd_iter.__iov);
1324 if (sdio->csd_iter.__iov == NULL) {
1325 cl_sub_dio_free(sdio);
1329 memcpy((void *) sdio->csd_iter.__iov, iter->__iov,
1330 sizeof(struct iovec));
1336 EXPORT_SYMBOL(cl_sub_dio_alloc);
/*
 * Release \a aio: the object reference held in cda_obj is dropped with
 * cl_object_put() and the structure is returned to cl_dio_aio_kmem.
 */
1338 void cl_dio_aio_free(const struct lu_env *env, struct cl_dio_aio *aio)
1343 cl_object_put(env, aio->cda_obj);
1344 OBD_SLAB_FREE_PTR(aio, cl_dio_aio_kmem);
1347 EXPORT_SYMBOL(cl_dio_aio_free);
/*
 * Return \a sdio to cl_sub_dio_kmem. The private iovec copy in
 * csd_iter.__iov is looked at first; csd_unaligned is asserted in connection
 * with it (presumably the copy is freed here when still set -- TODO
 * confirm).
 */
1349 void cl_sub_dio_free(struct cl_sub_dio *sdio)
1352 void *tmp = (void *)sdio->csd_iter.__iov;
1355 LASSERT(sdio->csd_unaligned);
1358 OBD_SLAB_FREE_PTR(sdio, cl_sub_dio_kmem);
1361 EXPORT_SYMBOL(cl_sub_dio_free);
1364 * For unaligned DIO.
1366 * Allocate the internal buffer from/to which we will perform DIO. This takes
1367 * the user I/O parameters and allocates an internal buffer large enough to
1368 * hold it. The pages in this buffer are aligned with pages in the file (ie,
1369 * they have a 1-to-1 mapping with file pages).
/*
 * Build the internal page buffer for an unaligned DIO of \a io_size bytes
 * starting at pvec->ldp_file_offset. The page count is accumulated in
 * pvec->ldp_count: the part of the I/O that falls into a partially covered
 * first page is taken off \a io_size, and the remainder is rounded up to
 * whole pages. The page-pointer array comes from kvzalloc() under
 * HAVE_DIO_ITER and from OBD_ALLOC_PTR_ARRAY_LARGE() otherwise; each page is
 * allocated with alloc_page(GFP_NOFS). If a page allocation fails, ldp_count
 * is cut back to the number of pages obtained so that ll_free_dio_buffer()
 * releases exactly those. On success the result is the page count.
 */
1371 int ll_allocate_dio_buffer(struct ll_dio_pages *pvec, size_t io_size)
1373 struct page *new_page;
1380 /* page level offset in the file where the I/O starts */
1381 pg_offset = pvec->ldp_file_offset & ~PAGE_MASK;
1382 /* this adds 1 for the first page and removes the bytes in it from the
1383 * io_size, making the rest of the calculation aligned
1387 io_size -= min_t(size_t, PAGE_SIZE - pg_offset, io_size);
1390 /* calculate pages for the rest of the buffer */
1391 pvec->ldp_count += (io_size + PAGE_SIZE - 1) >> PAGE_SHIFT;
1393 #ifdef HAVE_DIO_ITER
1394 pvec->ldp_pages = kvzalloc(pvec->ldp_count * sizeof(struct page *),
1397 OBD_ALLOC_PTR_ARRAY_LARGE(pvec->ldp_pages, pvec->ldp_count);
1399 if (pvec->ldp_pages == NULL)
1402 for (i = 0; i < pvec->ldp_count; i++) {
1403 new_page = alloc_page(GFP_NOFS);
1406 pvec->ldp_count = i;
1409 pvec->ldp_pages[i] = new_page;
1411 WARN_ON(i != pvec->ldp_count);
1415 if (pvec->ldp_pages)
1416 ll_free_dio_buffer(pvec);
1420 result = pvec->ldp_count;
1424 EXPORT_SYMBOL(ll_allocate_dio_buffer);
/*
 * Free the bounce buffer built by ll_allocate_dio_buffer(): every page in
 * pvec->ldp_pages[0 .. ldp_count - 1] is released with __free_page(), then
 * the pointer array itself is released.
 *
 * \param pvec  page vector whose ldp_pages array holds ldp_count pages
 *
 * The array must be released with the counterpart of the allocator used in
 * ll_allocate_dio_buffer(): kvfree() for the kvzalloc() done under
 * HAVE_DIO_ITER, OBD_FREE_PTR_ARRAY_LARGE() for OBD_ALLOC_PTR_ARRAY_LARGE()
 * in the other configuration (presumably the #else arm).
 */
1426 void ll_free_dio_buffer(struct ll_dio_pages *pvec)
1430 for (i = 0; i < pvec->ldp_count; i++)
1431 __free_page(pvec->ldp_pages[i]);
1433 #ifdef HAVE_DIO_ITER
/* kvzalloc() may return vmalloc memory for large arrays; kfree() is only
 * valid for kmalloc memory, so kvfree() is required here
 */
1434 kvfree(pvec->ldp_pages);
1436 OBD_FREE_PTR_ARRAY_LARGE(pvec->ldp_pages, pvec->ldp_count);
1439 EXPORT_SYMBOL(ll_free_dio_buffer);
1442 * ll_release_user_pages - tear down page struct array
1443 * @pages: array of page struct pointers underlying target buffer
/*
 * @npages: number of entries in @pages
 *
 * Visits all @npages entries of @pages (the per-page work is on lines this
 * listing omits) and then releases the @pages array itself. When
 * HAVE_DIO_ITER is not in effect (presumably the #else arm) the array is
 * returned with OBD_FREE_PTR_ARRAY_LARGE(); the HAVE_DIO_ITER arm is not
 * visible here.
 */
1445 void ll_release_user_pages(struct page **pages, int npages)
1454 for (i = 0; i < npages; i++) {
1460 #if defined(HAVE_DIO_ITER)
1463 OBD_FREE_PTR_ARRAY_LARGE(pages, npages);
1466 EXPORT_SYMBOL(ll_release_user_pages);
/*
 * Kernel compatibility: ll_iov_iter_fault_in_readable() maps to
 * fault_in_iov_iter_readable() when HAVE_FAULT_IN_IOV_ITER_READABLE is
 * defined and to iov_iter_fault_in_readable() otherwise (presumably the
 * #else arm, omitted from this listing). The caller in this file,
 * ll_dio_user_copy(), only tests the result for non-zero.
 */
1468 #ifdef HAVE_FAULT_IN_IOV_ITER_READABLE
1469 #define ll_iov_iter_fault_in_readable(iov, bytes) \
1470 fault_in_iov_iter_readable(iov, bytes)
1472 #define ll_iov_iter_fault_in_readable(iov, bytes) \
1473 iov_iter_fault_in_readable(iov, bytes)
/*
 * Kernel compatibility: when HAVE_KTHREAD_USE_MM is not defined, provide
 * kthread_use_mm()/kthread_unuse_mm() as aliases of use_mm()/unuse_mm().
 */
1476 #ifndef HAVE_KTHREAD_USE_MM
1477 #define kthread_use_mm(mm) use_mm(mm)
1478 #define kthread_unuse_mm(mm) unuse_mm(mm)
1481 /* copy IO data to/from internal buffer and userspace iovec */
/*
 * Copies one sub-DIO chunk, page by page, between the kernel pages in
 * sdio->csd_dio_pages and a userspace iov_iter.
 *
 * \param sdio       sub-DIO describing the chunk: csd_bytes bytes starting
 *                   at file offset csd_dio_pages.ldp_file_offset. Must be
 *                   unaligned DIO (asserted). The copy direction is
 *                   presumably selected from sdio->csd_write.
 * \param write_iov  iterator to use in place of the chunk-local copy
 *                   sdio->csd_iter; when NULL, csd_iter is used. When it is
 *                   non-NULL and status is 0, its count is re-expanded to the
 *                   chunk size (original_count) before returning.
 *
 * \retval the number of bytes copied when that number is non-zero
 * \retval status when nothing was copied, e.g. -EFAULT if faulting in the
 *         user iovec fails
 *
 * If the aio's cda_mm is set and differs from current->mm, the copy has to
 * run in that mm context (see the comment above the check); it is dropped
 * again with kthread_unuse_mm() before returning.
 *
 * For each page the in-page offset is pos & ~PAGE_MASK and at most
 * PAGE_SIZE - offset bytes are copied. A copy that moves fewer bytes than
 * requested is counted in short_copies; once that count exceeds 2 the
 * failure is reported with CERROR.
 *
 * NOTE(review): the log format strings print the size_t values count and
 * copied with %zd and bytes with %lu, and ldp_count is printed with %lu in
 * one message but %zu in another; %zu would match size_t - confirm the
 * field types and unify.
 * NOTE(review): this listing omits many lines of the body (loop header,
 * labels, counter updates); confirm details against the full file.
 */
1482 ssize_t ll_dio_user_copy(struct cl_sub_dio *sdio, struct iov_iter *write_iov)
1484 struct iov_iter *iter = write_iov ? write_iov : &sdio->csd_iter;
1485 struct ll_dio_pages *pvec = &sdio->csd_dio_pages;
1486 struct mm_struct *mm = sdio->csd_ll_aio->cda_mm;
1487 loff_t pos = pvec->ldp_file_offset;
1488 size_t count = sdio->csd_bytes;
1489 size_t original_count = count;
1490 int short_copies = 0;
1491 bool mm_used = false;
1498 LASSERT(sdio->csd_unaligned);
1500 if (sdio->csd_write)
1505 /* if there's no mm, io is being done from a kernel thread, so there's
1506 * no need to transition to its mm context anyway.
1508 * Also, if mm == current->mm, that means this is being handled in the
1509 * thread which created it, and not in a separate kthread - so it is
1510 * unnecessary (and incorrect) to do a use_mm here
1512 if (mm && mm != current->mm) {
1517 /* fault in the entire userspace iovec */
1519 if (unlikely(ll_iov_iter_fault_in_readable(iter, count)))
1520 GOTO(out, status = -EFAULT);
1523 /* modeled on kernel generic_file_buffered_read/write()
1525 * note we only have one 'chunk' of i/o here, so we do not copy the
1526 * whole iovec here (except when the chunk is the whole iovec) so we
1527 * use the count of bytes in the chunk, csd_bytes, instead of looking
1531 struct page *page = pvec->ldp_pages[i];
1532 unsigned long offset; /* offset into kernel buffer page */
1533 size_t copied; /* bytes successfully copied */
1534 size_t bytes; /* bytes to copy for this page */
1536 LASSERT(i < pvec->ldp_count);
1538 offset = pos & ~PAGE_MASK;
1539 bytes = min_t(unsigned long, PAGE_SIZE - offset,
1543 "count %zd, offset %lu, pos %lld, ldp_count %lu\n",
1544 count, offset, pos, pvec->ldp_count);
1546 if (fatal_signal_pending(current)) {
1551 /* write requires a few extra steps */
1553 /* like btrfs, we do not have a mapping since this isn't
1554 * a page cache page, so we must do this flush
1557 * NB: This is a noop on x86 but active on other
1560 flush_dcache_page(page);
1562 #ifndef HAVE_COPY_PAGE_FROM_ITER_ATOMIC
1563 copied = iov_iter_copy_from_user_atomic(page, iter,
1565 iov_iter_advance(iter, copied);
1567 copied = copy_page_from_iter_atomic(page, offset, bytes,
1572 copied = copy_page_to_iter(page, offset, bytes, iter);
1578 if (unlikely(copied < bytes)) {
1582 "short copy - copied only %zd of %lu, short %d times\n",
1583 copied, bytes, short_copies);
1584 /* copies will very rarely be interrupted, but we
1585 * should retry in those cases, since the other option
1586 * is giving an IO error and this can occur in normal
1587 * operation such as with racing unaligned AIOs
1589 * but of course we should not retry indefinitely
1591 if (short_copies > 2) {
1592 CERROR("Unaligned DIO copy repeatedly short, count %zd, offset %lu, bytes %lu, copied %zd, pos %lld\n",
1593 count, offset, bytes, copied, pos);
1609 /* if we complete successfully, we should reach all of the pages */
1610 LASSERTF(ergo(status == 0, i == pvec->ldp_count - 1),
1611 "status: %d, i: %d, pvec->ldp_count %zu, count %zu\n",
1612 status, i, pvec->ldp_count, count);
1614 if (write_iov && status == 0) {
1615 /* The copy function we use modifies the count in the iovec,
1616 * but that's actually the job of the caller, so we return the
1617 * iovec to the original count
1619 iov_iter_reexpand(iter, original_count);
1623 kthread_unuse_mm(mm);
1625 /* the total bytes copied, or status */
/* status is returned only when original_count - count is zero; any non-zero
 * byte count takes precedence over status
 */
1626 RETURN(original_count - count ? original_count - count : status);
1628 EXPORT_SYMBOL(ll_dio_user_copy);
1631 * Indicate that transfer of a single page completed.
/*
 * Drop one reference on @anchor's csi_sync_nr and run completion when it was
 * the last one.
 *
 * \param env     environment passed to the end_io callback and to
 *                cl_dio_aio_free()
 * \param anchor  sync-io anchor being noted
 * \param ioret   result of the transfer; the first negative value seen is
 *                latched in anchor->csi_sync_rc and later ones do not
 *                overwrite it
 *
 * Completion sequence once atomic_dec_and_lock() reports that the counter
 * reached zero (it returns with csi_waitq.lock held):
 *  - the lock is dropped around the end_io callback and retaken afterwards
 *    (any NULL check on end_io is on lines this listing omits);
 *  - csi_complete is set to 1 and waiters are woken with wake_up_locked()
 *    while the lock is held;
 *  - still under the lock, csi_dio_aio and the matching creator_free flag
 *    are sampled. The same pointer is viewed as struct cl_dio_aio or struct
 *    cl_sub_dio, and which view is used is decided by comparing end_io with
 *    cl_dio_aio_end / cl_sub_dio_end. Sampling under the lock matters
 *    because, as the in-body comment explains, the owner may reclaim @anchor
 *    as soon as cl_sync_io_wait() completes;
 *  - after the final unlock, if a descriptor is attached and its
 *    creator_free flag is false, it is freed here with cl_dio_aio_free() or
 *    cl_sub_dio_free() respectively.
 *
 * creator_free defaults to true, so nothing is freed here when csi_dio_aio is
 * NULL or end_io is neither of the two known callbacks.
 */
1633 void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
1638 if (anchor->csi_sync_rc == 0 && ioret < 0)
1639 anchor->csi_sync_rc = ioret;
1641 * Synchronous IO done without releasing page lock (e.g., as a part of
1642 * ->{prepare,commit}_write(). Completion is used to signal the end of
1645 LASSERT(atomic_read(&anchor->csi_sync_nr) > 0);
1646 LASSERT(atomic_read(&anchor->csi_complete) == 0);
1647 if (atomic_dec_and_lock(&anchor->csi_sync_nr,
1648 &anchor->csi_waitq.lock)) {
1649 struct cl_sub_dio *sub_dio_aio = NULL;
1650 struct cl_dio_aio *dio_aio = NULL;
1651 void *csi_dio_aio = NULL;
1652 bool creator_free = true;
1654 cl_sync_io_end_t *end_io = anchor->csi_end_io;
1656 spin_unlock(&anchor->csi_waitq.lock);
1657 /* we cannot do end_io while holding a spin lock, because
1661 end_io(env, anchor);
1663 spin_lock(&anchor->csi_waitq.lock);
1664 /* this tells the waiters we've completed, and can only be set
1665 * after end_io() has been called and while we're holding the
1668 atomic_set(&anchor->csi_complete, 1);
1670 * Holding the lock across both the decrement and
1671 * the wakeup ensures cl_sync_io_wait() doesn't complete
1672 * before the wakeup completes and the contents of
1673 * of anchor become unsafe to access as the owner is free
1674 * to immediately reclaim anchor when cl_sync_io_wait()
1677 wake_up_locked(&anchor->csi_waitq);
1679 csi_dio_aio = anchor->csi_dio_aio;
1680 sub_dio_aio = csi_dio_aio;
1681 dio_aio = csi_dio_aio;
1683 if (csi_dio_aio && end_io == cl_dio_aio_end)
1684 creator_free = dio_aio->cda_creator_free;
1685 else if (csi_dio_aio && end_io == cl_sub_dio_end)
1686 creator_free = sub_dio_aio->csd_creator_free;
1688 spin_unlock(&anchor->csi_waitq.lock);
1690 if (csi_dio_aio && !creator_free) {
1691 if (end_io == cl_dio_aio_end)
1692 cl_dio_aio_free(env, dio_aio);
1693 else if (end_io == cl_sub_dio_end)
1694 cl_sub_dio_free(sub_dio_aio);
1699 EXPORT_SYMBOL(cl_sync_io_note);
1701 /* this function waits for completion of outstanding io and then re-initializes
1702 * the anchor used to track it. This is used to wait to complete DIO before
1703 * returning to userspace, and is never called for true AIO
/*
 * \param env      environment passed through to cl_sync_io_note() and
 *                 cl_sync_io_wait()
 * \param anchor   anchor to wait on and then re-arm
 * \param timeout  passed unchanged to cl_sync_io_wait()
 * \param ioret    result recorded on @anchor through cl_sync_io_note()
 *
 * Steps: drop the initial reference with cl_sync_io_note(), wait for the
 * outstanding I/O with cl_sync_io_wait(), then re-arm @anchor for reuse by
 * adding one reference back to csi_sync_nr and clearing csi_complete.
 *
 * \retval presumably rc as returned by cl_sync_io_wait(); the return
 *         statement is on a line this listing omits
 */
1705 int cl_sync_io_wait_recycle(const struct lu_env *env, struct cl_sync_io *anchor,
1706 long timeout, int ioret)
1711 * @anchor was inited as 1 to prevent end_io to be
1712 * called before we add all pages for IO, so drop
1713 * one extra reference to make sure we could wait
1716 cl_sync_io_note(env, anchor, ioret);
1717 /* Wait for completion of outstanding dio before re-initializing for
1720 rc = cl_sync_io_wait(env, anchor, timeout);
1722 * One extra reference again, as if @anchor is
1723 * reused we assume it as 1 before using.
1725 atomic_add(1, &anchor->csi_sync_nr);
1726 /* we must also set this anchor as incomplete */
1727 atomic_set(&anchor->csi_complete, 0);
1731 EXPORT_SYMBOL(cl_sync_io_wait_recycle);