Whamcloud - gitweb
LU-10308 misc: update Intel copyright messages for 2017
[fs/lustre-release.git] / lustre / obdclass / cl_io.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * Client IO.
33  *
34  *   Author: Nikita Danilov <nikita.danilov@sun.com>
35  *   Author: Jinshan Xiong <jinshan.xiong@intel.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/sched.h>
41 #include <linux/list.h>
42 #include <obd_class.h>
43 #include <obd_support.h>
44 #include <lustre_fid.h>
45 #include <cl_object.h>
46 #include "cl_internal.h"
47 #include <lustre_compat.h>
48
49 /*****************************************************************************
50  *
51  * cl_io interface.
52  *
53  */
54
55 static inline int cl_io_type_is_valid(enum cl_io_type type)
56 {
57         return CIT_READ <= type && type < CIT_OP_NR;
58 }
59
60 static inline int cl_io_is_loopable(const struct cl_io *io)
61 {
62         return cl_io_type_is_valid(io->ci_type) && io->ci_type != CIT_MISC;
63 }
64
65 /**
66  * cl_io invariant that holds at all times when exported cl_io_*() functions
67  * are entered and left.
68  */
69 static int cl_io_invariant(const struct cl_io *io)
70 {
71         struct cl_io *up;
72
73         up = io->ci_parent;
74         return
75                 /*
76                  * io can own pages only when it is ongoing. Sub-io might
77                  * still be in CIS_LOCKED state when top-io is in
78                  * CIS_IO_GOING.
79                  */
80                 ergo(io->ci_owned_nr > 0, io->ci_state == CIS_IO_GOING ||
81                      (io->ci_state == CIS_LOCKED && up != NULL));
82 }
83
84 /**
85  * Finalize \a io, by calling cl_io_operations::cio_fini() bottom-to-top.
86  */
87 void cl_io_fini(const struct lu_env *env, struct cl_io *io)
88 {
89         struct cl_io_slice    *slice;
90
91         LINVRNT(cl_io_type_is_valid(io->ci_type));
92         LINVRNT(cl_io_invariant(io));
93         ENTRY;
94
95         while (!list_empty(&io->ci_layers)) {
96                 slice = container_of(io->ci_layers.prev, struct cl_io_slice,
97                                      cis_linkage);
98                 list_del_init(&slice->cis_linkage);
99                 if (slice->cis_iop->op[io->ci_type].cio_fini != NULL)
100                         slice->cis_iop->op[io->ci_type].cio_fini(env, slice);
101                 /*
102                  * Invalidate slice to catch use after free. This assumes that
103                  * slices are allocated within session and can be touched
104                  * after ->cio_fini() returns.
105                  */
106                 slice->cis_io = NULL;
107         }
108         io->ci_state = CIS_FINI;
109
110         /* sanity check for layout change */
111         switch(io->ci_type) {
112         case CIT_READ:
113         case CIT_WRITE:
114         case CIT_DATA_VERSION:
115         case CIT_FAULT:
116                 break;
117         case CIT_FSYNC:
118                 LASSERT(!io->ci_need_restart);
119                 break;
120         case CIT_SETATTR:
121         case CIT_MISC:
122                 /* Check ignore layout change conf */
123                 LASSERT(ergo(io->ci_ignore_layout || !io->ci_verify_layout,
124                                 !io->ci_need_restart));
125         case CIT_GLIMPSE:
126                 break;
127         case CIT_LADVISE:
128                 break;
129         default:
130                 LBUG();
131         }
132         EXIT;
133 }
134 EXPORT_SYMBOL(cl_io_fini);
135
136 static int cl_io_init0(const struct lu_env *env, struct cl_io *io,
137                        enum cl_io_type iot, struct cl_object *obj)
138 {
139         struct cl_object *scan;
140         int result;
141
142         LINVRNT(io->ci_state == CIS_ZERO || io->ci_state == CIS_FINI);
143         LINVRNT(cl_io_type_is_valid(iot));
144         LINVRNT(cl_io_invariant(io));
145         ENTRY;
146
147         io->ci_type = iot;
148         INIT_LIST_HEAD(&io->ci_lockset.cls_todo);
149         INIT_LIST_HEAD(&io->ci_lockset.cls_done);
150         INIT_LIST_HEAD(&io->ci_layers);
151
152         result = 0;
153         cl_object_for_each(scan, obj) {
154                 if (scan->co_ops->coo_io_init != NULL) {
155                         result = scan->co_ops->coo_io_init(env, scan, io);
156                         if (result != 0)
157                                 break;
158                 }
159         }
160         if (result == 0)
161                 io->ci_state = CIS_INIT;
162         RETURN(result);
163 }
164
165 /**
166  * Initialize sub-io, by calling cl_io_operations::cio_init() top-to-bottom.
167  *
168  * \pre obj != cl_object_top(obj)
169  */
170 int cl_io_sub_init(const struct lu_env *env, struct cl_io *io,
171                    enum cl_io_type iot, struct cl_object *obj)
172 {
173         LASSERT(obj != cl_object_top(obj));
174
175         return cl_io_init0(env, io, iot, obj);
176 }
177 EXPORT_SYMBOL(cl_io_sub_init);
178
179 /**
180  * Initialize \a io, by calling cl_io_operations::cio_init() top-to-bottom.
181  *
182  * Caller has to call cl_io_fini() after a call to cl_io_init(), no matter
183  * what the latter returned.
184  *
185  * \pre obj == cl_object_top(obj)
186  * \pre cl_io_type_is_valid(iot)
187  * \post cl_io_type_is_valid(io->ci_type) && io->ci_type == iot
188  */
189 int cl_io_init(const struct lu_env *env, struct cl_io *io,
190                enum cl_io_type iot, struct cl_object *obj)
191 {
192         LASSERT(obj == cl_object_top(obj));
193
194         /* clear I/O restart from previous instance */
195         io->ci_need_restart = 0;
196
197         return cl_io_init0(env, io, iot, obj);
198 }
199 EXPORT_SYMBOL(cl_io_init);
200
201 /**
202  * Initialize read or write io.
203  *
204  * \pre iot == CIT_READ || iot == CIT_WRITE
205  */
206 int cl_io_rw_init(const struct lu_env *env, struct cl_io *io,
207                   enum cl_io_type iot, loff_t pos, size_t count)
208 {
209         LINVRNT(iot == CIT_READ || iot == CIT_WRITE);
210         LINVRNT(io->ci_obj != NULL);
211         ENTRY;
212
213         if (cfs_ptengine_weight(cl_io_engine) < 2)
214                 io->ci_pio = 0;
215
216         LU_OBJECT_HEADER(D_VFSTRACE, env, &io->ci_obj->co_lu,
217                          "io %s range: [%llu, %llu) %s %s %s %s\n",
218                          iot == CIT_READ ? "read" : "write",
219                          pos, pos + count,
220                          io->u.ci_rw.rw_nonblock ? "nonblock" : "block",
221                          io->u.ci_rw.rw_append ? "append" : "-",
222                          io->u.ci_rw.rw_sync ? "sync" : "-",
223                          io->ci_pio ? "pio" : "-");
224
225         io->u.ci_rw.rw_range.cir_pos   = pos;
226         io->u.ci_rw.rw_range.cir_count = count;
227
228         RETURN(cl_io_init(env, io, iot, io->ci_obj));
229 }
230 EXPORT_SYMBOL(cl_io_rw_init);
231
232 static int cl_lock_descr_sort(const struct cl_lock_descr *d0,
233                               const struct cl_lock_descr *d1)
234 {
235         return lu_fid_cmp(lu_object_fid(&d0->cld_obj->co_lu),
236                           lu_object_fid(&d1->cld_obj->co_lu));
237 }
238
239 /*
240  * Sort locks in lexicographical order of their (fid, start-offset) pairs.
241  */
242 static void cl_io_locks_sort(struct cl_io *io)
243 {
244         int done = 0;
245
246         ENTRY;
247         /* hidden treasure: bubble sort for now. */
248         do {
249                 struct cl_io_lock_link *curr;
250                 struct cl_io_lock_link *prev;
251                 struct cl_io_lock_link *temp;
252
253                 done = 1;
254                 prev = NULL;
255
256                 list_for_each_entry_safe(curr, temp, &io->ci_lockset.cls_todo,
257                                          cill_linkage) {
258                         if (prev != NULL) {
259                                 switch (cl_lock_descr_sort(&prev->cill_descr,
260                                                            &curr->cill_descr)) {
261                                 case 0:
262                                         /*
263                                          * IMPOSSIBLE:  Identical locks are
264                                          *              already removed at
265                                          *              this point.
266                                          */
267                                 default:
268                                         LBUG();
269                                 case +1:
270                                         list_move_tail(&curr->cill_linkage,
271                                                        &prev->cill_linkage);
272                                         done = 0;
273                                         continue; /* don't change prev: it's
274                                                    * still "previous" */
275                                 case -1: /* already in order */
276                                         break;
277                                 }
278                         }
279                         prev = curr;
280                 }
281         } while (!done);
282         EXIT;
283 }
284
285 static void cl_lock_descr_merge(struct cl_lock_descr *d0,
286                                 const struct cl_lock_descr *d1)
287 {
288         d0->cld_start = min(d0->cld_start, d1->cld_start);
289         d0->cld_end = max(d0->cld_end, d1->cld_end);
290
291         if (d1->cld_mode == CLM_WRITE && d0->cld_mode != CLM_WRITE)
292                 d0->cld_mode = CLM_WRITE;
293
294         if (d1->cld_mode == CLM_GROUP && d0->cld_mode != CLM_GROUP)
295                 d0->cld_mode = CLM_GROUP;
296 }
297
298 static int cl_lockset_merge(const struct cl_lockset *set,
299                             const struct cl_lock_descr *need)
300 {
301         struct cl_io_lock_link *scan;
302
303         ENTRY;
304         list_for_each_entry(scan, &set->cls_todo, cill_linkage) {
305                 if (!cl_object_same(scan->cill_descr.cld_obj, need->cld_obj))
306                         continue;
307
308                 /* Merge locks for the same object because ldlm lock server
309                  * may expand the lock extent, otherwise there is a deadlock
310                  * case if two conflicted locks are queueud for the same object
311                  * and lock server expands one lock to overlap the another.
312                  * The side effect is that it can generate a multi-stripe lock
313                  * that may cause casacading problem */
314                 cl_lock_descr_merge(&scan->cill_descr, need);
315                 CDEBUG(D_VFSTRACE, "lock: %d: [%lu, %lu]\n",
316                        scan->cill_descr.cld_mode, scan->cill_descr.cld_start,
317                        scan->cill_descr.cld_end);
318                 RETURN(+1);
319         }
320         RETURN(0);
321 }
322
323 static int cl_lockset_lock(const struct lu_env *env, struct cl_io *io,
324                            struct cl_lockset *set)
325 {
326         struct cl_io_lock_link *link;
327         struct cl_io_lock_link *temp;
328         int result;
329
330         ENTRY;
331         result = 0;
332         list_for_each_entry_safe(link, temp, &set->cls_todo, cill_linkage) {
333                 result = cl_lock_request(env, io, &link->cill_lock);
334                 if (result < 0)
335                         break;
336
337                 list_move(&link->cill_linkage, &set->cls_done);
338         }
339         RETURN(result);
340 }
341
342 /**
343  * Takes locks necessary for the current iteration of io.
344  *
345  * Calls cl_io_operations::cio_lock() top-to-bottom to collect locks required
346  * by layers for the current iteration. Then sort locks (to avoid dead-locks),
347  * and acquire them.
348  */
349 int cl_io_lock(const struct lu_env *env, struct cl_io *io)
350 {
351         const struct cl_io_slice *scan;
352         int result = 0;
353
354         LINVRNT(cl_io_is_loopable(io));
355         LINVRNT(io->ci_state == CIS_IT_STARTED);
356         LINVRNT(cl_io_invariant(io));
357
358         ENTRY;
359         list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
360                 if (scan->cis_iop->op[io->ci_type].cio_lock == NULL)
361                         continue;
362                 result = scan->cis_iop->op[io->ci_type].cio_lock(env, scan);
363                 if (result != 0)
364                         break;
365         }
366         if (result == 0) {
367                 cl_io_locks_sort(io);
368                 result = cl_lockset_lock(env, io, &io->ci_lockset);
369         }
370         if (result != 0)
371                 cl_io_unlock(env, io);
372         else
373                 io->ci_state = CIS_LOCKED;
374         RETURN(result);
375 }
376 EXPORT_SYMBOL(cl_io_lock);
377
378 /**
379  * Release locks takes by io.
380  */
381 void cl_io_unlock(const struct lu_env *env, struct cl_io *io)
382 {
383         struct cl_lockset        *set;
384         struct cl_io_lock_link   *link;
385         struct cl_io_lock_link   *temp;
386         const struct cl_io_slice *scan;
387
388         LASSERT(cl_io_is_loopable(io));
389         LASSERT(CIS_IT_STARTED <= io->ci_state && io->ci_state < CIS_UNLOCKED);
390         LINVRNT(cl_io_invariant(io));
391
392         ENTRY;
393         set = &io->ci_lockset;
394
395         list_for_each_entry_safe(link, temp, &set->cls_todo, cill_linkage) {
396                 list_del_init(&link->cill_linkage);
397                 if (link->cill_fini != NULL)
398                         link->cill_fini(env, link);
399         }
400
401         list_for_each_entry_safe(link, temp, &set->cls_done, cill_linkage) {
402                 list_del_init(&link->cill_linkage);
403                 cl_lock_release(env, &link->cill_lock);
404                 if (link->cill_fini != NULL)
405                         link->cill_fini(env, link);
406         }
407
408         list_for_each_entry_reverse(scan, &io->ci_layers, cis_linkage) {
409                 if (scan->cis_iop->op[io->ci_type].cio_unlock != NULL)
410                         scan->cis_iop->op[io->ci_type].cio_unlock(env, scan);
411         }
412         io->ci_state = CIS_UNLOCKED;
413         EXIT;
414 }
415 EXPORT_SYMBOL(cl_io_unlock);
416
417 /**
418  * Prepares next iteration of io.
419  *
420  * Calls cl_io_operations::cio_iter_init() top-to-bottom. This exists to give
421  * layers a chance to modify io parameters, e.g., so that lov can restrict io
422  * to a single stripe.
423  */
424 int cl_io_iter_init(const struct lu_env *env, struct cl_io *io)
425 {
426         const struct cl_io_slice *scan;
427         int result;
428
429         LINVRNT(cl_io_is_loopable(io));
430         LINVRNT(io->ci_state == CIS_INIT || io->ci_state == CIS_IT_ENDED);
431         LINVRNT(cl_io_invariant(io));
432
433         ENTRY;
434         result = 0;
435         list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
436                 if (scan->cis_iop->op[io->ci_type].cio_iter_init == NULL)
437                         continue;
438                 result = scan->cis_iop->op[io->ci_type].cio_iter_init(env,
439                                                                       scan);
440                 if (result != 0)
441                         break;
442         }
443         if (result == 0)
444                 io->ci_state = CIS_IT_STARTED;
445         RETURN(result);
446 }
447 EXPORT_SYMBOL(cl_io_iter_init);
448
449 /**
450  * Finalizes io iteration.
451  *
452  * Calls cl_io_operations::cio_iter_fini() bottom-to-top.
453  */
454 void cl_io_iter_fini(const struct lu_env *env, struct cl_io *io)
455 {
456         const struct cl_io_slice *scan;
457
458         LINVRNT(cl_io_is_loopable(io));
459         LINVRNT(io->ci_state == CIS_UNLOCKED);
460         LINVRNT(cl_io_invariant(io));
461
462         ENTRY;
463         list_for_each_entry_reverse(scan, &io->ci_layers, cis_linkage) {
464                 if (scan->cis_iop->op[io->ci_type].cio_iter_fini != NULL)
465                         scan->cis_iop->op[io->ci_type].cio_iter_fini(env, scan);
466         }
467         io->ci_state = CIS_IT_ENDED;
468         EXIT;
469 }
470 EXPORT_SYMBOL(cl_io_iter_fini);
471
472 /**
473  * Records that read or write io progressed \a nob bytes forward.
474  */
475 void cl_io_rw_advance(const struct lu_env *env, struct cl_io *io, size_t nob)
476 {
477         const struct cl_io_slice *scan;
478
479         LINVRNT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE ||
480                 nob == 0);
481         LINVRNT(cl_io_is_loopable(io));
482         LINVRNT(cl_io_invariant(io));
483
484         ENTRY;
485
486         io->u.ci_rw.rw_range.cir_pos   += nob;
487         io->u.ci_rw.rw_range.cir_count -= nob;
488
489         /* layers have to be notified. */
490         list_for_each_entry_reverse(scan, &io->ci_layers, cis_linkage) {
491                 if (scan->cis_iop->op[io->ci_type].cio_advance != NULL)
492                         scan->cis_iop->op[io->ci_type].cio_advance(env, scan,
493                                                                    nob);
494         }
495         EXIT;
496 }
497
498 /**
499  * Adds a lock to a lockset.
500  */
501 int cl_io_lock_add(const struct lu_env *env, struct cl_io *io,
502                    struct cl_io_lock_link *link)
503 {
504         int result;
505
506         ENTRY;
507         if (cl_lockset_merge(&io->ci_lockset, &link->cill_descr))
508                 result = +1;
509         else {
510                 list_add(&link->cill_linkage, &io->ci_lockset.cls_todo);
511                 result = 0;
512         }
513         RETURN(result);
514 }
515 EXPORT_SYMBOL(cl_io_lock_add);
516
517 static void cl_free_io_lock_link(const struct lu_env *env,
518                                  struct cl_io_lock_link *link)
519 {
520         OBD_FREE_PTR(link);
521 }
522
523 /**
524  * Allocates new lock link, and uses it to add a lock to a lockset.
525  */
526 int cl_io_lock_alloc_add(const struct lu_env *env, struct cl_io *io,
527                          struct cl_lock_descr *descr)
528 {
529         struct cl_io_lock_link *link;
530         int result;
531
532         ENTRY;
533         OBD_ALLOC_PTR(link);
534         if (link != NULL) {
535                 link->cill_descr = *descr;
536                 link->cill_fini  = cl_free_io_lock_link;
537                 result = cl_io_lock_add(env, io, link);
538                 if (result) /* lock match */
539                         link->cill_fini(env, link);
540         } else
541                 result = -ENOMEM;
542
543         RETURN(result);
544 }
545 EXPORT_SYMBOL(cl_io_lock_alloc_add);
546
547 /**
548  * Starts io by calling cl_io_operations::cio_start() top-to-bottom.
549  */
550 int cl_io_start(const struct lu_env *env, struct cl_io *io)
551 {
552         const struct cl_io_slice *scan;
553         int result = 0;
554
555         LINVRNT(cl_io_is_loopable(io));
556         LINVRNT(io->ci_state == CIS_LOCKED);
557         LINVRNT(cl_io_invariant(io));
558         ENTRY;
559
560         io->ci_state = CIS_IO_GOING;
561         list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
562                 if (scan->cis_iop->op[io->ci_type].cio_start == NULL)
563                         continue;
564                 result = scan->cis_iop->op[io->ci_type].cio_start(env, scan);
565                 if (result != 0)
566                         break;
567         }
568         if (result >= 0)
569                 result = 0;
570         RETURN(result);
571 }
572 EXPORT_SYMBOL(cl_io_start);
573
574 /**
575  * Wait until current io iteration is finished by calling
576  * cl_io_operations::cio_end() bottom-to-top.
577  */
578 void cl_io_end(const struct lu_env *env, struct cl_io *io)
579 {
580         const struct cl_io_slice *scan;
581
582         LINVRNT(cl_io_is_loopable(io));
583         LINVRNT(io->ci_state == CIS_IO_GOING);
584         LINVRNT(cl_io_invariant(io));
585         ENTRY;
586
587         list_for_each_entry_reverse(scan, &io->ci_layers, cis_linkage) {
588                 if (scan->cis_iop->op[io->ci_type].cio_end != NULL)
589                         scan->cis_iop->op[io->ci_type].cio_end(env, scan);
590                 /* TODO: error handling. */
591         }
592         io->ci_state = CIS_IO_FINISHED;
593         EXIT;
594 }
595 EXPORT_SYMBOL(cl_io_end);
596
597 /**
598  * Called by read io, to decide the readahead extent
599  *
600  * \see cl_io_operations::cio_read_ahead()
601  */
602 int cl_io_read_ahead(const struct lu_env *env, struct cl_io *io,
603                      pgoff_t start, struct cl_read_ahead *ra)
604 {
605         const struct cl_io_slice *scan;
606         int                       result = 0;
607
608         LINVRNT(io->ci_type == CIT_READ || io->ci_type == CIT_FAULT);
609         LINVRNT(io->ci_state == CIS_IO_GOING || io->ci_state == CIS_LOCKED);
610         LINVRNT(cl_io_invariant(io));
611         ENTRY;
612
613         list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
614                 if (scan->cis_iop->cio_read_ahead == NULL)
615                         continue;
616
617                 result = scan->cis_iop->cio_read_ahead(env, scan, start, ra);
618                 if (result != 0)
619                         break;
620         }
621         RETURN(result > 0 ? 0 : result);
622 }
623 EXPORT_SYMBOL(cl_io_read_ahead);
624
625 /**
626  * Commit a list of contiguous pages into writeback cache.
627  *
628  * \returns 0 if all pages committed, or errcode if error occurred.
629  * \see cl_io_operations::cio_commit_async()
630  */
631 int cl_io_commit_async(const struct lu_env *env, struct cl_io *io,
632                         struct cl_page_list *queue, int from, int to,
633                         cl_commit_cbt cb)
634 {
635         const struct cl_io_slice *scan;
636         int result = 0;
637         ENTRY;
638
639         list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
640                 if (scan->cis_iop->cio_commit_async == NULL)
641                         continue;
642                 result = scan->cis_iop->cio_commit_async(env, scan, queue,
643                                                          from, to, cb);
644                 if (result != 0)
645                         break;
646         }
647         RETURN(result);
648 }
649 EXPORT_SYMBOL(cl_io_commit_async);
650
651 /**
652  * Submits a list of pages for immediate io.
653  *
654  * After the function gets returned, The submitted pages are moved to
655  * queue->c2_qout queue, and queue->c2_qin contain both the pages don't need
656  * to be submitted, and the pages are errant to submit.
657  *
658  * \returns 0 if at least one page was submitted, error code otherwise.
659  * \see cl_io_operations::cio_submit()
660  */
661 int cl_io_submit_rw(const struct lu_env *env, struct cl_io *io,
662                     enum cl_req_type crt, struct cl_2queue *queue)
663 {
664         const struct cl_io_slice *scan;
665         int result = 0;
666         ENTRY;
667
668         list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
669                 if (scan->cis_iop->cio_submit == NULL)
670                         continue;
671                 result = scan->cis_iop->cio_submit(env, scan, crt, queue);
672                 if (result != 0)
673                         break;
674         }
675         /*
676          * If ->cio_submit() failed, no pages were sent.
677          */
678         LASSERT(ergo(result != 0, list_empty(&queue->c2_qout.pl_pages)));
679         RETURN(result);
680 }
681 EXPORT_SYMBOL(cl_io_submit_rw);
682
683 /**
684  * Submit a sync_io and wait for the IO to be finished, or error happens.
685  * If \a timeout is zero, it means to wait for the IO unconditionally.
686  */
687 int cl_io_submit_sync(const struct lu_env *env, struct cl_io *io,
688                       enum cl_req_type iot, struct cl_2queue *queue,
689                       long timeout)
690 {
691         struct cl_sync_io *anchor = &cl_env_info(env)->clt_anchor;
692         struct cl_page *pg;
693         int rc;
694
695         cl_page_list_for_each(pg, &queue->c2_qin) {
696                 LASSERT(pg->cp_sync_io == NULL);
697                 pg->cp_sync_io = anchor;
698         }
699
700         cl_sync_io_init(anchor, queue->c2_qin.pl_nr, &cl_sync_io_end);
701         rc = cl_io_submit_rw(env, io, iot, queue);
702         if (rc == 0) {
703                 /*
704                  * If some pages weren't sent for any reason (e.g.,
705                  * read found up-to-date pages in the cache, or write found
706                  * clean pages), count them as completed to avoid infinite
707                  * wait.
708                  */
709                 cl_page_list_for_each(pg, &queue->c2_qin) {
710                         pg->cp_sync_io = NULL;
711                         cl_sync_io_note(env, anchor, 1);
712                 }
713
714                 /* wait for the IO to be finished. */
715                 rc = cl_sync_io_wait(env, anchor, timeout);
716                 cl_page_list_assume(env, io, &queue->c2_qout);
717         } else {
718                 LASSERT(list_empty(&queue->c2_qout.pl_pages));
719                 cl_page_list_for_each(pg, &queue->c2_qin)
720                         pg->cp_sync_io = NULL;
721         }
722         return rc;
723 }
724 EXPORT_SYMBOL(cl_io_submit_sync);
725
726 /**
727  * Cancel an IO which has been submitted by cl_io_submit_rw.
728  */
729 int cl_io_cancel(const struct lu_env *env, struct cl_io *io,
730                  struct cl_page_list *queue)
731 {
732         struct cl_page *page;
733         int result = 0;
734
735         CERROR("Canceling ongoing page trasmission\n");
736         cl_page_list_for_each(page, queue) {
737                 int rc;
738
739                 rc = cl_page_cancel(env, page);
740                 result = result ?: rc;
741         }
742         return result;
743 }
744
745 static
746 struct cl_io_pt *cl_io_submit_pt(struct cl_io *io, loff_t pos, size_t count)
747 {
748         struct cl_io_pt *pt;
749         int rc;
750
751         OBD_ALLOC(pt, sizeof(*pt));
752         if (pt == NULL)
753                 RETURN(ERR_PTR(-ENOMEM));
754
755         pt->cip_next = NULL;
756         init_sync_kiocb(&pt->cip_iocb, io->u.ci_rw.rw_file);
757         pt->cip_iocb.ki_pos = pos;
758 #ifdef HAVE_KIOCB_KI_LEFT
759         pt->cip_iocb.ki_left = count;
760 #elif defined(HAVE_KI_NBYTES)
761         pt->cip_iocb.ki_nbytes = count;
762 #endif
763         pt->cip_iter = io->u.ci_rw.rw_iter;
764         iov_iter_truncate(&pt->cip_iter, count);
765         pt->cip_file   = io->u.ci_rw.rw_file;
766         pt->cip_iot    = io->ci_type;
767         pt->cip_pos    = pos;
768         pt->cip_count  = count;
769         pt->cip_result = 0;
770
771         rc = cfs_ptask_init(&pt->cip_task, io->u.ci_rw.rw_ptask, pt,
772                             PTF_ORDERED | PTF_COMPLETE |
773                             PTF_USER_MM | PTF_RETRY, smp_processor_id());
774         if (rc)
775                 GOTO(out_error, rc);
776
777         CDEBUG(D_VFSTRACE, "submit %s range: [%llu, %llu)\n",
778                 io->ci_type == CIT_READ ? "read" : "write",
779                 pos, pos + count);
780
781         rc = cfs_ptask_submit(&pt->cip_task, cl_io_engine);
782         if (rc)
783                 GOTO(out_error, rc);
784
785         RETURN(pt);
786
787 out_error:
788         OBD_FREE(pt, sizeof(*pt));
789         RETURN(ERR_PTR(rc));
790 }
791
792 /**
793  * Main io loop.
794  *
795  * Pumps io through iterations calling
796  *
797  *    - cl_io_iter_init()
798  *
799  *    - cl_io_lock()
800  *
801  *    - cl_io_start()
802  *
803  *    - cl_io_end()
804  *
805  *    - cl_io_unlock()
806  *
807  *    - cl_io_iter_fini()
808  *
809  * repeatedly until there is no more io to do.
810  */
811 int cl_io_loop(const struct lu_env *env, struct cl_io *io)
812 {
813         struct cl_io_pt *pt = NULL, *head = NULL;
814         struct cl_io_pt **tail = &head;
815         loff_t pos;
816         size_t count;
817         size_t last_chunk_count = 0;
818         bool short_io = false;
819         int rc = 0;
820         ENTRY;
821
822         LINVRNT(cl_io_is_loopable(io));
823
824         do {
825                 io->ci_continue = 0;
826
827                 rc = cl_io_iter_init(env, io);
828                 if (rc) {
829                         cl_io_iter_fini(env, io);
830                         break;
831                 }
832
833                 pos   = io->u.ci_rw.rw_range.cir_pos;
834                 count = io->u.ci_rw.rw_range.cir_count;
835
836                 if (io->ci_pio) {
837                         /* submit this range for parallel execution */
838                         pt = cl_io_submit_pt(io, pos, count);
839                         if (IS_ERR(pt)) {
840                                 cl_io_iter_fini(env, io);
841                                 rc = PTR_ERR(pt);
842                                 break;
843                         }
844
845                         *tail = pt;
846                         tail = &pt->cip_next;
847                 } else {
848                         size_t nob = io->ci_nob;
849
850                         CDEBUG(D_VFSTRACE,
851                                 "execute type %u range: [%llu, %llu) nob: %zu %s\n",
852                                 io->ci_type, pos, pos + count, nob,
853                                 io->ci_continue ? "continue" : "stop");
854
855                         rc = cl_io_lock(env, io);
856                         if (rc) {
857                                 cl_io_iter_fini(env, io);
858                                 break;
859                         }
860
861                         /*
862                          * Notify layers that locks has been taken,
863                          * and do actual i/o.
864                          *
865                          *   - llite: kms, short read;
866                          *   - llite: generic_file_read();
867                          */
868                         rc = cl_io_start(env, io);
869
870                         /*
871                          * Send any remaining pending
872                          * io, etc.
873                          *
874                          *   - llite: ll_rw_stats_tally.
875                          */
876                         cl_io_end(env, io);
877                         cl_io_unlock(env, io);
878
879                         count = io->ci_nob - nob;
880                         last_chunk_count = count;
881                 }
882
883                 cl_io_rw_advance(env, io, count);
884                 cl_io_iter_fini(env, io);
885         } while (!rc && io->ci_continue);
886
887         if (rc == -EWOULDBLOCK && io->ci_ndelay) {
888                 io->ci_need_restart = 1;
889                 rc = 0;
890         }
891
892         CDEBUG(D_VFSTRACE, "loop type %u done: nob: %zu, rc: %d %s\n",
893                 io->ci_type, io->ci_nob, rc,
894                 io->ci_continue ? "continue" : "stop");
895
896         while (head != NULL) {
897                 int rc2;
898
899                 pt = head;
900                 head = head->cip_next;
901
902                 rc2 = cfs_ptask_wait_for(&pt->cip_task);
903                 LASSERTF(!rc2, "wait for task error: %d\n", rc2);
904
905                 rc2 = cfs_ptask_result(&pt->cip_task);
906                 CDEBUG(D_VFSTRACE,
907                         "done %s range: [%llu, %llu) ret: %zd, rc: %d\n",
908                         pt->cip_iot == CIT_READ ? "read" : "write",
909                         pt->cip_pos, pt->cip_pos + pt->cip_count,
910                         pt->cip_result, rc2);
911
912                 /* save the result of ptask */
913                 rc = rc ? : rc2;
914                 io->ci_need_restart |= pt->cip_need_restart;
915
916                 if (!short_io) {
917                         if (!rc2) /* IO is done by this task successfully */
918                                 io->ci_nob += pt->cip_result;
919                         if (pt->cip_result < pt->cip_count) {
920                                 /* short IO happened.
921                                  * Not necessary to be an error */
922                                 CDEBUG(D_VFSTRACE,
923                                         "incomplete range: [%llu, %llu) "
924                                         "last_chunk_count: %zu\n",
925                                         pt->cip_pos,
926                                         pt->cip_pos + pt->cip_count,
927                                         last_chunk_count);
928                                 io->ci_nob -= last_chunk_count;
929                                 short_io = true;
930                         }
931                 }
932                 OBD_FREE(pt, sizeof(*pt));
933         }
934
935         CDEBUG(D_VFSTRACE, "return nob: %zu (%s io), rc: %d\n",
936                 io->ci_nob, short_io ? "short" : "full", rc);
937
938         RETURN(rc < 0 ? rc : io->ci_result);
939 }
940 EXPORT_SYMBOL(cl_io_loop);
941
942 /**
943  * Adds io slice to the cl_io.
944  *
945  * This is called by cl_object_operations::coo_io_init() methods to add a
946  * per-layer state to the io. New state is added at the end of
947  * cl_io::ci_layers list, that is, it is at the bottom of the stack.
948  *
949  * \see cl_lock_slice_add(), cl_req_slice_add(), cl_page_slice_add()
950  */
951 void cl_io_slice_add(struct cl_io *io, struct cl_io_slice *slice,
952                      struct cl_object *obj,
953                      const struct cl_io_operations *ops)
954 {
955         struct list_head *linkage = &slice->cis_linkage;
956
957         LASSERT((linkage->prev == NULL && linkage->next == NULL) ||
958                 list_empty(linkage));
959         ENTRY;
960
961         list_add_tail(linkage, &io->ci_layers);
962         slice->cis_io  = io;
963         slice->cis_obj = obj;
964         slice->cis_iop = ops;
965         EXIT;
966 }
967 EXPORT_SYMBOL(cl_io_slice_add);
968
969
970 /**
971  * Initializes page list.
972  */
973 void cl_page_list_init(struct cl_page_list *plist)
974 {
975         ENTRY;
976         plist->pl_nr = 0;
977         INIT_LIST_HEAD(&plist->pl_pages);
978         plist->pl_owner = current;
979         EXIT;
980 }
981 EXPORT_SYMBOL(cl_page_list_init);
982
983 /**
984  * Adds a page to a page list.
985  */
986 void cl_page_list_add(struct cl_page_list *plist, struct cl_page *page)
987 {
988         ENTRY;
989         /* it would be better to check that page is owned by "current" io, but
990          * it is not passed here. */
991         LASSERT(page->cp_owner != NULL);
992         LINVRNT(plist->pl_owner == current);
993
994         LASSERT(list_empty(&page->cp_batch));
995         list_add_tail(&page->cp_batch, &plist->pl_pages);
996         ++plist->pl_nr;
997         lu_ref_add_at(&page->cp_reference, &page->cp_queue_ref, "queue", plist);
998         cl_page_get(page);
999         EXIT;
1000 }
1001 EXPORT_SYMBOL(cl_page_list_add);
1002
1003 /**
1004  * Removes a page from a page list.
1005  */
1006 void cl_page_list_del(const struct lu_env *env,
1007                       struct cl_page_list *plist, struct cl_page *page)
1008 {
1009         LASSERT(plist->pl_nr > 0);
1010         LASSERT(cl_page_is_vmlocked(env, page));
1011         LINVRNT(plist->pl_owner == current);
1012
1013         ENTRY;
1014         list_del_init(&page->cp_batch);
1015         --plist->pl_nr;
1016         lu_ref_del_at(&page->cp_reference, &page->cp_queue_ref, "queue", plist);
1017         cl_page_put(env, page);
1018         EXIT;
1019 }
1020 EXPORT_SYMBOL(cl_page_list_del);
1021
1022 /**
1023  * Moves a page from one page list to another.
1024  */
1025 void cl_page_list_move(struct cl_page_list *dst, struct cl_page_list *src,
1026                        struct cl_page *page)
1027 {
1028         LASSERT(src->pl_nr > 0);
1029         LINVRNT(dst->pl_owner == current);
1030         LINVRNT(src->pl_owner == current);
1031
1032         ENTRY;
1033         list_move_tail(&page->cp_batch, &dst->pl_pages);
1034         --src->pl_nr;
1035         ++dst->pl_nr;
1036         lu_ref_set_at(&page->cp_reference, &page->cp_queue_ref, "queue",
1037                       src, dst);
1038         EXIT;
1039 }
1040 EXPORT_SYMBOL(cl_page_list_move);
1041
1042 /**
1043  * Moves a page from one page list to the head of another list.
1044  */
1045 void cl_page_list_move_head(struct cl_page_list *dst, struct cl_page_list *src,
1046                             struct cl_page *page)
1047 {
1048         LASSERT(src->pl_nr > 0);
1049         LINVRNT(dst->pl_owner == current);
1050         LINVRNT(src->pl_owner == current);
1051
1052         ENTRY;
1053         list_move(&page->cp_batch, &dst->pl_pages);
1054         --src->pl_nr;
1055         ++dst->pl_nr;
1056         lu_ref_set_at(&page->cp_reference, &page->cp_queue_ref, "queue",
1057                         src, dst);
1058         EXIT;
1059 }
1060 EXPORT_SYMBOL(cl_page_list_move_head);
1061
1062 /**
1063  * splice the cl_page_list, just as list head does
1064  */
1065 void cl_page_list_splice(struct cl_page_list *list, struct cl_page_list *head)
1066 {
1067         struct cl_page *page;
1068         struct cl_page *tmp;
1069
1070         LINVRNT(list->pl_owner == current);
1071         LINVRNT(head->pl_owner == current);
1072
1073         ENTRY;
1074         cl_page_list_for_each_safe(page, tmp, list)
1075                 cl_page_list_move(head, list, page);
1076         EXIT;
1077 }
1078 EXPORT_SYMBOL(cl_page_list_splice);
1079
1080 /**
1081  * Disowns pages in a queue.
1082  */
1083 void cl_page_list_disown(const struct lu_env *env,
1084                          struct cl_io *io, struct cl_page_list *plist)
1085 {
1086         struct cl_page *page;
1087         struct cl_page *temp;
1088
1089         LINVRNT(plist->pl_owner == current);
1090
1091         ENTRY;
1092         cl_page_list_for_each_safe(page, temp, plist) {
1093                 LASSERT(plist->pl_nr > 0);
1094
1095                 list_del_init(&page->cp_batch);
1096                 --plist->pl_nr;
1097                 /*
1098                  * cl_page_disown0 rather than usual cl_page_disown() is used,
1099                  * because pages are possibly in CPS_FREEING state already due
1100                  * to the call to cl_page_list_discard().
1101                  */
1102                 /*
1103                  * XXX cl_page_disown0() will fail if page is not locked.
1104                  */
1105                 cl_page_disown0(env, io, page);
1106                 lu_ref_del_at(&page->cp_reference, &page->cp_queue_ref, "queue",
1107                               plist);
1108                 cl_page_put(env, page);
1109         }
1110         EXIT;
1111 }
1112 EXPORT_SYMBOL(cl_page_list_disown);
1113
1114 /**
1115  * Releases pages from queue.
1116  */
1117 void cl_page_list_fini(const struct lu_env *env, struct cl_page_list *plist)
1118 {
1119         struct cl_page *page;
1120         struct cl_page *temp;
1121
1122         LINVRNT(plist->pl_owner == current);
1123
1124         ENTRY;
1125         cl_page_list_for_each_safe(page, temp, plist)
1126                 cl_page_list_del(env, plist, page);
1127         LASSERT(plist->pl_nr == 0);
1128         EXIT;
1129 }
1130 EXPORT_SYMBOL(cl_page_list_fini);
1131
1132 /**
1133  * Assumes all pages in a queue.
1134  */
1135 void cl_page_list_assume(const struct lu_env *env,
1136                          struct cl_io *io, struct cl_page_list *plist)
1137 {
1138         struct cl_page *page;
1139
1140         LINVRNT(plist->pl_owner == current);
1141
1142         cl_page_list_for_each(page, plist)
1143                 cl_page_assume(env, io, page);
1144 }
1145
1146 /**
1147  * Discards all pages in a queue.
1148  */
1149 void cl_page_list_discard(const struct lu_env *env, struct cl_io *io,
1150                           struct cl_page_list *plist)
1151 {
1152         struct cl_page *page;
1153
1154         LINVRNT(plist->pl_owner == current);
1155         ENTRY;
1156         cl_page_list_for_each(page, plist)
1157                 cl_page_discard(env, io, page);
1158         EXIT;
1159 }
1160 EXPORT_SYMBOL(cl_page_list_discard);
1161
1162 /**
1163  * Initialize dual page queue.
1164  */
1165 void cl_2queue_init(struct cl_2queue *queue)
1166 {
1167         ENTRY;
1168         cl_page_list_init(&queue->c2_qin);
1169         cl_page_list_init(&queue->c2_qout);
1170         EXIT;
1171 }
1172 EXPORT_SYMBOL(cl_2queue_init);
1173
1174 /**
1175  * Add a page to the incoming page list of 2-queue.
1176  */
1177 void cl_2queue_add(struct cl_2queue *queue, struct cl_page *page)
1178 {
1179         ENTRY;
1180         cl_page_list_add(&queue->c2_qin, page);
1181         EXIT;
1182 }
1183 EXPORT_SYMBOL(cl_2queue_add);
1184
1185 /**
1186  * Disown pages in both lists of a 2-queue.
1187  */
1188 void cl_2queue_disown(const struct lu_env *env,
1189                       struct cl_io *io, struct cl_2queue *queue)
1190 {
1191         ENTRY;
1192         cl_page_list_disown(env, io, &queue->c2_qin);
1193         cl_page_list_disown(env, io, &queue->c2_qout);
1194         EXIT;
1195 }
1196 EXPORT_SYMBOL(cl_2queue_disown);
1197
1198 /**
1199  * Discard (truncate) pages in both lists of a 2-queue.
1200  */
1201 void cl_2queue_discard(const struct lu_env *env,
1202                        struct cl_io *io, struct cl_2queue *queue)
1203 {
1204         ENTRY;
1205         cl_page_list_discard(env, io, &queue->c2_qin);
1206         cl_page_list_discard(env, io, &queue->c2_qout);
1207         EXIT;
1208 }
1209 EXPORT_SYMBOL(cl_2queue_discard);
1210
1211 /**
1212  * Assume to own the pages in cl_2queue
1213  */
1214 void cl_2queue_assume(const struct lu_env *env,
1215                       struct cl_io *io, struct cl_2queue *queue)
1216 {
1217         cl_page_list_assume(env, io, &queue->c2_qin);
1218         cl_page_list_assume(env, io, &queue->c2_qout);
1219 }
1220
1221 /**
1222  * Finalize both page lists of a 2-queue.
1223  */
1224 void cl_2queue_fini(const struct lu_env *env, struct cl_2queue *queue)
1225 {
1226         ENTRY;
1227         cl_page_list_fini(env, &queue->c2_qout);
1228         cl_page_list_fini(env, &queue->c2_qin);
1229         EXIT;
1230 }
1231 EXPORT_SYMBOL(cl_2queue_fini);
1232
1233 /**
1234  * Initialize a 2-queue to contain \a page in its incoming page list.
1235  */
1236 void cl_2queue_init_page(struct cl_2queue *queue, struct cl_page *page)
1237 {
1238         ENTRY;
1239         cl_2queue_init(queue);
1240         cl_2queue_add(queue, page);
1241         EXIT;
1242 }
1243 EXPORT_SYMBOL(cl_2queue_init_page);
1244
1245 /**
1246  * Returns top-level io.
1247  *
1248  * \see cl_object_top()
1249  */
1250 struct cl_io *cl_io_top(struct cl_io *io)
1251 {
1252         ENTRY;
1253         while (io->ci_parent != NULL)
1254                 io = io->ci_parent;
1255         RETURN(io);
1256 }
1257 EXPORT_SYMBOL(cl_io_top);
1258
1259 /**
1260  * Prints human readable representation of \a io to the \a f.
1261  */
1262 void cl_io_print(const struct lu_env *env, void *cookie,
1263                  lu_printer_t printer, const struct cl_io *io)
1264 {
1265 }
1266
1267 /**
1268  * Fills in attributes that are passed to server together with transfer. Only
1269  * attributes from \a flags may be touched. This can be called multiple times
1270  * for the same request.
1271  */
1272 void cl_req_attr_set(const struct lu_env *env, struct cl_object *obj,
1273                      struct cl_req_attr *attr)
1274 {
1275         struct cl_object *scan;
1276         ENTRY;
1277
1278         cl_object_for_each(scan, obj) {
1279                 if (scan->co_ops->coo_req_attr_set != NULL)
1280                         scan->co_ops->coo_req_attr_set(env, scan, attr);
1281         }
1282         EXIT;
1283 }
1284 EXPORT_SYMBOL(cl_req_attr_set);
1285
1286 /* cl_sync_io_callback assumes the caller must call cl_sync_io_wait() to
1287  * wait for the IO to finish. */
1288 void cl_sync_io_end(const struct lu_env *env, struct cl_sync_io *anchor)
1289 {
1290         wake_up_all(&anchor->csi_waitq);
1291
1292         /* it's safe to nuke or reuse anchor now */
1293         atomic_set(&anchor->csi_barrier, 0);
1294 }
1295 EXPORT_SYMBOL(cl_sync_io_end);
1296
1297 /**
1298  * Initialize synchronous io wait anchor
1299  */
1300 void cl_sync_io_init(struct cl_sync_io *anchor, int nr,
1301                      void (*end)(const struct lu_env *, struct cl_sync_io *))
1302 {
1303         ENTRY;
1304         memset(anchor, 0, sizeof(*anchor));
1305         init_waitqueue_head(&anchor->csi_waitq);
1306         atomic_set(&anchor->csi_sync_nr, nr);
1307         atomic_set(&anchor->csi_barrier, nr > 0);
1308         anchor->csi_sync_rc = 0;
1309         anchor->csi_end_io = end;
1310         LASSERT(end != NULL);
1311         EXIT;
1312 }
1313 EXPORT_SYMBOL(cl_sync_io_init);
1314
1315 /**
1316  * Wait until all IO completes. Transfer completion routine has to call
1317  * cl_sync_io_note() for every entity.
1318  */
1319 int cl_sync_io_wait(const struct lu_env *env, struct cl_sync_io *anchor,
1320                     long timeout)
1321 {
1322         struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
1323                                                   NULL, NULL, NULL);
1324         int rc;
1325         ENTRY;
1326
1327         LASSERT(timeout >= 0);
1328
1329         rc = l_wait_event(anchor->csi_waitq,
1330                           atomic_read(&anchor->csi_sync_nr) == 0,
1331                           &lwi);
1332         if (rc < 0) {
1333                 CERROR("IO failed: %d, still wait for %d remaining entries\n",
1334                        rc, atomic_read(&anchor->csi_sync_nr));
1335
1336                 lwi = (struct l_wait_info) { 0 };
1337                 (void)l_wait_event(anchor->csi_waitq,
1338                                    atomic_read(&anchor->csi_sync_nr) == 0,
1339                                    &lwi);
1340         } else {
1341                 rc = anchor->csi_sync_rc;
1342         }
1343         LASSERT(atomic_read(&anchor->csi_sync_nr) == 0);
1344
1345         /* wait until cl_sync_io_note() has done wakeup */
1346         while (unlikely(atomic_read(&anchor->csi_barrier) != 0)) {
1347                 cpu_relax();
1348         }
1349         RETURN(rc);
1350 }
1351 EXPORT_SYMBOL(cl_sync_io_wait);
1352
1353 /**
1354  * Indicate that transfer of a single page completed.
1355  */
1356 void cl_sync_io_note(const struct lu_env *env, struct cl_sync_io *anchor,
1357                      int ioret)
1358 {
1359         ENTRY;
1360         if (anchor->csi_sync_rc == 0 && ioret < 0)
1361                 anchor->csi_sync_rc = ioret;
1362         /*
1363          * Synchronous IO done without releasing page lock (e.g., as a part of
1364          * ->{prepare,commit}_write(). Completion is used to signal the end of
1365          * IO.
1366          */
1367         LASSERT(atomic_read(&anchor->csi_sync_nr) > 0);
1368         if (atomic_dec_and_test(&anchor->csi_sync_nr)) {
1369                 LASSERT(anchor->csi_end_io != NULL);
1370                 anchor->csi_end_io(env, anchor);
1371                 /* Can't access anchor any more */
1372         }
1373         EXIT;
1374 }
1375 EXPORT_SYMBOL(cl_sync_io_note);