Whamcloud - gitweb
346a99241e5220ab3fe4ef16f1b034d44a1e0ef1
[fs/lustre-release.git] / lustre / lov / lov_io.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Implementation of cl_io for LOV layer.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_LOV
42
43 #include "lov_cl_internal.h"
44
45 /** \addtogroup lov lov @{ */
46
47 static void lov_sub_enter(struct lov_io_sub *sub)
48 {
49         ENTRY;
50         if (sub->sub_reenter++ == 0) {
51                 sub->sub_cookie = cl_env_reenter();
52                 cl_env_implant(sub->sub_env, &sub->sub_refcheck2);
53         }
54         EXIT;
55 }
56
57 static void lov_sub_exit(struct lov_io_sub *sub)
58 {
59         ENTRY;
60         if (--sub->sub_reenter == 0) {
61                 cl_env_unplant(sub->sub_env, &sub->sub_refcheck2);
62                 cl_env_reexit(sub->sub_cookie);
63         }
64         EXIT;
65 }
66
67 static void lov_io_sub_fini(const struct lu_env *env, struct lov_io *lio,
68                             struct lov_io_sub *sub)
69 {
70         ENTRY;
71         if (sub->sub_io != NULL) {
72                 if (sub->sub_io_initialized) {
73                         lov_sub_enter(sub);
74                         cl_io_fini(sub->sub_env, sub->sub_io);
75                         lov_sub_exit(sub);
76                         sub->sub_io_initialized = 0;
77                         lio->lis_active_subios--;
78                 }
79                 if (sub->sub_stripe == lio->lis_single_subio_index)
80                         lio->lis_single_subio_index = -1;
81                 else if (!sub->sub_borrowed)
82                         OBD_FREE_PTR(sub->sub_io);
83                 sub->sub_io = NULL;
84         }
85         if (sub->sub_env != NULL && !IS_ERR(sub->sub_env)) {
86                 if (!sub->sub_borrowed)
87                         cl_env_put(sub->sub_env, &sub->sub_refcheck);
88                 sub->sub_env = NULL;
89         }
90         EXIT;
91 }
92
93 static void lov_io_sub_inherit(struct cl_io *io, struct lov_io *lio,
94                                int stripe, loff_t start, loff_t end)
95 {
96         struct lov_stripe_md *lsm    = lov_r0(lio->lis_object)->lo_lsm;
97         struct cl_io         *parent = lio->lis_cl.cis_io;
98
99         switch(io->ci_type) {
100         case CIT_TRUNC: {
101                 size_t new_size = parent->u.ci_truncate.tr_size;
102
103                 new_size = lov_size_to_stripe(lsm, new_size, stripe);
104                 io->u.ci_truncate.tr_capa = parent->u.ci_truncate.tr_capa;
105                 io->u.ci_truncate.tr_size = new_size;
106                 break;
107         }
108         case CIT_FAULT: {
109                 struct cl_object *obj = parent->ci_obj;
110                 loff_t off = cl_offset(obj, parent->u.ci_fault.ft_index);
111
112                 io->u.ci_fault = parent->u.ci_fault;
113                 off = lov_size_to_stripe(lsm, off, stripe);
114                 io->u.ci_fault.ft_index = cl_index(obj, off);
115                 break;
116         }
117         case CIT_READ:
118         case CIT_WRITE: {
119                 io->u.ci_rw.crw_pos = start;
120                 io->u.ci_rw.crw_count = end - start;
121                 break;
122         }
123         default:
124                 break;
125         }
126 }
127
128 static int lov_io_sub_init(const struct lu_env *env, struct lov_io *lio,
129                            struct lov_io_sub *sub)
130 {
131         struct lov_object *lov = lio->lis_object;
132         struct lov_device *ld  = lu2lov_dev(lov2cl(lov)->co_lu.lo_dev);
133         struct cl_io      *sub_io;
134         struct cl_object  *sub_obj;
135         struct cl_io      *io  = lio->lis_cl.cis_io;
136
137         int stripe = sub->sub_stripe;
138         int result;
139
140         LASSERT(sub->sub_io == NULL);
141         LASSERT(sub->sub_env == NULL);
142         LASSERT(sub->sub_stripe < lio->lis_stripe_count);
143         ENTRY;
144
145         result = 0;
146         sub->sub_io_initialized = 0;
147         sub->sub_borrowed = 0;
148
149         /*
150          * First sub-io. Use ->lis_single_subio and current environment, to
151          * avoid dynamic allocation.
152          */
153         if (lio->lis_active_subios == 0) {
154                 sub->sub_io = &lio->lis_single_subio;
155                 lio->lis_single_subio_index = stripe;
156                 sub->sub_env = cl_env_get(&sub->sub_refcheck);
157                 LASSERT(sub->sub_env == env);
158         } else if (lio->lis_mem_frozen) {
159                 LASSERT(mutex_is_locked(&ld->ld_mutex));
160                 sub->sub_io  = &ld->ld_emrg[stripe]->emrg_subio;
161                 sub->sub_env = ld->ld_emrg[stripe]->emrg_env;
162                 sub->sub_borrowed = 1;
163         } else {
164                 void *cookie;
165
166                 /* obtain new environment */
167                 cookie = cl_env_reenter();
168                 sub->sub_env = cl_env_get(&sub->sub_refcheck);
169                 cl_env_reexit(cookie);
170
171                 OBD_ALLOC_PTR(sub->sub_io);
172                 if (IS_ERR(sub->sub_env))
173                         result = PTR_ERR(sub->sub_env);
174                 else if (sub->sub_io == NULL)
175                         result = -ENOMEM;
176         }
177
178         if (result == 0) {
179                 sub_obj = lovsub2cl(lov_r0(lov)->lo_sub[stripe]);
180                 sub_io  = sub->sub_io;
181
182                 sub_io->ci_obj    = sub_obj;
183                 sub_io->ci_result = 0;
184
185                 sub_io->ci_parent  = io;
186                 sub_io->ci_lockreq = io->ci_lockreq;
187                 sub_io->ci_type    = io->ci_type;
188
189                 lov_sub_enter(sub);
190                 result = cl_io_sub_init(sub->sub_env, sub_io,
191                                         io->ci_type, sub_obj);
192                 lov_sub_exit(sub);
193                 if (result >= 0) {
194                         lio->lis_active_subios++;
195                         sub->sub_io_initialized = 1;
196                         result = 0;
197                 }
198         }
199         if (result != 0)
200                 lov_io_sub_fini(env, lio, sub);
201         RETURN(result);
202 }
203
204 static struct lov_io_sub *lov_sub_get(const struct lu_env *env,
205                                       struct lov_io *lio, int stripe)
206 {
207         int rc;
208         struct lov_io_sub *sub = &lio->lis_subs[stripe];
209
210         LASSERT(stripe < lio->lis_stripe_count);
211         ENTRY;
212
213         if (!sub->sub_io_initialized) {
214                 sub->sub_stripe = stripe;
215                 rc = lov_io_sub_init(env, lio, sub);
216         } else
217                 rc = 0;
218         if (rc == 0)
219                 lov_sub_enter(sub);
220         else
221                 sub = ERR_PTR(rc);
222         RETURN(sub);
223 }
224
225 void lov_sub_put(struct lov_io_sub *sub)
226 {
227         lov_sub_exit(sub);
228 }
229
230 /*****************************************************************************
231  *
232  * Lov io operations.
233  *
234  */
235
236 static int lov_page_stripe(const struct cl_page *page)
237 {
238         struct lovsub_object *subobj;
239
240         ENTRY;
241         subobj = lu2lovsub(
242                 lu_object_locate(page->cp_child->cp_obj->co_lu.lo_header,
243                                  &lovsub_device_type));
244         LASSERT(subobj != NULL);
245         RETURN(subobj->lso_index);
246 }
247
248 struct lov_io_sub *lov_page_subio(const struct lu_env *env, struct lov_io *lio,
249                                   const struct cl_page_slice *slice)
250 {
251         struct lov_stripe_md *lsm  = lov_r0(lio->lis_object)->lo_lsm;
252         struct cl_page       *page = slice->cpl_page;
253         int stripe;
254
255         LASSERT(lio->lis_cl.cis_io != NULL);
256         LASSERT(cl2lov(slice->cpl_obj) == lio->lis_object);
257         LASSERT(lsm != NULL);
258         LASSERT(lio->lis_nr_subios > 0);
259         ENTRY;
260
261         stripe = lov_page_stripe(page);
262         RETURN(lov_sub_get(env, lio, stripe));
263 }
264
265
266 static int lov_io_subio_init(const struct lu_env *env, struct lov_io *lio,
267                              struct cl_io *io)
268 {
269         struct lov_object    *lov = lio->lis_object;
270         struct lov_stripe_md *lsm = lov_r0(lov)->lo_lsm;
271         int result;
272
273         LASSERT(lio->lis_object != NULL);
274         ENTRY;
275
276         /*
277          * Need to be optimized, we can't afford to allocate a piece of memory
278          * when writing a page. -jay
279          */
280         OBD_ALLOC(lio->lis_subs,
281                   lsm->lsm_stripe_count * sizeof lio->lis_subs[0]);
282         if (lio->lis_subs != NULL) {
283                 lio->lis_nr_subios = lio->lis_stripe_count;
284                 lio->lis_single_subio_index = -1;
285                 lio->lis_active_subios = 0;
286                 result = 0;
287         } else
288                 result = -ENOMEM;
289         RETURN(result);
290 }
291
292 static void lov_io_slice_init(struct lov_io *lio,
293                               struct lov_object *obj, struct cl_io *io)
294 {
295         struct lov_stripe_md *lsm = lov_r0(obj)->lo_lsm;
296
297         LASSERT(lsm != NULL);
298         ENTRY;
299
300         io->ci_result = 0;
301         lio->lis_object = obj;
302         lio->lis_stripe_count = lsm->lsm_stripe_count;
303
304         switch (io->ci_type) {
305         case CIT_READ:
306         case CIT_WRITE:
307                 lio->lis_pos = io->u.ci_rw.crw_pos;
308                 lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
309                 lio->lis_io_endpos = lio->lis_endpos;
310                 if (cl_io_is_append(io)) {
311                         LASSERT(io->ci_type == CIT_WRITE);
312                         lio->lis_pos = 0;
313                         lio->lis_endpos = OBD_OBJECT_EOF;
314                 }
315                 break;
316
317         case CIT_TRUNC:
318                 lio->lis_pos = io->u.ci_truncate.tr_size;
319                 lio->lis_endpos = OBD_OBJECT_EOF;
320                 break;
321
322         case CIT_FAULT: {
323                 pgoff_t index = io->u.ci_fault.ft_index;
324                 lio->lis_pos = cl_offset(io->ci_obj, index);
325                 lio->lis_endpos = cl_offset(io->ci_obj, index + 1);
326                 break;
327         }
328
329         case CIT_MISC:
330                 lio->lis_pos = 0;
331                 lio->lis_endpos = OBD_OBJECT_EOF;
332                 break;
333
334         default:
335                 LBUG();
336         }
337
338         EXIT;
339 }
340
341 static void lov_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
342 {
343         struct lov_io *lio = cl2lov_io(env, ios);
344         int i;
345
346         ENTRY;
347         if (lio->lis_subs != NULL) {
348                 for (i = 0; i < lio->lis_nr_subios; i++)
349                         lov_io_sub_fini(env, lio, &lio->lis_subs[i]);
350                 OBD_FREE(lio->lis_subs,
351                          lio->lis_nr_subios * sizeof lio->lis_subs[0]);
352                 lio->lis_nr_subios = 0;
353         }
354         EXIT;
355 }
356
357 static obd_off lov_offset_mod(obd_off val, int delta)
358 {
359         if (val != OBD_OBJECT_EOF)
360                 val += delta;
361         return val;
362 }
363
364 static int lov_io_iter_init(const struct lu_env *env,
365                             const struct cl_io_slice *ios)
366 {
367         struct lov_io        *lio = cl2lov_io(env, ios);
368         struct lov_stripe_md *lsm = lov_r0(lio->lis_object)->lo_lsm;
369         struct lov_io_sub    *sub;
370         obd_off endpos;
371         obd_off start;
372         obd_off end;
373         int stripe;
374         int rc = 0;
375
376         ENTRY;
377         endpos = lov_offset_mod(lio->lis_endpos, -1);
378         for (stripe = 0; stripe < lio->lis_stripe_count; stripe++) {
379                 if (!lov_stripe_intersects(lsm, stripe, lio->lis_pos,
380                                            endpos, &start, &end))
381                         continue;
382
383                 end = lov_offset_mod(end, +1);
384                 sub = lov_sub_get(env, lio, stripe);
385                 if (!IS_ERR(sub)) {
386                         lov_io_sub_inherit(sub->sub_io, lio, stripe,
387                                            start, end);
388                         rc = cl_io_iter_init(sub->sub_env, sub->sub_io);
389                         lov_sub_put(sub);
390                         CDEBUG(D_VFSTRACE, "shrink: %i [%llu, %llu)\n",
391                                stripe, start, end);
392                 } else
393                         rc = PTR_ERR(sub);
394                 if (!rc)
395                         list_add_tail(&sub->sub_linkage, &lio->lis_active);
396                 else
397                         break;
398         }
399         RETURN(rc);
400 }
401
402 static int lov_io_rw_iter_init(const struct lu_env *env,
403                                const struct cl_io_slice *ios)
404 {
405         struct lov_io        *lio = cl2lov_io(env, ios);
406         struct cl_io         *io  = ios->cis_io;
407         struct lov_stripe_md *lsm = lov_r0(cl2lov(ios->cis_obj))->lo_lsm;
408         loff_t start = io->u.ci_rw.crw_pos;
409         loff_t next;
410         int ssize = lsm->lsm_stripe_size;
411
412         LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
413         ENTRY;
414
415         /* fast path for common case. */
416         if (lio->lis_nr_subios != 1 && !cl_io_is_append(io)) {
417
418                 do_div(start, ssize);
419                 next = (start + 1) * ssize;
420                 if (next <= start * ssize)
421                         next = ~0ull;
422
423                 io->ci_continue = next < lio->lis_io_endpos;
424                 io->u.ci_rw.crw_count = min_t(loff_t, lio->lis_io_endpos,
425                                               next) - io->u.ci_rw.crw_pos;
426                 lio->lis_pos    = io->u.ci_rw.crw_pos;
427                 lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
428                 CDEBUG(D_VFSTRACE, "stripe: %llu chunk: [%llu, %llu) %llu\n",
429                        (__u64)start, lio->lis_pos, lio->lis_endpos,
430                        (__u64)lio->lis_io_endpos);
431         }
432         /*
433          * XXX The following call should be optimized: we know, that
434          * [lio->lis_pos, lio->lis_endpos) intersects with exactly one stripe.
435          */
436         RETURN(lov_io_iter_init(env, ios));
437 }
438
439 static int lov_io_call(const struct lu_env *env, struct lov_io *lio,
440                        int (*iofunc)(const struct lu_env *, struct cl_io *))
441 {
442         struct lov_io_sub *sub;
443         int rc = 0;
444
445         ENTRY;
446         list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
447                 lov_sub_enter(sub);
448                 rc = iofunc(sub->sub_env, sub->sub_io);
449                 lov_sub_exit(sub);
450                 if (rc)
451                         break;
452         }
453         RETURN(rc);
454 }
455
456 static int lov_io_lock(const struct lu_env *env, const struct cl_io_slice *ios)
457 {
458         ENTRY;
459         RETURN(lov_io_call(env, cl2lov_io(env, ios), cl_io_lock));
460 }
461
462 static int lov_io_start(const struct lu_env *env, const struct cl_io_slice *ios)
463 {
464         ENTRY;
465         RETURN(lov_io_call(env, cl2lov_io(env, ios), cl_io_start));
466 }
467
468 static int lov_io_end_wrapper(const struct lu_env *env, struct cl_io *io)
469 {
470         ENTRY;
471         /*
472          * It's possible that lov_io_start() wasn't called against this
473          * sub-io, either because previous sub-io failed, or upper layer
474          * completed IO.
475          */
476         if (io->ci_state == CIS_IO_GOING)
477                 cl_io_end(env, io);
478         else
479                 io->ci_state = CIS_IO_FINISHED;
480         RETURN(0);
481 }
482
483 static int lov_io_iter_fini_wrapper(const struct lu_env *env, struct cl_io *io)
484 {
485         cl_io_iter_fini(env, io);
486         RETURN(0);
487 }
488
489 static int lov_io_unlock_wrapper(const struct lu_env *env, struct cl_io *io)
490 {
491         cl_io_unlock(env, io);
492         RETURN(0);
493 }
494
495 static void lov_io_end(const struct lu_env *env, const struct cl_io_slice *ios)
496 {
497         int rc;
498
499         rc = lov_io_call(env, cl2lov_io(env, ios), lov_io_end_wrapper);
500         LASSERT(rc == 0);
501 }
502
503 static void lov_io_iter_fini(const struct lu_env *env,
504                              const struct cl_io_slice *ios)
505 {
506         struct lov_io *lio = cl2lov_io(env, ios);
507         int rc;
508
509         ENTRY;
510         rc = lov_io_call(env, lio, lov_io_iter_fini_wrapper);
511         LASSERT(rc == 0);
512         while (!list_empty(&lio->lis_active))
513                 list_del_init(lio->lis_active.next);
514         EXIT;
515 }
516
517 static void lov_io_unlock(const struct lu_env *env,
518                           const struct cl_io_slice *ios)
519 {
520         int rc;
521
522         ENTRY;
523         rc = lov_io_call(env, cl2lov_io(env, ios), lov_io_unlock_wrapper);
524         LASSERT(rc == 0);
525         EXIT;
526 }
527
528
529 static struct cl_page_list *lov_io_submit_qin(struct lov_device *ld,
530                                               struct cl_page_list *qin,
531                                               int idx, int alloc)
532 {
533         return alloc ? &qin[idx] : &ld->ld_emrg[idx]->emrg_page_list;
534 }
535
536 /**
537  * lov implementation of cl_operations::cio_submit() method. It takes a list
538  * of pages in \a queue, splits it into per-stripe sub-lists, invokes
539  * cl_io_submit() on underlying devices to submit sub-lists, and then splices
540  * everything back.
541  *
542  * Major complication of this function is a need to handle memory cleansing:
543  * cl_io_submit() is called to write out pages as a part of VM memory
544  * reclamation, and hence it may not fail due to memory shortages (system
545  * dead-locks otherwise). To deal with this, some resources (sub-lists,
546  * sub-environment, etc.) are allocated per-device on "startup" (i.e., in a
547  * not-memory cleansing context), and in case of memory shortage, these
548  * pre-allocated resources are used by lov_io_submit() under
549  * lov_device::ld_mutex mutex.
550  */
551 static int lov_io_submit(const struct lu_env *env,
552                          const struct cl_io_slice *ios,
553                          enum cl_req_type crt, struct cl_2queue *queue)
554 {
555         struct lov_io          *lio = cl2lov_io(env, ios);
556         struct lov_object      *obj = lio->lis_object;
557         struct lov_device       *ld = lu2lov_dev(lov2cl(obj)->co_lu.lo_dev);
558         struct cl_page_list    *qin = &queue->c2_qin;
559         struct cl_2queue      *cl2q = &lov_env_info(env)->lti_cl2q;
560         struct cl_page_list *stripes_qin = NULL;
561         struct cl_page *page;
562         struct cl_page *tmp;
563         int stripe;
564
565 #define QIN(stripe) lov_io_submit_qin(ld, stripes_qin, stripe, alloc)
566
567         int rc = 0;
568         int alloc =
569 #if defined(__KERNEL__) && defined(__linux__)
570                 !(current->flags & PF_MEMALLOC);
571 #else
572                 1;
573 #endif
574         ENTRY;
575         if (lio->lis_active_subios == 1) {
576                 int idx = lio->lis_single_subio_index;
577                 struct lov_io_sub *sub;
578
579                 LASSERT(idx < lio->lis_nr_subios);
580                 sub = lov_sub_get(env, lio, idx);
581                 LASSERT(!IS_ERR(sub));
582                 LASSERT(sub->sub_io == &lio->lis_single_subio);
583                 rc = cl_io_submit_rw(sub->sub_env, sub->sub_io, crt, queue);
584                 lov_sub_put(sub);
585                 RETURN(rc);
586         }
587
588         LASSERT(lio->lis_subs != NULL);
589         if (alloc) {
590                 OBD_ALLOC(stripes_qin,
591                           sizeof(*stripes_qin) * lio->lis_nr_subios);
592                 if (stripes_qin == NULL)
593                         RETURN(-ENOMEM);
594
595                 for (stripe = 0; stripe < lio->lis_nr_subios; stripe++)
596                         cl_page_list_init(&stripes_qin[stripe]);
597         } else {
598                 /*
599                  * If we get here, it means pageout & swap doesn't help.
600                  * In order to not make things worse, even don't try to
601                  * allocate the memory with __GFP_NOWARN. -jay
602                  */
603                 mutex_lock(&ld->ld_mutex);
604                 lio->lis_mem_frozen = 1;
605         }
606
607         cl_2queue_init(cl2q);
608         cl_page_list_for_each_safe(page, tmp, qin) {
609                 stripe = lov_page_stripe(page);
610                 cl_page_list_move(QIN(stripe), qin, page);
611         }
612
613         for (stripe = 0; stripe < lio->lis_nr_subios; stripe++) {
614                 struct lov_io_sub   *sub;
615                 struct cl_page_list *sub_qin = QIN(stripe);
616
617                 if (list_empty(&sub_qin->pl_pages))
618                         continue;
619
620                 cl_page_list_splice(sub_qin, &cl2q->c2_qin);
621                 sub = lov_sub_get(env, lio, stripe);
622                 if (!IS_ERR(sub)) {
623                         rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
624                                              crt, cl2q);
625                         lov_sub_put(sub);
626                 } else
627                         rc = PTR_ERR(sub);
628                 cl_page_list_splice(&cl2q->c2_qin,  &queue->c2_qin);
629                 cl_page_list_splice(&cl2q->c2_qout, &queue->c2_qout);
630                 if (rc != 0)
631                         break;
632         }
633
634         for (stripe = 0; stripe < lio->lis_nr_subios; stripe++) {
635                 struct cl_page_list *sub_qin = QIN(stripe);
636
637                 if (list_empty(&sub_qin->pl_pages))
638                         continue;
639
640                 cl_page_list_splice(sub_qin, qin);
641         }
642
643         if (alloc) {
644                 OBD_FREE(stripes_qin,
645                          sizeof(*stripes_qin) * lio->lis_nr_subios);
646         } else {
647                 int i;
648
649                 for (i = 0; i < lio->lis_nr_subios; i++) {
650                         struct cl_io *cio = lio->lis_subs[i].sub_io;
651
652                         if (cio && cio == &ld->ld_emrg[i]->emrg_subio)
653                                 lov_io_sub_fini(env, lio, &lio->lis_subs[i]);
654                 }
655                 lio->lis_mem_frozen = 0;
656                 mutex_unlock(&ld->ld_mutex);
657         }
658
659         RETURN(rc);
660 #undef QIN
661 }
662
663 static int lov_io_prepare_write(const struct lu_env *env,
664                                 const struct cl_io_slice *ios,
665                                 const struct cl_page_slice *slice,
666                                 unsigned from, unsigned to)
667 {
668         struct lov_io     *lio      = cl2lov_io(env, ios);
669         struct cl_page    *sub_page = lov_sub_page(slice);
670         struct lov_io_sub *sub;
671         int result;
672
673         ENTRY;
674         sub = lov_page_subio(env, lio, slice);
675         if (!IS_ERR(sub)) {
676                 result = cl_io_prepare_write(sub->sub_env, sub->sub_io,
677                                              sub_page, from, to);
678                 lov_sub_put(sub);
679         } else
680                 result = PTR_ERR(sub);
681         RETURN(result);
682 }
683
684 static int lov_io_commit_write(const struct lu_env *env,
685                                const struct cl_io_slice *ios,
686                                const struct cl_page_slice *slice,
687                                unsigned from, unsigned to)
688 {
689         struct lov_io     *lio      = cl2lov_io(env, ios);
690         struct cl_page    *sub_page = lov_sub_page(slice);
691         struct lov_io_sub *sub;
692         int result;
693
694         ENTRY;
695         sub = lov_page_subio(env, lio, slice);
696         if (!IS_ERR(sub)) {
697                 result = cl_io_commit_write(sub->sub_env, sub->sub_io,
698                                             sub_page, from, to);
699                 lov_sub_put(sub);
700         } else
701                 result = PTR_ERR(sub);
702         RETURN(result);
703 }
704
705 static int lov_io_fault_start(const struct lu_env *env,
706                               const struct cl_io_slice *ios)
707 {
708         struct cl_fault_io *fio;
709         struct lov_io      *lio;
710         struct lov_io_sub  *sub;
711
712         ENTRY;
713         fio = &ios->cis_io->u.ci_fault;
714         lio = cl2lov_io(env, ios);
715         sub = lov_sub_get(env, lio, lov_page_stripe(fio->ft_page));
716         sub->sub_io->u.ci_fault.ft_nob = fio->ft_nob;
717         lov_sub_put(sub);
718         RETURN(lov_io_start(env, ios));
719 }
720
721 static const struct cl_io_operations lov_io_ops = {
722         .op = {
723                 [CIT_READ] = {
724                         .cio_fini      = lov_io_fini,
725                         .cio_iter_init = lov_io_rw_iter_init,
726                         .cio_iter_fini = lov_io_iter_fini,
727                         .cio_lock      = lov_io_lock,
728                         .cio_unlock    = lov_io_unlock,
729                         .cio_start     = lov_io_start,
730                         .cio_end       = lov_io_end
731                 },
732                 [CIT_WRITE] = {
733                         .cio_fini      = lov_io_fini,
734                         .cio_iter_init = lov_io_rw_iter_init,
735                         .cio_iter_fini = lov_io_iter_fini,
736                         .cio_lock      = lov_io_lock,
737                         .cio_unlock    = lov_io_unlock,
738                         .cio_start     = lov_io_start,
739                         .cio_end       = lov_io_end
740                 },
741                 [CIT_TRUNC] = {
742                         .cio_fini      = lov_io_fini,
743                         .cio_iter_init = lov_io_iter_init,
744                         .cio_iter_fini = lov_io_iter_fini,
745                         .cio_lock      = lov_io_lock,
746                         .cio_unlock    = lov_io_unlock,
747                         .cio_start     = lov_io_start,
748                         .cio_end       = lov_io_end
749                 },
750                 [CIT_FAULT] = {
751                         .cio_fini      = lov_io_fini,
752                         .cio_iter_init = lov_io_iter_init,
753                         .cio_iter_fini = lov_io_iter_fini,
754                         .cio_lock      = lov_io_lock,
755                         .cio_unlock    = lov_io_unlock,
756                         .cio_start     = lov_io_fault_start,
757                         .cio_end       = lov_io_end
758                 },
759                 [CIT_MISC] = {
760                         .cio_fini   = lov_io_fini
761                 }
762         },
763         .req_op = {
764                  [CRT_READ] = {
765                          .cio_submit    = lov_io_submit
766                  },
767                  [CRT_WRITE] = {
768                          .cio_submit    = lov_io_submit
769                  }
770          },
771         .cio_prepare_write = lov_io_prepare_write,
772         .cio_commit_write  = lov_io_commit_write
773 };
774
775 /*****************************************************************************
776  *
777  * Empty lov io operations.
778  *
779  */
780
781 static void lov_empty_io_fini(const struct lu_env *env,
782                               const struct cl_io_slice *ios)
783 {
784         ENTRY;
785         EXIT;
786 }
787
788 static void lov_empty_impossible(const struct lu_env *env,
789                                  struct cl_io_slice *ios)
790 {
791         LBUG();
792 }
793
794 #define LOV_EMPTY_IMPOSSIBLE ((void *)lov_empty_impossible)
795
796 /**
797  * An io operation vector for files without stripes.
798  */
799 static const struct cl_io_operations lov_empty_io_ops = {
800         .op = {
801                 [CIT_READ] = {
802 #if 0
803                         .cio_fini       = lov_empty_io_fini,
804                         .cio_iter_init  = LOV_EMPTY_IMPOSSIBLE,
805                         .cio_lock       = LOV_EMPTY_IMPOSSIBLE,
806                         .cio_start      = LOV_EMPTY_IMPOSSIBLE,
807                         .cio_end        = LOV_EMPTY_IMPOSSIBLE
808 #endif
809                 },
810                 [CIT_WRITE] = {
811                         .cio_fini      = lov_empty_io_fini,
812                         .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
813                         .cio_lock      = LOV_EMPTY_IMPOSSIBLE,
814                         .cio_start     = LOV_EMPTY_IMPOSSIBLE,
815                         .cio_end       = LOV_EMPTY_IMPOSSIBLE
816                 },
817                 [CIT_TRUNC] = {
818                         .cio_fini      = lov_empty_io_fini,
819                         .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
820                         .cio_lock      = LOV_EMPTY_IMPOSSIBLE,
821                         .cio_start     = LOV_EMPTY_IMPOSSIBLE,
822                         .cio_end       = LOV_EMPTY_IMPOSSIBLE
823                 },
824                 [CIT_FAULT] = {
825                         .cio_fini      = lov_empty_io_fini,
826                         .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
827                         .cio_lock      = LOV_EMPTY_IMPOSSIBLE,
828                         .cio_start     = LOV_EMPTY_IMPOSSIBLE,
829                         .cio_end       = LOV_EMPTY_IMPOSSIBLE
830                 },
831                 [CIT_MISC] = {
832                         .cio_fini   = lov_empty_io_fini
833                 }
834         },
835         .req_op = {
836                  [CRT_READ] = {
837                          .cio_submit    = LOV_EMPTY_IMPOSSIBLE
838                  },
839                  [CRT_WRITE] = {
840                          .cio_submit    = LOV_EMPTY_IMPOSSIBLE
841                  }
842          },
843         .cio_commit_write = LOV_EMPTY_IMPOSSIBLE
844 };
845
846 int lov_io_init_raid0(const struct lu_env *env, struct cl_object *obj,
847                       struct cl_io *io)
848 {
849         struct lov_io       *lio = lov_env_io(env);
850         struct lov_object   *lov = cl2lov(obj);
851
852         ENTRY;
853         CFS_INIT_LIST_HEAD(&lio->lis_active);
854         lov_io_slice_init(lio, lov, io);
855         if (io->ci_result == 0) {
856                 LASSERT(lov_r0(lov)->lo_lsm != NULL);
857                 io->ci_result = lov_io_subio_init(env, lio, io);
858                 if (io->ci_result == 0)
859                         cl_io_slice_add(io, &lio->lis_cl, obj, &lov_io_ops);
860         }
861         RETURN(io->ci_result);
862 }
863
864 int lov_io_init_empty(const struct lu_env *env, struct cl_object *obj,
865                       struct cl_io *io)
866 {
867         struct lov_io *lio = lov_env_io(env);
868         int result;
869
870         ENTRY;
871         switch (io->ci_type) {
872         default:
873                 LBUG();
874         case CIT_MISC:
875         case CIT_READ:
876                 result = 0;
877                 break;
878         case CIT_WRITE:
879         case CIT_TRUNC:
880                 result = -EBADF;
881                 break;
882         case CIT_FAULT:
883                 result = -EFAULT;
884                 CERROR("Page fault on a file without stripes: "DFID"\n",
885                        PFID(lu_object_fid(&obj->co_lu)));
886                 break;
887         }
888         if (result == 0)
889                 cl_io_slice_add(io, &lio->lis_cl, obj, &lov_empty_io_ops);
890         io->ci_result = result;
891         RETURN(result != 0);
892 }
893
894 /** @} lov */