Whamcloud - gitweb
LU-12518 llite: rename count and nob variables to bytes
[fs/lustre-release.git] / lustre / obdclass / cl_io.c
1 // SPDX-License-Identifier: GPL-2.0
2
3 /*
4  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
5  * Use is subject to license terms.
6  *
7  * Copyright (c) 2011, 2017, Intel Corporation.
8  *
9  */
10
11 /*
12  * This file is part of Lustre, http://www.lustre.org/
13  *
14  * Client IO.
15  *
16  * Author: Nikita Danilov <nikita.danilov@sun.com>
17  * Author: Jinshan Xiong <jinshan.xiong@intel.com>
18  *
19  */
20
21 #define DEBUG_SUBSYSTEM S_CLASS
22
23 #include <linux/sched.h>
24 #include <linux/list.h>
25 #include <linux/list_sort.h>
26 #include <obd_class.h>
27 #include <obd_support.h>
28 #include <lustre_fid.h>
29 #include <cl_object.h>
30 #include "cl_internal.h"
31
32 /*
33  * cl_io interface.
34  */
35
36 static inline int cl_io_type_is_valid(enum cl_io_type type)
37 {
38         return CIT_READ <= type && type < CIT_OP_NR;
39 }
40
41 static inline int cl_io_is_loopable(const struct cl_io *io)
42 {
43         return cl_io_type_is_valid(io->ci_type) && io->ci_type != CIT_MISC;
44 }
45
46 /**
47  * cl_io invariant that holds at all times when exported cl_io_*() functions
48  * are entered and left.
49  */
50 static inline int cl_io_invariant(const struct cl_io *io)
51 {
52         /*
53          * io can own pages only when it is ongoing. Sub-io might
54          * still be in CIS_LOCKED state when top-io is in
55          * CIS_IO_GOING.
56          */
57         return ergo(io->ci_owned_nr > 0, io->ci_state == CIS_IO_GOING ||
58                     (io->ci_state == CIS_LOCKED && io->ci_parent != NULL));
59 }
60
61 /**
62  * Finalize \a io, by calling cl_io_operations::cio_fini() bottom-to-top.
63  */
64 void cl_io_fini(const struct lu_env *env, struct cl_io *io)
65 {
66         struct cl_io_slice    *slice;
67
68         LINVRNT(cl_io_type_is_valid(io->ci_type));
69         LINVRNT(cl_io_invariant(io));
70         ENTRY;
71
72         while (!list_empty(&io->ci_layers)) {
73                 slice = container_of(io->ci_layers.prev, struct cl_io_slice,
74                                      cis_linkage);
75                 list_del_init(&slice->cis_linkage);
76                 if (slice->cis_iop->op[io->ci_type].cio_fini != NULL)
77                         slice->cis_iop->op[io->ci_type].cio_fini(env, slice);
78                 /*
79                  * Invalidate slice to catch use after free. This assumes that
80                  * slices are allocated within session and can be touched
81                  * after ->cio_fini() returns.
82                  */
83                 slice->cis_io = NULL;
84         }
85         io->ci_state = CIS_FINI;
86
87         /* sanity check for layout change */
88         switch(io->ci_type) {
89         case CIT_READ:
90         case CIT_WRITE:
91         case CIT_DATA_VERSION:
92         case CIT_FAULT:
93                 break;
94         case CIT_FSYNC:
95                 LASSERT(!io->ci_need_restart);
96                 break;
97         case CIT_SETATTR:
98         case CIT_MISC:
99                 /* Check ignore layout change conf */
100                 LASSERT(ergo(io->ci_ignore_layout || !io->ci_verify_layout,
101                                 !io->ci_need_restart));
102         case CIT_GLIMPSE:
103                 break;
104         case CIT_LADVISE:
105         case CIT_LSEEK:
106                 break;
107         default:
108                 LBUG();
109         }
110         EXIT;
111 }
112 EXPORT_SYMBOL(cl_io_fini);
113
114 static int __cl_io_init(const struct lu_env *env, struct cl_io *io,
115                         enum cl_io_type iot, struct cl_object *obj)
116 {
117         struct cl_object *scan;
118         int result;
119
120         LINVRNT(io->ci_state == CIS_ZERO || io->ci_state == CIS_FINI);
121         LINVRNT(cl_io_type_is_valid(iot));
122         LINVRNT(cl_io_invariant(io));
123         ENTRY;
124
125         io->ci_type = iot;
126         INIT_LIST_HEAD(&io->ci_lockset.cls_todo);
127         INIT_LIST_HEAD(&io->ci_lockset.cls_done);
128         INIT_LIST_HEAD(&io->ci_layers);
129
130         result = 0;
131         cl_object_for_each(scan, obj) {
132                 if (scan->co_ops->coo_io_init != NULL) {
133                         result = scan->co_ops->coo_io_init(env, scan, io);
134                         if (result != 0)
135                                 break;
136                 }
137         }
138         if (result == 0)
139                 io->ci_state = CIS_INIT;
140         RETURN(result);
141 }
142
143 /**
144  * Initialize sub-io, by calling cl_io_operations::cio_init() top-to-bottom.
145  *
146  * \pre obj != cl_object_top(obj)
147  */
148 int cl_io_sub_init(const struct lu_env *env, struct cl_io *io,
149                    enum cl_io_type iot, struct cl_object *obj)
150 {
151         LASSERT(obj != cl_object_top(obj));
152
153         return __cl_io_init(env, io, iot, obj);
154 }
155 EXPORT_SYMBOL(cl_io_sub_init);
156
157 /**
158  * Initialize \a io, by calling cl_io_operations::cio_init() top-to-bottom.
159  *
160  * Caller has to call cl_io_fini() after a call to cl_io_init(), no matter
161  * what the latter returned.
162  *
163  * \pre obj == cl_object_top(obj)
164  * \pre cl_io_type_is_valid(iot)
165  * \post cl_io_type_is_valid(io->ci_type) && io->ci_type == iot
166  */
167 int cl_io_init(const struct lu_env *env, struct cl_io *io,
168                enum cl_io_type iot, struct cl_object *obj)
169 {
170         LASSERT(obj == cl_object_top(obj));
171
172         /* clear I/O restart from previous instance */
173         io->ci_need_restart = 0;
174
175         return __cl_io_init(env, io, iot, obj);
176 }
177 EXPORT_SYMBOL(cl_io_init);
178
179 /**
180  * Initialize read or write io.
181  *
182  * \pre iot == CIT_READ || iot == CIT_WRITE
183  */
184 int cl_io_rw_init(const struct lu_env *env, struct cl_io *io,
185                   enum cl_io_type iot, loff_t pos, size_t bytes)
186 {
187         LINVRNT(iot == CIT_READ || iot == CIT_WRITE);
188         LINVRNT(io->ci_obj != NULL);
189         ENTRY;
190
191         LU_OBJECT_HEADER(D_VFSTRACE, env, &io->ci_obj->co_lu,
192                          "io range: %u [%llu, %llu) %u %u\n",
193                          iot, (__u64)pos, (__u64)pos + bytes,
194                          io->u.ci_rw.crw_nonblock, io->u.ci_wr.wr_append);
195         io->u.ci_rw.crw_pos    = pos;
196         io->u.ci_rw.crw_bytes  = bytes;
197         RETURN(cl_io_init(env, io, iot, io->ci_obj));
198 }
199 EXPORT_SYMBOL(cl_io_rw_init);
200
201 #ifdef HAVE_LIST_CMP_FUNC_T
202 static int cl_lock_descr_cmp(void *priv,
203                              const struct list_head *a,
204                              const struct list_head *b)
205 #else /* !HAVE_LIST_CMP_FUNC_T */
206 static int cl_lock_descr_cmp(void *priv,
207                              struct list_head *a, struct list_head *b)
208 #endif /* HAVE_LIST_CMP_FUNC_T */
209 {
210         const struct cl_io_lock_link *l0 = list_entry(a, struct cl_io_lock_link,
211                                                       cill_linkage);
212         const struct cl_io_lock_link *l1 = list_entry(b, struct cl_io_lock_link,
213                                                       cill_linkage);
214         const struct cl_lock_descr *d0 = &l0->cill_descr;
215         const struct cl_lock_descr *d1 = &l1->cill_descr;
216
217         return lu_fid_cmp(lu_object_fid(&d0->cld_obj->co_lu),
218                           lu_object_fid(&d1->cld_obj->co_lu));
219 }
220
221 static void cl_lock_descr_merge(struct cl_lock_descr *d0,
222                                 const struct cl_lock_descr *d1)
223 {
224         d0->cld_start = min(d0->cld_start, d1->cld_start);
225         d0->cld_end = max(d0->cld_end, d1->cld_end);
226
227         if (d1->cld_mode == CLM_WRITE && d0->cld_mode != CLM_WRITE)
228                 d0->cld_mode = CLM_WRITE;
229
230         if (d1->cld_mode == CLM_GROUP && d0->cld_mode != CLM_GROUP)
231                 d0->cld_mode = CLM_GROUP;
232 }
233
234 static int cl_lockset_merge(const struct cl_lockset *set,
235                             const struct cl_lock_descr *need)
236 {
237         struct cl_io_lock_link *scan;
238
239         ENTRY;
240         list_for_each_entry(scan, &set->cls_todo, cill_linkage) {
241                 if (!cl_object_same(scan->cill_descr.cld_obj, need->cld_obj))
242                         continue;
243
244                 /* Merge locks for the same object because ldlm lock server
245                  * may expand the lock extent, otherwise there is a deadlock
246                  * case if two conflicted locks are queueud for the same object
247                  * and lock server expands one lock to overlap the another.
248                  * The side effect is that it can generate a multi-stripe lock
249                  * that may cause casacading problem */
250                 cl_lock_descr_merge(&scan->cill_descr, need);
251                 CDEBUG(D_VFSTRACE, "lock: %d: [%lu, %lu]\n",
252                        scan->cill_descr.cld_mode, scan->cill_descr.cld_start,
253                        scan->cill_descr.cld_end);
254                 RETURN(+1);
255         }
256         RETURN(0);
257 }
258
259 static int cl_lockset_lock(const struct lu_env *env, struct cl_io *io,
260                            struct cl_lockset *set)
261 {
262         struct cl_io_lock_link *link;
263         struct cl_io_lock_link *temp;
264         int result;
265
266         ENTRY;
267         result = 0;
268         list_for_each_entry_safe(link, temp, &set->cls_todo, cill_linkage) {
269                 result = cl_lock_request(env, io, &link->cill_lock);
270                 if (result < 0)
271                         break;
272
273                 list_move(&link->cill_linkage, &set->cls_done);
274         }
275         RETURN(result);
276 }
277
278 /**
279  * Takes locks necessary for the current iteration of io.
280  *
281  * Calls cl_io_operations::cio_lock() top-to-bottom to collect locks required
282  * by layers for the current iteration. Then sort locks (to avoid dead-locks),
283  * and acquire them.
284  */
285 int cl_io_lock(const struct lu_env *env, struct cl_io *io)
286 {
287         const struct cl_io_slice *scan;
288         int result = 0;
289
290         LINVRNT(cl_io_is_loopable(io));
291         LINVRNT(io->ci_state == CIS_IT_STARTED);
292         LINVRNT(cl_io_invariant(io));
293
294         ENTRY;
295         list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
296                 if (scan->cis_iop->op[io->ci_type].cio_lock == NULL)
297                         continue;
298                 result = scan->cis_iop->op[io->ci_type].cio_lock(env, scan);
299                 if (result != 0)
300                         break;
301         }
302         if (result == 0) {
303                 /*
304                  * Sort locks in lexicographical order of their (fid,
305                  * start-offset) pairs to avoid deadlocks.
306                  */
307                 list_sort(NULL, &io->ci_lockset.cls_todo, cl_lock_descr_cmp);
308                 result = cl_lockset_lock(env, io, &io->ci_lockset);
309         }
310         if (result != 0)
311                 cl_io_unlock(env, io);
312         else
313                 io->ci_state = CIS_LOCKED;
314         RETURN(result);
315 }
316 EXPORT_SYMBOL(cl_io_lock);
317
318 /**
319  * Release locks takes by io.
320  */
321 void cl_io_unlock(const struct lu_env *env, struct cl_io *io)
322 {
323         struct cl_lockset *set;
324         struct cl_io_lock_link *link;
325         struct cl_io_lock_link *temp;
326         const struct cl_io_slice *scan;
327
328         LASSERT(cl_io_is_loopable(io));
329         LASSERT(CIS_IT_STARTED <= io->ci_state && io->ci_state < CIS_UNLOCKED);
330         LINVRNT(cl_io_invariant(io));
331
332         ENTRY;
333         set = &io->ci_lockset;
334
335         list_for_each_entry_safe(link, temp, &set->cls_todo, cill_linkage) {
336                 list_del_init(&link->cill_linkage);
337                 if (link->cill_fini != NULL)
338                         link->cill_fini(env, link);
339         }
340
341         list_for_each_entry_safe(link, temp, &set->cls_done, cill_linkage) {
342                 list_del_init(&link->cill_linkage);
343                 cl_lock_release(env, &link->cill_lock);
344                 if (link->cill_fini != NULL)
345                         link->cill_fini(env, link);
346         }
347
348         list_for_each_entry_reverse(scan, &io->ci_layers, cis_linkage) {
349                 if (scan->cis_iop->op[io->ci_type].cio_unlock != NULL)
350                         scan->cis_iop->op[io->ci_type].cio_unlock(env, scan);
351         }
352         io->ci_state = CIS_UNLOCKED;
353         EXIT;
354 }
355 EXPORT_SYMBOL(cl_io_unlock);
356
357 /**
358  * Prepares next iteration of io.
359  *
360  * Calls cl_io_operations::cio_iter_init() top-to-bottom. This exists to give
361  * layers a chance to modify io parameters, e.g., so that lov can restrict io
362  * to a single stripe.
363  */
364 int cl_io_iter_init(const struct lu_env *env, struct cl_io *io)
365 {
366         const struct cl_io_slice *scan;
367         int result;
368
369         LINVRNT(cl_io_is_loopable(io));
370         LINVRNT(io->ci_state == CIS_INIT || io->ci_state == CIS_IT_ENDED);
371         LINVRNT(cl_io_invariant(io));
372
373         ENTRY;
374         result = 0;
375         list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
376                 if (scan->cis_iop->op[io->ci_type].cio_iter_init == NULL)
377                         continue;
378                 result = scan->cis_iop->op[io->ci_type].cio_iter_init(env,
379                                                                       scan);
380                 if (result != 0)
381                         break;
382         }
383         if (result == 0)
384                 io->ci_state = CIS_IT_STARTED;
385         RETURN(result);
386 }
387 EXPORT_SYMBOL(cl_io_iter_init);
388
389 /**
390  * Finalizes io iteration.
391  *
392  * Calls cl_io_operations::cio_iter_fini() bottom-to-top.
393  */
394 void cl_io_iter_fini(const struct lu_env *env, struct cl_io *io)
395 {
396         const struct cl_io_slice *scan;
397
398         LINVRNT(cl_io_is_loopable(io));
399         LINVRNT(io->ci_state <= CIS_IT_STARTED ||
400                 io->ci_state > CIS_IO_FINISHED);
401         LINVRNT(cl_io_invariant(io));
402
403         ENTRY;
404         list_for_each_entry_reverse(scan, &io->ci_layers, cis_linkage) {
405                 if (scan->cis_iop->op[io->ci_type].cio_iter_fini != NULL)
406                         scan->cis_iop->op[io->ci_type].cio_iter_fini(env, scan);
407         }
408         io->ci_state = CIS_IT_ENDED;
409         EXIT;
410 }
411 EXPORT_SYMBOL(cl_io_iter_fini);
412
413 /**
414  * Records that read or write io progressed \a bytes forward.
415  */
416 void cl_io_rw_advance(const struct lu_env *env, struct cl_io *io, size_t bytes)
417 {
418         const struct cl_io_slice *scan;
419
420         ENTRY;
421
422         LINVRNT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE ||
423                 bytes == 0);
424         LINVRNT(cl_io_is_loopable(io));
425         LINVRNT(cl_io_invariant(io));
426
427         io->u.ci_rw.crw_pos   += bytes;
428         io->u.ci_rw.crw_bytes -= bytes;
429
430         /* layers have to be notified. */
431         list_for_each_entry_reverse(scan, &io->ci_layers, cis_linkage) {
432                 if (scan->cis_iop->op[io->ci_type].cio_advance != NULL)
433                         scan->cis_iop->op[io->ci_type].cio_advance(env, scan,
434                                                                    bytes);
435         }
436         EXIT;
437 }
438
439 /**
440  * Adds a lock to a lockset.
441  */
442 int cl_io_lock_add(const struct lu_env *env, struct cl_io *io,
443                    struct cl_io_lock_link *link)
444 {
445         int result;
446
447         ENTRY;
448         if (cl_lockset_merge(&io->ci_lockset, &link->cill_descr))
449                 result = +1;
450         else {
451                 list_add(&link->cill_linkage, &io->ci_lockset.cls_todo);
452                 result = 0;
453         }
454         RETURN(result);
455 }
456 EXPORT_SYMBOL(cl_io_lock_add);
457
458 static void cl_free_io_lock_link(const struct lu_env *env,
459                                  struct cl_io_lock_link *link)
460 {
461         OBD_FREE_PTR(link);
462 }
463
464 /**
465  * Allocates new lock link, and uses it to add a lock to a lockset.
466  */
467 int cl_io_lock_alloc_add(const struct lu_env *env, struct cl_io *io,
468                          struct cl_lock_descr *descr)
469 {
470         struct cl_io_lock_link *link;
471         int result;
472
473         ENTRY;
474         OBD_ALLOC_PTR(link);
475         if (link != NULL) {
476                 link->cill_descr = *descr;
477                 link->cill_fini  = cl_free_io_lock_link;
478                 result = cl_io_lock_add(env, io, link);
479                 if (result) /* lock match */
480                         link->cill_fini(env, link);
481         } else
482                 result = -ENOMEM;
483
484         RETURN(result);
485 }
486 EXPORT_SYMBOL(cl_io_lock_alloc_add);
487
488 /**
489  * Starts io by calling cl_io_operations::cio_start() top-to-bottom.
490  */
491 int cl_io_start(const struct lu_env *env, struct cl_io *io)
492 {
493         const struct cl_io_slice *scan;
494         int result = 0;
495
496         LINVRNT(cl_io_is_loopable(io));
497         LINVRNT(io->ci_state == CIS_LOCKED);
498         LINVRNT(cl_io_invariant(io));
499         ENTRY;
500
501         io->ci_state = CIS_IO_GOING;
502         list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
503                 if (scan->cis_iop->op[io->ci_type].cio_start == NULL)
504                         continue;
505                 result = scan->cis_iop->op[io->ci_type].cio_start(env, scan);
506                 if (result != 0)
507                         break;
508         }
509         if (result >= 0)
510                 result = 0;
511         RETURN(result);
512 }
513 EXPORT_SYMBOL(cl_io_start);
514
515 /**
516  * Wait until current io iteration is finished by calling
517  * cl_io_operations::cio_end() bottom-to-top.
518  */
519 void cl_io_end(const struct lu_env *env, struct cl_io *io)
520 {
521         const struct cl_io_slice *scan;
522
523         LINVRNT(cl_io_is_loopable(io));
524         LINVRNT(io->ci_state == CIS_IO_GOING);
525         LINVRNT(cl_io_invariant(io));
526         ENTRY;
527
528         list_for_each_entry_reverse(scan, &io->ci_layers, cis_linkage) {
529                 if (scan->cis_iop->op[io->ci_type].cio_end != NULL)
530                         scan->cis_iop->op[io->ci_type].cio_end(env, scan);
531                 /* TODO: error handling. */
532         }
533         io->ci_state = CIS_IO_FINISHED;
534         EXIT;
535 }
536 EXPORT_SYMBOL(cl_io_end);
537
538 /**
539  * Called by read io, to decide the readahead extent
540  *
541  * \see cl_io_operations::cio_read_ahead()
542  */
543 int cl_io_read_ahead(const struct lu_env *env, struct cl_io *io,
544                      pgoff_t start, struct cl_read_ahead *ra)
545 {
546         const struct cl_io_slice *scan;
547         int result = 0;
548
549         LINVRNT(io->ci_type == CIT_READ ||
550                 io->ci_type == CIT_FAULT ||
551                 io->ci_type == CIT_WRITE);
552         LINVRNT(io->ci_state == CIS_IO_GOING || io->ci_state == CIS_LOCKED);
553         LINVRNT(cl_io_invariant(io));
554         ENTRY;
555
556         list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
557                 if (scan->cis_iop->cio_read_ahead == NULL)
558                         continue;
559
560                 result = scan->cis_iop->cio_read_ahead(env, scan, start, ra);
561                 if (result != 0)
562                         break;
563         }
564         RETURN(result > 0 ? 0 : result);
565 }
566 EXPORT_SYMBOL(cl_io_read_ahead);
567
568 /**
569  * Called before io start, to reserve enough LRU slots to avoid
570  * deadlock.
571  *
572  * \see cl_io_operations::cio_lru_reserve()
573  */
574 int cl_io_lru_reserve(const struct lu_env *env, struct cl_io *io,
575                       loff_t pos, size_t bytes)
576 {
577         const struct cl_io_slice *scan;
578         int result = 0;
579
580         LINVRNT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
581         LINVRNT(cl_io_invariant(io));
582         ENTRY;
583
584         list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
585                 if (scan->cis_iop->cio_lru_reserve) {
586                         result = scan->cis_iop->cio_lru_reserve(env, scan,
587                                                                 pos, bytes);
588                         if (result)
589                                 break;
590                 }
591         }
592
593         RETURN(result);
594 }
595 EXPORT_SYMBOL(cl_io_lru_reserve);
596
597 /**
598  * Commit a list of contiguous pages into writeback cache.
599  *
600  * \returns 0 if all pages committed, or errcode if error occurred.
601  * \see cl_io_operations::cio_commit_async()
602  */
603 int cl_io_commit_async(const struct lu_env *env, struct cl_io *io,
604                        struct cl_page_list *queue, int from, int to,
605                        cl_commit_cbt cb)
606 {
607         const struct cl_io_slice *scan;
608         int result = 0;
609         ENTRY;
610
611         list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
612                 if (scan->cis_iop->cio_commit_async == NULL)
613                         continue;
614                 result = scan->cis_iop->cio_commit_async(env, scan, queue,
615                                                          from, to, cb);
616                 if (result != 0)
617                         break;
618         }
619         RETURN(result);
620 }
621 EXPORT_SYMBOL(cl_io_commit_async);
622
623 void cl_io_extent_release(const struct lu_env *env, struct cl_io *io)
624 {
625         const struct cl_io_slice *scan;
626         ENTRY;
627
628         list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
629                 if (scan->cis_iop->cio_extent_release == NULL)
630                         continue;
631                 scan->cis_iop->cio_extent_release(env, scan);
632         }
633         EXIT;
634 }
635 EXPORT_SYMBOL(cl_io_extent_release);
636
637 /**
638  * Submits a list of pages for immediate io.
639  *
640  * After the function gets returned, The submitted pages are moved to
641  * queue->c2_qout queue, and queue->c2_qin contain both the pages don't need
642  * to be submitted, and the pages are errant to submit.
643  *
644  * \returns 0 if at least one page was submitted, error code otherwise.
645  * \see cl_io_operations::cio_submit()
646  */
647 int cl_io_submit_rw(const struct lu_env *env, struct cl_io *io,
648                     enum cl_req_type crt, struct cl_2queue *queue)
649 {
650         const struct cl_io_slice *scan;
651         int result = 0;
652         ENTRY;
653
654         list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
655                 if (scan->cis_iop->cio_submit == NULL)
656                         continue;
657                 result = scan->cis_iop->cio_submit(env, scan, crt, queue);
658                 if (result != 0)
659                         break;
660         }
661         /*
662          * If ->cio_submit() failed, no pages were sent.
663          */
664         LASSERT(ergo(result != 0, list_empty(&queue->c2_qout.pl_pages)));
665         RETURN(result);
666 }
667 EXPORT_SYMBOL(cl_io_submit_rw);
668
669 /**
670  * Submit a sync_io and wait for the IO to be finished, or error happens.
671  * If \a timeout is zero, it means to wait for the IO unconditionally.
672  */
673 int cl_io_submit_sync(const struct lu_env *env, struct cl_io *io,
674                       enum cl_req_type iot, struct cl_2queue *queue,
675                       long timeout)
676 {
677         struct cl_sync_io *anchor = &cl_env_info(env)->clt_anchor;
678         struct cl_page *pg;
679         int rc;
680         ENTRY;
681
682         cl_page_list_for_each(pg, &queue->c2_qin) {
683                 LASSERT(pg->cp_sync_io == NULL);
684                 pg->cp_sync_io = anchor;
685         }
686
687         cl_sync_io_init(anchor, queue->c2_qin.pl_nr);
688         rc = cl_io_submit_rw(env, io, iot, queue);
689         if (rc == 0) {
690                 /*
691                  * If some pages weren't sent for any reason (e.g.,
692                  * read found up-to-date pages in the cache, or write found
693                  * clean pages), count them as completed to avoid infinite
694                  * wait.
695                  */
696                 cl_page_list_for_each(pg, &queue->c2_qin) {
697                         pg->cp_sync_io = NULL;
698                         cl_sync_io_note(env, anchor, 1);
699                 }
700
701                 /* wait for the IO to be finished. */
702                 rc = cl_sync_io_wait(env, anchor, timeout);
703                 cl_page_list_assume(env, io, &queue->c2_qout);
704         } else {
705                 LASSERT(list_empty(&queue->c2_qout.pl_pages));
706                 cl_page_list_for_each(pg, &queue->c2_qin)
707                         pg->cp_sync_io = NULL;
708         }
709         RETURN(rc);
710 }
711 EXPORT_SYMBOL(cl_io_submit_sync);
712
713 /**
714  * Main io loop.
715  *
716  * Pumps io through iterations calling
717  *
718  *    - cl_io_iter_init()
719  *
720  *    - cl_io_lock()
721  *
722  *    - cl_io_start()
723  *
724  *    - cl_io_end()
725  *
726  *    - cl_io_unlock()
727  *
728  *    - cl_io_iter_fini()
729  *
730  * repeatedly until there is no more io to do.
731  */
732 int cl_io_loop(const struct lu_env *env, struct cl_io *io)
733 {
734         int result = 0;
735         int rc = 0;
736
737         LINVRNT(cl_io_is_loopable(io));
738         ENTRY;
739
740         do {
741                 size_t bytes;
742
743                 io->ci_continue = 0;
744                 result = cl_io_iter_init(env, io);
745                 if (result == 0) {
746                         bytes = io->ci_bytes;
747                         result = cl_io_lock(env, io);
748                         if (result == 0) {
749                                 /*
750                                  * Notify layers that locks has been taken,
751                                  * and do actual i/o.
752                                  *
753                                  *   - llite: kms, short read;
754                                  *   - llite: generic_file_read();
755                                  */
756                                 result = cl_io_start(env, io);
757                                 /*
758                                  * Send any remaining pending
759                                  * io, etc.
760                                  *
761                                  **   - llite: ll_rw_stats_tally.
762                                  */
763                                 cl_io_end(env, io);
764                                 cl_io_unlock(env, io);
765                                 cl_io_rw_advance(env, io, io->ci_bytes - bytes);
766                         }
767                 }
768                 cl_io_iter_fini(env, io);
769                 if (result)
770                         rc = result;
771         } while ((result == 0 || result == -EIOCBQUEUED) &&
772                  io->ci_continue);
773
774         if (rc && !result)
775                 result = rc;
776
777         if (result == -EAGAIN && io->ci_ndelay && !io->ci_iocb_nowait) {
778                 io->ci_need_restart = 1;
779                 result = 0;
780         }
781
782         if (result == 0)
783                 result = io->ci_result;
784         RETURN(result < 0 ? result : 0);
785 }
786 EXPORT_SYMBOL(cl_io_loop);
787
788 /**
789  * Adds io slice to the cl_io.
790  *
791  * This is called by cl_object_operations::coo_io_init() methods to add a
792  * per-layer state to the io. New state is added at the end of
793  * cl_io::ci_layers list, that is, it is at the bottom of the stack.
794  *
795  * \see cl_lock_slice_add(), cl_req_slice_add(), cl_page_slice_add()
796  */
797 void cl_io_slice_add(struct cl_io *io, struct cl_io_slice *slice,
798                      struct cl_object *obj,
799                      const struct cl_io_operations *ops)
800 {
801         struct list_head *linkage = &slice->cis_linkage;
802
803         LASSERT((linkage->prev == NULL && linkage->next == NULL) ||
804                 list_empty(linkage));
805         ENTRY;
806
807         list_add_tail(linkage, &io->ci_layers);
808         slice->cis_io  = io;
809         slice->cis_obj = obj;
810         slice->cis_iop = ops;
811         EXIT;
812 }
813 EXPORT_SYMBOL(cl_io_slice_add);
814
815
816 /**
817  * Initializes page list.
818  */
819 void cl_page_list_init(struct cl_page_list *plist)
820 {
821         ENTRY;
822         plist->pl_nr = 0;
823         INIT_LIST_HEAD(&plist->pl_pages);
824         EXIT;
825 }
826 EXPORT_SYMBOL(cl_page_list_init);
827
828 /**
829  * Adds a page to a page list.
830  */
831 void cl_page_list_add(struct cl_page_list *plist, struct cl_page *page,
832                       bool get_ref)
833 {
834         ENTRY;
835         /* it would be better to check that page is owned by "current" io, but
836          * it is not passed here. */
837         LASSERT(page->cp_owner != NULL);
838
839         LASSERT(list_empty(&page->cp_batch));
840         list_add_tail(&page->cp_batch, &plist->pl_pages);
841         ++plist->pl_nr;
842         lu_ref_add_at(&page->cp_reference, &page->cp_queue_ref, "queue", plist);
843         if (get_ref)
844                 cl_page_get(page);
845         EXIT;
846 }
847 EXPORT_SYMBOL(cl_page_list_add);
848
849 /**
850  * Removes a page from a page list.
851  */
852 void cl_page_list_del(const struct lu_env *env,
853                       struct cl_page_list *plist, struct cl_page *page)
854 {
855         LASSERT(plist->pl_nr > 0);
856
857         ENTRY;
858         list_del_init(&page->cp_batch);
859         --plist->pl_nr;
860         lu_ref_del_at(&page->cp_reference, &page->cp_queue_ref, "queue", plist);
861         cl_page_put(env, page);
862         EXIT;
863 }
864 EXPORT_SYMBOL(cl_page_list_del);
865
866 /**
867  * Moves a page from one page list to another.
868  */
869 void cl_page_list_move(struct cl_page_list *dst, struct cl_page_list *src,
870                        struct cl_page *page)
871 {
872         LASSERT(src->pl_nr > 0);
873
874         ENTRY;
875         list_move_tail(&page->cp_batch, &dst->pl_pages);
876         --src->pl_nr;
877         ++dst->pl_nr;
878         lu_ref_set_at(&page->cp_reference, &page->cp_queue_ref, "queue",
879                       src, dst);
880         EXIT;
881 }
882 EXPORT_SYMBOL(cl_page_list_move);
883
884 /**
885  * Moves a page from one page list to the head of another list.
886  */
887 void cl_page_list_move_head(struct cl_page_list *dst, struct cl_page_list *src,
888                             struct cl_page *page)
889 {
890         LASSERT(src->pl_nr > 0);
891
892         ENTRY;
893         list_move(&page->cp_batch, &dst->pl_pages);
894         --src->pl_nr;
895         ++dst->pl_nr;
896         lu_ref_set_at(&page->cp_reference, &page->cp_queue_ref, "queue",
897                         src, dst);
898         EXIT;
899 }
900 EXPORT_SYMBOL(cl_page_list_move_head);
901
902 /**
903  * splice the cl_page_list, just as list head does
904  */
905 void cl_page_list_splice(struct cl_page_list *src, struct cl_page_list *dst)
906 {
907 #ifdef CONFIG_LUSTRE_DEBUG_LU_REF
908         struct cl_page *page;
909         struct cl_page *tmp;
910
911         ENTRY;
912         cl_page_list_for_each_safe(page, tmp, src)
913                 lu_ref_set_at(&page->cp_reference, &page->cp_queue_ref,
914                               "queue", src, dst);
915 #else
916         ENTRY;
917 #endif
918         dst->pl_nr += src->pl_nr;
919         src->pl_nr = 0;
920         list_splice_tail_init(&src->pl_pages, &dst->pl_pages);
921
922         EXIT;
923 }
924 EXPORT_SYMBOL(cl_page_list_splice);
925
926 /**
927  * Disowns pages in a queue.
928  */
929 void cl_page_list_disown(const struct lu_env *env, struct cl_page_list *plist)
930 {
931         struct cl_page *page;
932         struct cl_page *temp;
933
934         ENTRY;
935         cl_page_list_for_each_safe(page, temp, plist) {
936                 LASSERT(plist->pl_nr > 0);
937
938                 list_del_init(&page->cp_batch);
939                 --plist->pl_nr;
940                 /*
941                  * __cl_page_disown rather than usual cl_page_disown() is used,
942                  * because pages are possibly in CPS_FREEING state already due
943                  * to the call to cl_page_list_discard().
944                  */
945                 /*
946                  * XXX __cl_page_disown() will fail if page is not locked.
947                  */
948                 __cl_page_disown(env, page);
949                 lu_ref_del_at(&page->cp_reference, &page->cp_queue_ref, "queue",
950                               plist);
951                 cl_page_put(env, page);
952         }
953         EXIT;
954 }
955 EXPORT_SYMBOL(cl_page_list_disown);
956
957 /**
958  * Releases pages from queue.
959  */
960 void cl_page_list_fini(const struct lu_env *env, struct cl_page_list *plist)
961 {
962         struct cl_page *page;
963         struct cl_page *temp;
964
965         ENTRY;
966         cl_page_list_for_each_safe(page, temp, plist)
967                 cl_page_list_del(env, plist, page);
968         LASSERT(plist->pl_nr == 0);
969         EXIT;
970 }
971 EXPORT_SYMBOL(cl_page_list_fini);
972
973 /**
974  * Assumes all pages in a queue.
975  */
976 void cl_page_list_assume(const struct lu_env *env,
977                          struct cl_io *io, struct cl_page_list *plist)
978 {
979         struct cl_page *page;
980
981         cl_page_list_for_each(page, plist)
982                 cl_page_assume(env, io, page);
983 }
984
985 /**
986  * Discards all pages in a queue.
987  */
988 void cl_page_list_discard(const struct lu_env *env, struct cl_io *io,
989                           struct cl_page_list *plist)
990 {
991         struct cl_page *page;
992
993         ENTRY;
994         cl_page_list_for_each(page, plist)
995                 cl_page_discard(env, io, page);
996         EXIT;
997 }
998 EXPORT_SYMBOL(cl_page_list_discard);
999
1000 /**
1001  * Initialize dual page queue.
1002  */
1003 void cl_2queue_init(struct cl_2queue *queue)
1004 {
1005         ENTRY;
1006         cl_page_list_init(&queue->c2_qin);
1007         cl_page_list_init(&queue->c2_qout);
1008         EXIT;
1009 }
1010 EXPORT_SYMBOL(cl_2queue_init);
1011
1012 /**
1013  * Disown pages in both lists of a 2-queue.
1014  */
1015 void cl_2queue_disown(const struct lu_env *env, struct cl_2queue *queue)
1016 {
1017         ENTRY;
1018         cl_page_list_disown(env, &queue->c2_qin);
1019         cl_page_list_disown(env, &queue->c2_qout);
1020         EXIT;
1021 }
1022 EXPORT_SYMBOL(cl_2queue_disown);
1023
1024 /**
1025  * Discard (truncate) pages in both lists of a 2-queue.
1026  */
1027 void cl_2queue_discard(const struct lu_env *env,
1028                        struct cl_io *io, struct cl_2queue *queue)
1029 {
1030         ENTRY;
1031         cl_page_list_discard(env, io, &queue->c2_qin);
1032         cl_page_list_discard(env, io, &queue->c2_qout);
1033         EXIT;
1034 }
1035 EXPORT_SYMBOL(cl_2queue_discard);
1036
1037 /**
1038  * Assume to own the pages in cl_2queue
1039  */
1040 void cl_2queue_assume(const struct lu_env *env,
1041                       struct cl_io *io, struct cl_2queue *queue)
1042 {
1043         cl_page_list_assume(env, io, &queue->c2_qin);
1044         cl_page_list_assume(env, io, &queue->c2_qout);
1045 }
1046
1047 /**
1048  * Finalize both page lists of a 2-queue.
1049  */
1050 void cl_2queue_fini(const struct lu_env *env, struct cl_2queue *queue)
1051 {
1052         ENTRY;
1053         cl_page_list_fini(env, &queue->c2_qout);
1054         cl_page_list_fini(env, &queue->c2_qin);
1055         EXIT;
1056 }
1057 EXPORT_SYMBOL(cl_2queue_fini);
1058
1059 /**
1060  * Initialize a 2-queue to contain \a page in its incoming page list.
1061  */
1062 void cl_2queue_init_page(struct cl_2queue *queue, struct cl_page *page)
1063 {
1064         ENTRY;
1065         cl_2queue_init(queue);
1066         /*
1067          * Add a page to the incoming page list of 2-queue.
1068          */
1069         cl_page_list_add(&queue->c2_qin, page, true);
1070         EXIT;
1071 }
1072 EXPORT_SYMBOL(cl_2queue_init_page);
1073
1074 /**
1075  * Returns top-level io.
1076  *
1077  * \see cl_object_top()
1078  */
1079 struct cl_io *cl_io_top(struct cl_io *io)
1080 {
1081         ENTRY;
1082         while (io->ci_parent != NULL)
1083                 io = io->ci_parent;
1084         RETURN(io);
1085 }
1086 EXPORT_SYMBOL(cl_io_top);
1087
1088 /**
1089  * Fills in attributes that are passed to server together with transfer. Only
1090  * attributes from \a flags may be touched. This can be called multiple times
1091  * for the same request.
1092  */
1093 void cl_req_attr_set(const struct lu_env *env, struct cl_object *obj,
1094                      struct cl_req_attr *attr)
1095 {
1096         struct cl_object *scan;
1097         ENTRY;
1098
1099         cl_object_for_each(scan, obj) {
1100                 if (scan->co_ops->coo_req_attr_set != NULL)
1101                         scan->co_ops->coo_req_attr_set(env, scan, attr);
1102         }
1103         EXIT;
1104 }
1105 EXPORT_SYMBOL(cl_req_attr_set);
1106
1107 /**
1108  * Initialize synchronous io wait \a anchor for \a nr pages with optional
1109  * \a end handler.
1110  * \param anchor owned by caller, initialzied here.
1111  * \param nr number of pages initally pending in sync.
1112  * \param end optional callback sync_io completion, can be used to
1113  *  trigger erasure coding, integrity, dedupe, or similar operation.
1114  * \q end is called with a spinlock on anchor->csi_waitq.lock
1115  */
1116 void cl_sync_io_init_notify(struct cl_sync_io *anchor, int nr,
1117                             void *dio_aio, cl_sync_io_end_t *end)
1118 {
1119         ENTRY;
1120         memset(anchor, 0, sizeof(*anchor));
1121         init_waitqueue_head(&anchor->csi_waitq);
1122         atomic_set(&anchor->csi_sync_nr, nr);
1123         atomic_set(&anchor->csi_complete, 0);
1124         anchor->csi_sync_rc = 0;
1125         anchor->csi_end_io = end;
1126         anchor->csi_dio_aio = dio_aio;
1127         EXIT;
1128 }
1129 EXPORT_SYMBOL(cl_sync_io_init_notify);
1130
1131 /**
1132  * Wait until all IO completes. Transfer completion routine has to call
1133  * cl_sync_io_note() for every entity.
1134  */
1135 int cl_sync_io_wait(const struct lu_env *env, struct cl_sync_io *anchor,
1136                     long timeout)
1137 {
1138         int rc = 0;
1139         ENTRY;
1140
1141         LASSERT(timeout >= 0);
1142
1143         if (timeout > 0 &&
1144             wait_event_idle_timeout(anchor->csi_waitq,
1145                                     atomic_read(&anchor->csi_complete) == 1,
1146                                     cfs_time_seconds(timeout)) == 0) {
1147                 rc = -ETIMEDOUT;
1148                 CERROR("IO failed: %d, still wait for %d remaining entries\n",
1149                        rc, atomic_read(&anchor->csi_complete));
1150         }
1151
1152         wait_event_idle(anchor->csi_waitq,
1153                         atomic_read(&anchor->csi_complete) == 1);
1154         if (!rc)
1155                 rc = anchor->csi_sync_rc;
1156
1157         /* We take the lock to ensure that cl_sync_io_note() has finished */
1158         spin_lock(&anchor->csi_waitq.lock);
1159         LASSERT(atomic_read(&anchor->csi_sync_nr) == 0);
1160         LASSERT(atomic_read(&anchor->csi_complete) == 1);
1161         spin_unlock(&anchor->csi_waitq.lock);
1162
1163         RETURN(rc);
1164 }
1165 EXPORT_SYMBOL(cl_sync_io_wait);
1166
1167 static inline void dio_aio_complete(struct kiocb *iocb, ssize_t res)
1168 {
1169 #ifdef HAVE_AIO_COMPLETE
1170         aio_complete(iocb, res, 0);
1171 #else
1172         if (iocb->ki_complete)
1173 # ifdef HAVE_KIOCB_COMPLETE_2ARGS
1174                 iocb->ki_complete(iocb, res);
1175 # else
1176                 iocb->ki_complete(iocb, res, 0);
1177 # endif
1178 #endif
1179 }
1180
1181 static void cl_dio_aio_end(const struct lu_env *env, struct cl_sync_io *anchor)
1182 {
1183         struct cl_dio_aio *aio = container_of(anchor, typeof(*aio), cda_sync);
1184         ssize_t ret = anchor->csi_sync_rc;
1185
1186         ENTRY;
1187
1188         if (!aio->cda_no_aio_complete)
1189                 dio_aio_complete(aio->cda_iocb, ret ?: aio->cda_bytes);
1190
1191         EXIT;
1192 }
1193
1194 static void cl_sub_dio_end(const struct lu_env *env, struct cl_sync_io *anchor)
1195 {
1196         struct cl_sub_dio *sdio = container_of(anchor, typeof(*sdio), csd_sync);
1197         ssize_t ret = anchor->csi_sync_rc;
1198
1199         ENTRY;
1200
1201         /* release pages */
1202         while (sdio->csd_pages.pl_nr > 0) {
1203                 struct cl_page *page = cl_page_list_first(&sdio->csd_pages);
1204
1205                 cl_page_delete(env, page);
1206                 cl_page_list_del(env, &sdio->csd_pages, page);
1207         }
1208
1209         ll_release_user_pages(sdio->csd_dio_pages.ldp_pages,
1210                               sdio->csd_dio_pages.ldp_count);
1211         cl_sync_io_note(env, &sdio->csd_ll_aio->cda_sync, ret);
1212
1213         EXIT;
1214 }
1215
1216 struct cl_dio_aio *cl_dio_aio_alloc(struct kiocb *iocb, struct cl_object *obj,
1217                                     bool is_aio)
1218 {
1219         struct cl_dio_aio *aio;
1220
1221         OBD_SLAB_ALLOC_PTR_GFP(aio, cl_dio_aio_kmem, GFP_NOFS);
1222         if (aio != NULL) {
1223                 /*
1224                  * Hold one ref so that it won't be released until
1225                  * every pages is added.
1226                  */
1227                 cl_sync_io_init_notify(&aio->cda_sync, 1, aio, cl_dio_aio_end);
1228                 aio->cda_iocb = iocb;
1229                 aio->cda_no_aio_complete = !is_aio;
1230                 /* if this is true AIO, the memory is freed by the last call
1231                  * to cl_sync_io_note (when all the I/O is complete), because
1232                  * no one is waiting (in the kernel) for this to complete
1233                  *
1234                  * in other cases, the last user is cl_sync_io_wait, and in
1235                  * that case, the creator frees the struct after that call
1236                  */
1237                 aio->cda_creator_free = !is_aio;
1238
1239                 cl_object_get(obj);
1240                 aio->cda_obj = obj;
1241                 aio->cda_mm = get_task_mm(current);
1242         }
1243         return aio;
1244 }
1245 EXPORT_SYMBOL(cl_dio_aio_alloc);
1246
1247 struct cl_sub_dio *cl_sub_dio_alloc(struct cl_dio_aio *ll_aio,
1248                                     struct iov_iter *iter, bool write,
1249                                     bool sync)
1250 {
1251         struct cl_sub_dio *sdio;
1252
1253         OBD_SLAB_ALLOC_PTR_GFP(sdio, cl_sub_dio_kmem, GFP_NOFS);
1254         if (sdio != NULL) {
1255                 /*
1256                  * Hold one ref so that it won't be released until
1257                  * every pages is added.
1258                  */
1259                 cl_sync_io_init_notify(&sdio->csd_sync, 1, sdio,
1260                                        cl_sub_dio_end);
1261                 cl_page_list_init(&sdio->csd_pages);
1262
1263                 sdio->csd_ll_aio = ll_aio;
1264                 atomic_add(1,  &ll_aio->cda_sync.csi_sync_nr);
1265                 sdio->csd_creator_free = sync;
1266                 sdio->csd_write = write;
1267
1268                 /* we need to make a copy of the user iovec at this point in
1269                  * time so we:
1270                  * A) have the correct state of the iovec for this chunk of I/O
1271                  * B) have a chunk-local copy; some of the things we want to
1272                  * do to the iovec modify it, so to process each chunk from a
1273                  * separate thread requires a local copy of the iovec
1274                  */
1275                 memcpy(&sdio->csd_iter, iter,
1276                        sizeof(struct iov_iter));
1277                 OBD_ALLOC_PTR(sdio->csd_iter.iov);
1278                 if (sdio->csd_iter.iov == NULL) {
1279                         cl_sub_dio_free(sdio);
1280                         sdio = NULL;
1281                         goto out;
1282                 }
1283                 memcpy((void *) sdio->csd_iter.iov, iter->iov,
1284                        sizeof(struct iovec));
1285         }
1286 out:
1287         return sdio;
1288 }
1289 EXPORT_SYMBOL(cl_sub_dio_alloc);
1290
1291 void cl_dio_aio_free(const struct lu_env *env, struct cl_dio_aio *aio)
1292 {
1293         if (aio) {
1294                 if (aio->cda_mm)
1295                         mmput(aio->cda_mm);
1296                 cl_object_put(env, aio->cda_obj);
1297                 OBD_SLAB_FREE_PTR(aio, cl_dio_aio_kmem);
1298         }
1299 }
1300 EXPORT_SYMBOL(cl_dio_aio_free);
1301
1302 void cl_sub_dio_free(struct cl_sub_dio *sdio)
1303 {
1304         if (sdio) {
1305                 void *tmp = (void *) sdio->csd_iter.iov;
1306
1307                 if (tmp)
1308                         OBD_FREE(tmp, sizeof(struct iovec));
1309                 OBD_SLAB_FREE_PTR(sdio, cl_sub_dio_kmem);
1310         }
1311 }
1312 EXPORT_SYMBOL(cl_sub_dio_free);
1313
1314 /*
1315  * ll_release_user_pages - tear down page struct array
1316  * @pages: array of page struct pointers underlying target buffer
1317  */
1318 void ll_release_user_pages(struct page **pages, int npages)
1319 {
1320         int i;
1321
1322         if (npages == 0) {
1323                 LASSERT(!pages);
1324                 return;
1325         }
1326
1327         for (i = 0; i < npages; i++) {
1328                 if (!pages[i])
1329                         break;
1330                 put_page(pages[i]);
1331         }
1332
1333 #if defined(HAVE_DIO_ITER)
1334         kvfree(pages);
1335 #else
1336         OBD_FREE_PTR_ARRAY_LARGE(pages, npages);
1337 #endif
1338 }
1339 EXPORT_SYMBOL(ll_release_user_pages);
1340
1341 /**
1342  * Indicate that transfer of a single page completed.
1343  */
1344 void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
1345                      int ioret)
1346 {
1347         ENTRY;
1348
1349         if (anchor->csi_sync_rc == 0 && ioret < 0)
1350                 anchor->csi_sync_rc = ioret;
1351         /*
1352          * Synchronous IO done without releasing page lock (e.g., as a part of
1353          * ->{prepare,commit}_write(). Completion is used to signal the end of
1354          * IO.
1355          */
1356         LASSERT(atomic_read(&anchor->csi_sync_nr) > 0);
1357         LASSERT(atomic_read(&anchor->csi_complete) == 0);
1358         if (atomic_dec_and_lock(&anchor->csi_sync_nr,
1359                                 &anchor->csi_waitq.lock)) {
1360                 struct cl_sub_dio *sub_dio_aio = NULL;
1361                 struct cl_dio_aio *dio_aio = NULL;
1362                 void *csi_dio_aio = NULL;
1363                 bool creator_free = true;
1364
1365                 cl_sync_io_end_t *end_io = anchor->csi_end_io;
1366
1367                 spin_unlock(&anchor->csi_waitq.lock);
1368                 /* we cannot do end_io while holding a spin lock, because
1369                  * end_io may sleep
1370                  */
1371                 if (end_io)
1372                         end_io(env, anchor);
1373
1374                 spin_lock(&anchor->csi_waitq.lock);
1375                 /* this tells the waiters we've completed, and can only be set
1376                  * after end_io() has been called and while we're holding the
1377                  * spinlock
1378                  */
1379                 atomic_set(&anchor->csi_complete, 1);
1380                 /*
1381                  * Holding the lock across both the decrement and
1382                  * the wakeup ensures cl_sync_io_wait() doesn't complete
1383                  * before the wakeup completes and the contents of
1384                  * of anchor become unsafe to access as the owner is free
1385                  * to immediately reclaim anchor when cl_sync_io_wait()
1386                  * completes.
1387                  */
1388                 wake_up_locked(&anchor->csi_waitq);
1389
1390                 csi_dio_aio = anchor->csi_dio_aio;
1391                 sub_dio_aio = csi_dio_aio;
1392                 dio_aio = csi_dio_aio;
1393
1394                 if (csi_dio_aio && end_io == cl_dio_aio_end)
1395                         creator_free = dio_aio->cda_creator_free;
1396                 else if (csi_dio_aio && end_io == cl_sub_dio_end)
1397                         creator_free = sub_dio_aio->csd_creator_free;
1398
1399                 spin_unlock(&anchor->csi_waitq.lock);
1400
1401                 if (csi_dio_aio && !creator_free) {
1402                         if (end_io == cl_dio_aio_end)
1403                                 cl_dio_aio_free(env, dio_aio);
1404                         else if (end_io == cl_sub_dio_end)
1405                                 cl_sub_dio_free(sub_dio_aio);
1406                 }
1407         }
1408         EXIT;
1409 }
1410 EXPORT_SYMBOL(cl_sync_io_note);
1411
1412 /* this function waits for completion of outstanding io and then re-initializes
1413  * the anchor used to track it.  This is used to wait to complete DIO before
1414  * returning to userspace, and is never called for true AIO
1415  */
1416 int cl_sync_io_wait_recycle(const struct lu_env *env, struct cl_sync_io *anchor,
1417                             long timeout, int ioret)
1418 {
1419         int rc = 0;
1420
1421         /*
1422          * @anchor was inited as 1 to prevent end_io to be
1423          * called before we add all pages for IO, so drop
1424          * one extra reference to make sure we could wait
1425          * count to be zero.
1426          */
1427         cl_sync_io_note(env, anchor, ioret);
1428         /* Wait for completion of outstanding dio before re-initializing for
1429          * possible restart
1430          */
1431         rc = cl_sync_io_wait(env, anchor, timeout);
1432         /**
1433          * One extra reference again, as if @anchor is
1434          * reused we assume it as 1 before using.
1435          */
1436         atomic_add(1, &anchor->csi_sync_nr);
1437         /* we must also set this anchor as incomplete */
1438         atomic_set(&anchor->csi_complete, 0);
1439
1440         return rc;
1441 }
1442 EXPORT_SYMBOL(cl_sync_io_wait_recycle);