Whamcloud - gitweb
LU-4048 build: fix 'control flow' errors
[fs/lustre-release.git] / lustre / lov / lov_io.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Implementation of cl_io for LOV layer.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  *   Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_LOV
43
44 #include "lov_cl_internal.h"
45
46 /** \addtogroup lov
47  *  @{
48  */
49
50 static inline void lov_sub_enter(struct lov_io_sub *sub)
51 {
52         sub->sub_reenter++;
53 }
54 static inline void lov_sub_exit(struct lov_io_sub *sub)
55 {
56         sub->sub_reenter--;
57 }
58
59 static void lov_io_sub_fini(const struct lu_env *env, struct lov_io *lio,
60                             struct lov_io_sub *sub)
61 {
62         ENTRY;
63         if (sub->sub_io != NULL) {
64                 if (sub->sub_io_initialized) {
65                         lov_sub_enter(sub);
66                         cl_io_fini(sub->sub_env, sub->sub_io);
67                         lov_sub_exit(sub);
68                         sub->sub_io_initialized = 0;
69                         lio->lis_active_subios--;
70                 }
71                 if (sub->sub_stripe == lio->lis_single_subio_index)
72                         lio->lis_single_subio_index = -1;
73                 else if (!sub->sub_borrowed)
74                         OBD_FREE_PTR(sub->sub_io);
75                 sub->sub_io = NULL;
76         }
77         if (sub->sub_env != NULL && !IS_ERR(sub->sub_env)) {
78                 if (!sub->sub_borrowed)
79                         cl_env_put(sub->sub_env, &sub->sub_refcheck);
80                 sub->sub_env = NULL;
81         }
82         EXIT;
83 }
84
85 static void lov_io_sub_inherit(struct cl_io *io, struct lov_io *lio,
86                                int stripe, loff_t start, loff_t end)
87 {
88         struct lov_stripe_md *lsm    = lio->lis_object->lo_lsm;
89         struct cl_io         *parent = lio->lis_cl.cis_io;
90
91         switch (io->ci_type) {
92         case CIT_SETATTR: {
93                 io->u.ci_setattr.sa_attr = parent->u.ci_setattr.sa_attr;
94                 io->u.ci_setattr.sa_attr_flags =
95                         parent->u.ci_setattr.sa_attr_flags;
96                 io->u.ci_setattr.sa_valid = parent->u.ci_setattr.sa_valid;
97                 io->u.ci_setattr.sa_stripe_index = stripe;
98                 io->u.ci_setattr.sa_parent_fid =
99                                         parent->u.ci_setattr.sa_parent_fid;
100                 if (cl_io_is_trunc(io)) {
101                         loff_t new_size = parent->u.ci_setattr.sa_attr.lvb_size;
102
103                         new_size = lov_size_to_stripe(lsm, new_size, stripe);
104                         io->u.ci_setattr.sa_attr.lvb_size = new_size;
105                 }
106                 break;
107         }
108         case CIT_DATA_VERSION: {
109                 io->u.ci_data_version.dv_data_version = 0;
110                 io->u.ci_data_version.dv_flags =
111                         parent->u.ci_data_version.dv_flags;
112                 break;
113         }
114         case CIT_FAULT: {
115                 struct cl_object *obj = parent->ci_obj;
116                 loff_t off = cl_offset(obj, parent->u.ci_fault.ft_index);
117
118                 io->u.ci_fault = parent->u.ci_fault;
119                 off = lov_size_to_stripe(lsm, off, stripe);
120                 io->u.ci_fault.ft_index = cl_index(obj, off);
121                 break;
122         }
123         case CIT_FSYNC: {
124                 io->u.ci_fsync.fi_start = start;
125                 io->u.ci_fsync.fi_end = end;
126                 io->u.ci_fsync.fi_fid = parent->u.ci_fsync.fi_fid;
127                 io->u.ci_fsync.fi_mode = parent->u.ci_fsync.fi_mode;
128                 break;
129         }
130         case CIT_READ:
131         case CIT_WRITE: {
132                 io->u.ci_wr.wr_sync = cl_io_is_sync_write(parent);
133                 if (cl_io_is_append(parent)) {
134                         io->u.ci_wr.wr_append = 1;
135                 } else {
136                         io->u.ci_rw.crw_pos = start;
137                         io->u.ci_rw.crw_count = end - start;
138                 }
139                 break;
140         }
141         case CIT_LADVISE: {
142                 io->u.ci_ladvise.li_start = start;
143                 io->u.ci_ladvise.li_end = end;
144                 io->u.ci_ladvise.li_fid = parent->u.ci_ladvise.li_fid;
145                 io->u.ci_ladvise.li_advice = parent->u.ci_ladvise.li_advice;
146                 io->u.ci_ladvise.li_flags = parent->u.ci_ladvise.li_flags;
147                 break;
148         }
149         default:
150                 break;
151         }
152 }
153
154 static int lov_io_sub_init(const struct lu_env *env, struct lov_io *lio,
155                            struct lov_io_sub *sub)
156 {
157         struct lov_object *lov = lio->lis_object;
158         struct lov_device *ld  = lu2lov_dev(lov2cl(lov)->co_lu.lo_dev);
159         struct cl_io      *sub_io;
160         struct cl_object  *sub_obj;
161         struct cl_io      *io  = lio->lis_cl.cis_io;
162
163         int stripe = sub->sub_stripe;
164         int result;
165
166         LASSERT(sub->sub_io == NULL);
167         LASSERT(sub->sub_env == NULL);
168         LASSERT(sub->sub_stripe < lio->lis_stripe_count);
169         ENTRY;
170
171         if (unlikely(lov_r0(lov)->lo_sub[stripe] == NULL))
172                 RETURN(-EIO);
173
174         result = 0;
175         sub->sub_io_initialized = 0;
176         sub->sub_borrowed = 0;
177
178         if (lio->lis_mem_frozen) {
179                 LASSERT(mutex_is_locked(&ld->ld_mutex));
180                 sub->sub_io  = &ld->ld_emrg[stripe]->emrg_subio;
181                 sub->sub_env = ld->ld_emrg[stripe]->emrg_env;
182                 sub->sub_borrowed = 1;
183         } else {
184                 sub->sub_env = cl_env_get(&sub->sub_refcheck);
185                 if (IS_ERR(sub->sub_env))
186                         result = PTR_ERR(sub->sub_env);
187
188                 if (result == 0) {
189                         /*
190                          * First sub-io. Use ->lis_single_subio to
191                          * avoid dynamic allocation.
192                          */
193                         if (lio->lis_active_subios == 0) {
194                                 sub->sub_io = &lio->lis_single_subio;
195                                 lio->lis_single_subio_index = stripe;
196                         } else {
197                                 OBD_ALLOC_PTR(sub->sub_io);
198                                 if (sub->sub_io == NULL)
199                                         result = -ENOMEM;
200                         }
201                 }
202         }
203
204         if (result == 0) {
205                 sub_obj = lovsub2cl(lov_r0(lov)->lo_sub[stripe]);
206                 sub_io  = sub->sub_io;
207
208                 sub_io->ci_obj    = sub_obj;
209                 sub_io->ci_result = 0;
210
211                 sub_io->ci_parent  = io;
212                 sub_io->ci_lockreq = io->ci_lockreq;
213                 sub_io->ci_type    = io->ci_type;
214                 sub_io->ci_no_srvlock = io->ci_no_srvlock;
215                 sub_io->ci_noatime = io->ci_noatime;
216
217                 lov_sub_enter(sub);
218                 result = cl_io_sub_init(sub->sub_env, sub_io,
219                                         io->ci_type, sub_obj);
220                 lov_sub_exit(sub);
221                 if (result >= 0) {
222                         lio->lis_active_subios++;
223                         sub->sub_io_initialized = 1;
224                         result = 0;
225                 }
226         }
227         if (result != 0)
228                 lov_io_sub_fini(env, lio, sub);
229         RETURN(result);
230 }
231
232 struct lov_io_sub *lov_sub_get(const struct lu_env *env,
233                                struct lov_io *lio, int stripe)
234 {
235         int rc;
236         struct lov_io_sub *sub = &lio->lis_subs[stripe];
237
238         LASSERT(stripe < lio->lis_stripe_count);
239         ENTRY;
240
241         if (!sub->sub_io_initialized) {
242                 sub->sub_stripe = stripe;
243                 rc = lov_io_sub_init(env, lio, sub);
244         } else
245                 rc = 0;
246         if (rc == 0)
247                 lov_sub_enter(sub);
248         else
249                 sub = ERR_PTR(rc);
250         RETURN(sub);
251 }
252
253 void lov_sub_put(struct lov_io_sub *sub)
254 {
255         lov_sub_exit(sub);
256 }
257
258 /*****************************************************************************
259  *
260  * Lov io operations.
261  *
262  */
263
264 int lov_page_stripe(const struct cl_page *page)
265 {
266         const struct cl_page_slice *slice;
267         ENTRY;
268
269         slice = cl_page_at(page, &lov_device_type);
270         LASSERT(slice != NULL);
271         LASSERT(slice->cpl_obj != NULL);
272
273         RETURN(cl2lov_page(slice)->lps_stripe);
274 }
275
276 struct lov_io_sub *lov_page_subio(const struct lu_env *env, struct lov_io *lio,
277                                   const struct cl_page_slice *slice)
278 {
279         struct lov_stripe_md *lsm  = lio->lis_object->lo_lsm;
280         struct cl_page       *page = slice->cpl_page;
281         int stripe;
282
283         LASSERT(lio->lis_cl.cis_io != NULL);
284         LASSERT(cl2lov(slice->cpl_obj) == lio->lis_object);
285         LASSERT(lsm != NULL);
286         LASSERT(lio->lis_nr_subios > 0);
287         ENTRY;
288
289         stripe = lov_page_stripe(page);
290         RETURN(lov_sub_get(env, lio, stripe));
291 }
292
293
294 static int lov_io_subio_init(const struct lu_env *env, struct lov_io *lio,
295                              struct cl_io *io)
296 {
297         struct lov_stripe_md *lsm;
298         int result;
299         ENTRY;
300
301         LASSERT(lio->lis_object != NULL);
302         lsm = lio->lis_object->lo_lsm;
303
304         /*
305          * Need to be optimized, we can't afford to allocate a piece of memory
306          * when writing a page. -jay
307          */
308         OBD_ALLOC_LARGE(lio->lis_subs,
309                         lsm->lsm_stripe_count * sizeof lio->lis_subs[0]);
310         if (lio->lis_subs != NULL) {
311                 lio->lis_nr_subios = lio->lis_stripe_count;
312                 lio->lis_single_subio_index = -1;
313                 lio->lis_active_subios = 0;
314                 result = 0;
315         } else
316                 result = -ENOMEM;
317
318         RETURN(result);
319 }
320
321 static int lov_io_slice_init(struct lov_io *lio,
322                              struct lov_object *obj, struct cl_io *io)
323 {
324         ENTRY;
325
326         io->ci_result = 0;
327         lio->lis_object = obj;
328
329         LASSERT(obj->lo_lsm != NULL);
330         lio->lis_stripe_count = obj->lo_lsm->lsm_stripe_count;
331
332         switch (io->ci_type) {
333         case CIT_READ:
334         case CIT_WRITE:
335                 lio->lis_pos = io->u.ci_rw.crw_pos;
336                 lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
337                 lio->lis_io_endpos = lio->lis_endpos;
338                 if (cl_io_is_append(io)) {
339                         LASSERT(io->ci_type == CIT_WRITE);
340
341                         /* If there is LOV EA hole, then we may cannot locate
342                          * the current file-tail exactly. */
343                         if (unlikely(obj->lo_lsm->lsm_pattern &
344                                      LOV_PATTERN_F_HOLE))
345                                 RETURN(-EIO);
346
347                         lio->lis_pos = 0;
348                         lio->lis_endpos = OBD_OBJECT_EOF;
349                 }
350                 break;
351
352         case CIT_SETATTR:
353                 if (cl_io_is_trunc(io))
354                         lio->lis_pos = io->u.ci_setattr.sa_attr.lvb_size;
355                 else
356                         lio->lis_pos = 0;
357                 lio->lis_endpos = OBD_OBJECT_EOF;
358                 break;
359
360         case CIT_DATA_VERSION:
361                 lio->lis_pos = 0;
362                 lio->lis_endpos = OBD_OBJECT_EOF;
363                 break;
364
365         case CIT_FAULT: {
366                 pgoff_t index = io->u.ci_fault.ft_index;
367                 lio->lis_pos = cl_offset(io->ci_obj, index);
368                 lio->lis_endpos = cl_offset(io->ci_obj, index + 1);
369                 break;
370         }
371
372         case CIT_FSYNC: {
373                 lio->lis_pos = io->u.ci_fsync.fi_start;
374                 lio->lis_endpos = io->u.ci_fsync.fi_end;
375                 break;
376         }
377
378         case CIT_LADVISE: {
379                 lio->lis_pos = io->u.ci_ladvise.li_start;
380                 lio->lis_endpos = io->u.ci_ladvise.li_end;
381                 break;
382         }
383
384         case CIT_MISC:
385                 lio->lis_pos = 0;
386                 lio->lis_endpos = OBD_OBJECT_EOF;
387                 break;
388
389         default:
390                 LBUG();
391         }
392
393         RETURN(0);
394 }
395
396 static void lov_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
397 {
398         struct lov_io *lio = cl2lov_io(env, ios);
399         struct lov_object *lov = cl2lov(ios->cis_obj);
400         int i;
401
402         ENTRY;
403         if (lio->lis_subs != NULL) {
404                 for (i = 0; i < lio->lis_nr_subios; i++)
405                         lov_io_sub_fini(env, lio, &lio->lis_subs[i]);
406                 OBD_FREE_LARGE(lio->lis_subs,
407                          lio->lis_nr_subios * sizeof lio->lis_subs[0]);
408                 lio->lis_nr_subios = 0;
409         }
410
411         LASSERT(atomic_read(&lov->lo_active_ios) > 0);
412         if (atomic_dec_and_test(&lov->lo_active_ios))
413                 wake_up_all(&lov->lo_waitq);
414         EXIT;
415 }
416
417 static loff_t lov_offset_mod(loff_t val, int delta)
418 {
419         if (val != OBD_OBJECT_EOF)
420                 val += delta;
421         return val;
422 }
423
424 static int lov_io_iter_init(const struct lu_env *env,
425                             const struct cl_io_slice *ios)
426 {
427         struct lov_io        *lio = cl2lov_io(env, ios);
428         struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
429         struct lov_io_sub    *sub;
430         loff_t endpos;
431         loff_t start;
432         loff_t end;
433         int stripe;
434         int rc = 0;
435
436         ENTRY;
437         endpos = lov_offset_mod(lio->lis_endpos, -1);
438         for (stripe = 0; stripe < lio->lis_stripe_count; stripe++) {
439                 if (!lov_stripe_intersects(lsm, stripe, lio->lis_pos,
440                                            endpos, &start, &end))
441                         continue;
442
443                 if (unlikely(lov_r0(lio->lis_object)->lo_sub[stripe] == NULL)) {
444                         if (ios->cis_io->ci_type == CIT_READ ||
445                             ios->cis_io->ci_type == CIT_WRITE ||
446                             ios->cis_io->ci_type == CIT_FAULT)
447                                 RETURN(-EIO);
448
449                         continue;
450                 }
451
452                 end = lov_offset_mod(end, +1);
453                 sub = lov_sub_get(env, lio, stripe);
454                 if (IS_ERR(sub)) {
455                         rc = PTR_ERR(sub);
456                         break;
457                 }
458
459                 lov_io_sub_inherit(sub->sub_io, lio, stripe, start, end);
460                 rc = cl_io_iter_init(sub->sub_env, sub->sub_io);
461                 if (rc != 0)
462                         cl_io_iter_fini(sub->sub_env, sub->sub_io);
463                 lov_sub_put(sub);
464                 if (rc != 0)
465                         break;
466
467                 CDEBUG(D_VFSTRACE, "shrink: %d ["LPU64", "LPU64")\n",
468                        stripe, start, end);
469
470                 list_add_tail(&sub->sub_linkage, &lio->lis_active);
471         }
472         RETURN(rc);
473 }
474
475 static int lov_io_rw_iter_init(const struct lu_env *env,
476                                const struct cl_io_slice *ios)
477 {
478         struct lov_io        *lio = cl2lov_io(env, ios);
479         struct cl_io         *io  = ios->cis_io;
480         struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
481         loff_t start = io->u.ci_rw.crw_pos;
482         loff_t next;
483         unsigned long ssize = lsm->lsm_stripe_size;
484
485         LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
486         ENTRY;
487
488         /* fast path for common case. */
489         if (lio->lis_nr_subios != 1 && !cl_io_is_append(io)) {
490
491                 lov_do_div64(start, ssize);
492                 next = (start + 1) * ssize;
493                 if (next <= start * ssize)
494                         next = ~0ull;
495
496                 io->ci_continue = next < lio->lis_io_endpos;
497                 io->u.ci_rw.crw_count = min_t(loff_t, lio->lis_io_endpos,
498                                               next) - io->u.ci_rw.crw_pos;
499                 lio->lis_pos    = io->u.ci_rw.crw_pos;
500                 lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
501                 CDEBUG(D_VFSTRACE, "stripe: "LPU64" chunk: ["LPU64", "LPU64") "
502                        LPU64"\n", (__u64)start, lio->lis_pos, lio->lis_endpos,
503                        (__u64)lio->lis_io_endpos);
504         }
505         /*
506          * XXX The following call should be optimized: we know, that
507          * [lio->lis_pos, lio->lis_endpos) intersects with exactly one stripe.
508          */
509         RETURN(lov_io_iter_init(env, ios));
510 }
511
512 static int lov_io_call(const struct lu_env *env, struct lov_io *lio,
513                        int (*iofunc)(const struct lu_env *, struct cl_io *))
514 {
515         struct cl_io *parent = lio->lis_cl.cis_io;
516         struct lov_io_sub *sub;
517         int rc = 0;
518
519         ENTRY;
520         list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
521                 lov_sub_enter(sub);
522                 rc = iofunc(sub->sub_env, sub->sub_io);
523                 lov_sub_exit(sub);
524                 if (rc)
525                         break;
526
527                 if (parent->ci_result == 0)
528                         parent->ci_result = sub->sub_io->ci_result;
529         }
530         RETURN(rc);
531 }
532
533 static int lov_io_lock(const struct lu_env *env, const struct cl_io_slice *ios)
534 {
535         ENTRY;
536         RETURN(lov_io_call(env, cl2lov_io(env, ios), cl_io_lock));
537 }
538
539 static int lov_io_start(const struct lu_env *env, const struct cl_io_slice *ios)
540 {
541         ENTRY;
542         RETURN(lov_io_call(env, cl2lov_io(env, ios), cl_io_start));
543 }
544
545 static int lov_io_end_wrapper(const struct lu_env *env, struct cl_io *io)
546 {
547         ENTRY;
548         /*
549          * It's possible that lov_io_start() wasn't called against this
550          * sub-io, either because previous sub-io failed, or upper layer
551          * completed IO.
552          */
553         if (io->ci_state == CIS_IO_GOING)
554                 cl_io_end(env, io);
555         else
556                 io->ci_state = CIS_IO_FINISHED;
557         RETURN(0);
558 }
559
560 static int lov_io_iter_fini_wrapper(const struct lu_env *env, struct cl_io *io)
561 {
562         cl_io_iter_fini(env, io);
563         RETURN(0);
564 }
565
566 static int lov_io_unlock_wrapper(const struct lu_env *env, struct cl_io *io)
567 {
568         cl_io_unlock(env, io);
569         RETURN(0);
570 }
571
572 static void lov_io_end(const struct lu_env *env, const struct cl_io_slice *ios)
573 {
574         int rc;
575
576         rc = lov_io_call(env, cl2lov_io(env, ios), lov_io_end_wrapper);
577         LASSERT(rc == 0);
578 }
579
580 static void
581 lov_io_data_version_end(const struct lu_env *env, const struct cl_io_slice *ios)
582 {
583         struct lov_io *lio = cl2lov_io(env, ios);
584         struct cl_io *parent = lio->lis_cl.cis_io;
585         struct lov_io_sub *sub;
586
587         ENTRY;
588         list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
589                 lov_io_end_wrapper(env, sub->sub_io);
590
591                 parent->u.ci_data_version.dv_data_version +=
592                         sub->sub_io->u.ci_data_version.dv_data_version;
593
594                 if (parent->ci_result == 0)
595                         parent->ci_result = sub->sub_io->ci_result;
596         }
597
598         EXIT;
599 }
600
601 static void lov_io_iter_fini(const struct lu_env *env,
602                              const struct cl_io_slice *ios)
603 {
604         struct lov_io *lio = cl2lov_io(env, ios);
605         int rc;
606
607         ENTRY;
608         rc = lov_io_call(env, lio, lov_io_iter_fini_wrapper);
609         LASSERT(rc == 0);
610         while (!list_empty(&lio->lis_active))
611                 list_del_init(lio->lis_active.next);
612         EXIT;
613 }
614
615 static void lov_io_unlock(const struct lu_env *env,
616                           const struct cl_io_slice *ios)
617 {
618         int rc;
619
620         ENTRY;
621         rc = lov_io_call(env, cl2lov_io(env, ios), lov_io_unlock_wrapper);
622         LASSERT(rc == 0);
623         EXIT;
624 }
625
626 static int lov_io_read_ahead(const struct lu_env *env,
627                              const struct cl_io_slice *ios,
628                              pgoff_t start, struct cl_read_ahead *ra)
629 {
630         struct lov_io           *lio = cl2lov_io(env, ios);
631         struct lov_object       *loo = lio->lis_object;
632         struct cl_object        *obj = lov2cl(loo);
633         struct lov_layout_raid0 *r0 = lov_r0(loo);
634         struct lov_io_sub       *sub;
635         loff_t                   suboff;
636         pgoff_t                  ra_end;
637         unsigned int             pps; /* pages per stripe */
638         int                      stripe;
639         int                      rc;
640         ENTRY;
641
642         stripe = lov_stripe_number(loo->lo_lsm, cl_offset(obj, start));
643         if (unlikely(r0->lo_sub[stripe] == NULL))
644                 RETURN(-EIO);
645
646         sub = lov_sub_get(env, lio, stripe);
647
648         lov_stripe_offset(loo->lo_lsm, cl_offset(obj, start), stripe, &suboff);
649         rc = cl_io_read_ahead(sub->sub_env, sub->sub_io,
650                               cl_index(lovsub2cl(r0->lo_sub[stripe]), suboff),
651                               ra);
652         lov_sub_put(sub);
653
654         CDEBUG(D_READA, DFID " cra_end = %lu, stripes = %d, rc = %d\n",
655                PFID(lu_object_fid(lov2lu(loo))), ra->cra_end, r0->lo_nr, rc);
656         if (rc != 0)
657                 RETURN(rc);
658
659         /**
660          * Adjust the stripe index by layout of raid0. ra->cra_end is the maximum
661          * page index covered by an underlying DLM lock.
662          * This function converts cra_end from stripe level to file level, and
663          * make sure it's not beyond stripe boundary.
664          */
665         if (r0->lo_nr == 1) /* single stripe file */
666                 RETURN(0);
667
668         /* cra_end is stripe level, convert it into file level */
669         ra_end = ra->cra_end;
670         if (ra_end != CL_PAGE_EOF)
671                 ra_end = lov_stripe_pgoff(loo->lo_lsm, ra_end, stripe);
672
673         pps = loo->lo_lsm->lsm_stripe_size >> PAGE_CACHE_SHIFT;
674
675         CDEBUG(D_READA, DFID " max_index = %lu, pps = %u, "
676                "stripe_size = %u, stripe no = %u, start index = %lu\n",
677                PFID(lu_object_fid(lov2lu(loo))), ra_end, pps,
678                loo->lo_lsm->lsm_stripe_size, stripe, start);
679
680         /* never exceed the end of the stripe */
681         ra->cra_end = min_t(pgoff_t, ra_end, start + pps - start % pps - 1);
682         RETURN(0);
683 }
684
685 /**
686  * lov implementation of cl_operations::cio_submit() method. It takes a list
687  * of pages in \a queue, splits it into per-stripe sub-lists, invokes
688  * cl_io_submit() on underlying devices to submit sub-lists, and then splices
689  * everything back.
690  *
691  * Major complication of this function is a need to handle memory cleansing:
692  * cl_io_submit() is called to write out pages as a part of VM memory
693  * reclamation, and hence it may not fail due to memory shortages (system
694  * dead-locks otherwise). To deal with this, some resources (sub-lists,
695  * sub-environment, etc.) are allocated per-device on "startup" (i.e., in a
696  * not-memory cleansing context), and in case of memory shortage, these
697  * pre-allocated resources are used by lov_io_submit() under
698  * lov_device::ld_mutex mutex.
699  */
700 static int lov_io_submit(const struct lu_env *env,
701                          const struct cl_io_slice *ios,
702                          enum cl_req_type crt, struct cl_2queue *queue)
703 {
704         struct cl_page_list     *qin = &queue->c2_qin;
705         struct lov_io           *lio = cl2lov_io(env, ios);
706         struct lov_io_sub       *sub;
707         struct cl_page_list     *plist = &lov_env_info(env)->lti_plist;
708         struct cl_page          *page;
709         int stripe;
710         int rc = 0;
711         ENTRY;
712
713         if (lio->lis_active_subios == 1) {
714                 int idx = lio->lis_single_subio_index;
715
716                 LASSERT(idx < lio->lis_nr_subios);
717                 sub = lov_sub_get(env, lio, idx);
718                 LASSERT(!IS_ERR(sub));
719                 LASSERT(sub->sub_io == &lio->lis_single_subio);
720                 rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
721                                      crt, queue);
722                 lov_sub_put(sub);
723                 RETURN(rc);
724         }
725
726         LASSERT(lio->lis_subs != NULL);
727
728         cl_page_list_init(plist);
729         while (qin->pl_nr > 0) {
730                 struct cl_2queue  *cl2q = &lov_env_info(env)->lti_cl2q;
731
732                 cl_2queue_init(cl2q);
733
734                 page = cl_page_list_first(qin);
735                 cl_page_list_move(&cl2q->c2_qin, qin, page);
736
737                 stripe = lov_page_stripe(page);
738                 while (qin->pl_nr > 0) {
739                         page = cl_page_list_first(qin);
740                         if (stripe != lov_page_stripe(page))
741                                 break;
742
743                         cl_page_list_move(&cl2q->c2_qin, qin, page);
744                 }
745
746                 sub = lov_sub_get(env, lio, stripe);
747                 if (!IS_ERR(sub)) {
748                         rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
749                                              crt, cl2q);
750                         lov_sub_put(sub);
751                 } else {
752                         rc = PTR_ERR(sub);
753                 }
754
755                 cl_page_list_splice(&cl2q->c2_qin, plist);
756                 cl_page_list_splice(&cl2q->c2_qout, &queue->c2_qout);
757                 cl_2queue_fini(env, cl2q);
758
759                 if (rc != 0)
760                         break;
761         }
762
763         cl_page_list_splice(plist, qin);
764         cl_page_list_fini(env, plist);
765
766         RETURN(rc);
767 }
768
769 static int lov_io_commit_async(const struct lu_env *env,
770                                const struct cl_io_slice *ios,
771                                struct cl_page_list *queue, int from, int to,
772                                cl_commit_cbt cb)
773 {
774         struct cl_page_list *plist = &lov_env_info(env)->lti_plist;
775         struct lov_io     *lio = cl2lov_io(env, ios);
776         struct lov_io_sub *sub;
777         struct cl_page *page;
778         int rc = 0;
779         ENTRY;
780
781         if (lio->lis_active_subios == 1) {
782                 int idx = lio->lis_single_subio_index;
783
784                 LASSERT(idx < lio->lis_nr_subios);
785                 sub = lov_sub_get(env, lio, idx);
786                 LASSERT(!IS_ERR(sub));
787                 LASSERT(sub->sub_io == &lio->lis_single_subio);
788                 rc = cl_io_commit_async(sub->sub_env, sub->sub_io, queue,
789                                         from, to, cb);
790                 lov_sub_put(sub);
791                 RETURN(rc);
792         }
793
794         LASSERT(lio->lis_subs != NULL);
795
796         cl_page_list_init(plist);
797         while (queue->pl_nr > 0) {
798                 int stripe_to = to;
799                 int stripe;
800
801                 LASSERT(plist->pl_nr == 0);
802                 page = cl_page_list_first(queue);
803                 cl_page_list_move(plist, queue, page);
804
805                 stripe = lov_page_stripe(page);
806                 while (queue->pl_nr > 0) {
807                         page = cl_page_list_first(queue);
808                         if (stripe != lov_page_stripe(page))
809                                 break;
810
811                         cl_page_list_move(plist, queue, page);
812                 }
813
814                 if (queue->pl_nr > 0) /* still has more pages */
815                         stripe_to = PAGE_SIZE;
816
817                 sub = lov_sub_get(env, lio, stripe);
818                 if (!IS_ERR(sub)) {
819                         rc = cl_io_commit_async(sub->sub_env, sub->sub_io,
820                                                 plist, from, stripe_to, cb);
821                         lov_sub_put(sub);
822                 } else {
823                         rc = PTR_ERR(sub);
824                         break;
825                 }
826
827                 if (plist->pl_nr > 0) /* short write */
828                         break;
829
830                 from = 0;
831         }
832
833         /* for error case, add the page back into the qin list */
834         LASSERT(ergo(rc == 0, plist->pl_nr == 0));
835         while (plist->pl_nr > 0) {
836                 /* error occurred, add the uncommitted pages back into queue */
837                 page = cl_page_list_last(plist);
838                 cl_page_list_move_head(queue, plist, page);
839         }
840
841         RETURN(rc);
842 }
843
844 static int lov_io_fault_start(const struct lu_env *env,
845                               const struct cl_io_slice *ios)
846 {
847         struct cl_fault_io *fio;
848         struct lov_io      *lio;
849         struct lov_io_sub  *sub;
850
851         ENTRY;
852         fio = &ios->cis_io->u.ci_fault;
853         lio = cl2lov_io(env, ios);
854         sub = lov_sub_get(env, lio, lov_page_stripe(fio->ft_page));
855         sub->sub_io->u.ci_fault.ft_nob = fio->ft_nob;
856         lov_sub_put(sub);
857         RETURN(lov_io_start(env, ios));
858 }
859
860 static void lov_io_fsync_end(const struct lu_env *env,
861                              const struct cl_io_slice *ios)
862 {
863         struct lov_io *lio = cl2lov_io(env, ios);
864         struct lov_io_sub *sub;
865         unsigned int *written = &ios->cis_io->u.ci_fsync.fi_nr_written;
866         ENTRY;
867
868         *written = 0;
869         list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
870                 struct cl_io *subio = sub->sub_io;
871
872                 lov_sub_enter(sub);
873                 lov_io_end_wrapper(sub->sub_env, subio);
874                 lov_sub_exit(sub);
875
876                 if (subio->ci_result == 0)
877                         *written += subio->u.ci_fsync.fi_nr_written;
878         }
879         RETURN_EXIT;
880 }
881
882 static const struct cl_io_operations lov_io_ops = {
883         .op = {
884                 [CIT_READ] = {
885                         .cio_fini      = lov_io_fini,
886                         .cio_iter_init = lov_io_rw_iter_init,
887                         .cio_iter_fini = lov_io_iter_fini,
888                         .cio_lock      = lov_io_lock,
889                         .cio_unlock    = lov_io_unlock,
890                         .cio_start     = lov_io_start,
891                         .cio_end       = lov_io_end
892                 },
893                 [CIT_WRITE] = {
894                         .cio_fini      = lov_io_fini,
895                         .cio_iter_init = lov_io_rw_iter_init,
896                         .cio_iter_fini = lov_io_iter_fini,
897                         .cio_lock      = lov_io_lock,
898                         .cio_unlock    = lov_io_unlock,
899                         .cio_start     = lov_io_start,
900                         .cio_end       = lov_io_end
901                 },
902                 [CIT_SETATTR] = {
903                         .cio_fini      = lov_io_fini,
904                         .cio_iter_init = lov_io_iter_init,
905                         .cio_iter_fini = lov_io_iter_fini,
906                         .cio_lock      = lov_io_lock,
907                         .cio_unlock    = lov_io_unlock,
908                         .cio_start     = lov_io_start,
909                         .cio_end       = lov_io_end
910                 },
911                 [CIT_DATA_VERSION] = {
912                         .cio_fini       = lov_io_fini,
913                         .cio_iter_init  = lov_io_iter_init,
914                         .cio_iter_fini  = lov_io_iter_fini,
915                         .cio_lock       = lov_io_lock,
916                         .cio_unlock     = lov_io_unlock,
917                         .cio_start      = lov_io_start,
918                         .cio_end        = lov_io_data_version_end,
919                 },
920                 [CIT_FAULT] = {
921                         .cio_fini      = lov_io_fini,
922                         .cio_iter_init = lov_io_iter_init,
923                         .cio_iter_fini = lov_io_iter_fini,
924                         .cio_lock      = lov_io_lock,
925                         .cio_unlock    = lov_io_unlock,
926                         .cio_start     = lov_io_fault_start,
927                         .cio_end       = lov_io_end
928                 },
929                 [CIT_FSYNC] = {
930                         .cio_fini      = lov_io_fini,
931                         .cio_iter_init = lov_io_iter_init,
932                         .cio_iter_fini = lov_io_iter_fini,
933                         .cio_lock      = lov_io_lock,
934                         .cio_unlock    = lov_io_unlock,
935                         .cio_start     = lov_io_start,
936                         .cio_end       = lov_io_fsync_end
937                 },
938                 [CIT_LADVISE] = {
939                         .cio_fini      = lov_io_fini,
940                         .cio_iter_init = lov_io_iter_init,
941                         .cio_iter_fini = lov_io_iter_fini,
942                         .cio_lock      = lov_io_lock,
943                         .cio_unlock    = lov_io_unlock,
944                         .cio_start     = lov_io_start,
945                         .cio_end       = lov_io_end
946                 },
947                 [CIT_MISC] = {
948                         .cio_fini      = lov_io_fini
949                 }
950         },
951         .cio_read_ahead                = lov_io_read_ahead,
952         .cio_submit                    = lov_io_submit,
953         .cio_commit_async              = lov_io_commit_async,
954 };
955
956 /*****************************************************************************
957  *
958  * Empty lov io operations.
959  *
960  */
961
962 static void lov_empty_io_fini(const struct lu_env *env,
963                               const struct cl_io_slice *ios)
964 {
965         struct lov_object *lov = cl2lov(ios->cis_obj);
966         ENTRY;
967
968         if (atomic_dec_and_test(&lov->lo_active_ios))
969                 wake_up_all(&lov->lo_waitq);
970         EXIT;
971 }
972
973 static int lov_empty_io_submit(const struct lu_env *env,
974                                const struct cl_io_slice *ios,
975                                enum cl_req_type crt, struct cl_2queue *queue)
976 {
977         return -EBADF;
978 }
979
980 static void lov_empty_impossible(const struct lu_env *env,
981                                  struct cl_io_slice *ios)
982 {
983         LBUG();
984 }
985
986 #define LOV_EMPTY_IMPOSSIBLE ((void *)lov_empty_impossible)
987
988 /**
989  * An io operation vector for files without stripes.
990  */
991 static const struct cl_io_operations lov_empty_io_ops = {
992         .op = {
993                 [CIT_READ] = {
994                         .cio_fini       = lov_empty_io_fini,
995 #if 0
996                         .cio_iter_init  = LOV_EMPTY_IMPOSSIBLE,
997                         .cio_lock       = LOV_EMPTY_IMPOSSIBLE,
998                         .cio_start      = LOV_EMPTY_IMPOSSIBLE,
999                         .cio_end        = LOV_EMPTY_IMPOSSIBLE
1000 #endif
1001                 },
1002                 [CIT_WRITE] = {
1003                         .cio_fini      = lov_empty_io_fini,
1004                         .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
1005                         .cio_lock      = LOV_EMPTY_IMPOSSIBLE,
1006                         .cio_start     = LOV_EMPTY_IMPOSSIBLE,
1007                         .cio_end       = LOV_EMPTY_IMPOSSIBLE
1008                 },
1009                 [CIT_SETATTR] = {
1010                         .cio_fini      = lov_empty_io_fini,
1011                         .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
1012                         .cio_lock      = LOV_EMPTY_IMPOSSIBLE,
1013                         .cio_start     = LOV_EMPTY_IMPOSSIBLE,
1014                         .cio_end       = LOV_EMPTY_IMPOSSIBLE
1015                 },
1016                 [CIT_FAULT] = {
1017                         .cio_fini      = lov_empty_io_fini,
1018                         .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
1019                         .cio_lock      = LOV_EMPTY_IMPOSSIBLE,
1020                         .cio_start     = LOV_EMPTY_IMPOSSIBLE,
1021                         .cio_end       = LOV_EMPTY_IMPOSSIBLE
1022                 },
1023                 [CIT_FSYNC] = {
1024                         .cio_fini      = lov_empty_io_fini
1025                 },
1026                 [CIT_LADVISE] = {
1027                         .cio_fini   = lov_empty_io_fini
1028                 },
1029                 [CIT_MISC] = {
1030                         .cio_fini      = lov_empty_io_fini
1031                 }
1032         },
1033         .cio_submit                    = lov_empty_io_submit,
1034         .cio_commit_async              = LOV_EMPTY_IMPOSSIBLE
1035 };
1036
1037 int lov_io_init_raid0(const struct lu_env *env, struct cl_object *obj,
1038                       struct cl_io *io)
1039 {
1040         struct lov_io       *lio = lov_env_io(env);
1041         struct lov_object   *lov = cl2lov(obj);
1042
1043         ENTRY;
1044         INIT_LIST_HEAD(&lio->lis_active);
1045         io->ci_result = lov_io_slice_init(lio, lov, io);
1046         if (io->ci_result != 0)
1047                 RETURN(io->ci_result);
1048
1049         if (io->ci_result == 0) {
1050                 io->ci_result = lov_io_subio_init(env, lio, io);
1051                 if (io->ci_result == 0) {
1052                         cl_io_slice_add(io, &lio->lis_cl, obj, &lov_io_ops);
1053                         atomic_inc(&lov->lo_active_ios);
1054                 }
1055         }
1056         RETURN(io->ci_result);
1057 }
1058
1059 int lov_io_init_empty(const struct lu_env *env, struct cl_object *obj,
1060                       struct cl_io *io)
1061 {
1062         struct lov_object *lov = cl2lov(obj);
1063         struct lov_io *lio = lov_env_io(env);
1064         int result;
1065         ENTRY;
1066
1067         lio->lis_object = lov;
1068         switch (io->ci_type) {
1069         default:
1070                 LBUG();
1071         case CIT_MISC:
1072         case CIT_READ:
1073                 result = 0;
1074                 break;
1075         case CIT_FSYNC:
1076         case CIT_LADVISE:
1077         case CIT_SETATTR:
1078         case CIT_DATA_VERSION:
1079                 result = +1;
1080                 break;
1081         case CIT_WRITE:
1082                 result = -EBADF;
1083                 break;
1084         case CIT_FAULT:
1085                 result = -EFAULT;
1086                 CERROR("Page fault on a file without stripes: "DFID"\n",
1087                        PFID(lu_object_fid(&obj->co_lu)));
1088                 break;
1089         }
1090         if (result == 0) {
1091                 cl_io_slice_add(io, &lio->lis_cl, obj, &lov_empty_io_ops);
1092                 atomic_inc(&lov->lo_active_ios);
1093         }
1094
1095         io->ci_result = result < 0 ? result : 0;
1096         RETURN(result);
1097 }
1098
1099 int lov_io_init_released(const struct lu_env *env, struct cl_object *obj,
1100                         struct cl_io *io)
1101 {
1102         struct lov_object *lov = cl2lov(obj);
1103         struct lov_io *lio = lov_env_io(env);
1104         int result;
1105         ENTRY;
1106
1107         LASSERT(lov->lo_lsm != NULL);
1108         lio->lis_object = lov;
1109
1110         switch (io->ci_type) {
1111         default:
1112                 LASSERTF(0, "invalid type %d\n", io->ci_type);
1113                 result = -EOPNOTSUPP;
1114                 break;
1115         case CIT_MISC:
1116         case CIT_FSYNC:
1117         case CIT_LADVISE:
1118         case CIT_DATA_VERSION:
1119                 result = 1;
1120                 break;
1121         case CIT_SETATTR:
1122                 /* the truncate to 0 is managed by MDT:
1123                  * - in open, for open O_TRUNC
1124                  * - in setattr, for truncate
1125                  */
1126                 /* the truncate is for size > 0 so triggers a restore */
1127                 if (cl_io_is_trunc(io)) {
1128                         io->ci_restore_needed = 1;
1129                         result = -ENODATA;
1130                 } else
1131                         result = 1;
1132                 break;
1133         case CIT_READ:
1134         case CIT_WRITE:
1135         case CIT_FAULT:
1136                 io->ci_restore_needed = 1;
1137                 result = -ENODATA;
1138                 break;
1139         }
1140
1141         if (result == 0) {
1142                 cl_io_slice_add(io, &lio->lis_cl, obj, &lov_empty_io_ops);
1143                 atomic_inc(&lov->lo_active_ios);
1144         }
1145
1146         io->ci_result = result < 0 ? result : 0;
1147         RETURN(result);
1148 }
1149 /** @} lov */