Whamcloud - gitweb
LU-12142 clio: fix hang on urgent cached pages
[fs/lustre-release.git] / lustre / osc / osc_io.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * Implementation of cl_io for OSC layer.
33  *
34  *   Author: Nikita Danilov <nikita.danilov@sun.com>
35  *   Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_OSC
39
40 #include <lustre_obdo.h>
41 #include <lustre_osc.h>
42 #include <linux/pagevec.h>
43 #include <linux/falloc.h>
44
45 #include "osc_internal.h"
46
47 /** \addtogroup osc
48  *  @{
49  */
50
51 /*****************************************************************************
52  *
53  * io operations.
54  *
55  */
56
57 static void osc_io_fini(const struct lu_env *env, const struct cl_io_slice *io)
58 {
59 }
60
61 void osc_read_ahead_release(const struct lu_env *env, struct cl_read_ahead *ra)
62 {
63         struct ldlm_lock *dlmlock = ra->cra_dlmlock;
64         struct osc_io *oio = ra->cra_oio;
65         struct lustre_handle lockh;
66
67         oio->oi_is_readahead = 0;
68         ldlm_lock2handle(dlmlock, &lockh);
69         ldlm_lock_decref(&lockh, LCK_PR);
70         LDLM_LOCK_PUT(dlmlock);
71 }
72 EXPORT_SYMBOL(osc_read_ahead_release);
73
74 static int osc_io_read_ahead(const struct lu_env *env,
75                              const struct cl_io_slice *ios,
76                              pgoff_t start, struct cl_read_ahead *ra)
77 {
78         struct osc_object *osc = cl2osc(ios->cis_obj);
79         struct osc_io *oio = cl2osc_io(env, ios);
80         struct ldlm_lock *dlmlock;
81         int result = -ENODATA;
82
83         ENTRY;
84
85         oio->oi_is_readahead = true;
86         dlmlock = osc_dlmlock_at_pgoff(env, osc, start, 0);
87         if (dlmlock != NULL) {
88                 LASSERT(dlmlock->l_ast_data == osc);
89                 if (dlmlock->l_req_mode != LCK_PR) {
90                         struct lustre_handle lockh;
91                         ldlm_lock2handle(dlmlock, &lockh);
92                         ldlm_lock_addref(&lockh, LCK_PR);
93                         ldlm_lock_decref(&lockh, dlmlock->l_req_mode);
94                 }
95
96                 ra->cra_rpc_pages = osc_cli(osc)->cl_max_pages_per_rpc;
97                 ra->cra_end_idx = cl_index(osc2cl(osc),
98                                            dlmlock->l_policy_data.l_extent.end);
99                 ra->cra_release = osc_read_ahead_release;
100                 ra->cra_dlmlock = dlmlock;
101                 ra->cra_oio = oio;
102                 if (ra->cra_end_idx != CL_PAGE_EOF)
103                         ra->cra_contention = true;
104                 result = 0;
105         }
106
107         RETURN(result);
108 }
109
110 /**
111  * An implementation of cl_io_operations::cio_io_submit() method for osc
112  * layer. Iterates over pages in the in-queue, prepares each for io by calling
113  * cl_page_prep() and then either submits them through osc_io_submit_page()
114  * or, if page is already submitted, changes osc flags through
115  * osc_set_async_flags().
116  */
117 int osc_io_submit(const struct lu_env *env, const struct cl_io_slice *ios,
118                   enum cl_req_type crt, struct cl_2queue *queue)
119 {
120         struct cl_page    *page;
121         struct cl_page    *tmp;
122         struct client_obd *cli  = NULL;
123         struct osc_object *osc  = NULL; /* to keep gcc happy */
124         struct osc_page   *opg;
125         struct cl_io      *io;
126         LIST_HEAD(list);
127
128         struct cl_page_list *qin      = &queue->c2_qin;
129         struct cl_page_list *qout     = &queue->c2_qout;
130         unsigned int queued = 0;
131         int result = 0;
132         int brw_flags;
133         unsigned int max_pages;
134         unsigned int ppc_bits; /* pages per chunk bits */
135         unsigned int ppc;
136         bool sync_queue = false;
137
138         LASSERT(qin->pl_nr > 0);
139
140         CDEBUG(D_CACHE|D_READA, "%d %d\n", qin->pl_nr, crt);
141
142         osc = cl2osc(ios->cis_obj);
143         cli = osc_cli(osc);
144         max_pages = cli->cl_max_pages_per_rpc;
145         ppc_bits = cli->cl_chunkbits - PAGE_SHIFT;
146         ppc = 1 << ppc_bits;
147
148         brw_flags = osc_io_srvlock(cl2osc_io(env, ios)) ? OBD_BRW_SRVLOCK : 0;
149         brw_flags |= crt == CRT_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ;
150         if (crt == CRT_READ && ios->cis_io->ci_ndelay)
151                 brw_flags |= OBD_BRW_NDELAY;
152
153         page = cl_page_list_first(qin);
154         if (page->cp_type == CPT_TRANSIENT)
155                 brw_flags |= OBD_BRW_NOCACHE;
156
157         /*
158          * NOTE: here @page is a top-level page. This is done to avoid
159          *       creation of sub-page-list.
160          */
161         cl_page_list_for_each_safe(page, tmp, qin) {
162                 struct osc_async_page *oap;
163
164                 /* Top level IO. */
165                 io = page->cp_owner;
166                 LASSERT(io != NULL);
167
168                 opg = osc_cl_page_osc(page, osc);
169                 oap = &opg->ops_oap;
170                 LASSERT(osc == oap->oap_obj);
171
172                 if (!list_empty(&oap->oap_pending_item) ||
173                     !list_empty(&oap->oap_rpc_item)) {
174                         CDEBUG(D_CACHE, "Busy oap %p page %p for submit.\n",
175                                oap, opg);
176                         result = -EBUSY;
177                         break;
178                 }
179
180                 result = cl_page_prep(env, io, page, crt);
181                 if (result != 0) {
182                         LASSERT(result < 0);
183                         if (result != -EALREADY)
184                                 break;
185                         /*
186                          * Handle -EALREADY error: for read case, the page is
187                          * already in UPTODATE state; for write, the page
188                          * is not dirty.
189                          */
190                         result = 0;
191                         continue;
192                 }
193
194                 spin_lock(&oap->oap_lock);
195                 oap->oap_async_flags = ASYNC_URGENT|ASYNC_READY;
196                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
197                 spin_unlock(&oap->oap_lock);
198
199                 osc_page_submit(env, opg, crt, brw_flags);
200                 list_add_tail(&oap->oap_pending_item, &list);
201
202                 if (page->cp_sync_io != NULL)
203                         cl_page_list_move(qout, qin, page);
204                 else /* async IO */
205                         cl_page_list_del(env, qin, page);
206
207                 queued++;
208                 if (queued == max_pages) {
209                         sync_queue = true;
210                 } else if (crt == CRT_WRITE) {
211                         unsigned int chunks;
212                         unsigned int next_chunks;
213
214                         chunks = (queued + ppc - 1) >> ppc_bits;
215                         /* chunk number if add another page */
216                         next_chunks = (queued + ppc) >> ppc_bits;
217
218                         /* next page will excceed write chunk limit */
219                         if (chunks == osc_max_write_chunks(cli) &&
220                             next_chunks > chunks)
221                                 sync_queue = true;
222                 }
223
224                 if (sync_queue) {
225                         result = osc_queue_sync_pages(env, io, osc, &list,
226                                                       brw_flags);
227                         if (result < 0)
228                                 break;
229                         queued = 0;
230                         sync_queue = false;
231                 }
232         }
233
234         if (queued > 0)
235                 result = osc_queue_sync_pages(env, io, osc, &list, brw_flags);
236
237         /* Update c/mtime for sync write. LU-7310 */
238         if (crt == CRT_WRITE && qout->pl_nr > 0 && result == 0) {
239                 struct cl_object *obj   = ios->cis_obj;
240                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
241
242                 cl_object_attr_lock(obj);
243                 attr->cat_mtime = attr->cat_ctime = ktime_get_real_seconds();
244                 cl_object_attr_update(env, obj, attr, CAT_MTIME | CAT_CTIME);
245                 cl_object_attr_unlock(obj);
246         }
247
248         CDEBUG(D_INFO, "%d/%d %d\n", qin->pl_nr, qout->pl_nr, result);
249         return qout->pl_nr > 0 ? 0 : result;
250 }
251 EXPORT_SYMBOL(osc_io_submit);
252
253 /**
254  * This is called to update the attributes when modifying a specific page,
255  * both when making new pages and when doing updates to existing cached pages.
256  *
257  * Expand stripe KMS if necessary.
258  */
259 void osc_page_touch_at(const struct lu_env *env, struct cl_object *obj,
260                        pgoff_t idx, size_t to)
261 {
262         struct lov_oinfo  *loi  = cl2osc(obj)->oo_oinfo;
263         struct cl_attr    *attr = &osc_env_info(env)->oti_attr;
264         int valid;
265         __u64 kms;
266
267         ENTRY;
268
269         /* offset within stripe */
270         kms = cl_offset(obj, idx) + to;
271
272         cl_object_attr_lock(obj);
273         CDEBUG(D_INODE, "stripe KMS %sincreasing %llu->%llu %llu\n",
274                kms > loi->loi_kms ? "" : "not ", loi->loi_kms, kms,
275                loi->loi_lvb.lvb_size);
276
277         attr->cat_mtime = attr->cat_ctime = ktime_get_real_seconds();
278         valid = CAT_MTIME | CAT_CTIME;
279         if (kms > loi->loi_kms) {
280                 attr->cat_kms = kms;
281                 valid |= CAT_KMS;
282         }
283         if (kms > loi->loi_lvb.lvb_size) {
284                 attr->cat_size = kms;
285                 valid |= CAT_SIZE;
286         }
287         cl_object_attr_update(env, obj, attr, valid);
288         cl_object_attr_unlock(obj);
289
290         EXIT;
291 }
292
293 int osc_io_commit_async(const struct lu_env *env,
294                         const struct cl_io_slice *ios,
295                         struct cl_page_list *qin, int from, int to,
296                         cl_commit_cbt cb)
297 {
298         struct cl_io    *io = ios->cis_io;
299         struct osc_io   *oio = cl2osc_io(env, ios);
300         struct osc_object *osc = cl2osc(ios->cis_obj);
301         struct cl_page  *page;
302         struct cl_page  *last_page;
303         struct osc_page *opg;
304         struct pagevec  *pvec = &osc_env_info(env)->oti_pagevec;
305         int result = 0;
306         ENTRY;
307
308         LASSERT(qin->pl_nr > 0);
309
310         /* Handle partial page cases */
311         last_page = cl_page_list_last(qin);
312         if (oio->oi_lockless) {
313                 page = cl_page_list_first(qin);
314                 if (page == last_page) {
315                         cl_page_clip(env, page, from, to);
316                 } else {
317                         if (from != 0)
318                                 cl_page_clip(env, page, from, PAGE_SIZE);
319                         if (to != PAGE_SIZE)
320                                 cl_page_clip(env, last_page, 0, to);
321                 }
322         }
323
324         ll_pagevec_init(pvec, 0);
325
326         while (qin->pl_nr > 0) {
327                 struct osc_async_page *oap;
328
329                 page = cl_page_list_first(qin);
330                 opg = osc_cl_page_osc(page, osc);
331                 oap = &opg->ops_oap;
332
333                 LASSERTF(osc == oap->oap_obj,
334                          "obj mismatch: %p / %p\n", osc, oap->oap_obj);
335
336                 if (!list_empty(&oap->oap_rpc_item)) {
337                         CDEBUG(D_CACHE, "Busy oap %p page %p for submit.\n",
338                                oap, opg);
339                         result = -EBUSY;
340                         break;
341                 }
342
343                 /* The page may be already in dirty cache. */
344                 if (list_empty(&oap->oap_pending_item)) {
345                         result = osc_page_cache_add(env, opg, io, cb);
346                         if (result != 0)
347                                 break;
348                 }
349
350                 osc_page_touch_at(env, osc2cl(osc), osc_index(opg),
351                                   page == last_page ? to : PAGE_SIZE);
352
353                 cl_page_list_del(env, qin, page);
354
355                 /* if there are no more slots, do the callback & reinit */
356                 if (pagevec_add(pvec, page->cp_vmpage) == 0) {
357                         (*cb)(env, io, pvec);
358                         pagevec_reinit(pvec);
359                 }
360         }
361
362         /* Clean up any partially full pagevecs */
363         if (pagevec_count(pvec) != 0)
364                 (*cb)(env, io, pvec);
365
366         /* Can't access these pages any more. Page can be in transfer and
367          * complete at any time. */
368
369         /* for sync write, kernel will wait for this page to be flushed before
370          * osc_io_end() is called, so release it earlier.
371          * for mkwrite(), it's known there is no further pages. */
372         if (cl_io_is_sync_write(io) && oio->oi_active != NULL) {
373                 osc_extent_release(env, oio->oi_active);
374                 oio->oi_active = NULL;
375         }
376
377         CDEBUG(D_INFO, "%d %d\n", qin->pl_nr, result);
378         RETURN(result);
379 }
380 EXPORT_SYMBOL(osc_io_commit_async);
381
382 void osc_io_extent_release(const struct lu_env *env,
383                            const struct cl_io_slice *ios)
384 {
385         struct osc_io *oio = cl2osc_io(env, ios);
386
387         if (oio->oi_active != NULL) {
388                 osc_extent_release(env, oio->oi_active);
389                 oio->oi_active = NULL;
390         }
391 }
392 EXPORT_SYMBOL(osc_io_extent_release);
393
394 static bool osc_import_not_healthy(struct obd_import *imp)
395 {
396         return imp->imp_invalid || imp->imp_deactive ||
397                !(imp->imp_state == LUSTRE_IMP_FULL ||
398                  imp->imp_state == LUSTRE_IMP_IDLE);
399 }
400
401 int osc_io_iter_init(const struct lu_env *env, const struct cl_io_slice *ios)
402 {
403         struct osc_object *osc = cl2osc(ios->cis_obj);
404         struct obd_import *imp = osc_cli(osc)->cl_import;
405         struct osc_io *oio = osc_env_io(env);
406         int rc = -EIO;
407
408         ENTRY;
409
410         spin_lock(&imp->imp_lock);
411         /**
412          * check whether this OSC device is available for non-delay read,
413          * fast switching mirror if we haven't tried all mirrors.
414          */
415         if (ios->cis_io->ci_type == CIT_READ && ios->cis_io->ci_ndelay &&
416             !ios->cis_io->ci_tried_all_mirrors && osc_import_not_healthy(imp)) {
417                 rc = -EAGAIN;
418         } else if (likely(!imp->imp_invalid)) {
419                 atomic_inc(&osc->oo_nr_ios);
420                 oio->oi_is_active = 1;
421                 rc = 0;
422         }
423         spin_unlock(&imp->imp_lock);
424
425         if (capable(CAP_SYS_RESOURCE))
426                 oio->oi_cap_sys_resource = 1;
427
428         RETURN(rc);
429 }
430 EXPORT_SYMBOL(osc_io_iter_init);
431
432 void osc_io_iter_fini(const struct lu_env *env,
433                       const struct cl_io_slice *ios)
434 {
435         struct osc_io *oio = osc_env_io(env);
436
437         if (oio->oi_is_active) {
438                 struct osc_object *osc = cl2osc(ios->cis_obj);
439
440                 oio->oi_is_active = 0;
441                 LASSERT(atomic_read(&osc->oo_nr_ios) > 0);
442                 if (atomic_dec_and_test(&osc->oo_nr_ios))
443                         wake_up_all(&osc->oo_io_waitq);
444         }
445 }
446 EXPORT_SYMBOL(osc_io_iter_fini);
447
448 void osc_io_rw_iter_fini(const struct lu_env *env,
449                          const struct cl_io_slice *ios)
450 {
451         struct osc_io *oio = osc_env_io(env);
452         struct osc_object *osc = cl2osc(ios->cis_obj);
453
454         if (oio->oi_lru_reserved > 0) {
455                 osc_lru_unreserve(osc_cli(osc), oio->oi_lru_reserved);
456                 oio->oi_lru_reserved = 0;
457         }
458         oio->oi_write_osclock = NULL;
459
460         osc_io_iter_fini(env, ios);
461 }
462 EXPORT_SYMBOL(osc_io_rw_iter_fini);
463
464 int osc_io_fault_start(const struct lu_env *env, const struct cl_io_slice *ios)
465 {
466         struct cl_io       *io;
467         struct cl_fault_io *fio;
468         ENTRY;
469
470         io  = ios->cis_io;
471         fio = &io->u.ci_fault;
472         CDEBUG(D_INFO, "%lu %d %zu\n",
473                 fio->ft_index, fio->ft_writable, fio->ft_nob);
474         /*
475          * If mapping is writeable, adjust kms to cover this page,
476          * but do not extend kms beyond actual file size.
477          * See bug 10919.
478          */
479         if (fio->ft_writable)
480                 osc_page_touch_at(env, ios->cis_obj,
481                                   fio->ft_index, fio->ft_nob);
482         RETURN(0);
483 }
484 EXPORT_SYMBOL(osc_io_fault_start);
485
486
487 static int osc_async_upcall(void *a, int rc)
488 {
489         struct osc_async_cbargs *args = a;
490
491         args->opc_rc = rc;
492         complete(&args->opc_sync);
493         return 0;
494 }
495
496 /**
497  * Checks that there are no pages being written in the extent being truncated.
498  */
499 static bool trunc_check_cb(const struct lu_env *env, struct cl_io *io,
500                           struct osc_page *ops , void *cbdata)
501 {
502         struct cl_page *page = ops->ops_cl.cpl_page;
503         struct osc_async_page *oap;
504         __u64 start = *(__u64 *)cbdata;
505
506         oap = &ops->ops_oap;
507         if (oap->oap_cmd & OBD_BRW_WRITE &&
508             !list_empty(&oap->oap_pending_item))
509                 CL_PAGE_DEBUG(D_ERROR, env, page, "exists %llu/%s.\n",
510                                 start, current->comm);
511
512         if (PageLocked(page->cp_vmpage))
513                 CDEBUG(D_CACHE, "page %p index %lu locked for %d.\n",
514                        ops, osc_index(ops), oap->oap_cmd & OBD_BRW_RWMASK);
515
516         return true;
517 }
518
519 static void osc_trunc_check(const struct lu_env *env, struct cl_io *io,
520                             struct osc_io *oio, __u64 size)
521 {
522         struct cl_object *clob;
523         int     partial;
524         pgoff_t start;
525
526         clob    = oio->oi_cl.cis_obj;
527         start   = cl_index(clob, size);
528         partial = cl_offset(clob, start) < size;
529
530         /*
531          * Complain if there are pages in the truncated region.
532          */
533         osc_page_gang_lookup(env, io, cl2osc(clob),
534                                 start + partial, CL_PAGE_EOF,
535                                 trunc_check_cb, (void *)&size);
536 }
537
538 static int osc_io_setattr_start(const struct lu_env *env,
539                                 const struct cl_io_slice *slice)
540 {
541         struct cl_io *io = slice->cis_io;
542         struct osc_io *oio = cl2osc_io(env, slice);
543         struct cl_object *obj = slice->cis_obj;
544         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
545         struct cl_attr *attr = &osc_env_info(env)->oti_attr;
546         struct obdo *oa = &oio->oi_oa;
547         struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
548         unsigned int ia_avalid = io->u.ci_setattr.sa_avalid;
549         enum op_xvalid ia_xvalid = io->u.ci_setattr.sa_xvalid;
550         int result = 0;
551         __u64 size = io->u.ci_setattr.sa_attr.lvb_size;
552         __u64 end = OBD_OBJECT_EOF;
553         bool io_is_falloc = false;
554
555         ENTRY;
556         /* truncate cache dirty pages first */
557         if (cl_io_is_trunc(io)) {
558                 result = osc_cache_truncate_start(env, cl2osc(obj), size,
559                                                   &oio->oi_trunc);
560         } else if (cl_io_is_fallocate(io)) {
561                 io_is_falloc = true;
562                 size = io->u.ci_setattr.sa_falloc_offset;
563                 end = io->u.ci_setattr.sa_falloc_end;
564         }
565
566         if (result == 0 && oio->oi_lockless == 0) {
567                 cl_object_attr_lock(obj);
568                 result = cl_object_attr_get(env, obj, attr);
569                 if (result == 0) {
570                         struct ost_lvb *lvb = &io->u.ci_setattr.sa_attr;
571                         unsigned int cl_valid = 0;
572
573                         if (ia_avalid & ATTR_SIZE) {
574                                 if (io_is_falloc) {
575                                         attr->cat_size =
576                                               io->u.ci_setattr.sa_attr.lvb_size;
577                                         attr->cat_kms = attr->cat_size;
578                                 } else {
579                                         attr->cat_size = size;
580                                         attr->cat_kms = size;
581                                 }
582                                 cl_valid = (CAT_SIZE | CAT_KMS);
583                         }
584                         if (ia_avalid & ATTR_MTIME_SET) {
585                                 attr->cat_mtime = lvb->lvb_mtime;
586                                 cl_valid |= CAT_MTIME;
587                         }
588                         if (ia_avalid & ATTR_ATIME_SET) {
589                                 attr->cat_atime = lvb->lvb_atime;
590                                 cl_valid |= CAT_ATIME;
591                         }
592                         if (ia_xvalid & OP_XVALID_CTIME_SET) {
593                                 attr->cat_ctime = lvb->lvb_ctime;
594                                 cl_valid |= CAT_CTIME;
595                         }
596                         result = cl_object_attr_update(env, obj, attr,
597                                                        cl_valid);
598                 }
599                 cl_object_attr_unlock(obj);
600         }
601         memset(oa, 0, sizeof(*oa));
602         if (result == 0) {
603                 oa->o_oi = loi->loi_oi;
604                 obdo_set_parent_fid(oa, io->u.ci_setattr.sa_parent_fid);
605                 oa->o_stripe_idx = io->u.ci_setattr.sa_stripe_index;
606                 oa->o_layout = io->u.ci_setattr.sa_layout;
607                 oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP |
608                         OBD_MD_FLOSTLAYOUT;
609                 if (ia_avalid & ATTR_CTIME) {
610                         oa->o_valid |= OBD_MD_FLCTIME;
611                         oa->o_ctime = attr->cat_ctime;
612                 }
613                 if (ia_avalid & ATTR_ATIME) {
614                         oa->o_valid |= OBD_MD_FLATIME;
615                         oa->o_atime = attr->cat_atime;
616                 }
617                 if (ia_avalid & ATTR_MTIME) {
618                         oa->o_valid |= OBD_MD_FLMTIME;
619                         oa->o_mtime = attr->cat_mtime;
620                 }
621                 if (ia_avalid & ATTR_SIZE) {
622                         if (io_is_falloc) {
623                                 oa->o_size = size;
624                                 oa->o_blocks = end;
625                                 oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
626                         } else {
627                                 oa->o_size = size;
628                                 oa->o_blocks = OBD_OBJECT_EOF;
629                                 oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
630                         }
631
632                         if (oio->oi_lockless) {
633                                 oa->o_flags = OBD_FL_SRVLOCK;
634                                 oa->o_valid |= OBD_MD_FLFLAGS;
635                         }
636
637                         if (io->ci_layout_version > 0) {
638                                 /* verify layout version */
639                                 oa->o_valid |= OBD_MD_LAYOUT_VERSION;
640                                 oa->o_layout_version = io->ci_layout_version;
641                         }
642                 } else {
643                         LASSERT(oio->oi_lockless == 0);
644                 }
645
646                 if (ia_xvalid & OP_XVALID_FLAGS) {
647                         oa->o_flags = io->u.ci_setattr.sa_attr_flags;
648                         oa->o_valid |= OBD_MD_FLFLAGS;
649                 }
650
651                 init_completion(&cbargs->opc_sync);
652
653                 if (io_is_falloc) {
654                         int falloc_mode = io->u.ci_setattr.sa_falloc_mode;
655
656                         result = osc_fallocate_base(osc_export(cl2osc(obj)),
657                                                     oa, osc_async_upcall,
658                                                     cbargs, falloc_mode);
659                 } else if (ia_avalid & ATTR_SIZE) {
660                         result = osc_punch_send(osc_export(cl2osc(obj)),
661                                                 oa, osc_async_upcall, cbargs);
662                 } else {
663                         result = osc_setattr_async(osc_export(cl2osc(obj)),
664                                                    oa, osc_async_upcall,
665                                                    cbargs, PTLRPCD_SET);
666                 }
667                 cbargs->opc_rpc_sent = result == 0;
668         }
669
670         RETURN(result);
671 }
672
673 void osc_io_setattr_end(const struct lu_env *env,
674                         const struct cl_io_slice *slice)
675 {
676         struct cl_io     *io  = slice->cis_io;
677         struct osc_io    *oio = cl2osc_io(env, slice);
678         struct cl_object *obj = slice->cis_obj;
679         struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
680         struct cl_attr  *attr = &osc_env_info(env)->oti_attr;
681         struct obdo *oa = &oio->oi_oa;
682         unsigned int cl_valid = 0;
683         int result = 0;
684
685         if (cbargs->opc_rpc_sent) {
686                 wait_for_completion(&cbargs->opc_sync);
687                 result = io->ci_result = cbargs->opc_rc;
688         }
689
690         if (result == 0) {
691                 if (oio->oi_lockless) {
692                         /* lockless truncate */
693                         struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
694
695                         LASSERT(cl_io_is_trunc(io));
696                         LASSERT(cl_io_is_trunc(io) || cl_io_is_fallocate(io));
697                         /* XXX: Need a lock. */
698                         osd->od_stats.os_lockless_truncates++;
699                 }
700         }
701
702         if (cl_io_is_trunc(io)) {
703                 __u64 size = io->u.ci_setattr.sa_attr.lvb_size;
704
705                 if (result == 0) {
706                         cl_object_attr_lock(obj);
707                         if (oa->o_valid & OBD_MD_FLBLOCKS) {
708                                 attr->cat_blocks = oa->o_blocks;
709                                 cl_valid |= CAT_BLOCKS;
710                         }
711
712                         cl_object_attr_update(env, obj, attr, cl_valid);
713                         cl_object_attr_unlock(obj);
714                 }
715                 osc_trunc_check(env, io, oio, size);
716                 osc_cache_truncate_end(env, oio->oi_trunc);
717                 oio->oi_trunc = NULL;
718         }
719
720         if (cl_io_is_fallocate(io)) {
721                 if (result == 0) {
722                         cl_object_attr_lock(obj);
723                         /* update blocks */
724                         if (oa->o_valid & OBD_MD_FLBLOCKS) {
725                                 attr->cat_blocks = oa->o_blocks;
726                                 cl_valid |= CAT_BLOCKS;
727                         }
728
729                         cl_object_attr_update(env, obj, attr, cl_valid);
730                         cl_object_attr_unlock(obj);
731                 }
732         }
733 }
734 EXPORT_SYMBOL(osc_io_setattr_end);
735
736 struct osc_data_version_args {
737         struct osc_io *dva_oio;
738 };
739
740 static int
741 osc_data_version_interpret(const struct lu_env *env, struct ptlrpc_request *req,
742                            void *args, int rc)
743 {
744         struct osc_data_version_args *dva = args;
745         struct osc_io *oio = dva->dva_oio;
746         const struct ost_body *body;
747
748         ENTRY;
749         if (rc < 0)
750                 GOTO(out, rc);
751
752         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
753         if (body == NULL)
754                 GOTO(out, rc = -EPROTO);
755
756         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, &oio->oi_oa,
757                              &body->oa);
758         EXIT;
759 out:
760         oio->oi_cbarg.opc_rc = rc;
761         complete(&oio->oi_cbarg.opc_sync);
762
763         return 0;
764 }
765
766 static int osc_io_data_version_start(const struct lu_env *env,
767                                      const struct cl_io_slice *slice)
768 {
769         struct cl_data_version_io *dv   = &slice->cis_io->u.ci_data_version;
770         struct osc_io           *oio    = cl2osc_io(env, slice);
771         struct obdo             *oa     = &oio->oi_oa;
772         struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
773         struct osc_object       *obj    = cl2osc(slice->cis_obj);
774         struct lov_oinfo        *loi    = obj->oo_oinfo;
775         struct obd_export       *exp    = osc_export(obj);
776         struct ptlrpc_request   *req;
777         struct ost_body         *body;
778         struct osc_data_version_args *dva;
779         int rc;
780
781         ENTRY;
782         memset(oa, 0, sizeof(*oa));
783         oa->o_oi = loi->loi_oi;
784         oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
785
786         if (dv->dv_flags & (LL_DV_RD_FLUSH | LL_DV_WR_FLUSH)) {
787                 oa->o_valid |= OBD_MD_FLFLAGS;
788                 oa->o_flags |= OBD_FL_SRVLOCK;
789                 if (dv->dv_flags & LL_DV_WR_FLUSH)
790                         oa->o_flags |= OBD_FL_FLUSH;
791         }
792
793         init_completion(&cbargs->opc_sync);
794
795         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
796         if (req == NULL)
797                 RETURN(-ENOMEM);
798
799         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
800         if (rc < 0) {
801                 ptlrpc_request_free(req);
802                 RETURN(rc);
803         }
804
805         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
806         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
807
808         ptlrpc_request_set_replen(req);
809         req->rq_interpret_reply = osc_data_version_interpret;
810         dva = ptlrpc_req_async_args(dva, req);
811         dva->dva_oio = oio;
812
813         ptlrpcd_add_req(req);
814
815         RETURN(0);
816 }
817
818 static void osc_io_data_version_end(const struct lu_env *env,
819                                     const struct cl_io_slice *slice)
820 {
821         struct cl_data_version_io *dv = &slice->cis_io->u.ci_data_version;
822         struct osc_io           *oio    = cl2osc_io(env, slice);
823         struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
824
825         ENTRY;
826         wait_for_completion(&cbargs->opc_sync);
827
828         if (cbargs->opc_rc != 0) {
829                 slice->cis_io->ci_result = cbargs->opc_rc;
830         } else {
831                 slice->cis_io->ci_result = 0;
832                 if (!(oio->oi_oa.o_valid &
833                       (OBD_MD_LAYOUT_VERSION | OBD_MD_FLDATAVERSION)))
834                         slice->cis_io->ci_result = -ENOTSUPP;
835
836                 if (oio->oi_oa.o_valid & OBD_MD_LAYOUT_VERSION)
837                         dv->dv_layout_version = oio->oi_oa.o_layout_version;
838                 if (oio->oi_oa.o_valid & OBD_MD_FLDATAVERSION)
839                         dv->dv_data_version = oio->oi_oa.o_data_version;
840         }
841
842         EXIT;
843 }
844
845 int osc_io_read_start(const struct lu_env *env,
846                       const struct cl_io_slice *slice)
847 {
848         struct cl_object *obj  = slice->cis_obj;
849         struct cl_attr   *attr = &osc_env_info(env)->oti_attr;
850         int rc = 0;
851         ENTRY;
852
853         if (!slice->cis_io->ci_noatime) {
854                 cl_object_attr_lock(obj);
855                 attr->cat_atime = ktime_get_real_seconds();
856                 rc = cl_object_attr_update(env, obj, attr, CAT_ATIME);
857                 cl_object_attr_unlock(obj);
858         }
859
860         RETURN(rc);
861 }
862 EXPORT_SYMBOL(osc_io_read_start);
863
864 int osc_io_write_start(const struct lu_env *env,
865                        const struct cl_io_slice *slice)
866 {
867         struct cl_object *obj   = slice->cis_obj;
868         struct cl_attr   *attr  = &osc_env_info(env)->oti_attr;
869         int rc = 0;
870         ENTRY;
871
872         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_SETTIME, 1);
873         cl_object_attr_lock(obj);
874         attr->cat_mtime = attr->cat_ctime = ktime_get_real_seconds();
875         rc = cl_object_attr_update(env, obj, attr, CAT_MTIME | CAT_CTIME);
876         cl_object_attr_unlock(obj);
877
878         RETURN(rc);
879 }
880 EXPORT_SYMBOL(osc_io_write_start);
881
882 int osc_fsync_ost(const struct lu_env *env, struct osc_object *obj,
883                   struct cl_fsync_io *fio)
884 {
885         struct osc_io    *oio   = osc_env_io(env);
886         struct obdo      *oa    = &oio->oi_oa;
887         struct lov_oinfo *loi   = obj->oo_oinfo;
888         struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
889         int rc = 0;
890         ENTRY;
891
892         memset(oa, 0, sizeof(*oa));
893         oa->o_oi = loi->loi_oi;
894         oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
895
896         /* reload size abd blocks for start and end of sync range */
897         oa->o_size = fio->fi_start;
898         oa->o_blocks = fio->fi_end;
899         oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
900
901         obdo_set_parent_fid(oa, fio->fi_fid);
902
903         init_completion(&cbargs->opc_sync);
904
905         rc = osc_sync_base(obj, oa, osc_async_upcall, cbargs, PTLRPCD_SET);
906         RETURN(rc);
907 }
908 EXPORT_SYMBOL(osc_fsync_ost);
909
910 int osc_io_fsync_start(const struct lu_env *env,
911                        const struct cl_io_slice *slice)
912 {
913         struct cl_io       *io  = slice->cis_io;
914         struct cl_fsync_io *fio = &io->u.ci_fsync;
915         struct cl_object   *obj = slice->cis_obj;
916         struct osc_object  *osc = cl2osc(obj);
917         pgoff_t start  = cl_index(obj, fio->fi_start);
918         pgoff_t end    = cl_index(obj, fio->fi_end);
919         int     result = 0;
920         ENTRY;
921
922         if (fio->fi_end == OBD_OBJECT_EOF)
923                 end = CL_PAGE_EOF;
924
925         result = osc_cache_writeback_range(env, osc, start, end, 0,
926                                            fio->fi_mode == CL_FSYNC_DISCARD);
927         if (result > 0) {
928                 fio->fi_nr_written += result;
929                 result = 0;
930         }
931         if (fio->fi_mode == CL_FSYNC_ALL) {
932                 int rc;
933
934                 /* we have to wait for writeback to finish before we can
935                  * send OST_SYNC RPC. This is bad because it causes extents
936                  * to be written osc by osc. However, we usually start
937                  * writeback before CL_FSYNC_ALL so this won't have any real
938                  * problem. */
939                 rc = osc_cache_wait_range(env, osc, start, end);
940                 if (result == 0)
941                         result = rc;
942                 rc = osc_fsync_ost(env, osc, fio);
943                 if (result == 0)
944                         result = rc;
945         }
946
947         RETURN(result);
948 }
949
950 void osc_io_fsync_end(const struct lu_env *env,
951                       const struct cl_io_slice *slice)
952 {
953         struct cl_fsync_io *fio = &slice->cis_io->u.ci_fsync;
954         struct cl_object   *obj = slice->cis_obj;
955         pgoff_t start = cl_index(obj, fio->fi_start);
956         pgoff_t end   = cl_index(obj, fio->fi_end);
957         int result = 0;
958
959         if (fio->fi_mode == CL_FSYNC_LOCAL) {
960                 result = osc_cache_wait_range(env, cl2osc(obj), start, end);
961         } else if (fio->fi_mode == CL_FSYNC_ALL) {
962                 struct osc_io           *oio    = cl2osc_io(env, slice);
963                 struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
964
965                 wait_for_completion(&cbargs->opc_sync);
966                 if (result == 0)
967                         result = cbargs->opc_rc;
968         }
969         slice->cis_io->ci_result = result;
970 }
971 EXPORT_SYMBOL(osc_io_fsync_end);
972
973 static int osc_io_ladvise_start(const struct lu_env *env,
974                                 const struct cl_io_slice *slice)
975 {
976         int                      result = 0;
977         struct cl_io            *io = slice->cis_io;
978         struct osc_io           *oio = cl2osc_io(env, slice);
979         struct cl_object        *obj = slice->cis_obj;
980         struct lov_oinfo        *loi = cl2osc(obj)->oo_oinfo;
981         struct cl_ladvise_io    *lio = &io->u.ci_ladvise;
982         struct obdo             *oa = &oio->oi_oa;
983         struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
984         struct lu_ladvise       *ladvise;
985         struct ladvise_hdr      *ladvise_hdr;
986         int                      buf_size;
987         int                      num_advise = 1;
988         ENTRY;
989
990         /* TODO: add multiple ladvise support in CLIO */
991         buf_size = offsetof(typeof(*ladvise_hdr), lah_advise[num_advise]);
992         if (osc_env_info(env)->oti_ladvise_buf.lb_len < buf_size)
993                 lu_buf_realloc(&osc_env_info(env)->oti_ladvise_buf, buf_size);
994
995         ladvise_hdr = osc_env_info(env)->oti_ladvise_buf.lb_buf;
996         if (ladvise_hdr == NULL)
997                 RETURN(-ENOMEM);
998
999         memset(ladvise_hdr, 0, buf_size);
1000         ladvise_hdr->lah_magic = LADVISE_MAGIC;
1001         ladvise_hdr->lah_count = num_advise;
1002         ladvise_hdr->lah_flags = lio->li_flags;
1003
1004         memset(oa, 0, sizeof(*oa));
1005         oa->o_oi = loi->loi_oi;
1006         oa->o_valid = OBD_MD_FLID;
1007         obdo_set_parent_fid(oa, lio->li_fid);
1008
1009         ladvise = ladvise_hdr->lah_advise;
1010         ladvise->lla_start = lio->li_start;
1011         ladvise->lla_end = lio->li_end;
1012         ladvise->lla_advice = lio->li_advice;
1013
1014         if (lio->li_flags & LF_ASYNC) {
1015                 result = osc_ladvise_base(osc_export(cl2osc(obj)), oa,
1016                                           ladvise_hdr, NULL, NULL, NULL);
1017         } else {
1018                 init_completion(&cbargs->opc_sync);
1019                 result = osc_ladvise_base(osc_export(cl2osc(obj)), oa,
1020                                           ladvise_hdr, osc_async_upcall,
1021                                           cbargs, PTLRPCD_SET);
1022                 cbargs->opc_rpc_sent = result == 0;
1023         }
1024         RETURN(result);
1025 }
1026
1027 static void osc_io_ladvise_end(const struct lu_env *env,
1028                                const struct cl_io_slice *slice)
1029 {
1030         struct cl_io            *io = slice->cis_io;
1031         struct osc_io           *oio = cl2osc_io(env, slice);
1032         struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
1033         int                      result = 0;
1034         struct cl_ladvise_io    *lio = &io->u.ci_ladvise;
1035
1036         if ((!(lio->li_flags & LF_ASYNC)) && cbargs->opc_rpc_sent) {
1037                 wait_for_completion(&cbargs->opc_sync);
1038                 result = cbargs->opc_rc;
1039         }
1040         slice->cis_io->ci_result = result;
1041 }
1042
1043 void osc_io_end(const struct lu_env *env, const struct cl_io_slice *slice)
1044 {
1045         struct osc_io *oio = cl2osc_io(env, slice);
1046
1047         if (oio->oi_active) {
1048                 osc_extent_release(env, oio->oi_active);
1049                 oio->oi_active = NULL;
1050         }
1051 }
1052 EXPORT_SYMBOL(osc_io_end);
1053
1054 struct osc_lseek_args {
1055         struct osc_io *lsa_oio;
1056 };
1057
1058 static int osc_lseek_interpret(const struct lu_env *env,
1059                                struct ptlrpc_request *req,
1060                                void *arg, int rc)
1061 {
1062         struct ost_body *reply;
1063         struct osc_lseek_args *lsa = arg;
1064         struct osc_io *oio = lsa->lsa_oio;
1065         struct cl_io *io = oio->oi_cl.cis_io;
1066         struct cl_lseek_io *lsio = &io->u.ci_lseek;
1067
1068         ENTRY;
1069
1070         if (rc != 0)
1071                 GOTO(out, rc);
1072
1073         reply = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1074         if (reply == NULL)
1075                 GOTO(out, rc = -EPROTO);
1076
1077         lsio->ls_result = reply->oa.o_size;
1078 out:
1079         osc_async_upcall(&oio->oi_cbarg, rc);
1080         RETURN(rc);
1081 }
1082
1083 int osc_io_lseek_start(const struct lu_env *env,
1084                        const struct cl_io_slice *slice)
1085 {
1086         struct cl_io *io = slice->cis_io;
1087         struct osc_io *oio = cl2osc_io(env, slice);
1088         struct cl_object *obj = slice->cis_obj;
1089         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1090         struct cl_lseek_io *lsio = &io->u.ci_lseek;
1091         struct obdo *oa = &oio->oi_oa;
1092         struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
1093         struct obd_export *exp = osc_export(cl2osc(obj));
1094         struct ptlrpc_request *req;
1095         struct ost_body *body;
1096         struct osc_lseek_args *lsa;
1097         int rc = 0;
1098
1099         ENTRY;
1100
1101         /* No negative values at this point */
1102         LASSERT(lsio->ls_start >= 0);
1103         LASSERT(lsio->ls_whence == SEEK_HOLE || lsio->ls_whence == SEEK_DATA);
1104
1105         /* with IO lock taken we have object size in LVB and can check
1106          * boundaries prior sending LSEEK RPC
1107          */
1108         if (lsio->ls_start >= loi->loi_lvb.lvb_size) {
1109                 /* consider area beyond end of object as hole */
1110                 if (lsio->ls_whence == SEEK_HOLE)
1111                         lsio->ls_result = lsio->ls_start;
1112                 else
1113                         lsio->ls_result = -ENXIO;
1114                 RETURN(0);
1115         }
1116
1117         /* if LSEEK RPC is not supported by server, consider whole stripe
1118          * object is data with hole after end of object
1119          */
1120         if (!exp_connect_lseek(exp)) {
1121                 if (lsio->ls_whence == SEEK_HOLE)
1122                         lsio->ls_result = loi->loi_lvb.lvb_size;
1123                 else
1124                         lsio->ls_result = lsio->ls_start;
1125                 RETURN(0);
1126         }
1127
1128         memset(oa, 0, sizeof(*oa));
1129         oa->o_oi = loi->loi_oi;
1130         oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
1131         oa->o_size = lsio->ls_start;
1132         oa->o_mode = lsio->ls_whence;
1133         if (oio->oi_lockless) {
1134                 oa->o_flags = OBD_FL_SRVLOCK;
1135                 oa->o_valid |= OBD_MD_FLFLAGS;
1136         }
1137
1138         init_completion(&cbargs->opc_sync);
1139         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SEEK);
1140         if (req == NULL)
1141                 RETURN(-ENOMEM);
1142
1143         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SEEK);
1144         if (rc < 0) {
1145                 ptlrpc_request_free(req);
1146                 RETURN(rc);
1147         }
1148
1149         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1150         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1151         ptlrpc_request_set_replen(req);
1152         req->rq_interpret_reply = osc_lseek_interpret;
1153         lsa = ptlrpc_req_async_args(lsa, req);
1154         lsa->lsa_oio = oio;
1155
1156         ptlrpcd_add_req(req);
1157         cbargs->opc_rpc_sent = 1;
1158
1159         RETURN(0);
1160 }
1161 EXPORT_SYMBOL(osc_io_lseek_start);
1162
1163 void osc_io_lseek_end(const struct lu_env *env,
1164                       const struct cl_io_slice *slice)
1165 {
1166         struct osc_io *oio = cl2osc_io(env, slice);
1167         struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
1168         int rc = 0;
1169
1170         if (cbargs->opc_rpc_sent) {
1171                 wait_for_completion(&cbargs->opc_sync);
1172                 rc = cbargs->opc_rc;
1173         }
1174         slice->cis_io->ci_result = rc;
1175 }
1176 EXPORT_SYMBOL(osc_io_lseek_end);
1177
1178 int osc_io_lru_reserve(const struct lu_env *env,
1179                        const struct cl_io_slice *ios,
1180                        loff_t pos, size_t bytes)
1181 {
1182         struct osc_object *osc = cl2osc(ios->cis_obj);
1183         struct osc_io *oio = osc_env_io(env);
1184         unsigned long npages = 0;
1185         size_t page_offset;
1186
1187         ENTRY;
1188
1189         page_offset = pos & ~PAGE_MASK;
1190         if (page_offset) {
1191                 ++npages;
1192                 if (bytes > PAGE_SIZE - page_offset)
1193                         bytes -= (PAGE_SIZE - page_offset);
1194                 else
1195                         bytes = 0;
1196         }
1197         npages += (bytes + PAGE_SIZE - 1) >> PAGE_SHIFT;
1198         oio->oi_lru_reserved = osc_lru_reserve(osc_cli(osc), npages);
1199
1200         RETURN(0);
1201 }
1202 EXPORT_SYMBOL(osc_io_lru_reserve);
1203
1204 static const struct cl_io_operations osc_io_ops = {
1205         .op = {
1206                 [CIT_READ] = {
1207                         .cio_iter_init = osc_io_iter_init,
1208                         .cio_iter_fini = osc_io_rw_iter_fini,
1209                         .cio_start  = osc_io_read_start,
1210                         .cio_fini   = osc_io_fini
1211                 },
1212                 [CIT_WRITE] = {
1213                         .cio_iter_init = osc_io_iter_init,
1214                         .cio_iter_fini = osc_io_rw_iter_fini,
1215                         .cio_start  = osc_io_write_start,
1216                         .cio_end    = osc_io_end,
1217                         .cio_fini   = osc_io_fini
1218                 },
1219                 [CIT_SETATTR] = {
1220                         .cio_iter_init = osc_io_iter_init,
1221                         .cio_iter_fini = osc_io_iter_fini,
1222                         .cio_start  = osc_io_setattr_start,
1223                         .cio_end    = osc_io_setattr_end
1224                 },
1225                 [CIT_DATA_VERSION] = {
1226                         .cio_start  = osc_io_data_version_start,
1227                         .cio_end    = osc_io_data_version_end,
1228                 },
1229                 [CIT_FAULT] = {
1230                         .cio_iter_init = osc_io_iter_init,
1231                         .cio_iter_fini = osc_io_iter_fini,
1232                         .cio_start  = osc_io_fault_start,
1233                         .cio_end    = osc_io_end,
1234                         .cio_fini   = osc_io_fini
1235                 },
1236                 [CIT_FSYNC] = {
1237                         .cio_start  = osc_io_fsync_start,
1238                         .cio_end    = osc_io_fsync_end,
1239                         .cio_fini   = osc_io_fini
1240                 },
1241                 [CIT_LADVISE] = {
1242                         .cio_start  = osc_io_ladvise_start,
1243                         .cio_end    = osc_io_ladvise_end,
1244                         .cio_fini   = osc_io_fini
1245                 },
1246                 [CIT_LSEEK] = {
1247                         .cio_start  = osc_io_lseek_start,
1248                         .cio_end    = osc_io_lseek_end,
1249                         .cio_fini   = osc_io_fini
1250                 },
1251                 [CIT_MISC] = {
1252                         .cio_fini   = osc_io_fini
1253                 }
1254         },
1255         .cio_read_ahead             = osc_io_read_ahead,
1256         .cio_lru_reserve            = osc_io_lru_reserve,
1257         .cio_submit                 = osc_io_submit,
1258         .cio_commit_async           = osc_io_commit_async,
1259         .cio_extent_release         = osc_io_extent_release
1260 };
1261
1262 /*****************************************************************************
1263  *
1264  * Transfer operations.
1265  *
1266  */
1267
1268 int osc_io_init(const struct lu_env *env,
1269                 struct cl_object *obj, struct cl_io *io)
1270 {
1271         struct osc_io *oio = osc_env_io(env);
1272
1273         CL_IO_SLICE_CLEAN(oio, oi_cl);
1274         cl_io_slice_add(io, &oio->oi_cl, obj, &osc_io_ops);
1275         return 0;
1276 }
1277
1278 /** @} osc */