4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
31 * Copyright (c) 2012, 2013, Intel Corporation.
32 * Use is subject to license terms.
35 * This file is part of Lustre, http://www.lustre.org/
36 * Lustre is a trademark of Sun Microsystems, Inc.
38 * lustre/osd-zfs/osd_io.c
40 * Author: Alex Zhuravlev <bzzz@whamcloud.com>
41 * Author: Mike Pershin <tappro@whamcloud.com>
45 # define EXPORT_SYMTAB
47 #define DEBUG_SUBSYSTEM S_OSD
49 #include <lustre_ver.h>
50 #include <libcfs/libcfs.h>
51 #include <obd_support.h>
52 #include <lustre_net.h>
54 #include <obd_class.h>
55 #include <lustre_disk.h>
56 #include <lustre_fid.h>
58 #include "osd_internal.h"
60 #include <sys/dnode.h>
65 #include <sys/spa_impl.h>
66 #include <sys/zfs_znode.h>
67 #include <sys/dmu_tx.h>
68 #include <sys/dmu_objset.h>
69 #include <sys/dsl_prop.h>
70 #include <sys/sa_impl.h>
/* Holder tag passed to the DMU when pinning/releasing dbufs for zerocopy I/O. */
static char *osd_zerocopy_tag = "zerocopy";
75 static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
76 struct lu_buf *buf, loff_t *pos,
77 struct lustre_capa *capa)
79 struct osd_object *obj = osd_dt_obj(dt);
80 struct osd_device *osd = osd_obj2dev(obj);
82 int size = buf->lb_len;
85 LASSERT(dt_object_exists(dt));
88 read_lock(&obj->oo_attr_lock);
89 old_size = obj->oo_attr.la_size;
90 read_unlock(&obj->oo_attr_lock);
92 if (*pos + size > old_size) {
96 size = old_size - *pos;
99 rc = -dmu_read(osd->od_objset.os, obj->oo_db->db_object, *pos, size,
100 buf->lb_buf, DMU_READ_PREFETCH);
105 /* XXX: workaround for bug in HEAD: fsfilt_ldiskfs_read() returns
106 * requested number of bytes, not actually read ones */
107 if (S_ISLNK(obj->oo_dt.do_lu.lo_header->loh_attr))
113 static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt,
114 const loff_t size, loff_t pos,
117 struct osd_object *obj = osd_dt_obj(dt);
118 struct osd_device *osd = osd_obj2dev(obj);
119 struct osd_thandle *oh;
123 oh = container_of0(th, struct osd_thandle, ot_super);
125 /* in some cases declare can race with creation (e.g. llog)
126 * and we need to wait till object is initialized. notice
127 * LOHA_EXISTs is supposed to be the last step in the
130 /* declare possible size change. notice we can't check
131 * current size here as another thread can change it */
133 if (dt_object_exists(dt)) {
135 oid = obj->oo_db->db_object;
137 dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0);
139 oid = DMU_NEW_OBJECT;
140 dmu_tx_hold_sa_create(oh->ot_tx, ZFS_SA_BASE_ATTR_SIZE);
143 dmu_tx_hold_write(oh->ot_tx, oid, pos, size);
145 /* dt_declare_write() is usually called for system objects, such
146 * as llog or last_rcvd files. We needn't enforce quota on those
147 * objects, so always set the lqi_space as 0. */
148 RETURN(osd_declare_quota(env, osd, obj->oo_attr.la_uid,
149 obj->oo_attr.la_gid, 0, oh, true, NULL,
153 static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
154 const struct lu_buf *buf, loff_t *pos,
155 struct thandle *th, struct lustre_capa *capa,
158 struct osd_object *obj = osd_dt_obj(dt);
159 struct osd_device *osd = osd_obj2dev(obj);
160 udmu_objset_t *uos = &osd->od_objset;
161 struct osd_thandle *oh;
162 uint64_t offset = *pos;
166 LASSERT(dt_object_exists(dt));
170 oh = container_of0(th, struct osd_thandle, ot_super);
172 dmu_write(osd->od_objset.os, obj->oo_db->db_object, offset,
173 (uint64_t)buf->lb_len, buf->lb_buf, oh->ot_tx);
174 write_lock(&obj->oo_attr_lock);
175 if (obj->oo_attr.la_size < offset + buf->lb_len) {
176 obj->oo_attr.la_size = offset + buf->lb_len;
177 write_unlock(&obj->oo_attr_lock);
178 /* osd_object_sa_update() will be copying directly from oo_attr
179 * into dbuf. any update within a single txg will copy the
181 rc = osd_object_sa_update(obj, SA_ZPL_SIZE(uos),
182 &obj->oo_attr.la_size, 8, oh);
186 write_unlock(&obj->oo_attr_lock);
197 * XXX: for the moment I don't want to use lnb_flags for osd-internal
198 * purposes as it's not very well defined ...
199 * instead I use the lowest bit of the address so that:
200 * arc buffer: .lnb_obj = abuf (arc we loan for write)
201 * dbuf buffer: .lnb_obj = dbuf | 1 (dbuf we get for read)
202 * copy buffer: .lnb_page->mapping = obj (page we allocate for write)
206 static int osd_bufs_put(const struct lu_env *env, struct dt_object *dt,
207 struct niobuf_local *lnb, int npages)
209 struct osd_object *obj = osd_dt_obj(dt);
210 struct osd_device *osd = osd_obj2dev(obj);
214 LASSERT(dt_object_exists(dt));
217 for (i = 0; i < npages; i++) {
218 if (lnb[i].page == NULL)
220 if (lnb[i].page->mapping == (void *)obj) {
221 /* this is anonymous page allocated for copy-write */
222 lnb[i].page->mapping = NULL;
223 __free_page(lnb[i].page);
224 cfs_atomic_dec(&osd->od_zerocopy_alloc);
226 /* see comment in osd_bufs_get_read() */
227 ptr = (unsigned long)lnb[i].dentry;
230 dmu_buf_rele((void *)ptr, osd_zerocopy_tag);
231 cfs_atomic_dec(&osd->od_zerocopy_pin);
232 } else if (lnb[i].dentry != NULL) {
233 dmu_return_arcbuf((void *)lnb[i].dentry);
234 cfs_atomic_dec(&osd->od_zerocopy_loan);
238 lnb[i].dentry = NULL;
/*
 * Translate a kernel virtual address into its struct page, using
 * vmalloc_to_page() for vmalloc addresses and virt_to_page() otherwise.
 */
static inline struct page *kmem_to_page(void *addr)
{
	if (is_vmalloc_addr(addr))
		return vmalloc_to_page(addr);
	else
		return virt_to_page(addr);
}
252 static int osd_bufs_get_read(const struct lu_env *env, struct osd_object *obj,
253 loff_t off, ssize_t len, struct niobuf_local *lnb)
255 struct osd_device *osd = osd_obj2dev(obj);
257 int rc, i, numbufs, npages = 0;
260 /* grab buffers for read:
261 * OSD API let us to grab buffers first, then initiate IO(s)
262 * so that all required IOs will be done in parallel, but at the
263 * moment DMU doesn't provide us with a method to grab buffers.
264 * If we discover this is a vital for good performance we
265 * can get own replacement for dmu_buf_hold_array_by_bonus().
268 rc = -dmu_buf_hold_array_by_bonus(obj->oo_db, off, len, TRUE,
269 osd_zerocopy_tag, &numbufs,
274 for (i = 0; i < numbufs; i++) {
275 int bufoff, tocpy, thispage;
280 cfs_atomic_inc(&osd->od_zerocopy_pin);
282 bufoff = off - dbp[i]->db_offset;
283 tocpy = min_t(int, dbp[i]->db_size - bufoff, len);
285 /* kind of trick to differentiate dbuf vs. arcbuf */
286 LASSERT(((unsigned long)dbp[i] & 1) == 0);
287 dbf = (void *) ((unsigned long)dbp[i] | 1);
290 thispage = PAGE_CACHE_SIZE;
291 thispage -= bufoff & (PAGE_CACHE_SIZE - 1);
292 thispage = min(tocpy, thispage);
295 lnb->lnb_file_offset = off;
296 lnb->lnb_page_offset = bufoff & ~CFS_PAGE_MASK;
298 lnb->page = kmem_to_page(dbp[i]->db_data +
300 /* mark just a single slot: we need this
301 * reference to dbuf to be release once */
314 /* steal dbuf so dmu_buf_rele_array() cant release it */
318 dmu_buf_rele_array(dbp, numbufs, osd_zerocopy_tag);
325 osd_bufs_put(env, &obj->oo_dt, lnb - npages, npages);
329 static int osd_bufs_get_write(const struct lu_env *env, struct osd_object *obj,
330 loff_t off, ssize_t len, struct niobuf_local *lnb)
332 struct osd_device *osd = osd_obj2dev(obj);
333 int plen, off_in_block, sz_in_block;
334 int i = 0, npages = 0;
340 dmu_object_size_from_db(obj->oo_db, &bs, &dummy);
343 * currently only full blocks are subject to zerocopy approach:
344 * so that we're sure nobody is trying to update the same block
347 LASSERT(npages < PTLRPC_MAX_BRW_PAGES);
349 off_in_block = off & (bs - 1);
350 sz_in_block = min_t(int, bs - off_in_block, len);
352 if (sz_in_block == bs) {
353 /* full block, try to use zerocopy */
355 abuf = dmu_request_arcbuf(obj->oo_db, bs);
356 if (unlikely(abuf == NULL))
357 GOTO(out_err, -ENOMEM);
359 cfs_atomic_inc(&osd->od_zerocopy_loan);
361 /* go over pages arcbuf contains, put them as
362 * local niobufs for ptlrpc's bulks */
363 while (sz_in_block > 0) {
364 plen = min_t(int, sz_in_block, PAGE_CACHE_SIZE);
366 lnb[i].lnb_file_offset = off;
367 lnb[i].lnb_page_offset = 0;
370 if (sz_in_block == bs)
371 lnb[i].dentry = (void *)abuf;
373 lnb[i].dentry = NULL;
375 /* this one is not supposed to fail */
376 lnb[i].page = kmem_to_page(abuf->b_data +
378 LASSERT(lnb[i].page);
380 lprocfs_counter_add(osd->od_stats,
381 LPROC_OSD_ZEROCOPY_IO, 1);
386 off_in_block += plen;
391 if (off_in_block == 0 && len < bs &&
392 off + len >= obj->oo_attr.la_size)
393 lprocfs_counter_add(osd->od_stats,
394 LPROC_OSD_TAIL_IO, 1);
396 /* can't use zerocopy, allocate temp. buffers */
397 while (sz_in_block > 0) {
398 plen = min_t(int, sz_in_block, PAGE_CACHE_SIZE);
400 lnb[i].lnb_file_offset = off;
401 lnb[i].lnb_page_offset = 0;
404 lnb[i].dentry = NULL;
406 lnb[i].page = alloc_page(OSD_GFP_IO);
407 if (unlikely(lnb[i].page == NULL))
408 GOTO(out_err, -ENOMEM);
410 LASSERT(lnb[i].page->mapping == NULL);
411 lnb[i].page->mapping = (void *)obj;
413 cfs_atomic_inc(&osd->od_zerocopy_alloc);
414 lprocfs_counter_add(osd->od_stats,
415 LPROC_OSD_COPY_IO, 1);
429 osd_bufs_put(env, &obj->oo_dt, lnb, npages);
433 static int osd_bufs_get(const struct lu_env *env, struct dt_object *dt,
434 loff_t offset, ssize_t len, struct niobuf_local *lnb,
435 int rw, struct lustre_capa *capa)
437 struct osd_object *obj = osd_dt_obj(dt);
440 LASSERT(dt_object_exists(dt));
444 rc = osd_bufs_get_read(env, obj, offset, len, lnb);
446 rc = osd_bufs_get_write(env, obj, offset, len, lnb);
451 static int osd_write_prep(const struct lu_env *env, struct dt_object *dt,
452 struct niobuf_local *lnb, int npages)
454 struct osd_object *obj = osd_dt_obj(dt);
456 LASSERT(dt_object_exists(dt));
462 /* Return number of blocks that aren't mapped in the [start, start + size]
464 static int osd_count_not_mapped(struct osd_object *obj, uint64_t start,
467 dmu_buf_impl_t *dbi = (dmu_buf_impl_t *)obj->oo_db;
478 if (dn->dn_maxblkid == 0) {
479 if (start + size <= dn->dn_datablksz)
481 if (start < dn->dn_datablksz)
482 start = dn->dn_datablksz;
483 /* assume largest block size */
484 blkshift = SPA_MAXBLOCKSHIFT;
486 /* blocksize can't change */
487 blkshift = dn->dn_datablkshift;
490 /* compute address of last block */
491 end = (start + size - 1) >> blkshift;
492 /* align start on block boundaries */
495 /* size is null, can't be mapped */
496 if (obj->oo_attr.la_size == 0 || dn->dn_maxblkid == 0)
497 GOTO(out, size = (end - start + 1) << blkshift);
499 /* beyond EOF, can't be mapped */
500 if (start > dn->dn_maxblkid)
501 GOTO(out, size = (end - start + 1) << blkshift);
504 for (blkid = start; blkid <= end; blkid++) {
505 if (blkid == dn->dn_maxblkid)
506 /* this one is mapped for sure */
508 if (blkid > dn->dn_maxblkid) {
509 size += (end - blkid + 1) << blkshift;
513 rc = dbuf_hold_impl(dn, 0, blkid, TRUE, FTAG, &db);
515 /* for ENOENT (block not mapped) and any other errors,
516 * assume the block isn't mapped */
517 size += 1 << blkshift;
529 static int osd_declare_write_commit(const struct lu_env *env,
530 struct dt_object *dt,
531 struct niobuf_local *lnb, int npages,
534 struct osd_object *obj = osd_dt_obj(dt);
535 struct osd_device *osd = osd_obj2dev(obj);
536 struct osd_thandle *oh;
539 int i, rc, flags = 0;
540 bool ignore_quota = false, synced = false;
544 LASSERT(dt_object_exists(dt));
550 oh = container_of0(th, struct osd_thandle, ot_super);
552 for (i = 0; i < npages; i++) {
554 /* ENOSPC, network RPC error, etc.
555 * We don't want to book space for pages which will be
556 * skipped in osd_write_commit(). Hence we skip pages
557 * with lnb_rc != 0 here too */
559 /* ignore quota for the whole request if any page is from
560 * client cache or written by root.
562 * XXX once we drop the 1.8 client support, the checking
563 * for whether page is from cache can be simplified as:
564 * !(lnb[i].flags & OBD_BRW_SYNC)
566 * XXX we could handle this on per-lnb basis as done by
568 if ((lnb[i].flags & OBD_BRW_NOQUOTA) ||
569 (lnb[i].flags & (OBD_BRW_FROM_GRANT | OBD_BRW_SYNC)) ==
573 /* first valid lnb */
574 offset = lnb[i].lnb_file_offset;
578 if (offset + size == lnb[i].lnb_file_offset) {
579 /* this lnb is contiguous to the previous one */
584 dmu_tx_hold_write(oh->ot_tx, obj->oo_db->db_object, offset,size);
586 /* estimating space that will be consumed by a write is rather
587 * complicated with ZFS. As a consequence, we don't account for
588 * indirect blocks and quota overrun will be adjusted once the
589 * operation is committed, if required. */
590 space += osd_count_not_mapped(obj, offset, size);
592 offset = lnb->lnb_file_offset;
597 dmu_tx_hold_write(oh->ot_tx, obj->oo_db->db_object, offset,size);
598 space += osd_count_not_mapped(obj, offset, size);
601 dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0);
603 oh->ot_write_commit = 1; /* used in osd_trans_start() for fail_loc */
605 /* backend zfs filesystem might be configured to store multiple data
607 space *= osd->od_objset.os->os_copies;
609 CDEBUG(D_QUOTA, "writting %d pages, reserving "LPD64"K of quota "
610 "space\n", npages, space);
613 /* acquire quota space if needed */
614 rc = osd_declare_quota(env, osd, obj->oo_attr.la_uid,
615 obj->oo_attr.la_gid, space, oh, true, &flags,
618 if (!synced && rc == -EDQUOT && (flags & QUOTA_FL_SYNC) != 0) {
619 dt_sync(env, th->th_dev);
621 CDEBUG(D_QUOTA, "retry after sync\n");
626 /* we need only to store the overquota flags in the first lnb for
627 * now, once we support multiple objects BRW, this code needs be
629 if (flags & QUOTA_FL_OVER_USRQUOTA)
630 lnb[0].flags |= OBD_BRW_OVER_USRQUOTA;
631 if (flags & QUOTA_FL_OVER_GRPQUOTA)
632 lnb[0].flags |= OBD_BRW_OVER_GRPQUOTA;
637 static int osd_write_commit(const struct lu_env *env, struct dt_object *dt,
638 struct niobuf_local *lnb, int npages,
641 struct osd_object *obj = osd_dt_obj(dt);
642 struct osd_device *osd = osd_obj2dev(obj);
643 udmu_objset_t *uos = &osd->od_objset;
644 struct osd_thandle *oh;
645 uint64_t new_size = 0;
649 LASSERT(dt_object_exists(dt));
653 oh = container_of0(th, struct osd_thandle, ot_super);
655 for (i = 0; i < npages; i++) {
656 CDEBUG(D_INODE, "write %u bytes at %u\n",
657 (unsigned) lnb[i].len,
658 (unsigned) lnb[i].lnb_file_offset);
661 /* ENOSPC, network RPC error, etc.
662 * Unlike ldiskfs, zfs allocates new blocks on rewrite,
663 * so we skip this page if lnb_rc is set to -ENOSPC */
664 CDEBUG(D_INODE, "obj "DFID": skipping lnb[%u]: rc=%d\n",
665 PFID(lu_object_fid(&dt->do_lu)), i,
670 if (lnb[i].page->mapping == (void *)obj) {
671 dmu_write(osd->od_objset.os, obj->oo_db->db_object,
672 lnb[i].lnb_file_offset, lnb[i].len,
673 kmap(lnb[i].page), oh->ot_tx);
675 } else if (lnb[i].dentry) {
676 LASSERT(((unsigned long)lnb[i].dentry & 1) == 0);
677 /* buffer loaned for zerocopy, try to use it.
678 * notice that dmu_assign_arcbuf() is smart
679 * enough to recognize changed blocksize
680 * in this case it fallbacks to dmu_write() */
681 dmu_assign_arcbuf(obj->oo_db, lnb[i].lnb_file_offset,
682 (void *)lnb[i].dentry, oh->ot_tx);
683 /* drop the reference, otherwise osd_put_bufs()
684 * will be releasing it - bad! */
685 lnb[i].dentry = NULL;
686 cfs_atomic_dec(&osd->od_zerocopy_loan);
689 if (new_size < lnb[i].lnb_file_offset + lnb[i].len)
690 new_size = lnb[i].lnb_file_offset + lnb[i].len;
693 if (unlikely(new_size == 0)) {
694 /* no pages to write, no transno is needed */
696 /* it is important to return 0 even when all lnb_rc == -ENOSPC
697 * since ofd_commitrw_write() retries several times on ENOSPC */
701 write_lock(&obj->oo_attr_lock);
702 if (obj->oo_attr.la_size < new_size) {
703 obj->oo_attr.la_size = new_size;
704 write_unlock(&obj->oo_attr_lock);
705 /* osd_object_sa_update() will be copying directly from
706 * oo_attr into dbuf. any update within a single txg will copy
708 rc = osd_object_sa_update(obj, SA_ZPL_SIZE(uos),
709 &obj->oo_attr.la_size, 8, oh);
711 write_unlock(&obj->oo_attr_lock);
717 static int osd_read_prep(const struct lu_env *env, struct dt_object *dt,
718 struct niobuf_local *lnb, int npages)
720 struct osd_object *obj = osd_dt_obj(dt);
725 LASSERT(dt_object_exists(dt));
728 for (i = 0; i < npages; i++) {
729 buf.lb_buf = kmap(lnb[i].page);
730 buf.lb_len = lnb[i].len;
731 offset = lnb[i].lnb_file_offset;
733 CDEBUG(D_OTHER, "read %u bytes at %u\n",
734 (unsigned) lnb[i].len,
735 (unsigned) lnb[i].lnb_file_offset);
736 lnb[i].rc = osd_read(env, dt, &buf, &offset, NULL);
739 if (lnb[i].rc < buf.lb_len) {
740 /* all subsequent rc should be 0 */
751 * Punch/truncate an object
753 * IN: db - dmu_buf of the object to free data in.
754 * off - start of section to free.
755 * len - length of section to free (DMU_OBJECT_END => to EOF).
757 * RETURN: 0 if success
758 * error code if failure
760 * The transaction passed to this routine must have
761 * dmu_tx_hold_sa() and if off < size, dmu_tx_hold_free()
762 * called and then assigned to a transaction group.
764 static int __osd_object_punch(objset_t *os, dmu_buf_t *db, dmu_tx_t *tx,
765 uint64_t size, uint64_t off, uint64_t len)
769 /* Assert that the transaction has been assigned to a
770 transaction group. */
771 LASSERT(tx->tx_txg != 0);
773 * Nothing to do if file already at desired length.
775 if (len == DMU_OBJECT_END && size == off)
779 rc = -dmu_free_range(os, db->db_object, off, len, tx);
784 static int osd_punch(const struct lu_env *env, struct dt_object *dt,
785 __u64 start, __u64 end, struct thandle *th,
786 struct lustre_capa *capa)
788 struct osd_object *obj = osd_dt_obj(dt);
789 struct osd_device *osd = osd_obj2dev(obj);
790 udmu_objset_t *uos = &osd->od_objset;
791 struct osd_thandle *oh;
796 LASSERT(dt_object_exists(dt));
797 LASSERT(osd_invariant(obj));
800 oh = container_of0(th, struct osd_thandle, ot_super);
802 write_lock(&obj->oo_attr_lock);
804 if (end == OBD_OBJECT_EOF || end >= obj->oo_attr.la_size)
805 len = DMU_OBJECT_END;
808 write_unlock(&obj->oo_attr_lock);
810 rc = __osd_object_punch(osd->od_objset.os, obj->oo_db, oh->ot_tx,
811 obj->oo_attr.la_size, start, len);
813 if (len == DMU_OBJECT_END) {
814 write_lock(&obj->oo_attr_lock);
815 obj->oo_attr.la_size = start;
816 write_unlock(&obj->oo_attr_lock);
817 rc = osd_object_sa_update(obj, SA_ZPL_SIZE(uos),
818 &obj->oo_attr.la_size, 8, oh);
823 static int osd_declare_punch(const struct lu_env *env, struct dt_object *dt,
824 __u64 start, __u64 end, struct thandle *handle)
826 struct osd_object *obj = osd_dt_obj(dt);
827 struct osd_device *osd = osd_obj2dev(obj);
828 struct osd_thandle *oh;
832 oh = container_of0(handle, struct osd_thandle, ot_super);
834 read_lock(&obj->oo_attr_lock);
835 if (end == OBD_OBJECT_EOF || end >= obj->oo_attr.la_size)
836 len = DMU_OBJECT_END;
840 /* declare we'll free some blocks ... */
841 if (start < obj->oo_attr.la_size) {
842 read_unlock(&obj->oo_attr_lock);
843 dmu_tx_hold_free(oh->ot_tx, obj->oo_db->db_object, start, len);
845 read_unlock(&obj->oo_attr_lock);
848 /* ... and we'll modify size attribute */
849 dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0);
851 RETURN(osd_declare_quota(env, osd, obj->oo_attr.la_uid,
852 obj->oo_attr.la_gid, 0, oh, true, NULL,
857 struct dt_body_operations osd_body_ops = {
858 .dbo_read = osd_read,
859 .dbo_declare_write = osd_declare_write,
860 .dbo_write = osd_write,
861 .dbo_bufs_get = osd_bufs_get,
862 .dbo_bufs_put = osd_bufs_put,
863 .dbo_write_prep = osd_write_prep,
864 .dbo_declare_write_commit = osd_declare_write_commit,
865 .dbo_write_commit = osd_write_commit,
866 .dbo_read_prep = osd_read_prep,
867 .dbo_declare_punch = osd_declare_punch,
868 .dbo_punch = osd_punch,