4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2015, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * Implementation of cl_io for OSC layer.
38 * Author: Nikita Danilov <nikita.danilov@sun.com>
39 * Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
42 #define DEBUG_SUBSYSTEM S_OSC
44 #include <lustre_obdo.h>
46 #include "osc_cl_internal.h"
52 /*****************************************************************************
/*
 * Narrow a generic cl_io_slice to the OSC-private osc_io that embeds it.
 * The invariant asserts the slice is the per-environment osc_io.
 * NOTE(review): this fragment is a partial listing — the function's
 * return statement and braces are elided.
 */
58 static struct osc_io *cl2osc_io(const struct lu_env *env,
59 const struct cl_io_slice *slice)
61 struct osc_io *oio = container_of0(slice, struct osc_io, oi_cl);
62 LINVRNT(oio == osc_env_io(env));
66 /*****************************************************************************
/*
 * No-op cio_fini method shared by several entries of osc_io_ops below.
 * NOTE(review): body elided in this fragment.
 */
72 static void osc_io_fini(const struct lu_env *env, const struct cl_io_slice *io)
/*
 * Release callback for a read-ahead window (cra_release): drop the PR
 * reference taken in osc_io_read_ahead() and put the DLM lock.
 */
76 static void osc_read_ahead_release(const struct lu_env *env,
79 struct ldlm_lock *dlmlock = cbdata;
80 struct lustre_handle lockh;
82 ldlm_lock2handle(dlmlock, &lockh);
83 ldlm_lock_decref(&lockh, LCK_PR);
84 LDLM_LOCK_PUT(dlmlock);
/*
 * cio_read_ahead method: look up the DLM lock covering page index @start
 * and use its extent to bound the read-ahead window @ra.  Returns
 * -ENODATA (the initial value of result) when no lock covers @start;
 * the success path is elided in this fragment.
 */
87 static int osc_io_read_ahead(const struct lu_env *env,
88 const struct cl_io_slice *ios,
89 pgoff_t start, struct cl_read_ahead *ra)
91 struct osc_object *osc = cl2osc(ios->cis_obj);
92 struct ldlm_lock *dlmlock;
93 int result = -ENODATA;
96 dlmlock = osc_dlmlock_at_pgoff(env, osc, start, 0);
97 if (dlmlock != NULL) {
/* Hold the lock in PR mode for read-ahead: add a PR reference
 * first, then drop the reference in the lock's original mode. */
98 if (dlmlock->l_req_mode != LCK_PR) {
99 struct lustre_handle lockh;
100 ldlm_lock2handle(dlmlock, &lockh);
101 ldlm_lock_addref(&lockh, LCK_PR);
102 ldlm_lock_decref(&lockh, dlmlock->l_req_mode);
/* Window ends at the page index of the lock extent's end; the PR
 * reference is dropped later via osc_read_ahead_release(). */
105 ra->cra_end = cl_index(osc2cl(osc),
106 dlmlock->l_policy_data.l_extent.end);
107 ra->cra_release = osc_read_ahead_release;
108 ra->cra_cbdata = dlmlock;
116 * An implementation of cl_io_operations::cio_io_submit() method for osc
117 * layer. Iterates over pages in the in-queue, prepares each for io by calling
118 * cl_page_prep() and then either submits them through osc_io_submit_page()
119 * or, if page is already submitted, changes osc flags through
120 * osc_set_async_flags().
/*
 * cio_submit method: walk the in-queue, prepare each page with
 * cl_page_prep(), submit ready pages via osc_page_submit() and batch
 * them into @list, flushing a sync-page RPC every @max_pages pages.
 * Pages with a sync-io waiter are moved to the out-queue; others are
 * just removed from the in-queue.  Returns 0 if anything was queued,
 * otherwise the last error.
 * NOTE(review): several lines (declarations of io/cmd/brw_flags/tmp,
 * some branches) are elided in this fragment.
 */
122 static int osc_io_submit(const struct lu_env *env,
123 const struct cl_io_slice *ios,
124 enum cl_req_type crt, struct cl_2queue *queue)
126 struct cl_page *page;
128 struct client_obd *cli = NULL;
129 struct osc_object *osc = NULL; /* to keep gcc happy */
130 struct osc_page *opg;
132 struct list_head list = LIST_HEAD_INIT(list);
134 struct cl_page_list *qin = &queue->c2_qin;
135 struct cl_page_list *qout = &queue->c2_qout;
136 unsigned int queued = 0;
140 unsigned int max_pages;
142 LASSERT(qin->pl_nr > 0);
144 CDEBUG(D_CACHE, "%d %d\n", qin->pl_nr, crt);
146 osc = cl2osc(ios->cis_obj);
/* RPC batching limit comes from the client obd; the assignment of
 * cli itself is elided here (presumably cli = osc_cli(osc)). */
148 max_pages = cli->cl_max_pages_per_rpc;
150 cmd = crt == CRT_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ;
151 brw_flags = osc_io_srvlock(cl2osc_io(env, ios)) ? OBD_BRW_SRVLOCK : 0;
154 * NOTE: here @page is a top-level page. This is done to avoid
155 * creation of sub-page-list.
157 cl_page_list_for_each_safe(page, tmp, qin) {
158 struct osc_async_page *oap;
164 opg = osc_cl_page_osc(page, osc);
166 LASSERT(osc == oap->oap_obj);
/* A page already on a pending or RPC list is busy elsewhere. */
168 if (!list_empty(&oap->oap_pending_item) ||
169 !list_empty(&oap->oap_rpc_item)) {
170 CDEBUG(D_CACHE, "Busy oap %p page %p for submit.\n",
176 result = cl_page_prep(env, io, page, crt);
179 if (result != -EALREADY)
182 * Handle -EALREADY error: for read case, the page is
183 * already in UPTODATE state; for write, the page
/* Mark the page ready for immediate transfer with a stable
 * byte count, under the oap spinlock. */
190 spin_lock(&oap->oap_lock);
191 oap->oap_async_flags = ASYNC_URGENT|ASYNC_READY;
192 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
193 spin_unlock(&oap->oap_lock);
195 osc_page_submit(env, opg, crt, brw_flags);
196 list_add_tail(&oap->oap_pending_item, &list);
/* Only pages with a sync waiter go to the out-queue. */
198 if (page->cp_sync_io != NULL)
199 cl_page_list_move(qout, qin, page);
201 cl_page_list_del(env, qin, page);
/* Flush a full RPC's worth of pages. */
203 if (++queued == max_pages) {
205 result = osc_queue_sync_pages(env, osc, &list, cmd,
/* Flush the final partial batch. */
213 result = osc_queue_sync_pages(env, osc, &list, cmd, brw_flags);
215 CDEBUG(D_INFO, "%d/%d %d\n", qin->pl_nr, qout->pl_nr, result);
216 return qout->pl_nr > 0 ? 0 : result;
220 * This is called when a page is accessed within file in a way that creates
221 * new page, if one were missing (i.e., if there were a hole at that place in
222 * the file, or accessed page is beyond the current file size).
224 * Expand stripe KMS if necessary.
/*
 * Expand the stripe's known-minimum-size (KMS) to cover a page being
 * written at index @idx up to byte @to, and refresh mtime/ctime.  Size
 * is also grown when kms exceeds the cached lvb size.
 * NOTE(review): declarations of kms/valid and some closing lines are
 * elided in this fragment.
 */
226 static void osc_page_touch_at(const struct lu_env *env,
227 struct cl_object *obj, pgoff_t idx, size_t to)
229 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
230 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
234 /* offset within stripe */
235 kms = cl_offset(obj, idx) + to;
237 cl_object_attr_lock(obj);
241 * ll_inode_size_lock(inode, 0); lov_stripe_lock(lsm);
245 CDEBUG(D_INODE, "stripe KMS %sincreasing "LPU64"->"LPU64" "LPU64"\n",
246 kms > loi->loi_kms ? "" : "not ", loi->loi_kms, kms,
247 loi->loi_lvb.lvb_size);
/* Writes always bump mtime/ctime; KMS/size only grow, never shrink. */
249 attr->cat_mtime = attr->cat_ctime = LTIME_S(CFS_CURRENT_TIME);
250 valid = CAT_MTIME | CAT_CTIME;
251 if (kms > loi->loi_kms) {
255 if (kms > loi->loi_lvb.lvb_size) {
256 attr->cat_size = kms;
259 cl_object_attr_update(env, obj, attr, valid);
260 cl_object_attr_unlock(obj);
/*
 * cio_commit_async method: clip partial first/last pages, add every
 * queued page to the dirty cache (osc_page_cache_add), expand KMS via
 * osc_page_touch_at(), then hand each page to @cb.  For a sync write
 * the active extent is released early so the kernel's flush wait can
 * make progress.
 * NOTE(review): the @cb parameter declaration and several branch/return
 * lines are elided in this fragment.
 */
263 static int osc_io_commit_async(const struct lu_env *env,
264 const struct cl_io_slice *ios,
265 struct cl_page_list *qin, int from, int to,
268 struct cl_io *io = ios->cis_io;
269 struct osc_io *oio = cl2osc_io(env, ios);
270 struct osc_object *osc = cl2osc(ios->cis_obj);
271 struct cl_page *page;
272 struct cl_page *last_page;
273 struct osc_page *opg;
277 LASSERT(qin->pl_nr > 0);
279 /* Handle partial page cases */
280 last_page = cl_page_list_last(qin);
281 if (oio->oi_lockless) {
282 page = cl_page_list_first(qin);
/* Single page: clip both ends; otherwise clip first page's tail
 * and last page's head separately. */
283 if (page == last_page) {
284 cl_page_clip(env, page, from, to);
287 cl_page_clip(env, page, from, PAGE_SIZE);
289 cl_page_clip(env, last_page, 0, to);
293 while (qin->pl_nr > 0) {
294 struct osc_async_page *oap;
296 page = cl_page_list_first(qin);
297 opg = osc_cl_page_osc(page, osc);
/* A page on an RPC list is mid-transfer; cannot commit it now. */
300 if (!list_empty(&oap->oap_rpc_item)) {
301 CDEBUG(D_CACHE, "Busy oap %p page %p for submit.\n",
307 /* The page may be already in dirty cache. */
308 if (list_empty(&oap->oap_pending_item)) {
309 result = osc_page_cache_add(env, &opg->ops_cl, io);
/* Grow KMS: full page for all but the last, @to for the last. */
314 osc_page_touch_at(env, osc2cl(osc), osc_index(opg),
315 page == last_page ? to : PAGE_SIZE);
317 cl_page_list_del(env, qin, page);
319 (*cb)(env, io, page);
320 /* Can't access page any more. Page can be in transfer and
321 * complete at any time. */
324 /* for sync write, kernel will wait for this page to be flushed before
325 * osc_io_end() is called, so release it earlier.
326 * for mkwrite(), it's known there is no further pages. */
327 if (cl_io_is_sync_write(io) && oio->oi_active != NULL) {
328 osc_extent_release(env, oio->oi_active);
329 oio->oi_active = NULL;
332 CDEBUG(D_INFO, "%d %d\n", qin->pl_nr, result);
/*
 * cio_iter_init method: under imp_lock, refuse to start IO against an
 * invalidated import; otherwise count this IO on the object
 * (oo_nr_ios, paired with the decrement in osc_io_iter_fini()) and mark
 * the per-thread osc_io active.
 * NOTE(review): the return value lines are elided in this fragment.
 */
336 static int osc_io_iter_init(const struct lu_env *env,
337 const struct cl_io_slice *ios)
339 struct osc_object *osc = cl2osc(ios->cis_obj);
340 struct obd_import *imp = osc_cli(osc)->cl_import;
343 spin_lock(&imp->imp_lock);
344 if (likely(!imp->imp_invalid)) {
345 struct osc_io *oio = osc_env_io(env);
347 atomic_inc(&osc->oo_nr_ios);
348 oio->oi_is_active = 1;
351 spin_unlock(&imp->imp_lock);
/*
 * cio_iter_init for writes: reserve LRU slots for the pages this write
 * may dirty (skipped for append, whose extent is unknown), then chain
 * to the common osc_io_iter_init().  An unaligned start adds one page
 * to the estimate (the increment line is elided here).
 */
356 static int osc_io_write_iter_init(const struct lu_env *env,
357 const struct cl_io_slice *ios)
359 struct cl_io *io = ios->cis_io;
360 struct osc_io *oio = osc_env_io(env);
361 struct osc_object *osc = cl2osc(ios->cis_obj);
362 unsigned long npages;
365 if (cl_io_is_append(io))
366 RETURN(osc_io_iter_init(env, ios));
368 npages = io->u.ci_rw.crw_count >> PAGE_CACHE_SHIFT;
369 if (io->u.ci_rw.crw_pos & ~PAGE_MASK)
/* Remember how many LRU slots were actually granted so that
 * osc_io_write_iter_fini() can return them. */
372 oio->oi_lru_reserved = osc_lru_reserve(osc_cli(osc), npages);
374 RETURN(osc_io_iter_init(env, ios));
/*
 * cio_iter_fini method: undo osc_io_iter_init() — clear the active flag
 * and drop the object's IO count, waking anyone waiting for all IOs to
 * drain (oo_io_waitq).
 */
377 static void osc_io_iter_fini(const struct lu_env *env,
378 const struct cl_io_slice *ios)
380 struct osc_io *oio = osc_env_io(env);
382 if (oio->oi_is_active) {
383 struct osc_object *osc = cl2osc(ios->cis_obj);
385 oio->oi_is_active = 0;
386 LASSERT(atomic_read(&osc->oo_nr_ios) > 0);
387 if (atomic_dec_and_test(&osc->oo_nr_ios))
388 wake_up_all(&osc->oo_io_waitq);
/*
 * cio_iter_fini for writes: return any unused LRU reservation made in
 * osc_io_write_iter_init(), clear the cached write lock pointer, then
 * chain to the common osc_io_iter_fini().
 */
392 static void osc_io_write_iter_fini(const struct lu_env *env,
393 const struct cl_io_slice *ios)
395 struct osc_io *oio = osc_env_io(env);
396 struct osc_object *osc = cl2osc(ios->cis_obj);
398 if (oio->oi_lru_reserved > 0) {
399 osc_lru_unreserve(osc_cli(osc), oio->oi_lru_reserved);
400 oio->oi_lru_reserved = 0;
402 oio->oi_write_osclock = NULL;
404 osc_io_iter_fini(env, ios);
/*
 * cio_start for page faults: for a writable fault, grow the stripe KMS
 * to cover the faulted page (ft_index/ft_nob) via osc_page_touch_at().
 * Read faults need no KMS update.
 * NOTE(review): the declaration/assignment of io and the return are
 * elided in this fragment.
 */
407 static int osc_io_fault_start(const struct lu_env *env,
408 const struct cl_io_slice *ios)
411 struct cl_fault_io *fio;
415 fio = &io->u.ci_fault;
416 CDEBUG(D_INFO, "%lu %d %zu\n",
417 fio->ft_index, fio->ft_writable, fio->ft_nob);
419 * If mapping is writeable, adjust kms to cover this page,
420 * but do not extend kms beyond actual file size.
423 if (fio->ft_writable)
424 osc_page_touch_at(env, ios->cis_obj,
425 fio->ft_index, fio->ft_nob);
/*
 * Generic async RPC upcall: signal the waiter blocked on opc_sync.
 * NOTE(review): presumably rc is also stored into args->opc_rc, but
 * that line is elided in this fragment — confirm against full source.
 */
429 static int osc_async_upcall(void *a, int rc)
431 struct osc_async_cbargs *args = a;
434 complete(&args->opc_sync);
439 * Checks that there are no pages being written in the extent being truncated.
/*
 * Gang-lookup callback used by osc_trunc_check(): complain (debug/error
 * logging only) about any page still pending a write, or still locked,
 * inside the truncated region starting at *cbdata.  Always continues
 * the scan (CLP_GANG_OKAY).
 * NOTE(review): the assignment of oap from the page is elided here.
 */
441 static int trunc_check_cb(const struct lu_env *env, struct cl_io *io,
442 struct osc_page *ops , void *cbdata)
444 struct cl_page *page = ops->ops_cl.cpl_page;
445 struct osc_async_page *oap;
446 __u64 start = *(__u64 *)cbdata;
449 if (oap->oap_cmd & OBD_BRW_WRITE &&
450 !list_empty(&oap->oap_pending_item))
451 CL_PAGE_DEBUG(D_ERROR, env, page, "exists " LPU64 "/%s.\n",
452 start, current->comm);
454 if (PageLocked(page->cp_vmpage))
455 CDEBUG(D_CACHE, "page %p index %lu locked for %d.\n",
456 ops, osc_index(ops), oap->oap_cmd & OBD_BRW_RWMASK);
458 return CLP_GANG_OKAY;
/*
 * Sanity check after truncate: scan [start(+1 if the boundary page is
 * partial), EOF] for leftover pages and report them via trunc_check_cb.
 * NOTE(review): declarations of start/partial are elided here.
 */
461 static void osc_trunc_check(const struct lu_env *env, struct cl_io *io,
462 struct osc_io *oio, __u64 size)
464 struct cl_object *clob;
468 clob = oio->oi_cl.cis_obj;
469 start = cl_index(clob, size);
/* The page containing @size stays (it is clipped, not discarded). */
470 partial = cl_offset(clob, start) < size;
473 * Complain if there are pages in the truncated region.
475 osc_page_gang_lookup(env, io, cl2osc(clob),
476 start + partial, CL_PAGE_EOF,
477 trunc_check_cb, (void *)&size);
/*
 * cio_start for setattr/truncate: (1) for truncate, first discard dirty
 * cached pages past the new size; (2) update the cl_object attributes
 * under the attr lock; (3) build an obdo describing the change and send
 * either an async punch (size change) or async setattr RPC to the OST,
 * with osc_async_upcall() completing cbargs->opc_sync.
 * NOTE(review): many lines (result declaration, several braces and
 * else-branches) are elided in this fragment.
 */
480 static int osc_io_setattr_start(const struct lu_env *env,
481 const struct cl_io_slice *slice)
483 struct cl_io *io = slice->cis_io;
484 struct osc_io *oio = cl2osc_io(env, slice);
485 struct cl_object *obj = slice->cis_obj;
486 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
487 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
488 struct obdo *oa = &oio->oi_oa;
489 struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
490 __u64 size = io->u.ci_setattr.sa_attr.lvb_size;
491 unsigned int ia_valid = io->u.ci_setattr.sa_valid;
494 /* truncate cache dirty pages first */
495 if (cl_io_is_trunc(io))
496 result = osc_cache_truncate_start(env, cl2osc(obj), size,
/* Attribute update on the cl_object is skipped for lockless IO. */
499 if (result == 0 && oio->oi_lockless == 0) {
500 cl_object_attr_lock(obj);
501 result = cl_object_attr_get(env, obj, attr);
503 struct ost_lvb *lvb = &io->u.ci_setattr.sa_attr;
504 unsigned int cl_valid = 0;
506 if (ia_valid & ATTR_SIZE) {
507 attr->cat_size = attr->cat_kms = size;
508 cl_valid = (CAT_SIZE | CAT_KMS);
510 if (ia_valid & ATTR_MTIME_SET) {
511 attr->cat_mtime = lvb->lvb_mtime;
512 cl_valid |= CAT_MTIME;
514 if (ia_valid & ATTR_ATIME_SET) {
515 attr->cat_atime = lvb->lvb_atime;
516 cl_valid |= CAT_ATIME;
518 if (ia_valid & ATTR_CTIME_SET) {
519 attr->cat_ctime = lvb->lvb_ctime;
520 cl_valid |= CAT_CTIME;
522 result = cl_object_attr_update(env, obj, attr,
525 cl_object_attr_unlock(obj);
/* Build the obdo sent to the OST. */
527 memset(oa, 0, sizeof(*oa));
529 oa->o_oi = loi->loi_oi;
530 obdo_set_parent_fid(oa, io->u.ci_setattr.sa_parent_fid);
531 oa->o_stripe_idx = io->u.ci_setattr.sa_stripe_index;
532 oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
533 if (ia_valid & ATTR_CTIME) {
534 oa->o_valid |= OBD_MD_FLCTIME;
535 oa->o_ctime = attr->cat_ctime;
537 if (ia_valid & ATTR_ATIME) {
538 oa->o_valid |= OBD_MD_FLATIME;
539 oa->o_atime = attr->cat_atime;
541 if (ia_valid & ATTR_MTIME) {
542 oa->o_valid |= OBD_MD_FLMTIME;
543 oa->o_mtime = attr->cat_mtime;
/* Truncate punches [size, EOF]; the o_size assignment is elided. */
545 if (ia_valid & ATTR_SIZE) {
547 oa->o_blocks = OBD_OBJECT_EOF;
548 oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
550 if (oio->oi_lockless) {
551 oa->o_flags = OBD_FL_SRVLOCK;
552 oa->o_valid |= OBD_MD_FLFLAGS;
555 LASSERT(oio->oi_lockless == 0);
558 if (ia_valid & ATTR_ATTR_FLAG) {
559 oa->o_flags = io->u.ci_setattr.sa_attr_flags;
560 oa->o_valid |= OBD_MD_FLFLAGS;
563 init_completion(&cbargs->opc_sync);
/* Size change -> OST_PUNCH, otherwise OST_SETATTR; both async via
 * ptlrpcd with osc_async_upcall() as completion. */
565 if (ia_valid & ATTR_SIZE)
566 result = osc_punch_base(osc_export(cl2osc(obj)),
567 oa, osc_async_upcall,
568 cbargs, PTLRPCD_SET);
570 result = osc_setattr_async(osc_export(cl2osc(obj)),
571 oa, osc_async_upcall,
572 cbargs, PTLRPCD_SET);
/* Remember whether the RPC went out so _end() knows to wait. */
574 cbargs->opc_rpc_sent = result == 0;
/*
 * cio_end for setattr/truncate: wait for the RPC issued by _start()
 * (if it was sent), account lockless truncates in device stats, then
 * finish the cache-truncate bookkeeping and run the post-truncate
 * page check.
 */
580 static void osc_io_setattr_end(const struct lu_env *env,
581 const struct cl_io_slice *slice)
583 struct cl_io *io = slice->cis_io;
584 struct osc_io *oio = cl2osc_io(env, slice);
585 struct cl_object *obj = slice->cis_obj;
586 struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
589 if (cbargs->opc_rpc_sent) {
590 wait_for_completion(&cbargs->opc_sync);
591 result = io->ci_result = cbargs->opc_rc;
594 if (oio->oi_lockless) {
595 /* lockless truncate */
596 struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
598 LASSERT(cl_io_is_trunc(io));
599 /* XXX: Need a lock. */
600 osd->od_stats.os_lockless_truncates++;
604 if (cl_io_is_trunc(io)) {
605 __u64 size = io->u.ci_setattr.sa_attr.lvb_size;
606 osc_trunc_check(env, io, oio, size);
607 osc_cache_truncate_end(env, oio->oi_trunc);
608 oio->oi_trunc = NULL;
/* Async-args cookie for the OST_GETATTR data-version RPC; carried in
 * req->rq_async_args so the interpret callback can find the osc_io. */
612 struct osc_data_version_args {
613 struct osc_io *dva_oio;
/*
 * Interpret callback for the data-version OST_GETATTR RPC: unpack the
 * reply's ost_body into oio->oi_oa (wire->host obdo conversion), store
 * the final rc and wake the waiter in osc_io_data_version_end().
 */
617 osc_data_version_interpret(const struct lu_env *env, struct ptlrpc_request *req,
620 struct osc_data_version_args *dva = arg;
621 struct osc_io *oio = dva->dva_oio;
622 const struct ost_body *body;
628 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
630 GOTO(out, rc = -EPROTO);
632 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, &oio->oi_oa,
636 oio->oi_cbarg.opc_rc = rc;
637 complete(&oio->oi_cbarg.opc_sync);
/*
 * cio_start for CIT_DATA_VERSION: build and send an async OST_GETATTR
 * RPC asking the OST for the object's data version.  LL_DV_RD_FLUSH /
 * LL_DV_WR_FLUSH request server-side lock flushing via OBD_FL_SRVLOCK
 * (plus OBD_FL_FLUSH for writes).  Completion is signalled through
 * osc_data_version_interpret().
 * NOTE(review): error-handling branches and the return are elided.
 */
642 static int osc_io_data_version_start(const struct lu_env *env,
643 const struct cl_io_slice *slice)
645 struct cl_data_version_io *dv = &slice->cis_io->u.ci_data_version;
646 struct osc_io *oio = cl2osc_io(env, slice);
647 struct obdo *oa = &oio->oi_oa;
648 struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
649 struct osc_object *obj = cl2osc(slice->cis_obj);
650 struct lov_oinfo *loi = obj->oo_oinfo;
651 struct obd_export *exp = osc_export(obj);
652 struct ptlrpc_request *req;
653 struct ost_body *body;
654 struct osc_data_version_args *dva;
658 memset(oa, 0, sizeof(*oa));
659 oa->o_oi = loi->loi_oi;
660 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
662 if (dv->dv_flags & (LL_DV_RD_FLUSH | LL_DV_WR_FLUSH)) {
663 oa->o_valid |= OBD_MD_FLFLAGS;
664 oa->o_flags |= OBD_FL_SRVLOCK;
665 if (dv->dv_flags & LL_DV_WR_FLUSH)
666 oa->o_flags |= OBD_FL_FLUSH;
669 init_completion(&cbargs->opc_sync);
671 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
675 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
677 ptlrpc_request_free(req);
681 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
682 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
684 ptlrpc_request_set_replen(req);
685 req->rq_interpret_reply = osc_data_version_interpret;
/* dva lives inside the request; CLASSERT guards the size. */
686 CLASSERT(sizeof(*dva) <= sizeof(req->rq_async_args));
687 dva = ptlrpc_req_async_args(req);
690 ptlrpcd_add_req(req);
/*
 * cio_end for CIT_DATA_VERSION: wait for the GETATTR reply, then copy
 * the returned data version out (or propagate the RPC error; a reply
 * lacking OBD_MD_FLDATAVERSION means the server does not support it).
 */
695 static void osc_io_data_version_end(const struct lu_env *env,
696 const struct cl_io_slice *slice)
698 struct cl_data_version_io *dv = &slice->cis_io->u.ci_data_version;
699 struct osc_io *oio = cl2osc_io(env, slice);
700 struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
703 wait_for_completion(&cbargs->opc_sync);
705 if (cbargs->opc_rc != 0) {
706 slice->cis_io->ci_result = cbargs->opc_rc;
707 } else if (!(oio->oi_oa.o_valid & OBD_MD_FLDATAVERSION)) {
708 slice->cis_io->ci_result = -EOPNOTSUPP;
710 dv->dv_data_version = oio->oi_oa.o_data_version;
711 slice->cis_io->ci_result = 0;
/*
 * cio_start for reads: refresh the object's atime (unless the IO was
 * opened with noatime) under the attribute lock.
 */
717 static int osc_io_read_start(const struct lu_env *env,
718 const struct cl_io_slice *slice)
720 struct cl_object *obj = slice->cis_obj;
721 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
725 if (!slice->cis_io->ci_noatime) {
726 cl_object_attr_lock(obj);
727 attr->cat_atime = LTIME_S(CFS_CURRENT_TIME);
728 rc = cl_object_attr_update(env, obj, attr, CAT_ATIME);
729 cl_object_attr_unlock(obj);
/*
 * cio_start for writes: set both mtime and ctime to "now" under the
 * attribute lock.  The OBD_FAIL_TIMEOUT is a fault-injection hook used
 * by tests to delay the time update.
 */
735 static int osc_io_write_start(const struct lu_env *env,
736 const struct cl_io_slice *slice)
738 struct cl_object *obj = slice->cis_obj;
739 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
743 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_SETTIME, 1);
744 cl_object_attr_lock(obj);
745 attr->cat_mtime = attr->cat_ctime = LTIME_S(CFS_CURRENT_TIME);
746 rc = cl_object_attr_update(env, obj, attr, CAT_MTIME | CAT_CTIME);
747 cl_object_attr_unlock(obj);
/*
 * Send an async OST_SYNC RPC for the byte range [fi_start, fi_end],
 * encoded in the obdo's o_size/o_blocks fields.  Completion is
 * signalled through osc_async_upcall() on cbargs->opc_sync; the waiter
 * is osc_io_fsync_end().
 */
752 static int osc_fsync_ost(const struct lu_env *env, struct osc_object *obj,
753 struct cl_fsync_io *fio)
755 struct osc_io *oio = osc_env_io(env);
756 struct obdo *oa = &oio->oi_oa;
757 struct lov_oinfo *loi = obj->oo_oinfo;
758 struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
762 memset(oa, 0, sizeof(*oa));
763 oa->o_oi = loi->loi_oi;
764 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
766 /* reload size and blocks for start and end of sync range */
767 oa->o_size = fio->fi_start;
768 oa->o_blocks = fio->fi_end;
769 oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
771 obdo_set_parent_fid(oa, fio->fi_fid);
773 init_completion(&cbargs->opc_sync);
775 rc = osc_sync_base(obj, oa, osc_async_upcall, cbargs, PTLRPCD_SET);
/*
 * cio_start for fsync: write back (or discard, for CL_FSYNC_DISCARD)
 * the cached page range, count pages written into fi_nr_written, and
 * for CL_FSYNC_ALL additionally wait for writeback and issue an
 * OST_SYNC RPC via osc_fsync_ost().
 * NOTE(review): result/rc declarations and some branches are elided.
 */
779 static int osc_io_fsync_start(const struct lu_env *env,
780 const struct cl_io_slice *slice)
782 struct cl_io *io = slice->cis_io;
783 struct cl_fsync_io *fio = &io->u.ci_fsync;
784 struct cl_object *obj = slice->cis_obj;
785 struct osc_object *osc = cl2osc(obj);
786 pgoff_t start = cl_index(obj, fio->fi_start);
787 pgoff_t end = cl_index(obj, fio->fi_end);
/* EOF end offset maps to the last possible page index (the
 * assignment of end is elided in this fragment). */
791 if (fio->fi_end == OBD_OBJECT_EOF)
794 result = osc_cache_writeback_range(env, osc, start, end, 0,
795 fio->fi_mode == CL_FSYNC_DISCARD);
/* A positive return is the number of pages queued for writeback. */
797 fio->fi_nr_written += result;
800 if (fio->fi_mode == CL_FSYNC_ALL) {
803 /* we have to wait for writeback to finish before we can
804 * send OST_SYNC RPC. This is bad because it causes extents
805 * to be written osc by osc. However, we usually start
806 * writeback before CL_FSYNC_ALL so this won't have any real
808 rc = osc_cache_wait_range(env, osc, start, end);
811 rc = osc_fsync_ost(env, osc, fio);
/*
 * cio_end for fsync: CL_FSYNC_LOCAL waits for local writeback only;
 * CL_FSYNC_ALL waits for the OST_SYNC RPC started in _start() and
 * takes its result.
 */
819 static void osc_io_fsync_end(const struct lu_env *env,
820 const struct cl_io_slice *slice)
822 struct cl_fsync_io *fio = &slice->cis_io->u.ci_fsync;
823 struct cl_object *obj = slice->cis_obj;
824 pgoff_t start = cl_index(obj, fio->fi_start);
825 pgoff_t end = cl_index(obj, fio->fi_end);
828 if (fio->fi_mode == CL_FSYNC_LOCAL) {
829 result = osc_cache_wait_range(env, cl2osc(obj), start, end);
830 } else if (fio->fi_mode == CL_FSYNC_ALL) {
831 struct osc_io *oio = cl2osc_io(env, slice);
832 struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
834 wait_for_completion(&cbargs->opc_sync);
836 result = cbargs->opc_rc;
838 slice->cis_io->ci_result = result;
/*
 * cio_start for ladvise: build a single-advice ladvise_hdr in the
 * per-env scratch buffer (grown on demand) and send it to the OST.
 * LF_ASYNC fires and forgets; otherwise the RPC completes through
 * osc_async_upcall() and _end() waits on opc_sync.
 * NOTE(review): declarations of result/num_advise and some error
 * branches are elided in this fragment.
 */
841 static int osc_io_ladvise_start(const struct lu_env *env,
842 const struct cl_io_slice *slice)
845 struct cl_io *io = slice->cis_io;
846 struct osc_io *oio = cl2osc_io(env, slice);
847 struct cl_object *obj = slice->cis_obj;
848 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
849 struct cl_ladvise_io *lio = &io->u.ci_ladvise;
850 struct obdo *oa = &oio->oi_oa;
851 struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
852 struct lu_ladvise *ladvise;
853 struct ladvise_hdr *ladvise_hdr;
858 /* TODO: add multiple ladvise support in CLIO */
859 buf_size = offsetof(typeof(*ladvise_hdr), lah_advise[num_advise]);
860 if (osc_env_info(env)->oti_ladvise_buf.lb_len < buf_size)
861 lu_buf_realloc(&osc_env_info(env)->oti_ladvise_buf, buf_size);
863 ladvise_hdr = osc_env_info(env)->oti_ladvise_buf.lb_buf;
864 if (ladvise_hdr == NULL)
867 memset(ladvise_hdr, 0, buf_size);
868 ladvise_hdr->lah_magic = LADVISE_MAGIC;
869 ladvise_hdr->lah_count = num_advise;
870 ladvise_hdr->lah_flags = lio->li_flags;
872 memset(oa, 0, sizeof(*oa));
873 oa->o_oi = loi->loi_oi;
874 oa->o_valid = OBD_MD_FLID;
875 obdo_set_parent_fid(oa, lio->li_fid);
/* Only one advice entry is filled (see TODO above). */
877 ladvise = ladvise_hdr->lah_advise;
878 ladvise->lla_start = lio->li_start;
879 ladvise->lla_end = lio->li_end;
880 ladvise->lla_advice = lio->li_advice;
882 if (lio->li_flags & LF_ASYNC) {
883 result = osc_ladvise_base(osc_export(cl2osc(obj)), oa,
884 ladvise_hdr, NULL, NULL, NULL);
886 init_completion(&cbargs->opc_sync);
887 result = osc_ladvise_base(osc_export(cl2osc(obj)), oa,
888 ladvise_hdr, osc_async_upcall,
889 cbargs, PTLRPCD_SET);
/* _end() only waits when the synchronous RPC actually went out. */
890 cbargs->opc_rpc_sent = result == 0;
/*
 * cio_end for ladvise: for a synchronous advise whose RPC was sent,
 * wait for completion and propagate its result to the cl_io.
 */
895 static void osc_io_ladvise_end(const struct lu_env *env,
896 const struct cl_io_slice *slice)
898 struct cl_io *io = slice->cis_io;
899 struct osc_io *oio = cl2osc_io(env, slice);
900 struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
902 struct cl_ladvise_io *lio = &io->u.ci_ladvise;
904 if ((!(lio->li_flags & LF_ASYNC)) && cbargs->opc_rpc_sent) {
905 wait_for_completion(&cbargs->opc_sync);
906 result = cbargs->opc_rc;
908 slice->cis_io->ci_result = result;
/*
 * Common cio_end: release the active OSC extent, if one is still held
 * (osc_io_commit_async() may have released it early for sync writes).
 */
911 static void osc_io_end(const struct lu_env *env,
912 const struct cl_io_slice *slice)
914 struct osc_io *oio = cl2osc_io(env, slice);
916 if (oio->oi_active) {
917 osc_extent_release(env, oio->oi_active);
918 oio->oi_active = NULL;
/*
 * Method table wiring the functions above into the CLIO framework,
 * per cl_io type (read/write/setattr/data_version/fault/fsync/ladvise)
 * plus the page-transfer operations (read_ahead/submit/commit_async).
 * NOTE(review): the [CIT_*] index labels and closing braces for most
 * entries are elided in this fragment.
 */
922 static const struct cl_io_operations osc_io_ops = {
925 .cio_iter_init = osc_io_iter_init,
926 .cio_iter_fini = osc_io_iter_fini,
927 .cio_start = osc_io_read_start,
928 .cio_fini = osc_io_fini
931 .cio_iter_init = osc_io_write_iter_init,
932 .cio_iter_fini = osc_io_write_iter_fini,
933 .cio_start = osc_io_write_start,
934 .cio_end = osc_io_end,
935 .cio_fini = osc_io_fini
938 .cio_iter_init = osc_io_iter_init,
939 .cio_iter_fini = osc_io_iter_fini,
940 .cio_start = osc_io_setattr_start,
941 .cio_end = osc_io_setattr_end
943 [CIT_DATA_VERSION] = {
944 .cio_start = osc_io_data_version_start,
945 .cio_end = osc_io_data_version_end,
948 .cio_iter_init = osc_io_iter_init,
949 .cio_iter_fini = osc_io_iter_fini,
950 .cio_start = osc_io_fault_start,
951 .cio_end = osc_io_end,
952 .cio_fini = osc_io_fini
955 .cio_start = osc_io_fsync_start,
956 .cio_end = osc_io_fsync_end,
957 .cio_fini = osc_io_fini
960 .cio_start = osc_io_ladvise_start,
961 .cio_end = osc_io_ladvise_end,
962 .cio_fini = osc_io_fini
965 .cio_fini = osc_io_fini
968 .cio_read_ahead = osc_io_read_ahead,
969 .cio_submit = osc_io_submit,
970 .cio_commit_async = osc_io_commit_async
973 /*****************************************************************************
975 * Transfer operations.
979 int osc_io_init(const struct lu_env *env,
980 struct cl_object *obj, struct cl_io *io)
982 struct osc_io *oio = osc_env_io(env);
984 CL_IO_SLICE_CLEAN(oio, oi_cl);
985 cl_io_slice_add(io, &oio->oi_cl, obj, &osc_io_ops);