Whamcloud - gitweb
6d811d5f1dd3346ee194cd5e476c258623fad453
[fs/lustre-release.git] / lustre / obdclass / cl_io.c
1 // SPDX-License-Identifier: GPL-2.0
2
3 /*
4  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
5  * Use is subject to license terms.
6  *
7  * Copyright (c) 2011, 2017, Intel Corporation.
8  *
9  */
10
11 /*
12  * This file is part of Lustre, http://www.lustre.org/
13  *
14  * Client IO.
15  *
16  * Author: Nikita Danilov <nikita.danilov@sun.com>
17  * Author: Jinshan Xiong <jinshan.xiong@intel.com>
18  *
19  */
20
21 #define DEBUG_SUBSYSTEM S_CLASS
22
23 #include <linux/sched.h>
24 #include <linux/list.h>
25 #include <linux/list_sort.h>
26 #include <linux/mmu_context.h>
27 #include <obd_class.h>
28 #include <obd_support.h>
29 #include <lustre_fid.h>
30 #include <cl_object.h>
31 #include "cl_internal.h"
32
33 /*
34  * cl_io interface.
35  */
36
37 static inline int cl_io_type_is_valid(enum cl_io_type type)
38 {
39         return CIT_READ <= type && type < CIT_OP_NR;
40 }
41
42 static inline int cl_io_is_loopable(const struct cl_io *io)
43 {
44         return cl_io_type_is_valid(io->ci_type) && io->ci_type != CIT_MISC;
45 }
46
47 /**
48  * cl_io invariant that holds at all times when exported cl_io_*() functions
49  * are entered and left.
50  */
51 static inline int cl_io_invariant(const struct cl_io *io)
52 {
53         /*
54          * io can own pages only when it is ongoing. Sub-io might
55          * still be in CIS_LOCKED state when top-io is in
56          * CIS_IO_GOING.
57          */
58         return ergo(io->ci_owned_nr > 0, io->ci_state == CIS_IO_GOING ||
59                     (io->ci_state == CIS_LOCKED && io->ci_parent != NULL));
60 }
61
62 /**
63  * Finalize \a io, by calling cl_io_operations::cio_fini() bottom-to-top.
64  */
65 void cl_io_fini(const struct lu_env *env, struct cl_io *io)
66 {
67         struct cl_io_slice    *slice;
68
69         LINVRNT(cl_io_type_is_valid(io->ci_type));
70         LINVRNT(cl_io_invariant(io));
71         ENTRY;
72
73         while (!list_empty(&io->ci_layers)) {
74                 slice = container_of(io->ci_layers.prev, struct cl_io_slice,
75                                      cis_linkage);
76                 list_del_init(&slice->cis_linkage);
77                 if (slice->cis_iop->op[io->ci_type].cio_fini != NULL)
78                         slice->cis_iop->op[io->ci_type].cio_fini(env, slice);
79                 /*
80                  * Invalidate slice to catch use after free. This assumes that
81                  * slices are allocated within session and can be touched
82                  * after ->cio_fini() returns.
83                  */
84                 slice->cis_io = NULL;
85         }
86         io->ci_state = CIS_FINI;
87
88         /* sanity check for layout change */
89         switch(io->ci_type) {
90         case CIT_READ:
91         case CIT_WRITE:
92         case CIT_DATA_VERSION:
93         case CIT_FAULT:
94                 break;
95         case CIT_FSYNC:
96                 LASSERT(!io->ci_need_restart);
97                 break;
98         case CIT_SETATTR:
99         case CIT_MISC:
100                 /* Check ignore layout change conf */
101                 LASSERT(ergo(io->ci_ignore_layout || !io->ci_verify_layout,
102                                 !io->ci_need_restart));
103         case CIT_GLIMPSE:
104                 break;
105         case CIT_LADVISE:
106         case CIT_LSEEK:
107                 break;
108         default:
109                 LBUG();
110         }
111         EXIT;
112 }
113 EXPORT_SYMBOL(cl_io_fini);
114
115 static int __cl_io_init(const struct lu_env *env, struct cl_io *io,
116                         enum cl_io_type iot, struct cl_object *obj)
117 {
118         struct cl_object *scan;
119         int result;
120
121         LINVRNT(io->ci_state == CIS_ZERO || io->ci_state == CIS_FINI);
122         LINVRNT(cl_io_type_is_valid(iot));
123         LINVRNT(cl_io_invariant(io));
124         ENTRY;
125
126         io->ci_type = iot;
127         INIT_LIST_HEAD(&io->ci_lockset.cls_todo);
128         INIT_LIST_HEAD(&io->ci_lockset.cls_done);
129         INIT_LIST_HEAD(&io->ci_layers);
130
131         result = 0;
132         cl_object_for_each(scan, obj) {
133                 if (scan->co_ops->coo_io_init != NULL) {
134                         result = scan->co_ops->coo_io_init(env, scan, io);
135                         if (result != 0)
136                                 break;
137                 }
138         }
139         if (result == 0)
140                 io->ci_state = CIS_INIT;
141         RETURN(result);
142 }
143
144 /**
145  * Initialize sub-io, by calling cl_io_operations::cio_init() top-to-bottom.
146  *
147  * \pre obj != cl_object_top(obj)
148  */
149 int cl_io_sub_init(const struct lu_env *env, struct cl_io *io,
150                    enum cl_io_type iot, struct cl_object *obj)
151 {
152         LASSERT(obj != cl_object_top(obj));
153
154         return __cl_io_init(env, io, iot, obj);
155 }
156 EXPORT_SYMBOL(cl_io_sub_init);
157
158 /**
159  * Initialize \a io, by calling cl_io_operations::cio_init() top-to-bottom.
160  *
161  * Caller has to call cl_io_fini() after a call to cl_io_init(), no matter
162  * what the latter returned.
163  *
164  * \pre obj == cl_object_top(obj)
165  * \pre cl_io_type_is_valid(iot)
166  * \post cl_io_type_is_valid(io->ci_type) && io->ci_type == iot
167  */
168 int cl_io_init(const struct lu_env *env, struct cl_io *io,
169                enum cl_io_type iot, struct cl_object *obj)
170 {
171         LASSERT(obj == cl_object_top(obj));
172
173         /* clear I/O restart from previous instance */
174         io->ci_need_restart = 0;
175
176         return __cl_io_init(env, io, iot, obj);
177 }
178 EXPORT_SYMBOL(cl_io_init);
179
180 /**
181  * Initialize read or write io.
182  *
183  * \pre iot == CIT_READ || iot == CIT_WRITE
184  */
185 int cl_io_rw_init(const struct lu_env *env, struct cl_io *io,
186                   enum cl_io_type iot, loff_t pos, size_t bytes)
187 {
188         LINVRNT(iot == CIT_READ || iot == CIT_WRITE);
189         LINVRNT(io->ci_obj != NULL);
190         ENTRY;
191
192         LU_OBJECT_HEADER(D_VFSTRACE, env, &io->ci_obj->co_lu,
193                          "io range: %u [%llu, %llu) %u %u\n",
194                          iot, (__u64)pos, (__u64)pos + bytes,
195                          io->u.ci_rw.crw_nonblock, io->u.ci_wr.wr_append);
196         io->u.ci_rw.crw_pos    = pos;
197         io->u.ci_rw.crw_bytes  = bytes;
198         RETURN(cl_io_init(env, io, iot, io->ci_obj));
199 }
200 EXPORT_SYMBOL(cl_io_rw_init);
201
202 #ifdef HAVE_LIST_CMP_FUNC_T
203 static int cl_lock_descr_cmp(void *priv,
204                              const struct list_head *a,
205                              const struct list_head *b)
206 #else /* !HAVE_LIST_CMP_FUNC_T */
207 static int cl_lock_descr_cmp(void *priv,
208                              struct list_head *a, struct list_head *b)
209 #endif /* HAVE_LIST_CMP_FUNC_T */
210 {
211         const struct cl_io_lock_link *l0 = list_entry(a, struct cl_io_lock_link,
212                                                       cill_linkage);
213         const struct cl_io_lock_link *l1 = list_entry(b, struct cl_io_lock_link,
214                                                       cill_linkage);
215         const struct cl_lock_descr *d0 = &l0->cill_descr;
216         const struct cl_lock_descr *d1 = &l1->cill_descr;
217
218         return lu_fid_cmp(lu_object_fid(&d0->cld_obj->co_lu),
219                           lu_object_fid(&d1->cld_obj->co_lu));
220 }
221
222 static void cl_lock_descr_merge(struct cl_lock_descr *d0,
223                                 const struct cl_lock_descr *d1)
224 {
225         d0->cld_start = min(d0->cld_start, d1->cld_start);
226         d0->cld_end = max(d0->cld_end, d1->cld_end);
227
228         if (d1->cld_mode == CLM_WRITE && d0->cld_mode != CLM_WRITE)
229                 d0->cld_mode = CLM_WRITE;
230
231         if (d1->cld_mode == CLM_GROUP && d0->cld_mode != CLM_GROUP)
232                 d0->cld_mode = CLM_GROUP;
233 }
234
235 static int cl_lockset_merge(const struct cl_lockset *set,
236                             const struct cl_lock_descr *need)
237 {
238         struct cl_io_lock_link *scan;
239
240         ENTRY;
241         list_for_each_entry(scan, &set->cls_todo, cill_linkage) {
242                 if (!cl_object_same(scan->cill_descr.cld_obj, need->cld_obj))
243                         continue;
244
245                 /* Merge locks for the same object because ldlm lock server
246                  * may expand the lock extent, otherwise there is a deadlock
247                  * case if two conflicted locks are queueud for the same object
248                  * and lock server expands one lock to overlap the another.
249                  * The side effect is that it can generate a multi-stripe lock
250                  * that may cause casacading problem */
251                 cl_lock_descr_merge(&scan->cill_descr, need);
252                 CDEBUG(D_VFSTRACE, "lock: %d: [%lu, %lu]\n",
253                        scan->cill_descr.cld_mode, scan->cill_descr.cld_start,
254                        scan->cill_descr.cld_end);
255                 RETURN(+1);
256         }
257         RETURN(0);
258 }
259
260 static int cl_lockset_lock(const struct lu_env *env, struct cl_io *io,
261                            struct cl_lockset *set)
262 {
263         struct cl_io_lock_link *link;
264         struct cl_io_lock_link *temp;
265         int result;
266
267         ENTRY;
268         result = 0;
269         list_for_each_entry_safe(link, temp, &set->cls_todo, cill_linkage) {
270                 result = cl_lock_request(env, io, &link->cill_lock);
271                 if (result < 0)
272                         break;
273
274                 list_move(&link->cill_linkage, &set->cls_done);
275         }
276         RETURN(result);
277 }
278
279 /**
280  * Takes locks necessary for the current iteration of io.
281  *
282  * Calls cl_io_operations::cio_lock() top-to-bottom to collect locks required
283  * by layers for the current iteration. Then sort locks (to avoid dead-locks),
284  * and acquire them.
285  */
286 int cl_io_lock(const struct lu_env *env, struct cl_io *io)
287 {
288         const struct cl_io_slice *scan;
289         int result = 0;
290
291         LINVRNT(cl_io_is_loopable(io));
292         LINVRNT(io->ci_state == CIS_IT_STARTED);
293         LINVRNT(cl_io_invariant(io));
294
295         ENTRY;
296         list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
297                 if (scan->cis_iop->op[io->ci_type].cio_lock == NULL)
298                         continue;
299                 result = scan->cis_iop->op[io->ci_type].cio_lock(env, scan);
300                 if (result != 0)
301                         break;
302         }
303         if (result == 0) {
304                 /*
305                  * Sort locks in lexicographical order of their (fid,
306                  * start-offset) pairs to avoid deadlocks.
307                  */
308                 list_sort(NULL, &io->ci_lockset.cls_todo, cl_lock_descr_cmp);
309                 result = cl_lockset_lock(env, io, &io->ci_lockset);
310         }
311         if (result != 0)
312                 cl_io_unlock(env, io);
313         else
314                 io->ci_state = CIS_LOCKED;
315         RETURN(result);
316 }
317 EXPORT_SYMBOL(cl_io_lock);
318
319 /**
320  * Release locks takes by io.
321  */
322 void cl_io_unlock(const struct lu_env *env, struct cl_io *io)
323 {
324         struct cl_lockset *set;
325         struct cl_io_lock_link *link;
326         struct cl_io_lock_link *temp;
327         const struct cl_io_slice *scan;
328
329         LASSERT(cl_io_is_loopable(io));
330         LASSERT(CIS_IT_STARTED <= io->ci_state && io->ci_state < CIS_UNLOCKED);
331         LINVRNT(cl_io_invariant(io));
332
333         ENTRY;
334         set = &io->ci_lockset;
335
336         list_for_each_entry_safe(link, temp, &set->cls_todo, cill_linkage) {
337                 list_del_init(&link->cill_linkage);
338                 if (link->cill_fini != NULL)
339                         link->cill_fini(env, link);
340         }
341
342         list_for_each_entry_safe(link, temp, &set->cls_done, cill_linkage) {
343                 list_del_init(&link->cill_linkage);
344                 cl_lock_release(env, &link->cill_lock);
345                 if (link->cill_fini != NULL)
346                         link->cill_fini(env, link);
347         }
348
349         list_for_each_entry_reverse(scan, &io->ci_layers, cis_linkage) {
350                 if (scan->cis_iop->op[io->ci_type].cio_unlock != NULL)
351                         scan->cis_iop->op[io->ci_type].cio_unlock(env, scan);
352         }
353         io->ci_state = CIS_UNLOCKED;
354         EXIT;
355 }
356 EXPORT_SYMBOL(cl_io_unlock);
357
358 /**
359  * Prepares next iteration of io.
360  *
361  * Calls cl_io_operations::cio_iter_init() top-to-bottom. This exists to give
362  * layers a chance to modify io parameters, e.g., so that lov can restrict io
363  * to a single stripe.
364  */
365 int cl_io_iter_init(const struct lu_env *env, struct cl_io *io)
366 {
367         const struct cl_io_slice *scan;
368         int result;
369
370         LINVRNT(cl_io_is_loopable(io));
371         LINVRNT(io->ci_state == CIS_INIT || io->ci_state == CIS_IT_ENDED);
372         LINVRNT(cl_io_invariant(io));
373
374         ENTRY;
375         result = 0;
376         list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
377                 if (scan->cis_iop->op[io->ci_type].cio_iter_init == NULL)
378                         continue;
379                 result = scan->cis_iop->op[io->ci_type].cio_iter_init(env,
380                                                                       scan);
381                 if (result != 0)
382                         break;
383         }
384         if (result == 0)
385                 io->ci_state = CIS_IT_STARTED;
386         RETURN(result);
387 }
388 EXPORT_SYMBOL(cl_io_iter_init);
389
390 /**
391  * Finalizes io iteration.
392  *
393  * Calls cl_io_operations::cio_iter_fini() bottom-to-top.
394  */
395 void cl_io_iter_fini(const struct lu_env *env, struct cl_io *io)
396 {
397         const struct cl_io_slice *scan;
398
399         LINVRNT(cl_io_is_loopable(io));
400         LINVRNT(io->ci_state <= CIS_IT_STARTED ||
401                 io->ci_state > CIS_IO_FINISHED);
402         LINVRNT(cl_io_invariant(io));
403
404         ENTRY;
405         list_for_each_entry_reverse(scan, &io->ci_layers, cis_linkage) {
406                 if (scan->cis_iop->op[io->ci_type].cio_iter_fini != NULL)
407                         scan->cis_iop->op[io->ci_type].cio_iter_fini(env, scan);
408         }
409         io->ci_state = CIS_IT_ENDED;
410         EXIT;
411 }
412 EXPORT_SYMBOL(cl_io_iter_fini);
413
414 /**
415  * Records that read or write io progressed \a bytes forward.
416  */
417 void cl_io_rw_advance(const struct lu_env *env, struct cl_io *io, size_t bytes)
418 {
419         const struct cl_io_slice *scan;
420
421         ENTRY;
422
423         LINVRNT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE ||
424                 bytes == 0);
425         LINVRNT(cl_io_is_loopable(io));
426         LINVRNT(cl_io_invariant(io));
427
428         io->u.ci_rw.crw_pos   += bytes;
429         io->u.ci_rw.crw_bytes -= bytes;
430
431         /* layers have to be notified. */
432         list_for_each_entry_reverse(scan, &io->ci_layers, cis_linkage) {
433                 if (scan->cis_iop->op[io->ci_type].cio_advance != NULL)
434                         scan->cis_iop->op[io->ci_type].cio_advance(env, scan,
435                                                                    bytes);
436         }
437         EXIT;
438 }
439
440 /**
441  * Adds a lock to a lockset.
442  */
443 int cl_io_lock_add(const struct lu_env *env, struct cl_io *io,
444                    struct cl_io_lock_link *link)
445 {
446         int result;
447
448         ENTRY;
449         if (cl_lockset_merge(&io->ci_lockset, &link->cill_descr))
450                 result = +1;
451         else {
452                 list_add(&link->cill_linkage, &io->ci_lockset.cls_todo);
453                 result = 0;
454         }
455         RETURN(result);
456 }
457 EXPORT_SYMBOL(cl_io_lock_add);
458
459 static void cl_free_io_lock_link(const struct lu_env *env,
460                                  struct cl_io_lock_link *link)
461 {
462         OBD_FREE_PTR(link);
463 }
464
465 /**
466  * Allocates new lock link, and uses it to add a lock to a lockset.
467  */
468 int cl_io_lock_alloc_add(const struct lu_env *env, struct cl_io *io,
469                          struct cl_lock_descr *descr)
470 {
471         struct cl_io_lock_link *link;
472         int result;
473
474         ENTRY;
475         OBD_ALLOC_PTR(link);
476         if (link != NULL) {
477                 link->cill_descr = *descr;
478                 link->cill_fini  = cl_free_io_lock_link;
479                 result = cl_io_lock_add(env, io, link);
480                 if (result) /* lock match */
481                         link->cill_fini(env, link);
482         } else
483                 result = -ENOMEM;
484
485         RETURN(result);
486 }
487 EXPORT_SYMBOL(cl_io_lock_alloc_add);
488
489 /**
490  * Starts io by calling cl_io_operations::cio_start() top-to-bottom.
491  */
492 int cl_io_start(const struct lu_env *env, struct cl_io *io)
493 {
494         const struct cl_io_slice *scan;
495         int result = 0;
496
497         LINVRNT(cl_io_is_loopable(io));
498         LINVRNT(io->ci_state == CIS_LOCKED);
499         LINVRNT(cl_io_invariant(io));
500         ENTRY;
501
502         io->ci_state = CIS_IO_GOING;
503         list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
504                 if (scan->cis_iop->op[io->ci_type].cio_start == NULL)
505                         continue;
506                 result = scan->cis_iop->op[io->ci_type].cio_start(env, scan);
507                 if (result != 0)
508                         break;
509         }
510         if (result >= 0)
511                 result = 0;
512         RETURN(result);
513 }
514 EXPORT_SYMBOL(cl_io_start);
515
516 /**
517  * Wait until current io iteration is finished by calling
518  * cl_io_operations::cio_end() bottom-to-top.
519  */
520 void cl_io_end(const struct lu_env *env, struct cl_io *io)
521 {
522         const struct cl_io_slice *scan;
523
524         LINVRNT(cl_io_is_loopable(io));
525         LINVRNT(io->ci_state == CIS_IO_GOING);
526         LINVRNT(cl_io_invariant(io));
527         ENTRY;
528
529         list_for_each_entry_reverse(scan, &io->ci_layers, cis_linkage) {
530                 if (scan->cis_iop->op[io->ci_type].cio_end != NULL)
531                         scan->cis_iop->op[io->ci_type].cio_end(env, scan);
532                 /* TODO: error handling. */
533         }
534         io->ci_state = CIS_IO_FINISHED;
535         EXIT;
536 }
537 EXPORT_SYMBOL(cl_io_end);
538
539 /**
540  * Called by read io, to decide the readahead extent
541  *
542  * \see cl_io_operations::cio_read_ahead()
543  */
544 int cl_io_read_ahead(const struct lu_env *env, struct cl_io *io,
545                      pgoff_t start, struct cl_read_ahead *ra)
546 {
547         const struct cl_io_slice *scan;
548         int result = 0;
549
550         LINVRNT(io->ci_type == CIT_READ ||
551                 io->ci_type == CIT_FAULT ||
552                 io->ci_type == CIT_WRITE);
553         LINVRNT(io->ci_state == CIS_IO_GOING || io->ci_state == CIS_LOCKED);
554         LINVRNT(cl_io_invariant(io));
555         ENTRY;
556
557         list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
558                 if (scan->cis_iop->cio_read_ahead == NULL)
559                         continue;
560
561                 result = scan->cis_iop->cio_read_ahead(env, scan, start, ra);
562                 if (result != 0)
563                         break;
564         }
565         RETURN(result > 0 ? 0 : result);
566 }
567 EXPORT_SYMBOL(cl_io_read_ahead);
568
569 /**
570  * Called before io start, to reserve enough LRU slots to avoid
571  * deadlock.
572  *
573  * \see cl_io_operations::cio_lru_reserve()
574  */
575 int cl_io_lru_reserve(const struct lu_env *env, struct cl_io *io,
576                       loff_t pos, size_t bytes)
577 {
578         const struct cl_io_slice *scan;
579         int result = 0;
580
581         LINVRNT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
582         LINVRNT(cl_io_invariant(io));
583         ENTRY;
584
585         list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
586                 if (scan->cis_iop->cio_lru_reserve) {
587                         result = scan->cis_iop->cio_lru_reserve(env, scan,
588                                                                 pos, bytes);
589                         if (result)
590                                 break;
591                 }
592         }
593
594         RETURN(result);
595 }
596 EXPORT_SYMBOL(cl_io_lru_reserve);
597
598 /**
599  * Commit a list of contiguous pages into writeback cache.
600  *
601  * \returns 0 if all pages committed, or errcode if error occurred.
602  * \see cl_io_operations::cio_commit_async()
603  */
604 int cl_io_commit_async(const struct lu_env *env, struct cl_io *io,
605                        struct cl_page_list *queue, int from, int to,
606                        cl_commit_cbt cb)
607 {
608         const struct cl_io_slice *scan;
609         int result = 0;
610         ENTRY;
611
612         list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
613                 if (scan->cis_iop->cio_commit_async == NULL)
614                         continue;
615                 result = scan->cis_iop->cio_commit_async(env, scan, queue,
616                                                          from, to, cb);
617                 if (result != 0)
618                         break;
619         }
620         RETURN(result);
621 }
622 EXPORT_SYMBOL(cl_io_commit_async);
623
624 void cl_io_extent_release(const struct lu_env *env, struct cl_io *io)
625 {
626         const struct cl_io_slice *scan;
627         ENTRY;
628
629         list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
630                 if (scan->cis_iop->cio_extent_release == NULL)
631                         continue;
632                 scan->cis_iop->cio_extent_release(env, scan);
633         }
634         EXIT;
635 }
636 EXPORT_SYMBOL(cl_io_extent_release);
637
638 /**
639  * Submits a list of pages for immediate io.
640  *
641  * After the function gets returned, The submitted pages are moved to
642  * queue->c2_qout queue, and queue->c2_qin contain both the pages don't need
643  * to be submitted, and the pages are errant to submit.
644  *
645  * \returns 0 if at least one page was submitted, error code otherwise.
646  * \see cl_io_operations::cio_submit()
647  */
648 int cl_io_submit_rw(const struct lu_env *env, struct cl_io *io,
649                     enum cl_req_type crt, struct cl_2queue *queue)
650 {
651         const struct cl_io_slice *scan;
652         int result = 0;
653         ENTRY;
654
655         list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
656                 if (scan->cis_iop->cio_submit == NULL)
657                         continue;
658                 result = scan->cis_iop->cio_submit(env, io, scan, crt, queue);
659                 if (result != 0)
660                         break;
661         }
662         /*
663          * If ->cio_submit() failed, no pages were sent.
664          */
665         LASSERT(ergo(result != 0, list_empty(&queue->c2_qout.pl_pages)));
666         RETURN(result);
667 }
668 EXPORT_SYMBOL(cl_io_submit_rw);
669
670 /**
671  * Submit a sync_io and wait for the IO to be finished, or error happens.
672  * If \a timeout is zero, it means to wait for the IO unconditionally.
673  *
674  * This is used for synchronous submission of an async IO, so the waiting is
675  * done here in this function and the IO is done when this function returns.
676  */
677 int cl_io_submit_sync(const struct lu_env *env, struct cl_io *io,
678                       enum cl_req_type iot, struct cl_2queue *queue,
679                       long timeout)
680 {
681         struct cl_sync_io *anchor = &cl_env_info(env)->clt_anchor;
682         struct cl_page *pg;
683         int rc;
684         ENTRY;
685
686         cl_page_list_for_each(pg, &queue->c2_qin) {
687                 LASSERT(pg->cp_sync_io == NULL);
688                 /* this is for sync submission of async IO, IO that was always
689                  * sync (like DIO) is handled differently
690                  */
691                 LASSERT(pg->cp_type != CPT_TRANSIENT);
692                 pg->cp_sync_io = anchor;
693         }
694
695         cl_sync_io_init(anchor, queue->c2_qin.pl_nr);
696         rc = cl_io_submit_rw(env, io, iot, queue);
697         if (rc == 0) {
698                 /*
699                  * If some pages weren't sent for any reason (e.g.,
700                  * read found up-to-date pages in the cache, or write found
701                  * clean pages), count them as completed to avoid infinite
702                  * wait.
703                  */
704                 cl_page_list_for_each(pg, &queue->c2_qin) {
705                         pg->cp_sync_io = NULL;
706                         cl_sync_io_note(env, anchor, 1);
707                 }
708
709                 /* wait for the IO to be finished. */
710                 rc = cl_sync_io_wait(env, anchor, timeout);
711                 cl_page_list_assume(env, io, &queue->c2_qout);
712         } else {
713                 LASSERT(list_empty(&queue->c2_qout.pl_pages));
714                 cl_page_list_for_each(pg, &queue->c2_qin)
715                         pg->cp_sync_io = NULL;
716         }
717         RETURN(rc);
718 }
719 EXPORT_SYMBOL(cl_io_submit_sync);
720
721 /**
722  * Main io loop.
723  *
724  * Pumps io through iterations calling
725  *
726  *    - cl_io_iter_init()
727  *
728  *    - cl_io_lock()
729  *
730  *    - cl_io_start()
731  *
732  *    - cl_io_end()
733  *
734  *    - cl_io_unlock()
735  *
736  *    - cl_io_iter_fini()
737  *
738  * repeatedly until there is no more io to do.
739  */
740 int cl_io_loop(const struct lu_env *env, struct cl_io *io)
741 {
742         int result = 0;
743         int rc = 0;
744
745         LINVRNT(cl_io_is_loopable(io));
746         ENTRY;
747
748         do {
749                 size_t bytes;
750
751                 io->ci_continue = 0;
752                 result = cl_io_iter_init(env, io);
753                 if (result == 0) {
754                         bytes = io->ci_bytes;
755                         result = cl_io_lock(env, io);
756                         if (result == 0) {
757                                 /*
758                                  * Notify layers that locks has been taken,
759                                  * and do actual i/o.
760                                  *
761                                  *   - llite: kms, short read;
762                                  *   - llite: generic_file_read();
763                                  */
764                                 result = cl_io_start(env, io);
765                                 /*
766                                  * Send any remaining pending
767                                  * io, etc.
768                                  *
769                                  **   - llite: ll_rw_stats_tally.
770                                  */
771                                 cl_io_end(env, io);
772                                 cl_io_unlock(env, io);
773                                 cl_io_rw_advance(env, io, io->ci_bytes - bytes);
774                         }
775                 }
776                 cl_io_iter_fini(env, io);
777                 if (result)
778                         rc = result;
779         } while ((result == 0 || result == -EIOCBQUEUED) &&
780                  io->ci_continue);
781
782         if (rc && !result)
783                 result = rc;
784
785         if (result == -EAGAIN && io->ci_ndelay && !io->ci_iocb_nowait) {
786                 if (!io->ci_tried_all_mirrors) {
787                         io->ci_need_restart = 1;
788                         result = 0;
789                 } else {
790                         result = -EIO;
791                 }
792         }
793
794         if (result == 0)
795                 result = io->ci_result;
796         RETURN(result < 0 ? result : 0);
797 }
798 EXPORT_SYMBOL(cl_io_loop);
799
800 /**
801  * Adds io slice to the cl_io.
802  *
803  * This is called by cl_object_operations::coo_io_init() methods to add a
804  * per-layer state to the io. New state is added at the end of
805  * cl_io::ci_layers list, that is, it is at the bottom of the stack.
806  *
807  * \see cl_lock_slice_add(), cl_req_slice_add(), cl_page_slice_add()
808  */
809 void cl_io_slice_add(struct cl_io *io, struct cl_io_slice *slice,
810                      struct cl_object *obj,
811                      const struct cl_io_operations *ops)
812 {
813         struct list_head *linkage = &slice->cis_linkage;
814
815         LASSERT((linkage->prev == NULL && linkage->next == NULL) ||
816                 list_empty(linkage));
817         ENTRY;
818
819         list_add_tail(linkage, &io->ci_layers);
820         slice->cis_io  = io;
821         slice->cis_obj = obj;
822         slice->cis_iop = ops;
823         EXIT;
824 }
825 EXPORT_SYMBOL(cl_io_slice_add);
826
827
828 /**
829  * Initializes page list.
830  */
831 void cl_page_list_init(struct cl_page_list *plist)
832 {
833         ENTRY;
834         plist->pl_nr = 0;
835         INIT_LIST_HEAD(&plist->pl_pages);
836         EXIT;
837 }
838 EXPORT_SYMBOL(cl_page_list_init);
839
840 /**
841  * Adds a page to a page list.
842  */
843 void cl_page_list_add(struct cl_page_list *plist, struct cl_page *page,
844                       bool getref)
845 {
846         ENTRY;
847         /* it would be better to check that page is owned by "current" io, but
848          * it is not passed here. */
849         if (page->cp_type != CPT_TRANSIENT)
850                 LASSERT(page->cp_owner != NULL);
851
852         LASSERT(list_empty(&page->cp_batch));
853         list_add_tail(&page->cp_batch, &plist->pl_pages);
854         ++plist->pl_nr;
855         lu_ref_add_at(&page->cp_reference, &page->cp_queue_ref, "queue", plist);
856         if (getref)
857                 cl_page_get(page);
858         EXIT;
859 }
860 EXPORT_SYMBOL(cl_page_list_add);
861
862 /**
863  * Removes a page from a page list.
864  */
865 void cl_page_list_del(const struct lu_env *env,
866                       struct cl_page_list *plist, struct cl_page *page,
867                       bool putref)
868 {
869         LASSERT(plist->pl_nr > 0);
870
871         ENTRY;
872         list_del_init(&page->cp_batch);
873         --plist->pl_nr;
874         lu_ref_del_at(&page->cp_reference, &page->cp_queue_ref, "queue", plist);
875         if (putref)
876                 cl_page_put(env, page);
877         EXIT;
878 }
879 EXPORT_SYMBOL(cl_page_list_del);
880
881 /**
882  * Moves a page from one page list to another.
883  */
884 void cl_page_list_move(struct cl_page_list *dst, struct cl_page_list *src,
885                        struct cl_page *page)
886 {
887         LASSERT(src->pl_nr > 0);
888
889         ENTRY;
890         list_move_tail(&page->cp_batch, &dst->pl_pages);
891         --src->pl_nr;
892         ++dst->pl_nr;
893         lu_ref_set_at(&page->cp_reference, &page->cp_queue_ref, "queue",
894                       src, dst);
895         EXIT;
896 }
897 EXPORT_SYMBOL(cl_page_list_move);
898
899 /**
900  * Moves a page from one page list to the head of another list.
901  */
902 void cl_page_list_move_head(struct cl_page_list *dst, struct cl_page_list *src,
903                             struct cl_page *page)
904 {
905         LASSERT(src->pl_nr > 0);
906
907         ENTRY;
908         list_move(&page->cp_batch, &dst->pl_pages);
909         --src->pl_nr;
910         ++dst->pl_nr;
911         lu_ref_set_at(&page->cp_reference, &page->cp_queue_ref, "queue",
912                         src, dst);
913         EXIT;
914 }
915 EXPORT_SYMBOL(cl_page_list_move_head);
916
917 /**
918  * splice the cl_page_list, just as list head does
919  */
920 void cl_page_list_splice(struct cl_page_list *src, struct cl_page_list *dst)
921 {
922 #ifdef CONFIG_LUSTRE_DEBUG_LU_REF
923         struct cl_page *page;
924         struct cl_page *tmp;
925
926         ENTRY;
927         cl_page_list_for_each_safe(page, tmp, src)
928                 lu_ref_set_at(&page->cp_reference, &page->cp_queue_ref,
929                               "queue", src, dst);
930 #else
931         ENTRY;
932 #endif
933         dst->pl_nr += src->pl_nr;
934         src->pl_nr = 0;
935         list_splice_tail_init(&src->pl_pages, &dst->pl_pages);
936
937         EXIT;
938 }
939 EXPORT_SYMBOL(cl_page_list_splice);
940
941 /**
942  * Disowns pages in a queue.
943  */
944 void cl_page_list_disown(const struct lu_env *env, struct cl_page_list *plist)
945 {
946         struct cl_page *page;
947         struct cl_page *temp;
948
949         ENTRY;
950         cl_page_list_for_each_safe(page, temp, plist) {
951                 LASSERT(plist->pl_nr > 0);
952
953                 list_del_init(&page->cp_batch);
954                 --plist->pl_nr;
955                 /*
956                  * __cl_page_disown rather than usual cl_page_disown() is used,
957                  * because pages are possibly in CPS_FREEING state already due
958                  * to the call to cl_page_list_discard().
959                  */
960                 /*
961                  * XXX __cl_page_disown() will fail if page is not locked.
962                  */
963                 if (page->cp_type == CPT_CACHEABLE)
964                         __cl_page_disown(env, page);
965                 lu_ref_del_at(&page->cp_reference, &page->cp_queue_ref, "queue",
966                               plist);
967                 cl_page_put(env, page);
968         }
969         EXIT;
970 }
971 EXPORT_SYMBOL(cl_page_list_disown);
972
973 /**
974  * Releases pages from queue.
975  */
976 void cl_page_list_fini(const struct lu_env *env, struct cl_page_list *plist)
977 {
978         struct cl_page *page;
979         struct cl_page *temp;
980
981         ENTRY;
982         cl_page_list_for_each_safe(page, temp, plist)
983                 cl_page_list_del(env, plist, page, true);
984         LASSERT(plist->pl_nr == 0);
985         EXIT;
986 }
987 EXPORT_SYMBOL(cl_page_list_fini);
988
989 /**
990  * Assumes all pages in a queue.
991  */
992 void cl_page_list_assume(const struct lu_env *env,
993                          struct cl_io *io, struct cl_page_list *plist)
994 {
995         struct cl_page *page;
996
997         cl_page_list_for_each(page, plist)
998                 cl_page_assume(env, io, page);
999 }
1000
1001 /**
1002  * Discards all pages in a queue.
1003  */
1004 void cl_page_list_discard(const struct lu_env *env, struct cl_io *io,
1005                           struct cl_page_list *plist)
1006 {
1007         struct cl_page *page;
1008
1009         ENTRY;
1010         cl_page_list_for_each(page, plist)
1011                 cl_page_discard(env, io, page);
1012         EXIT;
1013 }
1014 EXPORT_SYMBOL(cl_page_list_discard);
1015
1016 /**
1017  * Initialize dual page queue.
1018  */
1019 void cl_2queue_init(struct cl_2queue *queue)
1020 {
1021         ENTRY;
1022         cl_page_list_init(&queue->c2_qin);
1023         cl_page_list_init(&queue->c2_qout);
1024         EXIT;
1025 }
1026 EXPORT_SYMBOL(cl_2queue_init);
1027
1028 /**
1029  * Disown pages in both lists of a 2-queue.
1030  */
1031 void cl_2queue_disown(const struct lu_env *env, struct cl_2queue *queue)
1032 {
1033         ENTRY;
1034         cl_page_list_disown(env, &queue->c2_qin);
1035         cl_page_list_disown(env, &queue->c2_qout);
1036         EXIT;
1037 }
1038 EXPORT_SYMBOL(cl_2queue_disown);
1039
1040 /**
1041  * Discard (truncate) pages in both lists of a 2-queue.
1042  */
1043 void cl_2queue_discard(const struct lu_env *env,
1044                        struct cl_io *io, struct cl_2queue *queue)
1045 {
1046         ENTRY;
1047         cl_page_list_discard(env, io, &queue->c2_qin);
1048         cl_page_list_discard(env, io, &queue->c2_qout);
1049         EXIT;
1050 }
1051 EXPORT_SYMBOL(cl_2queue_discard);
1052
1053 /**
1054  * Assume to own the pages in cl_2queue
1055  */
1056 void cl_2queue_assume(const struct lu_env *env,
1057                       struct cl_io *io, struct cl_2queue *queue)
1058 {
1059         cl_page_list_assume(env, io, &queue->c2_qin);
1060         cl_page_list_assume(env, io, &queue->c2_qout);
1061 }
1062
1063 /**
1064  * Finalize both page lists of a 2-queue.
1065  */
1066 void cl_2queue_fini(const struct lu_env *env, struct cl_2queue *queue)
1067 {
1068         ENTRY;
1069         cl_page_list_fini(env, &queue->c2_qout);
1070         cl_page_list_fini(env, &queue->c2_qin);
1071         EXIT;
1072 }
1073 EXPORT_SYMBOL(cl_2queue_fini);
1074
1075 /**
1076  * Initialize a 2-queue to contain \a page in its incoming page list.
1077  */
1078 void cl_2queue_init_page(struct cl_2queue *queue, struct cl_page *page)
1079 {
1080         ENTRY;
1081         cl_2queue_init(queue);
1082         /*
1083          * Add a page to the incoming page list of 2-queue.
1084          */
1085         cl_page_list_add(&queue->c2_qin, page, true);
1086         EXIT;
1087 }
1088 EXPORT_SYMBOL(cl_2queue_init_page);
1089
1090 /**
1091  * Returns top-level io.
1092  *
1093  * \see cl_object_top()
1094  */
1095 struct cl_io *cl_io_top(struct cl_io *io)
1096 {
1097         ENTRY;
1098         while (io->ci_parent != NULL)
1099                 io = io->ci_parent;
1100         RETURN(io);
1101 }
1102 EXPORT_SYMBOL(cl_io_top);
1103
1104 /**
1105  * Fills in attributes that are passed to server together with transfer. Only
1106  * attributes from \a flags may be touched. This can be called multiple times
1107  * for the same request.
1108  */
1109 void cl_req_attr_set(const struct lu_env *env, struct cl_object *obj,
1110                      struct cl_req_attr *attr)
1111 {
1112         struct cl_object *scan;
1113         ENTRY;
1114
1115         cl_object_for_each(scan, obj) {
1116                 if (scan->co_ops->coo_req_attr_set != NULL)
1117                         scan->co_ops->coo_req_attr_set(env, scan, attr);
1118         }
1119         EXIT;
1120 }
1121 EXPORT_SYMBOL(cl_req_attr_set);
1122
1123 /**
1124  * Initialize synchronous io wait \a anchor for \a nr pages with optional
1125  * \a end handler.
1126  * \param anchor owned by caller, initialzied here.
1127  * \param nr number of pages initally pending in sync.
1128  * \param end optional callback sync_io completion, can be used to
1129  *  trigger erasure coding, integrity, dedupe, or similar operation.
1130  * \q end is called with a spinlock on anchor->csi_waitq.lock
1131  */
1132 void cl_sync_io_init_notify(struct cl_sync_io *anchor, int nr,
1133                             void *dio_aio, cl_sync_io_end_t *end)
1134 {
1135         ENTRY;
1136         memset(anchor, 0, sizeof(*anchor));
1137         init_waitqueue_head(&anchor->csi_waitq);
1138         atomic_set(&anchor->csi_sync_nr, nr);
1139         atomic_set(&anchor->csi_complete, 0);
1140         anchor->csi_sync_rc = 0;
1141         anchor->csi_end_io = end;
1142         anchor->csi_dio_aio = dio_aio;
1143         EXIT;
1144 }
1145 EXPORT_SYMBOL(cl_sync_io_init_notify);
1146
1147 /**
1148  * Wait until all IO completes. Transfer completion routine has to call
1149  * cl_sync_io_note() for every entity.
1150  */
1151 int cl_sync_io_wait(const struct lu_env *env, struct cl_sync_io *anchor,
1152                     long timeout)
1153 {
1154         int rc = 0;
1155         ENTRY;
1156
1157         LASSERT(timeout >= 0);
1158
1159         if (timeout > 0 &&
1160             wait_event_idle_timeout(anchor->csi_waitq,
1161                                     atomic_read(&anchor->csi_complete) == 1,
1162                                     cfs_time_seconds(timeout)) == 0) {
1163                 rc = -ETIMEDOUT;
1164                 CERROR("IO failed: %d, still wait for %d remaining entries\n",
1165                        rc, atomic_read(&anchor->csi_complete));
1166         }
1167
1168         wait_event_idle(anchor->csi_waitq,
1169                         atomic_read(&anchor->csi_complete) == 1);
1170         if (!rc)
1171                 rc = anchor->csi_sync_rc;
1172
1173         /* We take the lock to ensure that cl_sync_io_note() has finished */
1174         spin_lock(&anchor->csi_waitq.lock);
1175         LASSERT(atomic_read(&anchor->csi_sync_nr) == 0);
1176         LASSERT(atomic_read(&anchor->csi_complete) == 1);
1177         spin_unlock(&anchor->csi_waitq.lock);
1178
1179         RETURN(rc);
1180 }
1181 EXPORT_SYMBOL(cl_sync_io_wait);
1182
1183 static inline void dio_aio_complete(struct kiocb *iocb, ssize_t res)
1184 {
1185 #ifdef HAVE_AIO_COMPLETE
1186         aio_complete(iocb, res, 0);
1187 #else
1188         if (iocb->ki_complete)
1189 # ifdef HAVE_KIOCB_COMPLETE_2ARGS
1190                 iocb->ki_complete(iocb, res);
1191 # else
1192                 iocb->ki_complete(iocb, res, 0);
1193 # endif
1194 #endif
1195 }
1196
1197 static void cl_dio_aio_end(const struct lu_env *env, struct cl_sync_io *anchor)
1198 {
1199         struct cl_dio_aio *aio = container_of(anchor, typeof(*aio), cda_sync);
1200         ssize_t ret = anchor->csi_sync_rc;
1201
1202         ENTRY;
1203
1204         if (!aio->cda_no_aio_complete)
1205                 dio_aio_complete(aio->cda_iocb, ret ?: aio->cda_bytes);
1206
1207         EXIT;
1208 }
1209
1210 static void cl_sub_dio_end(const struct lu_env *env, struct cl_sync_io *anchor)
1211 {
1212         struct cl_sub_dio *sdio = container_of(anchor, typeof(*sdio), csd_sync);
1213         ssize_t ret = anchor->csi_sync_rc;
1214
1215         ENTRY;
1216
1217         /* release pages */
1218         while (sdio->csd_pages.pl_nr > 0) {
1219                 struct cl_page *page = cl_page_list_first(&sdio->csd_pages);
1220
1221                 cl_page_delete(env, page);
1222                 cl_page_list_del(env, &sdio->csd_pages, page, false);
1223                 cl_page_put(env, page);
1224         }
1225
1226         if (sdio->csd_unaligned) {
1227                 /* save the iovec pointer before it's modified by
1228                  * ll_dio_user_copy
1229                  */
1230                 struct iovec *tmp = (struct iovec *) sdio->csd_iter.__iov;
1231
1232                 CDEBUG(D_VFSTRACE,
1233                        "finishing unaligned dio %s aio->cda_bytes %ld\n",
1234                        sdio->csd_write ? "write" : "read", sdio->csd_bytes);
1235                 /* read copies *from* the kernel buffer *to* userspace
1236                  * here at the end, write copies *to* the kernel
1237                  * buffer from userspace at the start
1238                  */
1239                 if (!sdio->csd_write && sdio->csd_bytes > 0)
1240                         ret = ll_dio_user_copy(sdio, NULL);
1241                 ll_free_dio_buffer(&sdio->csd_dio_pages);
1242                 /* handle the freeing here rather than in cl_sub_dio_free
1243                  * because we have the unmodified iovec pointer
1244                  */
1245                 OBD_FREE_PTR(tmp);
1246                 sdio->csd_iter.__iov = NULL;
1247         } else {
1248                 /* unaligned DIO does not get user pages, so it doesn't have to
1249                  * release them, but aligned I/O must
1250                  */
1251                 ll_release_user_pages(sdio->csd_dio_pages.ldp_pages,
1252                                       sdio->csd_dio_pages.ldp_count);
1253         }
1254         cl_sync_io_note(env, &sdio->csd_ll_aio->cda_sync, ret);
1255
1256         EXIT;
1257 }
1258
1259 struct cl_dio_aio *cl_dio_aio_alloc(struct kiocb *iocb, struct cl_object *obj,
1260                                     bool is_aio)
1261 {
1262         struct cl_dio_aio *aio;
1263
1264         OBD_SLAB_ALLOC_PTR_GFP(aio, cl_dio_aio_kmem, GFP_NOFS);
1265         if (aio != NULL) {
1266                 /*
1267                  * Hold one ref so that it won't be released until
1268                  * every pages is added.
1269                  */
1270                 cl_sync_io_init_notify(&aio->cda_sync, 1, aio, cl_dio_aio_end);
1271                 aio->cda_iocb = iocb;
1272                 aio->cda_no_aio_complete = !is_aio;
1273                 /* if this is true AIO, the memory is freed by the last call
1274                  * to cl_sync_io_note (when all the I/O is complete), because
1275                  * no one is waiting (in the kernel) for this to complete
1276                  *
1277                  * in other cases, the last user is cl_sync_io_wait, and in
1278                  * that case, the creator frees the struct after that call
1279                  */
1280                 aio->cda_creator_free = !is_aio;
1281
1282                 cl_object_get(obj);
1283                 aio->cda_obj = obj;
1284                 aio->cda_mm = get_task_mm(current);
1285         }
1286         return aio;
1287 }
1288 EXPORT_SYMBOL(cl_dio_aio_alloc);
1289
1290 struct cl_sub_dio *cl_sub_dio_alloc(struct cl_dio_aio *ll_aio,
1291                                     struct iov_iter *iter, bool write,
1292                                     bool unaligned, bool sync)
1293 {
1294         struct cl_sub_dio *sdio;
1295
1296         OBD_SLAB_ALLOC_PTR_GFP(sdio, cl_sub_dio_kmem, GFP_NOFS);
1297         if (sdio != NULL) {
1298                 /*
1299                  * Hold one ref so that it won't be released until
1300                  * every pages is added.
1301                  */
1302                 cl_sync_io_init_notify(&sdio->csd_sync, 1, sdio,
1303                                        cl_sub_dio_end);
1304                 cl_page_list_init(&sdio->csd_pages);
1305
1306                 sdio->csd_ll_aio = ll_aio;
1307                 sdio->csd_creator_free = sync;
1308                 sdio->csd_write = write;
1309                 sdio->csd_unaligned = unaligned;
1310
1311                 atomic_add(1,  &ll_aio->cda_sync.csi_sync_nr);
1312
1313                 if (unaligned) {
1314                         /* we need to make a copy of the user iovec at this
1315                          * point in time, in order to:
1316                          *
1317                          * A) have the correct state of the iovec for this
1318                          * chunk of I/O, ie, the main iovec is altered as we do
1319                          * I/O and this chunk needs the current state
1320                          * B) have a chunk-local copy; doing the IO later
1321                          * modifies the iovec, so to process each chunk from a
1322                          * separate thread requires a local copy of the iovec
1323                          */
1324                         memcpy(&sdio->csd_iter, iter, sizeof(struct iov_iter));
1325                         OBD_ALLOC_PTR(sdio->csd_iter.__iov);
1326                         if (sdio->csd_iter.__iov == NULL) {
1327                                 cl_sub_dio_free(sdio);
1328                                 sdio = NULL;
1329                                 goto out;
1330                         }
1331                         memcpy((void *) sdio->csd_iter.__iov, iter->__iov,
1332                                sizeof(struct iovec));
1333                 }
1334         }
1335 out:
1336         return sdio;
1337 }
1338 EXPORT_SYMBOL(cl_sub_dio_alloc);
1339
1340 void cl_dio_aio_free(const struct lu_env *env, struct cl_dio_aio *aio)
1341 {
1342         if (aio) {
1343                 if (aio->cda_mm)
1344                         mmput(aio->cda_mm);
1345                 cl_object_put(env, aio->cda_obj);
1346                 OBD_SLAB_FREE_PTR(aio, cl_dio_aio_kmem);
1347         }
1348 }
1349 EXPORT_SYMBOL(cl_dio_aio_free);
1350
1351 void cl_sub_dio_free(struct cl_sub_dio *sdio)
1352 {
1353         if (sdio) {
1354                 void *tmp = (void *)sdio->csd_iter.__iov;
1355
1356                 if (tmp) {
1357                         LASSERT(sdio->csd_unaligned);
1358                         OBD_FREE_PTR(tmp);
1359                 }
1360                 OBD_SLAB_FREE_PTR(sdio, cl_sub_dio_kmem);
1361         }
1362 }
1363 EXPORT_SYMBOL(cl_sub_dio_free);
1364
1365 /*
1366  * For unaligned DIO.
1367  *
1368  * Allocate the internal buffer from/to which we will perform DIO.  This takes
1369  * the user I/O parameters and allocates an internal buffer large enough to
1370  * hold it.  The pages in this buffer are aligned with pages in the file (ie,
1371  * they have a 1-to-1 mapping with file pages).
1372  */
1373 int ll_allocate_dio_buffer(struct ll_dio_pages *pvec, size_t io_size)
1374 {
1375         struct page *new_page;
1376         size_t pg_offset;
1377         int result = 0;
1378         ssize_t i;
1379
1380         ENTRY;
1381
1382         /* page level offset in the file where the I/O starts */
1383         pg_offset = pvec->ldp_file_offset & ~PAGE_MASK;
1384         /* this adds 1 for the first page and removes the bytes in it from the
1385          * io_size, making the rest of the calculation aligned
1386          */
1387         if (pg_offset) {
1388                 pvec->ldp_count++;
1389                 io_size -= min_t(size_t, PAGE_SIZE - pg_offset, io_size);
1390         }
1391
1392         /* calculate pages for the rest of the buffer */
1393         pvec->ldp_count += (io_size + PAGE_SIZE - 1) >> PAGE_SHIFT;
1394
1395 #ifdef HAVE_DIO_ITER
1396         pvec->ldp_pages = kvzalloc(pvec->ldp_count * sizeof(struct page *),
1397                                     GFP_NOFS);
1398 #else
1399         OBD_ALLOC_PTR_ARRAY_LARGE(pvec->ldp_pages, pvec->ldp_count);
1400 #endif
1401         if (pvec->ldp_pages == NULL)
1402                 RETURN(-ENOMEM);
1403
1404         for (i = 0; i < pvec->ldp_count; i++) {
1405                 new_page = alloc_page(GFP_NOFS);
1406                 if (!new_page) {
1407                         result = -ENOMEM;
1408                         pvec->ldp_count = i;
1409                         goto out;
1410                 }
1411                 pvec->ldp_pages[i] = new_page;
1412         }
1413         WARN_ON(i != pvec->ldp_count);
1414
1415 out:
1416         if (result) {
1417                 if (pvec->ldp_pages)
1418                         ll_free_dio_buffer(pvec);
1419         }
1420
1421         if (result == 0)
1422                 result = pvec->ldp_count;
1423
1424         RETURN(result);
1425 }
1426 EXPORT_SYMBOL(ll_allocate_dio_buffer);
1427
1428 void ll_free_dio_buffer(struct ll_dio_pages *pvec)
1429 {
1430         int i;
1431
1432         for (i = 0; i < pvec->ldp_count; i++)
1433                 __free_page(pvec->ldp_pages[i]);
1434
1435 #ifdef HAVE_DIO_ITER
1436         kfree(pvec->ldp_pages);
1437 #else
1438         OBD_FREE_PTR_ARRAY_LARGE(pvec->ldp_pages, pvec->ldp_count);
1439 #endif
1440 }
1441 EXPORT_SYMBOL(ll_free_dio_buffer);
1442
1443 /*
1444  * ll_release_user_pages - tear down page struct array
1445  * @pages: array of page struct pointers underlying target buffer
1446  */
1447 void ll_release_user_pages(struct page **pages, int npages)
1448 {
1449         int i;
1450
1451         if (npages == 0) {
1452                 LASSERT(!pages);
1453                 return;
1454         }
1455
1456         for (i = 0; i < npages; i++) {
1457                 if (!pages[i])
1458                         break;
1459                 put_page(pages[i]);
1460         }
1461
1462 #if defined(HAVE_DIO_ITER)
1463         kvfree(pages);
1464 #else
1465         OBD_FREE_PTR_ARRAY_LARGE(pages, npages);
1466 #endif
1467 }
1468 EXPORT_SYMBOL(ll_release_user_pages);
1469
1470 #ifdef HAVE_FAULT_IN_IOV_ITER_READABLE
1471 #define ll_iov_iter_fault_in_readable(iov, bytes) \
1472         fault_in_iov_iter_readable(iov, bytes)
1473 #else
1474 #define ll_iov_iter_fault_in_readable(iov, bytes) \
1475         iov_iter_fault_in_readable(iov, bytes)
1476 #endif
1477
1478 #ifndef HAVE_KTHREAD_USE_MM
1479 #define kthread_use_mm(mm) use_mm(mm)
1480 #define kthread_unuse_mm(mm) unuse_mm(mm)
1481 #endif
1482
1483 /* copy IO data to/from internal buffer and userspace iovec */
1484 ssize_t ll_dio_user_copy(struct cl_sub_dio *sdio, struct iov_iter *write_iov)
1485 {
1486         struct iov_iter *iter = write_iov ? write_iov : &sdio->csd_iter;
1487         struct ll_dio_pages *pvec = &sdio->csd_dio_pages;
1488         struct mm_struct *mm = sdio->csd_ll_aio->cda_mm;
1489         loff_t pos = pvec->ldp_file_offset;
1490         size_t count = sdio->csd_bytes;
1491         size_t original_count = count;
1492         int short_copies = 0;
1493         bool mm_used = false;
1494         int status = 0;
1495         int i = 0;
1496         int rw;
1497
1498         ENTRY;
1499
1500         LASSERT(sdio->csd_unaligned);
1501
1502         if (sdio->csd_write)
1503                 rw = WRITE;
1504         else
1505                 rw = READ;
1506
1507         /* if there's no mm, io is being done from a kernel thread, so there's
1508          * no need to transition to its mm context anyway.
1509          *
1510          * Also, if mm == current->mm, that means this is being handled in the
1511          * thread which created it, and not in a separate kthread - so it is
1512          * unnecessary (and incorrect) to do a use_mm here
1513          */
1514         if (mm && mm != current->mm) {
1515                 kthread_use_mm(mm);
1516                 mm_used = true;
1517         }
1518
1519         /* fault in the entire userspace iovec */
1520         if (rw == WRITE) {
1521                 if (unlikely(ll_iov_iter_fault_in_readable(iter, count)))
1522                         GOTO(out, status = -EFAULT);
1523         }
1524
1525         /* modeled on kernel generic_file_buffered_read/write()
1526          *
1527          * note we only have one 'chunk' of i/o here, so we do not copy the
1528          * whole iovec here (except when the chunk is the whole iovec) so we
1529          * use the count of bytes in the chunk, csd_bytes, instead of looking
1530          * at the iovec
1531          */
1532         while (true) {
1533                 struct page *page = pvec->ldp_pages[i];
1534                 unsigned long offset; /* offset into kernel buffer page */
1535                 size_t copied; /* bytes successfully copied */
1536                 size_t bytes; /* bytes to copy for this page */
1537
1538                 LASSERT(i < pvec->ldp_count);
1539
1540                 offset = pos & ~PAGE_MASK;
1541                 bytes = min_t(unsigned long, PAGE_SIZE - offset,
1542                               count);
1543
1544                 CDEBUG(D_VFSTRACE,
1545                        "count %zd, offset %lu, pos %lld, ldp_count %lu\n",
1546                        count, offset, pos, pvec->ldp_count);
1547
1548                 if (fatal_signal_pending(current)) {
1549                         status = -EINTR;
1550                         break;
1551                 }
1552
1553                 /* write requires a few extra steps */
1554                 if (rw == WRITE) {
1555                         /* like btrfs, we do not have a mapping since this isn't
1556                          * a page cache page, so we must do this flush
1557                          * unconditionally
1558                          *
1559                          * NB: This is a noop on x86 but active on other
1560                          * architectures
1561                          */
1562                         flush_dcache_page(page);
1563
1564 #ifndef HAVE_COPY_PAGE_FROM_ITER_ATOMIC
1565                         copied = iov_iter_copy_from_user_atomic(page, iter,
1566                                                                 offset, bytes);
1567                         iov_iter_advance(iter, copied);
1568 #else
1569                         copied = copy_page_from_iter_atomic(page, offset, bytes,
1570                                                             iter);
1571 #endif
1572
1573                 } else /* READ */ {
1574                         copied = copy_page_to_iter(page, offset, bytes, iter);
1575                 }
1576
1577                 pos += copied;
1578                 count -= copied;
1579
1580                 if (unlikely(copied < bytes)) {
1581                         short_copies++;
1582
1583                         CDEBUG(D_VFSTRACE,
1584                                "short copy - copied only %zd of %lu, short %d times\n",
1585                                copied, bytes, short_copies);
1586                         /* copies will very rarely be interrupted, but we
1587                          * should retry in those cases, since the other option
1588                          * is giving an IO error and this can occur in normal
1589                          * operation such as with racing unaligned AIOs
1590                          *
1591                          * but of course we should not retry indefinitely
1592                          */
1593                         if (short_copies > 2) {
1594                                 CERROR("Unaligned DIO copy repeatedly short, count %zd, offset %lu, bytes %lu, copied %zd, pos %lld\n",
1595                                 count, offset, bytes, copied, pos);
1596
1597                                 status = -EFAULT;
1598                                 break;
1599                         }
1600
1601                         continue;
1602                 }
1603
1604                 if (count == 0)
1605                         break;
1606
1607                 i++;
1608         }
1609
1610 out:
1611         /* if we complete successfully, we should reach all of the pages */
1612         LASSERTF(ergo(status == 0, i == pvec->ldp_count - 1),
1613                  "status: %d, i: %d, pvec->ldp_count %zu, count %zu\n",
1614                   status, i, pvec->ldp_count, count);
1615
1616         if (write_iov && status == 0) {
1617                 /* The copy function we use modifies the count in the iovec,
1618                  * but that's actually the job of the caller, so we return the
1619                  * iovec to the original count
1620                  */
1621                 iov_iter_reexpand(iter, original_count);
1622         }
1623
1624         if (mm_used)
1625                 kthread_unuse_mm(mm);
1626
1627         /* the total bytes copied, or status */
1628         RETURN(original_count - count ? original_count - count : status);
1629 }
1630 EXPORT_SYMBOL(ll_dio_user_copy);
1631
1632 /**
1633  * Indicate that transfer of a single page completed.
1634  */
1635 void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
1636                      int ioret)
1637 {
1638         ENTRY;
1639
1640         if (anchor->csi_sync_rc == 0 && ioret < 0)
1641                 anchor->csi_sync_rc = ioret;
1642         /*
1643          * Synchronous IO done without releasing page lock (e.g., as a part of
1644          * ->{prepare,commit}_write(). Completion is used to signal the end of
1645          * IO.
1646          */
1647         LASSERT(atomic_read(&anchor->csi_sync_nr) > 0);
1648         LASSERT(atomic_read(&anchor->csi_complete) == 0);
1649         if (atomic_dec_and_lock(&anchor->csi_sync_nr,
1650                                 &anchor->csi_waitq.lock)) {
1651                 struct cl_sub_dio *sub_dio_aio = NULL;
1652                 struct cl_dio_aio *dio_aio = NULL;
1653                 void *csi_dio_aio = NULL;
1654                 bool creator_free = true;
1655
1656                 cl_sync_io_end_t *end_io = anchor->csi_end_io;
1657
1658                 spin_unlock(&anchor->csi_waitq.lock);
1659                 /* we cannot do end_io while holding a spin lock, because
1660                  * end_io may sleep
1661                  */
1662                 if (end_io)
1663                         end_io(env, anchor);
1664
1665                 spin_lock(&anchor->csi_waitq.lock);
1666                 /* this tells the waiters we've completed, and can only be set
1667                  * after end_io() has been called and while we're holding the
1668                  * spinlock
1669                  */
1670                 atomic_set(&anchor->csi_complete, 1);
1671                 /*
1672                  * Holding the lock across both the decrement and
1673                  * the wakeup ensures cl_sync_io_wait() doesn't complete
1674                  * before the wakeup completes and the contents of
1675                  * of anchor become unsafe to access as the owner is free
1676                  * to immediately reclaim anchor when cl_sync_io_wait()
1677                  * completes.
1678                  */
1679                 wake_up_locked(&anchor->csi_waitq);
1680
1681                 csi_dio_aio = anchor->csi_dio_aio;
1682                 sub_dio_aio = csi_dio_aio;
1683                 dio_aio = csi_dio_aio;
1684
1685                 if (csi_dio_aio && end_io == cl_dio_aio_end)
1686                         creator_free = dio_aio->cda_creator_free;
1687                 else if (csi_dio_aio && end_io == cl_sub_dio_end)
1688                         creator_free = sub_dio_aio->csd_creator_free;
1689
1690                 spin_unlock(&anchor->csi_waitq.lock);
1691
1692                 if (csi_dio_aio && !creator_free) {
1693                         if (end_io == cl_dio_aio_end)
1694                                 cl_dio_aio_free(env, dio_aio);
1695                         else if (end_io == cl_sub_dio_end)
1696                                 cl_sub_dio_free(sub_dio_aio);
1697                 }
1698         }
1699         EXIT;
1700 }
1701 EXPORT_SYMBOL(cl_sync_io_note);
1702
1703 /* this function waits for completion of outstanding io and then re-initializes
1704  * the anchor used to track it.  This is used to wait to complete DIO before
1705  * returning to userspace, and is never called for true AIO
1706  */
1707 int cl_sync_io_wait_recycle(const struct lu_env *env, struct cl_sync_io *anchor,
1708                             long timeout, int ioret)
1709 {
1710         int rc = 0;
1711
1712         /*
1713          * @anchor was inited as 1 to prevent end_io to be
1714          * called before we add all pages for IO, so drop
1715          * one extra reference to make sure we could wait
1716          * count to be zero.
1717          */
1718         cl_sync_io_note(env, anchor, ioret);
1719         /* Wait for completion of outstanding dio before re-initializing for
1720          * possible restart
1721          */
1722         rc = cl_sync_io_wait(env, anchor, timeout);
1723         /**
1724          * One extra reference again, as if @anchor is
1725          * reused we assume it as 1 before using.
1726          */
1727         atomic_add(1, &anchor->csi_sync_nr);
1728         /* we must also set this anchor as incomplete */
1729         atomic_set(&anchor->csi_complete, 0);
1730
1731         return rc;
1732 }
1733 EXPORT_SYMBOL(cl_sync_io_wait_recycle);