Whamcloud - gitweb
06af63f30bdca4d319ea823415ea7e75ac7868d3
[fs/lustre-release.git] / lustre / osc / osc_io.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Implementation of cl_io for OSC layer.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  *   Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_OSC
43
44 #include <lustre_obdo.h>
45
46 #include "osc_cl_internal.h"
47
48 /** \addtogroup osc 
49  *  @{ 
50  */
51
52 /*****************************************************************************
53  *
54  * Type conversions.
55  *
56  */
57
58 static struct osc_io *cl2osc_io(const struct lu_env *env,
59                                 const struct cl_io_slice *slice)
60 {
61         struct osc_io *oio = container_of0(slice, struct osc_io, oi_cl);
62         LINVRNT(oio == osc_env_io(env));
63         return oio;
64 }
65
66 /*****************************************************************************
67  *
68  * io operations.
69  *
70  */
71
72 static void osc_io_fini(const struct lu_env *env, const struct cl_io_slice *io)
73 {
74 }
75
76 static void osc_read_ahead_release(const struct lu_env *env,
77                                    void *cbdata)
78 {
79         struct ldlm_lock *dlmlock = cbdata;
80         struct lustre_handle lockh;
81
82         ldlm_lock2handle(dlmlock, &lockh);
83         ldlm_lock_decref(&lockh, LCK_PR);
84         LDLM_LOCK_PUT(dlmlock);
85 }
86
87 static int osc_io_read_ahead(const struct lu_env *env,
88                              const struct cl_io_slice *ios,
89                              pgoff_t start, struct cl_read_ahead *ra)
90 {
91         struct osc_object       *osc = cl2osc(ios->cis_obj);
92         struct ldlm_lock        *dlmlock;
93         int                     result = -ENODATA;
94         ENTRY;
95
96         dlmlock = osc_dlmlock_at_pgoff(env, osc, start, 0);
97         if (dlmlock != NULL) {
98                 LASSERT(dlmlock->l_ast_data == osc);
99                 if (dlmlock->l_req_mode != LCK_PR) {
100                         struct lustre_handle lockh;
101                         ldlm_lock2handle(dlmlock, &lockh);
102                         ldlm_lock_addref(&lockh, LCK_PR);
103                         ldlm_lock_decref(&lockh, dlmlock->l_req_mode);
104                 }
105
106                 ra->cra_end = cl_index(osc2cl(osc),
107                                        dlmlock->l_policy_data.l_extent.end);
108                 ra->cra_release = osc_read_ahead_release;
109                 ra->cra_cbdata = dlmlock;
110                 result = 0;
111         }
112
113         RETURN(result);
114 }
115
116 /**
117  * An implementation of cl_io_operations::cio_io_submit() method for osc
118  * layer. Iterates over pages in the in-queue, prepares each for io by calling
119  * cl_page_prep() and then either submits them through osc_io_submit_page()
120  * or, if page is already submitted, changes osc flags through
121  * osc_set_async_flags().
122  */
123 static int osc_io_submit(const struct lu_env *env,
124                          const struct cl_io_slice *ios,
125                          enum cl_req_type crt, struct cl_2queue *queue)
126 {
127         struct cl_page    *page;
128         struct cl_page    *tmp;
129         struct client_obd *cli  = NULL;
130         struct osc_object *osc  = NULL; /* to keep gcc happy */
131         struct osc_page   *opg;
132         struct cl_io      *io;
133         struct list_head  list = LIST_HEAD_INIT(list);
134
135         struct cl_page_list *qin      = &queue->c2_qin;
136         struct cl_page_list *qout     = &queue->c2_qout;
137         unsigned int queued = 0;
138         int result = 0;
139         int cmd;
140         int brw_flags;
141         unsigned int max_pages;
142
143         LASSERT(qin->pl_nr > 0);
144
145         CDEBUG(D_CACHE, "%d %d\n", qin->pl_nr, crt);
146
147         osc = cl2osc(ios->cis_obj);
148         cli = osc_cli(osc);
149         max_pages = cli->cl_max_pages_per_rpc;
150
151         cmd = crt == CRT_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ;
152         brw_flags = osc_io_srvlock(cl2osc_io(env, ios)) ? OBD_BRW_SRVLOCK : 0;
153
154         /*
155          * NOTE: here @page is a top-level page. This is done to avoid
156          *       creation of sub-page-list.
157          */
158         cl_page_list_for_each_safe(page, tmp, qin) {
159                 struct osc_async_page *oap;
160
161                 /* Top level IO. */
162                 io = page->cp_owner;
163                 LASSERT(io != NULL);
164
165                 opg = osc_cl_page_osc(page, osc);
166                 oap = &opg->ops_oap;
167                 LASSERT(osc == oap->oap_obj);
168
169                 if (!list_empty(&oap->oap_pending_item) ||
170                     !list_empty(&oap->oap_rpc_item)) {
171                         CDEBUG(D_CACHE, "Busy oap %p page %p for submit.\n",
172                                oap, opg);
173                         result = -EBUSY;
174                         break;
175                 }
176
177                 result = cl_page_prep(env, io, page, crt);
178                 if (result != 0) {
179                         LASSERT(result < 0);
180                         if (result != -EALREADY)
181                                 break;
182                         /*
183                          * Handle -EALREADY error: for read case, the page is
184                          * already in UPTODATE state; for write, the page
185                          * is not dirty.
186                          */
187                         result = 0;
188                         continue;
189                 }
190
191                 spin_lock(&oap->oap_lock);
192                 oap->oap_async_flags = ASYNC_URGENT|ASYNC_READY;
193                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
194                 spin_unlock(&oap->oap_lock);
195
196                 osc_page_submit(env, opg, crt, brw_flags);
197                 list_add_tail(&oap->oap_pending_item, &list);
198
199                 if (page->cp_sync_io != NULL)
200                         cl_page_list_move(qout, qin, page);
201                 else /* async IO */
202                         cl_page_list_del(env, qin, page);
203
204                 if (++queued == max_pages) {
205                         queued = 0;
206                         result = osc_queue_sync_pages(env, osc, &list, cmd,
207                                                       brw_flags);
208                         if (result < 0)
209                                 break;
210                 }
211         }
212
213         if (queued > 0)
214                 result = osc_queue_sync_pages(env, osc, &list, cmd, brw_flags);
215
216         CDEBUG(D_INFO, "%d/%d %d\n", qin->pl_nr, qout->pl_nr, result);
217         return qout->pl_nr > 0 ? 0 : result;
218 }
219
220 /**
221  * This is called when a page is accessed within file in a way that creates
222  * new page, if one were missing (i.e., if there were a hole at that place in
223  * the file, or accessed page is beyond the current file size).
224  *
225  * Expand stripe KMS if necessary.
226  */
227 static void osc_page_touch_at(const struct lu_env *env,
228                               struct cl_object *obj, pgoff_t idx, size_t to)
229 {
230         struct lov_oinfo  *loi  = cl2osc(obj)->oo_oinfo;
231         struct cl_attr    *attr = &osc_env_info(env)->oti_attr;
232         int valid;
233         __u64 kms;
234
235         /* offset within stripe */
236         kms = cl_offset(obj, idx) + to;
237
238         cl_object_attr_lock(obj);
239         /*
240          * XXX old code used
241          *
242          *         ll_inode_size_lock(inode, 0); lov_stripe_lock(lsm);
243          *
244          * here
245          */
246         CDEBUG(D_INODE, "stripe KMS %sincreasing "LPU64"->"LPU64" "LPU64"\n",
247                kms > loi->loi_kms ? "" : "not ", loi->loi_kms, kms,
248                loi->loi_lvb.lvb_size);
249
250         attr->cat_mtime = attr->cat_ctime = LTIME_S(CFS_CURRENT_TIME);
251         valid = CAT_MTIME | CAT_CTIME;
252         if (kms > loi->loi_kms) {
253                 attr->cat_kms = kms;
254                 valid |= CAT_KMS;
255         }
256         if (kms > loi->loi_lvb.lvb_size) {
257                 attr->cat_size = kms;
258                 valid |= CAT_SIZE;
259         }
260         cl_object_attr_update(env, obj, attr, valid);
261         cl_object_attr_unlock(obj);
262 }
263
264 static int osc_io_commit_async(const struct lu_env *env,
265                                 const struct cl_io_slice *ios,
266                                 struct cl_page_list *qin, int from, int to,
267                                 cl_commit_cbt cb)
268 {
269         struct cl_io    *io = ios->cis_io;
270         struct osc_io   *oio = cl2osc_io(env, ios);
271         struct osc_object *osc = cl2osc(ios->cis_obj);
272         struct cl_page  *page;
273         struct cl_page  *last_page;
274         struct osc_page *opg;
275         int result = 0;
276         ENTRY;
277
278         LASSERT(qin->pl_nr > 0);
279
280         /* Handle partial page cases */
281         last_page = cl_page_list_last(qin);
282         if (oio->oi_lockless) {
283                 page = cl_page_list_first(qin);
284                 if (page == last_page) {
285                         cl_page_clip(env, page, from, to);
286                 } else {
287                         if (from != 0)
288                                 cl_page_clip(env, page, from, PAGE_SIZE);
289                         if (to != PAGE_SIZE)
290                                 cl_page_clip(env, last_page, 0, to);
291                 }
292         }
293
294         while (qin->pl_nr > 0) {
295                 struct osc_async_page *oap;
296
297                 page = cl_page_list_first(qin);
298                 opg = osc_cl_page_osc(page, osc);
299                 oap = &opg->ops_oap;
300
301                 if (!list_empty(&oap->oap_rpc_item)) {
302                         CDEBUG(D_CACHE, "Busy oap %p page %p for submit.\n",
303                                oap, opg);
304                         result = -EBUSY;
305                         break;
306                 }
307
308                 /* The page may be already in dirty cache. */
309                 if (list_empty(&oap->oap_pending_item)) {
310                         result = osc_page_cache_add(env, &opg->ops_cl, io);
311                         if (result != 0)
312                                 break;
313                 }
314
315                 osc_page_touch_at(env, osc2cl(osc), osc_index(opg),
316                                   page == last_page ? to : PAGE_SIZE);
317
318                 cl_page_list_del(env, qin, page);
319
320                 (*cb)(env, io, page);
321                 /* Can't access page any more. Page can be in transfer and
322                  * complete at any time. */
323         }
324
325         /* for sync write, kernel will wait for this page to be flushed before
326          * osc_io_end() is called, so release it earlier.
327          * for mkwrite(), it's known there is no further pages. */
328         if (cl_io_is_sync_write(io) && oio->oi_active != NULL) {
329                 osc_extent_release(env, oio->oi_active);
330                 oio->oi_active = NULL;
331         }
332
333         CDEBUG(D_INFO, "%d %d\n", qin->pl_nr, result);
334         RETURN(result);
335 }
336
337 static int osc_io_iter_init(const struct lu_env *env,
338                             const struct cl_io_slice *ios)
339 {
340         struct osc_object *osc = cl2osc(ios->cis_obj);
341         struct obd_import *imp = osc_cli(osc)->cl_import;
342         int rc = -EIO;
343
344         spin_lock(&imp->imp_lock);
345         if (likely(!imp->imp_invalid)) {
346                 struct osc_io *oio = osc_env_io(env);
347
348                 atomic_inc(&osc->oo_nr_ios);
349                 oio->oi_is_active = 1;
350                 rc = 0;
351         }
352         spin_unlock(&imp->imp_lock);
353
354         return rc;
355 }
356
357 static int osc_io_write_iter_init(const struct lu_env *env,
358                                   const struct cl_io_slice *ios)
359 {
360         struct cl_io *io = ios->cis_io;
361         struct osc_io *oio = osc_env_io(env);
362         struct osc_object *osc = cl2osc(ios->cis_obj);
363         unsigned long npages;
364         ENTRY;
365
366         if (cl_io_is_append(io))
367                 RETURN(osc_io_iter_init(env, ios));
368
369         npages = io->u.ci_rw.crw_count >> PAGE_CACHE_SHIFT;
370         if (io->u.ci_rw.crw_pos & ~PAGE_MASK)
371                 ++npages;
372
373         oio->oi_lru_reserved = osc_lru_reserve(osc_cli(osc), npages);
374
375         RETURN(osc_io_iter_init(env, ios));
376 }
377
378 static void osc_io_iter_fini(const struct lu_env *env,
379                              const struct cl_io_slice *ios)
380 {
381         struct osc_io *oio = osc_env_io(env);
382
383         if (oio->oi_is_active) {
384                 struct osc_object *osc = cl2osc(ios->cis_obj);
385
386                 oio->oi_is_active = 0;
387                 LASSERT(atomic_read(&osc->oo_nr_ios) > 0);
388                 if (atomic_dec_and_test(&osc->oo_nr_ios))
389                         wake_up_all(&osc->oo_io_waitq);
390         }
391 }
392
393 static void osc_io_write_iter_fini(const struct lu_env *env,
394                                    const struct cl_io_slice *ios)
395 {
396         struct osc_io *oio = osc_env_io(env);
397         struct osc_object *osc = cl2osc(ios->cis_obj);
398
399         if (oio->oi_lru_reserved > 0) {
400                 osc_lru_unreserve(osc_cli(osc), oio->oi_lru_reserved);
401                 oio->oi_lru_reserved = 0;
402         }
403         oio->oi_write_osclock = NULL;
404
405         osc_io_iter_fini(env, ios);
406 }
407
408 static int osc_io_fault_start(const struct lu_env *env,
409                               const struct cl_io_slice *ios)
410 {
411         struct cl_io       *io;
412         struct cl_fault_io *fio;
413         ENTRY;
414
415         io  = ios->cis_io;
416         fio = &io->u.ci_fault;
417         CDEBUG(D_INFO, "%lu %d %zu\n",
418                 fio->ft_index, fio->ft_writable, fio->ft_nob);
419         /*
420          * If mapping is writeable, adjust kms to cover this page,
421          * but do not extend kms beyond actual file size.
422          * See bug 10919.
423          */
424         if (fio->ft_writable)
425                 osc_page_touch_at(env, ios->cis_obj,
426                                   fio->ft_index, fio->ft_nob);
427         RETURN(0);
428 }
429
430 static int osc_async_upcall(void *a, int rc)
431 {
432         struct osc_async_cbargs *args = a;
433
434         args->opc_rc = rc;
435         complete(&args->opc_sync);
436         return 0;
437 }
438
439 /**
440  * Checks that there are no pages being written in the extent being truncated.
441  */
442 static int trunc_check_cb(const struct lu_env *env, struct cl_io *io,
443                           struct osc_page *ops , void *cbdata)
444 {
445         struct cl_page *page = ops->ops_cl.cpl_page;
446         struct osc_async_page *oap;
447         __u64 start = *(__u64 *)cbdata;
448
449         oap = &ops->ops_oap;
450         if (oap->oap_cmd & OBD_BRW_WRITE &&
451             !list_empty(&oap->oap_pending_item))
452                 CL_PAGE_DEBUG(D_ERROR, env, page, "exists " LPU64 "/%s.\n",
453                                 start, current->comm);
454
455         if (PageLocked(page->cp_vmpage))
456                 CDEBUG(D_CACHE, "page %p index %lu locked for %d.\n",
457                        ops, osc_index(ops), oap->oap_cmd & OBD_BRW_RWMASK);
458
459         return CLP_GANG_OKAY;
460 }
461
462 static void osc_trunc_check(const struct lu_env *env, struct cl_io *io,
463                             struct osc_io *oio, __u64 size)
464 {
465         struct cl_object *clob;
466         int     partial;
467         pgoff_t start;
468
469         clob    = oio->oi_cl.cis_obj;
470         start   = cl_index(clob, size);
471         partial = cl_offset(clob, start) < size;
472
473         /*
474          * Complain if there are pages in the truncated region.
475          */
476         osc_page_gang_lookup(env, io, cl2osc(clob),
477                                 start + partial, CL_PAGE_EOF,
478                                 trunc_check_cb, (void *)&size);
479 }
480
481 static int osc_io_setattr_start(const struct lu_env *env,
482                                 const struct cl_io_slice *slice)
483 {
484         struct cl_io            *io     = slice->cis_io;
485         struct osc_io           *oio    = cl2osc_io(env, slice);
486         struct cl_object        *obj    = slice->cis_obj;
487         struct lov_oinfo        *loi    = cl2osc(obj)->oo_oinfo;
488         struct cl_attr          *attr   = &osc_env_info(env)->oti_attr;
489         struct obdo             *oa     = &oio->oi_oa;
490         struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
491         __u64                    size   = io->u.ci_setattr.sa_attr.lvb_size;
492         unsigned int             ia_valid = io->u.ci_setattr.sa_valid;
493         int                      result = 0;
494
495         /* truncate cache dirty pages first */
496         if (cl_io_is_trunc(io))
497                 result = osc_cache_truncate_start(env, cl2osc(obj), size,
498                                                   &oio->oi_trunc);
499
500         if (result == 0 && oio->oi_lockless == 0) {
501                 cl_object_attr_lock(obj);
502                 result = cl_object_attr_get(env, obj, attr);
503                 if (result == 0) {
504                         struct ost_lvb *lvb = &io->u.ci_setattr.sa_attr;
505                         unsigned int cl_valid = 0;
506
507                         if (ia_valid & ATTR_SIZE) {
508                                 attr->cat_size = attr->cat_kms = size;
509                                 cl_valid = (CAT_SIZE | CAT_KMS);
510                         }
511                         if (ia_valid & ATTR_MTIME_SET) {
512                                 attr->cat_mtime = lvb->lvb_mtime;
513                                 cl_valid |= CAT_MTIME;
514                         }
515                         if (ia_valid & ATTR_ATIME_SET) {
516                                 attr->cat_atime = lvb->lvb_atime;
517                                 cl_valid |= CAT_ATIME;
518                         }
519                         if (ia_valid & ATTR_CTIME_SET) {
520                                 attr->cat_ctime = lvb->lvb_ctime;
521                                 cl_valid |= CAT_CTIME;
522                         }
523                         result = cl_object_attr_update(env, obj, attr,
524                                                        cl_valid);
525                 }
526                 cl_object_attr_unlock(obj);
527         }
528         memset(oa, 0, sizeof(*oa));
529         if (result == 0) {
530                 oa->o_oi = loi->loi_oi;
531                 obdo_set_parent_fid(oa, io->u.ci_setattr.sa_parent_fid);
532                 oa->o_stripe_idx = io->u.ci_setattr.sa_stripe_index;
533                 oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
534                 if (ia_valid & ATTR_CTIME) {
535                         oa->o_valid |= OBD_MD_FLCTIME;
536                         oa->o_ctime = attr->cat_ctime;
537                 }
538                 if (ia_valid & ATTR_ATIME) {
539                         oa->o_valid |= OBD_MD_FLATIME;
540                         oa->o_atime = attr->cat_atime;
541                 }
542                 if (ia_valid & ATTR_MTIME) {
543                         oa->o_valid |= OBD_MD_FLMTIME;
544                         oa->o_mtime = attr->cat_mtime;
545                 }
546                 if (ia_valid & ATTR_SIZE) {
547                         oa->o_size = size;
548                         oa->o_blocks = OBD_OBJECT_EOF;
549                         oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
550
551                         if (oio->oi_lockless) {
552                                 oa->o_flags = OBD_FL_SRVLOCK;
553                                 oa->o_valid |= OBD_MD_FLFLAGS;
554                         }
555                 } else {
556                         LASSERT(oio->oi_lockless == 0);
557                 }
558
559                 if (ia_valid & ATTR_ATTR_FLAG) {
560                         oa->o_flags = io->u.ci_setattr.sa_attr_flags;
561                         oa->o_valid |= OBD_MD_FLFLAGS;
562                 }
563
564                 init_completion(&cbargs->opc_sync);
565
566                 if (ia_valid & ATTR_SIZE)
567                         result = osc_punch_base(osc_export(cl2osc(obj)),
568                                                 oa, osc_async_upcall,
569                                                 cbargs, PTLRPCD_SET);
570                 else
571                         result = osc_setattr_async(osc_export(cl2osc(obj)),
572                                                    oa, osc_async_upcall,
573                                                    cbargs, PTLRPCD_SET);
574
575                 cbargs->opc_rpc_sent = result == 0;
576         }
577
578         return result;
579 }
580
581 static void osc_io_setattr_end(const struct lu_env *env,
582                                const struct cl_io_slice *slice)
583 {
584         struct cl_io     *io  = slice->cis_io;
585         struct osc_io    *oio = cl2osc_io(env, slice);
586         struct cl_object *obj = slice->cis_obj;
587         struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
588         int result = 0;
589
590         if (cbargs->opc_rpc_sent) {
591                 wait_for_completion(&cbargs->opc_sync);
592                 result = io->ci_result = cbargs->opc_rc;
593         }
594         if (result == 0) {
595                 if (oio->oi_lockless) {
596                         /* lockless truncate */
597                         struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
598
599                         LASSERT(cl_io_is_trunc(io));
600                         /* XXX: Need a lock. */
601                         osd->od_stats.os_lockless_truncates++;
602                 }
603         }
604
605         if (cl_io_is_trunc(io)) {
606                 __u64 size = io->u.ci_setattr.sa_attr.lvb_size;
607                 osc_trunc_check(env, io, oio, size);
608                 osc_cache_truncate_end(env, oio->oi_trunc);
609                 oio->oi_trunc = NULL;
610         }
611 }
612
613 struct osc_data_version_args {
614         struct osc_io *dva_oio;
615 };
616
617 static int
618 osc_data_version_interpret(const struct lu_env *env, struct ptlrpc_request *req,
619                            void *arg, int rc)
620 {
621         struct osc_data_version_args *dva = arg;
622         struct osc_io *oio = dva->dva_oio;
623         const struct ost_body *body;
624
625         ENTRY;
626         if (rc < 0)
627                 GOTO(out, rc);
628
629         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
630         if (body == NULL)
631                 GOTO(out, rc = -EPROTO);
632
633         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, &oio->oi_oa,
634                              &body->oa);
635         EXIT;
636 out:
637         oio->oi_cbarg.opc_rc = rc;
638         complete(&oio->oi_cbarg.opc_sync);
639
640         return 0;
641 }
642
643 static int osc_io_data_version_start(const struct lu_env *env,
644                                      const struct cl_io_slice *slice)
645 {
646         struct cl_data_version_io *dv   = &slice->cis_io->u.ci_data_version;
647         struct osc_io           *oio    = cl2osc_io(env, slice);
648         struct obdo             *oa     = &oio->oi_oa;
649         struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
650         struct osc_object       *obj    = cl2osc(slice->cis_obj);
651         struct lov_oinfo        *loi    = obj->oo_oinfo;
652         struct obd_export       *exp    = osc_export(obj);
653         struct ptlrpc_request   *req;
654         struct ost_body         *body;
655         struct osc_data_version_args *dva;
656         int rc;
657
658         ENTRY;
659         memset(oa, 0, sizeof(*oa));
660         oa->o_oi = loi->loi_oi;
661         oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
662
663         if (dv->dv_flags & (LL_DV_RD_FLUSH | LL_DV_WR_FLUSH)) {
664                 oa->o_valid |= OBD_MD_FLFLAGS;
665                 oa->o_flags |= OBD_FL_SRVLOCK;
666                 if (dv->dv_flags & LL_DV_WR_FLUSH)
667                         oa->o_flags |= OBD_FL_FLUSH;
668         }
669
670         init_completion(&cbargs->opc_sync);
671
672         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
673         if (req == NULL)
674                 RETURN(-ENOMEM);
675
676         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
677         if (rc < 0) {
678                 ptlrpc_request_free(req);
679                 RETURN(rc);
680         }
681
682         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
683         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
684
685         ptlrpc_request_set_replen(req);
686         req->rq_interpret_reply = osc_data_version_interpret;
687         CLASSERT(sizeof(*dva) <= sizeof(req->rq_async_args));
688         dva = ptlrpc_req_async_args(req);
689         dva->dva_oio = oio;
690
691         ptlrpcd_add_req(req);
692
693         RETURN(0);
694 }
695
696 static void osc_io_data_version_end(const struct lu_env *env,
697                                     const struct cl_io_slice *slice)
698 {
699         struct cl_data_version_io *dv = &slice->cis_io->u.ci_data_version;
700         struct osc_io           *oio    = cl2osc_io(env, slice);
701         struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
702
703         ENTRY;
704         wait_for_completion(&cbargs->opc_sync);
705
706         if (cbargs->opc_rc != 0) {
707                 slice->cis_io->ci_result = cbargs->opc_rc;
708         } else if (!(oio->oi_oa.o_valid & OBD_MD_FLDATAVERSION)) {
709                 slice->cis_io->ci_result = -EOPNOTSUPP;
710         } else {
711                 dv->dv_data_version = oio->oi_oa.o_data_version;
712                 slice->cis_io->ci_result = 0;
713         }
714
715         EXIT;
716 }
717
718 static int osc_io_read_start(const struct lu_env *env,
719                              const struct cl_io_slice *slice)
720 {
721         struct cl_object *obj  = slice->cis_obj;
722         struct cl_attr   *attr = &osc_env_info(env)->oti_attr;
723         int rc = 0;
724         ENTRY;
725
726         if (!slice->cis_io->ci_noatime) {
727                 cl_object_attr_lock(obj);
728                 attr->cat_atime = LTIME_S(CFS_CURRENT_TIME);
729                 rc = cl_object_attr_update(env, obj, attr, CAT_ATIME);
730                 cl_object_attr_unlock(obj);
731         }
732
733         RETURN(rc);
734 }
735
736 static int osc_io_write_start(const struct lu_env *env,
737                               const struct cl_io_slice *slice)
738 {
739         struct cl_object *obj   = slice->cis_obj;
740         struct cl_attr   *attr  = &osc_env_info(env)->oti_attr;
741         int rc = 0;
742         ENTRY;
743
744         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_SETTIME, 1);
745         cl_object_attr_lock(obj);
746         attr->cat_mtime = attr->cat_ctime = LTIME_S(CFS_CURRENT_TIME);
747         rc = cl_object_attr_update(env, obj, attr, CAT_MTIME | CAT_CTIME);
748         cl_object_attr_unlock(obj);
749
750         RETURN(rc);
751 }
752
753 static int osc_fsync_ost(const struct lu_env *env, struct osc_object *obj,
754                          struct cl_fsync_io *fio)
755 {
756         struct osc_io    *oio   = osc_env_io(env);
757         struct obdo      *oa    = &oio->oi_oa;
758         struct lov_oinfo *loi   = obj->oo_oinfo;
759         struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
760         int rc = 0;
761         ENTRY;
762
763         memset(oa, 0, sizeof(*oa));
764         oa->o_oi = loi->loi_oi;
765         oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
766
767         /* reload size abd blocks for start and end of sync range */
768         oa->o_size = fio->fi_start;
769         oa->o_blocks = fio->fi_end;
770         oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
771
772         obdo_set_parent_fid(oa, fio->fi_fid);
773
774         init_completion(&cbargs->opc_sync);
775
776         rc = osc_sync_base(obj, oa, osc_async_upcall, cbargs, PTLRPCD_SET);
777         RETURN(rc);
778 }
779
780 static int osc_io_fsync_start(const struct lu_env *env,
781                               const struct cl_io_slice *slice)
782 {
783         struct cl_io       *io  = slice->cis_io;
784         struct cl_fsync_io *fio = &io->u.ci_fsync;
785         struct cl_object   *obj = slice->cis_obj;
786         struct osc_object  *osc = cl2osc(obj);
787         pgoff_t start  = cl_index(obj, fio->fi_start);
788         pgoff_t end    = cl_index(obj, fio->fi_end);
789         int     result = 0;
790         ENTRY;
791
792         if (fio->fi_end == OBD_OBJECT_EOF)
793                 end = CL_PAGE_EOF;
794
795         result = osc_cache_writeback_range(env, osc, start, end, 0,
796                                            fio->fi_mode == CL_FSYNC_DISCARD);
797         if (result > 0) {
798                 fio->fi_nr_written += result;
799                 result = 0;
800         }
801         if (fio->fi_mode == CL_FSYNC_ALL) {
802                 int rc;
803
804                 /* we have to wait for writeback to finish before we can
805                  * send OST_SYNC RPC. This is bad because it causes extents
806                  * to be written osc by osc. However, we usually start
807                  * writeback before CL_FSYNC_ALL so this won't have any real
808                  * problem. */
809                 rc = osc_cache_wait_range(env, osc, start, end);
810                 if (result == 0)
811                         result = rc;
812                 rc = osc_fsync_ost(env, osc, fio);
813                 if (result == 0)
814                         result = rc;
815         }
816
817         RETURN(result);
818 }
819
820 static void osc_io_fsync_end(const struct lu_env *env,
821                              const struct cl_io_slice *slice)
822 {
823         struct cl_fsync_io *fio = &slice->cis_io->u.ci_fsync;
824         struct cl_object   *obj = slice->cis_obj;
825         pgoff_t start = cl_index(obj, fio->fi_start);
826         pgoff_t end   = cl_index(obj, fio->fi_end);
827         int result = 0;
828
829         if (fio->fi_mode == CL_FSYNC_LOCAL) {
830                 result = osc_cache_wait_range(env, cl2osc(obj), start, end);
831         } else if (fio->fi_mode == CL_FSYNC_ALL) {
832                 struct osc_io           *oio    = cl2osc_io(env, slice);
833                 struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
834
835                 wait_for_completion(&cbargs->opc_sync);
836                 if (result == 0)
837                         result = cbargs->opc_rc;
838         }
839         slice->cis_io->ci_result = result;
840 }
841
842 static int osc_io_ladvise_start(const struct lu_env *env,
843                                 const struct cl_io_slice *slice)
844 {
845         int                      result = 0;
846         struct cl_io            *io = slice->cis_io;
847         struct osc_io           *oio = cl2osc_io(env, slice);
848         struct cl_object        *obj = slice->cis_obj;
849         struct lov_oinfo        *loi = cl2osc(obj)->oo_oinfo;
850         struct cl_ladvise_io    *lio = &io->u.ci_ladvise;
851         struct obdo             *oa = &oio->oi_oa;
852         struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
853         struct lu_ladvise       *ladvise;
854         struct ladvise_hdr      *ladvise_hdr;
855         int                      buf_size;
856         int                      num_advise = 1;
857         ENTRY;
858
859         /* TODO: add multiple ladvise support in CLIO */
860         buf_size = offsetof(typeof(*ladvise_hdr), lah_advise[num_advise]);
861         if (osc_env_info(env)->oti_ladvise_buf.lb_len < buf_size)
862                 lu_buf_realloc(&osc_env_info(env)->oti_ladvise_buf, buf_size);
863
864         ladvise_hdr = osc_env_info(env)->oti_ladvise_buf.lb_buf;
865         if (ladvise_hdr == NULL)
866                 RETURN(-ENOMEM);
867
868         memset(ladvise_hdr, 0, buf_size);
869         ladvise_hdr->lah_magic = LADVISE_MAGIC;
870         ladvise_hdr->lah_count = num_advise;
871         ladvise_hdr->lah_flags = lio->li_flags;
872
873         memset(oa, 0, sizeof(*oa));
874         oa->o_oi = loi->loi_oi;
875         oa->o_valid = OBD_MD_FLID;
876         obdo_set_parent_fid(oa, lio->li_fid);
877
878         ladvise = ladvise_hdr->lah_advise;
879         ladvise->lla_start = lio->li_start;
880         ladvise->lla_end = lio->li_end;
881         ladvise->lla_advice = lio->li_advice;
882
883         if (lio->li_flags & LF_ASYNC) {
884                 result = osc_ladvise_base(osc_export(cl2osc(obj)), oa,
885                                           ladvise_hdr, NULL, NULL, NULL);
886         } else {
887                 init_completion(&cbargs->opc_sync);
888                 result = osc_ladvise_base(osc_export(cl2osc(obj)), oa,
889                                           ladvise_hdr, osc_async_upcall,
890                                           cbargs, PTLRPCD_SET);
891                 cbargs->opc_rpc_sent = result == 0;
892         }
893         RETURN(result);
894 }
895
896 static void osc_io_ladvise_end(const struct lu_env *env,
897                                const struct cl_io_slice *slice)
898 {
899         struct cl_io            *io = slice->cis_io;
900         struct osc_io           *oio = cl2osc_io(env, slice);
901         struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
902         int                      result = 0;
903         struct cl_ladvise_io    *lio = &io->u.ci_ladvise;
904
905         if ((!(lio->li_flags & LF_ASYNC)) && cbargs->opc_rpc_sent) {
906                 wait_for_completion(&cbargs->opc_sync);
907                 result = cbargs->opc_rc;
908         }
909         slice->cis_io->ci_result = result;
910 }
911
912 static void osc_io_end(const struct lu_env *env,
913                        const struct cl_io_slice *slice)
914 {
915         struct osc_io *oio = cl2osc_io(env, slice);
916
917         if (oio->oi_active) {
918                 osc_extent_release(env, oio->oi_active);
919                 oio->oi_active = NULL;
920         }
921 }
922
923 static const struct cl_io_operations osc_io_ops = {
924         .op = {
925                 [CIT_READ] = {
926                         .cio_iter_init = osc_io_iter_init,
927                         .cio_iter_fini = osc_io_iter_fini,
928                         .cio_start  = osc_io_read_start,
929                         .cio_fini   = osc_io_fini
930                 },
931                 [CIT_WRITE] = {
932                         .cio_iter_init = osc_io_write_iter_init,
933                         .cio_iter_fini = osc_io_write_iter_fini,
934                         .cio_start  = osc_io_write_start,
935                         .cio_end    = osc_io_end,
936                         .cio_fini   = osc_io_fini
937                 },
938                 [CIT_SETATTR] = {
939                         .cio_iter_init = osc_io_iter_init,
940                         .cio_iter_fini = osc_io_iter_fini,
941                         .cio_start  = osc_io_setattr_start,
942                         .cio_end    = osc_io_setattr_end
943                 },
944                 [CIT_DATA_VERSION] = {
945                         .cio_start  = osc_io_data_version_start,
946                         .cio_end    = osc_io_data_version_end,
947                 },
948                 [CIT_FAULT] = {
949                         .cio_iter_init = osc_io_iter_init,
950                         .cio_iter_fini = osc_io_iter_fini,
951                         .cio_start  = osc_io_fault_start,
952                         .cio_end    = osc_io_end,
953                         .cio_fini   = osc_io_fini
954                 },
955                 [CIT_FSYNC] = {
956                         .cio_start  = osc_io_fsync_start,
957                         .cio_end    = osc_io_fsync_end,
958                         .cio_fini   = osc_io_fini
959                 },
960                 [CIT_LADVISE] = {
961                         .cio_start  = osc_io_ladvise_start,
962                         .cio_end    = osc_io_ladvise_end,
963                         .cio_fini   = osc_io_fini
964                 },
965                 [CIT_MISC] = {
966                         .cio_fini   = osc_io_fini
967                 }
968         },
969         .cio_read_ahead             = osc_io_read_ahead,
970         .cio_submit                 = osc_io_submit,
971         .cio_commit_async           = osc_io_commit_async
972 };
973
974 /*****************************************************************************
975  *
976  * Transfer operations.
977  *
978  */
979
980 int osc_io_init(const struct lu_env *env,
981                 struct cl_object *obj, struct cl_io *io)
982 {
983         struct osc_io *oio = osc_env_io(env);
984
985         CL_IO_SLICE_CLEAN(oio, oi_cl);
986         cl_io_slice_add(io, &oio->oi_cl, obj, &osc_io_ops);
987         return 0;
988 }
989
990 /** @} osc */