Whamcloud - gitweb
f3d88930a22be681530a0b6e477c0c2e6b3f7178
[fs/lustre-release.git] / lustre / lov / lov_io.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Implementation of cl_io for LOV layer.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_LOV
42
43 #include "lov_cl_internal.h"
44
45 /** \addtogroup lov lov @{ */
46
47 static void lov_sub_enter(struct lov_io_sub *sub)
48 {
49         ENTRY;
50         if (sub->sub_reenter++ == 0) {
51                 sub->sub_cookie = cl_env_reenter();
52                 cl_env_implant(sub->sub_env, &sub->sub_refcheck2);
53         }
54         EXIT;
55 }
56
57 static void lov_sub_exit(struct lov_io_sub *sub)
58 {
59         ENTRY;
60         if (--sub->sub_reenter == 0) {
61                 cl_env_unplant(sub->sub_env, &sub->sub_refcheck2);
62                 cl_env_reexit(sub->sub_cookie);
63         }
64         EXIT;
65 }
66
67 static void lov_io_sub_fini(const struct lu_env *env, struct lov_io *lio,
68                             struct lov_io_sub *sub)
69 {
70         ENTRY;
71         if (sub->sub_io != NULL) {
72                 if (sub->sub_io_initialized) {
73                         lov_sub_enter(sub);
74                         cl_io_fini(sub->sub_env, sub->sub_io);
75                         lov_sub_exit(sub);
76                         sub->sub_io_initialized = 0;
77                         lio->lis_active_subios--;
78                 }
79                 if (sub->sub_stripe == lio->lis_single_subio_index)
80                         lio->lis_single_subio_index = -1;
81                 else if (!sub->sub_borrowed)
82                         OBD_FREE_PTR(sub->sub_io);
83                 sub->sub_io = NULL;
84         }
85         if (sub->sub_env != NULL && !IS_ERR(sub->sub_env)) {
86                 if (!sub->sub_borrowed)
87                         cl_env_put(sub->sub_env, &sub->sub_refcheck);
88                 sub->sub_env = NULL;
89         }
90         EXIT;
91 }
92
93 static void lov_io_sub_inherit(struct cl_io *io, struct lov_io *lio,
94                                int stripe, loff_t start, loff_t end)
95 {
96         struct lov_stripe_md *lsm    = lov_r0(lio->lis_object)->lo_lsm;
97         struct cl_io         *parent = lio->lis_cl.cis_io;
98
99         switch(io->ci_type) {
100         case CIT_TRUNC: {
101                 size_t new_size = parent->u.ci_truncate.tr_size;
102
103                 new_size = lov_size_to_stripe(lsm, new_size, stripe);
104                 io->u.ci_truncate.tr_capa = parent->u.ci_truncate.tr_capa;
105                 io->u.ci_truncate.tr_size = new_size;
106                 break;
107         }
108         case CIT_FAULT: {
109                 struct cl_object *obj = parent->ci_obj;
110                 loff_t off = cl_offset(obj, parent->u.ci_fault.ft_index);
111
112                 io->u.ci_fault = parent->u.ci_fault;
113                 off = lov_size_to_stripe(lsm, off, stripe);
114                 io->u.ci_fault.ft_index = cl_index(obj, off);
115                 break;
116         }
117         case CIT_READ:
118         case CIT_WRITE: {
119                 io->u.ci_rw.crw_pos = start;
120                 io->u.ci_rw.crw_count = end - start;
121                 break;
122         }
123         default:
124                 break;
125         }
126 }
127
128 static int lov_io_sub_init(const struct lu_env *env, struct lov_io *lio,
129                            struct lov_io_sub *sub)
130 {
131         struct lov_object *lov = lio->lis_object;
132         struct lov_device *ld  = lu2lov_dev(lov2cl(lov)->co_lu.lo_dev);
133         struct cl_io      *sub_io;
134         struct cl_object  *sub_obj;
135         struct cl_io      *io  = lio->lis_cl.cis_io;
136
137         int stripe = sub->sub_stripe;
138         int result;
139
140         LASSERT(sub->sub_io == NULL);
141         LASSERT(sub->sub_env == NULL);
142         LASSERT(sub->sub_stripe < lio->lis_stripe_count);
143         ENTRY;
144
145         result = 0;
146         sub->sub_io_initialized = 0;
147         sub->sub_borrowed = 0;
148
149         /*
150          * First sub-io. Use ->lis_single_subio and current environment, to
151          * avoid dynamic allocation.
152          */
153         if (lio->lis_active_subios == 0) {
154                 sub->sub_io = &lio->lis_single_subio;
155                 lio->lis_single_subio_index = stripe;
156                 sub->sub_env = cl_env_get(&sub->sub_refcheck);
157                 LASSERT(sub->sub_env == env);
158         } else if (lio->lis_mem_frozen) {
159                 LASSERT(mutex_is_locked(&ld->ld_mutex));
160                 sub->sub_io  = &ld->ld_emrg[stripe]->emrg_subio;
161                 sub->sub_env = ld->ld_emrg[stripe]->emrg_env;
162                 sub->sub_borrowed = 1;
163         } else {
164                 void *cookie;
165
166                 /* obtain new environment */
167                 cookie = cl_env_reenter();
168                 sub->sub_env = cl_env_get(&sub->sub_refcheck);
169                 cl_env_reexit(cookie);
170
171                 OBD_ALLOC_PTR(sub->sub_io);
172                 if (IS_ERR(sub->sub_env))
173                         result = PTR_ERR(sub->sub_env);
174                 else if (sub->sub_io == NULL)
175                         result = -ENOMEM;
176         }
177
178         if (result == 0) {
179                 sub_obj = lovsub2cl(lov_r0(lov)->lo_sub[stripe]);
180                 sub_io  = sub->sub_io;
181
182                 sub_io->ci_obj    = sub_obj;
183                 sub_io->ci_result = 0;
184
185                 sub_io->ci_parent  = io;
186                 sub_io->ci_lockreq = io->ci_lockreq;
187                 sub_io->ci_type    = io->ci_type;
188                 sub_io->ci_no_srvlock = io->ci_no_srvlock;
189
190                 lov_sub_enter(sub);
191                 result = cl_io_sub_init(sub->sub_env, sub_io,
192                                         io->ci_type, sub_obj);
193                 lov_sub_exit(sub);
194                 if (result >= 0) {
195                         lio->lis_active_subios++;
196                         sub->sub_io_initialized = 1;
197                         result = 0;
198                 }
199         }
200         if (result != 0)
201                 lov_io_sub_fini(env, lio, sub);
202         RETURN(result);
203 }
204
205 struct lov_io_sub *lov_sub_get(const struct lu_env *env,
206                                struct lov_io *lio, int stripe)
207 {
208         int rc;
209         struct lov_io_sub *sub = &lio->lis_subs[stripe];
210
211         LASSERT(stripe < lio->lis_stripe_count);
212         ENTRY;
213
214         if (!sub->sub_io_initialized) {
215                 sub->sub_stripe = stripe;
216                 rc = lov_io_sub_init(env, lio, sub);
217         } else
218                 rc = 0;
219         if (rc == 0)
220                 lov_sub_enter(sub);
221         else
222                 sub = ERR_PTR(rc);
223         RETURN(sub);
224 }
225
226 void lov_sub_put(struct lov_io_sub *sub)
227 {
228         lov_sub_exit(sub);
229 }
230
231 /*****************************************************************************
232  *
233  * Lov io operations.
234  *
235  */
236
237 static int lov_page_stripe(const struct cl_page *page)
238 {
239         struct lovsub_object *subobj;
240
241         ENTRY;
242         subobj = lu2lovsub(
243                 lu_object_locate(page->cp_child->cp_obj->co_lu.lo_header,
244                                  &lovsub_device_type));
245         LASSERT(subobj != NULL);
246         RETURN(subobj->lso_index);
247 }
248
249 struct lov_io_sub *lov_page_subio(const struct lu_env *env, struct lov_io *lio,
250                                   const struct cl_page_slice *slice)
251 {
252         struct lov_stripe_md *lsm  = lov_r0(lio->lis_object)->lo_lsm;
253         struct cl_page       *page = slice->cpl_page;
254         int stripe;
255
256         LASSERT(lio->lis_cl.cis_io != NULL);
257         LASSERT(cl2lov(slice->cpl_obj) == lio->lis_object);
258         LASSERT(lsm != NULL);
259         LASSERT(lio->lis_nr_subios > 0);
260         ENTRY;
261
262         stripe = lov_page_stripe(page);
263         RETURN(lov_sub_get(env, lio, stripe));
264 }
265
266
267 static int lov_io_subio_init(const struct lu_env *env, struct lov_io *lio,
268                              struct cl_io *io)
269 {
270         struct lov_object    *lov = lio->lis_object;
271         struct lov_stripe_md *lsm = lov_r0(lov)->lo_lsm;
272         int result;
273
274         LASSERT(lio->lis_object != NULL);
275         ENTRY;
276
277         /*
278          * Need to be optimized, we can't afford to allocate a piece of memory
279          * when writing a page. -jay
280          */
281         OBD_ALLOC(lio->lis_subs,
282                   lsm->lsm_stripe_count * sizeof lio->lis_subs[0]);
283         if (lio->lis_subs != NULL) {
284                 lio->lis_nr_subios = lio->lis_stripe_count;
285                 lio->lis_single_subio_index = -1;
286                 lio->lis_active_subios = 0;
287                 result = 0;
288         } else
289                 result = -ENOMEM;
290         RETURN(result);
291 }
292
293 static void lov_io_slice_init(struct lov_io *lio,
294                               struct lov_object *obj, struct cl_io *io)
295 {
296         struct lov_stripe_md *lsm = lov_r0(obj)->lo_lsm;
297
298         LASSERT(lsm != NULL);
299         ENTRY;
300
301         io->ci_result = 0;
302         lio->lis_object = obj;
303         lio->lis_stripe_count = lsm->lsm_stripe_count;
304
305         switch (io->ci_type) {
306         case CIT_READ:
307         case CIT_WRITE:
308                 lio->lis_pos = io->u.ci_rw.crw_pos;
309                 lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
310                 lio->lis_io_endpos = lio->lis_endpos;
311                 if (cl_io_is_append(io)) {
312                         LASSERT(io->ci_type == CIT_WRITE);
313                         lio->lis_pos = 0;
314                         lio->lis_endpos = OBD_OBJECT_EOF;
315                 }
316                 break;
317
318         case CIT_TRUNC:
319                 lio->lis_pos = io->u.ci_truncate.tr_size;
320                 lio->lis_endpos = OBD_OBJECT_EOF;
321                 break;
322
323         case CIT_FAULT: {
324                 pgoff_t index = io->u.ci_fault.ft_index;
325                 lio->lis_pos = cl_offset(io->ci_obj, index);
326                 lio->lis_endpos = cl_offset(io->ci_obj, index + 1);
327                 break;
328         }
329
330         case CIT_MISC:
331                 lio->lis_pos = 0;
332                 lio->lis_endpos = OBD_OBJECT_EOF;
333                 break;
334
335         default:
336                 LBUG();
337         }
338
339         EXIT;
340 }
341
342 static void lov_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
343 {
344         struct lov_io *lio = cl2lov_io(env, ios);
345         int i;
346
347         ENTRY;
348         if (lio->lis_subs != NULL) {
349                 for (i = 0; i < lio->lis_nr_subios; i++)
350                         lov_io_sub_fini(env, lio, &lio->lis_subs[i]);
351                 OBD_FREE(lio->lis_subs,
352                          lio->lis_nr_subios * sizeof lio->lis_subs[0]);
353                 lio->lis_nr_subios = 0;
354         }
355         EXIT;
356 }
357
358 static obd_off lov_offset_mod(obd_off val, int delta)
359 {
360         if (val != OBD_OBJECT_EOF)
361                 val += delta;
362         return val;
363 }
364
365 static int lov_io_iter_init(const struct lu_env *env,
366                             const struct cl_io_slice *ios)
367 {
368         struct lov_io        *lio = cl2lov_io(env, ios);
369         struct lov_stripe_md *lsm = lov_r0(lio->lis_object)->lo_lsm;
370         struct lov_io_sub    *sub;
371         obd_off endpos;
372         obd_off start;
373         obd_off end;
374         int stripe;
375         int rc = 0;
376
377         ENTRY;
378         endpos = lov_offset_mod(lio->lis_endpos, -1);
379         for (stripe = 0; stripe < lio->lis_stripe_count; stripe++) {
380                 if (!lov_stripe_intersects(lsm, stripe, lio->lis_pos,
381                                            endpos, &start, &end))
382                         continue;
383
384                 end = lov_offset_mod(end, +1);
385                 sub = lov_sub_get(env, lio, stripe);
386                 if (!IS_ERR(sub)) {
387                         lov_io_sub_inherit(sub->sub_io, lio, stripe,
388                                            start, end);
389                         rc = cl_io_iter_init(sub->sub_env, sub->sub_io);
390                         lov_sub_put(sub);
391                         CDEBUG(D_VFSTRACE, "shrink: %i [%llu, %llu)\n",
392                                stripe, start, end);
393                 } else
394                         rc = PTR_ERR(sub);
395
396                 if (!rc)
397                         list_add_tail(&sub->sub_linkage, &lio->lis_active);
398                 else
399                         break;
400         }
401         RETURN(rc);
402 }
403
404 static int lov_io_rw_iter_init(const struct lu_env *env,
405                                const struct cl_io_slice *ios)
406 {
407         struct lov_io        *lio = cl2lov_io(env, ios);
408         struct cl_io         *io  = ios->cis_io;
409         struct lov_stripe_md *lsm = lov_r0(cl2lov(ios->cis_obj))->lo_lsm;
410         loff_t start = io->u.ci_rw.crw_pos;
411         loff_t next;
412         int ssize = lsm->lsm_stripe_size;
413
414         LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
415         ENTRY;
416
417         /* fast path for common case. */
418         if (lio->lis_nr_subios != 1 && !cl_io_is_append(io)) {
419
420                 do_div(start, ssize);
421                 next = (start + 1) * ssize;
422                 if (next <= start * ssize)
423                         next = ~0ull;
424
425                 io->ci_continue = next < lio->lis_io_endpos;
426                 io->u.ci_rw.crw_count = min_t(loff_t, lio->lis_io_endpos,
427                                               next) - io->u.ci_rw.crw_pos;
428                 lio->lis_pos    = io->u.ci_rw.crw_pos;
429                 lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
430                 CDEBUG(D_VFSTRACE, "stripe: %llu chunk: [%llu, %llu) %llu\n",
431                        (__u64)start, lio->lis_pos, lio->lis_endpos,
432                        (__u64)lio->lis_io_endpos);
433         }
434         /*
435          * XXX The following call should be optimized: we know, that
436          * [lio->lis_pos, lio->lis_endpos) intersects with exactly one stripe.
437          */
438         RETURN(lov_io_iter_init(env, ios));
439 }
440
441 static int lov_io_call(const struct lu_env *env, struct lov_io *lio,
442                        int (*iofunc)(const struct lu_env *, struct cl_io *))
443 {
444         struct lov_io_sub *sub;
445         int rc = 0;
446
447         ENTRY;
448         list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
449                 lov_sub_enter(sub);
450                 rc = iofunc(sub->sub_env, sub->sub_io);
451                 lov_sub_exit(sub);
452                 if (rc)
453                         break;
454         }
455         RETURN(rc);
456 }
457
458 static int lov_io_lock(const struct lu_env *env, const struct cl_io_slice *ios)
459 {
460         ENTRY;
461         RETURN(lov_io_call(env, cl2lov_io(env, ios), cl_io_lock));
462 }
463
464 static int lov_io_start(const struct lu_env *env, const struct cl_io_slice *ios)
465 {
466         ENTRY;
467         RETURN(lov_io_call(env, cl2lov_io(env, ios), cl_io_start));
468 }
469
470 static int lov_io_end_wrapper(const struct lu_env *env, struct cl_io *io)
471 {
472         ENTRY;
473         /*
474          * It's possible that lov_io_start() wasn't called against this
475          * sub-io, either because previous sub-io failed, or upper layer
476          * completed IO.
477          */
478         if (io->ci_state == CIS_IO_GOING)
479                 cl_io_end(env, io);
480         else
481                 io->ci_state = CIS_IO_FINISHED;
482         RETURN(0);
483 }
484
485 static int lov_io_iter_fini_wrapper(const struct lu_env *env, struct cl_io *io)
486 {
487         cl_io_iter_fini(env, io);
488         RETURN(0);
489 }
490
491 static int lov_io_unlock_wrapper(const struct lu_env *env, struct cl_io *io)
492 {
493         cl_io_unlock(env, io);
494         RETURN(0);
495 }
496
497 static void lov_io_end(const struct lu_env *env, const struct cl_io_slice *ios)
498 {
499         int rc;
500
501         rc = lov_io_call(env, cl2lov_io(env, ios), lov_io_end_wrapper);
502         LASSERT(rc == 0);
503 }
504
505 static void lov_io_iter_fini(const struct lu_env *env,
506                              const struct cl_io_slice *ios)
507 {
508         struct lov_io *lio = cl2lov_io(env, ios);
509         int rc;
510
511         ENTRY;
512         rc = lov_io_call(env, lio, lov_io_iter_fini_wrapper);
513         LASSERT(rc == 0);
514         while (!list_empty(&lio->lis_active))
515                 list_del_init(lio->lis_active.next);
516         EXIT;
517 }
518
519 static void lov_io_unlock(const struct lu_env *env,
520                           const struct cl_io_slice *ios)
521 {
522         int rc;
523
524         ENTRY;
525         rc = lov_io_call(env, cl2lov_io(env, ios), lov_io_unlock_wrapper);
526         LASSERT(rc == 0);
527         EXIT;
528 }
529
530
531 static struct cl_page_list *lov_io_submit_qin(struct lov_device *ld,
532                                               struct cl_page_list *qin,
533                                               int idx, int alloc)
534 {
535         return alloc ? &qin[idx] : &ld->ld_emrg[idx]->emrg_page_list;
536 }
537
538 /**
539  * lov implementation of cl_operations::cio_submit() method. It takes a list
540  * of pages in \a queue, splits it into per-stripe sub-lists, invokes
541  * cl_io_submit() on underlying devices to submit sub-lists, and then splices
542  * everything back.
543  *
544  * Major complication of this function is a need to handle memory cleansing:
545  * cl_io_submit() is called to write out pages as a part of VM memory
546  * reclamation, and hence it may not fail due to memory shortages (system
547  * dead-locks otherwise). To deal with this, some resources (sub-lists,
548  * sub-environment, etc.) are allocated per-device on "startup" (i.e., in a
549  * not-memory cleansing context), and in case of memory shortage, these
550  * pre-allocated resources are used by lov_io_submit() under
551  * lov_device::ld_mutex mutex.
552  */
553 static int lov_io_submit(const struct lu_env *env,
554                          const struct cl_io_slice *ios,
555                          enum cl_req_type crt, struct cl_2queue *queue,
556                          enum cl_req_priority priority)
557 {
558         struct lov_io          *lio = cl2lov_io(env, ios);
559         struct lov_object      *obj = lio->lis_object;
560         struct lov_device       *ld = lu2lov_dev(lov2cl(obj)->co_lu.lo_dev);
561         struct cl_page_list    *qin = &queue->c2_qin;
562         struct cl_2queue      *cl2q = &lov_env_info(env)->lti_cl2q;
563         struct cl_page_list *stripes_qin = NULL;
564         struct cl_page *page;
565         struct cl_page *tmp;
566         int stripe;
567
568 #define QIN(stripe) lov_io_submit_qin(ld, stripes_qin, stripe, alloc)
569
570         int rc = 0;
571         int alloc =
572 #if defined(__KERNEL__) && defined(__linux__)
573                 !(current->flags & PF_MEMALLOC);
574 #else
575                 1;
576 #endif
577         ENTRY;
578         if (lio->lis_active_subios == 1) {
579                 int idx = lio->lis_single_subio_index;
580                 struct lov_io_sub *sub;
581
582                 LASSERT(idx < lio->lis_nr_subios);
583                 sub = lov_sub_get(env, lio, idx);
584                 LASSERT(!IS_ERR(sub));
585                 LASSERT(sub->sub_io == &lio->lis_single_subio);
586                 rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
587                                      crt, queue, priority);
588                 lov_sub_put(sub);
589                 RETURN(rc);
590         }
591
592         LASSERT(lio->lis_subs != NULL);
593         if (alloc) {
594                 OBD_ALLOC(stripes_qin,
595                           sizeof(*stripes_qin) * lio->lis_nr_subios);
596                 if (stripes_qin == NULL)
597                         RETURN(-ENOMEM);
598
599                 for (stripe = 0; stripe < lio->lis_nr_subios; stripe++)
600                         cl_page_list_init(&stripes_qin[stripe]);
601         } else {
602                 /*
603                  * If we get here, it means pageout & swap doesn't help.
604                  * In order to not make things worse, even don't try to
605                  * allocate the memory with __GFP_NOWARN. -jay
606                  */
607                 mutex_lock(&ld->ld_mutex);
608                 lio->lis_mem_frozen = 1;
609         }
610
611         cl_2queue_init(cl2q);
612         cl_page_list_for_each_safe(page, tmp, qin) {
613                 stripe = lov_page_stripe(page);
614                 cl_page_list_move(QIN(stripe), qin, page);
615         }
616
617         for (stripe = 0; stripe < lio->lis_nr_subios; stripe++) {
618                 struct lov_io_sub   *sub;
619                 struct cl_page_list *sub_qin = QIN(stripe);
620
621                 if (list_empty(&sub_qin->pl_pages))
622                         continue;
623
624                 cl_page_list_splice(sub_qin, &cl2q->c2_qin);
625                 sub = lov_sub_get(env, lio, stripe);
626                 if (!IS_ERR(sub)) {
627                         rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
628                                              crt, cl2q, priority);
629                         lov_sub_put(sub);
630                 } else
631                         rc = PTR_ERR(sub);
632                 cl_page_list_splice(&cl2q->c2_qin,  &queue->c2_qin);
633                 cl_page_list_splice(&cl2q->c2_qout, &queue->c2_qout);
634                 if (rc != 0)
635                         break;
636         }
637
638         for (stripe = 0; stripe < lio->lis_nr_subios; stripe++) {
639                 struct cl_page_list *sub_qin = QIN(stripe);
640
641                 if (list_empty(&sub_qin->pl_pages))
642                         continue;
643
644                 cl_page_list_splice(sub_qin, qin);
645         }
646
647         if (alloc) {
648                 OBD_FREE(stripes_qin,
649                          sizeof(*stripes_qin) * lio->lis_nr_subios);
650         } else {
651                 int i;
652
653                 for (i = 0; i < lio->lis_nr_subios; i++) {
654                         struct cl_io *cio = lio->lis_subs[i].sub_io;
655
656                         if (cio && cio == &ld->ld_emrg[i]->emrg_subio)
657                                 lov_io_sub_fini(env, lio, &lio->lis_subs[i]);
658                 }
659                 lio->lis_mem_frozen = 0;
660                 mutex_unlock(&ld->ld_mutex);
661         }
662
663         RETURN(rc);
664 #undef QIN
665 }
666
667 static int lov_io_prepare_write(const struct lu_env *env,
668                                 const struct cl_io_slice *ios,
669                                 const struct cl_page_slice *slice,
670                                 unsigned from, unsigned to)
671 {
672         struct lov_io     *lio      = cl2lov_io(env, ios);
673         struct cl_page    *sub_page = lov_sub_page(slice);
674         struct lov_io_sub *sub;
675         int result;
676
677         ENTRY;
678         sub = lov_page_subio(env, lio, slice);
679         if (!IS_ERR(sub)) {
680                 result = cl_io_prepare_write(sub->sub_env, sub->sub_io,
681                                              sub_page, from, to);
682                 lov_sub_put(sub);
683         } else
684                 result = PTR_ERR(sub);
685         RETURN(result);
686 }
687
688 static int lov_io_commit_write(const struct lu_env *env,
689                                const struct cl_io_slice *ios,
690                                const struct cl_page_slice *slice,
691                                unsigned from, unsigned to)
692 {
693         struct lov_io     *lio      = cl2lov_io(env, ios);
694         struct cl_page    *sub_page = lov_sub_page(slice);
695         struct lov_io_sub *sub;
696         int result;
697
698         ENTRY;
699         sub = lov_page_subio(env, lio, slice);
700         if (!IS_ERR(sub)) {
701                 result = cl_io_commit_write(sub->sub_env, sub->sub_io,
702                                             sub_page, from, to);
703                 lov_sub_put(sub);
704         } else
705                 result = PTR_ERR(sub);
706         RETURN(result);
707 }
708
709 static int lov_io_fault_start(const struct lu_env *env,
710                               const struct cl_io_slice *ios)
711 {
712         struct cl_fault_io *fio;
713         struct lov_io      *lio;
714         struct lov_io_sub  *sub;
715
716         ENTRY;
717         fio = &ios->cis_io->u.ci_fault;
718         lio = cl2lov_io(env, ios);
719         sub = lov_sub_get(env, lio, lov_page_stripe(fio->ft_page));
720         sub->sub_io->u.ci_fault.ft_nob = fio->ft_nob;
721         lov_sub_put(sub);
722         RETURN(lov_io_start(env, ios));
723 }
724
725 static const struct cl_io_operations lov_io_ops = {
726         .op = {
727                 [CIT_READ] = {
728                         .cio_fini      = lov_io_fini,
729                         .cio_iter_init = lov_io_rw_iter_init,
730                         .cio_iter_fini = lov_io_iter_fini,
731                         .cio_lock      = lov_io_lock,
732                         .cio_unlock    = lov_io_unlock,
733                         .cio_start     = lov_io_start,
734                         .cio_end       = lov_io_end
735                 },
736                 [CIT_WRITE] = {
737                         .cio_fini      = lov_io_fini,
738                         .cio_iter_init = lov_io_rw_iter_init,
739                         .cio_iter_fini = lov_io_iter_fini,
740                         .cio_lock      = lov_io_lock,
741                         .cio_unlock    = lov_io_unlock,
742                         .cio_start     = lov_io_start,
743                         .cio_end       = lov_io_end
744                 },
745                 [CIT_TRUNC] = {
746                         .cio_fini      = lov_io_fini,
747                         .cio_iter_init = lov_io_iter_init,
748                         .cio_iter_fini = lov_io_iter_fini,
749                         .cio_lock      = lov_io_lock,
750                         .cio_unlock    = lov_io_unlock,
751                         .cio_start     = lov_io_start,
752                         .cio_end       = lov_io_end
753                 },
754                 [CIT_FAULT] = {
755                         .cio_fini      = lov_io_fini,
756                         .cio_iter_init = lov_io_iter_init,
757                         .cio_iter_fini = lov_io_iter_fini,
758                         .cio_lock      = lov_io_lock,
759                         .cio_unlock    = lov_io_unlock,
760                         .cio_start     = lov_io_fault_start,
761                         .cio_end       = lov_io_end
762                 },
763                 [CIT_MISC] = {
764                         .cio_fini   = lov_io_fini
765                 }
766         },
767         .req_op = {
768                  [CRT_READ] = {
769                          .cio_submit    = lov_io_submit
770                  },
771                  [CRT_WRITE] = {
772                          .cio_submit    = lov_io_submit
773                  }
774          },
775         .cio_prepare_write = lov_io_prepare_write,
776         .cio_commit_write  = lov_io_commit_write
777 };
778
779 /*****************************************************************************
780  *
781  * Empty lov io operations.
782  *
783  */
784
785 static void lov_empty_io_fini(const struct lu_env *env,
786                               const struct cl_io_slice *ios)
787 {
788         ENTRY;
789         EXIT;
790 }
791
792 static void lov_empty_impossible(const struct lu_env *env,
793                                  struct cl_io_slice *ios)
794 {
795         LBUG();
796 }
797
798 #define LOV_EMPTY_IMPOSSIBLE ((void *)lov_empty_impossible)
799
800 /**
801  * An io operation vector for files without stripes.
802  */
803 static const struct cl_io_operations lov_empty_io_ops = {
804         .op = {
805                 [CIT_READ] = {
806 #if 0
807                         .cio_fini       = lov_empty_io_fini,
808                         .cio_iter_init  = LOV_EMPTY_IMPOSSIBLE,
809                         .cio_lock       = LOV_EMPTY_IMPOSSIBLE,
810                         .cio_start      = LOV_EMPTY_IMPOSSIBLE,
811                         .cio_end        = LOV_EMPTY_IMPOSSIBLE
812 #endif
813                 },
814                 [CIT_WRITE] = {
815                         .cio_fini      = lov_empty_io_fini,
816                         .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
817                         .cio_lock      = LOV_EMPTY_IMPOSSIBLE,
818                         .cio_start     = LOV_EMPTY_IMPOSSIBLE,
819                         .cio_end       = LOV_EMPTY_IMPOSSIBLE
820                 },
821                 [CIT_TRUNC] = {
822                         .cio_fini      = lov_empty_io_fini,
823                         .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
824                         .cio_lock      = LOV_EMPTY_IMPOSSIBLE,
825                         .cio_start     = LOV_EMPTY_IMPOSSIBLE,
826                         .cio_end       = LOV_EMPTY_IMPOSSIBLE
827                 },
828                 [CIT_FAULT] = {
829                         .cio_fini      = lov_empty_io_fini,
830                         .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
831                         .cio_lock      = LOV_EMPTY_IMPOSSIBLE,
832                         .cio_start     = LOV_EMPTY_IMPOSSIBLE,
833                         .cio_end       = LOV_EMPTY_IMPOSSIBLE
834                 },
835                 [CIT_MISC] = {
836                         .cio_fini   = lov_empty_io_fini
837                 }
838         },
839         .req_op = {
840                  [CRT_READ] = {
841                          .cio_submit    = LOV_EMPTY_IMPOSSIBLE
842                  },
843                  [CRT_WRITE] = {
844                          .cio_submit    = LOV_EMPTY_IMPOSSIBLE
845                  }
846          },
847         .cio_commit_write = LOV_EMPTY_IMPOSSIBLE
848 };
849
850 int lov_io_init_raid0(const struct lu_env *env, struct cl_object *obj,
851                       struct cl_io *io)
852 {
853         struct lov_io       *lio = lov_env_io(env);
854         struct lov_object   *lov = cl2lov(obj);
855
856         ENTRY;
857         CFS_INIT_LIST_HEAD(&lio->lis_active);
858         lov_io_slice_init(lio, lov, io);
859         if (io->ci_result == 0) {
860                 LASSERT(lov_r0(lov)->lo_lsm != NULL);
861                 io->ci_result = lov_io_subio_init(env, lio, io);
862                 if (io->ci_result == 0)
863                         cl_io_slice_add(io, &lio->lis_cl, obj, &lov_io_ops);
864         }
865         RETURN(io->ci_result);
866 }
867
868 int lov_io_init_empty(const struct lu_env *env, struct cl_object *obj,
869                       struct cl_io *io)
870 {
871         struct lov_io *lio = lov_env_io(env);
872         int result;
873
874         ENTRY;
875         switch (io->ci_type) {
876         default:
877                 LBUG();
878         case CIT_MISC:
879         case CIT_READ:
880                 result = 0;
881                 break;
882         case CIT_WRITE:
883         case CIT_TRUNC:
884                 result = -EBADF;
885                 break;
886         case CIT_FAULT:
887                 result = -EFAULT;
888                 CERROR("Page fault on a file without stripes: "DFID"\n",
889                        PFID(lu_object_fid(&obj->co_lu)));
890                 break;
891         }
892         if (result == 0)
893                 cl_io_slice_add(io, &lio->lis_cl, obj, &lov_empty_io_ops);
894         io->ci_result = result;
895         RETURN(result != 0);
896 }
897
898 /** @} lov */