Whamcloud - gitweb
b8edb13bb6ad42e421dc636a7fca5cdc73e32fe3
[fs/lustre-release.git] / lustre / lov / lov_io.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Implementation of cl_io for LOV layer.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  *   Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_LOV
43
44 #include "lov_cl_internal.h"
45
46 /** \addtogroup lov
47  *  @{
48  */
49
50 static inline void lov_sub_enter(struct lov_io_sub *sub)
51 {
52         sub->sub_reenter++;
53 }
54 static inline void lov_sub_exit(struct lov_io_sub *sub)
55 {
56         sub->sub_reenter--;
57 }
58
59 static void lov_io_sub_fini(const struct lu_env *env, struct lov_io *lio,
60                             struct lov_io_sub *sub)
61 {
62         ENTRY;
63         if (sub->sub_io != NULL) {
64                 if (sub->sub_io_initialized) {
65                         lov_sub_enter(sub);
66                         cl_io_fini(sub->sub_env, sub->sub_io);
67                         lov_sub_exit(sub);
68                         sub->sub_io_initialized = 0;
69                         lio->lis_active_subios--;
70                 }
71                 if (sub->sub_stripe == lio->lis_single_subio_index)
72                         lio->lis_single_subio_index = -1;
73                 else if (!sub->sub_borrowed)
74                         OBD_FREE_PTR(sub->sub_io);
75                 sub->sub_io = NULL;
76         }
77         if (sub->sub_env != NULL && !IS_ERR(sub->sub_env)) {
78                 if (!sub->sub_borrowed)
79                         cl_env_put(sub->sub_env, &sub->sub_refcheck);
80                 sub->sub_env = NULL;
81         }
82         EXIT;
83 }
84
85 static void lov_io_sub_inherit(struct cl_io *io, struct lov_io *lio,
86                                int stripe, loff_t start, loff_t end)
87 {
88         struct lov_stripe_md *lsm    = lio->lis_object->lo_lsm;
89         struct cl_io         *parent = lio->lis_cl.cis_io;
90
91         switch (io->ci_type) {
92         case CIT_SETATTR: {
93                 io->u.ci_setattr.sa_attr = parent->u.ci_setattr.sa_attr;
94                 io->u.ci_setattr.sa_attr_flags =
95                         parent->u.ci_setattr.sa_attr_flags;
96                 io->u.ci_setattr.sa_valid = parent->u.ci_setattr.sa_valid;
97                 io->u.ci_setattr.sa_stripe_index = stripe;
98                 io->u.ci_setattr.sa_parent_fid =
99                                         parent->u.ci_setattr.sa_parent_fid;
100                 io->u.ci_setattr.sa_capa = parent->u.ci_setattr.sa_capa;
101                 if (cl_io_is_trunc(io)) {
102                         loff_t new_size = parent->u.ci_setattr.sa_attr.lvb_size;
103
104                         new_size = lov_size_to_stripe(lsm, new_size, stripe);
105                         io->u.ci_setattr.sa_attr.lvb_size = new_size;
106                 }
107                 break;
108         }
109         case CIT_FAULT: {
110                 struct cl_object *obj = parent->ci_obj;
111                 loff_t off = cl_offset(obj, parent->u.ci_fault.ft_index);
112
113                 io->u.ci_fault = parent->u.ci_fault;
114                 off = lov_size_to_stripe(lsm, off, stripe);
115                 io->u.ci_fault.ft_index = cl_index(obj, off);
116                 break;
117         }
118         case CIT_FSYNC: {
119                 io->u.ci_fsync.fi_start = start;
120                 io->u.ci_fsync.fi_end = end;
121                 io->u.ci_fsync.fi_capa = parent->u.ci_fsync.fi_capa;
122                 io->u.ci_fsync.fi_fid = parent->u.ci_fsync.fi_fid;
123                 io->u.ci_fsync.fi_mode = parent->u.ci_fsync.fi_mode;
124                 break;
125         }
126         case CIT_READ:
127         case CIT_WRITE: {
128                 io->u.ci_wr.wr_sync = cl_io_is_sync_write(parent);
129                 if (cl_io_is_append(parent)) {
130                         io->u.ci_wr.wr_append = 1;
131                 } else {
132                         io->u.ci_rw.crw_pos = start;
133                         io->u.ci_rw.crw_count = end - start;
134                 }
135                 break;
136         }
137         default:
138                 break;
139         }
140 }
141
142 static int lov_io_sub_init(const struct lu_env *env, struct lov_io *lio,
143                            struct lov_io_sub *sub)
144 {
145         struct lov_object *lov = lio->lis_object;
146         struct lov_device *ld  = lu2lov_dev(lov2cl(lov)->co_lu.lo_dev);
147         struct cl_io      *sub_io;
148         struct cl_object  *sub_obj;
149         struct cl_io      *io  = lio->lis_cl.cis_io;
150
151         int stripe = sub->sub_stripe;
152         int result;
153
154         LASSERT(sub->sub_io == NULL);
155         LASSERT(sub->sub_env == NULL);
156         LASSERT(sub->sub_stripe < lio->lis_stripe_count);
157         ENTRY;
158
159         if (unlikely(lov_r0(lov)->lo_sub[stripe] == NULL))
160                 RETURN(-EIO);
161
162         result = 0;
163         sub->sub_io_initialized = 0;
164         sub->sub_borrowed = 0;
165
166         if (lio->lis_mem_frozen) {
167                 LASSERT(mutex_is_locked(&ld->ld_mutex));
168                 sub->sub_io  = &ld->ld_emrg[stripe]->emrg_subio;
169                 sub->sub_env = ld->ld_emrg[stripe]->emrg_env;
170                 sub->sub_borrowed = 1;
171         } else {
172                 void *cookie;
173
174                 /* obtain new environment */
175                 cookie = cl_env_reenter();
176                 sub->sub_env = cl_env_get(&sub->sub_refcheck);
177                 cl_env_reexit(cookie);
178                 if (IS_ERR(sub->sub_env))
179                         result = PTR_ERR(sub->sub_env);
180
181                 if (result == 0) {
182                         /*
183                          * First sub-io. Use ->lis_single_subio to
184                          * avoid dynamic allocation.
185                          */
186                         if (lio->lis_active_subios == 0) {
187                                 sub->sub_io = &lio->lis_single_subio;
188                                 lio->lis_single_subio_index = stripe;
189                         } else {
190                                 OBD_ALLOC_PTR(sub->sub_io);
191                                 if (sub->sub_io == NULL)
192                                         result = -ENOMEM;
193                         }
194                 }
195         }
196
197         if (result == 0) {
198                 sub_obj = lovsub2cl(lov_r0(lov)->lo_sub[stripe]);
199                 sub_io  = sub->sub_io;
200
201                 sub_io->ci_obj    = sub_obj;
202                 sub_io->ci_result = 0;
203
204                 sub_io->ci_parent  = io;
205                 sub_io->ci_lockreq = io->ci_lockreq;
206                 sub_io->ci_type    = io->ci_type;
207                 sub_io->ci_no_srvlock = io->ci_no_srvlock;
208                 sub_io->ci_noatime = io->ci_noatime;
209
210                 lov_sub_enter(sub);
211                 result = cl_io_sub_init(sub->sub_env, sub_io,
212                                         io->ci_type, sub_obj);
213                 lov_sub_exit(sub);
214                 if (result >= 0) {
215                         lio->lis_active_subios++;
216                         sub->sub_io_initialized = 1;
217                         result = 0;
218                 }
219         }
220         if (result != 0)
221                 lov_io_sub_fini(env, lio, sub);
222         RETURN(result);
223 }
224
225 struct lov_io_sub *lov_sub_get(const struct lu_env *env,
226                                struct lov_io *lio, int stripe)
227 {
228         int rc;
229         struct lov_io_sub *sub = &lio->lis_subs[stripe];
230
231         LASSERT(stripe < lio->lis_stripe_count);
232         ENTRY;
233
234         if (!sub->sub_io_initialized) {
235                 sub->sub_stripe = stripe;
236                 rc = lov_io_sub_init(env, lio, sub);
237         } else
238                 rc = 0;
239         if (rc == 0)
240                 lov_sub_enter(sub);
241         else
242                 sub = ERR_PTR(rc);
243         RETURN(sub);
244 }
245
246 void lov_sub_put(struct lov_io_sub *sub)
247 {
248         lov_sub_exit(sub);
249 }
250
251 /*****************************************************************************
252  *
253  * Lov io operations.
254  *
255  */
256
257 int lov_page_stripe(const struct cl_page *page)
258 {
259         const struct cl_page_slice *slice;
260         ENTRY;
261
262         slice = cl_page_at(page, &lov_device_type);
263         LASSERT(slice != NULL);
264         LASSERT(slice->cpl_obj != NULL);
265
266         RETURN(cl2lov_page(slice)->lps_stripe);
267 }
268
269 struct lov_io_sub *lov_page_subio(const struct lu_env *env, struct lov_io *lio,
270                                   const struct cl_page_slice *slice)
271 {
272         struct lov_stripe_md *lsm  = lio->lis_object->lo_lsm;
273         struct cl_page       *page = slice->cpl_page;
274         int stripe;
275
276         LASSERT(lio->lis_cl.cis_io != NULL);
277         LASSERT(cl2lov(slice->cpl_obj) == lio->lis_object);
278         LASSERT(lsm != NULL);
279         LASSERT(lio->lis_nr_subios > 0);
280         ENTRY;
281
282         stripe = lov_page_stripe(page);
283         RETURN(lov_sub_get(env, lio, stripe));
284 }
285
286
287 static int lov_io_subio_init(const struct lu_env *env, struct lov_io *lio,
288                              struct cl_io *io)
289 {
290         struct lov_stripe_md *lsm;
291         int result;
292         ENTRY;
293
294         LASSERT(lio->lis_object != NULL);
295         lsm = lio->lis_object->lo_lsm;
296
297         /*
298          * Need to be optimized, we can't afford to allocate a piece of memory
299          * when writing a page. -jay
300          */
301         OBD_ALLOC_LARGE(lio->lis_subs,
302                         lsm->lsm_stripe_count * sizeof lio->lis_subs[0]);
303         if (lio->lis_subs != NULL) {
304                 lio->lis_nr_subios = lio->lis_stripe_count;
305                 lio->lis_single_subio_index = -1;
306                 lio->lis_active_subios = 0;
307                 result = 0;
308         } else
309                 result = -ENOMEM;
310         RETURN(result);
311 }
312
313 static int lov_io_slice_init(struct lov_io *lio,
314                              struct lov_object *obj, struct cl_io *io)
315 {
316         ENTRY;
317
318         io->ci_result = 0;
319         lio->lis_object = obj;
320
321         LASSERT(obj->lo_lsm != NULL);
322         lio->lis_stripe_count = obj->lo_lsm->lsm_stripe_count;
323
324         switch (io->ci_type) {
325         case CIT_READ:
326         case CIT_WRITE:
327                 lio->lis_pos = io->u.ci_rw.crw_pos;
328                 lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
329                 lio->lis_io_endpos = lio->lis_endpos;
330                 if (cl_io_is_append(io)) {
331                         LASSERT(io->ci_type == CIT_WRITE);
332
333                         /* If there is LOV EA hole, then we may cannot locate
334                          * the current file-tail exactly. */
335                         if (unlikely(obj->lo_lsm->lsm_pattern &
336                                      LOV_PATTERN_F_HOLE))
337                                 RETURN(-EIO);
338
339                         lio->lis_pos = 0;
340                         lio->lis_endpos = OBD_OBJECT_EOF;
341                 }
342                 break;
343
344         case CIT_SETATTR:
345                 if (cl_io_is_trunc(io))
346                         lio->lis_pos = io->u.ci_setattr.sa_attr.lvb_size;
347                 else
348                         lio->lis_pos = 0;
349                 lio->lis_endpos = OBD_OBJECT_EOF;
350                 break;
351
352         case CIT_FAULT: {
353                 pgoff_t index = io->u.ci_fault.ft_index;
354                 lio->lis_pos = cl_offset(io->ci_obj, index);
355                 lio->lis_endpos = cl_offset(io->ci_obj, index + 1);
356                 break;
357         }
358
359         case CIT_FSYNC: {
360                 lio->lis_pos = io->u.ci_fsync.fi_start;
361                 lio->lis_endpos = io->u.ci_fsync.fi_end;
362                 break;
363         }
364
365         case CIT_MISC:
366                 lio->lis_pos = 0;
367                 lio->lis_endpos = OBD_OBJECT_EOF;
368                 break;
369
370         default:
371                 LBUG();
372         }
373
374         RETURN(0);
375 }
376
377 static void lov_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
378 {
379         struct lov_io *lio = cl2lov_io(env, ios);
380         struct lov_object *lov = cl2lov(ios->cis_obj);
381         int i;
382
383         ENTRY;
384         if (lio->lis_subs != NULL) {
385                 for (i = 0; i < lio->lis_nr_subios; i++)
386                         lov_io_sub_fini(env, lio, &lio->lis_subs[i]);
387                 OBD_FREE_LARGE(lio->lis_subs,
388                          lio->lis_nr_subios * sizeof lio->lis_subs[0]);
389                 lio->lis_nr_subios = 0;
390         }
391
392         LASSERT(atomic_read(&lov->lo_active_ios) > 0);
393         if (atomic_dec_and_test(&lov->lo_active_ios))
394                 wake_up_all(&lov->lo_waitq);
395         EXIT;
396 }
397
398 static loff_t lov_offset_mod(loff_t val, int delta)
399 {
400         if (val != OBD_OBJECT_EOF)
401                 val += delta;
402         return val;
403 }
404
405 static int lov_io_iter_init(const struct lu_env *env,
406                             const struct cl_io_slice *ios)
407 {
408         struct lov_io        *lio = cl2lov_io(env, ios);
409         struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
410         struct lov_io_sub    *sub;
411         loff_t endpos;
412         loff_t start;
413         loff_t end;
414         int stripe;
415         int rc = 0;
416
417         ENTRY;
418         endpos = lov_offset_mod(lio->lis_endpos, -1);
419         for (stripe = 0; stripe < lio->lis_stripe_count; stripe++) {
420                 if (!lov_stripe_intersects(lsm, stripe, lio->lis_pos,
421                                            endpos, &start, &end))
422                         continue;
423
424                 if (unlikely(lov_r0(lio->lis_object)->lo_sub[stripe] == NULL)) {
425                         if (ios->cis_io->ci_type == CIT_READ ||
426                             ios->cis_io->ci_type == CIT_WRITE ||
427                             ios->cis_io->ci_type == CIT_FAULT)
428                                 RETURN(-EIO);
429
430                         continue;
431                 }
432
433                 end = lov_offset_mod(end, +1);
434                 sub = lov_sub_get(env, lio, stripe);
435                 if (!IS_ERR(sub)) {
436                         lov_io_sub_inherit(sub->sub_io, lio, stripe,
437                                            start, end);
438                         rc = cl_io_iter_init(sub->sub_env, sub->sub_io);
439                         lov_sub_put(sub);
440                         CDEBUG(D_VFSTRACE, "shrink: %d ["LPU64", "LPU64")\n",
441                                stripe, start, end);
442                 } else
443                         rc = PTR_ERR(sub);
444
445                 if (!rc)
446                         list_add_tail(&sub->sub_linkage, &lio->lis_active);
447                 else
448                         break;
449         }
450         RETURN(rc);
451 }
452
453 static int lov_io_rw_iter_init(const struct lu_env *env,
454                                const struct cl_io_slice *ios)
455 {
456         struct lov_io        *lio = cl2lov_io(env, ios);
457         struct cl_io         *io  = ios->cis_io;
458         struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
459         loff_t start = io->u.ci_rw.crw_pos;
460         loff_t next;
461         unsigned long ssize = lsm->lsm_stripe_size;
462
463         LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
464         ENTRY;
465
466         /* fast path for common case. */
467         if (lio->lis_nr_subios != 1 && !cl_io_is_append(io)) {
468
469                 lov_do_div64(start, ssize);
470                 next = (start + 1) * ssize;
471                 if (next <= start * ssize)
472                         next = ~0ull;
473
474                 io->ci_continue = next < lio->lis_io_endpos;
475                 io->u.ci_rw.crw_count = min_t(loff_t, lio->lis_io_endpos,
476                                               next) - io->u.ci_rw.crw_pos;
477                 lio->lis_pos    = io->u.ci_rw.crw_pos;
478                 lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
479                 CDEBUG(D_VFSTRACE, "stripe: "LPU64" chunk: ["LPU64", "LPU64") "
480                        LPU64"\n", (__u64)start, lio->lis_pos, lio->lis_endpos,
481                        (__u64)lio->lis_io_endpos);
482         }
483         /*
484          * XXX The following call should be optimized: we know, that
485          * [lio->lis_pos, lio->lis_endpos) intersects with exactly one stripe.
486          */
487         RETURN(lov_io_iter_init(env, ios));
488 }
489
490 static int lov_io_call(const struct lu_env *env, struct lov_io *lio,
491                        int (*iofunc)(const struct lu_env *, struct cl_io *))
492 {
493         struct cl_io *parent = lio->lis_cl.cis_io;
494         struct lov_io_sub *sub;
495         int rc = 0;
496
497         ENTRY;
498         list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
499                 lov_sub_enter(sub);
500                 rc = iofunc(sub->sub_env, sub->sub_io);
501                 lov_sub_exit(sub);
502                 if (rc)
503                         break;
504
505                 if (parent->ci_result == 0)
506                         parent->ci_result = sub->sub_io->ci_result;
507         }
508         RETURN(rc);
509 }
510
511 static int lov_io_lock(const struct lu_env *env, const struct cl_io_slice *ios)
512 {
513         ENTRY;
514         RETURN(lov_io_call(env, cl2lov_io(env, ios), cl_io_lock));
515 }
516
517 static int lov_io_start(const struct lu_env *env, const struct cl_io_slice *ios)
518 {
519         ENTRY;
520         RETURN(lov_io_call(env, cl2lov_io(env, ios), cl_io_start));
521 }
522
523 static int lov_io_end_wrapper(const struct lu_env *env, struct cl_io *io)
524 {
525         ENTRY;
526         /*
527          * It's possible that lov_io_start() wasn't called against this
528          * sub-io, either because previous sub-io failed, or upper layer
529          * completed IO.
530          */
531         if (io->ci_state == CIS_IO_GOING)
532                 cl_io_end(env, io);
533         else
534                 io->ci_state = CIS_IO_FINISHED;
535         RETURN(0);
536 }
537
538 static int lov_io_iter_fini_wrapper(const struct lu_env *env, struct cl_io *io)
539 {
540         cl_io_iter_fini(env, io);
541         RETURN(0);
542 }
543
544 static int lov_io_unlock_wrapper(const struct lu_env *env, struct cl_io *io)
545 {
546         cl_io_unlock(env, io);
547         RETURN(0);
548 }
549
550 static void lov_io_end(const struct lu_env *env, const struct cl_io_slice *ios)
551 {
552         int rc;
553
554         rc = lov_io_call(env, cl2lov_io(env, ios), lov_io_end_wrapper);
555         LASSERT(rc == 0);
556 }
557
558 static void lov_io_iter_fini(const struct lu_env *env,
559                              const struct cl_io_slice *ios)
560 {
561         struct lov_io *lio = cl2lov_io(env, ios);
562         int rc;
563
564         ENTRY;
565         rc = lov_io_call(env, lio, lov_io_iter_fini_wrapper);
566         LASSERT(rc == 0);
567         while (!list_empty(&lio->lis_active))
568                 list_del_init(lio->lis_active.next);
569         EXIT;
570 }
571
572 static void lov_io_unlock(const struct lu_env *env,
573                           const struct cl_io_slice *ios)
574 {
575         int rc;
576
577         ENTRY;
578         rc = lov_io_call(env, cl2lov_io(env, ios), lov_io_unlock_wrapper);
579         LASSERT(rc == 0);
580         EXIT;
581 }
582
583 static int lov_io_read_ahead(const struct lu_env *env,
584                              const struct cl_io_slice *ios,
585                              pgoff_t start, struct cl_read_ahead *ra)
586 {
587         struct lov_io           *lio = cl2lov_io(env, ios);
588         struct lov_object       *loo = lio->lis_object;
589         struct cl_object        *obj = lov2cl(loo);
590         struct lov_layout_raid0 *r0 = lov_r0(loo);
591         struct lov_io_sub       *sub;
592         loff_t                   suboff;
593         pgoff_t                  ra_end;
594         unsigned int             pps; /* pages per stripe */
595         int                      stripe;
596         int                      rc;
597         ENTRY;
598
599         stripe = lov_stripe_number(loo->lo_lsm, cl_offset(obj, start));
600         if (unlikely(r0->lo_sub[stripe] == NULL))
601                 RETURN(-EIO);
602
603         sub = lov_sub_get(env, lio, stripe);
604
605         lov_stripe_offset(loo->lo_lsm, cl_offset(obj, start), stripe, &suboff);
606         rc = cl_io_read_ahead(sub->sub_env, sub->sub_io,
607                               cl_index(lovsub2cl(r0->lo_sub[stripe]), suboff),
608                               ra);
609         lov_sub_put(sub);
610
611         CDEBUG(D_READA, DFID " cra_end = %lu, stripes = %d, rc = %d\n",
612                PFID(lu_object_fid(lov2lu(loo))), ra->cra_end, r0->lo_nr, rc);
613         if (rc != 0)
614                 RETURN(rc);
615
616         /**
617          * Adjust the stripe index by layout of raid0. ra->cra_end is the maximum
618          * page index covered by an underlying DLM lock.
619          * This function converts cra_end from stripe level to file level, and
620          * make sure it's not beyond stripe boundary.
621          */
622         if (r0->lo_nr == 1) /* single stripe file */
623                 RETURN(0);
624
625         /* cra_end is stripe level, convert it into file level */
626         ra_end = ra->cra_end;
627         if (ra_end != CL_PAGE_EOF)
628                 ra_end = lov_stripe_pgoff(loo->lo_lsm, ra_end, stripe);
629
630         pps = loo->lo_lsm->lsm_stripe_size >> PAGE_CACHE_SHIFT;
631
632         CDEBUG(D_READA, DFID " max_index = %lu, pps = %u, "
633                "stripe_size = %u, stripe no = %u, start index = %lu\n",
634                PFID(lu_object_fid(lov2lu(loo))), ra_end, pps,
635                loo->lo_lsm->lsm_stripe_size, stripe, start);
636
637         /* never exceed the end of the stripe */
638         ra->cra_end = min_t(pgoff_t, ra_end, start + pps - start % pps - 1);
639         RETURN(0);
640 }
641
642 /**
643  * lov implementation of cl_operations::cio_submit() method. It takes a list
644  * of pages in \a queue, splits it into per-stripe sub-lists, invokes
645  * cl_io_submit() on underlying devices to submit sub-lists, and then splices
646  * everything back.
647  *
648  * Major complication of this function is a need to handle memory cleansing:
649  * cl_io_submit() is called to write out pages as a part of VM memory
650  * reclamation, and hence it may not fail due to memory shortages (system
651  * dead-locks otherwise). To deal with this, some resources (sub-lists,
652  * sub-environment, etc.) are allocated per-device on "startup" (i.e., in a
653  * not-memory cleansing context), and in case of memory shortage, these
654  * pre-allocated resources are used by lov_io_submit() under
655  * lov_device::ld_mutex mutex.
656  */
657 static int lov_io_submit(const struct lu_env *env,
658                          const struct cl_io_slice *ios,
659                          enum cl_req_type crt, struct cl_2queue *queue)
660 {
661         struct cl_page_list     *qin = &queue->c2_qin;
662         struct lov_io           *lio = cl2lov_io(env, ios);
663         struct lov_io_sub       *sub;
664         struct cl_page_list     *plist = &lov_env_info(env)->lti_plist;
665         struct cl_page          *page;
666         int stripe;
667         int rc = 0;
668         ENTRY;
669
670         if (lio->lis_active_subios == 1) {
671                 int idx = lio->lis_single_subio_index;
672
673                 LASSERT(idx < lio->lis_nr_subios);
674                 sub = lov_sub_get(env, lio, idx);
675                 LASSERT(!IS_ERR(sub));
676                 LASSERT(sub->sub_io == &lio->lis_single_subio);
677                 rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
678                                      crt, queue);
679                 lov_sub_put(sub);
680                 RETURN(rc);
681         }
682
683         LASSERT(lio->lis_subs != NULL);
684
685         cl_page_list_init(plist);
686         while (qin->pl_nr > 0) {
687                 struct cl_2queue  *cl2q = &lov_env_info(env)->lti_cl2q;
688
689                 cl_2queue_init(cl2q);
690
691                 page = cl_page_list_first(qin);
692                 cl_page_list_move(&cl2q->c2_qin, qin, page);
693
694                 stripe = lov_page_stripe(page);
695                 while (qin->pl_nr > 0) {
696                         page = cl_page_list_first(qin);
697                         if (stripe != lov_page_stripe(page))
698                                 break;
699
700                         cl_page_list_move(&cl2q->c2_qin, qin, page);
701                 }
702
703                 sub = lov_sub_get(env, lio, stripe);
704                 if (!IS_ERR(sub)) {
705                         rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
706                                              crt, cl2q);
707                         lov_sub_put(sub);
708                 } else {
709                         rc = PTR_ERR(sub);
710                 }
711
712                 cl_page_list_splice(&cl2q->c2_qin, plist);
713                 cl_page_list_splice(&cl2q->c2_qout, &queue->c2_qout);
714                 cl_2queue_fini(env, cl2q);
715
716                 if (rc != 0)
717                         break;
718         }
719
720         cl_page_list_splice(plist, qin);
721         cl_page_list_fini(env, plist);
722
723         RETURN(rc);
724 }
725
726 static int lov_io_commit_async(const struct lu_env *env,
727                                const struct cl_io_slice *ios,
728                                struct cl_page_list *queue, int from, int to,
729                                cl_commit_cbt cb)
730 {
731         struct cl_page_list *plist = &lov_env_info(env)->lti_plist;
732         struct lov_io     *lio = cl2lov_io(env, ios);
733         struct lov_io_sub *sub;
734         struct cl_page *page;
735         int rc = 0;
736         ENTRY;
737
738         if (lio->lis_active_subios == 1) {
739                 int idx = lio->lis_single_subio_index;
740
741                 LASSERT(idx < lio->lis_nr_subios);
742                 sub = lov_sub_get(env, lio, idx);
743                 LASSERT(!IS_ERR(sub));
744                 LASSERT(sub->sub_io == &lio->lis_single_subio);
745                 rc = cl_io_commit_async(sub->sub_env, sub->sub_io, queue,
746                                         from, to, cb);
747                 lov_sub_put(sub);
748                 RETURN(rc);
749         }
750
751         LASSERT(lio->lis_subs != NULL);
752
753         cl_page_list_init(plist);
754         while (queue->pl_nr > 0) {
755                 int stripe_to = to;
756                 int stripe;
757
758                 LASSERT(plist->pl_nr == 0);
759                 page = cl_page_list_first(queue);
760                 cl_page_list_move(plist, queue, page);
761
762                 stripe = lov_page_stripe(page);
763                 while (queue->pl_nr > 0) {
764                         page = cl_page_list_first(queue);
765                         if (stripe != lov_page_stripe(page))
766                                 break;
767
768                         cl_page_list_move(plist, queue, page);
769                 }
770
771                 if (queue->pl_nr > 0) /* still has more pages */
772                         stripe_to = PAGE_SIZE;
773
774                 sub = lov_sub_get(env, lio, stripe);
775                 if (!IS_ERR(sub)) {
776                         rc = cl_io_commit_async(sub->sub_env, sub->sub_io,
777                                                 plist, from, stripe_to, cb);
778                         lov_sub_put(sub);
779                 } else {
780                         rc = PTR_ERR(sub);
781                         break;
782                 }
783
784                 if (plist->pl_nr > 0) /* short write */
785                         break;
786
787                 from = 0;
788         }
789
790         /* for error case, add the page back into the qin list */
791         LASSERT(ergo(rc == 0, plist->pl_nr == 0));
792         while (plist->pl_nr > 0) {
793                 /* error occurred, add the uncommitted pages back into queue */
794                 page = cl_page_list_last(plist);
795                 cl_page_list_move_head(queue, plist, page);
796         }
797
798         RETURN(rc);
799 }
800
801 static int lov_io_fault_start(const struct lu_env *env,
802                               const struct cl_io_slice *ios)
803 {
804         struct cl_fault_io *fio;
805         struct lov_io      *lio;
806         struct lov_io_sub  *sub;
807
808         ENTRY;
809         fio = &ios->cis_io->u.ci_fault;
810         lio = cl2lov_io(env, ios);
811         sub = lov_sub_get(env, lio, lov_page_stripe(fio->ft_page));
812         sub->sub_io->u.ci_fault.ft_nob = fio->ft_nob;
813         lov_sub_put(sub);
814         RETURN(lov_io_start(env, ios));
815 }
816
817 static void lov_io_fsync_end(const struct lu_env *env,
818                              const struct cl_io_slice *ios)
819 {
820         struct lov_io *lio = cl2lov_io(env, ios);
821         struct lov_io_sub *sub;
822         unsigned int *written = &ios->cis_io->u.ci_fsync.fi_nr_written;
823         ENTRY;
824
825         *written = 0;
826         list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
827                 struct cl_io *subio = sub->sub_io;
828
829                 lov_sub_enter(sub);
830                 lov_io_end_wrapper(sub->sub_env, subio);
831                 lov_sub_exit(sub);
832
833                 if (subio->ci_result == 0)
834                         *written += subio->u.ci_fsync.fi_nr_written;
835         }
836         RETURN_EXIT;
837 }
838
839 static const struct cl_io_operations lov_io_ops = {
840         .op = {
841                 [CIT_READ] = {
842                         .cio_fini      = lov_io_fini,
843                         .cio_iter_init = lov_io_rw_iter_init,
844                         .cio_iter_fini = lov_io_iter_fini,
845                         .cio_lock      = lov_io_lock,
846                         .cio_unlock    = lov_io_unlock,
847                         .cio_start     = lov_io_start,
848                         .cio_end       = lov_io_end
849                 },
850                 [CIT_WRITE] = {
851                         .cio_fini      = lov_io_fini,
852                         .cio_iter_init = lov_io_rw_iter_init,
853                         .cio_iter_fini = lov_io_iter_fini,
854                         .cio_lock      = lov_io_lock,
855                         .cio_unlock    = lov_io_unlock,
856                         .cio_start     = lov_io_start,
857                         .cio_end       = lov_io_end
858                 },
859                 [CIT_SETATTR] = {
860                         .cio_fini      = lov_io_fini,
861                         .cio_iter_init = lov_io_iter_init,
862                         .cio_iter_fini = lov_io_iter_fini,
863                         .cio_lock      = lov_io_lock,
864                         .cio_unlock    = lov_io_unlock,
865                         .cio_start     = lov_io_start,
866                         .cio_end       = lov_io_end
867                 },
868                 [CIT_FAULT] = {
869                         .cio_fini      = lov_io_fini,
870                         .cio_iter_init = lov_io_iter_init,
871                         .cio_iter_fini = lov_io_iter_fini,
872                         .cio_lock      = lov_io_lock,
873                         .cio_unlock    = lov_io_unlock,
874                         .cio_start     = lov_io_fault_start,
875                         .cio_end       = lov_io_end
876                 },
877                 [CIT_FSYNC] = {
878                         .cio_fini      = lov_io_fini,
879                         .cio_iter_init = lov_io_iter_init,
880                         .cio_iter_fini = lov_io_iter_fini,
881                         .cio_lock      = lov_io_lock,
882                         .cio_unlock    = lov_io_unlock,
883                         .cio_start     = lov_io_start,
884                         .cio_end       = lov_io_fsync_end
885                 },
886                 [CIT_MISC] = {
887                         .cio_fini      = lov_io_fini
888                 }
889         },
890         .cio_read_ahead                = lov_io_read_ahead,
891         .cio_submit                    = lov_io_submit,
892         .cio_commit_async              = lov_io_commit_async,
893 };
894
895 /*****************************************************************************
896  *
897  * Empty lov io operations.
898  *
899  */
900
901 static void lov_empty_io_fini(const struct lu_env *env,
902                               const struct cl_io_slice *ios)
903 {
904         struct lov_object *lov = cl2lov(ios->cis_obj);
905         ENTRY;
906
907         if (atomic_dec_and_test(&lov->lo_active_ios))
908                 wake_up_all(&lov->lo_waitq);
909         EXIT;
910 }
911
912 static void lov_empty_impossible(const struct lu_env *env,
913                                  struct cl_io_slice *ios)
914 {
915         LBUG();
916 }
917
918 #define LOV_EMPTY_IMPOSSIBLE ((void *)lov_empty_impossible)
919
920 /**
921  * An io operation vector for files without stripes.
922  */
923 static const struct cl_io_operations lov_empty_io_ops = {
924         .op = {
925                 [CIT_READ] = {
926                         .cio_fini       = lov_empty_io_fini,
927 #if 0
928                         .cio_iter_init  = LOV_EMPTY_IMPOSSIBLE,
929                         .cio_lock       = LOV_EMPTY_IMPOSSIBLE,
930                         .cio_start      = LOV_EMPTY_IMPOSSIBLE,
931                         .cio_end        = LOV_EMPTY_IMPOSSIBLE
932 #endif
933                 },
934                 [CIT_WRITE] = {
935                         .cio_fini      = lov_empty_io_fini,
936                         .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
937                         .cio_lock      = LOV_EMPTY_IMPOSSIBLE,
938                         .cio_start     = LOV_EMPTY_IMPOSSIBLE,
939                         .cio_end       = LOV_EMPTY_IMPOSSIBLE
940                 },
941                 [CIT_SETATTR] = {
942                         .cio_fini      = lov_empty_io_fini,
943                         .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
944                         .cio_lock      = LOV_EMPTY_IMPOSSIBLE,
945                         .cio_start     = LOV_EMPTY_IMPOSSIBLE,
946                         .cio_end       = LOV_EMPTY_IMPOSSIBLE
947                 },
948                 [CIT_FAULT] = {
949                         .cio_fini      = lov_empty_io_fini,
950                         .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
951                         .cio_lock      = LOV_EMPTY_IMPOSSIBLE,
952                         .cio_start     = LOV_EMPTY_IMPOSSIBLE,
953                         .cio_end       = LOV_EMPTY_IMPOSSIBLE
954                 },
955                 [CIT_FSYNC] = {
956                         .cio_fini      = lov_empty_io_fini
957                 },
958                 [CIT_MISC] = {
959                         .cio_fini      = lov_empty_io_fini
960                 }
961         },
962         .cio_submit                    = LOV_EMPTY_IMPOSSIBLE,
963         .cio_commit_async              = LOV_EMPTY_IMPOSSIBLE
964 };
965
966 int lov_io_init_raid0(const struct lu_env *env, struct cl_object *obj,
967                       struct cl_io *io)
968 {
969         struct lov_io       *lio = lov_env_io(env);
970         struct lov_object   *lov = cl2lov(obj);
971
972         ENTRY;
973         INIT_LIST_HEAD(&lio->lis_active);
974         io->ci_result = lov_io_slice_init(lio, lov, io);
975         if (io->ci_result != 0)
976                 RETURN(io->ci_result);
977
978         if (io->ci_result == 0) {
979                 io->ci_result = lov_io_subio_init(env, lio, io);
980                 if (io->ci_result == 0) {
981                         cl_io_slice_add(io, &lio->lis_cl, obj, &lov_io_ops);
982                         atomic_inc(&lov->lo_active_ios);
983                 }
984         }
985         RETURN(io->ci_result);
986 }
987
988 int lov_io_init_empty(const struct lu_env *env, struct cl_object *obj,
989                       struct cl_io *io)
990 {
991         struct lov_object *lov = cl2lov(obj);
992         struct lov_io *lio = lov_env_io(env);
993         int result;
994         ENTRY;
995
996         lio->lis_object = lov;
997         switch (io->ci_type) {
998         default:
999                 LBUG();
1000         case CIT_MISC:
1001         case CIT_READ:
1002                 result = 0;
1003                 break;
1004         case CIT_FSYNC:
1005         case CIT_SETATTR:
1006                 result = +1;
1007                 break;
1008         case CIT_WRITE:
1009                 result = -EBADF;
1010                 break;
1011         case CIT_FAULT:
1012                 result = -EFAULT;
1013                 CERROR("Page fault on a file without stripes: "DFID"\n",
1014                        PFID(lu_object_fid(&obj->co_lu)));
1015                 break;
1016         }
1017         if (result == 0) {
1018                 cl_io_slice_add(io, &lio->lis_cl, obj, &lov_empty_io_ops);
1019                 atomic_inc(&lov->lo_active_ios);
1020         }
1021
1022         io->ci_result = result < 0 ? result : 0;
1023         RETURN(result != 0);
1024 }
1025
1026 int lov_io_init_released(const struct lu_env *env, struct cl_object *obj,
1027                         struct cl_io *io)
1028 {
1029         struct lov_object *lov = cl2lov(obj);
1030         struct lov_io *lio = lov_env_io(env);
1031         int result;
1032         ENTRY;
1033
1034         LASSERT(lov->lo_lsm != NULL);
1035         lio->lis_object = lov;
1036
1037         switch (io->ci_type) {
1038         default:
1039                 LASSERTF(0, "invalid type %d\n", io->ci_type);
1040         case CIT_MISC:
1041         case CIT_FSYNC:
1042                 result = 1;
1043                 break;
1044         case CIT_SETATTR:
1045                 /* the truncate to 0 is managed by MDT:
1046                  * - in open, for open O_TRUNC
1047                  * - in setattr, for truncate
1048                  */
1049                 /* the truncate is for size > 0 so triggers a restore */
1050                 if (cl_io_is_trunc(io))
1051                         io->ci_restore_needed = 1;
1052                 result = -ENODATA;
1053                 break;
1054         case CIT_READ:
1055         case CIT_WRITE:
1056         case CIT_FAULT:
1057                 io->ci_restore_needed = 1;
1058                 result = -ENODATA;
1059                 break;
1060         }
1061         if (result == 0) {
1062                 cl_io_slice_add(io, &lio->lis_cl, obj, &lov_empty_io_ops);
1063                 atomic_inc(&lov->lo_active_ios);
1064         }
1065
1066         io->ci_result = result < 0 ? result : 0;
1067         RETURN(result != 0);
1068 }
1069 /** @} lov */