Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / lov / lov_io.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Implementation of cl_io for LOV layer.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_LOV
42
43 #include "lov_cl_internal.h"
44
45 /** \addtogroup lov lov @{ */
46
47 static void lov_sub_enter(struct lov_io_sub *sub)
48 {
49         ENTRY;
50         if (sub->sub_reenter++ == 0) {
51                 sub->sub_cookie = cl_env_reenter();
52                 cl_env_implant(sub->sub_env, &sub->sub_refcheck2);
53         }
54         EXIT;
55 }
56
57 static void lov_sub_exit(struct lov_io_sub *sub)
58 {
59         ENTRY;
60         if (--sub->sub_reenter == 0) {
61                 cl_env_unplant(sub->sub_env, &sub->sub_refcheck2);
62                 cl_env_reexit(sub->sub_cookie);
63         }
64         EXIT;
65 }
66
67 static void lov_io_sub_fini(const struct lu_env *env, struct lov_io *lio,
68                             struct lov_io_sub *sub)
69 {
70         ENTRY;
71         if (sub->sub_io != NULL) {
72                 if (sub->sub_io_initialized) {
73                         lov_sub_enter(sub);
74                         cl_io_fini(sub->sub_env, sub->sub_io);
75                         lov_sub_exit(sub);
76                         sub->sub_io_initialized = 0;
77                         lio->lis_active_subios--;
78                 }
79                 if (sub->sub_stripe == lio->lis_single_subio_index)
80                         lio->lis_single_subio_index = -1;
81                 else if (!sub->sub_borrowed)
82                         OBD_FREE_PTR(sub->sub_io);
83                 sub->sub_io = NULL;
84         }
85         if (sub->sub_env != NULL && !IS_ERR(sub->sub_env)) {
86                 if (!sub->sub_borrowed)
87                         cl_env_put(sub->sub_env, &sub->sub_refcheck);
88                 sub->sub_env = NULL;
89         }
90         EXIT;
91 }
92
93 static void lov_io_sub_inherit(struct cl_io *io, struct lov_io *lio,
94                                int stripe, loff_t start, loff_t end)
95 {
96         struct lov_stripe_md *lsm    = lov_r0(lio->lis_object)->lo_lsm;
97         struct cl_io         *parent = lio->lis_cl.cis_io;
98
99         switch(io->ci_type) {
100         case CIT_TRUNC: {
101                 size_t new_size = parent->u.ci_truncate.tr_size;
102
103                 new_size = lov_size_to_stripe(lsm, new_size, stripe);
104                 io->u.ci_truncate.tr_capa = parent->u.ci_truncate.tr_capa;
105                 io->u.ci_truncate.tr_size = new_size;
106                 break;
107         }
108         case CIT_FAULT: {
109                 struct cl_object *obj = parent->ci_obj;
110                 loff_t off = cl_offset(obj, parent->u.ci_fault.ft_index);
111
112                 io->u.ci_fault = parent->u.ci_fault;
113                 off = lov_size_to_stripe(lsm, off, stripe);
114                 io->u.ci_fault.ft_index = cl_index(obj, off);
115                 break;
116         }
117         case CIT_READ:
118         case CIT_WRITE: {
119                 io->u.ci_rw.crw_pos = start;
120                 io->u.ci_rw.crw_count = end - start;
121                 break;
122         }
123         default:
124                 break;
125         }
126 }
127
128 static int lov_io_sub_init(const struct lu_env *env, struct lov_io *lio,
129                            struct lov_io_sub *sub)
130 {
131         struct lov_object *lov = lio->lis_object;
132         struct lov_device *ld  = lu2lov_dev(lov2cl(lov)->co_lu.lo_dev);
133         struct cl_io      *sub_io;
134         struct cl_object  *sub_obj;
135         struct cl_io      *io  = lio->lis_cl.cis_io;
136
137         int stripe = sub->sub_stripe;
138         int result;
139
140         LASSERT(sub->sub_io == NULL);
141         LASSERT(sub->sub_env == NULL);
142         LASSERT(sub->sub_stripe < lio->lis_stripe_count);
143         ENTRY;
144
145         result = 0;
146         sub->sub_io_initialized = 0;
147         sub->sub_borrowed = 0;
148
149         /*
150          * First sub-io. Use ->lis_single_subio and current environment, to
151          * avoid dynamic allocation.
152          */
153         if (lio->lis_active_subios == 0) {
154                 sub->sub_io = &lio->lis_single_subio;
155                 lio->lis_single_subio_index = stripe;
156                 sub->sub_env = cl_env_get(&sub->sub_refcheck);
157                 LASSERT(sub->sub_env == env);
158         } else if (lio->lis_mem_frozen) {
159                 LASSERT(mutex_is_locked(&ld->ld_mutex));
160                 sub->sub_io  = &ld->ld_emrg[stripe]->emrg_subio;
161                 sub->sub_env = ld->ld_emrg[stripe]->emrg_env;
162                 sub->sub_borrowed = 1;
163         } else {
164                 void *cookie;
165
166                 /* obtain new environment */
167                 cookie = cl_env_reenter();
168                 sub->sub_env = cl_env_get(&sub->sub_refcheck);
169                 cl_env_reexit(cookie);
170
171                 OBD_ALLOC_PTR(sub->sub_io);
172                 if (IS_ERR(sub->sub_env))
173                         result = PTR_ERR(sub->sub_env);
174                 else if (sub->sub_io == NULL)
175                         result = -ENOMEM;
176         }
177
178         if (result == 0) {
179                 sub_obj = lovsub2cl(lov_r0(lov)->lo_sub[stripe]);
180                 sub_io  = sub->sub_io;
181
182                 sub_io->ci_obj    = sub_obj;
183                 sub_io->ci_result = 0;
184
185                 sub_io->ci_parent  = io;
186                 sub_io->ci_lockreq = io->ci_lockreq;
187                 sub_io->ci_type    = io->ci_type;
188
189                 lov_sub_enter(sub);
190                 result = cl_io_sub_init(sub->sub_env, sub_io,
191                                         io->ci_type, sub_obj);
192                 lov_sub_exit(sub);
193                 if (result >= 0) {
194                         lio->lis_active_subios++;
195                         sub->sub_io_initialized = 1;
196                         result = 0;
197                 }
198         }
199         if (result != 0)
200                 lov_io_sub_fini(env, lio, sub);
201         RETURN(result);
202 }
203
204 struct lov_io_sub *lov_sub_get(const struct lu_env *env,
205                                struct lov_io *lio, int stripe)
206 {
207         int rc;
208         struct lov_io_sub *sub = &lio->lis_subs[stripe];
209
210         LASSERT(stripe < lio->lis_stripe_count);
211         ENTRY;
212
213         if (!sub->sub_io_initialized) {
214                 sub->sub_stripe = stripe;
215                 rc = lov_io_sub_init(env, lio, sub);
216         } else
217                 rc = 0;
218         if (rc == 0)
219                 lov_sub_enter(sub);
220         else
221                 sub = ERR_PTR(rc);
222         RETURN(sub);
223 }
224
225 void lov_sub_put(struct lov_io_sub *sub)
226 {
227         lov_sub_exit(sub);
228 }
229
230 /*****************************************************************************
231  *
232  * Lov io operations.
233  *
234  */
235
236 static int lov_page_stripe(const struct cl_page *page)
237 {
238         struct lovsub_object *subobj;
239
240         ENTRY;
241         subobj = lu2lovsub(
242                 lu_object_locate(page->cp_child->cp_obj->co_lu.lo_header,
243                                  &lovsub_device_type));
244         LASSERT(subobj != NULL);
245         RETURN(subobj->lso_index);
246 }
247
248 struct lov_io_sub *lov_page_subio(const struct lu_env *env, struct lov_io *lio,
249                                   const struct cl_page_slice *slice)
250 {
251         struct lov_stripe_md *lsm  = lov_r0(lio->lis_object)->lo_lsm;
252         struct cl_page       *page = slice->cpl_page;
253         int stripe;
254
255         LASSERT(lio->lis_cl.cis_io != NULL);
256         LASSERT(cl2lov(slice->cpl_obj) == lio->lis_object);
257         LASSERT(lsm != NULL);
258         LASSERT(lio->lis_nr_subios > 0);
259         ENTRY;
260
261         stripe = lov_page_stripe(page);
262         RETURN(lov_sub_get(env, lio, stripe));
263 }
264
265
266 static int lov_io_subio_init(const struct lu_env *env, struct lov_io *lio,
267                              struct cl_io *io)
268 {
269         struct lov_object    *lov = lio->lis_object;
270         struct lov_stripe_md *lsm = lov_r0(lov)->lo_lsm;
271         int result;
272
273         LASSERT(lio->lis_object != NULL);
274         ENTRY;
275
276         /*
277          * Need to be optimized, we can't afford to allocate a piece of memory
278          * when writing a page. -jay
279          */
280         OBD_ALLOC(lio->lis_subs,
281                   lsm->lsm_stripe_count * sizeof lio->lis_subs[0]);
282         if (lio->lis_subs != NULL) {
283                 lio->lis_nr_subios = lio->lis_stripe_count;
284                 lio->lis_single_subio_index = -1;
285                 lio->lis_active_subios = 0;
286                 result = 0;
287         } else
288                 result = -ENOMEM;
289         RETURN(result);
290 }
291
292 static void lov_io_slice_init(struct lov_io *lio,
293                               struct lov_object *obj, struct cl_io *io)
294 {
295         struct lov_stripe_md *lsm = lov_r0(obj)->lo_lsm;
296
297         LASSERT(lsm != NULL);
298         ENTRY;
299
300         io->ci_result = 0;
301         lio->lis_object = obj;
302         lio->lis_stripe_count = lsm->lsm_stripe_count;
303
304         switch (io->ci_type) {
305         case CIT_READ:
306         case CIT_WRITE:
307                 lio->lis_pos = io->u.ci_rw.crw_pos;
308                 lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
309                 lio->lis_io_endpos = lio->lis_endpos;
310                 if (cl_io_is_append(io)) {
311                         LASSERT(io->ci_type == CIT_WRITE);
312                         lio->lis_pos = 0;
313                         lio->lis_endpos = OBD_OBJECT_EOF;
314                 }
315                 break;
316
317         case CIT_TRUNC:
318                 lio->lis_pos = io->u.ci_truncate.tr_size;
319                 lio->lis_endpos = OBD_OBJECT_EOF;
320                 break;
321
322         case CIT_FAULT: {
323                 pgoff_t index = io->u.ci_fault.ft_index;
324                 lio->lis_pos = cl_offset(io->ci_obj, index);
325                 lio->lis_endpos = cl_offset(io->ci_obj, index + 1);
326                 break;
327         }
328
329         case CIT_MISC:
330                 lio->lis_pos = 0;
331                 lio->lis_endpos = OBD_OBJECT_EOF;
332                 break;
333
334         default:
335                 LBUG();
336         }
337
338         EXIT;
339 }
340
341 static void lov_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
342 {
343         struct lov_io *lio = cl2lov_io(env, ios);
344         int i;
345
346         ENTRY;
347         if (lio->lis_subs != NULL) {
348                 for (i = 0; i < lio->lis_nr_subios; i++)
349                         lov_io_sub_fini(env, lio, &lio->lis_subs[i]);
350                 OBD_FREE(lio->lis_subs,
351                          lio->lis_nr_subios * sizeof lio->lis_subs[0]);
352                 lio->lis_nr_subios = 0;
353         }
354         EXIT;
355 }
356
357 static obd_off lov_offset_mod(obd_off val, int delta)
358 {
359         if (val != OBD_OBJECT_EOF)
360                 val += delta;
361         return val;
362 }
363
364 static int lov_io_iter_init(const struct lu_env *env,
365                             const struct cl_io_slice *ios)
366 {
367         struct lov_io        *lio = cl2lov_io(env, ios);
368         struct lov_stripe_md *lsm = lov_r0(lio->lis_object)->lo_lsm;
369         struct lov_io_sub    *sub;
370         obd_off endpos;
371         obd_off start;
372         obd_off end;
373         int stripe;
374         int rc = 0;
375
376         ENTRY;
377         endpos = lov_offset_mod(lio->lis_endpos, -1);
378         for (stripe = 0; stripe < lio->lis_stripe_count; stripe++) {
379                 if (!lov_stripe_intersects(lsm, stripe, lio->lis_pos,
380                                            endpos, &start, &end))
381                         continue;
382
383                 end = lov_offset_mod(end, +1);
384                 sub = lov_sub_get(env, lio, stripe);
385                 if (!IS_ERR(sub)) {
386                         lov_io_sub_inherit(sub->sub_io, lio, stripe,
387                                            start, end);
388                         rc = cl_io_iter_init(sub->sub_env, sub->sub_io);
389                         lov_sub_put(sub);
390                         CDEBUG(D_VFSTRACE, "shrink: %i [%llu, %llu)\n",
391                                stripe, start, end);
392                 } else
393                         rc = PTR_ERR(sub);
394
395                 if (!rc)
396                         list_add_tail(&sub->sub_linkage, &lio->lis_active);
397                 else
398                         break;
399         }
400         RETURN(rc);
401 }
402
403 static int lov_io_rw_iter_init(const struct lu_env *env,
404                                const struct cl_io_slice *ios)
405 {
406         struct lov_io        *lio = cl2lov_io(env, ios);
407         struct cl_io         *io  = ios->cis_io;
408         struct lov_stripe_md *lsm = lov_r0(cl2lov(ios->cis_obj))->lo_lsm;
409         loff_t start = io->u.ci_rw.crw_pos;
410         loff_t next;
411         int ssize = lsm->lsm_stripe_size;
412
413         LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
414         ENTRY;
415
416         /* fast path for common case. */
417         if (lio->lis_nr_subios != 1 && !cl_io_is_append(io)) {
418
419                 do_div(start, ssize);
420                 next = (start + 1) * ssize;
421                 if (next <= start * ssize)
422                         next = ~0ull;
423
424                 io->ci_continue = next < lio->lis_io_endpos;
425                 io->u.ci_rw.crw_count = min_t(loff_t, lio->lis_io_endpos,
426                                               next) - io->u.ci_rw.crw_pos;
427                 lio->lis_pos    = io->u.ci_rw.crw_pos;
428                 lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
429                 CDEBUG(D_VFSTRACE, "stripe: %llu chunk: [%llu, %llu) %llu\n",
430                        (__u64)start, lio->lis_pos, lio->lis_endpos,
431                        (__u64)lio->lis_io_endpos);
432         }
433         /*
434          * XXX The following call should be optimized: we know, that
435          * [lio->lis_pos, lio->lis_endpos) intersects with exactly one stripe.
436          */
437         RETURN(lov_io_iter_init(env, ios));
438 }
439
440 static int lov_io_call(const struct lu_env *env, struct lov_io *lio,
441                        int (*iofunc)(const struct lu_env *, struct cl_io *))
442 {
443         struct lov_io_sub *sub;
444         int rc = 0;
445
446         ENTRY;
447         list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
448                 lov_sub_enter(sub);
449                 rc = iofunc(sub->sub_env, sub->sub_io);
450                 lov_sub_exit(sub);
451                 if (rc)
452                         break;
453         }
454         RETURN(rc);
455 }
456
457 static int lov_io_lock(const struct lu_env *env, const struct cl_io_slice *ios)
458 {
459         ENTRY;
460         RETURN(lov_io_call(env, cl2lov_io(env, ios), cl_io_lock));
461 }
462
463 static int lov_io_start(const struct lu_env *env, const struct cl_io_slice *ios)
464 {
465         ENTRY;
466         RETURN(lov_io_call(env, cl2lov_io(env, ios), cl_io_start));
467 }
468
469 static int lov_io_end_wrapper(const struct lu_env *env, struct cl_io *io)
470 {
471         ENTRY;
472         /*
473          * It's possible that lov_io_start() wasn't called against this
474          * sub-io, either because previous sub-io failed, or upper layer
475          * completed IO.
476          */
477         if (io->ci_state == CIS_IO_GOING)
478                 cl_io_end(env, io);
479         else
480                 io->ci_state = CIS_IO_FINISHED;
481         RETURN(0);
482 }
483
484 static int lov_io_iter_fini_wrapper(const struct lu_env *env, struct cl_io *io)
485 {
486         cl_io_iter_fini(env, io);
487         RETURN(0);
488 }
489
490 static int lov_io_unlock_wrapper(const struct lu_env *env, struct cl_io *io)
491 {
492         cl_io_unlock(env, io);
493         RETURN(0);
494 }
495
496 static void lov_io_end(const struct lu_env *env, const struct cl_io_slice *ios)
497 {
498         int rc;
499
500         rc = lov_io_call(env, cl2lov_io(env, ios), lov_io_end_wrapper);
501         LASSERT(rc == 0);
502 }
503
504 static void lov_io_iter_fini(const struct lu_env *env,
505                              const struct cl_io_slice *ios)
506 {
507         struct lov_io *lio = cl2lov_io(env, ios);
508         int rc;
509
510         ENTRY;
511         rc = lov_io_call(env, lio, lov_io_iter_fini_wrapper);
512         LASSERT(rc == 0);
513         while (!list_empty(&lio->lis_active))
514                 list_del_init(lio->lis_active.next);
515         EXIT;
516 }
517
518 static void lov_io_unlock(const struct lu_env *env,
519                           const struct cl_io_slice *ios)
520 {
521         int rc;
522
523         ENTRY;
524         rc = lov_io_call(env, cl2lov_io(env, ios), lov_io_unlock_wrapper);
525         LASSERT(rc == 0);
526         EXIT;
527 }
528
529
530 static struct cl_page_list *lov_io_submit_qin(struct lov_device *ld,
531                                               struct cl_page_list *qin,
532                                               int idx, int alloc)
533 {
534         return alloc ? &qin[idx] : &ld->ld_emrg[idx]->emrg_page_list;
535 }
536
537 /**
538  * lov implementation of cl_operations::cio_submit() method. It takes a list
539  * of pages in \a queue, splits it into per-stripe sub-lists, invokes
540  * cl_io_submit() on underlying devices to submit sub-lists, and then splices
541  * everything back.
542  *
543  * Major complication of this function is a need to handle memory cleansing:
544  * cl_io_submit() is called to write out pages as a part of VM memory
545  * reclamation, and hence it may not fail due to memory shortages (system
546  * dead-locks otherwise). To deal with this, some resources (sub-lists,
547  * sub-environment, etc.) are allocated per-device on "startup" (i.e., in a
548  * not-memory cleansing context), and in case of memory shortage, these
549  * pre-allocated resources are used by lov_io_submit() under
550  * lov_device::ld_mutex mutex.
551  */
552 static int lov_io_submit(const struct lu_env *env,
553                          const struct cl_io_slice *ios,
554                          enum cl_req_type crt, struct cl_2queue *queue,
555                          enum cl_req_priority priority)
556 {
557         struct lov_io          *lio = cl2lov_io(env, ios);
558         struct lov_object      *obj = lio->lis_object;
559         struct lov_device       *ld = lu2lov_dev(lov2cl(obj)->co_lu.lo_dev);
560         struct cl_page_list    *qin = &queue->c2_qin;
561         struct cl_2queue      *cl2q = &lov_env_info(env)->lti_cl2q;
562         struct cl_page_list *stripes_qin = NULL;
563         struct cl_page *page;
564         struct cl_page *tmp;
565         int stripe;
566
567 #define QIN(stripe) lov_io_submit_qin(ld, stripes_qin, stripe, alloc)
568
569         int rc = 0;
570         int alloc =
571 #if defined(__KERNEL__) && defined(__linux__)
572                 !(current->flags & PF_MEMALLOC);
573 #else
574                 1;
575 #endif
576         ENTRY;
577         if (lio->lis_active_subios == 1) {
578                 int idx = lio->lis_single_subio_index;
579                 struct lov_io_sub *sub;
580
581                 LASSERT(idx < lio->lis_nr_subios);
582                 sub = lov_sub_get(env, lio, idx);
583                 LASSERT(!IS_ERR(sub));
584                 LASSERT(sub->sub_io == &lio->lis_single_subio);
585                 rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
586                                      crt, queue, priority);
587                 lov_sub_put(sub);
588                 RETURN(rc);
589         }
590
591         LASSERT(lio->lis_subs != NULL);
592         if (alloc) {
593                 OBD_ALLOC(stripes_qin,
594                           sizeof(*stripes_qin) * lio->lis_nr_subios);
595                 if (stripes_qin == NULL)
596                         RETURN(-ENOMEM);
597
598                 for (stripe = 0; stripe < lio->lis_nr_subios; stripe++)
599                         cl_page_list_init(&stripes_qin[stripe]);
600         } else {
601                 /*
602                  * If we get here, it means pageout & swap doesn't help.
603                  * In order to not make things worse, even don't try to
604                  * allocate the memory with __GFP_NOWARN. -jay
605                  */
606                 mutex_lock(&ld->ld_mutex);
607                 lio->lis_mem_frozen = 1;
608         }
609
610         cl_2queue_init(cl2q);
611         cl_page_list_for_each_safe(page, tmp, qin) {
612                 stripe = lov_page_stripe(page);
613                 cl_page_list_move(QIN(stripe), qin, page);
614         }
615
616         for (stripe = 0; stripe < lio->lis_nr_subios; stripe++) {
617                 struct lov_io_sub   *sub;
618                 struct cl_page_list *sub_qin = QIN(stripe);
619
620                 if (list_empty(&sub_qin->pl_pages))
621                         continue;
622
623                 cl_page_list_splice(sub_qin, &cl2q->c2_qin);
624                 sub = lov_sub_get(env, lio, stripe);
625                 if (!IS_ERR(sub)) {
626                         rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
627                                              crt, cl2q, priority);
628                         lov_sub_put(sub);
629                 } else
630                         rc = PTR_ERR(sub);
631                 cl_page_list_splice(&cl2q->c2_qin,  &queue->c2_qin);
632                 cl_page_list_splice(&cl2q->c2_qout, &queue->c2_qout);
633                 if (rc != 0)
634                         break;
635         }
636
637         for (stripe = 0; stripe < lio->lis_nr_subios; stripe++) {
638                 struct cl_page_list *sub_qin = QIN(stripe);
639
640                 if (list_empty(&sub_qin->pl_pages))
641                         continue;
642
643                 cl_page_list_splice(sub_qin, qin);
644         }
645
646         if (alloc) {
647                 OBD_FREE(stripes_qin,
648                          sizeof(*stripes_qin) * lio->lis_nr_subios);
649         } else {
650                 int i;
651
652                 for (i = 0; i < lio->lis_nr_subios; i++) {
653                         struct cl_io *cio = lio->lis_subs[i].sub_io;
654
655                         if (cio && cio == &ld->ld_emrg[i]->emrg_subio)
656                                 lov_io_sub_fini(env, lio, &lio->lis_subs[i]);
657                 }
658                 lio->lis_mem_frozen = 0;
659                 mutex_unlock(&ld->ld_mutex);
660         }
661
662         RETURN(rc);
663 #undef QIN
664 }
665
666 static int lov_io_prepare_write(const struct lu_env *env,
667                                 const struct cl_io_slice *ios,
668                                 const struct cl_page_slice *slice,
669                                 unsigned from, unsigned to)
670 {
671         struct lov_io     *lio      = cl2lov_io(env, ios);
672         struct cl_page    *sub_page = lov_sub_page(slice);
673         struct lov_io_sub *sub;
674         int result;
675
676         ENTRY;
677         sub = lov_page_subio(env, lio, slice);
678         if (!IS_ERR(sub)) {
679                 result = cl_io_prepare_write(sub->sub_env, sub->sub_io,
680                                              sub_page, from, to);
681                 lov_sub_put(sub);
682         } else
683                 result = PTR_ERR(sub);
684         RETURN(result);
685 }
686
687 static int lov_io_commit_write(const struct lu_env *env,
688                                const struct cl_io_slice *ios,
689                                const struct cl_page_slice *slice,
690                                unsigned from, unsigned to)
691 {
692         struct lov_io     *lio      = cl2lov_io(env, ios);
693         struct cl_page    *sub_page = lov_sub_page(slice);
694         struct lov_io_sub *sub;
695         int result;
696
697         ENTRY;
698         sub = lov_page_subio(env, lio, slice);
699         if (!IS_ERR(sub)) {
700                 result = cl_io_commit_write(sub->sub_env, sub->sub_io,
701                                             sub_page, from, to);
702                 lov_sub_put(sub);
703         } else
704                 result = PTR_ERR(sub);
705         RETURN(result);
706 }
707
708 static int lov_io_fault_start(const struct lu_env *env,
709                               const struct cl_io_slice *ios)
710 {
711         struct cl_fault_io *fio;
712         struct lov_io      *lio;
713         struct lov_io_sub  *sub;
714
715         ENTRY;
716         fio = &ios->cis_io->u.ci_fault;
717         lio = cl2lov_io(env, ios);
718         sub = lov_sub_get(env, lio, lov_page_stripe(fio->ft_page));
719         sub->sub_io->u.ci_fault.ft_nob = fio->ft_nob;
720         lov_sub_put(sub);
721         RETURN(lov_io_start(env, ios));
722 }
723
724 static const struct cl_io_operations lov_io_ops = {
725         .op = {
726                 [CIT_READ] = {
727                         .cio_fini      = lov_io_fini,
728                         .cio_iter_init = lov_io_rw_iter_init,
729                         .cio_iter_fini = lov_io_iter_fini,
730                         .cio_lock      = lov_io_lock,
731                         .cio_unlock    = lov_io_unlock,
732                         .cio_start     = lov_io_start,
733                         .cio_end       = lov_io_end
734                 },
735                 [CIT_WRITE] = {
736                         .cio_fini      = lov_io_fini,
737                         .cio_iter_init = lov_io_rw_iter_init,
738                         .cio_iter_fini = lov_io_iter_fini,
739                         .cio_lock      = lov_io_lock,
740                         .cio_unlock    = lov_io_unlock,
741                         .cio_start     = lov_io_start,
742                         .cio_end       = lov_io_end
743                 },
744                 [CIT_TRUNC] = {
745                         .cio_fini      = lov_io_fini,
746                         .cio_iter_init = lov_io_iter_init,
747                         .cio_iter_fini = lov_io_iter_fini,
748                         .cio_lock      = lov_io_lock,
749                         .cio_unlock    = lov_io_unlock,
750                         .cio_start     = lov_io_start,
751                         .cio_end       = lov_io_end
752                 },
753                 [CIT_FAULT] = {
754                         .cio_fini      = lov_io_fini,
755                         .cio_iter_init = lov_io_iter_init,
756                         .cio_iter_fini = lov_io_iter_fini,
757                         .cio_lock      = lov_io_lock,
758                         .cio_unlock    = lov_io_unlock,
759                         .cio_start     = lov_io_fault_start,
760                         .cio_end       = lov_io_end
761                 },
762                 [CIT_MISC] = {
763                         .cio_fini   = lov_io_fini
764                 }
765         },
766         .req_op = {
767                  [CRT_READ] = {
768                          .cio_submit    = lov_io_submit
769                  },
770                  [CRT_WRITE] = {
771                          .cio_submit    = lov_io_submit
772                  }
773          },
774         .cio_prepare_write = lov_io_prepare_write,
775         .cio_commit_write  = lov_io_commit_write
776 };
777
778 /*****************************************************************************
779  *
780  * Empty lov io operations.
781  *
782  */
783
784 static void lov_empty_io_fini(const struct lu_env *env,
785                               const struct cl_io_slice *ios)
786 {
787         ENTRY;
788         EXIT;
789 }
790
791 static void lov_empty_impossible(const struct lu_env *env,
792                                  struct cl_io_slice *ios)
793 {
794         LBUG();
795 }
796
797 #define LOV_EMPTY_IMPOSSIBLE ((void *)lov_empty_impossible)
798
799 /**
800  * An io operation vector for files without stripes.
801  */
802 static const struct cl_io_operations lov_empty_io_ops = {
803         .op = {
804                 [CIT_READ] = {
805 #if 0
806                         .cio_fini       = lov_empty_io_fini,
807                         .cio_iter_init  = LOV_EMPTY_IMPOSSIBLE,
808                         .cio_lock       = LOV_EMPTY_IMPOSSIBLE,
809                         .cio_start      = LOV_EMPTY_IMPOSSIBLE,
810                         .cio_end        = LOV_EMPTY_IMPOSSIBLE
811 #endif
812                 },
813                 [CIT_WRITE] = {
814                         .cio_fini      = lov_empty_io_fini,
815                         .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
816                         .cio_lock      = LOV_EMPTY_IMPOSSIBLE,
817                         .cio_start     = LOV_EMPTY_IMPOSSIBLE,
818                         .cio_end       = LOV_EMPTY_IMPOSSIBLE
819                 },
820                 [CIT_TRUNC] = {
821                         .cio_fini      = lov_empty_io_fini,
822                         .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
823                         .cio_lock      = LOV_EMPTY_IMPOSSIBLE,
824                         .cio_start     = LOV_EMPTY_IMPOSSIBLE,
825                         .cio_end       = LOV_EMPTY_IMPOSSIBLE
826                 },
827                 [CIT_FAULT] = {
828                         .cio_fini      = lov_empty_io_fini,
829                         .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
830                         .cio_lock      = LOV_EMPTY_IMPOSSIBLE,
831                         .cio_start     = LOV_EMPTY_IMPOSSIBLE,
832                         .cio_end       = LOV_EMPTY_IMPOSSIBLE
833                 },
834                 [CIT_MISC] = {
835                         .cio_fini   = lov_empty_io_fini
836                 }
837         },
838         .req_op = {
839                  [CRT_READ] = {
840                          .cio_submit    = LOV_EMPTY_IMPOSSIBLE
841                  },
842                  [CRT_WRITE] = {
843                          .cio_submit    = LOV_EMPTY_IMPOSSIBLE
844                  }
845          },
846         .cio_commit_write = LOV_EMPTY_IMPOSSIBLE
847 };
848
849 int lov_io_init_raid0(const struct lu_env *env, struct cl_object *obj,
850                       struct cl_io *io)
851 {
852         struct lov_io       *lio = lov_env_io(env);
853         struct lov_object   *lov = cl2lov(obj);
854
855         ENTRY;
856         CFS_INIT_LIST_HEAD(&lio->lis_active);
857         lov_io_slice_init(lio, lov, io);
858         if (io->ci_result == 0) {
859                 LASSERT(lov_r0(lov)->lo_lsm != NULL);
860                 io->ci_result = lov_io_subio_init(env, lio, io);
861                 if (io->ci_result == 0)
862                         cl_io_slice_add(io, &lio->lis_cl, obj, &lov_io_ops);
863         }
864         RETURN(io->ci_result);
865 }
866
867 int lov_io_init_empty(const struct lu_env *env, struct cl_object *obj,
868                       struct cl_io *io)
869 {
870         struct lov_io *lio = lov_env_io(env);
871         int result;
872
873         ENTRY;
874         switch (io->ci_type) {
875         default:
876                 LBUG();
877         case CIT_MISC:
878         case CIT_READ:
879                 result = 0;
880                 break;
881         case CIT_WRITE:
882         case CIT_TRUNC:
883                 result = -EBADF;
884                 break;
885         case CIT_FAULT:
886                 result = -EFAULT;
887                 CERROR("Page fault on a file without stripes: "DFID"\n",
888                        PFID(lu_object_fid(&obj->co_lu)));
889                 break;
890         }
891         if (result == 0)
892                 cl_io_slice_add(io, &lio->lis_cl, obj, &lov_empty_io_ops);
893         io->ci_result = result;
894         RETURN(result != 0);
895 }
896
897 /** @} lov */