Whamcloud - gitweb
b=11063 set mtime to past under PW EOF extent lock (v6)
[fs/lustre-release.git] / lustre / lov / lov_io.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Implementation of cl_io for LOV layer.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_LOV
42
43 #include "lov_cl_internal.h"
44
45 /** \addtogroup lov
46  *  @{
47  */
48
49 static void lov_sub_enter(struct lov_io_sub *sub)
50 {
51         ENTRY;
52         if (sub->sub_reenter++ == 0) {
53                 sub->sub_cookie = cl_env_reenter();
54                 cl_env_implant(sub->sub_env, &sub->sub_refcheck2);
55         }
56         EXIT;
57 }
58
59 static void lov_sub_exit(struct lov_io_sub *sub)
60 {
61         ENTRY;
62         if (--sub->sub_reenter == 0) {
63                 cl_env_unplant(sub->sub_env, &sub->sub_refcheck2);
64                 cl_env_reexit(sub->sub_cookie);
65         }
66         EXIT;
67 }
68
69 static void lov_io_sub_fini(const struct lu_env *env, struct lov_io *lio,
70                             struct lov_io_sub *sub)
71 {
72         ENTRY;
73         if (sub->sub_io != NULL) {
74                 if (sub->sub_io_initialized) {
75                         lov_sub_enter(sub);
76                         cl_io_fini(sub->sub_env, sub->sub_io);
77                         lov_sub_exit(sub);
78                         sub->sub_io_initialized = 0;
79                         lio->lis_active_subios--;
80                 }
81                 if (sub->sub_stripe == lio->lis_single_subio_index)
82                         lio->lis_single_subio_index = -1;
83                 else if (!sub->sub_borrowed)
84                         OBD_FREE_PTR(sub->sub_io);
85                 sub->sub_io = NULL;
86         }
87         if (sub->sub_env != NULL && !IS_ERR(sub->sub_env)) {
88                 if (!sub->sub_borrowed)
89                         cl_env_put(sub->sub_env, &sub->sub_refcheck);
90                 sub->sub_env = NULL;
91         }
92         EXIT;
93 }
94
95 static void lov_io_sub_inherit(struct cl_io *io, struct lov_io *lio,
96                                int stripe, loff_t start, loff_t end)
97 {
98         struct lov_stripe_md *lsm    = lov_r0(lio->lis_object)->lo_lsm;
99         struct cl_io         *parent = lio->lis_cl.cis_io;
100
101         switch(io->ci_type) {
102         case CIT_SETATTR: {
103                 io->u.ci_setattr.sa_attr = parent->u.ci_setattr.sa_attr;
104                 io->u.ci_setattr.sa_valid = parent->u.ci_setattr.sa_valid;
105                 io->u.ci_setattr.sa_capa = parent->u.ci_setattr.sa_capa;
106                 if (cl_io_is_trunc(io)) {
107                         loff_t new_size = parent->u.ci_setattr.sa_attr.lvb_size;
108
109                         new_size = lov_size_to_stripe(lsm, new_size, stripe);
110                         io->u.ci_setattr.sa_attr.lvb_size = new_size;
111                 }
112                 break;
113         }
114         case CIT_FAULT: {
115                 struct cl_object *obj = parent->ci_obj;
116                 loff_t off = cl_offset(obj, parent->u.ci_fault.ft_index);
117
118                 io->u.ci_fault = parent->u.ci_fault;
119                 off = lov_size_to_stripe(lsm, off, stripe);
120                 io->u.ci_fault.ft_index = cl_index(obj, off);
121                 break;
122         }
123         case CIT_READ:
124         case CIT_WRITE: {
125                 if (cl_io_is_append(parent)) {
126                         io->u.ci_wr.wr_append = 1;
127                 } else {
128                         io->u.ci_rw.crw_pos = start;
129                         io->u.ci_rw.crw_count = end - start;
130                 }
131                 break;
132         }
133         default:
134                 break;
135         }
136 }
137
138 static int lov_io_sub_init(const struct lu_env *env, struct lov_io *lio,
139                            struct lov_io_sub *sub)
140 {
141         struct lov_object *lov = lio->lis_object;
142         struct lov_device *ld  = lu2lov_dev(lov2cl(lov)->co_lu.lo_dev);
143         struct cl_io      *sub_io;
144         struct cl_object  *sub_obj;
145         struct cl_io      *io  = lio->lis_cl.cis_io;
146
147         int stripe = sub->sub_stripe;
148         int result;
149
150         LASSERT(sub->sub_io == NULL);
151         LASSERT(sub->sub_env == NULL);
152         LASSERT(sub->sub_stripe < lio->lis_stripe_count);
153         ENTRY;
154
155         result = 0;
156         sub->sub_io_initialized = 0;
157         sub->sub_borrowed = 0;
158
159         if (lio->lis_mem_frozen) {
160                 LASSERT(cfs_mutex_is_locked(&ld->ld_mutex));
161                 sub->sub_io  = &ld->ld_emrg[stripe]->emrg_subio;
162                 sub->sub_env = ld->ld_emrg[stripe]->emrg_env;
163                 sub->sub_borrowed = 1;
164         } else {
165                 void *cookie;
166
167                 /* obtain new environment */
168                 cookie = cl_env_reenter();
169                 sub->sub_env = cl_env_get(&sub->sub_refcheck);
170                 cl_env_reexit(cookie);
171                 if (IS_ERR(sub->sub_env))
172                         result = PTR_ERR(sub->sub_env);
173
174                 if (result == 0) {
175                         /*
176                          * First sub-io. Use ->lis_single_subio to
177                          * avoid dynamic allocation.
178                          */
179                         if (lio->lis_active_subios == 0) {
180                                 sub->sub_io = &lio->lis_single_subio;
181                                 lio->lis_single_subio_index = stripe;
182                         } else {
183                                 OBD_ALLOC_PTR(sub->sub_io);
184                                 if (sub->sub_io == NULL)
185                                         result = -ENOMEM;
186                         }
187                 }
188         }
189
190         if (result == 0) {
191                 sub_obj = lovsub2cl(lov_r0(lov)->lo_sub[stripe]);
192                 sub_io  = sub->sub_io;
193
194                 sub_io->ci_obj    = sub_obj;
195                 sub_io->ci_result = 0;
196
197                 sub_io->ci_parent  = io;
198                 sub_io->ci_lockreq = io->ci_lockreq;
199                 sub_io->ci_type    = io->ci_type;
200                 sub_io->ci_no_srvlock = io->ci_no_srvlock;
201
202                 lov_sub_enter(sub);
203                 result = cl_io_sub_init(sub->sub_env, sub_io,
204                                         io->ci_type, sub_obj);
205                 lov_sub_exit(sub);
206                 if (result >= 0) {
207                         lio->lis_active_subios++;
208                         sub->sub_io_initialized = 1;
209                         result = 0;
210                 }
211         }
212         if (result != 0)
213                 lov_io_sub_fini(env, lio, sub);
214         RETURN(result);
215 }
216
217 struct lov_io_sub *lov_sub_get(const struct lu_env *env,
218                                struct lov_io *lio, int stripe)
219 {
220         int rc;
221         struct lov_io_sub *sub = &lio->lis_subs[stripe];
222
223         LASSERT(stripe < lio->lis_stripe_count);
224         ENTRY;
225
226         if (!sub->sub_io_initialized) {
227                 sub->sub_stripe = stripe;
228                 rc = lov_io_sub_init(env, lio, sub);
229         } else
230                 rc = 0;
231         if (rc == 0)
232                 lov_sub_enter(sub);
233         else
234                 sub = ERR_PTR(rc);
235         RETURN(sub);
236 }
237
238 void lov_sub_put(struct lov_io_sub *sub)
239 {
240         lov_sub_exit(sub);
241 }
242
243 /*****************************************************************************
244  *
245  * Lov io operations.
246  *
247  */
248
249 static int lov_page_stripe(const struct cl_page *page)
250 {
251         struct lovsub_object *subobj;
252
253         ENTRY;
254         subobj = lu2lovsub(
255                 lu_object_locate(page->cp_child->cp_obj->co_lu.lo_header,
256                                  &lovsub_device_type));
257         LASSERT(subobj != NULL);
258         RETURN(subobj->lso_index);
259 }
260
261 struct lov_io_sub *lov_page_subio(const struct lu_env *env, struct lov_io *lio,
262                                   const struct cl_page_slice *slice)
263 {
264         struct lov_stripe_md *lsm  = lov_r0(lio->lis_object)->lo_lsm;
265         struct cl_page       *page = slice->cpl_page;
266         int stripe;
267
268         LASSERT(lio->lis_cl.cis_io != NULL);
269         LASSERT(cl2lov(slice->cpl_obj) == lio->lis_object);
270         LASSERT(lsm != NULL);
271         LASSERT(lio->lis_nr_subios > 0);
272         ENTRY;
273
274         stripe = lov_page_stripe(page);
275         RETURN(lov_sub_get(env, lio, stripe));
276 }
277
278
279 static int lov_io_subio_init(const struct lu_env *env, struct lov_io *lio,
280                              struct cl_io *io)
281 {
282         struct lov_object    *lov = lio->lis_object;
283         struct lov_stripe_md *lsm = lov_r0(lov)->lo_lsm;
284         int result;
285
286         LASSERT(lio->lis_object != NULL);
287         ENTRY;
288
289         /*
290          * Need to be optimized, we can't afford to allocate a piece of memory
291          * when writing a page. -jay
292          */
293         OBD_ALLOC(lio->lis_subs,
294                   lsm->lsm_stripe_count * sizeof lio->lis_subs[0]);
295         if (lio->lis_subs != NULL) {
296                 lio->lis_nr_subios = lio->lis_stripe_count;
297                 lio->lis_single_subio_index = -1;
298                 lio->lis_active_subios = 0;
299                 result = 0;
300         } else
301                 result = -ENOMEM;
302         RETURN(result);
303 }
304
305 static void lov_io_slice_init(struct lov_io *lio,
306                               struct lov_object *obj, struct cl_io *io)
307 {
308         struct lov_stripe_md *lsm = lov_r0(obj)->lo_lsm;
309
310         LASSERT(lsm != NULL);
311         ENTRY;
312
313         io->ci_result = 0;
314         lio->lis_object = obj;
315         lio->lis_stripe_count = lsm->lsm_stripe_count;
316
317         switch (io->ci_type) {
318         case CIT_READ:
319         case CIT_WRITE:
320                 lio->lis_pos = io->u.ci_rw.crw_pos;
321                 lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
322                 lio->lis_io_endpos = lio->lis_endpos;
323                 if (cl_io_is_append(io)) {
324                         LASSERT(io->ci_type == CIT_WRITE);
325                         lio->lis_pos = 0;
326                         lio->lis_endpos = OBD_OBJECT_EOF;
327                 }
328                 break;
329
330         case CIT_SETATTR:
331                 if (cl_io_is_trunc(io))
332                         lio->lis_pos = io->u.ci_setattr.sa_attr.lvb_size;
333                 else
334                         lio->lis_pos = 0;
335                 lio->lis_endpos = OBD_OBJECT_EOF;
336                 break;
337
338         case CIT_FAULT: {
339                 pgoff_t index = io->u.ci_fault.ft_index;
340                 lio->lis_pos = cl_offset(io->ci_obj, index);
341                 lio->lis_endpos = cl_offset(io->ci_obj, index + 1);
342                 break;
343         }
344
345         case CIT_MISC:
346                 lio->lis_pos = 0;
347                 lio->lis_endpos = OBD_OBJECT_EOF;
348                 break;
349
350         default:
351                 LBUG();
352         }
353
354         EXIT;
355 }
356
357 static void lov_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
358 {
359         struct lov_io *lio = cl2lov_io(env, ios);
360         int i;
361
362         ENTRY;
363         if (lio->lis_subs != NULL) {
364                 for (i = 0; i < lio->lis_nr_subios; i++)
365                         lov_io_sub_fini(env, lio, &lio->lis_subs[i]);
366                 OBD_FREE(lio->lis_subs,
367                          lio->lis_nr_subios * sizeof lio->lis_subs[0]);
368                 lio->lis_nr_subios = 0;
369         }
370         EXIT;
371 }
372
373 static obd_off lov_offset_mod(obd_off val, int delta)
374 {
375         if (val != OBD_OBJECT_EOF)
376                 val += delta;
377         return val;
378 }
379
380 static int lov_io_iter_init(const struct lu_env *env,
381                             const struct cl_io_slice *ios)
382 {
383         struct lov_io        *lio = cl2lov_io(env, ios);
384         struct lov_stripe_md *lsm = lov_r0(lio->lis_object)->lo_lsm;
385         struct lov_io_sub    *sub;
386         obd_off endpos;
387         obd_off start;
388         obd_off end;
389         int stripe;
390         int rc = 0;
391
392         ENTRY;
393         endpos = lov_offset_mod(lio->lis_endpos, -1);
394         for (stripe = 0; stripe < lio->lis_stripe_count; stripe++) {
395                 if (!lov_stripe_intersects(lsm, stripe, lio->lis_pos,
396                                            endpos, &start, &end))
397                         continue;
398
399                 end = lov_offset_mod(end, +1);
400                 sub = lov_sub_get(env, lio, stripe);
401                 if (!IS_ERR(sub)) {
402                         lov_io_sub_inherit(sub->sub_io, lio, stripe,
403                                            start, end);
404                         rc = cl_io_iter_init(sub->sub_env, sub->sub_io);
405                         lov_sub_put(sub);
406                         CDEBUG(D_VFSTRACE, "shrink: %i ["LPU64", "LPU64")\n",
407                                stripe, start, end);
408                 } else
409                         rc = PTR_ERR(sub);
410
411                 if (!rc)
412                         cfs_list_add_tail(&sub->sub_linkage, &lio->lis_active);
413                 else
414                         break;
415         }
416         RETURN(rc);
417 }
418
419 static int lov_io_rw_iter_init(const struct lu_env *env,
420                                const struct cl_io_slice *ios)
421 {
422         struct lov_io        *lio = cl2lov_io(env, ios);
423         struct cl_io         *io  = ios->cis_io;
424         struct lov_stripe_md *lsm = lov_r0(cl2lov(ios->cis_obj))->lo_lsm;
425         loff_t start = io->u.ci_rw.crw_pos;
426         loff_t next;
427         unsigned long ssize = lsm->lsm_stripe_size;
428
429         LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
430         ENTRY;
431
432         /* fast path for common case. */
433         if (lio->lis_nr_subios != 1 && !cl_io_is_append(io)) {
434
435                 do_div(start, ssize);
436                 next = (start + 1) * ssize;
437                 if (next <= start * ssize)
438                         next = ~0ull;
439
440                 io->ci_continue = next < lio->lis_io_endpos;
441                 io->u.ci_rw.crw_count = min_t(loff_t, lio->lis_io_endpos,
442                                               next) - io->u.ci_rw.crw_pos;
443                 lio->lis_pos    = io->u.ci_rw.crw_pos;
444                 lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
445                 CDEBUG(D_VFSTRACE, "stripe: "LPU64" chunk: ["LPU64", "LPU64") "LPU64"\n",
446                        (__u64)start, lio->lis_pos, lio->lis_endpos,
447                        (__u64)lio->lis_io_endpos);
448         }
449         /*
450          * XXX The following call should be optimized: we know, that
451          * [lio->lis_pos, lio->lis_endpos) intersects with exactly one stripe.
452          */
453         RETURN(lov_io_iter_init(env, ios));
454 }
455
456 static int lov_io_call(const struct lu_env *env, struct lov_io *lio,
457                        int (*iofunc)(const struct lu_env *, struct cl_io *))
458 {
459         struct lov_io_sub *sub;
460         int rc = 0;
461
462         ENTRY;
463         cfs_list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
464                 lov_sub_enter(sub);
465                 rc = iofunc(sub->sub_env, sub->sub_io);
466                 lov_sub_exit(sub);
467                 if (rc)
468                         break;
469         }
470         RETURN(rc);
471 }
472
473 static int lov_io_lock(const struct lu_env *env, const struct cl_io_slice *ios)
474 {
475         ENTRY;
476         RETURN(lov_io_call(env, cl2lov_io(env, ios), cl_io_lock));
477 }
478
479 static int lov_io_start(const struct lu_env *env, const struct cl_io_slice *ios)
480 {
481         ENTRY;
482         RETURN(lov_io_call(env, cl2lov_io(env, ios), cl_io_start));
483 }
484
485 static int lov_io_end_wrapper(const struct lu_env *env, struct cl_io *io)
486 {
487         ENTRY;
488         /*
489          * It's possible that lov_io_start() wasn't called against this
490          * sub-io, either because previous sub-io failed, or upper layer
491          * completed IO.
492          */
493         if (io->ci_state == CIS_IO_GOING)
494                 cl_io_end(env, io);
495         else
496                 io->ci_state = CIS_IO_FINISHED;
497         RETURN(0);
498 }
499
500 static int lov_io_iter_fini_wrapper(const struct lu_env *env, struct cl_io *io)
501 {
502         cl_io_iter_fini(env, io);
503         RETURN(0);
504 }
505
506 static int lov_io_unlock_wrapper(const struct lu_env *env, struct cl_io *io)
507 {
508         cl_io_unlock(env, io);
509         RETURN(0);
510 }
511
512 static void lov_io_end(const struct lu_env *env, const struct cl_io_slice *ios)
513 {
514         int rc;
515
516         rc = lov_io_call(env, cl2lov_io(env, ios), lov_io_end_wrapper);
517         LASSERT(rc == 0);
518 }
519
520 static void lov_io_iter_fini(const struct lu_env *env,
521                              const struct cl_io_slice *ios)
522 {
523         struct lov_io *lio = cl2lov_io(env, ios);
524         int rc;
525
526         ENTRY;
527         rc = lov_io_call(env, lio, lov_io_iter_fini_wrapper);
528         LASSERT(rc == 0);
529         while (!cfs_list_empty(&lio->lis_active))
530                 cfs_list_del_init(lio->lis_active.next);
531         EXIT;
532 }
533
534 static void lov_io_unlock(const struct lu_env *env,
535                           const struct cl_io_slice *ios)
536 {
537         int rc;
538
539         ENTRY;
540         rc = lov_io_call(env, cl2lov_io(env, ios), lov_io_unlock_wrapper);
541         LASSERT(rc == 0);
542         EXIT;
543 }
544
545
546 static struct cl_page_list *lov_io_submit_qin(struct lov_device *ld,
547                                               struct cl_page_list *qin,
548                                               int idx, int alloc)
549 {
550         return alloc ? &qin[idx] : &ld->ld_emrg[idx]->emrg_page_list;
551 }
552
553 /**
554  * lov implementation of cl_operations::cio_submit() method. It takes a list
555  * of pages in \a queue, splits it into per-stripe sub-lists, invokes
556  * cl_io_submit() on underlying devices to submit sub-lists, and then splices
557  * everything back.
558  *
559  * Major complication of this function is a need to handle memory cleansing:
560  * cl_io_submit() is called to write out pages as a part of VM memory
561  * reclamation, and hence it may not fail due to memory shortages (system
562  * dead-locks otherwise). To deal with this, some resources (sub-lists,
563  * sub-environment, etc.) are allocated per-device on "startup" (i.e., in a
564  * not-memory cleansing context), and in case of memory shortage, these
565  * pre-allocated resources are used by lov_io_submit() under
566  * lov_device::ld_mutex mutex.
567  */
568 static int lov_io_submit(const struct lu_env *env,
569                          const struct cl_io_slice *ios,
570                          enum cl_req_type crt, struct cl_2queue *queue,
571                          enum cl_req_priority priority)
572 {
573         struct lov_io          *lio = cl2lov_io(env, ios);
574         struct lov_object      *obj = lio->lis_object;
575         struct lov_device       *ld = lu2lov_dev(lov2cl(obj)->co_lu.lo_dev);
576         struct cl_page_list    *qin = &queue->c2_qin;
577         struct cl_2queue      *cl2q = &lov_env_info(env)->lti_cl2q;
578         struct cl_page_list *stripes_qin = NULL;
579         struct cl_page *page;
580         struct cl_page *tmp;
581         int stripe;
582
583 #define QIN(stripe) lov_io_submit_qin(ld, stripes_qin, stripe, alloc)
584
585         int rc = 0;
586         int alloc =
587 #if defined(__KERNEL__) && defined(__linux__)
588                 !(current->flags & PF_MEMALLOC);
589 #else
590                 1;
591 #endif
592         ENTRY;
593         if (lio->lis_active_subios == 1) {
594                 int idx = lio->lis_single_subio_index;
595                 struct lov_io_sub *sub;
596
597                 LASSERT(idx < lio->lis_nr_subios);
598                 sub = lov_sub_get(env, lio, idx);
599                 LASSERT(!IS_ERR(sub));
600                 LASSERT(sub->sub_io == &lio->lis_single_subio);
601                 rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
602                                      crt, queue, priority);
603                 lov_sub_put(sub);
604                 RETURN(rc);
605         }
606
607         LASSERT(lio->lis_subs != NULL);
608         if (alloc) {
609                 OBD_ALLOC(stripes_qin,
610                           sizeof(*stripes_qin) * lio->lis_nr_subios);
611                 if (stripes_qin == NULL)
612                         RETURN(-ENOMEM);
613
614                 for (stripe = 0; stripe < lio->lis_nr_subios; stripe++)
615                         cl_page_list_init(&stripes_qin[stripe]);
616         } else {
617                 /*
618                  * If we get here, it means pageout & swap doesn't help.
619                  * In order to not make things worse, even don't try to
620                  * allocate the memory with __GFP_NOWARN. -jay
621                  */
622                 cfs_mutex_lock(&ld->ld_mutex);
623                 lio->lis_mem_frozen = 1;
624         }
625
626         cl_2queue_init(cl2q);
627         cl_page_list_for_each_safe(page, tmp, qin) {
628                 stripe = lov_page_stripe(page);
629                 cl_page_list_move(QIN(stripe), qin, page);
630         }
631
632         for (stripe = 0; stripe < lio->lis_nr_subios; stripe++) {
633                 struct lov_io_sub   *sub;
634                 struct cl_page_list *sub_qin = QIN(stripe);
635
636                 if (cfs_list_empty(&sub_qin->pl_pages))
637                         continue;
638
639                 cl_page_list_splice(sub_qin, &cl2q->c2_qin);
640                 sub = lov_sub_get(env, lio, stripe);
641                 if (!IS_ERR(sub)) {
642                         rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
643                                              crt, cl2q, priority);
644                         lov_sub_put(sub);
645                 } else
646                         rc = PTR_ERR(sub);
647                 cl_page_list_splice(&cl2q->c2_qin,  &queue->c2_qin);
648                 cl_page_list_splice(&cl2q->c2_qout, &queue->c2_qout);
649                 if (rc != 0)
650                         break;
651         }
652
653         for (stripe = 0; stripe < lio->lis_nr_subios; stripe++) {
654                 struct cl_page_list *sub_qin = QIN(stripe);
655
656                 if (cfs_list_empty(&sub_qin->pl_pages))
657                         continue;
658
659                 cl_page_list_splice(sub_qin, qin);
660         }
661
662         if (alloc) {
663                 OBD_FREE(stripes_qin,
664                          sizeof(*stripes_qin) * lio->lis_nr_subios);
665         } else {
666                 int i;
667
668                 for (i = 0; i < lio->lis_nr_subios; i++) {
669                         struct cl_io *cio = lio->lis_subs[i].sub_io;
670
671                         if (cio && cio == &ld->ld_emrg[i]->emrg_subio)
672                                 lov_io_sub_fini(env, lio, &lio->lis_subs[i]);
673                 }
674                 lio->lis_mem_frozen = 0;
675                 cfs_mutex_unlock(&ld->ld_mutex);
676         }
677
678         RETURN(rc);
679 #undef QIN
680 }
681
682 static int lov_io_prepare_write(const struct lu_env *env,
683                                 const struct cl_io_slice *ios,
684                                 const struct cl_page_slice *slice,
685                                 unsigned from, unsigned to)
686 {
687         struct lov_io     *lio      = cl2lov_io(env, ios);
688         struct cl_page    *sub_page = lov_sub_page(slice);
689         struct lov_io_sub *sub;
690         int result;
691
692         ENTRY;
693         sub = lov_page_subio(env, lio, slice);
694         if (!IS_ERR(sub)) {
695                 result = cl_io_prepare_write(sub->sub_env, sub->sub_io,
696                                              sub_page, from, to);
697                 lov_sub_put(sub);
698         } else
699                 result = PTR_ERR(sub);
700         RETURN(result);
701 }
702
703 static int lov_io_commit_write(const struct lu_env *env,
704                                const struct cl_io_slice *ios,
705                                const struct cl_page_slice *slice,
706                                unsigned from, unsigned to)
707 {
708         struct lov_io     *lio      = cl2lov_io(env, ios);
709         struct cl_page    *sub_page = lov_sub_page(slice);
710         struct lov_io_sub *sub;
711         int result;
712
713         ENTRY;
714         sub = lov_page_subio(env, lio, slice);
715         if (!IS_ERR(sub)) {
716                 result = cl_io_commit_write(sub->sub_env, sub->sub_io,
717                                             sub_page, from, to);
718                 lov_sub_put(sub);
719         } else
720                 result = PTR_ERR(sub);
721         RETURN(result);
722 }
723
724 static int lov_io_fault_start(const struct lu_env *env,
725                               const struct cl_io_slice *ios)
726 {
727         struct cl_fault_io *fio;
728         struct lov_io      *lio;
729         struct lov_io_sub  *sub;
730
731         ENTRY;
732         fio = &ios->cis_io->u.ci_fault;
733         lio = cl2lov_io(env, ios);
734         sub = lov_sub_get(env, lio, lov_page_stripe(fio->ft_page));
735         sub->sub_io->u.ci_fault.ft_nob = fio->ft_nob;
736         lov_sub_put(sub);
737         RETURN(lov_io_start(env, ios));
738 }
739
740 static const struct cl_io_operations lov_io_ops = {
741         .op = {
742                 [CIT_READ] = {
743                         .cio_fini      = lov_io_fini,
744                         .cio_iter_init = lov_io_rw_iter_init,
745                         .cio_iter_fini = lov_io_iter_fini,
746                         .cio_lock      = lov_io_lock,
747                         .cio_unlock    = lov_io_unlock,
748                         .cio_start     = lov_io_start,
749                         .cio_end       = lov_io_end
750                 },
751                 [CIT_WRITE] = {
752                         .cio_fini      = lov_io_fini,
753                         .cio_iter_init = lov_io_rw_iter_init,
754                         .cio_iter_fini = lov_io_iter_fini,
755                         .cio_lock      = lov_io_lock,
756                         .cio_unlock    = lov_io_unlock,
757                         .cio_start     = lov_io_start,
758                         .cio_end       = lov_io_end
759                 },
760                 [CIT_SETATTR] = {
761                         .cio_fini      = lov_io_fini,
762                         .cio_iter_init = lov_io_iter_init,
763                         .cio_iter_fini = lov_io_iter_fini,
764                         .cio_lock      = lov_io_lock,
765                         .cio_unlock    = lov_io_unlock,
766                         .cio_start     = lov_io_start,
767                         .cio_end       = lov_io_end
768                 },
769                 [CIT_FAULT] = {
770                         .cio_fini      = lov_io_fini,
771                         .cio_iter_init = lov_io_iter_init,
772                         .cio_iter_fini = lov_io_iter_fini,
773                         .cio_lock      = lov_io_lock,
774                         .cio_unlock    = lov_io_unlock,
775                         .cio_start     = lov_io_fault_start,
776                         .cio_end       = lov_io_end
777                 },
778                 [CIT_MISC] = {
779                         .cio_fini   = lov_io_fini
780                 }
781         },
782         .req_op = {
783                  [CRT_READ] = {
784                          .cio_submit    = lov_io_submit
785                  },
786                  [CRT_WRITE] = {
787                          .cio_submit    = lov_io_submit
788                  }
789          },
790         .cio_prepare_write = lov_io_prepare_write,
791         .cio_commit_write  = lov_io_commit_write
792 };
793
794 /*****************************************************************************
795  *
796  * Empty lov io operations.
797  *
798  */
799
800 static void lov_empty_io_fini(const struct lu_env *env,
801                               const struct cl_io_slice *ios)
802 {
803         ENTRY;
804         EXIT;
805 }
806
807 static void lov_empty_impossible(const struct lu_env *env,
808                                  struct cl_io_slice *ios)
809 {
810         LBUG();
811 }
812
813 #define LOV_EMPTY_IMPOSSIBLE ((void *)lov_empty_impossible)
814
815 /**
816  * An io operation vector for files without stripes.
817  */
818 static const struct cl_io_operations lov_empty_io_ops = {
819         .op = {
820                 [CIT_READ] = {
821 #if 0
822                         .cio_fini       = lov_empty_io_fini,
823                         .cio_iter_init  = LOV_EMPTY_IMPOSSIBLE,
824                         .cio_lock       = LOV_EMPTY_IMPOSSIBLE,
825                         .cio_start      = LOV_EMPTY_IMPOSSIBLE,
826                         .cio_end        = LOV_EMPTY_IMPOSSIBLE
827 #endif
828                 },
829                 [CIT_WRITE] = {
830                         .cio_fini      = lov_empty_io_fini,
831                         .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
832                         .cio_lock      = LOV_EMPTY_IMPOSSIBLE,
833                         .cio_start     = LOV_EMPTY_IMPOSSIBLE,
834                         .cio_end       = LOV_EMPTY_IMPOSSIBLE
835                 },
836                 [CIT_SETATTR] = {
837                         .cio_fini      = lov_empty_io_fini,
838                         .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
839                         .cio_lock      = LOV_EMPTY_IMPOSSIBLE,
840                         .cio_start     = LOV_EMPTY_IMPOSSIBLE,
841                         .cio_end       = LOV_EMPTY_IMPOSSIBLE
842                 },
843                 [CIT_FAULT] = {
844                         .cio_fini      = lov_empty_io_fini,
845                         .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
846                         .cio_lock      = LOV_EMPTY_IMPOSSIBLE,
847                         .cio_start     = LOV_EMPTY_IMPOSSIBLE,
848                         .cio_end       = LOV_EMPTY_IMPOSSIBLE
849                 },
850                 [CIT_MISC] = {
851                         .cio_fini   = lov_empty_io_fini
852                 }
853         },
854         .req_op = {
855                  [CRT_READ] = {
856                          .cio_submit    = LOV_EMPTY_IMPOSSIBLE
857                  },
858                  [CRT_WRITE] = {
859                          .cio_submit    = LOV_EMPTY_IMPOSSIBLE
860                  }
861          },
862         .cio_commit_write = LOV_EMPTY_IMPOSSIBLE
863 };
864
865 int lov_io_init_raid0(const struct lu_env *env, struct cl_object *obj,
866                       struct cl_io *io)
867 {
868         struct lov_io       *lio = lov_env_io(env);
869         struct lov_object   *lov = cl2lov(obj);
870
871         ENTRY;
872         CFS_INIT_LIST_HEAD(&lio->lis_active);
873         lov_io_slice_init(lio, lov, io);
874         if (io->ci_result == 0) {
875                 LASSERT(lov_r0(lov)->lo_lsm != NULL);
876                 io->ci_result = lov_io_subio_init(env, lio, io);
877                 if (io->ci_result == 0)
878                         cl_io_slice_add(io, &lio->lis_cl, obj, &lov_io_ops);
879         }
880         RETURN(io->ci_result);
881 }
882
883 int lov_io_init_empty(const struct lu_env *env, struct cl_object *obj,
884                       struct cl_io *io)
885 {
886         struct lov_io *lio = lov_env_io(env);
887         int result;
888
889         ENTRY;
890         switch (io->ci_type) {
891         default:
892                 LBUG();
893         case CIT_MISC:
894         case CIT_READ:
895                 result = 0;
896                 break;
897         case CIT_WRITE:
898         case CIT_SETATTR:
899                 result = -EBADF;
900                 break;
901         case CIT_FAULT:
902                 result = -EFAULT;
903                 CERROR("Page fault on a file without stripes: "DFID"\n",
904                        PFID(lu_object_fid(&obj->co_lu)));
905                 break;
906         }
907         if (result == 0)
908                 cl_io_slice_add(io, &lio->lis_cl, obj, &lov_empty_io_ops);
909         io->ci_result = result;
910         RETURN(result != 0);
911 }
912
913 /** @} lov */