Whamcloud - gitweb
LU-1214 ptlrpc: removes client lu_target.h/target.c dependency
[fs/lustre-release.git] / lustre / lov / lov_io.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011, Whamcloud, Inc.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * Implementation of cl_io for LOV layer.
39  *
40  *   Author: Nikita Danilov <nikita.danilov@sun.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LOV
44
45 #include "lov_cl_internal.h"
46
47 /** \addtogroup lov
48  *  @{
49  */
50
51 static inline void lov_sub_enter(struct lov_io_sub *sub)
52 {
53         sub->sub_reenter++;
54 }
55 static inline void lov_sub_exit(struct lov_io_sub *sub)
56 {
57         sub->sub_reenter--;
58 }
59
60 static void lov_io_sub_fini(const struct lu_env *env, struct lov_io *lio,
61                             struct lov_io_sub *sub)
62 {
63         ENTRY;
64         if (sub->sub_io != NULL) {
65                 if (sub->sub_io_initialized) {
66                         lov_sub_enter(sub);
67                         cl_io_fini(sub->sub_env, sub->sub_io);
68                         lov_sub_exit(sub);
69                         sub->sub_io_initialized = 0;
70                         lio->lis_active_subios--;
71                 }
72                 if (sub->sub_stripe == lio->lis_single_subio_index)
73                         lio->lis_single_subio_index = -1;
74                 else if (!sub->sub_borrowed)
75                         OBD_FREE_PTR(sub->sub_io);
76                 sub->sub_io = NULL;
77         }
78         if (sub->sub_env != NULL && !IS_ERR(sub->sub_env)) {
79                 if (!sub->sub_borrowed)
80                         cl_env_put(sub->sub_env, &sub->sub_refcheck);
81                 sub->sub_env = NULL;
82         }
83         EXIT;
84 }
85
86 static void lov_io_sub_inherit(struct cl_io *io, struct lov_io *lio,
87                                int stripe, loff_t start, loff_t end)
88 {
89         struct lov_stripe_md *lsm    = lov_r0(lio->lis_object)->lo_lsm;
90         struct cl_io         *parent = lio->lis_cl.cis_io;
91
92         switch(io->ci_type) {
93         case CIT_SETATTR: {
94                 io->u.ci_setattr.sa_attr = parent->u.ci_setattr.sa_attr;
95                 io->u.ci_setattr.sa_valid = parent->u.ci_setattr.sa_valid;
96                 io->u.ci_setattr.sa_capa = parent->u.ci_setattr.sa_capa;
97                 if (cl_io_is_trunc(io)) {
98                         loff_t new_size = parent->u.ci_setattr.sa_attr.lvb_size;
99
100                         new_size = lov_size_to_stripe(lsm, new_size, stripe);
101                         io->u.ci_setattr.sa_attr.lvb_size = new_size;
102                 }
103                 break;
104         }
105         case CIT_FAULT: {
106                 struct cl_object *obj = parent->ci_obj;
107                 loff_t off = cl_offset(obj, parent->u.ci_fault.ft_index);
108
109                 io->u.ci_fault = parent->u.ci_fault;
110                 off = lov_size_to_stripe(lsm, off, stripe);
111                 io->u.ci_fault.ft_index = cl_index(obj, off);
112                 break;
113         }
114         case CIT_READ:
115         case CIT_WRITE: {
116                 if (cl_io_is_append(parent)) {
117                         io->u.ci_wr.wr_append = 1;
118                 } else {
119                         io->u.ci_rw.crw_pos = start;
120                         io->u.ci_rw.crw_count = end - start;
121                 }
122                 break;
123         }
124         default:
125                 break;
126         }
127 }
128
129 static int lov_io_sub_init(const struct lu_env *env, struct lov_io *lio,
130                            struct lov_io_sub *sub)
131 {
132         struct lov_object *lov = lio->lis_object;
133         struct lov_device *ld  = lu2lov_dev(lov2cl(lov)->co_lu.lo_dev);
134         struct cl_io      *sub_io;
135         struct cl_object  *sub_obj;
136         struct cl_io      *io  = lio->lis_cl.cis_io;
137
138         int stripe = sub->sub_stripe;
139         int result;
140
141         LASSERT(sub->sub_io == NULL);
142         LASSERT(sub->sub_env == NULL);
143         LASSERT(sub->sub_stripe < lio->lis_stripe_count);
144         ENTRY;
145
146         result = 0;
147         sub->sub_io_initialized = 0;
148         sub->sub_borrowed = 0;
149
150         if (lio->lis_mem_frozen) {
151                 LASSERT(cfs_mutex_is_locked(&ld->ld_mutex));
152                 sub->sub_io  = &ld->ld_emrg[stripe]->emrg_subio;
153                 sub->sub_env = ld->ld_emrg[stripe]->emrg_env;
154                 sub->sub_borrowed = 1;
155         } else {
156                 void *cookie;
157
158                 /* obtain new environment */
159                 cookie = cl_env_reenter();
160                 sub->sub_env = cl_env_get(&sub->sub_refcheck);
161                 cl_env_reexit(cookie);
162                 if (IS_ERR(sub->sub_env))
163                         result = PTR_ERR(sub->sub_env);
164
165                 if (result == 0) {
166                         /*
167                          * First sub-io. Use ->lis_single_subio to
168                          * avoid dynamic allocation.
169                          */
170                         if (lio->lis_active_subios == 0) {
171                                 sub->sub_io = &lio->lis_single_subio;
172                                 lio->lis_single_subio_index = stripe;
173                         } else {
174                                 OBD_ALLOC_PTR(sub->sub_io);
175                                 if (sub->sub_io == NULL)
176                                         result = -ENOMEM;
177                         }
178                 }
179         }
180
181         if (result == 0) {
182                 sub_obj = lovsub2cl(lov_r0(lov)->lo_sub[stripe]);
183                 sub_io  = sub->sub_io;
184
185                 sub_io->ci_obj    = sub_obj;
186                 sub_io->ci_result = 0;
187
188                 sub_io->ci_parent  = io;
189                 sub_io->ci_lockreq = io->ci_lockreq;
190                 sub_io->ci_type    = io->ci_type;
191                 sub_io->ci_no_srvlock = io->ci_no_srvlock;
192
193                 lov_sub_enter(sub);
194                 result = cl_io_sub_init(sub->sub_env, sub_io,
195                                         io->ci_type, sub_obj);
196                 lov_sub_exit(sub);
197                 if (result >= 0) {
198                         lio->lis_active_subios++;
199                         sub->sub_io_initialized = 1;
200                         result = 0;
201                 }
202         }
203         if (result != 0)
204                 lov_io_sub_fini(env, lio, sub);
205         RETURN(result);
206 }
207
208 struct lov_io_sub *lov_sub_get(const struct lu_env *env,
209                                struct lov_io *lio, int stripe)
210 {
211         int rc;
212         struct lov_io_sub *sub = &lio->lis_subs[stripe];
213
214         LASSERT(stripe < lio->lis_stripe_count);
215         ENTRY;
216
217         if (!sub->sub_io_initialized) {
218                 sub->sub_stripe = stripe;
219                 rc = lov_io_sub_init(env, lio, sub);
220         } else
221                 rc = 0;
222         if (rc == 0)
223                 lov_sub_enter(sub);
224         else
225                 sub = ERR_PTR(rc);
226         RETURN(sub);
227 }
228
229 void lov_sub_put(struct lov_io_sub *sub)
230 {
231         lov_sub_exit(sub);
232 }
233
234 /*****************************************************************************
235  *
236  * Lov io operations.
237  *
238  */
239
240 static int lov_page_stripe(const struct cl_page *page)
241 {
242         struct lovsub_object *subobj;
243
244         ENTRY;
245         subobj = lu2lovsub(
246                 lu_object_locate(page->cp_child->cp_obj->co_lu.lo_header,
247                                  &lovsub_device_type));
248         LASSERT(subobj != NULL);
249         RETURN(subobj->lso_index);
250 }
251
252 struct lov_io_sub *lov_page_subio(const struct lu_env *env, struct lov_io *lio,
253                                   const struct cl_page_slice *slice)
254 {
255         struct lov_stripe_md *lsm  = lov_r0(lio->lis_object)->lo_lsm;
256         struct cl_page       *page = slice->cpl_page;
257         int stripe;
258
259         LASSERT(lio->lis_cl.cis_io != NULL);
260         LASSERT(cl2lov(slice->cpl_obj) == lio->lis_object);
261         LASSERT(lsm != NULL);
262         LASSERT(lio->lis_nr_subios > 0);
263         ENTRY;
264
265         stripe = lov_page_stripe(page);
266         RETURN(lov_sub_get(env, lio, stripe));
267 }
268
269
270 static int lov_io_subio_init(const struct lu_env *env, struct lov_io *lio,
271                              struct cl_io *io)
272 {
273         struct lov_object    *lov = lio->lis_object;
274         struct lov_stripe_md *lsm = lov_r0(lov)->lo_lsm;
275         int result;
276
277         LASSERT(lio->lis_object != NULL);
278         ENTRY;
279
280         /*
281          * Need to be optimized, we can't afford to allocate a piece of memory
282          * when writing a page. -jay
283          */
284         OBD_ALLOC_LARGE(lio->lis_subs,
285                         lsm->lsm_stripe_count * sizeof lio->lis_subs[0]);
286         if (lio->lis_subs != NULL) {
287                 lio->lis_nr_subios = lio->lis_stripe_count;
288                 lio->lis_single_subio_index = -1;
289                 lio->lis_active_subios = 0;
290                 result = 0;
291         } else
292                 result = -ENOMEM;
293         RETURN(result);
294 }
295
296 static void lov_io_slice_init(struct lov_io *lio,
297                               struct lov_object *obj, struct cl_io *io)
298 {
299         struct lov_stripe_md *lsm = lov_r0(obj)->lo_lsm;
300
301         LASSERT(lsm != NULL);
302         ENTRY;
303
304         io->ci_result = 0;
305         lio->lis_object = obj;
306         lio->lis_stripe_count = lsm->lsm_stripe_count;
307
308         switch (io->ci_type) {
309         case CIT_READ:
310         case CIT_WRITE:
311                 lio->lis_pos = io->u.ci_rw.crw_pos;
312                 lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
313                 lio->lis_io_endpos = lio->lis_endpos;
314                 if (cl_io_is_append(io)) {
315                         LASSERT(io->ci_type == CIT_WRITE);
316                         lio->lis_pos = 0;
317                         lio->lis_endpos = OBD_OBJECT_EOF;
318                 }
319                 break;
320
321         case CIT_SETATTR:
322                 if (cl_io_is_trunc(io))
323                         lio->lis_pos = io->u.ci_setattr.sa_attr.lvb_size;
324                 else
325                         lio->lis_pos = 0;
326                 lio->lis_endpos = OBD_OBJECT_EOF;
327                 break;
328
329         case CIT_FAULT: {
330                 pgoff_t index = io->u.ci_fault.ft_index;
331                 lio->lis_pos = cl_offset(io->ci_obj, index);
332                 lio->lis_endpos = cl_offset(io->ci_obj, index + 1);
333                 break;
334         }
335
336         case CIT_MISC:
337                 lio->lis_pos = 0;
338                 lio->lis_endpos = OBD_OBJECT_EOF;
339                 break;
340
341         default:
342                 LBUG();
343         }
344
345         EXIT;
346 }
347
348 static void lov_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
349 {
350         struct lov_io *lio = cl2lov_io(env, ios);
351         int i;
352
353         ENTRY;
354         if (lio->lis_subs != NULL) {
355                 for (i = 0; i < lio->lis_nr_subios; i++)
356                         lov_io_sub_fini(env, lio, &lio->lis_subs[i]);
357                 OBD_FREE_LARGE(lio->lis_subs,
358                          lio->lis_nr_subios * sizeof lio->lis_subs[0]);
359                 lio->lis_nr_subios = 0;
360         }
361         EXIT;
362 }
363
364 static obd_off lov_offset_mod(obd_off val, int delta)
365 {
366         if (val != OBD_OBJECT_EOF)
367                 val += delta;
368         return val;
369 }
370
371 static int lov_io_iter_init(const struct lu_env *env,
372                             const struct cl_io_slice *ios)
373 {
374         struct lov_io        *lio = cl2lov_io(env, ios);
375         struct lov_stripe_md *lsm = lov_r0(lio->lis_object)->lo_lsm;
376         struct lov_io_sub    *sub;
377         obd_off endpos;
378         obd_off start;
379         obd_off end;
380         int stripe;
381         int rc = 0;
382
383         ENTRY;
384         endpos = lov_offset_mod(lio->lis_endpos, -1);
385         for (stripe = 0; stripe < lio->lis_stripe_count; stripe++) {
386                 if (!lov_stripe_intersects(lsm, stripe, lio->lis_pos,
387                                            endpos, &start, &end))
388                         continue;
389
390                 end = lov_offset_mod(end, +1);
391                 sub = lov_sub_get(env, lio, stripe);
392                 if (!IS_ERR(sub)) {
393                         lov_io_sub_inherit(sub->sub_io, lio, stripe,
394                                            start, end);
395                         rc = cl_io_iter_init(sub->sub_env, sub->sub_io);
396                         lov_sub_put(sub);
397                         CDEBUG(D_VFSTRACE, "shrink: %d ["LPU64", "LPU64")\n",
398                                stripe, start, end);
399                 } else
400                         rc = PTR_ERR(sub);
401
402                 if (!rc)
403                         cfs_list_add_tail(&sub->sub_linkage, &lio->lis_active);
404                 else
405                         break;
406         }
407         RETURN(rc);
408 }
409
410 static int lov_io_rw_iter_init(const struct lu_env *env,
411                                const struct cl_io_slice *ios)
412 {
413         struct lov_io        *lio = cl2lov_io(env, ios);
414         struct cl_io         *io  = ios->cis_io;
415         struct lov_stripe_md *lsm = lov_r0(cl2lov(ios->cis_obj))->lo_lsm;
416         loff_t start = io->u.ci_rw.crw_pos;
417         loff_t next;
418         unsigned long ssize = lsm->lsm_stripe_size;
419
420         LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
421         ENTRY;
422
423         /* fast path for common case. */
424         if (lio->lis_nr_subios != 1 && !cl_io_is_append(io)) {
425
426                 do_div(start, ssize);
427                 next = (start + 1) * ssize;
428                 if (next <= start * ssize)
429                         next = ~0ull;
430
431                 io->ci_continue = next < lio->lis_io_endpos;
432                 io->u.ci_rw.crw_count = min_t(loff_t, lio->lis_io_endpos,
433                                               next) - io->u.ci_rw.crw_pos;
434                 lio->lis_pos    = io->u.ci_rw.crw_pos;
435                 lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
436                 CDEBUG(D_VFSTRACE, "stripe: "LPU64" chunk: ["LPU64", "LPU64") "LPU64"\n",
437                        (__u64)start, lio->lis_pos, lio->lis_endpos,
438                        (__u64)lio->lis_io_endpos);
439         }
440         /*
441          * XXX The following call should be optimized: we know, that
442          * [lio->lis_pos, lio->lis_endpos) intersects with exactly one stripe.
443          */
444         RETURN(lov_io_iter_init(env, ios));
445 }
446
447 static int lov_io_call(const struct lu_env *env, struct lov_io *lio,
448                        int (*iofunc)(const struct lu_env *, struct cl_io *))
449 {
450         struct lov_io_sub *sub;
451         int rc = 0;
452
453         ENTRY;
454         cfs_list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
455                 lov_sub_enter(sub);
456                 rc = iofunc(sub->sub_env, sub->sub_io);
457                 lov_sub_exit(sub);
458                 if (rc)
459                         break;
460         }
461         RETURN(rc);
462 }
463
464 static int lov_io_lock(const struct lu_env *env, const struct cl_io_slice *ios)
465 {
466         ENTRY;
467         RETURN(lov_io_call(env, cl2lov_io(env, ios), cl_io_lock));
468 }
469
470 static int lov_io_start(const struct lu_env *env, const struct cl_io_slice *ios)
471 {
472         ENTRY;
473         RETURN(lov_io_call(env, cl2lov_io(env, ios), cl_io_start));
474 }
475
476 static int lov_io_end_wrapper(const struct lu_env *env, struct cl_io *io)
477 {
478         ENTRY;
479         /*
480          * It's possible that lov_io_start() wasn't called against this
481          * sub-io, either because previous sub-io failed, or upper layer
482          * completed IO.
483          */
484         if (io->ci_state == CIS_IO_GOING)
485                 cl_io_end(env, io);
486         else
487                 io->ci_state = CIS_IO_FINISHED;
488         RETURN(0);
489 }
490
491 static int lov_io_iter_fini_wrapper(const struct lu_env *env, struct cl_io *io)
492 {
493         cl_io_iter_fini(env, io);
494         RETURN(0);
495 }
496
497 static int lov_io_unlock_wrapper(const struct lu_env *env, struct cl_io *io)
498 {
499         cl_io_unlock(env, io);
500         RETURN(0);
501 }
502
503 static void lov_io_end(const struct lu_env *env, const struct cl_io_slice *ios)
504 {
505         int rc;
506
507         rc = lov_io_call(env, cl2lov_io(env, ios), lov_io_end_wrapper);
508         LASSERT(rc == 0);
509 }
510
511 static void lov_io_iter_fini(const struct lu_env *env,
512                              const struct cl_io_slice *ios)
513 {
514         struct lov_io *lio = cl2lov_io(env, ios);
515         int rc;
516
517         ENTRY;
518         rc = lov_io_call(env, lio, lov_io_iter_fini_wrapper);
519         LASSERT(rc == 0);
520         while (!cfs_list_empty(&lio->lis_active))
521                 cfs_list_del_init(lio->lis_active.next);
522         EXIT;
523 }
524
525 static void lov_io_unlock(const struct lu_env *env,
526                           const struct cl_io_slice *ios)
527 {
528         int rc;
529
530         ENTRY;
531         rc = lov_io_call(env, cl2lov_io(env, ios), lov_io_unlock_wrapper);
532         LASSERT(rc == 0);
533         EXIT;
534 }
535
536
537 static struct cl_page_list *lov_io_submit_qin(struct lov_device *ld,
538                                               struct cl_page_list *qin,
539                                               int idx, int alloc)
540 {
541         return alloc ? &qin[idx] : &ld->ld_emrg[idx]->emrg_page_list;
542 }
543
544 /**
545  * lov implementation of cl_operations::cio_submit() method. It takes a list
546  * of pages in \a queue, splits it into per-stripe sub-lists, invokes
547  * cl_io_submit() on underlying devices to submit sub-lists, and then splices
548  * everything back.
549  *
550  * Major complication of this function is a need to handle memory cleansing:
551  * cl_io_submit() is called to write out pages as a part of VM memory
552  * reclamation, and hence it may not fail due to memory shortages (system
553  * dead-locks otherwise). To deal with this, some resources (sub-lists,
554  * sub-environment, etc.) are allocated per-device on "startup" (i.e., in a
555  * not-memory cleansing context), and in case of memory shortage, these
556  * pre-allocated resources are used by lov_io_submit() under
557  * lov_device::ld_mutex mutex.
558  */
559 static int lov_io_submit(const struct lu_env *env,
560                          const struct cl_io_slice *ios,
561                          enum cl_req_type crt, struct cl_2queue *queue,
562                          enum cl_req_priority priority)
563 {
564         struct lov_io          *lio = cl2lov_io(env, ios);
565         struct lov_object      *obj = lio->lis_object;
566         struct lov_device       *ld = lu2lov_dev(lov2cl(obj)->co_lu.lo_dev);
567         struct cl_page_list    *qin = &queue->c2_qin;
568         struct cl_2queue      *cl2q = &lov_env_info(env)->lti_cl2q;
569         struct cl_page_list *stripes_qin = NULL;
570         struct cl_page *page;
571         struct cl_page *tmp;
572         int stripe;
573
574 #define QIN(stripe) lov_io_submit_qin(ld, stripes_qin, stripe, alloc)
575
576         int rc = 0;
577         int alloc =
578 #if defined(__KERNEL__) && defined(__linux__)
579                 !(current->flags & PF_MEMALLOC);
580 #else
581                 1;
582 #endif
583         ENTRY;
584         if (lio->lis_active_subios == 1) {
585                 int idx = lio->lis_single_subio_index;
586                 struct lov_io_sub *sub;
587
588                 LASSERT(idx < lio->lis_nr_subios);
589                 sub = lov_sub_get(env, lio, idx);
590                 LASSERT(!IS_ERR(sub));
591                 LASSERT(sub->sub_io == &lio->lis_single_subio);
592                 rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
593                                      crt, queue, priority);
594                 lov_sub_put(sub);
595                 RETURN(rc);
596         }
597
598         LASSERT(lio->lis_subs != NULL);
599         if (alloc) {
600                 OBD_ALLOC_LARGE(stripes_qin,
601                                 sizeof(*stripes_qin) * lio->lis_nr_subios);
602                 if (stripes_qin == NULL)
603                         RETURN(-ENOMEM);
604
605                 for (stripe = 0; stripe < lio->lis_nr_subios; stripe++)
606                         cl_page_list_init(&stripes_qin[stripe]);
607         } else {
608                 /*
609                  * If we get here, it means pageout & swap doesn't help.
610                  * In order to not make things worse, even don't try to
611                  * allocate the memory with __GFP_NOWARN. -jay
612                  */
613                 cfs_mutex_lock(&ld->ld_mutex);
614                 lio->lis_mem_frozen = 1;
615         }
616
617         cl_2queue_init(cl2q);
618         cl_page_list_for_each_safe(page, tmp, qin) {
619                 stripe = lov_page_stripe(page);
620                 cl_page_list_move(QIN(stripe), qin, page);
621         }
622
623         for (stripe = 0; stripe < lio->lis_nr_subios; stripe++) {
624                 struct lov_io_sub   *sub;
625                 struct cl_page_list *sub_qin = QIN(stripe);
626
627                 if (cfs_list_empty(&sub_qin->pl_pages))
628                         continue;
629
630                 cl_page_list_splice(sub_qin, &cl2q->c2_qin);
631                 sub = lov_sub_get(env, lio, stripe);
632                 if (!IS_ERR(sub)) {
633                         rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
634                                              crt, cl2q, priority);
635                         lov_sub_put(sub);
636                 } else
637                         rc = PTR_ERR(sub);
638                 cl_page_list_splice(&cl2q->c2_qin,  &queue->c2_qin);
639                 cl_page_list_splice(&cl2q->c2_qout, &queue->c2_qout);
640                 if (rc != 0)
641                         break;
642         }
643
644         for (stripe = 0; stripe < lio->lis_nr_subios; stripe++) {
645                 struct cl_page_list *sub_qin = QIN(stripe);
646
647                 if (cfs_list_empty(&sub_qin->pl_pages))
648                         continue;
649
650                 cl_page_list_splice(sub_qin, qin);
651         }
652
653         if (alloc) {
654                 OBD_FREE_LARGE(stripes_qin,
655                          sizeof(*stripes_qin) * lio->lis_nr_subios);
656         } else {
657                 int i;
658
659                 for (i = 0; i < lio->lis_nr_subios; i++) {
660                         struct cl_io *cio = lio->lis_subs[i].sub_io;
661
662                         if (cio && cio == &ld->ld_emrg[i]->emrg_subio)
663                                 lov_io_sub_fini(env, lio, &lio->lis_subs[i]);
664                 }
665                 lio->lis_mem_frozen = 0;
666                 cfs_mutex_unlock(&ld->ld_mutex);
667         }
668
669         RETURN(rc);
670 #undef QIN
671 }
672
673 static int lov_io_prepare_write(const struct lu_env *env,
674                                 const struct cl_io_slice *ios,
675                                 const struct cl_page_slice *slice,
676                                 unsigned from, unsigned to)
677 {
678         struct lov_io     *lio      = cl2lov_io(env, ios);
679         struct cl_page    *sub_page = lov_sub_page(slice);
680         struct lov_io_sub *sub;
681         int result;
682
683         ENTRY;
684         sub = lov_page_subio(env, lio, slice);
685         if (!IS_ERR(sub)) {
686                 result = cl_io_prepare_write(sub->sub_env, sub->sub_io,
687                                              sub_page, from, to);
688                 lov_sub_put(sub);
689         } else
690                 result = PTR_ERR(sub);
691         RETURN(result);
692 }
693
694 static int lov_io_commit_write(const struct lu_env *env,
695                                const struct cl_io_slice *ios,
696                                const struct cl_page_slice *slice,
697                                unsigned from, unsigned to)
698 {
699         struct lov_io     *lio      = cl2lov_io(env, ios);
700         struct cl_page    *sub_page = lov_sub_page(slice);
701         struct lov_io_sub *sub;
702         int result;
703
704         ENTRY;
705         sub = lov_page_subio(env, lio, slice);
706         if (!IS_ERR(sub)) {
707                 result = cl_io_commit_write(sub->sub_env, sub->sub_io,
708                                             sub_page, from, to);
709                 lov_sub_put(sub);
710         } else
711                 result = PTR_ERR(sub);
712         RETURN(result);
713 }
714
715 static int lov_io_fault_start(const struct lu_env *env,
716                               const struct cl_io_slice *ios)
717 {
718         struct cl_fault_io *fio;
719         struct lov_io      *lio;
720         struct lov_io_sub  *sub;
721
722         ENTRY;
723         fio = &ios->cis_io->u.ci_fault;
724         lio = cl2lov_io(env, ios);
725         sub = lov_sub_get(env, lio, lov_page_stripe(fio->ft_page));
726         sub->sub_io->u.ci_fault.ft_nob = fio->ft_nob;
727         lov_sub_put(sub);
728         RETURN(lov_io_start(env, ios));
729 }
730
731 static const struct cl_io_operations lov_io_ops = {
732         .op = {
733                 [CIT_READ] = {
734                         .cio_fini      = lov_io_fini,
735                         .cio_iter_init = lov_io_rw_iter_init,
736                         .cio_iter_fini = lov_io_iter_fini,
737                         .cio_lock      = lov_io_lock,
738                         .cio_unlock    = lov_io_unlock,
739                         .cio_start     = lov_io_start,
740                         .cio_end       = lov_io_end
741                 },
742                 [CIT_WRITE] = {
743                         .cio_fini      = lov_io_fini,
744                         .cio_iter_init = lov_io_rw_iter_init,
745                         .cio_iter_fini = lov_io_iter_fini,
746                         .cio_lock      = lov_io_lock,
747                         .cio_unlock    = lov_io_unlock,
748                         .cio_start     = lov_io_start,
749                         .cio_end       = lov_io_end
750                 },
751                 [CIT_SETATTR] = {
752                         .cio_fini      = lov_io_fini,
753                         .cio_iter_init = lov_io_iter_init,
754                         .cio_iter_fini = lov_io_iter_fini,
755                         .cio_lock      = lov_io_lock,
756                         .cio_unlock    = lov_io_unlock,
757                         .cio_start     = lov_io_start,
758                         .cio_end       = lov_io_end
759                 },
760                 [CIT_FAULT] = {
761                         .cio_fini      = lov_io_fini,
762                         .cio_iter_init = lov_io_iter_init,
763                         .cio_iter_fini = lov_io_iter_fini,
764                         .cio_lock      = lov_io_lock,
765                         .cio_unlock    = lov_io_unlock,
766                         .cio_start     = lov_io_fault_start,
767                         .cio_end       = lov_io_end
768                 },
769                 [CIT_MISC] = {
770                         .cio_fini   = lov_io_fini
771                 }
772         },
773         .req_op = {
774                  [CRT_READ] = {
775                          .cio_submit    = lov_io_submit
776                  },
777                  [CRT_WRITE] = {
778                          .cio_submit    = lov_io_submit
779                  }
780          },
781         .cio_prepare_write = lov_io_prepare_write,
782         .cio_commit_write  = lov_io_commit_write
783 };
784
785 /*****************************************************************************
786  *
787  * Empty lov io operations.
788  *
789  */
790
791 static void lov_empty_io_fini(const struct lu_env *env,
792                               const struct cl_io_slice *ios)
793 {
794         ENTRY;
795         EXIT;
796 }
797
798 static void lov_empty_impossible(const struct lu_env *env,
799                                  struct cl_io_slice *ios)
800 {
801         LBUG();
802 }
803
804 #define LOV_EMPTY_IMPOSSIBLE ((void *)lov_empty_impossible)
805
806 /**
807  * An io operation vector for files without stripes.
808  */
809 static const struct cl_io_operations lov_empty_io_ops = {
810         .op = {
811                 [CIT_READ] = {
812 #if 0
813                         .cio_fini       = lov_empty_io_fini,
814                         .cio_iter_init  = LOV_EMPTY_IMPOSSIBLE,
815                         .cio_lock       = LOV_EMPTY_IMPOSSIBLE,
816                         .cio_start      = LOV_EMPTY_IMPOSSIBLE,
817                         .cio_end        = LOV_EMPTY_IMPOSSIBLE
818 #endif
819                 },
820                 [CIT_WRITE] = {
821                         .cio_fini      = lov_empty_io_fini,
822                         .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
823                         .cio_lock      = LOV_EMPTY_IMPOSSIBLE,
824                         .cio_start     = LOV_EMPTY_IMPOSSIBLE,
825                         .cio_end       = LOV_EMPTY_IMPOSSIBLE
826                 },
827                 [CIT_SETATTR] = {
828                         .cio_fini      = lov_empty_io_fini,
829                         .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
830                         .cio_lock      = LOV_EMPTY_IMPOSSIBLE,
831                         .cio_start     = LOV_EMPTY_IMPOSSIBLE,
832                         .cio_end       = LOV_EMPTY_IMPOSSIBLE
833                 },
834                 [CIT_FAULT] = {
835                         .cio_fini      = lov_empty_io_fini,
836                         .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
837                         .cio_lock      = LOV_EMPTY_IMPOSSIBLE,
838                         .cio_start     = LOV_EMPTY_IMPOSSIBLE,
839                         .cio_end       = LOV_EMPTY_IMPOSSIBLE
840                 },
841                 [CIT_MISC] = {
842                         .cio_fini   = lov_empty_io_fini
843                 }
844         },
845         .req_op = {
846                  [CRT_READ] = {
847                          .cio_submit    = LOV_EMPTY_IMPOSSIBLE
848                  },
849                  [CRT_WRITE] = {
850                          .cio_submit    = LOV_EMPTY_IMPOSSIBLE
851                  }
852          },
853         .cio_commit_write = LOV_EMPTY_IMPOSSIBLE
854 };
855
856 int lov_io_init_raid0(const struct lu_env *env, struct cl_object *obj,
857                       struct cl_io *io)
858 {
859         struct lov_io       *lio = lov_env_io(env);
860         struct lov_object   *lov = cl2lov(obj);
861
862         ENTRY;
863         CFS_INIT_LIST_HEAD(&lio->lis_active);
864         lov_io_slice_init(lio, lov, io);
865         if (io->ci_result == 0) {
866                 LASSERT(lov_r0(lov)->lo_lsm != NULL);
867                 io->ci_result = lov_io_subio_init(env, lio, io);
868                 if (io->ci_result == 0)
869                         cl_io_slice_add(io, &lio->lis_cl, obj, &lov_io_ops);
870         }
871         RETURN(io->ci_result);
872 }
873
874 int lov_io_init_empty(const struct lu_env *env, struct cl_object *obj,
875                       struct cl_io *io)
876 {
877         struct lov_io *lio = lov_env_io(env);
878         int result;
879
880         ENTRY;
881         switch (io->ci_type) {
882         default:
883                 LBUG();
884         case CIT_MISC:
885         case CIT_READ:
886                 result = 0;
887                 break;
888         case CIT_WRITE:
889         case CIT_SETATTR:
890                 result = -EBADF;
891                 break;
892         case CIT_FAULT:
893                 result = -EFAULT;
894                 CERROR("Page fault on a file without stripes: "DFID"\n",
895                        PFID(lu_object_fid(&obj->co_lu)));
896                 break;
897         }
898         if (result == 0)
899                 cl_io_slice_add(io, &lio->lis_cl, obj, &lov_empty_io_ops);
900         io->ci_result = result;
901         RETURN(result != 0);
902 }
903
904 /** @} lov */