Whamcloud - gitweb
LU-4931 ladvise: Add feature of giving file access advices
[fs/lustre-release.git] / lustre / obdclass / cl_io.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Client IO.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  *   Author: Jinshan Xiong <jinshan.xiong@intel.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43
44 #include <linux/sched.h>
45 #include <obd_class.h>
46 #include <obd_support.h>
47 #include <lustre_fid.h>
48 #include <libcfs/list.h>
49 #include <cl_object.h>
50 #include "cl_internal.h"
51
52 /*****************************************************************************
53  *
54  * cl_io interface.
55  *
56  */
57
58 #define cl_io_for_each(slice, io) \
59         list_for_each_entry((slice), &io->ci_layers, cis_linkage)
60 #define cl_io_for_each_reverse(slice, io)                 \
61         list_for_each_entry_reverse((slice), &io->ci_layers, cis_linkage)
62
63 static inline int cl_io_type_is_valid(enum cl_io_type type)
64 {
65         return CIT_READ <= type && type < CIT_OP_NR;
66 }
67
68 static inline int cl_io_is_loopable(const struct cl_io *io)
69 {
70         return cl_io_type_is_valid(io->ci_type) && io->ci_type != CIT_MISC;
71 }
72
73 /**
74  * Returns true iff there is an IO ongoing in the given environment.
75  */
76 int cl_io_is_going(const struct lu_env *env)
77 {
78         return cl_env_info(env)->clt_current_io != NULL;
79 }
80
81 /**
82  * cl_io invariant that holds at all times when exported cl_io_*() functions
83  * are entered and left.
84  */
85 static int cl_io_invariant(const struct cl_io *io)
86 {
87         struct cl_io *up;
88
89         up = io->ci_parent;
90         return
91                 /*
92                  * io can own pages only when it is ongoing. Sub-io might
93                  * still be in CIS_LOCKED state when top-io is in
94                  * CIS_IO_GOING.
95                  */
96                 ergo(io->ci_owned_nr > 0, io->ci_state == CIS_IO_GOING ||
97                      (io->ci_state == CIS_LOCKED && up != NULL));
98 }
99
100 /**
101  * Finalize \a io, by calling cl_io_operations::cio_fini() bottom-to-top.
102  */
103 void cl_io_fini(const struct lu_env *env, struct cl_io *io)
104 {
105         struct cl_io_slice    *slice;
106         struct cl_thread_info *info;
107
108         LINVRNT(cl_io_type_is_valid(io->ci_type));
109         LINVRNT(cl_io_invariant(io));
110         ENTRY;
111
112         while (!list_empty(&io->ci_layers)) {
113                 slice = container_of(io->ci_layers.prev, struct cl_io_slice,
114                                      cis_linkage);
115                 list_del_init(&slice->cis_linkage);
116                 if (slice->cis_iop->op[io->ci_type].cio_fini != NULL)
117                         slice->cis_iop->op[io->ci_type].cio_fini(env, slice);
118                 /*
119                  * Invalidate slice to catch use after free. This assumes that
120                  * slices are allocated within session and can be touched
121                  * after ->cio_fini() returns.
122                  */
123                 slice->cis_io = NULL;
124         }
125         io->ci_state = CIS_FINI;
126         info = cl_env_info(env);
127         if (info->clt_current_io == io)
128                 info->clt_current_io = NULL;
129
130         /* sanity check for layout change */
131         switch(io->ci_type) {
132         case CIT_READ:
133         case CIT_WRITE:
134         case CIT_DATA_VERSION:
135         case CIT_FAULT:
136                 break;
137         case CIT_FSYNC:
138                 LASSERT(!io->ci_need_restart);
139                 break;
140         case CIT_SETATTR:
141         case CIT_MISC:
142                 /* Check ignore layout change conf */
143                 LASSERT(ergo(io->ci_ignore_layout || !io->ci_verify_layout,
144                                 !io->ci_need_restart));
145                 break;
146         case CIT_LADVISE:
147                 break;
148         default:
149                 LBUG();
150         }
151         EXIT;
152 }
153 EXPORT_SYMBOL(cl_io_fini);
154
155 static int cl_io_init0(const struct lu_env *env, struct cl_io *io,
156                        enum cl_io_type iot, struct cl_object *obj)
157 {
158         struct cl_object *scan;
159         int result;
160
161         LINVRNT(io->ci_state == CIS_ZERO || io->ci_state == CIS_FINI);
162         LINVRNT(cl_io_type_is_valid(iot));
163         LINVRNT(cl_io_invariant(io));
164         ENTRY;
165
166         io->ci_type = iot;
167         INIT_LIST_HEAD(&io->ci_lockset.cls_todo);
168         INIT_LIST_HEAD(&io->ci_lockset.cls_done);
169         INIT_LIST_HEAD(&io->ci_layers);
170
171         result = 0;
172         cl_object_for_each(scan, obj) {
173                 if (scan->co_ops->coo_io_init != NULL) {
174                         result = scan->co_ops->coo_io_init(env, scan, io);
175                         if (result != 0)
176                                 break;
177                 }
178         }
179         if (result == 0)
180                 io->ci_state = CIS_INIT;
181         RETURN(result);
182 }
183
184 /**
185  * Initialize sub-io, by calling cl_io_operations::cio_init() top-to-bottom.
186  *
187  * \pre obj != cl_object_top(obj)
188  */
189 int cl_io_sub_init(const struct lu_env *env, struct cl_io *io,
190                    enum cl_io_type iot, struct cl_object *obj)
191 {
192         struct cl_thread_info *info = cl_env_info(env);
193
194         LASSERT(obj != cl_object_top(obj));
195         if (info->clt_current_io == NULL)
196                 info->clt_current_io = io;
197         return cl_io_init0(env, io, iot, obj);
198 }
199 EXPORT_SYMBOL(cl_io_sub_init);
200
201 /**
202  * Initialize \a io, by calling cl_io_operations::cio_init() top-to-bottom.
203  *
204  * Caller has to call cl_io_fini() after a call to cl_io_init(), no matter
205  * what the latter returned.
206  *
207  * \pre obj == cl_object_top(obj)
208  * \pre cl_io_type_is_valid(iot)
209  * \post cl_io_type_is_valid(io->ci_type) && io->ci_type == iot
210  */
211 int cl_io_init(const struct lu_env *env, struct cl_io *io,
212                enum cl_io_type iot, struct cl_object *obj)
213 {
214         struct cl_thread_info *info = cl_env_info(env);
215
216         LASSERT(obj == cl_object_top(obj));
217         LASSERT(info->clt_current_io == NULL);
218
219         info->clt_current_io = io;
220         return cl_io_init0(env, io, iot, obj);
221 }
222 EXPORT_SYMBOL(cl_io_init);
223
224 /**
225  * Initialize read or write io.
226  *
227  * \pre iot == CIT_READ || iot == CIT_WRITE
228  */
229 int cl_io_rw_init(const struct lu_env *env, struct cl_io *io,
230                   enum cl_io_type iot, loff_t pos, size_t count)
231 {
232         LINVRNT(iot == CIT_READ || iot == CIT_WRITE);
233         LINVRNT(io->ci_obj != NULL);
234         ENTRY;
235
236         LU_OBJECT_HEADER(D_VFSTRACE, env, &io->ci_obj->co_lu,
237                          "io range: %u ["LPU64", "LPU64") %u %u\n",
238                          iot, (__u64)pos, (__u64)pos + count,
239                          io->u.ci_rw.crw_nonblock, io->u.ci_wr.wr_append);
240         io->u.ci_rw.crw_pos    = pos;
241         io->u.ci_rw.crw_count  = count;
242         RETURN(cl_io_init(env, io, iot, io->ci_obj));
243 }
244 EXPORT_SYMBOL(cl_io_rw_init);
245
246 static int cl_lock_descr_sort(const struct cl_lock_descr *d0,
247                               const struct cl_lock_descr *d1)
248 {
249         return lu_fid_cmp(lu_object_fid(&d0->cld_obj->co_lu),
250                           lu_object_fid(&d1->cld_obj->co_lu));
251 }
252
253 /*
254  * Sort locks in lexicographical order of their (fid, start-offset) pairs.
255  */
256 static void cl_io_locks_sort(struct cl_io *io)
257 {
258         int done = 0;
259
260         ENTRY;
261         /* hidden treasure: bubble sort for now. */
262         do {
263                 struct cl_io_lock_link *curr;
264                 struct cl_io_lock_link *prev;
265                 struct cl_io_lock_link *temp;
266
267                 done = 1;
268                 prev = NULL;
269
270                 list_for_each_entry_safe(curr, temp, &io->ci_lockset.cls_todo,
271                                          cill_linkage) {
272                         if (prev != NULL) {
273                                 switch (cl_lock_descr_sort(&prev->cill_descr,
274                                                            &curr->cill_descr)) {
275                                 case 0:
276                                         /*
277                                          * IMPOSSIBLE:  Identical locks are
278                                          *              already removed at
279                                          *              this point.
280                                          */
281                                 default:
282                                         LBUG();
283                                 case +1:
284                                         list_move_tail(&curr->cill_linkage,
285                                                        &prev->cill_linkage);
286                                         done = 0;
287                                         continue; /* don't change prev: it's
288                                                    * still "previous" */
289                                 case -1: /* already in order */
290                                         break;
291                                 }
292                         }
293                         prev = curr;
294                 }
295         } while (!done);
296         EXIT;
297 }
298
299 static void cl_lock_descr_merge(struct cl_lock_descr *d0,
300                                 const struct cl_lock_descr *d1)
301 {
302         d0->cld_start = min(d0->cld_start, d1->cld_start);
303         d0->cld_end = max(d0->cld_end, d1->cld_end);
304
305         if (d1->cld_mode == CLM_WRITE && d0->cld_mode != CLM_WRITE)
306                 d0->cld_mode = CLM_WRITE;
307
308         if (d1->cld_mode == CLM_GROUP && d0->cld_mode != CLM_GROUP)
309                 d0->cld_mode = CLM_GROUP;
310 }
311
312 static int cl_lockset_merge(const struct cl_lockset *set,
313                             const struct cl_lock_descr *need)
314 {
315         struct cl_io_lock_link *scan;
316
317         ENTRY;
318         list_for_each_entry(scan, &set->cls_todo, cill_linkage) {
319                 if (!cl_object_same(scan->cill_descr.cld_obj, need->cld_obj))
320                         continue;
321
322                 /* Merge locks for the same object because ldlm lock server
323                  * may expand the lock extent, otherwise there is a deadlock
324                  * case if two conflicted locks are queueud for the same object
325                  * and lock server expands one lock to overlap the another.
326                  * The side effect is that it can generate a multi-stripe lock
327                  * that may cause casacading problem */
328                 cl_lock_descr_merge(&scan->cill_descr, need);
329                 CDEBUG(D_VFSTRACE, "lock: %d: [%lu, %lu]\n",
330                        scan->cill_descr.cld_mode, scan->cill_descr.cld_start,
331                        scan->cill_descr.cld_end);
332                 RETURN(+1);
333         }
334         RETURN(0);
335 }
336
337 static int cl_lockset_lock(const struct lu_env *env, struct cl_io *io,
338                            struct cl_lockset *set)
339 {
340         struct cl_io_lock_link *link;
341         struct cl_io_lock_link *temp;
342         int result;
343
344         ENTRY;
345         result = 0;
346         list_for_each_entry_safe(link, temp, &set->cls_todo, cill_linkage) {
347                 result = cl_lock_request(env, io, &link->cill_lock);
348                 if (result < 0)
349                         break;
350
351                 list_move(&link->cill_linkage, &set->cls_done);
352         }
353         RETURN(result);
354 }
355
356 /**
357  * Takes locks necessary for the current iteration of io.
358  *
359  * Calls cl_io_operations::cio_lock() top-to-bottom to collect locks required
360  * by layers for the current iteration. Then sort locks (to avoid dead-locks),
361  * and acquire them.
362  */
363 int cl_io_lock(const struct lu_env *env, struct cl_io *io)
364 {
365         const struct cl_io_slice *scan;
366         int result = 0;
367
368         LINVRNT(cl_io_is_loopable(io));
369         LINVRNT(io->ci_state == CIS_IT_STARTED);
370         LINVRNT(cl_io_invariant(io));
371
372         ENTRY;
373         cl_io_for_each(scan, io) {
374                 if (scan->cis_iop->op[io->ci_type].cio_lock == NULL)
375                         continue;
376                 result = scan->cis_iop->op[io->ci_type].cio_lock(env, scan);
377                 if (result != 0)
378                         break;
379         }
380         if (result == 0) {
381                 cl_io_locks_sort(io);
382                 result = cl_lockset_lock(env, io, &io->ci_lockset);
383         }
384         if (result != 0)
385                 cl_io_unlock(env, io);
386         else
387                 io->ci_state = CIS_LOCKED;
388         RETURN(result);
389 }
390 EXPORT_SYMBOL(cl_io_lock);
391
392 /**
393  * Release locks takes by io.
394  */
395 void cl_io_unlock(const struct lu_env *env, struct cl_io *io)
396 {
397         struct cl_lockset        *set;
398         struct cl_io_lock_link   *link;
399         struct cl_io_lock_link   *temp;
400         const struct cl_io_slice *scan;
401
402         LASSERT(cl_io_is_loopable(io));
403         LASSERT(CIS_IT_STARTED <= io->ci_state && io->ci_state < CIS_UNLOCKED);
404         LINVRNT(cl_io_invariant(io));
405
406         ENTRY;
407         set = &io->ci_lockset;
408
409         list_for_each_entry_safe(link, temp, &set->cls_todo, cill_linkage) {
410                 list_del_init(&link->cill_linkage);
411                 if (link->cill_fini != NULL)
412                         link->cill_fini(env, link);
413         }
414
415         list_for_each_entry_safe(link, temp, &set->cls_done, cill_linkage) {
416                 list_del_init(&link->cill_linkage);
417                 cl_lock_release(env, &link->cill_lock);
418                 if (link->cill_fini != NULL)
419                         link->cill_fini(env, link);
420         }
421
422         cl_io_for_each_reverse(scan, io) {
423                 if (scan->cis_iop->op[io->ci_type].cio_unlock != NULL)
424                         scan->cis_iop->op[io->ci_type].cio_unlock(env, scan);
425         }
426         io->ci_state = CIS_UNLOCKED;
427         LASSERT(!cl_env_info(env)->clt_counters[CNL_TOP].ctc_nr_locks_acquired);
428         EXIT;
429 }
430 EXPORT_SYMBOL(cl_io_unlock);
431
432 /**
433  * Prepares next iteration of io.
434  *
435  * Calls cl_io_operations::cio_iter_init() top-to-bottom. This exists to give
436  * layers a chance to modify io parameters, e.g., so that lov can restrict io
437  * to a single stripe.
438  */
439 int cl_io_iter_init(const struct lu_env *env, struct cl_io *io)
440 {
441         const struct cl_io_slice *scan;
442         int result;
443
444         LINVRNT(cl_io_is_loopable(io));
445         LINVRNT(io->ci_state == CIS_INIT || io->ci_state == CIS_IT_ENDED);
446         LINVRNT(cl_io_invariant(io));
447
448         ENTRY;
449         result = 0;
450         cl_io_for_each(scan, io) {
451                 if (scan->cis_iop->op[io->ci_type].cio_iter_init == NULL)
452                         continue;
453                 result = scan->cis_iop->op[io->ci_type].cio_iter_init(env,
454                                                                       scan);
455                 if (result != 0)
456                         break;
457         }
458         if (result == 0)
459                 io->ci_state = CIS_IT_STARTED;
460         RETURN(result);
461 }
462 EXPORT_SYMBOL(cl_io_iter_init);
463
464 /**
465  * Finalizes io iteration.
466  *
467  * Calls cl_io_operations::cio_iter_fini() bottom-to-top.
468  */
469 void cl_io_iter_fini(const struct lu_env *env, struct cl_io *io)
470 {
471         const struct cl_io_slice *scan;
472
473         LINVRNT(cl_io_is_loopable(io));
474         LINVRNT(io->ci_state == CIS_UNLOCKED);
475         LINVRNT(cl_io_invariant(io));
476
477         ENTRY;
478         cl_io_for_each_reverse(scan, io) {
479                 if (scan->cis_iop->op[io->ci_type].cio_iter_fini != NULL)
480                         scan->cis_iop->op[io->ci_type].cio_iter_fini(env, scan);
481         }
482         io->ci_state = CIS_IT_ENDED;
483         EXIT;
484 }
485 EXPORT_SYMBOL(cl_io_iter_fini);
486
487 /**
488  * Records that read or write io progressed \a nob bytes forward.
489  */
490 void cl_io_rw_advance(const struct lu_env *env, struct cl_io *io, size_t nob)
491 {
492         const struct cl_io_slice *scan;
493
494         LINVRNT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE ||
495                 nob == 0);
496         LINVRNT(cl_io_is_loopable(io));
497         LINVRNT(cl_io_invariant(io));
498
499         ENTRY;
500
501         io->u.ci_rw.crw_pos   += nob;
502         io->u.ci_rw.crw_count -= nob;
503
504         /* layers have to be notified. */
505         cl_io_for_each_reverse(scan, io) {
506                 if (scan->cis_iop->op[io->ci_type].cio_advance != NULL)
507                         scan->cis_iop->op[io->ci_type].cio_advance(env, scan,
508                                                                    nob);
509         }
510         EXIT;
511 }
512
513 /**
514  * Adds a lock to a lockset.
515  */
516 int cl_io_lock_add(const struct lu_env *env, struct cl_io *io,
517                    struct cl_io_lock_link *link)
518 {
519         int result;
520
521         ENTRY;
522         if (cl_lockset_merge(&io->ci_lockset, &link->cill_descr))
523                 result = +1;
524         else {
525                 list_add(&link->cill_linkage, &io->ci_lockset.cls_todo);
526                 result = 0;
527         }
528         RETURN(result);
529 }
530 EXPORT_SYMBOL(cl_io_lock_add);
531
532 static void cl_free_io_lock_link(const struct lu_env *env,
533                                  struct cl_io_lock_link *link)
534 {
535         OBD_FREE_PTR(link);
536 }
537
538 /**
539  * Allocates new lock link, and uses it to add a lock to a lockset.
540  */
541 int cl_io_lock_alloc_add(const struct lu_env *env, struct cl_io *io,
542                          struct cl_lock_descr *descr)
543 {
544         struct cl_io_lock_link *link;
545         int result;
546
547         ENTRY;
548         OBD_ALLOC_PTR(link);
549         if (link != NULL) {
550                 link->cill_descr = *descr;
551                 link->cill_fini  = cl_free_io_lock_link;
552                 result = cl_io_lock_add(env, io, link);
553                 if (result) /* lock match */
554                         link->cill_fini(env, link);
555         } else
556                 result = -ENOMEM;
557
558         RETURN(result);
559 }
560 EXPORT_SYMBOL(cl_io_lock_alloc_add);
561
562 /**
563  * Starts io by calling cl_io_operations::cio_start() top-to-bottom.
564  */
565 int cl_io_start(const struct lu_env *env, struct cl_io *io)
566 {
567         const struct cl_io_slice *scan;
568         int result = 0;
569
570         LINVRNT(cl_io_is_loopable(io));
571         LINVRNT(io->ci_state == CIS_LOCKED);
572         LINVRNT(cl_io_invariant(io));
573         ENTRY;
574
575         io->ci_state = CIS_IO_GOING;
576         cl_io_for_each(scan, io) {
577                 if (scan->cis_iop->op[io->ci_type].cio_start == NULL)
578                         continue;
579                 result = scan->cis_iop->op[io->ci_type].cio_start(env, scan);
580                 if (result != 0)
581                         break;
582         }
583         if (result >= 0)
584                 result = 0;
585         RETURN(result);
586 }
587 EXPORT_SYMBOL(cl_io_start);
588
589 /**
590  * Wait until current io iteration is finished by calling
591  * cl_io_operations::cio_end() bottom-to-top.
592  */
593 void cl_io_end(const struct lu_env *env, struct cl_io *io)
594 {
595         const struct cl_io_slice *scan;
596
597         LINVRNT(cl_io_is_loopable(io));
598         LINVRNT(io->ci_state == CIS_IO_GOING);
599         LINVRNT(cl_io_invariant(io));
600         ENTRY;
601
602         cl_io_for_each_reverse(scan, io) {
603                 if (scan->cis_iop->op[io->ci_type].cio_end != NULL)
604                         scan->cis_iop->op[io->ci_type].cio_end(env, scan);
605                 /* TODO: error handling. */
606         }
607         io->ci_state = CIS_IO_FINISHED;
608         EXIT;
609 }
610 EXPORT_SYMBOL(cl_io_end);
611
612 /**
613  * Called by read io, to decide the readahead extent
614  *
615  * \see cl_io_operations::cio_read_ahead()
616  */
617 int cl_io_read_ahead(const struct lu_env *env, struct cl_io *io,
618                      pgoff_t start, struct cl_read_ahead *ra)
619 {
620         const struct cl_io_slice *scan;
621         int                       result = 0;
622
623         LINVRNT(io->ci_type == CIT_READ || io->ci_type == CIT_FAULT);
624         LINVRNT(io->ci_state == CIS_IO_GOING || io->ci_state == CIS_LOCKED);
625         LINVRNT(cl_io_invariant(io));
626         ENTRY;
627
628         cl_io_for_each(scan, io) {
629                 if (scan->cis_iop->cio_read_ahead == NULL)
630                         continue;
631
632                 result = scan->cis_iop->cio_read_ahead(env, scan, start, ra);
633                 if (result != 0)
634                         break;
635         }
636         RETURN(result > 0 ? 0 : result);
637 }
638 EXPORT_SYMBOL(cl_io_read_ahead);
639
640 /**
641  * Commit a list of contiguous pages into writeback cache.
642  *
643  * \returns 0 if all pages committed, or errcode if error occurred.
644  * \see cl_io_operations::cio_commit_async()
645  */
646 int cl_io_commit_async(const struct lu_env *env, struct cl_io *io,
647                         struct cl_page_list *queue, int from, int to,
648                         cl_commit_cbt cb)
649 {
650         const struct cl_io_slice *scan;
651         int result = 0;
652         ENTRY;
653
654         cl_io_for_each(scan, io) {
655                 if (scan->cis_iop->cio_commit_async == NULL)
656                         continue;
657                 result = scan->cis_iop->cio_commit_async(env, scan, queue,
658                                                          from, to, cb);
659                 if (result != 0)
660                         break;
661         }
662         RETURN(result);
663 }
664 EXPORT_SYMBOL(cl_io_commit_async);
665
666 /**
667  * Submits a list of pages for immediate io.
668  *
669  * After the function gets returned, The submitted pages are moved to
670  * queue->c2_qout queue, and queue->c2_qin contain both the pages don't need
671  * to be submitted, and the pages are errant to submit.
672  *
673  * \returns 0 if at least one page was submitted, error code otherwise.
674  * \see cl_io_operations::cio_submit()
675  */
676 int cl_io_submit_rw(const struct lu_env *env, struct cl_io *io,
677                     enum cl_req_type crt, struct cl_2queue *queue)
678 {
679         const struct cl_io_slice *scan;
680         int result = 0;
681         ENTRY;
682
683         cl_io_for_each(scan, io) {
684                 if (scan->cis_iop->cio_submit == NULL)
685                         continue;
686                 result = scan->cis_iop->cio_submit(env, scan, crt, queue);
687                 if (result != 0)
688                         break;
689         }
690         /*
691          * If ->cio_submit() failed, no pages were sent.
692          */
693         LASSERT(ergo(result != 0, list_empty(&queue->c2_qout.pl_pages)));
694         RETURN(result);
695 }
696 EXPORT_SYMBOL(cl_io_submit_rw);
697
698 /**
699  * Submit a sync_io and wait for the IO to be finished, or error happens.
700  * If \a timeout is zero, it means to wait for the IO unconditionally.
701  */
702 int cl_io_submit_sync(const struct lu_env *env, struct cl_io *io,
703                       enum cl_req_type iot, struct cl_2queue *queue,
704                       long timeout)
705 {
706         struct cl_sync_io *anchor = &cl_env_info(env)->clt_anchor;
707         struct cl_page *pg;
708         int rc;
709
710         cl_page_list_for_each(pg, &queue->c2_qin) {
711                 LASSERT(pg->cp_sync_io == NULL);
712                 pg->cp_sync_io = anchor;
713         }
714
715         cl_sync_io_init(anchor, queue->c2_qin.pl_nr, &cl_sync_io_end);
716         rc = cl_io_submit_rw(env, io, iot, queue);
717         if (rc == 0) {
718                 /*
719                  * If some pages weren't sent for any reason (e.g.,
720                  * read found up-to-date pages in the cache, or write found
721                  * clean pages), count them as completed to avoid infinite
722                  * wait.
723                  */
724                 cl_page_list_for_each(pg, &queue->c2_qin) {
725                         pg->cp_sync_io = NULL;
726                         cl_sync_io_note(env, anchor, 1);
727                 }
728
729                 /* wait for the IO to be finished. */
730                 rc = cl_sync_io_wait(env, anchor, timeout);
731                 cl_page_list_assume(env, io, &queue->c2_qout);
732         } else {
733                 LASSERT(list_empty(&queue->c2_qout.pl_pages));
734                 cl_page_list_for_each(pg, &queue->c2_qin)
735                         pg->cp_sync_io = NULL;
736         }
737         return rc;
738 }
739 EXPORT_SYMBOL(cl_io_submit_sync);
740
741 /**
742  * Cancel an IO which has been submitted by cl_io_submit_rw.
743  */
744 int cl_io_cancel(const struct lu_env *env, struct cl_io *io,
745                  struct cl_page_list *queue)
746 {
747         struct cl_page *page;
748         int result = 0;
749
750         CERROR("Canceling ongoing page trasmission\n");
751         cl_page_list_for_each(page, queue) {
752                 int rc;
753
754                 rc = cl_page_cancel(env, page);
755                 result = result ?: rc;
756         }
757         return result;
758 }
759
760 /**
761  * Main io loop.
762  *
763  * Pumps io through iterations calling
764  *
765  *    - cl_io_iter_init()
766  *
767  *    - cl_io_lock()
768  *
769  *    - cl_io_start()
770  *
771  *    - cl_io_end()
772  *
773  *    - cl_io_unlock()
774  *
775  *    - cl_io_iter_fini()
776  *
777  * repeatedly until there is no more io to do.
778  */
779 int cl_io_loop(const struct lu_env *env, struct cl_io *io)
780 {
781         int result   = 0;
782
783         LINVRNT(cl_io_is_loopable(io));
784         ENTRY;
785
786         do {
787                 size_t nob;
788
789                 io->ci_continue = 0;
790                 result = cl_io_iter_init(env, io);
791                 if (result == 0) {
792                         nob    = io->ci_nob;
793                         result = cl_io_lock(env, io);
794                         if (result == 0) {
795                                 /*
796                                  * Notify layers that locks has been taken,
797                                  * and do actual i/o.
798                                  *
799                                  *   - llite: kms, short read;
800                                  *   - llite: generic_file_read();
801                                  */
802                                 result = cl_io_start(env, io);
803                                 /*
804                                  * Send any remaining pending
805                                  * io, etc.
806                                  *
807                                  *   - llite: ll_rw_stats_tally.
808                                  */
809                                 cl_io_end(env, io);
810                                 cl_io_unlock(env, io);
811                                 cl_io_rw_advance(env, io, io->ci_nob - nob);
812                         }
813                 }
814                 cl_io_iter_fini(env, io);
815         } while (result == 0 && io->ci_continue);
816         if (result == 0)
817                 result = io->ci_result;
818         RETURN(result < 0 ? result : 0);
819 }
820 EXPORT_SYMBOL(cl_io_loop);
821
822 /**
823  * Adds io slice to the cl_io.
824  *
825  * This is called by cl_object_operations::coo_io_init() methods to add a
826  * per-layer state to the io. New state is added at the end of
827  * cl_io::ci_layers list, that is, it is at the bottom of the stack.
828  *
829  * \see cl_lock_slice_add(), cl_req_slice_add(), cl_page_slice_add()
830  */
831 void cl_io_slice_add(struct cl_io *io, struct cl_io_slice *slice,
832                      struct cl_object *obj,
833                      const struct cl_io_operations *ops)
834 {
835         struct list_head *linkage = &slice->cis_linkage;
836
837         LASSERT((linkage->prev == NULL && linkage->next == NULL) ||
838                 list_empty(linkage));
839         ENTRY;
840
841         list_add_tail(linkage, &io->ci_layers);
842         slice->cis_io  = io;
843         slice->cis_obj = obj;
844         slice->cis_iop = ops;
845         EXIT;
846 }
847 EXPORT_SYMBOL(cl_io_slice_add);
848
849
850 /**
851  * Initializes page list.
852  */
853 void cl_page_list_init(struct cl_page_list *plist)
854 {
855         ENTRY;
856         plist->pl_nr = 0;
857         INIT_LIST_HEAD(&plist->pl_pages);
858         plist->pl_owner = current;
859         EXIT;
860 }
861 EXPORT_SYMBOL(cl_page_list_init);
862
863 /**
864  * Adds a page to a page list.
865  */
866 void cl_page_list_add(struct cl_page_list *plist, struct cl_page *page)
867 {
868         ENTRY;
869         /* it would be better to check that page is owned by "current" io, but
870          * it is not passed here. */
871         LASSERT(page->cp_owner != NULL);
872         LINVRNT(plist->pl_owner == current);
873
874         LASSERT(list_empty(&page->cp_batch));
875         list_add_tail(&page->cp_batch, &plist->pl_pages);
876         ++plist->pl_nr;
877         lu_ref_add_at(&page->cp_reference, &page->cp_queue_ref, "queue", plist);
878         cl_page_get(page);
879         EXIT;
880 }
881 EXPORT_SYMBOL(cl_page_list_add);
882
883 /**
884  * Removes a page from a page list.
885  */
886 void cl_page_list_del(const struct lu_env *env,
887                       struct cl_page_list *plist, struct cl_page *page)
888 {
889         LASSERT(plist->pl_nr > 0);
890         LASSERT(cl_page_is_vmlocked(env, page));
891         LINVRNT(plist->pl_owner == current);
892
893         ENTRY;
894         list_del_init(&page->cp_batch);
895         --plist->pl_nr;
896         lu_ref_del_at(&page->cp_reference, &page->cp_queue_ref, "queue", plist);
897         cl_page_put(env, page);
898         EXIT;
899 }
900 EXPORT_SYMBOL(cl_page_list_del);
901
902 /**
903  * Moves a page from one page list to another.
904  */
905 void cl_page_list_move(struct cl_page_list *dst, struct cl_page_list *src,
906                        struct cl_page *page)
907 {
908         LASSERT(src->pl_nr > 0);
909         LINVRNT(dst->pl_owner == current);
910         LINVRNT(src->pl_owner == current);
911
912         ENTRY;
913         list_move_tail(&page->cp_batch, &dst->pl_pages);
914         --src->pl_nr;
915         ++dst->pl_nr;
916         lu_ref_set_at(&page->cp_reference, &page->cp_queue_ref, "queue",
917                       src, dst);
918         EXIT;
919 }
920 EXPORT_SYMBOL(cl_page_list_move);
921
922 /**
923  * Moves a page from one page list to the head of another list.
924  */
925 void cl_page_list_move_head(struct cl_page_list *dst, struct cl_page_list *src,
926                             struct cl_page *page)
927 {
928         LASSERT(src->pl_nr > 0);
929         LINVRNT(dst->pl_owner == current);
930         LINVRNT(src->pl_owner == current);
931
932         ENTRY;
933         list_move(&page->cp_batch, &dst->pl_pages);
934         --src->pl_nr;
935         ++dst->pl_nr;
936         lu_ref_set_at(&page->cp_reference, &page->cp_queue_ref, "queue",
937                         src, dst);
938         EXIT;
939 }
940 EXPORT_SYMBOL(cl_page_list_move_head);
941
942 /**
943  * splice the cl_page_list, just as list head does
944  */
945 void cl_page_list_splice(struct cl_page_list *list, struct cl_page_list *head)
946 {
947         struct cl_page *page;
948         struct cl_page *tmp;
949
950         LINVRNT(list->pl_owner == current);
951         LINVRNT(head->pl_owner == current);
952
953         ENTRY;
954         cl_page_list_for_each_safe(page, tmp, list)
955                 cl_page_list_move(head, list, page);
956         EXIT;
957 }
958 EXPORT_SYMBOL(cl_page_list_splice);
959
960 /**
961  * Disowns pages in a queue.
962  */
963 void cl_page_list_disown(const struct lu_env *env,
964                          struct cl_io *io, struct cl_page_list *plist)
965 {
966         struct cl_page *page;
967         struct cl_page *temp;
968
969         LINVRNT(plist->pl_owner == current);
970
971         ENTRY;
972         cl_page_list_for_each_safe(page, temp, plist) {
973                 LASSERT(plist->pl_nr > 0);
974
975                 list_del_init(&page->cp_batch);
976                 --plist->pl_nr;
977                 /*
978                  * cl_page_disown0 rather than usual cl_page_disown() is used,
979                  * because pages are possibly in CPS_FREEING state already due
980                  * to the call to cl_page_list_discard().
981                  */
982                 /*
983                  * XXX cl_page_disown0() will fail if page is not locked.
984                  */
985                 cl_page_disown0(env, io, page);
986                 lu_ref_del_at(&page->cp_reference, &page->cp_queue_ref, "queue",
987                               plist);
988                 cl_page_put(env, page);
989         }
990         EXIT;
991 }
992 EXPORT_SYMBOL(cl_page_list_disown);
993
994 /**
995  * Releases pages from queue.
996  */
997 void cl_page_list_fini(const struct lu_env *env, struct cl_page_list *plist)
998 {
999         struct cl_page *page;
1000         struct cl_page *temp;
1001
1002         LINVRNT(plist->pl_owner == current);
1003
1004         ENTRY;
1005         cl_page_list_for_each_safe(page, temp, plist)
1006                 cl_page_list_del(env, plist, page);
1007         LASSERT(plist->pl_nr == 0);
1008         EXIT;
1009 }
1010 EXPORT_SYMBOL(cl_page_list_fini);
1011
1012 /**
1013  * Owns all pages in a queue.
1014  */
1015 int cl_page_list_own(const struct lu_env *env,
1016                      struct cl_io *io, struct cl_page_list *plist)
1017 {
1018         struct cl_page *page;
1019         struct cl_page *temp;
1020         int result;
1021
1022         LINVRNT(plist->pl_owner == current);
1023
1024         ENTRY;
1025         result = 0;
1026         cl_page_list_for_each_safe(page, temp, plist) {
1027                 if (cl_page_own(env, io, page) == 0)
1028                         result = result ?: page->cp_error;
1029                 else
1030                         cl_page_list_del(env, plist, page);
1031         }
1032         RETURN(result);
1033 }
1034
1035 /**
1036  * Assumes all pages in a queue.
1037  */
1038 void cl_page_list_assume(const struct lu_env *env,
1039                          struct cl_io *io, struct cl_page_list *plist)
1040 {
1041         struct cl_page *page;
1042
1043         LINVRNT(plist->pl_owner == current);
1044
1045         cl_page_list_for_each(page, plist)
1046                 cl_page_assume(env, io, page);
1047 }
1048
1049 /**
1050  * Discards all pages in a queue.
1051  */
1052 void cl_page_list_discard(const struct lu_env *env, struct cl_io *io,
1053                           struct cl_page_list *plist)
1054 {
1055         struct cl_page *page;
1056
1057         LINVRNT(plist->pl_owner == current);
1058         ENTRY;
1059         cl_page_list_for_each(page, plist)
1060                 cl_page_discard(env, io, page);
1061         EXIT;
1062 }
1063
1064 /**
1065  * Initialize dual page queue.
1066  */
1067 void cl_2queue_init(struct cl_2queue *queue)
1068 {
1069         ENTRY;
1070         cl_page_list_init(&queue->c2_qin);
1071         cl_page_list_init(&queue->c2_qout);
1072         EXIT;
1073 }
1074 EXPORT_SYMBOL(cl_2queue_init);
1075
1076 /**
1077  * Add a page to the incoming page list of 2-queue.
1078  */
1079 void cl_2queue_add(struct cl_2queue *queue, struct cl_page *page)
1080 {
1081         ENTRY;
1082         cl_page_list_add(&queue->c2_qin, page);
1083         EXIT;
1084 }
1085 EXPORT_SYMBOL(cl_2queue_add);
1086
1087 /**
1088  * Disown pages in both lists of a 2-queue.
1089  */
1090 void cl_2queue_disown(const struct lu_env *env,
1091                       struct cl_io *io, struct cl_2queue *queue)
1092 {
1093         ENTRY;
1094         cl_page_list_disown(env, io, &queue->c2_qin);
1095         cl_page_list_disown(env, io, &queue->c2_qout);
1096         EXIT;
1097 }
1098 EXPORT_SYMBOL(cl_2queue_disown);
1099
1100 /**
1101  * Discard (truncate) pages in both lists of a 2-queue.
1102  */
1103 void cl_2queue_discard(const struct lu_env *env,
1104                        struct cl_io *io, struct cl_2queue *queue)
1105 {
1106         ENTRY;
1107         cl_page_list_discard(env, io, &queue->c2_qin);
1108         cl_page_list_discard(env, io, &queue->c2_qout);
1109         EXIT;
1110 }
1111 EXPORT_SYMBOL(cl_2queue_discard);
1112
1113 /**
1114  * Assume to own the pages in cl_2queue
1115  */
1116 void cl_2queue_assume(const struct lu_env *env,
1117                       struct cl_io *io, struct cl_2queue *queue)
1118 {
1119         cl_page_list_assume(env, io, &queue->c2_qin);
1120         cl_page_list_assume(env, io, &queue->c2_qout);
1121 }
1122
1123 /**
1124  * Finalize both page lists of a 2-queue.
1125  */
1126 void cl_2queue_fini(const struct lu_env *env, struct cl_2queue *queue)
1127 {
1128         ENTRY;
1129         cl_page_list_fini(env, &queue->c2_qout);
1130         cl_page_list_fini(env, &queue->c2_qin);
1131         EXIT;
1132 }
1133 EXPORT_SYMBOL(cl_2queue_fini);
1134
1135 /**
1136  * Initialize a 2-queue to contain \a page in its incoming page list.
1137  */
1138 void cl_2queue_init_page(struct cl_2queue *queue, struct cl_page *page)
1139 {
1140         ENTRY;
1141         cl_2queue_init(queue);
1142         cl_2queue_add(queue, page);
1143         EXIT;
1144 }
1145 EXPORT_SYMBOL(cl_2queue_init_page);
1146
1147 /**
1148  * Returns top-level io.
1149  *
1150  * \see cl_object_top()
1151  */
1152 struct cl_io *cl_io_top(struct cl_io *io)
1153 {
1154         ENTRY;
1155         while (io->ci_parent != NULL)
1156                 io = io->ci_parent;
1157         RETURN(io);
1158 }
1159 EXPORT_SYMBOL(cl_io_top);
1160
1161 /**
1162  * Prints human readable representation of \a io to the \a f.
1163  */
1164 void cl_io_print(const struct lu_env *env, void *cookie,
1165                  lu_printer_t printer, const struct cl_io *io)
1166 {
1167 }
1168
1169 /**
1170  * Fills in attributes that are passed to server together with transfer. Only
1171  * attributes from \a flags may be touched. This can be called multiple times
1172  * for the same request.
1173  */
1174 void cl_req_attr_set(const struct lu_env *env, struct cl_object *obj,
1175                      struct cl_req_attr *attr)
1176 {
1177         struct cl_object *scan;
1178         ENTRY;
1179
1180         cl_object_for_each(scan, obj) {
1181                 if (scan->co_ops->coo_req_attr_set != NULL)
1182                         scan->co_ops->coo_req_attr_set(env, scan, attr);
1183         }
1184         EXIT;
1185 }
1186 EXPORT_SYMBOL(cl_req_attr_set);
1187
1188 /* cl_sync_io_callback assumes the caller must call cl_sync_io_wait() to
1189  * wait for the IO to finish. */
1190 void cl_sync_io_end(const struct lu_env *env, struct cl_sync_io *anchor)
1191 {
1192         wake_up_all(&anchor->csi_waitq);
1193
1194         /* it's safe to nuke or reuse anchor now */
1195         atomic_set(&anchor->csi_barrier, 0);
1196 }
1197 EXPORT_SYMBOL(cl_sync_io_end);
1198
1199 /**
1200  * Initialize synchronous io wait anchor
1201  */
1202 void cl_sync_io_init(struct cl_sync_io *anchor, int nr,
1203                      void (*end)(const struct lu_env *, struct cl_sync_io *))
1204 {
1205         ENTRY;
1206         memset(anchor, 0, sizeof(*anchor));
1207         init_waitqueue_head(&anchor->csi_waitq);
1208         atomic_set(&anchor->csi_sync_nr, nr);
1209         atomic_set(&anchor->csi_barrier, nr > 0);
1210         anchor->csi_sync_rc = 0;
1211         anchor->csi_end_io = end;
1212         LASSERT(end != NULL);
1213         EXIT;
1214 }
1215 EXPORT_SYMBOL(cl_sync_io_init);
1216
1217 /**
1218  * Wait until all IO completes. Transfer completion routine has to call
1219  * cl_sync_io_note() for every entity.
1220  */
1221 int cl_sync_io_wait(const struct lu_env *env, struct cl_sync_io *anchor,
1222                     long timeout)
1223 {
1224         struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
1225                                                   NULL, NULL, NULL);
1226         int rc;
1227         ENTRY;
1228
1229         LASSERT(timeout >= 0);
1230
1231         rc = l_wait_event(anchor->csi_waitq,
1232                           atomic_read(&anchor->csi_sync_nr) == 0,
1233                           &lwi);
1234         if (rc < 0) {
1235                 CERROR("IO failed: %d, still wait for %d remaining entries\n",
1236                        rc, atomic_read(&anchor->csi_sync_nr));
1237
1238                 lwi = (struct l_wait_info) { 0 };
1239                 (void)l_wait_event(anchor->csi_waitq,
1240                                    atomic_read(&anchor->csi_sync_nr) == 0,
1241                                    &lwi);
1242         } else {
1243                 rc = anchor->csi_sync_rc;
1244         }
1245         LASSERT(atomic_read(&anchor->csi_sync_nr) == 0);
1246
1247         /* wait until cl_sync_io_note() has done wakeup */
1248         while (unlikely(atomic_read(&anchor->csi_barrier) != 0)) {
1249                 cpu_relax();
1250         }
1251         RETURN(rc);
1252 }
1253 EXPORT_SYMBOL(cl_sync_io_wait);
1254
1255 /**
1256  * Indicate that transfer of a single page completed.
1257  */
1258 void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
1259                      int ioret)
1260 {
1261         ENTRY;
1262         if (anchor->csi_sync_rc == 0 && ioret < 0)
1263                 anchor->csi_sync_rc = ioret;
1264         /*
1265          * Synchronous IO done without releasing page lock (e.g., as a part of
1266          * ->{prepare,commit}_write(). Completion is used to signal the end of
1267          * IO.
1268          */
1269         LASSERT(atomic_read(&anchor->csi_sync_nr) > 0);
1270         if (atomic_dec_and_test(&anchor->csi_sync_nr)) {
1271                 LASSERT(anchor->csi_end_io != NULL);
1272                 anchor->csi_end_io(env, anchor);
1273                 /* Can't access anchor any more */
1274         }
1275         EXIT;
1276 }
1277 EXPORT_SYMBOL(cl_sync_io_note);