Whamcloud - gitweb
LU-8964 clio: Parallelize generic I/O
[fs/lustre-release.git] / lustre / obdclass / cl_io.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * Client IO.
33  *
34  *   Author: Nikita Danilov <nikita.danilov@sun.com>
35  *   Author: Jinshan Xiong <jinshan.xiong@intel.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/sched.h>
41 #include <linux/list.h>
42 #include <obd_class.h>
43 #include <obd_support.h>
44 #include <lustre_fid.h>
45 #include <cl_object.h>
46 #include "cl_internal.h"
47 #include <lustre_compat.h>
48
49 /*****************************************************************************
50  *
51  * cl_io interface.
52  *
53  */
54
55 #define cl_io_for_each(slice, io) \
56         list_for_each_entry((slice), &io->ci_layers, cis_linkage)
57 #define cl_io_for_each_reverse(slice, io)                 \
58         list_for_each_entry_reverse((slice), &io->ci_layers, cis_linkage)
59
60 static inline int cl_io_type_is_valid(enum cl_io_type type)
61 {
62         return CIT_READ <= type && type < CIT_OP_NR;
63 }
64
65 static inline int cl_io_is_loopable(const struct cl_io *io)
66 {
67         return cl_io_type_is_valid(io->ci_type) && io->ci_type != CIT_MISC;
68 }
69
70 /**
71  * cl_io invariant that holds at all times when exported cl_io_*() functions
72  * are entered and left.
73  */
74 static int cl_io_invariant(const struct cl_io *io)
75 {
76         struct cl_io *up;
77
78         up = io->ci_parent;
79         return
80                 /*
81                  * io can own pages only when it is ongoing. Sub-io might
82                  * still be in CIS_LOCKED state when top-io is in
83                  * CIS_IO_GOING.
84                  */
85                 ergo(io->ci_owned_nr > 0, io->ci_state == CIS_IO_GOING ||
86                      (io->ci_state == CIS_LOCKED && up != NULL));
87 }
88
89 /**
90  * Finalize \a io, by calling cl_io_operations::cio_fini() bottom-to-top.
91  */
92 void cl_io_fini(const struct lu_env *env, struct cl_io *io)
93 {
94         struct cl_io_slice    *slice;
95
96         LINVRNT(cl_io_type_is_valid(io->ci_type));
97         LINVRNT(cl_io_invariant(io));
98         ENTRY;
99
100         while (!list_empty(&io->ci_layers)) {
101                 slice = container_of(io->ci_layers.prev, struct cl_io_slice,
102                                      cis_linkage);
103                 list_del_init(&slice->cis_linkage);
104                 if (slice->cis_iop->op[io->ci_type].cio_fini != NULL)
105                         slice->cis_iop->op[io->ci_type].cio_fini(env, slice);
106                 /*
107                  * Invalidate slice to catch use after free. This assumes that
108                  * slices are allocated within session and can be touched
109                  * after ->cio_fini() returns.
110                  */
111                 slice->cis_io = NULL;
112         }
113         io->ci_state = CIS_FINI;
114
115         /* sanity check for layout change */
116         switch(io->ci_type) {
117         case CIT_READ:
118         case CIT_WRITE:
119         case CIT_DATA_VERSION:
120         case CIT_FAULT:
121                 break;
122         case CIT_FSYNC:
123                 LASSERT(!io->ci_need_restart);
124                 break;
125         case CIT_SETATTR:
126         case CIT_MISC:
127                 /* Check ignore layout change conf */
128                 LASSERT(ergo(io->ci_ignore_layout || !io->ci_verify_layout,
129                                 !io->ci_need_restart));
130                 break;
131         case CIT_LADVISE:
132                 break;
133         default:
134                 LBUG();
135         }
136         EXIT;
137 }
138 EXPORT_SYMBOL(cl_io_fini);
139
140 static int cl_io_init0(const struct lu_env *env, struct cl_io *io,
141                        enum cl_io_type iot, struct cl_object *obj)
142 {
143         struct cl_object *scan;
144         int result;
145
146         LINVRNT(io->ci_state == CIS_ZERO || io->ci_state == CIS_FINI);
147         LINVRNT(cl_io_type_is_valid(iot));
148         LINVRNT(cl_io_invariant(io));
149         ENTRY;
150
151         io->ci_type = iot;
152         INIT_LIST_HEAD(&io->ci_lockset.cls_todo);
153         INIT_LIST_HEAD(&io->ci_lockset.cls_done);
154         INIT_LIST_HEAD(&io->ci_layers);
155
156         result = 0;
157         cl_object_for_each(scan, obj) {
158                 if (scan->co_ops->coo_io_init != NULL) {
159                         result = scan->co_ops->coo_io_init(env, scan, io);
160                         if (result != 0)
161                                 break;
162                 }
163         }
164         if (result == 0)
165                 io->ci_state = CIS_INIT;
166         RETURN(result);
167 }
168
169 /**
170  * Initialize sub-io, by calling cl_io_operations::cio_init() top-to-bottom.
171  *
172  * \pre obj != cl_object_top(obj)
173  */
174 int cl_io_sub_init(const struct lu_env *env, struct cl_io *io,
175                    enum cl_io_type iot, struct cl_object *obj)
176 {
177         LASSERT(obj != cl_object_top(obj));
178
179         return cl_io_init0(env, io, iot, obj);
180 }
181 EXPORT_SYMBOL(cl_io_sub_init);
182
183 /**
184  * Initialize \a io, by calling cl_io_operations::cio_init() top-to-bottom.
185  *
186  * Caller has to call cl_io_fini() after a call to cl_io_init(), no matter
187  * what the latter returned.
188  *
189  * \pre obj == cl_object_top(obj)
190  * \pre cl_io_type_is_valid(iot)
191  * \post cl_io_type_is_valid(io->ci_type) && io->ci_type == iot
192  */
193 int cl_io_init(const struct lu_env *env, struct cl_io *io,
194                enum cl_io_type iot, struct cl_object *obj)
195 {
196         LASSERT(obj == cl_object_top(obj));
197
198         return cl_io_init0(env, io, iot, obj);
199 }
200 EXPORT_SYMBOL(cl_io_init);
201
202 /**
203  * Initialize read or write io.
204  *
205  * \pre iot == CIT_READ || iot == CIT_WRITE
206  */
207 int cl_io_rw_init(const struct lu_env *env, struct cl_io *io,
208                   enum cl_io_type iot, loff_t pos, size_t count)
209 {
210         LINVRNT(iot == CIT_READ || iot == CIT_WRITE);
211         LINVRNT(io->ci_obj != NULL);
212         ENTRY;
213
214         if (cfs_ptengine_weight(cl_io_engine) < 2)
215                 io->ci_pio = 0;
216
217         LU_OBJECT_HEADER(D_VFSTRACE, env, &io->ci_obj->co_lu,
218                          "io %s range: [%llu, %llu) %s %s %s %s\n",
219                          iot == CIT_READ ? "read" : "write",
220                          pos, pos + count,
221                          io->u.ci_rw.rw_nonblock ? "nonblock" : "block",
222                          io->u.ci_rw.rw_append ? "append" : "-",
223                          io->u.ci_rw.rw_sync ? "sync" : "-",
224                          io->ci_pio ? "pio" : "-");
225
226         io->u.ci_rw.rw_range.cir_pos   = pos;
227         io->u.ci_rw.rw_range.cir_count = count;
228
229         RETURN(cl_io_init(env, io, iot, io->ci_obj));
230 }
231 EXPORT_SYMBOL(cl_io_rw_init);
232
233 static int cl_lock_descr_sort(const struct cl_lock_descr *d0,
234                               const struct cl_lock_descr *d1)
235 {
236         return lu_fid_cmp(lu_object_fid(&d0->cld_obj->co_lu),
237                           lu_object_fid(&d1->cld_obj->co_lu));
238 }
239
240 /*
241  * Sort locks in lexicographical order of their (fid, start-offset) pairs.
242  */
243 static void cl_io_locks_sort(struct cl_io *io)
244 {
245         int done = 0;
246
247         ENTRY;
248         /* hidden treasure: bubble sort for now. */
249         do {
250                 struct cl_io_lock_link *curr;
251                 struct cl_io_lock_link *prev;
252                 struct cl_io_lock_link *temp;
253
254                 done = 1;
255                 prev = NULL;
256
257                 list_for_each_entry_safe(curr, temp, &io->ci_lockset.cls_todo,
258                                          cill_linkage) {
259                         if (prev != NULL) {
260                                 switch (cl_lock_descr_sort(&prev->cill_descr,
261                                                            &curr->cill_descr)) {
262                                 case 0:
263                                         /*
264                                          * IMPOSSIBLE:  Identical locks are
265                                          *              already removed at
266                                          *              this point.
267                                          */
268                                 default:
269                                         LBUG();
270                                 case +1:
271                                         list_move_tail(&curr->cill_linkage,
272                                                        &prev->cill_linkage);
273                                         done = 0;
274                                         continue; /* don't change prev: it's
275                                                    * still "previous" */
276                                 case -1: /* already in order */
277                                         break;
278                                 }
279                         }
280                         prev = curr;
281                 }
282         } while (!done);
283         EXIT;
284 }
285
286 static void cl_lock_descr_merge(struct cl_lock_descr *d0,
287                                 const struct cl_lock_descr *d1)
288 {
289         d0->cld_start = min(d0->cld_start, d1->cld_start);
290         d0->cld_end = max(d0->cld_end, d1->cld_end);
291
292         if (d1->cld_mode == CLM_WRITE && d0->cld_mode != CLM_WRITE)
293                 d0->cld_mode = CLM_WRITE;
294
295         if (d1->cld_mode == CLM_GROUP && d0->cld_mode != CLM_GROUP)
296                 d0->cld_mode = CLM_GROUP;
297 }
298
299 static int cl_lockset_merge(const struct cl_lockset *set,
300                             const struct cl_lock_descr *need)
301 {
302         struct cl_io_lock_link *scan;
303
304         ENTRY;
305         list_for_each_entry(scan, &set->cls_todo, cill_linkage) {
306                 if (!cl_object_same(scan->cill_descr.cld_obj, need->cld_obj))
307                         continue;
308
309                 /* Merge locks for the same object because ldlm lock server
310                  * may expand the lock extent, otherwise there is a deadlock
311                  * case if two conflicted locks are queueud for the same object
312                  * and lock server expands one lock to overlap the another.
313                  * The side effect is that it can generate a multi-stripe lock
314                  * that may cause casacading problem */
315                 cl_lock_descr_merge(&scan->cill_descr, need);
316                 CDEBUG(D_VFSTRACE, "lock: %d: [%lu, %lu]\n",
317                        scan->cill_descr.cld_mode, scan->cill_descr.cld_start,
318                        scan->cill_descr.cld_end);
319                 RETURN(+1);
320         }
321         RETURN(0);
322 }
323
324 static int cl_lockset_lock(const struct lu_env *env, struct cl_io *io,
325                            struct cl_lockset *set)
326 {
327         struct cl_io_lock_link *link;
328         struct cl_io_lock_link *temp;
329         int result;
330
331         ENTRY;
332         result = 0;
333         list_for_each_entry_safe(link, temp, &set->cls_todo, cill_linkage) {
334                 result = cl_lock_request(env, io, &link->cill_lock);
335                 if (result < 0)
336                         break;
337
338                 list_move(&link->cill_linkage, &set->cls_done);
339         }
340         RETURN(result);
341 }
342
343 /**
344  * Takes locks necessary for the current iteration of io.
345  *
346  * Calls cl_io_operations::cio_lock() top-to-bottom to collect locks required
347  * by layers for the current iteration. Then sort locks (to avoid dead-locks),
348  * and acquire them.
349  */
350 int cl_io_lock(const struct lu_env *env, struct cl_io *io)
351 {
352         const struct cl_io_slice *scan;
353         int result = 0;
354
355         LINVRNT(cl_io_is_loopable(io));
356         LINVRNT(io->ci_state == CIS_IT_STARTED);
357         LINVRNT(cl_io_invariant(io));
358
359         ENTRY;
360         cl_io_for_each(scan, io) {
361                 if (scan->cis_iop->op[io->ci_type].cio_lock == NULL)
362                         continue;
363                 result = scan->cis_iop->op[io->ci_type].cio_lock(env, scan);
364                 if (result != 0)
365                         break;
366         }
367         if (result == 0) {
368                 cl_io_locks_sort(io);
369                 result = cl_lockset_lock(env, io, &io->ci_lockset);
370         }
371         if (result != 0)
372                 cl_io_unlock(env, io);
373         else
374                 io->ci_state = CIS_LOCKED;
375         RETURN(result);
376 }
377 EXPORT_SYMBOL(cl_io_lock);
378
379 /**
380  * Release locks takes by io.
381  */
382 void cl_io_unlock(const struct lu_env *env, struct cl_io *io)
383 {
384         struct cl_lockset        *set;
385         struct cl_io_lock_link   *link;
386         struct cl_io_lock_link   *temp;
387         const struct cl_io_slice *scan;
388
389         LASSERT(cl_io_is_loopable(io));
390         LASSERT(CIS_IT_STARTED <= io->ci_state && io->ci_state < CIS_UNLOCKED);
391         LINVRNT(cl_io_invariant(io));
392
393         ENTRY;
394         set = &io->ci_lockset;
395
396         list_for_each_entry_safe(link, temp, &set->cls_todo, cill_linkage) {
397                 list_del_init(&link->cill_linkage);
398                 if (link->cill_fini != NULL)
399                         link->cill_fini(env, link);
400         }
401
402         list_for_each_entry_safe(link, temp, &set->cls_done, cill_linkage) {
403                 list_del_init(&link->cill_linkage);
404                 cl_lock_release(env, &link->cill_lock);
405                 if (link->cill_fini != NULL)
406                         link->cill_fini(env, link);
407         }
408
409         cl_io_for_each_reverse(scan, io) {
410                 if (scan->cis_iop->op[io->ci_type].cio_unlock != NULL)
411                         scan->cis_iop->op[io->ci_type].cio_unlock(env, scan);
412         }
413         io->ci_state = CIS_UNLOCKED;
414         EXIT;
415 }
416 EXPORT_SYMBOL(cl_io_unlock);
417
418 /**
419  * Prepares next iteration of io.
420  *
421  * Calls cl_io_operations::cio_iter_init() top-to-bottom. This exists to give
422  * layers a chance to modify io parameters, e.g., so that lov can restrict io
423  * to a single stripe.
424  */
425 int cl_io_iter_init(const struct lu_env *env, struct cl_io *io)
426 {
427         const struct cl_io_slice *scan;
428         int result;
429
430         LINVRNT(cl_io_is_loopable(io));
431         LINVRNT(io->ci_state == CIS_INIT || io->ci_state == CIS_IT_ENDED);
432         LINVRNT(cl_io_invariant(io));
433
434         ENTRY;
435         result = 0;
436         cl_io_for_each(scan, io) {
437                 if (scan->cis_iop->op[io->ci_type].cio_iter_init == NULL)
438                         continue;
439                 result = scan->cis_iop->op[io->ci_type].cio_iter_init(env,
440                                                                       scan);
441                 if (result != 0)
442                         break;
443         }
444         if (result == 0)
445                 io->ci_state = CIS_IT_STARTED;
446         RETURN(result);
447 }
448 EXPORT_SYMBOL(cl_io_iter_init);
449
450 /**
451  * Finalizes io iteration.
452  *
453  * Calls cl_io_operations::cio_iter_fini() bottom-to-top.
454  */
455 void cl_io_iter_fini(const struct lu_env *env, struct cl_io *io)
456 {
457         const struct cl_io_slice *scan;
458
459         LINVRNT(cl_io_is_loopable(io));
460         LINVRNT(io->ci_state == CIS_UNLOCKED);
461         LINVRNT(cl_io_invariant(io));
462
463         ENTRY;
464         cl_io_for_each_reverse(scan, io) {
465                 if (scan->cis_iop->op[io->ci_type].cio_iter_fini != NULL)
466                         scan->cis_iop->op[io->ci_type].cio_iter_fini(env, scan);
467         }
468         io->ci_state = CIS_IT_ENDED;
469         EXIT;
470 }
471 EXPORT_SYMBOL(cl_io_iter_fini);
472
473 /**
474  * Records that read or write io progressed \a nob bytes forward.
475  */
476 void cl_io_rw_advance(const struct lu_env *env, struct cl_io *io, size_t nob)
477 {
478         const struct cl_io_slice *scan;
479
480         LINVRNT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE ||
481                 nob == 0);
482         LINVRNT(cl_io_is_loopable(io));
483         LINVRNT(cl_io_invariant(io));
484
485         ENTRY;
486
487         io->u.ci_rw.rw_range.cir_pos   += nob;
488         io->u.ci_rw.rw_range.cir_count -= nob;
489
490         /* layers have to be notified. */
491         cl_io_for_each_reverse(scan, io) {
492                 if (scan->cis_iop->op[io->ci_type].cio_advance != NULL)
493                         scan->cis_iop->op[io->ci_type].cio_advance(env, scan,
494                                                                    nob);
495         }
496         EXIT;
497 }
498
499 /**
500  * Adds a lock to a lockset.
501  */
502 int cl_io_lock_add(const struct lu_env *env, struct cl_io *io,
503                    struct cl_io_lock_link *link)
504 {
505         int result;
506
507         ENTRY;
508         if (cl_lockset_merge(&io->ci_lockset, &link->cill_descr))
509                 result = +1;
510         else {
511                 list_add(&link->cill_linkage, &io->ci_lockset.cls_todo);
512                 result = 0;
513         }
514         RETURN(result);
515 }
516 EXPORT_SYMBOL(cl_io_lock_add);
517
518 static void cl_free_io_lock_link(const struct lu_env *env,
519                                  struct cl_io_lock_link *link)
520 {
521         OBD_FREE_PTR(link);
522 }
523
524 /**
525  * Allocates new lock link, and uses it to add a lock to a lockset.
526  */
527 int cl_io_lock_alloc_add(const struct lu_env *env, struct cl_io *io,
528                          struct cl_lock_descr *descr)
529 {
530         struct cl_io_lock_link *link;
531         int result;
532
533         ENTRY;
534         OBD_ALLOC_PTR(link);
535         if (link != NULL) {
536                 link->cill_descr = *descr;
537                 link->cill_fini  = cl_free_io_lock_link;
538                 result = cl_io_lock_add(env, io, link);
539                 if (result) /* lock match */
540                         link->cill_fini(env, link);
541         } else
542                 result = -ENOMEM;
543
544         RETURN(result);
545 }
546 EXPORT_SYMBOL(cl_io_lock_alloc_add);
547
548 /**
549  * Starts io by calling cl_io_operations::cio_start() top-to-bottom.
550  */
551 int cl_io_start(const struct lu_env *env, struct cl_io *io)
552 {
553         const struct cl_io_slice *scan;
554         int result = 0;
555
556         LINVRNT(cl_io_is_loopable(io));
557         LINVRNT(io->ci_state == CIS_LOCKED);
558         LINVRNT(cl_io_invariant(io));
559         ENTRY;
560
561         io->ci_state = CIS_IO_GOING;
562         cl_io_for_each(scan, io) {
563                 if (scan->cis_iop->op[io->ci_type].cio_start == NULL)
564                         continue;
565                 result = scan->cis_iop->op[io->ci_type].cio_start(env, scan);
566                 if (result != 0)
567                         break;
568         }
569         if (result >= 0)
570                 result = 0;
571         RETURN(result);
572 }
573 EXPORT_SYMBOL(cl_io_start);
574
575 /**
576  * Wait until current io iteration is finished by calling
577  * cl_io_operations::cio_end() bottom-to-top.
578  */
579 void cl_io_end(const struct lu_env *env, struct cl_io *io)
580 {
581         const struct cl_io_slice *scan;
582
583         LINVRNT(cl_io_is_loopable(io));
584         LINVRNT(io->ci_state == CIS_IO_GOING);
585         LINVRNT(cl_io_invariant(io));
586         ENTRY;
587
588         cl_io_for_each_reverse(scan, io) {
589                 if (scan->cis_iop->op[io->ci_type].cio_end != NULL)
590                         scan->cis_iop->op[io->ci_type].cio_end(env, scan);
591                 /* TODO: error handling. */
592         }
593         io->ci_state = CIS_IO_FINISHED;
594         EXIT;
595 }
596 EXPORT_SYMBOL(cl_io_end);
597
598 /**
599  * Called by read io, to decide the readahead extent
600  *
601  * \see cl_io_operations::cio_read_ahead()
602  */
603 int cl_io_read_ahead(const struct lu_env *env, struct cl_io *io,
604                      pgoff_t start, struct cl_read_ahead *ra)
605 {
606         const struct cl_io_slice *scan;
607         int                       result = 0;
608
609         LINVRNT(io->ci_type == CIT_READ || io->ci_type == CIT_FAULT);
610         LINVRNT(io->ci_state == CIS_IO_GOING || io->ci_state == CIS_LOCKED);
611         LINVRNT(cl_io_invariant(io));
612         ENTRY;
613
614         cl_io_for_each(scan, io) {
615                 if (scan->cis_iop->cio_read_ahead == NULL)
616                         continue;
617
618                 result = scan->cis_iop->cio_read_ahead(env, scan, start, ra);
619                 if (result != 0)
620                         break;
621         }
622         RETURN(result > 0 ? 0 : result);
623 }
624 EXPORT_SYMBOL(cl_io_read_ahead);
625
626 /**
627  * Commit a list of contiguous pages into writeback cache.
628  *
629  * \returns 0 if all pages committed, or errcode if error occurred.
630  * \see cl_io_operations::cio_commit_async()
631  */
632 int cl_io_commit_async(const struct lu_env *env, struct cl_io *io,
633                         struct cl_page_list *queue, int from, int to,
634                         cl_commit_cbt cb)
635 {
636         const struct cl_io_slice *scan;
637         int result = 0;
638         ENTRY;
639
640         cl_io_for_each(scan, io) {
641                 if (scan->cis_iop->cio_commit_async == NULL)
642                         continue;
643                 result = scan->cis_iop->cio_commit_async(env, scan, queue,
644                                                          from, to, cb);
645                 if (result != 0)
646                         break;
647         }
648         RETURN(result);
649 }
650 EXPORT_SYMBOL(cl_io_commit_async);
651
652 /**
653  * Submits a list of pages for immediate io.
654  *
655  * After the function gets returned, The submitted pages are moved to
656  * queue->c2_qout queue, and queue->c2_qin contain both the pages don't need
657  * to be submitted, and the pages are errant to submit.
658  *
659  * \returns 0 if at least one page was submitted, error code otherwise.
660  * \see cl_io_operations::cio_submit()
661  */
662 int cl_io_submit_rw(const struct lu_env *env, struct cl_io *io,
663                     enum cl_req_type crt, struct cl_2queue *queue)
664 {
665         const struct cl_io_slice *scan;
666         int result = 0;
667         ENTRY;
668
669         cl_io_for_each(scan, io) {
670                 if (scan->cis_iop->cio_submit == NULL)
671                         continue;
672                 result = scan->cis_iop->cio_submit(env, scan, crt, queue);
673                 if (result != 0)
674                         break;
675         }
676         /*
677          * If ->cio_submit() failed, no pages were sent.
678          */
679         LASSERT(ergo(result != 0, list_empty(&queue->c2_qout.pl_pages)));
680         RETURN(result);
681 }
682 EXPORT_SYMBOL(cl_io_submit_rw);
683
684 /**
685  * Submit a sync_io and wait for the IO to be finished, or error happens.
686  * If \a timeout is zero, it means to wait for the IO unconditionally.
687  */
688 int cl_io_submit_sync(const struct lu_env *env, struct cl_io *io,
689                       enum cl_req_type iot, struct cl_2queue *queue,
690                       long timeout)
691 {
692         struct cl_sync_io *anchor = &cl_env_info(env)->clt_anchor;
693         struct cl_page *pg;
694         int rc;
695
696         cl_page_list_for_each(pg, &queue->c2_qin) {
697                 LASSERT(pg->cp_sync_io == NULL);
698                 pg->cp_sync_io = anchor;
699         }
700
701         cl_sync_io_init(anchor, queue->c2_qin.pl_nr, &cl_sync_io_end);
702         rc = cl_io_submit_rw(env, io, iot, queue);
703         if (rc == 0) {
704                 /*
705                  * If some pages weren't sent for any reason (e.g.,
706                  * read found up-to-date pages in the cache, or write found
707                  * clean pages), count them as completed to avoid infinite
708                  * wait.
709                  */
710                 cl_page_list_for_each(pg, &queue->c2_qin) {
711                         pg->cp_sync_io = NULL;
712                         cl_sync_io_note(env, anchor, 1);
713                 }
714
715                 /* wait for the IO to be finished. */
716                 rc = cl_sync_io_wait(env, anchor, timeout);
717                 cl_page_list_assume(env, io, &queue->c2_qout);
718         } else {
719                 LASSERT(list_empty(&queue->c2_qout.pl_pages));
720                 cl_page_list_for_each(pg, &queue->c2_qin)
721                         pg->cp_sync_io = NULL;
722         }
723         return rc;
724 }
725 EXPORT_SYMBOL(cl_io_submit_sync);
726
727 /**
728  * Cancel an IO which has been submitted by cl_io_submit_rw.
729  */
730 int cl_io_cancel(const struct lu_env *env, struct cl_io *io,
731                  struct cl_page_list *queue)
732 {
733         struct cl_page *page;
734         int result = 0;
735
736         CERROR("Canceling ongoing page trasmission\n");
737         cl_page_list_for_each(page, queue) {
738                 int rc;
739
740                 rc = cl_page_cancel(env, page);
741                 result = result ?: rc;
742         }
743         return result;
744 }
745
746 static
747 struct cl_io_pt *cl_io_submit_pt(struct cl_io *io, loff_t pos, size_t count)
748 {
749         struct cl_io_pt *pt;
750         int rc;
751
752         OBD_ALLOC(pt, sizeof(*pt));
753         if (pt == NULL)
754                 RETURN(ERR_PTR(-ENOMEM));
755
756         pt->cip_next = NULL;
757         init_sync_kiocb(&pt->cip_iocb, io->u.ci_rw.rw_file);
758         pt->cip_iocb.ki_pos = pos;
759 #ifdef HAVE_KIOCB_KI_LEFT
760         pt->cip_iocb.ki_left = count;
761 #elif defined(HAVE_KI_NBYTES)
762         pt->cip_iocb.ki_nbytes = count;
763 #endif
764         pt->cip_iter = io->u.ci_rw.rw_iter;
765         iov_iter_truncate(&pt->cip_iter, count);
766         pt->cip_file   = io->u.ci_rw.rw_file;
767         pt->cip_iot    = io->ci_type;
768         pt->cip_pos    = pos;
769         pt->cip_count  = count;
770         pt->cip_result = 0;
771
772         rc = cfs_ptask_init(&pt->cip_task, io->u.ci_rw.rw_ptask, pt,
773                             PTF_ORDERED | PTF_COMPLETE |
774                             PTF_USER_MM | PTF_RETRY, smp_processor_id());
775         if (rc)
776                 GOTO(out_error, rc);
777
778         CDEBUG(D_VFSTRACE, "submit %s range: [%llu, %llu)\n",
779                 io->ci_type == CIT_READ ? "read" : "write",
780                 pos, pos + count);
781
782         rc = cfs_ptask_submit(&pt->cip_task, cl_io_engine);
783         if (rc)
784                 GOTO(out_error, rc);
785
786         RETURN(pt);
787
788 out_error:
789         OBD_FREE(pt, sizeof(*pt));
790         RETURN(ERR_PTR(rc));
791 }
792
793 /**
794  * Main io loop.
795  *
796  * Pumps io through iterations calling
797  *
798  *    - cl_io_iter_init()
799  *
800  *    - cl_io_lock()
801  *
802  *    - cl_io_start()
803  *
804  *    - cl_io_end()
805  *
806  *    - cl_io_unlock()
807  *
808  *    - cl_io_iter_fini()
809  *
810  * repeatedly until there is no more io to do.
811  */
812 int cl_io_loop(const struct lu_env *env, struct cl_io *io)
813 {
814         struct cl_io_pt *pt = NULL, *head = NULL;
815         struct cl_io_pt **tail = &head;
816         loff_t pos;
817         size_t count;
818         size_t last_chunk_count = 0;
819         bool short_io = false;
820         int rc = 0;
821         ENTRY;
822
823         LINVRNT(cl_io_is_loopable(io));
824
825         do {
826                 io->ci_continue = 0;
827
828                 rc = cl_io_iter_init(env, io);
829                 if (rc) {
830                         cl_io_iter_fini(env, io);
831                         break;
832                 }
833
834                 pos   = io->u.ci_rw.rw_range.cir_pos;
835                 count = io->u.ci_rw.rw_range.cir_count;
836
837                 if (io->ci_pio) {
838                         /* submit this range for parallel execution */
839                         pt = cl_io_submit_pt(io, pos, count);
840                         if (IS_ERR(pt)) {
841                                 cl_io_iter_fini(env, io);
842                                 rc = PTR_ERR(pt);
843                                 break;
844                         }
845
846                         *tail = pt;
847                         tail = &pt->cip_next;
848                 } else {
849                         size_t nob = io->ci_nob;
850
851                         CDEBUG(D_VFSTRACE,
852                                 "execute type %u range: [%llu, %llu) nob: %zu %s\n",
853                                 io->ci_type, pos, pos + count, nob,
854                                 io->ci_continue ? "continue" : "stop");
855
856                         rc = cl_io_lock(env, io);
857                         if (rc) {
858                                 cl_io_iter_fini(env, io);
859                                 break;
860                         }
861
862                         /*
863                          * Notify layers that locks has been taken,
864                          * and do actual i/o.
865                          *
866                          *   - llite: kms, short read;
867                          *   - llite: generic_file_read();
868                          */
869                         rc = cl_io_start(env, io);
870
871                         /*
872                          * Send any remaining pending
873                          * io, etc.
874                          *
875                          *   - llite: ll_rw_stats_tally.
876                          */
877                         cl_io_end(env, io);
878                         cl_io_unlock(env, io);
879
880                         count = io->ci_nob - nob;
881                         last_chunk_count = count;
882                 }
883
884                 cl_io_rw_advance(env, io, count);
885                 cl_io_iter_fini(env, io);
886         } while (!rc && io->ci_continue);
887
888         CDEBUG(D_VFSTRACE, "loop type %u done: nob: %zu, rc: %d %s\n",
889                 io->ci_type, io->ci_nob, rc,
890                 io->ci_continue ? "continue" : "stop");
891
892         while (head != NULL) {
893                 int rc2;
894
895                 pt = head;
896                 head = head->cip_next;
897
898                 rc2 = cfs_ptask_wait_for(&pt->cip_task);
899                 LASSERTF(!rc2, "wait for task error: %d\n", rc2);
900
901                 rc2 = cfs_ptask_result(&pt->cip_task);
902                 CDEBUG(D_VFSTRACE,
903                         "done %s range: [%llu, %llu) ret: %zd, rc: %d\n",
904                         pt->cip_iot == CIT_READ ? "read" : "write",
905                         pt->cip_pos, pt->cip_pos + pt->cip_count,
906                         pt->cip_result, rc2);
907                 if (rc2)
908                         rc = rc ? rc : rc2;
909                 if (!short_io) {
910                         if (!rc2) /* IO is done by this task successfully */
911                                 io->ci_nob += pt->cip_result;
912                         if (pt->cip_result < pt->cip_count) {
913                                 /* short IO happened.
914                                  * Not necessary to be an error */
915                                 CDEBUG(D_VFSTRACE,
916                                         "incomplete range: [%llu, %llu) "
917                                         "last_chunk_count: %zu\n",
918                                         pt->cip_pos,
919                                         pt->cip_pos + pt->cip_count,
920                                         last_chunk_count);
921                                 io->ci_nob -= last_chunk_count;
922                                 short_io = true;
923                         }
924                 }
925                 OBD_FREE(pt, sizeof(*pt));
926         }
927
928         CDEBUG(D_VFSTRACE, "return nob: %zu (%s io), rc: %d\n",
929                 io->ci_nob, short_io ? "short" : "full", rc);
930
931         RETURN(rc < 0 ? rc : io->ci_result);
932 }
933 EXPORT_SYMBOL(cl_io_loop);
934
935 /**
936  * Adds io slice to the cl_io.
937  *
938  * This is called by cl_object_operations::coo_io_init() methods to add a
939  * per-layer state to the io. New state is added at the end of
940  * cl_io::ci_layers list, that is, it is at the bottom of the stack.
941  *
942  * \see cl_lock_slice_add(), cl_req_slice_add(), cl_page_slice_add()
943  */
944 void cl_io_slice_add(struct cl_io *io, struct cl_io_slice *slice,
945                      struct cl_object *obj,
946                      const struct cl_io_operations *ops)
947 {
948         struct list_head *linkage = &slice->cis_linkage;
949
950         LASSERT((linkage->prev == NULL && linkage->next == NULL) ||
951                 list_empty(linkage));
952         ENTRY;
953
954         list_add_tail(linkage, &io->ci_layers);
955         slice->cis_io  = io;
956         slice->cis_obj = obj;
957         slice->cis_iop = ops;
958         EXIT;
959 }
960 EXPORT_SYMBOL(cl_io_slice_add);
961
962
963 /**
964  * Initializes page list.
965  */
966 void cl_page_list_init(struct cl_page_list *plist)
967 {
968         ENTRY;
969         plist->pl_nr = 0;
970         INIT_LIST_HEAD(&plist->pl_pages);
971         plist->pl_owner = current;
972         EXIT;
973 }
974 EXPORT_SYMBOL(cl_page_list_init);
975
976 /**
977  * Adds a page to a page list.
978  */
979 void cl_page_list_add(struct cl_page_list *plist, struct cl_page *page)
980 {
981         ENTRY;
982         /* it would be better to check that page is owned by "current" io, but
983          * it is not passed here. */
984         LASSERT(page->cp_owner != NULL);
985         LINVRNT(plist->pl_owner == current);
986
987         LASSERT(list_empty(&page->cp_batch));
988         list_add_tail(&page->cp_batch, &plist->pl_pages);
989         ++plist->pl_nr;
990         lu_ref_add_at(&page->cp_reference, &page->cp_queue_ref, "queue", plist);
991         cl_page_get(page);
992         EXIT;
993 }
994 EXPORT_SYMBOL(cl_page_list_add);
995
996 /**
997  * Removes a page from a page list.
998  */
999 void cl_page_list_del(const struct lu_env *env,
1000                       struct cl_page_list *plist, struct cl_page *page)
1001 {
1002         LASSERT(plist->pl_nr > 0);
1003         LASSERT(cl_page_is_vmlocked(env, page));
1004         LINVRNT(plist->pl_owner == current);
1005
1006         ENTRY;
1007         list_del_init(&page->cp_batch);
1008         --plist->pl_nr;
1009         lu_ref_del_at(&page->cp_reference, &page->cp_queue_ref, "queue", plist);
1010         cl_page_put(env, page);
1011         EXIT;
1012 }
1013 EXPORT_SYMBOL(cl_page_list_del);
1014
1015 /**
1016  * Moves a page from one page list to another.
1017  */
1018 void cl_page_list_move(struct cl_page_list *dst, struct cl_page_list *src,
1019                        struct cl_page *page)
1020 {
1021         LASSERT(src->pl_nr > 0);
1022         LINVRNT(dst->pl_owner == current);
1023         LINVRNT(src->pl_owner == current);
1024
1025         ENTRY;
1026         list_move_tail(&page->cp_batch, &dst->pl_pages);
1027         --src->pl_nr;
1028         ++dst->pl_nr;
1029         lu_ref_set_at(&page->cp_reference, &page->cp_queue_ref, "queue",
1030                       src, dst);
1031         EXIT;
1032 }
1033 EXPORT_SYMBOL(cl_page_list_move);
1034
1035 /**
1036  * Moves a page from one page list to the head of another list.
1037  */
1038 void cl_page_list_move_head(struct cl_page_list *dst, struct cl_page_list *src,
1039                             struct cl_page *page)
1040 {
1041         LASSERT(src->pl_nr > 0);
1042         LINVRNT(dst->pl_owner == current);
1043         LINVRNT(src->pl_owner == current);
1044
1045         ENTRY;
1046         list_move(&page->cp_batch, &dst->pl_pages);
1047         --src->pl_nr;
1048         ++dst->pl_nr;
1049         lu_ref_set_at(&page->cp_reference, &page->cp_queue_ref, "queue",
1050                         src, dst);
1051         EXIT;
1052 }
1053 EXPORT_SYMBOL(cl_page_list_move_head);
1054
1055 /**
1056  * splice the cl_page_list, just as list head does
1057  */
1058 void cl_page_list_splice(struct cl_page_list *list, struct cl_page_list *head)
1059 {
1060         struct cl_page *page;
1061         struct cl_page *tmp;
1062
1063         LINVRNT(list->pl_owner == current);
1064         LINVRNT(head->pl_owner == current);
1065
1066         ENTRY;
1067         cl_page_list_for_each_safe(page, tmp, list)
1068                 cl_page_list_move(head, list, page);
1069         EXIT;
1070 }
1071 EXPORT_SYMBOL(cl_page_list_splice);
1072
1073 /**
1074  * Disowns pages in a queue.
1075  */
1076 void cl_page_list_disown(const struct lu_env *env,
1077                          struct cl_io *io, struct cl_page_list *plist)
1078 {
1079         struct cl_page *page;
1080         struct cl_page *temp;
1081
1082         LINVRNT(plist->pl_owner == current);
1083
1084         ENTRY;
1085         cl_page_list_for_each_safe(page, temp, plist) {
1086                 LASSERT(plist->pl_nr > 0);
1087
1088                 list_del_init(&page->cp_batch);
1089                 --plist->pl_nr;
1090                 /*
1091                  * cl_page_disown0 rather than usual cl_page_disown() is used,
1092                  * because pages are possibly in CPS_FREEING state already due
1093                  * to the call to cl_page_list_discard().
1094                  */
1095                 /*
1096                  * XXX cl_page_disown0() will fail if page is not locked.
1097                  */
1098                 cl_page_disown0(env, io, page);
1099                 lu_ref_del_at(&page->cp_reference, &page->cp_queue_ref, "queue",
1100                               plist);
1101                 cl_page_put(env, page);
1102         }
1103         EXIT;
1104 }
1105 EXPORT_SYMBOL(cl_page_list_disown);
1106
1107 /**
1108  * Releases pages from queue.
1109  */
1110 void cl_page_list_fini(const struct lu_env *env, struct cl_page_list *plist)
1111 {
1112         struct cl_page *page;
1113         struct cl_page *temp;
1114
1115         LINVRNT(plist->pl_owner == current);
1116
1117         ENTRY;
1118         cl_page_list_for_each_safe(page, temp, plist)
1119                 cl_page_list_del(env, plist, page);
1120         LASSERT(plist->pl_nr == 0);
1121         EXIT;
1122 }
1123 EXPORT_SYMBOL(cl_page_list_fini);
1124
1125 /**
1126  * Assumes all pages in a queue.
1127  */
1128 void cl_page_list_assume(const struct lu_env *env,
1129                          struct cl_io *io, struct cl_page_list *plist)
1130 {
1131         struct cl_page *page;
1132
1133         LINVRNT(plist->pl_owner == current);
1134
1135         cl_page_list_for_each(page, plist)
1136                 cl_page_assume(env, io, page);
1137 }
1138
1139 /**
1140  * Discards all pages in a queue.
1141  */
1142 void cl_page_list_discard(const struct lu_env *env, struct cl_io *io,
1143                           struct cl_page_list *plist)
1144 {
1145         struct cl_page *page;
1146
1147         LINVRNT(plist->pl_owner == current);
1148         ENTRY;
1149         cl_page_list_for_each(page, plist)
1150                 cl_page_discard(env, io, page);
1151         EXIT;
1152 }
1153
1154 /**
1155  * Initialize dual page queue.
1156  */
1157 void cl_2queue_init(struct cl_2queue *queue)
1158 {
1159         ENTRY;
1160         cl_page_list_init(&queue->c2_qin);
1161         cl_page_list_init(&queue->c2_qout);
1162         EXIT;
1163 }
1164 EXPORT_SYMBOL(cl_2queue_init);
1165
1166 /**
1167  * Add a page to the incoming page list of 2-queue.
1168  */
1169 void cl_2queue_add(struct cl_2queue *queue, struct cl_page *page)
1170 {
1171         ENTRY;
1172         cl_page_list_add(&queue->c2_qin, page);
1173         EXIT;
1174 }
1175 EXPORT_SYMBOL(cl_2queue_add);
1176
1177 /**
1178  * Disown pages in both lists of a 2-queue.
1179  */
1180 void cl_2queue_disown(const struct lu_env *env,
1181                       struct cl_io *io, struct cl_2queue *queue)
1182 {
1183         ENTRY;
1184         cl_page_list_disown(env, io, &queue->c2_qin);
1185         cl_page_list_disown(env, io, &queue->c2_qout);
1186         EXIT;
1187 }
1188 EXPORT_SYMBOL(cl_2queue_disown);
1189
1190 /**
1191  * Discard (truncate) pages in both lists of a 2-queue.
1192  */
1193 void cl_2queue_discard(const struct lu_env *env,
1194                        struct cl_io *io, struct cl_2queue *queue)
1195 {
1196         ENTRY;
1197         cl_page_list_discard(env, io, &queue->c2_qin);
1198         cl_page_list_discard(env, io, &queue->c2_qout);
1199         EXIT;
1200 }
1201 EXPORT_SYMBOL(cl_2queue_discard);
1202
1203 /**
1204  * Assume to own the pages in cl_2queue
1205  */
1206 void cl_2queue_assume(const struct lu_env *env,
1207                       struct cl_io *io, struct cl_2queue *queue)
1208 {
1209         cl_page_list_assume(env, io, &queue->c2_qin);
1210         cl_page_list_assume(env, io, &queue->c2_qout);
1211 }
1212
1213 /**
1214  * Finalize both page lists of a 2-queue.
1215  */
1216 void cl_2queue_fini(const struct lu_env *env, struct cl_2queue *queue)
1217 {
1218         ENTRY;
1219         cl_page_list_fini(env, &queue->c2_qout);
1220         cl_page_list_fini(env, &queue->c2_qin);
1221         EXIT;
1222 }
1223 EXPORT_SYMBOL(cl_2queue_fini);
1224
1225 /**
1226  * Initialize a 2-queue to contain \a page in its incoming page list.
1227  */
1228 void cl_2queue_init_page(struct cl_2queue *queue, struct cl_page *page)
1229 {
1230         ENTRY;
1231         cl_2queue_init(queue);
1232         cl_2queue_add(queue, page);
1233         EXIT;
1234 }
1235 EXPORT_SYMBOL(cl_2queue_init_page);
1236
1237 /**
1238  * Returns top-level io.
1239  *
1240  * \see cl_object_top()
1241  */
1242 struct cl_io *cl_io_top(struct cl_io *io)
1243 {
1244         ENTRY;
1245         while (io->ci_parent != NULL)
1246                 io = io->ci_parent;
1247         RETURN(io);
1248 }
1249 EXPORT_SYMBOL(cl_io_top);
1250
1251 /**
1252  * Prints human readable representation of \a io to the \a f.
1253  */
1254 void cl_io_print(const struct lu_env *env, void *cookie,
1255                  lu_printer_t printer, const struct cl_io *io)
1256 {
1257 }
1258
1259 /**
1260  * Fills in attributes that are passed to server together with transfer. Only
1261  * attributes from \a flags may be touched. This can be called multiple times
1262  * for the same request.
1263  */
1264 void cl_req_attr_set(const struct lu_env *env, struct cl_object *obj,
1265                      struct cl_req_attr *attr)
1266 {
1267         struct cl_object *scan;
1268         ENTRY;
1269
1270         cl_object_for_each(scan, obj) {
1271                 if (scan->co_ops->coo_req_attr_set != NULL)
1272                         scan->co_ops->coo_req_attr_set(env, scan, attr);
1273         }
1274         EXIT;
1275 }
1276 EXPORT_SYMBOL(cl_req_attr_set);
1277
1278 /* cl_sync_io_callback assumes the caller must call cl_sync_io_wait() to
1279  * wait for the IO to finish. */
1280 void cl_sync_io_end(const struct lu_env *env, struct cl_sync_io *anchor)
1281 {
1282         wake_up_all(&anchor->csi_waitq);
1283
1284         /* it's safe to nuke or reuse anchor now */
1285         atomic_set(&anchor->csi_barrier, 0);
1286 }
1287 EXPORT_SYMBOL(cl_sync_io_end);
1288
1289 /**
1290  * Initialize synchronous io wait anchor
1291  */
1292 void cl_sync_io_init(struct cl_sync_io *anchor, int nr,
1293                      void (*end)(const struct lu_env *, struct cl_sync_io *))
1294 {
1295         ENTRY;
1296         memset(anchor, 0, sizeof(*anchor));
1297         init_waitqueue_head(&anchor->csi_waitq);
1298         atomic_set(&anchor->csi_sync_nr, nr);
1299         atomic_set(&anchor->csi_barrier, nr > 0);
1300         anchor->csi_sync_rc = 0;
1301         anchor->csi_end_io = end;
1302         LASSERT(end != NULL);
1303         EXIT;
1304 }
1305 EXPORT_SYMBOL(cl_sync_io_init);
1306
1307 /**
1308  * Wait until all IO completes. Transfer completion routine has to call
1309  * cl_sync_io_note() for every entity.
1310  */
1311 int cl_sync_io_wait(const struct lu_env *env, struct cl_sync_io *anchor,
1312                     long timeout)
1313 {
1314         struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
1315                                                   NULL, NULL, NULL);
1316         int rc;
1317         ENTRY;
1318
1319         LASSERT(timeout >= 0);
1320
1321         rc = l_wait_event(anchor->csi_waitq,
1322                           atomic_read(&anchor->csi_sync_nr) == 0,
1323                           &lwi);
1324         if (rc < 0) {
1325                 CERROR("IO failed: %d, still wait for %d remaining entries\n",
1326                        rc, atomic_read(&anchor->csi_sync_nr));
1327
1328                 lwi = (struct l_wait_info) { 0 };
1329                 (void)l_wait_event(anchor->csi_waitq,
1330                                    atomic_read(&anchor->csi_sync_nr) == 0,
1331                                    &lwi);
1332         } else {
1333                 rc = anchor->csi_sync_rc;
1334         }
1335         LASSERT(atomic_read(&anchor->csi_sync_nr) == 0);
1336
1337         /* wait until cl_sync_io_note() has done wakeup */
1338         while (unlikely(atomic_read(&anchor->csi_barrier) != 0)) {
1339                 cpu_relax();
1340         }
1341         RETURN(rc);
1342 }
1343 EXPORT_SYMBOL(cl_sync_io_wait);
1344
1345 /**
1346  * Indicate that transfer of a single page completed.
1347  */
1348 void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
1349                      int ioret)
1350 {
1351         ENTRY;
1352         if (anchor->csi_sync_rc == 0 && ioret < 0)
1353                 anchor->csi_sync_rc = ioret;
1354         /*
1355          * Synchronous IO done without releasing page lock (e.g., as a part of
1356          * ->{prepare,commit}_write(). Completion is used to signal the end of
1357          * IO.
1358          */
1359         LASSERT(atomic_read(&anchor->csi_sync_nr) > 0);
1360         if (atomic_dec_and_test(&anchor->csi_sync_nr)) {
1361                 LASSERT(anchor->csi_end_io != NULL);
1362                 anchor->csi_end_io(env, anchor);
1363                 /* Can't access anchor any more */
1364         }
1365         EXIT;
1366 }
1367 EXPORT_SYMBOL(cl_sync_io_note);