Whamcloud - gitweb
LU-9575 obdclass: remove cl_for_each defines
[fs/lustre-release.git] / lustre / obdclass / cl_io.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * Client IO.
33  *
34  *   Author: Nikita Danilov <nikita.danilov@sun.com>
35  *   Author: Jinshan Xiong <jinshan.xiong@intel.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/sched.h>
41 #include <linux/list.h>
42 #include <obd_class.h>
43 #include <obd_support.h>
44 #include <lustre_fid.h>
45 #include <cl_object.h>
46 #include "cl_internal.h"
47 #include <lustre_compat.h>
48
49 /*****************************************************************************
50  *
51  * cl_io interface.
52  *
53  */
54
55 static inline int cl_io_type_is_valid(enum cl_io_type type)
56 {
57         return CIT_READ <= type && type < CIT_OP_NR;
58 }
59
60 static inline int cl_io_is_loopable(const struct cl_io *io)
61 {
62         return cl_io_type_is_valid(io->ci_type) && io->ci_type != CIT_MISC;
63 }
64
65 /**
66  * cl_io invariant that holds at all times when exported cl_io_*() functions
67  * are entered and left.
68  */
69 static int cl_io_invariant(const struct cl_io *io)
70 {
71         struct cl_io *up;
72
73         up = io->ci_parent;
74         return
75                 /*
76                  * io can own pages only when it is ongoing. Sub-io might
77                  * still be in CIS_LOCKED state when top-io is in
78                  * CIS_IO_GOING.
79                  */
80                 ergo(io->ci_owned_nr > 0, io->ci_state == CIS_IO_GOING ||
81                      (io->ci_state == CIS_LOCKED && up != NULL));
82 }
83
84 /**
85  * Finalize \a io, by calling cl_io_operations::cio_fini() bottom-to-top.
86  */
87 void cl_io_fini(const struct lu_env *env, struct cl_io *io)
88 {
89         struct cl_io_slice    *slice;
90
91         LINVRNT(cl_io_type_is_valid(io->ci_type));
92         LINVRNT(cl_io_invariant(io));
93         ENTRY;
94
95         while (!list_empty(&io->ci_layers)) {
96                 slice = container_of(io->ci_layers.prev, struct cl_io_slice,
97                                      cis_linkage);
98                 list_del_init(&slice->cis_linkage);
99                 if (slice->cis_iop->op[io->ci_type].cio_fini != NULL)
100                         slice->cis_iop->op[io->ci_type].cio_fini(env, slice);
101                 /*
102                  * Invalidate slice to catch use after free. This assumes that
103                  * slices are allocated within session and can be touched
104                  * after ->cio_fini() returns.
105                  */
106                 slice->cis_io = NULL;
107         }
108         io->ci_state = CIS_FINI;
109
110         /* sanity check for layout change */
111         switch(io->ci_type) {
112         case CIT_READ:
113         case CIT_WRITE:
114         case CIT_DATA_VERSION:
115         case CIT_FAULT:
116                 break;
117         case CIT_FSYNC:
118                 LASSERT(!io->ci_need_restart);
119                 break;
120         case CIT_SETATTR:
121         case CIT_MISC:
122                 /* Check ignore layout change conf */
123                 LASSERT(ergo(io->ci_ignore_layout || !io->ci_verify_layout,
124                                 !io->ci_need_restart));
125                 break;
126         case CIT_LADVISE:
127                 break;
128         default:
129                 LBUG();
130         }
131         EXIT;
132 }
133 EXPORT_SYMBOL(cl_io_fini);
134
135 static int cl_io_init0(const struct lu_env *env, struct cl_io *io,
136                        enum cl_io_type iot, struct cl_object *obj)
137 {
138         struct cl_object *scan;
139         int result;
140
141         LINVRNT(io->ci_state == CIS_ZERO || io->ci_state == CIS_FINI);
142         LINVRNT(cl_io_type_is_valid(iot));
143         LINVRNT(cl_io_invariant(io));
144         ENTRY;
145
146         io->ci_type = iot;
147         INIT_LIST_HEAD(&io->ci_lockset.cls_todo);
148         INIT_LIST_HEAD(&io->ci_lockset.cls_done);
149         INIT_LIST_HEAD(&io->ci_layers);
150
151         result = 0;
152         cl_object_for_each(scan, obj) {
153                 if (scan->co_ops->coo_io_init != NULL) {
154                         result = scan->co_ops->coo_io_init(env, scan, io);
155                         if (result != 0)
156                                 break;
157                 }
158         }
159         if (result == 0)
160                 io->ci_state = CIS_INIT;
161         RETURN(result);
162 }
163
164 /**
165  * Initialize sub-io, by calling cl_io_operations::cio_init() top-to-bottom.
166  *
167  * \pre obj != cl_object_top(obj)
168  */
169 int cl_io_sub_init(const struct lu_env *env, struct cl_io *io,
170                    enum cl_io_type iot, struct cl_object *obj)
171 {
172         LASSERT(obj != cl_object_top(obj));
173
174         return cl_io_init0(env, io, iot, obj);
175 }
176 EXPORT_SYMBOL(cl_io_sub_init);
177
178 /**
179  * Initialize \a io, by calling cl_io_operations::cio_init() top-to-bottom.
180  *
181  * Caller has to call cl_io_fini() after a call to cl_io_init(), no matter
182  * what the latter returned.
183  *
184  * \pre obj == cl_object_top(obj)
185  * \pre cl_io_type_is_valid(iot)
186  * \post cl_io_type_is_valid(io->ci_type) && io->ci_type == iot
187  */
188 int cl_io_init(const struct lu_env *env, struct cl_io *io,
189                enum cl_io_type iot, struct cl_object *obj)
190 {
191         LASSERT(obj == cl_object_top(obj));
192
193         return cl_io_init0(env, io, iot, obj);
194 }
195 EXPORT_SYMBOL(cl_io_init);
196
197 /**
198  * Initialize read or write io.
199  *
200  * \pre iot == CIT_READ || iot == CIT_WRITE
201  */
202 int cl_io_rw_init(const struct lu_env *env, struct cl_io *io,
203                   enum cl_io_type iot, loff_t pos, size_t count)
204 {
205         LINVRNT(iot == CIT_READ || iot == CIT_WRITE);
206         LINVRNT(io->ci_obj != NULL);
207         ENTRY;
208
209         if (cfs_ptengine_weight(cl_io_engine) < 2)
210                 io->ci_pio = 0;
211
212         LU_OBJECT_HEADER(D_VFSTRACE, env, &io->ci_obj->co_lu,
213                          "io %s range: [%llu, %llu) %s %s %s %s\n",
214                          iot == CIT_READ ? "read" : "write",
215                          pos, pos + count,
216                          io->u.ci_rw.rw_nonblock ? "nonblock" : "block",
217                          io->u.ci_rw.rw_append ? "append" : "-",
218                          io->u.ci_rw.rw_sync ? "sync" : "-",
219                          io->ci_pio ? "pio" : "-");
220
221         io->u.ci_rw.rw_range.cir_pos   = pos;
222         io->u.ci_rw.rw_range.cir_count = count;
223
224         RETURN(cl_io_init(env, io, iot, io->ci_obj));
225 }
226 EXPORT_SYMBOL(cl_io_rw_init);
227
228 static int cl_lock_descr_sort(const struct cl_lock_descr *d0,
229                               const struct cl_lock_descr *d1)
230 {
231         return lu_fid_cmp(lu_object_fid(&d0->cld_obj->co_lu),
232                           lu_object_fid(&d1->cld_obj->co_lu));
233 }
234
235 /*
236  * Sort locks in lexicographical order of their (fid, start-offset) pairs.
237  */
238 static void cl_io_locks_sort(struct cl_io *io)
239 {
240         int done = 0;
241
242         ENTRY;
243         /* hidden treasure: bubble sort for now. */
244         do {
245                 struct cl_io_lock_link *curr;
246                 struct cl_io_lock_link *prev;
247                 struct cl_io_lock_link *temp;
248
249                 done = 1;
250                 prev = NULL;
251
252                 list_for_each_entry_safe(curr, temp, &io->ci_lockset.cls_todo,
253                                          cill_linkage) {
254                         if (prev != NULL) {
255                                 switch (cl_lock_descr_sort(&prev->cill_descr,
256                                                            &curr->cill_descr)) {
257                                 case 0:
258                                         /*
259                                          * IMPOSSIBLE:  Identical locks are
260                                          *              already removed at
261                                          *              this point.
262                                          */
263                                 default:
264                                         LBUG();
265                                 case +1:
266                                         list_move_tail(&curr->cill_linkage,
267                                                        &prev->cill_linkage);
268                                         done = 0;
269                                         continue; /* don't change prev: it's
270                                                    * still "previous" */
271                                 case -1: /* already in order */
272                                         break;
273                                 }
274                         }
275                         prev = curr;
276                 }
277         } while (!done);
278         EXIT;
279 }
280
281 static void cl_lock_descr_merge(struct cl_lock_descr *d0,
282                                 const struct cl_lock_descr *d1)
283 {
284         d0->cld_start = min(d0->cld_start, d1->cld_start);
285         d0->cld_end = max(d0->cld_end, d1->cld_end);
286
287         if (d1->cld_mode == CLM_WRITE && d0->cld_mode != CLM_WRITE)
288                 d0->cld_mode = CLM_WRITE;
289
290         if (d1->cld_mode == CLM_GROUP && d0->cld_mode != CLM_GROUP)
291                 d0->cld_mode = CLM_GROUP;
292 }
293
294 static int cl_lockset_merge(const struct cl_lockset *set,
295                             const struct cl_lock_descr *need)
296 {
297         struct cl_io_lock_link *scan;
298
299         ENTRY;
300         list_for_each_entry(scan, &set->cls_todo, cill_linkage) {
301                 if (!cl_object_same(scan->cill_descr.cld_obj, need->cld_obj))
302                         continue;
303
304                 /* Merge locks for the same object because ldlm lock server
305                  * may expand the lock extent, otherwise there is a deadlock
306                  * case if two conflicted locks are queueud for the same object
307                  * and lock server expands one lock to overlap the another.
308                  * The side effect is that it can generate a multi-stripe lock
309                  * that may cause casacading problem */
310                 cl_lock_descr_merge(&scan->cill_descr, need);
311                 CDEBUG(D_VFSTRACE, "lock: %d: [%lu, %lu]\n",
312                        scan->cill_descr.cld_mode, scan->cill_descr.cld_start,
313                        scan->cill_descr.cld_end);
314                 RETURN(+1);
315         }
316         RETURN(0);
317 }
318
319 static int cl_lockset_lock(const struct lu_env *env, struct cl_io *io,
320                            struct cl_lockset *set)
321 {
322         struct cl_io_lock_link *link;
323         struct cl_io_lock_link *temp;
324         int result;
325
326         ENTRY;
327         result = 0;
328         list_for_each_entry_safe(link, temp, &set->cls_todo, cill_linkage) {
329                 result = cl_lock_request(env, io, &link->cill_lock);
330                 if (result < 0)
331                         break;
332
333                 list_move(&link->cill_linkage, &set->cls_done);
334         }
335         RETURN(result);
336 }
337
338 /**
339  * Takes locks necessary for the current iteration of io.
340  *
341  * Calls cl_io_operations::cio_lock() top-to-bottom to collect locks required
342  * by layers for the current iteration. Then sort locks (to avoid dead-locks),
343  * and acquire them.
344  */
345 int cl_io_lock(const struct lu_env *env, struct cl_io *io)
346 {
347         const struct cl_io_slice *scan;
348         int result = 0;
349
350         LINVRNT(cl_io_is_loopable(io));
351         LINVRNT(io->ci_state == CIS_IT_STARTED);
352         LINVRNT(cl_io_invariant(io));
353
354         ENTRY;
355         list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
356                 if (scan->cis_iop->op[io->ci_type].cio_lock == NULL)
357                         continue;
358                 result = scan->cis_iop->op[io->ci_type].cio_lock(env, scan);
359                 if (result != 0)
360                         break;
361         }
362         if (result == 0) {
363                 cl_io_locks_sort(io);
364                 result = cl_lockset_lock(env, io, &io->ci_lockset);
365         }
366         if (result != 0)
367                 cl_io_unlock(env, io);
368         else
369                 io->ci_state = CIS_LOCKED;
370         RETURN(result);
371 }
372 EXPORT_SYMBOL(cl_io_lock);
373
374 /**
375  * Release locks takes by io.
376  */
377 void cl_io_unlock(const struct lu_env *env, struct cl_io *io)
378 {
379         struct cl_lockset        *set;
380         struct cl_io_lock_link   *link;
381         struct cl_io_lock_link   *temp;
382         const struct cl_io_slice *scan;
383
384         LASSERT(cl_io_is_loopable(io));
385         LASSERT(CIS_IT_STARTED <= io->ci_state && io->ci_state < CIS_UNLOCKED);
386         LINVRNT(cl_io_invariant(io));
387
388         ENTRY;
389         set = &io->ci_lockset;
390
391         list_for_each_entry_safe(link, temp, &set->cls_todo, cill_linkage) {
392                 list_del_init(&link->cill_linkage);
393                 if (link->cill_fini != NULL)
394                         link->cill_fini(env, link);
395         }
396
397         list_for_each_entry_safe(link, temp, &set->cls_done, cill_linkage) {
398                 list_del_init(&link->cill_linkage);
399                 cl_lock_release(env, &link->cill_lock);
400                 if (link->cill_fini != NULL)
401                         link->cill_fini(env, link);
402         }
403
404         list_for_each_entry_reverse(scan, &io->ci_layers, cis_linkage) {
405                 if (scan->cis_iop->op[io->ci_type].cio_unlock != NULL)
406                         scan->cis_iop->op[io->ci_type].cio_unlock(env, scan);
407         }
408         io->ci_state = CIS_UNLOCKED;
409         EXIT;
410 }
411 EXPORT_SYMBOL(cl_io_unlock);
412
413 /**
414  * Prepares next iteration of io.
415  *
416  * Calls cl_io_operations::cio_iter_init() top-to-bottom. This exists to give
417  * layers a chance to modify io parameters, e.g., so that lov can restrict io
418  * to a single stripe.
419  */
420 int cl_io_iter_init(const struct lu_env *env, struct cl_io *io)
421 {
422         const struct cl_io_slice *scan;
423         int result;
424
425         LINVRNT(cl_io_is_loopable(io));
426         LINVRNT(io->ci_state == CIS_INIT || io->ci_state == CIS_IT_ENDED);
427         LINVRNT(cl_io_invariant(io));
428
429         ENTRY;
430         result = 0;
431         list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
432                 if (scan->cis_iop->op[io->ci_type].cio_iter_init == NULL)
433                         continue;
434                 result = scan->cis_iop->op[io->ci_type].cio_iter_init(env,
435                                                                       scan);
436                 if (result != 0)
437                         break;
438         }
439         if (result == 0)
440                 io->ci_state = CIS_IT_STARTED;
441         RETURN(result);
442 }
443 EXPORT_SYMBOL(cl_io_iter_init);
444
445 /**
446  * Finalizes io iteration.
447  *
448  * Calls cl_io_operations::cio_iter_fini() bottom-to-top.
449  */
450 void cl_io_iter_fini(const struct lu_env *env, struct cl_io *io)
451 {
452         const struct cl_io_slice *scan;
453
454         LINVRNT(cl_io_is_loopable(io));
455         LINVRNT(io->ci_state == CIS_UNLOCKED);
456         LINVRNT(cl_io_invariant(io));
457
458         ENTRY;
459         list_for_each_entry_reverse(scan, &io->ci_layers, cis_linkage) {
460                 if (scan->cis_iop->op[io->ci_type].cio_iter_fini != NULL)
461                         scan->cis_iop->op[io->ci_type].cio_iter_fini(env, scan);
462         }
463         io->ci_state = CIS_IT_ENDED;
464         EXIT;
465 }
466 EXPORT_SYMBOL(cl_io_iter_fini);
467
468 /**
469  * Records that read or write io progressed \a nob bytes forward.
470  */
471 void cl_io_rw_advance(const struct lu_env *env, struct cl_io *io, size_t nob)
472 {
473         const struct cl_io_slice *scan;
474
475         LINVRNT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE ||
476                 nob == 0);
477         LINVRNT(cl_io_is_loopable(io));
478         LINVRNT(cl_io_invariant(io));
479
480         ENTRY;
481
482         io->u.ci_rw.rw_range.cir_pos   += nob;
483         io->u.ci_rw.rw_range.cir_count -= nob;
484
485         /* layers have to be notified. */
486         list_for_each_entry_reverse(scan, &io->ci_layers, cis_linkage) {
487                 if (scan->cis_iop->op[io->ci_type].cio_advance != NULL)
488                         scan->cis_iop->op[io->ci_type].cio_advance(env, scan,
489                                                                    nob);
490         }
491         EXIT;
492 }
493
494 /**
495  * Adds a lock to a lockset.
496  */
497 int cl_io_lock_add(const struct lu_env *env, struct cl_io *io,
498                    struct cl_io_lock_link *link)
499 {
500         int result;
501
502         ENTRY;
503         if (cl_lockset_merge(&io->ci_lockset, &link->cill_descr))
504                 result = +1;
505         else {
506                 list_add(&link->cill_linkage, &io->ci_lockset.cls_todo);
507                 result = 0;
508         }
509         RETURN(result);
510 }
511 EXPORT_SYMBOL(cl_io_lock_add);
512
513 static void cl_free_io_lock_link(const struct lu_env *env,
514                                  struct cl_io_lock_link *link)
515 {
516         OBD_FREE_PTR(link);
517 }
518
519 /**
520  * Allocates new lock link, and uses it to add a lock to a lockset.
521  */
522 int cl_io_lock_alloc_add(const struct lu_env *env, struct cl_io *io,
523                          struct cl_lock_descr *descr)
524 {
525         struct cl_io_lock_link *link;
526         int result;
527
528         ENTRY;
529         OBD_ALLOC_PTR(link);
530         if (link != NULL) {
531                 link->cill_descr = *descr;
532                 link->cill_fini  = cl_free_io_lock_link;
533                 result = cl_io_lock_add(env, io, link);
534                 if (result) /* lock match */
535                         link->cill_fini(env, link);
536         } else
537                 result = -ENOMEM;
538
539         RETURN(result);
540 }
541 EXPORT_SYMBOL(cl_io_lock_alloc_add);
542
543 /**
544  * Starts io by calling cl_io_operations::cio_start() top-to-bottom.
545  */
546 int cl_io_start(const struct lu_env *env, struct cl_io *io)
547 {
548         const struct cl_io_slice *scan;
549         int result = 0;
550
551         LINVRNT(cl_io_is_loopable(io));
552         LINVRNT(io->ci_state == CIS_LOCKED);
553         LINVRNT(cl_io_invariant(io));
554         ENTRY;
555
556         io->ci_state = CIS_IO_GOING;
557         list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
558                 if (scan->cis_iop->op[io->ci_type].cio_start == NULL)
559                         continue;
560                 result = scan->cis_iop->op[io->ci_type].cio_start(env, scan);
561                 if (result != 0)
562                         break;
563         }
564         if (result >= 0)
565                 result = 0;
566         RETURN(result);
567 }
568 EXPORT_SYMBOL(cl_io_start);
569
570 /**
571  * Wait until current io iteration is finished by calling
572  * cl_io_operations::cio_end() bottom-to-top.
573  */
574 void cl_io_end(const struct lu_env *env, struct cl_io *io)
575 {
576         const struct cl_io_slice *scan;
577
578         LINVRNT(cl_io_is_loopable(io));
579         LINVRNT(io->ci_state == CIS_IO_GOING);
580         LINVRNT(cl_io_invariant(io));
581         ENTRY;
582
583         list_for_each_entry_reverse(scan, &io->ci_layers, cis_linkage) {
584                 if (scan->cis_iop->op[io->ci_type].cio_end != NULL)
585                         scan->cis_iop->op[io->ci_type].cio_end(env, scan);
586                 /* TODO: error handling. */
587         }
588         io->ci_state = CIS_IO_FINISHED;
589         EXIT;
590 }
591 EXPORT_SYMBOL(cl_io_end);
592
593 /**
594  * Called by read io, to decide the readahead extent
595  *
596  * \see cl_io_operations::cio_read_ahead()
597  */
598 int cl_io_read_ahead(const struct lu_env *env, struct cl_io *io,
599                      pgoff_t start, struct cl_read_ahead *ra)
600 {
601         const struct cl_io_slice *scan;
602         int                       result = 0;
603
604         LINVRNT(io->ci_type == CIT_READ || io->ci_type == CIT_FAULT);
605         LINVRNT(io->ci_state == CIS_IO_GOING || io->ci_state == CIS_LOCKED);
606         LINVRNT(cl_io_invariant(io));
607         ENTRY;
608
609         list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
610                 if (scan->cis_iop->cio_read_ahead == NULL)
611                         continue;
612
613                 result = scan->cis_iop->cio_read_ahead(env, scan, start, ra);
614                 if (result != 0)
615                         break;
616         }
617         RETURN(result > 0 ? 0 : result);
618 }
619 EXPORT_SYMBOL(cl_io_read_ahead);
620
621 /**
622  * Commit a list of contiguous pages into writeback cache.
623  *
624  * \returns 0 if all pages committed, or errcode if error occurred.
625  * \see cl_io_operations::cio_commit_async()
626  */
627 int cl_io_commit_async(const struct lu_env *env, struct cl_io *io,
628                         struct cl_page_list *queue, int from, int to,
629                         cl_commit_cbt cb)
630 {
631         const struct cl_io_slice *scan;
632         int result = 0;
633         ENTRY;
634
635         list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
636                 if (scan->cis_iop->cio_commit_async == NULL)
637                         continue;
638                 result = scan->cis_iop->cio_commit_async(env, scan, queue,
639                                                          from, to, cb);
640                 if (result != 0)
641                         break;
642         }
643         RETURN(result);
644 }
645 EXPORT_SYMBOL(cl_io_commit_async);
646
647 /**
648  * Submits a list of pages for immediate io.
649  *
650  * After the function gets returned, The submitted pages are moved to
651  * queue->c2_qout queue, and queue->c2_qin contain both the pages don't need
652  * to be submitted, and the pages are errant to submit.
653  *
654  * \returns 0 if at least one page was submitted, error code otherwise.
655  * \see cl_io_operations::cio_submit()
656  */
657 int cl_io_submit_rw(const struct lu_env *env, struct cl_io *io,
658                     enum cl_req_type crt, struct cl_2queue *queue)
659 {
660         const struct cl_io_slice *scan;
661         int result = 0;
662         ENTRY;
663
664         list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
665                 if (scan->cis_iop->cio_submit == NULL)
666                         continue;
667                 result = scan->cis_iop->cio_submit(env, scan, crt, queue);
668                 if (result != 0)
669                         break;
670         }
671         /*
672          * If ->cio_submit() failed, no pages were sent.
673          */
674         LASSERT(ergo(result != 0, list_empty(&queue->c2_qout.pl_pages)));
675         RETURN(result);
676 }
677 EXPORT_SYMBOL(cl_io_submit_rw);
678
679 /**
680  * Submit a sync_io and wait for the IO to be finished, or error happens.
681  * If \a timeout is zero, it means to wait for the IO unconditionally.
682  */
683 int cl_io_submit_sync(const struct lu_env *env, struct cl_io *io,
684                       enum cl_req_type iot, struct cl_2queue *queue,
685                       long timeout)
686 {
687         struct cl_sync_io *anchor = &cl_env_info(env)->clt_anchor;
688         struct cl_page *pg;
689         int rc;
690
691         cl_page_list_for_each(pg, &queue->c2_qin) {
692                 LASSERT(pg->cp_sync_io == NULL);
693                 pg->cp_sync_io = anchor;
694         }
695
696         cl_sync_io_init(anchor, queue->c2_qin.pl_nr, &cl_sync_io_end);
697         rc = cl_io_submit_rw(env, io, iot, queue);
698         if (rc == 0) {
699                 /*
700                  * If some pages weren't sent for any reason (e.g.,
701                  * read found up-to-date pages in the cache, or write found
702                  * clean pages), count them as completed to avoid infinite
703                  * wait.
704                  */
705                 cl_page_list_for_each(pg, &queue->c2_qin) {
706                         pg->cp_sync_io = NULL;
707                         cl_sync_io_note(env, anchor, 1);
708                 }
709
710                 /* wait for the IO to be finished. */
711                 rc = cl_sync_io_wait(env, anchor, timeout);
712                 cl_page_list_assume(env, io, &queue->c2_qout);
713         } else {
714                 LASSERT(list_empty(&queue->c2_qout.pl_pages));
715                 cl_page_list_for_each(pg, &queue->c2_qin)
716                         pg->cp_sync_io = NULL;
717         }
718         return rc;
719 }
720 EXPORT_SYMBOL(cl_io_submit_sync);
721
722 /**
723  * Cancel an IO which has been submitted by cl_io_submit_rw.
724  */
725 int cl_io_cancel(const struct lu_env *env, struct cl_io *io,
726                  struct cl_page_list *queue)
727 {
728         struct cl_page *page;
729         int result = 0;
730
731         CERROR("Canceling ongoing page trasmission\n");
732         cl_page_list_for_each(page, queue) {
733                 int rc;
734
735                 rc = cl_page_cancel(env, page);
736                 result = result ?: rc;
737         }
738         return result;
739 }
740
741 static
742 struct cl_io_pt *cl_io_submit_pt(struct cl_io *io, loff_t pos, size_t count)
743 {
744         struct cl_io_pt *pt;
745         int rc;
746
747         OBD_ALLOC(pt, sizeof(*pt));
748         if (pt == NULL)
749                 RETURN(ERR_PTR(-ENOMEM));
750
751         pt->cip_next = NULL;
752         init_sync_kiocb(&pt->cip_iocb, io->u.ci_rw.rw_file);
753         pt->cip_iocb.ki_pos = pos;
754 #ifdef HAVE_KIOCB_KI_LEFT
755         pt->cip_iocb.ki_left = count;
756 #elif defined(HAVE_KI_NBYTES)
757         pt->cip_iocb.ki_nbytes = count;
758 #endif
759         pt->cip_iter = io->u.ci_rw.rw_iter;
760         iov_iter_truncate(&pt->cip_iter, count);
761         pt->cip_file   = io->u.ci_rw.rw_file;
762         pt->cip_iot    = io->ci_type;
763         pt->cip_pos    = pos;
764         pt->cip_count  = count;
765         pt->cip_result = 0;
766
767         rc = cfs_ptask_init(&pt->cip_task, io->u.ci_rw.rw_ptask, pt,
768                             PTF_ORDERED | PTF_COMPLETE |
769                             PTF_USER_MM | PTF_RETRY, smp_processor_id());
770         if (rc)
771                 GOTO(out_error, rc);
772
773         CDEBUG(D_VFSTRACE, "submit %s range: [%llu, %llu)\n",
774                 io->ci_type == CIT_READ ? "read" : "write",
775                 pos, pos + count);
776
777         rc = cfs_ptask_submit(&pt->cip_task, cl_io_engine);
778         if (rc)
779                 GOTO(out_error, rc);
780
781         RETURN(pt);
782
783 out_error:
784         OBD_FREE(pt, sizeof(*pt));
785         RETURN(ERR_PTR(rc));
786 }
787
788 /**
789  * Main io loop.
790  *
791  * Pumps io through iterations calling
792  *
793  *    - cl_io_iter_init()
794  *
795  *    - cl_io_lock()
796  *
797  *    - cl_io_start()
798  *
799  *    - cl_io_end()
800  *
801  *    - cl_io_unlock()
802  *
803  *    - cl_io_iter_fini()
804  *
805  * repeatedly until there is no more io to do.
806  */
807 int cl_io_loop(const struct lu_env *env, struct cl_io *io)
808 {
809         struct cl_io_pt *pt = NULL, *head = NULL;
810         struct cl_io_pt **tail = &head;
811         loff_t pos;
812         size_t count;
813         size_t last_chunk_count = 0;
814         bool short_io = false;
815         int rc = 0;
816         ENTRY;
817
818         LINVRNT(cl_io_is_loopable(io));
819
820         do {
821                 io->ci_continue = 0;
822
823                 rc = cl_io_iter_init(env, io);
824                 if (rc) {
825                         cl_io_iter_fini(env, io);
826                         break;
827                 }
828
829                 pos   = io->u.ci_rw.rw_range.cir_pos;
830                 count = io->u.ci_rw.rw_range.cir_count;
831
832                 if (io->ci_pio) {
833                         /* submit this range for parallel execution */
834                         pt = cl_io_submit_pt(io, pos, count);
835                         if (IS_ERR(pt)) {
836                                 cl_io_iter_fini(env, io);
837                                 rc = PTR_ERR(pt);
838                                 break;
839                         }
840
841                         *tail = pt;
842                         tail = &pt->cip_next;
843                 } else {
844                         size_t nob = io->ci_nob;
845
846                         CDEBUG(D_VFSTRACE,
847                                 "execute type %u range: [%llu, %llu) nob: %zu %s\n",
848                                 io->ci_type, pos, pos + count, nob,
849                                 io->ci_continue ? "continue" : "stop");
850
851                         rc = cl_io_lock(env, io);
852                         if (rc) {
853                                 cl_io_iter_fini(env, io);
854                                 break;
855                         }
856
857                         /*
858                          * Notify layers that locks has been taken,
859                          * and do actual i/o.
860                          *
861                          *   - llite: kms, short read;
862                          *   - llite: generic_file_read();
863                          */
864                         rc = cl_io_start(env, io);
865
866                         /*
867                          * Send any remaining pending
868                          * io, etc.
869                          *
870                          *   - llite: ll_rw_stats_tally.
871                          */
872                         cl_io_end(env, io);
873                         cl_io_unlock(env, io);
874
875                         count = io->ci_nob - nob;
876                         last_chunk_count = count;
877                 }
878
879                 cl_io_rw_advance(env, io, count);
880                 cl_io_iter_fini(env, io);
881         } while (!rc && io->ci_continue);
882
883         CDEBUG(D_VFSTRACE, "loop type %u done: nob: %zu, rc: %d %s\n",
884                 io->ci_type, io->ci_nob, rc,
885                 io->ci_continue ? "continue" : "stop");
886
887         while (head != NULL) {
888                 int rc2;
889
890                 pt = head;
891                 head = head->cip_next;
892
893                 rc2 = cfs_ptask_wait_for(&pt->cip_task);
894                 LASSERTF(!rc2, "wait for task error: %d\n", rc2);
895
896                 rc2 = cfs_ptask_result(&pt->cip_task);
897                 CDEBUG(D_VFSTRACE,
898                         "done %s range: [%llu, %llu) ret: %zd, rc: %d\n",
899                         pt->cip_iot == CIT_READ ? "read" : "write",
900                         pt->cip_pos, pt->cip_pos + pt->cip_count,
901                         pt->cip_result, rc2);
902                 if (rc2)
903                         rc = rc ? rc : rc2;
904                 if (!short_io) {
905                         if (!rc2) /* IO is done by this task successfully */
906                                 io->ci_nob += pt->cip_result;
907                         if (pt->cip_result < pt->cip_count) {
908                                 /* short IO happened.
909                                  * Not necessary to be an error */
910                                 CDEBUG(D_VFSTRACE,
911                                         "incomplete range: [%llu, %llu) "
912                                         "last_chunk_count: %zu\n",
913                                         pt->cip_pos,
914                                         pt->cip_pos + pt->cip_count,
915                                         last_chunk_count);
916                                 io->ci_nob -= last_chunk_count;
917                                 short_io = true;
918                         }
919                 }
920                 OBD_FREE(pt, sizeof(*pt));
921         }
922
923         CDEBUG(D_VFSTRACE, "return nob: %zu (%s io), rc: %d\n",
924                 io->ci_nob, short_io ? "short" : "full", rc);
925
926         RETURN(rc < 0 ? rc : io->ci_result);
927 }
928 EXPORT_SYMBOL(cl_io_loop);
929
930 /**
931  * Adds io slice to the cl_io.
932  *
933  * This is called by cl_object_operations::coo_io_init() methods to add a
934  * per-layer state to the io. New state is added at the end of
935  * cl_io::ci_layers list, that is, it is at the bottom of the stack.
936  *
937  * \see cl_lock_slice_add(), cl_req_slice_add(), cl_page_slice_add()
938  */
939 void cl_io_slice_add(struct cl_io *io, struct cl_io_slice *slice,
940                      struct cl_object *obj,
941                      const struct cl_io_operations *ops)
942 {
943         struct list_head *linkage = &slice->cis_linkage;
944
945         LASSERT((linkage->prev == NULL && linkage->next == NULL) ||
946                 list_empty(linkage));
947         ENTRY;
948
949         list_add_tail(linkage, &io->ci_layers);
950         slice->cis_io  = io;
951         slice->cis_obj = obj;
952         slice->cis_iop = ops;
953         EXIT;
954 }
955 EXPORT_SYMBOL(cl_io_slice_add);
956
957
958 /**
959  * Initializes page list.
960  */
961 void cl_page_list_init(struct cl_page_list *plist)
962 {
963         ENTRY;
964         plist->pl_nr = 0;
965         INIT_LIST_HEAD(&plist->pl_pages);
966         plist->pl_owner = current;
967         EXIT;
968 }
969 EXPORT_SYMBOL(cl_page_list_init);
970
971 /**
972  * Adds a page to a page list.
973  */
974 void cl_page_list_add(struct cl_page_list *plist, struct cl_page *page)
975 {
976         ENTRY;
977         /* it would be better to check that page is owned by "current" io, but
978          * it is not passed here. */
979         LASSERT(page->cp_owner != NULL);
980         LINVRNT(plist->pl_owner == current);
981
982         LASSERT(list_empty(&page->cp_batch));
983         list_add_tail(&page->cp_batch, &plist->pl_pages);
984         ++plist->pl_nr;
985         lu_ref_add_at(&page->cp_reference, &page->cp_queue_ref, "queue", plist);
986         cl_page_get(page);
987         EXIT;
988 }
989 EXPORT_SYMBOL(cl_page_list_add);
990
991 /**
992  * Removes a page from a page list.
993  */
994 void cl_page_list_del(const struct lu_env *env,
995                       struct cl_page_list *plist, struct cl_page *page)
996 {
997         LASSERT(plist->pl_nr > 0);
998         LASSERT(cl_page_is_vmlocked(env, page));
999         LINVRNT(plist->pl_owner == current);
1000
1001         ENTRY;
1002         list_del_init(&page->cp_batch);
1003         --plist->pl_nr;
1004         lu_ref_del_at(&page->cp_reference, &page->cp_queue_ref, "queue", plist);
1005         cl_page_put(env, page);
1006         EXIT;
1007 }
1008 EXPORT_SYMBOL(cl_page_list_del);
1009
1010 /**
1011  * Moves a page from one page list to another.
1012  */
1013 void cl_page_list_move(struct cl_page_list *dst, struct cl_page_list *src,
1014                        struct cl_page *page)
1015 {
1016         LASSERT(src->pl_nr > 0);
1017         LINVRNT(dst->pl_owner == current);
1018         LINVRNT(src->pl_owner == current);
1019
1020         ENTRY;
1021         list_move_tail(&page->cp_batch, &dst->pl_pages);
1022         --src->pl_nr;
1023         ++dst->pl_nr;
1024         lu_ref_set_at(&page->cp_reference, &page->cp_queue_ref, "queue",
1025                       src, dst);
1026         EXIT;
1027 }
1028 EXPORT_SYMBOL(cl_page_list_move);
1029
1030 /**
1031  * Moves a page from one page list to the head of another list.
1032  */
1033 void cl_page_list_move_head(struct cl_page_list *dst, struct cl_page_list *src,
1034                             struct cl_page *page)
1035 {
1036         LASSERT(src->pl_nr > 0);
1037         LINVRNT(dst->pl_owner == current);
1038         LINVRNT(src->pl_owner == current);
1039
1040         ENTRY;
1041         list_move(&page->cp_batch, &dst->pl_pages);
1042         --src->pl_nr;
1043         ++dst->pl_nr;
1044         lu_ref_set_at(&page->cp_reference, &page->cp_queue_ref, "queue",
1045                         src, dst);
1046         EXIT;
1047 }
1048 EXPORT_SYMBOL(cl_page_list_move_head);
1049
1050 /**
1051  * splice the cl_page_list, just as list head does
1052  */
1053 void cl_page_list_splice(struct cl_page_list *list, struct cl_page_list *head)
1054 {
1055         struct cl_page *page;
1056         struct cl_page *tmp;
1057
1058         LINVRNT(list->pl_owner == current);
1059         LINVRNT(head->pl_owner == current);
1060
1061         ENTRY;
1062         cl_page_list_for_each_safe(page, tmp, list)
1063                 cl_page_list_move(head, list, page);
1064         EXIT;
1065 }
1066 EXPORT_SYMBOL(cl_page_list_splice);
1067
1068 /**
1069  * Disowns pages in a queue.
1070  */
1071 void cl_page_list_disown(const struct lu_env *env,
1072                          struct cl_io *io, struct cl_page_list *plist)
1073 {
1074         struct cl_page *page;
1075         struct cl_page *temp;
1076
1077         LINVRNT(plist->pl_owner == current);
1078
1079         ENTRY;
1080         cl_page_list_for_each_safe(page, temp, plist) {
1081                 LASSERT(plist->pl_nr > 0);
1082
1083                 list_del_init(&page->cp_batch);
1084                 --plist->pl_nr;
1085                 /*
1086                  * cl_page_disown0 rather than usual cl_page_disown() is used,
1087                  * because pages are possibly in CPS_FREEING state already due
1088                  * to the call to cl_page_list_discard().
1089                  */
1090                 /*
1091                  * XXX cl_page_disown0() will fail if page is not locked.
1092                  */
1093                 cl_page_disown0(env, io, page);
1094                 lu_ref_del_at(&page->cp_reference, &page->cp_queue_ref, "queue",
1095                               plist);
1096                 cl_page_put(env, page);
1097         }
1098         EXIT;
1099 }
1100 EXPORT_SYMBOL(cl_page_list_disown);
1101
1102 /**
1103  * Releases pages from queue.
1104  */
1105 void cl_page_list_fini(const struct lu_env *env, struct cl_page_list *plist)
1106 {
1107         struct cl_page *page;
1108         struct cl_page *temp;
1109
1110         LINVRNT(plist->pl_owner == current);
1111
1112         ENTRY;
1113         cl_page_list_for_each_safe(page, temp, plist)
1114                 cl_page_list_del(env, plist, page);
1115         LASSERT(plist->pl_nr == 0);
1116         EXIT;
1117 }
1118 EXPORT_SYMBOL(cl_page_list_fini);
1119
1120 /**
1121  * Assumes all pages in a queue.
1122  */
1123 void cl_page_list_assume(const struct lu_env *env,
1124                          struct cl_io *io, struct cl_page_list *plist)
1125 {
1126         struct cl_page *page;
1127
1128         LINVRNT(plist->pl_owner == current);
1129
1130         cl_page_list_for_each(page, plist)
1131                 cl_page_assume(env, io, page);
1132 }
1133
1134 /**
1135  * Discards all pages in a queue.
1136  */
1137 void cl_page_list_discard(const struct lu_env *env, struct cl_io *io,
1138                           struct cl_page_list *plist)
1139 {
1140         struct cl_page *page;
1141
1142         LINVRNT(plist->pl_owner == current);
1143         ENTRY;
1144         cl_page_list_for_each(page, plist)
1145                 cl_page_discard(env, io, page);
1146         EXIT;
1147 }
1148
1149 /**
1150  * Initialize dual page queue.
1151  */
1152 void cl_2queue_init(struct cl_2queue *queue)
1153 {
1154         ENTRY;
1155         cl_page_list_init(&queue->c2_qin);
1156         cl_page_list_init(&queue->c2_qout);
1157         EXIT;
1158 }
1159 EXPORT_SYMBOL(cl_2queue_init);
1160
1161 /**
1162  * Add a page to the incoming page list of 2-queue.
1163  */
1164 void cl_2queue_add(struct cl_2queue *queue, struct cl_page *page)
1165 {
1166         ENTRY;
1167         cl_page_list_add(&queue->c2_qin, page);
1168         EXIT;
1169 }
1170 EXPORT_SYMBOL(cl_2queue_add);
1171
1172 /**
1173  * Disown pages in both lists of a 2-queue.
1174  */
1175 void cl_2queue_disown(const struct lu_env *env,
1176                       struct cl_io *io, struct cl_2queue *queue)
1177 {
1178         ENTRY;
1179         cl_page_list_disown(env, io, &queue->c2_qin);
1180         cl_page_list_disown(env, io, &queue->c2_qout);
1181         EXIT;
1182 }
1183 EXPORT_SYMBOL(cl_2queue_disown);
1184
1185 /**
1186  * Discard (truncate) pages in both lists of a 2-queue.
1187  */
1188 void cl_2queue_discard(const struct lu_env *env,
1189                        struct cl_io *io, struct cl_2queue *queue)
1190 {
1191         ENTRY;
1192         cl_page_list_discard(env, io, &queue->c2_qin);
1193         cl_page_list_discard(env, io, &queue->c2_qout);
1194         EXIT;
1195 }
1196 EXPORT_SYMBOL(cl_2queue_discard);
1197
1198 /**
1199  * Assume to own the pages in cl_2queue
1200  */
1201 void cl_2queue_assume(const struct lu_env *env,
1202                       struct cl_io *io, struct cl_2queue *queue)
1203 {
1204         cl_page_list_assume(env, io, &queue->c2_qin);
1205         cl_page_list_assume(env, io, &queue->c2_qout);
1206 }
1207
1208 /**
1209  * Finalize both page lists of a 2-queue.
1210  */
1211 void cl_2queue_fini(const struct lu_env *env, struct cl_2queue *queue)
1212 {
1213         ENTRY;
1214         cl_page_list_fini(env, &queue->c2_qout);
1215         cl_page_list_fini(env, &queue->c2_qin);
1216         EXIT;
1217 }
1218 EXPORT_SYMBOL(cl_2queue_fini);
1219
1220 /**
1221  * Initialize a 2-queue to contain \a page in its incoming page list.
1222  */
1223 void cl_2queue_init_page(struct cl_2queue *queue, struct cl_page *page)
1224 {
1225         ENTRY;
1226         cl_2queue_init(queue);
1227         cl_2queue_add(queue, page);
1228         EXIT;
1229 }
1230 EXPORT_SYMBOL(cl_2queue_init_page);
1231
1232 /**
1233  * Returns top-level io.
1234  *
1235  * \see cl_object_top()
1236  */
1237 struct cl_io *cl_io_top(struct cl_io *io)
1238 {
1239         ENTRY;
1240         while (io->ci_parent != NULL)
1241                 io = io->ci_parent;
1242         RETURN(io);
1243 }
1244 EXPORT_SYMBOL(cl_io_top);
1245
1246 /**
1247  * Prints human readable representation of \a io to the \a f.
1248  */
1249 void cl_io_print(const struct lu_env *env, void *cookie,
1250                  lu_printer_t printer, const struct cl_io *io)
1251 {
1252 }
1253
1254 /**
1255  * Fills in attributes that are passed to server together with transfer. Only
1256  * attributes from \a flags may be touched. This can be called multiple times
1257  * for the same request.
1258  */
1259 void cl_req_attr_set(const struct lu_env *env, struct cl_object *obj,
1260                      struct cl_req_attr *attr)
1261 {
1262         struct cl_object *scan;
1263         ENTRY;
1264
1265         cl_object_for_each(scan, obj) {
1266                 if (scan->co_ops->coo_req_attr_set != NULL)
1267                         scan->co_ops->coo_req_attr_set(env, scan, attr);
1268         }
1269         EXIT;
1270 }
1271 EXPORT_SYMBOL(cl_req_attr_set);
1272
1273 /* cl_sync_io_callback assumes the caller must call cl_sync_io_wait() to
1274  * wait for the IO to finish. */
1275 void cl_sync_io_end(const struct lu_env *env, struct cl_sync_io *anchor)
1276 {
1277         wake_up_all(&anchor->csi_waitq);
1278
1279         /* it's safe to nuke or reuse anchor now */
1280         atomic_set(&anchor->csi_barrier, 0);
1281 }
1282 EXPORT_SYMBOL(cl_sync_io_end);
1283
1284 /**
1285  * Initialize synchronous io wait anchor
1286  */
1287 void cl_sync_io_init(struct cl_sync_io *anchor, int nr,
1288                      void (*end)(const struct lu_env *, struct cl_sync_io *))
1289 {
1290         ENTRY;
1291         memset(anchor, 0, sizeof(*anchor));
1292         init_waitqueue_head(&anchor->csi_waitq);
1293         atomic_set(&anchor->csi_sync_nr, nr);
1294         atomic_set(&anchor->csi_barrier, nr > 0);
1295         anchor->csi_sync_rc = 0;
1296         anchor->csi_end_io = end;
1297         LASSERT(end != NULL);
1298         EXIT;
1299 }
1300 EXPORT_SYMBOL(cl_sync_io_init);
1301
1302 /**
1303  * Wait until all IO completes. Transfer completion routine has to call
1304  * cl_sync_io_note() for every entity.
1305  */
1306 int cl_sync_io_wait(const struct lu_env *env, struct cl_sync_io *anchor,
1307                     long timeout)
1308 {
1309         struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
1310                                                   NULL, NULL, NULL);
1311         int rc;
1312         ENTRY;
1313
1314         LASSERT(timeout >= 0);
1315
1316         rc = l_wait_event(anchor->csi_waitq,
1317                           atomic_read(&anchor->csi_sync_nr) == 0,
1318                           &lwi);
1319         if (rc < 0) {
1320                 CERROR("IO failed: %d, still wait for %d remaining entries\n",
1321                        rc, atomic_read(&anchor->csi_sync_nr));
1322
1323                 lwi = (struct l_wait_info) { 0 };
1324                 (void)l_wait_event(anchor->csi_waitq,
1325                                    atomic_read(&anchor->csi_sync_nr) == 0,
1326                                    &lwi);
1327         } else {
1328                 rc = anchor->csi_sync_rc;
1329         }
1330         LASSERT(atomic_read(&anchor->csi_sync_nr) == 0);
1331
1332         /* wait until cl_sync_io_note() has done wakeup */
1333         while (unlikely(atomic_read(&anchor->csi_barrier) != 0)) {
1334                 cpu_relax();
1335         }
1336         RETURN(rc);
1337 }
1338 EXPORT_SYMBOL(cl_sync_io_wait);
1339
1340 /**
1341  * Indicate that transfer of a single page completed.
1342  */
1343 void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
1344                      int ioret)
1345 {
1346         ENTRY;
1347         if (anchor->csi_sync_rc == 0 && ioret < 0)
1348                 anchor->csi_sync_rc = ioret;
1349         /*
1350          * Synchronous IO done without releasing page lock (e.g., as a part of
1351          * ->{prepare,commit}_write(). Completion is used to signal the end of
1352          * IO.
1353          */
1354         LASSERT(atomic_read(&anchor->csi_sync_nr) > 0);
1355         if (atomic_dec_and_test(&anchor->csi_sync_nr)) {
1356                 LASSERT(anchor->csi_end_io != NULL);
1357                 anchor->csi_end_io(env, anchor);
1358                 /* Can't access anchor any more */
1359         }
1360         EXIT;
1361 }
1362 EXPORT_SYMBOL(cl_sync_io_note);