Whamcloud - gitweb
b=21259 "lfs check" is only allowed for root.
[fs/lustre-release.git] / lustre / obdclass / cl_io.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Client IO.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_CLASS
42 #ifndef EXPORT_SYMTAB
43 # define EXPORT_SYMTAB
44 #endif
45
46 #include <obd_class.h>
47 #include <obd_support.h>
48 #include <lustre_fid.h>
49 #include <libcfs/list.h>
50 /* lu_time_global_{init,fini}() */
51 #include <lu_time.h>
52
53 #include <cl_object.h>
54 #include "cl_internal.h"
55
56 /*****************************************************************************
57  *
58  * cl_io interface.
59  *
60  */
61
62 #define cl_io_for_each(slice, io) \
63         cfs_list_for_each_entry((slice), &io->ci_layers, cis_linkage)
64 #define cl_io_for_each_reverse(slice, io)                 \
65         cfs_list_for_each_entry_reverse((slice), &io->ci_layers, cis_linkage)
66
67 static inline int cl_io_type_is_valid(enum cl_io_type type)
68 {
69         return CIT_READ <= type && type < CIT_OP_NR;
70 }
71
72 static inline int cl_io_is_loopable(const struct cl_io *io)
73 {
74         return cl_io_type_is_valid(io->ci_type) && io->ci_type != CIT_MISC;
75 }
76
77 /**
78  * Returns true iff there is an IO ongoing in the given environment.
79  */
80 int cl_io_is_going(const struct lu_env *env)
81 {
82         return cl_env_info(env)->clt_current_io != NULL;
83 }
84 EXPORT_SYMBOL(cl_io_is_going);
85
86 /**
87  * cl_io invariant that holds at all times when exported cl_io_*() functions
88  * are entered and left.
89  */
90 static int cl_io_invariant(const struct cl_io *io)
91 {
92         struct cl_io *up;
93
94         up = io->ci_parent;
95         return
96                 /*
97                  * io can own pages only when it is ongoing. Sub-io might
98                  * still be in CIS_LOCKED state when top-io is in
99                  * CIS_IO_GOING.
100                  */
101                 ergo(io->ci_owned_nr > 0, io->ci_state == CIS_IO_GOING ||
102                      (io->ci_state == CIS_LOCKED && up != NULL));
103 }
104
105 /**
106  * Finalize \a io, by calling cl_io_operations::cio_fini() bottom-to-top.
107  */
108 void cl_io_fini(const struct lu_env *env, struct cl_io *io)
109 {
110         struct cl_io_slice    *slice;
111         struct cl_thread_info *info;
112
113         LINVRNT(cl_io_type_is_valid(io->ci_type));
114         LINVRNT(cl_io_invariant(io));
115         ENTRY;
116
117         while (!cfs_list_empty(&io->ci_layers)) {
118                 slice = container_of(io->ci_layers.next, struct cl_io_slice,
119                                      cis_linkage);
120                 cfs_list_del_init(&slice->cis_linkage);
121                 if (slice->cis_iop->op[io->ci_type].cio_fini != NULL)
122                         slice->cis_iop->op[io->ci_type].cio_fini(env, slice);
123                 /*
124                  * Invalidate slice to catch use after free. This assumes that
125                  * slices are allocated within session and can be touched
126                  * after ->cio_fini() returns.
127                  */
128                 slice->cis_io = NULL;
129         }
130         io->ci_state = CIS_FINI;
131         info = cl_env_info(env);
132         if (info->clt_current_io == io)
133                 info->clt_current_io = NULL;
134         EXIT;
135 }
136 EXPORT_SYMBOL(cl_io_fini);
137
138 static int cl_io_init0(const struct lu_env *env, struct cl_io *io,
139                        enum cl_io_type iot, struct cl_object *obj)
140 {
141         struct cl_object *scan;
142         int result;
143
144         LINVRNT(io->ci_state == CIS_ZERO || io->ci_state == CIS_FINI);
145         LINVRNT(cl_io_type_is_valid(iot));
146         LINVRNT(cl_io_invariant(io));
147         ENTRY;
148
149         io->ci_type = iot;
150         CFS_INIT_LIST_HEAD(&io->ci_lockset.cls_todo);
151         CFS_INIT_LIST_HEAD(&io->ci_lockset.cls_curr);
152         CFS_INIT_LIST_HEAD(&io->ci_lockset.cls_done);
153         CFS_INIT_LIST_HEAD(&io->ci_layers);
154
155         result = 0;
156         cl_object_for_each(scan, obj) {
157                 if (scan->co_ops->coo_io_init != NULL) {
158                         result = scan->co_ops->coo_io_init(env, scan, io);
159                         if (result != 0)
160                                 break;
161                 }
162         }
163         if (result == 0)
164                 io->ci_state = CIS_INIT;
165         RETURN(result);
166 }
167
168 /**
169  * Initialize sub-io, by calling cl_io_operations::cio_init() top-to-bottom.
170  *
171  * \pre obj != cl_object_top(obj)
172  */
173 int cl_io_sub_init(const struct lu_env *env, struct cl_io *io,
174                    enum cl_io_type iot, struct cl_object *obj)
175 {
176         struct cl_thread_info *info = cl_env_info(env);
177
178         LASSERT(obj != cl_object_top(obj));
179         if (info->clt_current_io == NULL)
180                 info->clt_current_io = io;
181         return cl_io_init0(env, io, iot, obj);
182 }
183 EXPORT_SYMBOL(cl_io_sub_init);
184
185 /**
186  * Initialize \a io, by calling cl_io_operations::cio_init() top-to-bottom.
187  *
188  * Caller has to call cl_io_fini() after a call to cl_io_init(), no matter
189  * what the latter returned.
190  *
191  * \pre obj == cl_object_top(obj)
192  * \pre cl_io_type_is_valid(iot)
193  * \post cl_io_type_is_valid(io->ci_type) && io->ci_type == iot
194  */
195 int cl_io_init(const struct lu_env *env, struct cl_io *io,
196                enum cl_io_type iot, struct cl_object *obj)
197 {
198         struct cl_thread_info *info = cl_env_info(env);
199
200         LASSERT(obj == cl_object_top(obj));
201         LASSERT(info->clt_current_io == NULL);
202
203         info->clt_current_io = io;
204         return cl_io_init0(env, io, iot, obj);
205 }
206 EXPORT_SYMBOL(cl_io_init);
207
208 /**
209  * Initialize read or write io.
210  *
211  * \pre iot == CIT_READ || iot == CIT_WRITE
212  */
213 int cl_io_rw_init(const struct lu_env *env, struct cl_io *io,
214                   enum cl_io_type iot, loff_t pos, size_t count)
215 {
216         LINVRNT(iot == CIT_READ || iot == CIT_WRITE);
217         LINVRNT(io->ci_obj != NULL);
218         ENTRY;
219
220         LU_OBJECT_HEADER(D_VFSTRACE, env, &io->ci_obj->co_lu,
221                          "io range: %u ["LPU64", "LPU64") %u %u\n",
222                          iot, (__u64)pos, (__u64)pos + count,
223                          io->u.ci_rw.crw_nonblock, io->u.ci_wr.wr_append);
224         io->u.ci_rw.crw_pos    = pos;
225         io->u.ci_rw.crw_count  = count;
226         RETURN(cl_io_init(env, io, iot, io->ci_obj));
227 }
228 EXPORT_SYMBOL(cl_io_rw_init);
229
230 static inline const struct lu_fid *
231 cl_lock_descr_fid(const struct cl_lock_descr *descr)
232 {
233         return lu_object_fid(&descr->cld_obj->co_lu);
234 }
235
236 static int cl_lock_descr_cmp(const struct cl_lock_descr *d0,
237                              const struct cl_lock_descr *d1)
238 {
239         return lu_fid_cmp(cl_lock_descr_fid(d0), cl_lock_descr_fid(d1)) ?:
240                 __diff_normalize(d0->cld_start, d1->cld_start);
241 }
242
243 /*
244  * Sort locks in lexicographical order of their (fid, start-offset) pairs.
245  */
246 static void cl_io_locks_sort(struct cl_io *io)
247 {
248         int done = 0;
249
250         ENTRY;
251         /* hidden treasure: bubble sort for now. */
252         do {
253                 struct cl_io_lock_link *curr;
254                 struct cl_io_lock_link *prev;
255                 struct cl_io_lock_link *temp;
256
257                 done = 1;
258                 prev = NULL;
259
260                 cfs_list_for_each_entry_safe(curr, temp,
261                                              &io->ci_lockset.cls_todo,
262                                              cill_linkage) {
263                         if (prev != NULL) {
264                                 switch (cl_lock_descr_cmp(&prev->cill_descr,
265                                                           &curr->cill_descr)) {
266                                 case 0:
267                                         /*
268                                          * IMPOSSIBLE: Identical locks are
269                                          *             already removed at
270                                          *             this point.
271                                          */
272                                 default:
273                                         LBUG();
274                                 case +1:
275                                         cfs_list_move_tail(&curr->cill_linkage,
276                                                            &prev->cill_linkage);
277                                         done = 0;
278                                         continue; /* don't change prev: it's
279                                                    * still "previous" */
280                                 case -1: /* already in order */
281                                         break;
282                                 }
283                         }
284                         prev = curr;
285                 }
286         } while (!done);
287         EXIT;
288 }
289
290 /**
291  * Check whether \a queue contains locks matching \a need.
292  *
293  * \retval +ve there is a matching lock in the \a queue
294  * \retval   0 there are no matching locks in the \a queue
295  */
296 int cl_queue_match(const cfs_list_t *queue,
297                    const struct cl_lock_descr *need)
298 {
299        struct cl_io_lock_link *scan;
300
301        ENTRY;
302        cfs_list_for_each_entry(scan, queue, cill_linkage) {
303                if (cl_lock_descr_match(&scan->cill_descr, need))
304                        RETURN(+1);
305        }
306        return 0;
307 }
308 EXPORT_SYMBOL(cl_queue_match);
309
310 static int cl_lockset_match(const struct cl_lockset *set,
311                             const struct cl_lock_descr *need, int all_queues)
312 {
313         return (all_queues ? cl_queue_match(&set->cls_todo, need) : 0) ||
314                 cl_queue_match(&set->cls_curr, need) ||
315                 cl_queue_match(&set->cls_done, need);
316 }
317
318 static int cl_lockset_lock_one(const struct lu_env *env,
319                                struct cl_io *io, struct cl_lockset *set,
320                                struct cl_io_lock_link *link)
321 {
322         struct cl_lock *lock;
323         int             result;
324
325         ENTRY;
326
327         lock = cl_lock_request(env, io, &link->cill_descr, "io", io);
328         if (!IS_ERR(lock)) {
329                 link->cill_lock = lock;
330                 cfs_list_move(&link->cill_linkage, &set->cls_curr);
331                 if (!(link->cill_descr.cld_enq_flags & CEF_ASYNC)) {
332                         result = cl_wait(env, lock);
333                         if (result == 0)
334                                 cfs_list_move(&link->cill_linkage,
335                                               &set->cls_done);
336                 } else
337                         result = 0;
338         } else
339                 result = PTR_ERR(lock);
340         RETURN(result);
341 }
342
343 static void cl_lock_link_fini(const struct lu_env *env, struct cl_io *io,
344                               struct cl_io_lock_link *link)
345 {
346         struct cl_lock *lock = link->cill_lock;
347
348         ENTRY;
349         cfs_list_del_init(&link->cill_linkage);
350         if (lock != NULL) {
351                 cl_lock_release(env, lock, "io", io);
352                 link->cill_lock = NULL;
353         }
354         if (link->cill_fini != NULL)
355                 link->cill_fini(env, link);
356         EXIT;
357 }
358
359 static int cl_lockset_lock(const struct lu_env *env, struct cl_io *io,
360                            struct cl_lockset *set)
361 {
362         struct cl_io_lock_link *link;
363         struct cl_io_lock_link *temp;
364         struct cl_lock         *lock;
365         int result;
366
367         ENTRY;
368         result = 0;
369         cfs_list_for_each_entry_safe(link, temp, &set->cls_todo, cill_linkage) {
370                 if (!cl_lockset_match(set, &link->cill_descr, 0)) {
371                         /* XXX some locking to guarantee that locks aren't
372                          * expanded in between. */
373                         result = cl_lockset_lock_one(env, io, set, link);
374                         if (result != 0)
375                                 break;
376                 } else
377                         cl_lock_link_fini(env, io, link);
378         }
379         if (result == 0) {
380                 cfs_list_for_each_entry_safe(link, temp,
381                                              &set->cls_curr, cill_linkage) {
382                         lock = link->cill_lock;
383                         result = cl_wait(env, lock);
384                         if (result == 0)
385                                 cfs_list_move(&link->cill_linkage,
386                                               &set->cls_done);
387                         else
388                                 break;
389                 }
390         }
391         RETURN(result);
392 }
393
394 /**
395  * Takes locks necessary for the current iteration of io.
396  *
397  * Calls cl_io_operations::cio_lock() top-to-bottom to collect locks required
398  * by layers for the current iteration. Then sort locks (to avoid dead-locks),
399  * and acquire them.
400  */
401 int cl_io_lock(const struct lu_env *env, struct cl_io *io)
402 {
403         const struct cl_io_slice *scan;
404         int result = 0;
405
406         LINVRNT(cl_io_is_loopable(io));
407         LINVRNT(io->ci_state == CIS_IT_STARTED);
408         LINVRNT(cl_io_invariant(io));
409
410         ENTRY;
411         cl_io_for_each(scan, io) {
412                 if (scan->cis_iop->op[io->ci_type].cio_lock == NULL)
413                         continue;
414                 result = scan->cis_iop->op[io->ci_type].cio_lock(env, scan);
415                 if (result != 0)
416                         break;
417         }
418         if (result == 0) {
419                 cl_io_locks_sort(io);
420                 result = cl_lockset_lock(env, io, &io->ci_lockset);
421         }
422         if (result != 0)
423                 cl_io_unlock(env, io);
424         else
425                 io->ci_state = CIS_LOCKED;
426         RETURN(result);
427 }
428 EXPORT_SYMBOL(cl_io_lock);
429
430 /**
431  * Release locks takes by io.
432  */
433 void cl_io_unlock(const struct lu_env *env, struct cl_io *io)
434 {
435         struct cl_lockset        *set;
436         struct cl_io_lock_link   *link;
437         struct cl_io_lock_link   *temp;
438         const struct cl_io_slice *scan;
439
440         LASSERT(cl_io_is_loopable(io));
441         LASSERT(CIS_IT_STARTED <= io->ci_state && io->ci_state < CIS_UNLOCKED);
442         LINVRNT(cl_io_invariant(io));
443
444         ENTRY;
445         set = &io->ci_lockset;
446
447         cfs_list_for_each_entry_safe(link, temp, &set->cls_todo, cill_linkage)
448                 cl_lock_link_fini(env, io, link);
449
450         cfs_list_for_each_entry_safe(link, temp, &set->cls_curr, cill_linkage)
451                 cl_lock_link_fini(env, io, link);
452
453         cfs_list_for_each_entry_safe(link, temp, &set->cls_done, cill_linkage) {
454                 cl_unuse(env, link->cill_lock);
455                 cl_lock_link_fini(env, io, link);
456         }
457         cl_io_for_each_reverse(scan, io) {
458                 if (scan->cis_iop->op[io->ci_type].cio_unlock != NULL)
459                         scan->cis_iop->op[io->ci_type].cio_unlock(env, scan);
460         }
461         io->ci_state = CIS_UNLOCKED;
462         LASSERT(!cl_env_info(env)->clt_counters[CNL_TOP].ctc_nr_locks_acquired);
463         EXIT;
464 }
465 EXPORT_SYMBOL(cl_io_unlock);
466
467 /**
468  * Prepares next iteration of io.
469  *
470  * Calls cl_io_operations::cio_iter_init() top-to-bottom. This exists to give
471  * layers a chance to modify io parameters, e.g., so that lov can restrict io
472  * to a single stripe.
473  */
474 int cl_io_iter_init(const struct lu_env *env, struct cl_io *io)
475 {
476         const struct cl_io_slice *scan;
477         int result;
478
479         LINVRNT(cl_io_is_loopable(io));
480         LINVRNT(io->ci_state == CIS_INIT || io->ci_state == CIS_IT_ENDED);
481         LINVRNT(cl_io_invariant(io));
482
483         ENTRY;
484         result = 0;
485         cl_io_for_each(scan, io) {
486                 if (scan->cis_iop->op[io->ci_type].cio_iter_init == NULL)
487                         continue;
488                 result = scan->cis_iop->op[io->ci_type].cio_iter_init(env,
489                                                                       scan);
490                 if (result != 0)
491                         break;
492         }
493         if (result == 0)
494                 io->ci_state = CIS_IT_STARTED;
495         RETURN(result);
496 }
497 EXPORT_SYMBOL(cl_io_iter_init);
498
499 /**
500  * Finalizes io iteration.
501  *
502  * Calls cl_io_operations::cio_iter_fini() bottom-to-top.
503  */
504 void cl_io_iter_fini(const struct lu_env *env, struct cl_io *io)
505 {
506         const struct cl_io_slice *scan;
507
508         LINVRNT(cl_io_is_loopable(io));
509         LINVRNT(io->ci_state == CIS_UNLOCKED);
510         LINVRNT(cl_io_invariant(io));
511
512         ENTRY;
513         cl_io_for_each_reverse(scan, io) {
514                 if (scan->cis_iop->op[io->ci_type].cio_iter_fini != NULL)
515                         scan->cis_iop->op[io->ci_type].cio_iter_fini(env, scan);
516         }
517         io->ci_state = CIS_IT_ENDED;
518         EXIT;
519 }
520 EXPORT_SYMBOL(cl_io_iter_fini);
521
522 /**
523  * Records that read or write io progressed \a nob bytes forward.
524  */
525 void cl_io_rw_advance(const struct lu_env *env, struct cl_io *io, size_t nob)
526 {
527         const struct cl_io_slice *scan;
528
529         LINVRNT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE ||
530                 nob == 0);
531         LINVRNT(cl_io_is_loopable(io));
532         LINVRNT(cl_io_invariant(io));
533
534         ENTRY;
535
536         io->u.ci_rw.crw_pos   += nob;
537         io->u.ci_rw.crw_count -= nob;
538
539         /* layers have to be notified. */
540         cl_io_for_each_reverse(scan, io) {
541                 if (scan->cis_iop->op[io->ci_type].cio_advance != NULL)
542                         scan->cis_iop->op[io->ci_type].cio_advance(env, scan,
543                                                                    nob);
544         }
545         EXIT;
546 }
547 EXPORT_SYMBOL(cl_io_rw_advance);
548
549 /**
550  * Adds a lock to a lockset.
551  */
552 int cl_io_lock_add(const struct lu_env *env, struct cl_io *io,
553                    struct cl_io_lock_link *link)
554 {
555         int result;
556
557         ENTRY;
558         if (cl_lockset_match(&io->ci_lockset, &link->cill_descr, 1))
559                 result = +1;
560         else {
561                 cfs_list_add(&link->cill_linkage, &io->ci_lockset.cls_todo);
562                 result = 0;
563         }
564         RETURN(result);
565 }
566 EXPORT_SYMBOL(cl_io_lock_add);
567
568 static void cl_free_io_lock_link(const struct lu_env *env,
569                                  struct cl_io_lock_link *link)
570 {
571         OBD_FREE_PTR(link);
572 }
573
574 /**
575  * Allocates new lock link, and uses it to add a lock to a lockset.
576  */
577 int cl_io_lock_alloc_add(const struct lu_env *env, struct cl_io *io,
578                          struct cl_lock_descr *descr)
579 {
580         struct cl_io_lock_link *link;
581         int result;
582
583         ENTRY;
584         OBD_ALLOC_PTR(link);
585         if (link != NULL) {
586                 link->cill_descr     = *descr;
587                 link->cill_fini      = cl_free_io_lock_link;
588                 result = cl_io_lock_add(env, io, link);
589                 if (result) /* lock match */
590                         link->cill_fini(env, link);
591         } else
592                 result = -ENOMEM;
593
594         RETURN(result);
595 }
596 EXPORT_SYMBOL(cl_io_lock_alloc_add);
597
598 /**
599  * Starts io by calling cl_io_operations::cio_start() top-to-bottom.
600  */
601 int cl_io_start(const struct lu_env *env, struct cl_io *io)
602 {
603         const struct cl_io_slice *scan;
604         int result = 0;
605
606         LINVRNT(cl_io_is_loopable(io));
607         LINVRNT(io->ci_state == CIS_LOCKED);
608         LINVRNT(cl_io_invariant(io));
609         ENTRY;
610
611         io->ci_state = CIS_IO_GOING;
612         cl_io_for_each(scan, io) {
613                 if (scan->cis_iop->op[io->ci_type].cio_start == NULL)
614                         continue;
615                 result = scan->cis_iop->op[io->ci_type].cio_start(env, scan);
616                 if (result != 0)
617                         break;
618         }
619         if (result >= 0)
620                 result = 0;
621         RETURN(result);
622 }
623 EXPORT_SYMBOL(cl_io_start);
624
625 /**
626  * Wait until current io iteration is finished by calling
627  * cl_io_operations::cio_end() bottom-to-top.
628  */
629 void cl_io_end(const struct lu_env *env, struct cl_io *io)
630 {
631         const struct cl_io_slice *scan;
632
633         LINVRNT(cl_io_is_loopable(io));
634         LINVRNT(io->ci_state == CIS_IO_GOING);
635         LINVRNT(cl_io_invariant(io));
636         ENTRY;
637
638         cl_io_for_each_reverse(scan, io) {
639                 if (scan->cis_iop->op[io->ci_type].cio_end != NULL)
640                         scan->cis_iop->op[io->ci_type].cio_end(env, scan);
641                 /* TODO: error handling. */
642         }
643         io->ci_state = CIS_IO_FINISHED;
644         EXIT;
645 }
646 EXPORT_SYMBOL(cl_io_end);
647
648 static const struct cl_page_slice *
649 cl_io_slice_page(const struct cl_io_slice *ios, struct cl_page *page)
650 {
651         const struct cl_page_slice *slice;
652
653         slice = cl_page_at(page, ios->cis_obj->co_lu.lo_dev->ld_type);
654         LINVRNT(slice != NULL);
655         return slice;
656 }
657
658 /**
659  * True iff \a page is within \a io range.
660  */
661 static int cl_page_in_io(const struct cl_page *page, const struct cl_io *io)
662 {
663         int     result = 1;
664         loff_t  start;
665         loff_t  end;
666         pgoff_t idx;
667
668         idx = page->cp_index;
669         switch (io->ci_type) {
670         case CIT_READ:
671         case CIT_WRITE:
672                 /*
673                  * check that [start, end) and [pos, pos + count) extents
674                  * overlap.
675                  */
676                 if (!cl_io_is_append(io)) {
677                         const struct cl_io_rw_common *crw = &(io->u.ci_rw);
678                         start = cl_offset(page->cp_obj, idx);
679                         end   = cl_offset(page->cp_obj, idx + 1);
680                         result = crw->crw_pos < end &&
681                                  start < crw->crw_pos + crw->crw_count;
682                 }
683                 break;
684         case CIT_FAULT:
685                 result = io->u.ci_fault.ft_index == idx;
686                 break;
687         default:
688                 LBUG();
689         }
690         return result;
691 }
692
693 /**
694  * Called by read io, when page has to be read from the server.
695  *
696  * \see cl_io_operations::cio_read_page()
697  */
698 int cl_io_read_page(const struct lu_env *env, struct cl_io *io,
699                     struct cl_page *page)
700 {
701         const struct cl_io_slice *scan;
702         struct cl_2queue         *queue;
703         int                       result = 0;
704
705         LINVRNT(io->ci_type == CIT_READ || io->ci_type == CIT_FAULT);
706         LINVRNT(cl_page_is_owned(page, io));
707         LINVRNT(io->ci_state == CIS_IO_GOING || io->ci_state == CIS_LOCKED);
708         LINVRNT(cl_page_in_io(page, io));
709         LINVRNT(cl_io_invariant(io));
710         ENTRY;
711
712         queue = &io->ci_queue;
713
714         cl_2queue_init(queue);
715         /*
716          * ->cio_read_page() methods called in the loop below are supposed to
717          * never block waiting for network (the only subtle point is the
718          * creation of new pages for read-ahead that might result in cache
719          * shrinking, but currently only clean pages are shrunk and this
720          * requires no network io).
721          *
722          * Should this ever starts blocking, retry loop would be needed for
723          * "parallel io" (see CLO_REPEAT loops in cl_lock.c).
724          */
725         cl_io_for_each(scan, io) {
726                 if (scan->cis_iop->cio_read_page != NULL) {
727                         const struct cl_page_slice *slice;
728
729                         slice = cl_io_slice_page(scan, page);
730                         LINVRNT(slice != NULL);
731                         result = scan->cis_iop->cio_read_page(env, scan, slice);
732                         if (result != 0)
733                                 break;
734                 }
735         }
736         if (result == 0)
737                 result = cl_io_submit_rw(env, io, CRT_READ, queue, CRP_NORMAL);
738         /*
739          * Unlock unsent pages in case of error.
740          */
741         cl_page_list_disown(env, io, &queue->c2_qin);
742         cl_2queue_fini(env, queue);
743         RETURN(result);
744 }
745 EXPORT_SYMBOL(cl_io_read_page);
746
747 /**
748  * Called by write io to prepare page to receive data from user buffer.
749  *
750  * \see cl_io_operations::cio_prepare_write()
751  */
752 int cl_io_prepare_write(const struct lu_env *env, struct cl_io *io,
753                         struct cl_page *page, unsigned from, unsigned to)
754 {
755         const struct cl_io_slice *scan;
756         int result = 0;
757
758         LINVRNT(io->ci_type == CIT_WRITE);
759         LINVRNT(cl_page_is_owned(page, io));
760         LINVRNT(io->ci_state == CIS_IO_GOING || io->ci_state == CIS_LOCKED);
761         LINVRNT(cl_io_invariant(io));
762         LASSERT(cl_page_in_io(page, io));
763         ENTRY;
764
765         cl_io_for_each_reverse(scan, io) {
766                 if (scan->cis_iop->cio_prepare_write != NULL) {
767                         const struct cl_page_slice *slice;
768
769                         slice = cl_io_slice_page(scan, page);
770                         result = scan->cis_iop->cio_prepare_write(env, scan,
771                                                                   slice,
772                                                                   from, to);
773                         if (result != 0)
774                                 break;
775                 }
776         }
777         RETURN(result);
778 }
779 EXPORT_SYMBOL(cl_io_prepare_write);
780
781 /**
782  * Called by write io after user data were copied into a page.
783  *
784  * \see cl_io_operations::cio_commit_write()
785  */
786 int cl_io_commit_write(const struct lu_env *env, struct cl_io *io,
787                        struct cl_page *page, unsigned from, unsigned to)
788 {
789         const struct cl_io_slice *scan;
790         int result = 0;
791
792         LINVRNT(io->ci_type == CIT_WRITE);
793         LINVRNT(io->ci_state == CIS_IO_GOING || io->ci_state == CIS_LOCKED);
794         LINVRNT(cl_io_invariant(io));
795         /*
796          * XXX Uh... not nice. Top level cl_io_commit_write() call (vvp->lov)
797          * already called cl_page_cache_add(), moving page into CPS_CACHED
798          * state. Better (and more general) way of dealing with such situation
799          * is needed.
800          */
801         LASSERT(cl_page_is_owned(page, io) || page->cp_parent != NULL);
802         LASSERT(cl_page_in_io(page, io));
803         ENTRY;
804
805         cl_io_for_each(scan, io) {
806                 if (scan->cis_iop->cio_commit_write != NULL) {
807                         const struct cl_page_slice *slice;
808
809                         slice = cl_io_slice_page(scan, page);
810                         result = scan->cis_iop->cio_commit_write(env, scan,
811                                                                  slice,
812                                                                  from, to);
813                         if (result != 0)
814                                 break;
815                 }
816         }
817         LINVRNT(result <= 0);
818         RETURN(result);
819 }
820 EXPORT_SYMBOL(cl_io_commit_write);
821
822 /**
823  * Submits a list of pages for immediate io.
824  *
825  * After the function gets returned, The submitted pages are moved to
826  * queue->c2_qout queue, and queue->c2_qin contain both the pages don't need
827  * to be submitted, and the pages are errant to submit.
828  *
829  * \returns 0 if at least one page was submitted, error code otherwise.
830  * \see cl_io_operations::cio_submit()
831  */
832 int cl_io_submit_rw(const struct lu_env *env, struct cl_io *io,
833                     enum cl_req_type crt, struct cl_2queue *queue,
834                     enum cl_req_priority priority)
835 {
836         const struct cl_io_slice *scan;
837         int result = 0;
838
839         LINVRNT(crt < ARRAY_SIZE(scan->cis_iop->req_op));
840         ENTRY;
841
842         cl_io_for_each(scan, io) {
843                 if (scan->cis_iop->req_op[crt].cio_submit == NULL)
844                         continue;
845                 result = scan->cis_iop->req_op[crt].cio_submit(env, scan, crt,
846                                                                queue, priority);
847                 if (result != 0)
848                         break;
849         }
850         /*
851          * If ->cio_submit() failed, no pages were sent.
852          */
853         LASSERT(ergo(result != 0, cfs_list_empty(&queue->c2_qout.pl_pages)));
854         RETURN(result);
855 }
856 EXPORT_SYMBOL(cl_io_submit_rw);
857
858 /**
859  * Submit a sync_io and wait for the IO to be finished, or error happens.
860  * If \a timeout is zero, it means to wait for the IO unconditionally.
861  */
862 int cl_io_submit_sync(const struct lu_env *env, struct cl_io *io,
863                       enum cl_req_type iot, struct cl_2queue *queue,
864                       enum cl_req_priority prio, long timeout)
865 {
866         struct cl_sync_io *anchor = &cl_env_info(env)->clt_anchor;
867         struct cl_page *pg;
868         int rc;
869
870         LASSERT(prio == CRP_NORMAL || prio == CRP_CANCEL);
871
872         cl_page_list_for_each(pg, &queue->c2_qin) {
873                 LASSERT(pg->cp_sync_io == NULL);
874                 pg->cp_sync_io = anchor;
875         }
876
877         cl_sync_io_init(anchor, queue->c2_qin.pl_nr);
878         rc = cl_io_submit_rw(env, io, iot, queue, prio);
879         if (rc == 0) {
880                 /*
881                  * If some pages weren't sent for any reason (e.g.,
882                  * read found up-to-date pages in the cache, or write found
883                  * clean pages), count them as completed to avoid infinite
884                  * wait.
885                  */
886                  cl_page_list_for_each(pg, &queue->c2_qin) {
887                         pg->cp_sync_io = NULL;
888                         cl_sync_io_note(anchor, +1);
889                  }
890
891                  /* wait for the IO to be finished. */
892                  rc = cl_sync_io_wait(env, io, &queue->c2_qout,
893                                       anchor, timeout);
894         } else {
895                 LASSERT(cfs_list_empty(&queue->c2_qout.pl_pages));
896                 cl_page_list_for_each(pg, &queue->c2_qin)
897                         pg->cp_sync_io = NULL;
898         }
899         return rc;
900 }
901 EXPORT_SYMBOL(cl_io_submit_sync);
902
903 /**
904  * Cancel an IO which has been submitted by cl_io_submit_rw.
905  */
906 int cl_io_cancel(const struct lu_env *env, struct cl_io *io,
907                  struct cl_page_list *queue)
908 {
909         struct cl_page *page;
910         int result = 0;
911
912         CERROR("Canceling ongoing page trasmission\n");
913         cl_page_list_for_each(page, queue) {
914                 int rc;
915
916                 LINVRNT(cl_page_in_io(page, io));
917                 rc = cl_page_cancel(env, page);
918                 result = result ?: rc;
919         }
920         return result;
921 }
922 EXPORT_SYMBOL(cl_io_cancel);
923
924 /**
925  * Main io loop.
926  *
927  * Pumps io through iterations calling
928  *
929  *    - cl_io_iter_init()
930  *
931  *    - cl_io_lock()
932  *
933  *    - cl_io_start()
934  *
935  *    - cl_io_end()
936  *
937  *    - cl_io_unlock()
938  *
939  *    - cl_io_iter_fini()
940  *
941  * repeatedly until there is no more io to do.
942  */
943 int cl_io_loop(const struct lu_env *env, struct cl_io *io)
944 {
945         int result   = 0;
946
947         LINVRNT(cl_io_is_loopable(io));
948         ENTRY;
949
950         do {
951                 size_t nob;
952
953                 io->ci_continue = 0;
954                 result = cl_io_iter_init(env, io);
955                 if (result == 0) {
956                         nob    = io->ci_nob;
957                         result = cl_io_lock(env, io);
958                         if (result == 0) {
959                                 /*
960                                  * Notify layers that locks has been taken,
961                                  * and do actual i/o.
962                                  *
963                                  *   - llite: kms, short read;
964                                  *   - llite: generic_file_read();
965                                  */
966                                 result = cl_io_start(env, io);
967                                 /*
968                                  * Send any remaining pending
969                                  * io, etc.
970                                  *
971                                  *   - llite: ll_rw_stats_tally.
972                                  */
973                                 cl_io_end(env, io);
974                                 cl_io_unlock(env, io);
975                                 cl_io_rw_advance(env, io, io->ci_nob - nob);
976                         }
977                 }
978                 cl_io_iter_fini(env, io);
979         } while (result == 0 && io->ci_continue);
980         RETURN(result < 0 ? result : 0);
981 }
982 EXPORT_SYMBOL(cl_io_loop);
983
984 /**
985  * Adds io slice to the cl_io.
986  *
987  * This is called by cl_object_operations::coo_io_init() methods to add a
988  * per-layer state to the io. New state is added at the end of
989  * cl_io::ci_layers list, that is, it is at the bottom of the stack.
990  *
991  * \see cl_lock_slice_add(), cl_req_slice_add(), cl_page_slice_add()
992  */
993 void cl_io_slice_add(struct cl_io *io, struct cl_io_slice *slice,
994                      struct cl_object *obj,
995                      const struct cl_io_operations *ops)
996 {
997         cfs_list_t *linkage = &slice->cis_linkage;
998
999         LASSERT((linkage->prev == NULL && linkage->next == NULL) ||
1000                 cfs_list_empty(linkage));
1001         ENTRY;
1002
1003         cfs_list_add_tail(linkage, &io->ci_layers);
1004         slice->cis_io  = io;
1005         slice->cis_obj = obj;
1006         slice->cis_iop = ops;
1007         EXIT;
1008 }
1009 EXPORT_SYMBOL(cl_io_slice_add);
1010
1011
1012 /**
1013  * Initializes page list.
1014  */
1015 void cl_page_list_init(struct cl_page_list *plist)
1016 {
1017         ENTRY;
1018         plist->pl_nr = 0;
1019         CFS_INIT_LIST_HEAD(&plist->pl_pages);
1020         plist->pl_owner = cfs_current();
1021         EXIT;
1022 }
1023 EXPORT_SYMBOL(cl_page_list_init);
1024
1025 /**
1026  * Adds a page to a page list.
1027  */
1028 void cl_page_list_add(struct cl_page_list *plist, struct cl_page *page)
1029 {
1030         ENTRY;
1031         /* it would be better to check that page is owned by "current" io, but
1032          * it is not passed here. */
1033         LASSERT(page->cp_owner != NULL);
1034         LINVRNT(plist->pl_owner == cfs_current());
1035
1036         cfs_lockdep_off();
1037         cfs_mutex_lock(&page->cp_mutex);
1038         cfs_lockdep_on();
1039         LASSERT(cfs_list_empty(&page->cp_batch));
1040         cfs_list_add_tail(&page->cp_batch, &plist->pl_pages);
1041         ++plist->pl_nr;
1042         page->cp_queue_ref = lu_ref_add(&page->cp_reference, "queue", plist);
1043         cl_page_get(page);
1044         EXIT;
1045 }
1046 EXPORT_SYMBOL(cl_page_list_add);
1047
1048 /**
1049  * Removes a page from a page list.
1050  */
1051 void cl_page_list_del(const struct lu_env *env,
1052                       struct cl_page_list *plist, struct cl_page *page)
1053 {
1054         LASSERT(plist->pl_nr > 0);
1055         LINVRNT(plist->pl_owner == cfs_current());
1056
1057         ENTRY;
1058         cfs_list_del_init(&page->cp_batch);
1059         cfs_lockdep_off();
1060         cfs_mutex_unlock(&page->cp_mutex);
1061         cfs_lockdep_on();
1062         --plist->pl_nr;
1063         lu_ref_del_at(&page->cp_reference, page->cp_queue_ref, "queue", plist);
1064         cl_page_put(env, page);
1065         EXIT;
1066 }
1067 EXPORT_SYMBOL(cl_page_list_del);
1068
1069 /**
1070  * Moves a page from one page list to another.
1071  */
1072 void cl_page_list_move(struct cl_page_list *dst, struct cl_page_list *src,
1073                        struct cl_page *page)
1074 {
1075         LASSERT(src->pl_nr > 0);
1076         LINVRNT(dst->pl_owner == cfs_current());
1077         LINVRNT(src->pl_owner == cfs_current());
1078
1079         ENTRY;
1080         cfs_list_move_tail(&page->cp_batch, &dst->pl_pages);
1081         --src->pl_nr;
1082         ++dst->pl_nr;
1083         lu_ref_set_at(&page->cp_reference,
1084                       page->cp_queue_ref, "queue", src, dst);
1085         EXIT;
1086 }
1087 EXPORT_SYMBOL(cl_page_list_move);
1088
1089 /**
1090  * splice the cl_page_list, just as list head does
1091  */
1092 void cl_page_list_splice(struct cl_page_list *list, struct cl_page_list *head)
1093 {
1094         struct cl_page *page;
1095         struct cl_page *tmp;
1096
1097         LINVRNT(list->pl_owner == cfs_current());
1098         LINVRNT(head->pl_owner == cfs_current());
1099
1100         ENTRY;
1101         cl_page_list_for_each_safe(page, tmp, list)
1102                 cl_page_list_move(head, list, page);
1103         EXIT;
1104 }
1105 EXPORT_SYMBOL(cl_page_list_splice);
1106
1107 void cl_page_disown0(const struct lu_env *env,
1108                      struct cl_io *io, struct cl_page *pg);
1109
1110 /**
1111  * Disowns pages in a queue.
1112  */
1113 void cl_page_list_disown(const struct lu_env *env,
1114                          struct cl_io *io, struct cl_page_list *plist)
1115 {
1116         struct cl_page *page;
1117         struct cl_page *temp;
1118
1119         LINVRNT(plist->pl_owner == cfs_current());
1120
1121         ENTRY;
1122         cl_page_list_for_each_safe(page, temp, plist) {
1123                 LASSERT(plist->pl_nr > 0);
1124
1125                 cfs_list_del_init(&page->cp_batch);
1126                 cfs_lockdep_off();
1127                 cfs_mutex_unlock(&page->cp_mutex);
1128                 cfs_lockdep_on();
1129                 --plist->pl_nr;
1130                 /*
1131                  * cl_page_disown0 rather than usual cl_page_disown() is used,
1132                  * because pages are possibly in CPS_FREEING state already due
1133                  * to the call to cl_page_list_discard().
1134                  */
1135                 /*
1136                  * XXX cl_page_disown0() will fail if page is not locked.
1137                  */
1138                 cl_page_disown0(env, io, page);
1139                 lu_ref_del(&page->cp_reference, "queue", plist);
1140                 cl_page_put(env, page);
1141         }
1142         EXIT;
1143 }
1144 EXPORT_SYMBOL(cl_page_list_disown);
1145
1146 /**
1147  * Releases pages from queue.
1148  */
1149 void cl_page_list_fini(const struct lu_env *env, struct cl_page_list *plist)
1150 {
1151         struct cl_page *page;
1152         struct cl_page *temp;
1153
1154         LINVRNT(plist->pl_owner == cfs_current());
1155
1156         ENTRY;
1157         cl_page_list_for_each_safe(page, temp, plist)
1158                 cl_page_list_del(env, plist, page);
1159         LASSERT(plist->pl_nr == 0);
1160         EXIT;
1161 }
1162 EXPORT_SYMBOL(cl_page_list_fini);
1163
1164 /**
1165  * Owns all pages in a queue.
1166  */
1167 int cl_page_list_own(const struct lu_env *env,
1168                      struct cl_io *io, struct cl_page_list *plist)
1169 {
1170         struct cl_page *page;
1171         struct cl_page *temp;
1172         pgoff_t index = 0;
1173         int result;
1174
1175         LINVRNT(plist->pl_owner == cfs_current());
1176
1177         ENTRY;
1178         result = 0;
1179         cl_page_list_for_each_safe(page, temp, plist) {
1180                 LASSERT(index <= page->cp_index);
1181                 index = page->cp_index;
1182                 if (cl_page_own(env, io, page) == 0)
1183                         result = result ?: page->cp_error;
1184                 else
1185                         cl_page_list_del(env, plist, page);
1186         }
1187         RETURN(result);
1188 }
1189 EXPORT_SYMBOL(cl_page_list_own);
1190
1191 /**
1192  * Assumes all pages in a queue.
1193  */
1194 void cl_page_list_assume(const struct lu_env *env,
1195                          struct cl_io *io, struct cl_page_list *plist)
1196 {
1197         struct cl_page *page;
1198
1199         LINVRNT(plist->pl_owner == cfs_current());
1200
1201         cl_page_list_for_each(page, plist)
1202                 cl_page_assume(env, io, page);
1203 }
1204 EXPORT_SYMBOL(cl_page_list_assume);
1205
1206 /**
1207  * Discards all pages in a queue.
1208  */
1209 void cl_page_list_discard(const struct lu_env *env, struct cl_io *io,
1210                           struct cl_page_list *plist)
1211 {
1212         struct cl_page *page;
1213
1214         LINVRNT(plist->pl_owner == cfs_current());
1215         ENTRY;
1216         cl_page_list_for_each(page, plist)
1217                 cl_page_discard(env, io, page);
1218         EXIT;
1219 }
1220 EXPORT_SYMBOL(cl_page_list_discard);
1221
1222 /**
1223  * Unmaps all pages in a queue from user virtual memory.
1224  */
1225 int cl_page_list_unmap(const struct lu_env *env, struct cl_io *io,
1226                         struct cl_page_list *plist)
1227 {
1228         struct cl_page *page;
1229         int result;
1230
1231         LINVRNT(plist->pl_owner == cfs_current());
1232         ENTRY;
1233         result = 0;
1234         cl_page_list_for_each(page, plist) {
1235                 result = cl_page_unmap(env, io, page);
1236                 if (result != 0)
1237                         break;
1238         }
1239         RETURN(result);
1240 }
1241 EXPORT_SYMBOL(cl_page_list_unmap);
1242
1243 /**
1244  * Initialize dual page queue.
1245  */
1246 void cl_2queue_init(struct cl_2queue *queue)
1247 {
1248         ENTRY;
1249         cl_page_list_init(&queue->c2_qin);
1250         cl_page_list_init(&queue->c2_qout);
1251         EXIT;
1252 }
1253 EXPORT_SYMBOL(cl_2queue_init);
1254
1255 /**
1256  * Add a page to the incoming page list of 2-queue.
1257  */
1258 void cl_2queue_add(struct cl_2queue *queue, struct cl_page *page)
1259 {
1260         ENTRY;
1261         cl_page_list_add(&queue->c2_qin, page);
1262         EXIT;
1263 }
1264 EXPORT_SYMBOL(cl_2queue_add);
1265
1266 /**
1267  * Disown pages in both lists of a 2-queue.
1268  */
1269 void cl_2queue_disown(const struct lu_env *env,
1270                       struct cl_io *io, struct cl_2queue *queue)
1271 {
1272         ENTRY;
1273         cl_page_list_disown(env, io, &queue->c2_qin);
1274         cl_page_list_disown(env, io, &queue->c2_qout);
1275         EXIT;
1276 }
1277 EXPORT_SYMBOL(cl_2queue_disown);
1278
1279 /**
1280  * Discard (truncate) pages in both lists of a 2-queue.
1281  */
1282 void cl_2queue_discard(const struct lu_env *env,
1283                        struct cl_io *io, struct cl_2queue *queue)
1284 {
1285         ENTRY;
1286         cl_page_list_discard(env, io, &queue->c2_qin);
1287         cl_page_list_discard(env, io, &queue->c2_qout);
1288         EXIT;
1289 }
1290 EXPORT_SYMBOL(cl_2queue_discard);
1291
1292 /**
1293  * Assume to own the pages in cl_2queue
1294  */
1295 void cl_2queue_assume(const struct lu_env *env,
1296                       struct cl_io *io, struct cl_2queue *queue)
1297 {
1298         cl_page_list_assume(env, io, &queue->c2_qin);
1299         cl_page_list_assume(env, io, &queue->c2_qout);
1300 }
1301 EXPORT_SYMBOL(cl_2queue_assume);
1302
1303 /**
1304  * Finalize both page lists of a 2-queue.
1305  */
1306 void cl_2queue_fini(const struct lu_env *env, struct cl_2queue *queue)
1307 {
1308         ENTRY;
1309         cl_page_list_fini(env, &queue->c2_qout);
1310         cl_page_list_fini(env, &queue->c2_qin);
1311         EXIT;
1312 }
1313 EXPORT_SYMBOL(cl_2queue_fini);
1314
1315 /**
1316  * Initialize a 2-queue to contain \a page in its incoming page list.
1317  */
1318 void cl_2queue_init_page(struct cl_2queue *queue, struct cl_page *page)
1319 {
1320         ENTRY;
1321         cl_2queue_init(queue);
1322         cl_2queue_add(queue, page);
1323         EXIT;
1324 }
1325 EXPORT_SYMBOL(cl_2queue_init_page);
1326
1327 /**
1328  * Returns top-level io.
1329  *
1330  * \see cl_object_top(), cl_page_top().
1331  */
1332 struct cl_io *cl_io_top(struct cl_io *io)
1333 {
1334         ENTRY;
1335         while (io->ci_parent != NULL)
1336                 io = io->ci_parent;
1337         RETURN(io);
1338 }
1339 EXPORT_SYMBOL(cl_io_top);
1340
1341 /**
1342  * Prints human readable representation of \a io to the \a f.
1343  */
1344 void cl_io_print(const struct lu_env *env, void *cookie,
1345                  lu_printer_t printer, const struct cl_io *io)
1346 {
1347 }
1348
1349 /**
1350  * Adds request slice to the compound request.
1351  *
1352  * This is called by cl_device_operations::cdo_req_init() methods to add a
1353  * per-layer state to the request. New state is added at the end of
1354  * cl_req::crq_layers list, that is, it is at the bottom of the stack.
1355  *
1356  * \see cl_lock_slice_add(), cl_page_slice_add(), cl_io_slice_add()
1357  */
1358 void cl_req_slice_add(struct cl_req *req, struct cl_req_slice *slice,
1359                       struct cl_device *dev,
1360                       const struct cl_req_operations *ops)
1361 {
1362         ENTRY;
1363         cfs_list_add_tail(&slice->crs_linkage, &req->crq_layers);
1364         slice->crs_dev = dev;
1365         slice->crs_ops = ops;
1366         slice->crs_req = req;
1367         EXIT;
1368 }
1369 EXPORT_SYMBOL(cl_req_slice_add);
1370
1371 static void cl_req_free(const struct lu_env *env, struct cl_req *req)
1372 {
1373         unsigned i;
1374
1375         LASSERT(cfs_list_empty(&req->crq_pages));
1376         LASSERT(req->crq_nrpages == 0);
1377         LINVRNT(cfs_list_empty(&req->crq_layers));
1378         LINVRNT(equi(req->crq_nrobjs > 0, req->crq_o != NULL));
1379         ENTRY;
1380
1381         if (req->crq_o != NULL) {
1382                 for (i = 0; i < req->crq_nrobjs; ++i) {
1383                         struct cl_object *obj = req->crq_o[i].ro_obj;
1384                         if (obj != NULL) {
1385                                 lu_object_ref_del_at(&obj->co_lu,
1386                                                      req->crq_o[i].ro_obj_ref,
1387                                                      "cl_req", req);
1388                                 cl_object_put(env, obj);
1389                         }
1390                 }
1391                 OBD_FREE(req->crq_o, req->crq_nrobjs * sizeof req->crq_o[0]);
1392         }
1393         OBD_FREE_PTR(req);
1394         EXIT;
1395 }
1396
1397 static int cl_req_init(const struct lu_env *env, struct cl_req *req,
1398                        struct cl_page *page)
1399 {
1400         struct cl_device     *dev;
1401         struct cl_page_slice *slice;
1402         int result;
1403
1404         ENTRY;
1405         result = 0;
1406         page = cl_page_top(page);
1407         do {
1408                 cfs_list_for_each_entry(slice, &page->cp_layers, cpl_linkage) {
1409                         dev = lu2cl_dev(slice->cpl_obj->co_lu.lo_dev);
1410                         if (dev->cd_ops->cdo_req_init != NULL) {
1411                                 result = dev->cd_ops->cdo_req_init(env,
1412                                                                    dev, req);
1413                                 if (result != 0)
1414                                         break;
1415                         }
1416                 }
1417                 page = page->cp_child;
1418         } while (page != NULL && result == 0);
1419         RETURN(result);
1420 }
1421
1422 /**
1423  * Invokes per-request transfer completion call-backs
1424  * (cl_req_operations::cro_completion()) bottom-to-top.
1425  */
1426 void cl_req_completion(const struct lu_env *env, struct cl_req *req, int rc)
1427 {
1428         struct cl_req_slice *slice;
1429
1430         ENTRY;
1431         /*
1432          * for the lack of list_for_each_entry_reverse_safe()...
1433          */
1434         while (!cfs_list_empty(&req->crq_layers)) {
1435                 slice = cfs_list_entry(req->crq_layers.prev,
1436                                        struct cl_req_slice, crs_linkage);
1437                 cfs_list_del_init(&slice->crs_linkage);
1438                 if (slice->crs_ops->cro_completion != NULL)
1439                         slice->crs_ops->cro_completion(env, slice, rc);
1440         }
1441         cl_req_free(env, req);
1442         EXIT;
1443 }
1444 EXPORT_SYMBOL(cl_req_completion);
1445
1446 /**
1447  * Allocates new transfer request.
1448  */
1449 struct cl_req *cl_req_alloc(const struct lu_env *env, struct cl_page *page,
1450                             enum cl_req_type crt, int nr_objects)
1451 {
1452         struct cl_req *req;
1453
1454         LINVRNT(nr_objects > 0);
1455         ENTRY;
1456
1457         OBD_ALLOC_PTR(req);
1458         if (req != NULL) {
1459                 int result;
1460
1461                 OBD_ALLOC(req->crq_o, nr_objects * sizeof req->crq_o[0]);
1462                 if (req->crq_o != NULL) {
1463                         req->crq_nrobjs = nr_objects;
1464                         req->crq_type = crt;
1465                         CFS_INIT_LIST_HEAD(&req->crq_pages);
1466                         CFS_INIT_LIST_HEAD(&req->crq_layers);
1467                         result = cl_req_init(env, req, page);
1468                 } else
1469                         result = -ENOMEM;
1470                 if (result != 0) {
1471                         cl_req_completion(env, req, result);
1472                         req = ERR_PTR(result);
1473                 }
1474         } else
1475                 req = ERR_PTR(-ENOMEM);
1476         RETURN(req);
1477 }
1478 EXPORT_SYMBOL(cl_req_alloc);
1479
1480 /**
1481  * Adds a page to a request.
1482  */
1483 void cl_req_page_add(const struct lu_env *env,
1484                      struct cl_req *req, struct cl_page *page)
1485 {
1486         struct cl_object  *obj;
1487         struct cl_req_obj *rqo;
1488         int i;
1489
1490         ENTRY;
1491         page = cl_page_top(page);
1492
1493         LINVRNT(cl_page_is_vmlocked(env, page));
1494         LASSERT(cfs_list_empty(&page->cp_flight));
1495         LASSERT(page->cp_req == NULL);
1496
1497         cfs_list_add_tail(&page->cp_flight, &req->crq_pages);
1498         ++req->crq_nrpages;
1499         page->cp_req = req;
1500         obj = cl_object_top(page->cp_obj);
1501         for (i = 0, rqo = req->crq_o; obj != rqo->ro_obj; ++i, ++rqo) {
1502                 if (rqo->ro_obj == NULL) {
1503                         rqo->ro_obj = obj;
1504                         cl_object_get(obj);
1505                         rqo->ro_obj_ref = lu_object_ref_add(&obj->co_lu,
1506                                                             "cl_req", req);
1507                         break;
1508                 }
1509         }
1510         LASSERT(i < req->crq_nrobjs);
1511         EXIT;
1512 }
1513 EXPORT_SYMBOL(cl_req_page_add);
1514
1515 /**
1516  * Removes a page from a request.
1517  */
1518 void cl_req_page_done(const struct lu_env *env, struct cl_page *page)
1519 {
1520         struct cl_req *req = page->cp_req;
1521
1522         ENTRY;
1523         page = cl_page_top(page);
1524
1525         LINVRNT(cl_page_is_vmlocked(env, page));
1526         LASSERT(!cfs_list_empty(&page->cp_flight));
1527         LASSERT(req->crq_nrpages > 0);
1528
1529         cfs_list_del_init(&page->cp_flight);
1530         --req->crq_nrpages;
1531         page->cp_req = NULL;
1532         EXIT;
1533 }
1534 EXPORT_SYMBOL(cl_req_page_done);
1535
1536 /**
1537  * Notifies layers that request is about to depart by calling
1538  * cl_req_operations::cro_prep() top-to-bottom.
1539  */
1540 int cl_req_prep(const struct lu_env *env, struct cl_req *req)
1541 {
1542         int i;
1543         int result;
1544         const struct cl_req_slice *slice;
1545
1546         ENTRY;
1547         /*
1548          * Check that the caller of cl_req_alloc() didn't lie about the number
1549          * of objects.
1550          */
1551         for (i = 0; i < req->crq_nrobjs; ++i)
1552                 LASSERT(req->crq_o[i].ro_obj != NULL);
1553
1554         result = 0;
1555         cfs_list_for_each_entry(slice, &req->crq_layers, crs_linkage) {
1556                 if (slice->crs_ops->cro_prep != NULL) {
1557                         result = slice->crs_ops->cro_prep(env, slice);
1558                         if (result != 0)
1559                                 break;
1560                 }
1561         }
1562         RETURN(result);
1563 }
1564 EXPORT_SYMBOL(cl_req_prep);
1565
1566 /**
1567  * Fills in attributes that are passed to server together with transfer. Only
1568  * attributes from \a flags may be touched. This can be called multiple times
1569  * for the same request.
1570  */
1571 void cl_req_attr_set(const struct lu_env *env, struct cl_req *req,
1572                      struct cl_req_attr *attr, obd_valid flags)
1573 {
1574         const struct cl_req_slice *slice;
1575         struct cl_page            *page;
1576         int i;
1577
1578         LASSERT(!cfs_list_empty(&req->crq_pages));
1579         ENTRY;
1580
1581         /* Take any page to use as a model. */
1582         page = cfs_list_entry(req->crq_pages.next, struct cl_page, cp_flight);
1583
1584         for (i = 0; i < req->crq_nrobjs; ++i) {
1585                 cfs_list_for_each_entry(slice, &req->crq_layers, crs_linkage) {
1586                         const struct cl_page_slice *scan;
1587                         const struct cl_object     *obj;
1588
1589                         scan = cl_page_at(page,
1590                                           slice->crs_dev->cd_lu_dev.ld_type);
1591                         LASSERT(scan != NULL);
1592                         obj = scan->cpl_obj;
1593                         if (slice->crs_ops->cro_attr_set != NULL)
1594                                 slice->crs_ops->cro_attr_set(env, slice, obj,
1595                                                              attr + i, flags);
1596                 }
1597         }
1598         EXIT;
1599 }
1600 EXPORT_SYMBOL(cl_req_attr_set);
1601
1602 /* XXX complete(), init_completion(), and wait_for_completion(), until they are
1603  * implemented in libcfs. */
1604 #ifdef __KERNEL__
1605 # include <linux/sched.h>
1606 #else /* __KERNEL__ */
1607 # include <liblustre.h>
1608 #endif
1609
1610 /**
1611  * Initialize synchronous io wait anchor, for transfer of \a nrpages pages.
1612  */
1613 void cl_sync_io_init(struct cl_sync_io *anchor, int nrpages)
1614 {
1615         ENTRY;
1616         cfs_waitq_init(&anchor->csi_waitq);
1617         cfs_atomic_set(&anchor->csi_sync_nr, nrpages);
1618         anchor->csi_sync_rc  = 0;
1619         EXIT;
1620 }
1621 EXPORT_SYMBOL(cl_sync_io_init);
1622
1623 /**
1624  * Wait until all transfer completes. Transfer completion routine has to call
1625  * cl_sync_io_note() for every page.
1626  */
1627 int cl_sync_io_wait(const struct lu_env *env, struct cl_io *io,
1628                     struct cl_page_list *queue, struct cl_sync_io *anchor,
1629                     long timeout)
1630 {
1631         struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
1632                                                   NULL, NULL, NULL);
1633         int rc;
1634         ENTRY;
1635
1636         LASSERT(timeout >= 0);
1637
1638         rc = l_wait_event(anchor->csi_waitq,
1639                           cfs_atomic_read(&anchor->csi_sync_nr) == 0,
1640                           &lwi);
1641         if (rc < 0) {
1642                 CERROR("SYNC IO failed with error: %d, try to cancel "
1643                        "%d remaining pages\n",
1644                        rc, cfs_atomic_read(&anchor->csi_sync_nr));
1645
1646                 (void)cl_io_cancel(env, io, queue);
1647
1648                 lwi = (struct l_wait_info) { 0 };
1649                 (void)l_wait_event(anchor->csi_waitq,
1650                                    cfs_atomic_read(&anchor->csi_sync_nr) == 0,
1651                                    &lwi);
1652         } else {
1653                 rc = anchor->csi_sync_rc;
1654         }
1655         LASSERT(cfs_atomic_read(&anchor->csi_sync_nr) == 0);
1656         cl_page_list_assume(env, io, queue);
1657         POISON(anchor, 0x5a, sizeof *anchor);
1658         RETURN(rc);
1659 }
1660 EXPORT_SYMBOL(cl_sync_io_wait);
1661
1662 /**
1663  * Indicate that transfer of a single page completed.
1664  */
1665 void cl_sync_io_note(struct cl_sync_io *anchor, int ioret)
1666 {
1667         ENTRY;
1668         if (anchor->csi_sync_rc == 0 && ioret < 0)
1669                 anchor->csi_sync_rc = ioret;
1670         /*
1671          * Synchronous IO done without releasing page lock (e.g., as a part of
1672          * ->{prepare,commit}_write(). Completion is used to signal the end of
1673          * IO.
1674          */
1675         LASSERT(cfs_atomic_read(&anchor->csi_sync_nr) > 0);
1676         if (cfs_atomic_dec_and_test(&anchor->csi_sync_nr))
1677                 cfs_waitq_broadcast(&anchor->csi_waitq);
1678         EXIT;
1679 }
1680 EXPORT_SYMBOL(cl_sync_io_note);