1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/osd-zfs/osd_io.c
32  *
33  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
34  * Author: Mike Pershin <tappro@whamcloud.com>
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSD
38
39 #include <libcfs/libcfs.h>
40 #include <obd_support.h>
41 #include <lustre_net.h>
42 #include <obd.h>
43 #include <obd_class.h>
44 #include <lustre_disk.h>
45 #include <lustre_fid.h>
46 #include <lustre_quota.h>
47
48 #include "osd_internal.h"
49
50 #include <sys/dnode.h>
51 #include <sys/dbuf.h>
52 #include <sys/spa.h>
53 #include <sys/stat.h>
54 #include <sys/zap.h>
55 #include <sys/spa_impl.h>
56 #include <sys/zfs_znode.h>
57 #include <sys/dmu_tx.h>
58 #include <sys/dmu_objset.h>
59 #include <sys/dsl_prop.h>
60 #include <sys/sa_impl.h>
61 #include <sys/txg.h>
62
63 char osd_0copy_tag[] = "zerocopy";
64
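/*
 * Mark a dbuf for eviction once its last hold is released, so data that is
 * unlikely to be re-read (see the drop_cache logic below, driven by
 * od_readcache_max_filesize) does not stay cached.
 */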
65 static void dbuf_set_pending_evict(dmu_buf_t *db)
66 {
67         dmu_buf_impl_t *dbi = (dmu_buf_impl_t *)db;
68         dbi->db_pending_evict = TRUE;
69 }
70
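/*
 * Account an I/O in the brw_stats histograms: the number of RPCs in flight
 * and the number of discontiguous pages at start, then the page count, disk
 * I/O size and elapsed time at completion.
 */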
71 static void record_start_io(struct osd_device *osd, int rw, int discont_pages)
72 {
73         struct brw_stats *h = &osd->od_brw_stats;
74
75         if (rw == READ) {
76                 atomic_inc(&osd->od_r_in_flight);
77                 lprocfs_oh_tally_pcpu(&h->bs_hist[BRW_R_RPC_HIST],
78                                       atomic_read(&osd->od_r_in_flight));
79                 lprocfs_oh_tally_pcpu(&h->bs_hist[BRW_R_DISCONT_PAGES],
80                                       discont_pages);
81         } else {
82                 atomic_inc(&osd->od_w_in_flight);
83                 lprocfs_oh_tally_pcpu(&h->bs_hist[BRW_W_RPC_HIST],
84                                       atomic_read(&osd->od_w_in_flight));
85                 lprocfs_oh_tally_pcpu(&h->bs_hist[BRW_W_DISCONT_PAGES],
86                                       discont_pages);
87         }
88 }
89
90 static void record_end_io(struct osd_device *osd, int rw,
91                           unsigned long elapsed, int disksize, int npages)
92 {
93         struct brw_stats *h = &osd->od_brw_stats;
94
95         if (rw == READ)
96                 atomic_dec(&osd->od_r_in_flight);
97         else
98                 atomic_dec(&osd->od_w_in_flight);
99
100         lprocfs_oh_tally_log2_pcpu(&h->bs_hist[BRW_R_PAGES + rw], npages);
101         if (disksize > 0)
102                 lprocfs_oh_tally_log2_pcpu(&h->bs_hist[BRW_R_DISK_IOSIZE + rw],
103                                            disksize);
104         if (elapsed)
105                 lprocfs_oh_tally_log2_pcpu(&h->bs_hist[BRW_R_IO_TIME + rw],
106                                             elapsed);
107 }
108
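/*
 * Common read helper: clamp the requested size to the current object size
 * (under oo_attr_lock), read via osd_dmu_read() and, on success, advance
 * *pos and return the number of bytes read.
 */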
109 static ssize_t __osd_read(const struct lu_env *env, struct dt_object *dt,
110                           struct lu_buf *buf, loff_t *pos, size_t *size)
111 {
112         struct osd_object *obj = osd_dt_obj(dt);
113         uint64_t old_size;
114         int rc;
115
116         LASSERT(dt_object_exists(dt));
117         LASSERT(obj->oo_dn);
118
119         read_lock(&obj->oo_attr_lock);
120         old_size = obj->oo_attr.la_size;
121         read_unlock(&obj->oo_attr_lock);
122
123         if (*pos + *size > old_size) {
124                 if (old_size < *pos)
125                         return 0;
126
127                 *size = old_size - *pos;
128         }
129
130         rc = osd_dmu_read(osd_obj2dev(obj), obj->oo_dn, *pos, *size,
131                           buf->lb_buf, DMU_READ_PREFETCH);
132         if (!rc) {
133                 rc = *size;
134                 *pos += *size;
135         }
136
137         return rc;
138 }
139
140 static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
141                         struct lu_buf *buf, loff_t *pos)
142 {
143         struct osd_device *osd = osd_obj2dev(osd_dt_obj(dt));
144         size_t size = buf->lb_len;
145         hrtime_t start = gethrtime();
146         s64 delta_ms;
147         int rc;
148
149         record_start_io(osd, READ, 0);
150         rc = __osd_read(env, dt, buf, pos, &size);
151         delta_ms = gethrtime() - start;
152         do_div(delta_ms, NSEC_PER_MSEC);
153         record_end_io(osd, READ, delta_ms, size, size >> PAGE_SHIFT);
154
155         return rc;
156 }
157
158 static inline ssize_t osd_read_no_record(const struct lu_env *env,
159                                          struct dt_object *dt,
160                                          struct lu_buf *buf, loff_t *pos)
161 {
162         size_t size = buf->lb_len;
163
164         return __osd_read(env, dt, buf, pos, &size);
165 }
166
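/*
 * Declare a dt_write() in the transaction. pos == -1 denotes an append
 * (used mostly by llog), for which a worst-case offset is assumed. If a
 * THT_WRITE hold on the same dnode already covers a nearby range, that hold
 * is extended (up to 4MB) instead of adding a new one.
 */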
167 static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt,
168                                 const struct lu_buf *buf, loff_t pos,
169                                 struct thandle *th)
170 {
171         struct osd_object  *obj  = osd_dt_obj(dt);
172         struct osd_device  *osd = osd_obj2dev(obj);
173         loff_t _pos = pos, max = 0;
174         struct osd_thandle *oh;
175         uint64_t            oid;
176         ENTRY;
177
178         oh = container_of(th, struct osd_thandle, ot_super);
179
180         /* in some cases declare can race with creation (e.g. llog)
181          * and we need to wait till the object is initialized. Notice
182          * that LOHA_EXISTS is supposed to be the last step of the
183          * initialization */
184
185         /* size change (in dnode) will be declared by dmu_tx_hold_write() */
186         if (dt_object_exists(dt))
187                 oid = obj->oo_dn->dn_object;
188         else
189                 oid = DMU_NEW_OBJECT;
190
191         /* XXX: we still miss append declaration support in ZFS
192          *      -1 means append, which is used mostly by llog; an llog
193          *      can grow up to LLOG_MIN_CHUNK_SIZE*8 records */
194         max = max_t(loff_t, 256 * 8 * LLOG_MIN_CHUNK_SIZE,
195                     obj->oo_attr.la_size + (2 << 20));
196         if (pos == -1)
197                 pos = max;
198         if (obj->oo_dn) {
199                 loff_t tstart, tend, end = pos + buf->lb_len;
200                 dmu_tx_hold_t *txh;
201
202                 /* try to find a nearby declared window to fit into or extend */
203                 for (txh = list_head(&oh->ot_tx->tx_holds); txh != NULL;
204                     txh = list_next(&oh->ot_tx->tx_holds, txh)) {
205                         if (obj->oo_dn != txh->txh_dnode)
206                                 continue;
207                         if (txh->txh_type != THT_WRITE)
208                                 continue;
209
210                         /* bytes already declared in this handle */
211                         tstart = txh->txh_arg1;
212                         tend = txh->txh_arg1 + txh->txh_arg2;
213
214                         if (pos < tstart)
215                                 tstart = pos;
216                         if (tend < end)
217                                 tend = end;
218                         /* if this is an append, then extend it */
219                         if (_pos == -1 && txh->txh_arg1 == max)
220                                 tend += buf->lb_len;
221                         /* don't let appends grow too big */
222                         if (tend - tstart > 4*1024*1024)
223                                 continue;
224                         if (pos >= tend || end <= tstart)
225                                 continue;
226
227                         txh->txh_arg1 = tstart;
228                         txh->txh_arg2 = tend - tstart;
229                         return 0;
230                 }
231         }
232         osd_tx_hold_write(oh->ot_tx, oid, obj->oo_dn, pos, buf->lb_len);
233
234         /* dt_declare_write() is usually called for system objects, such
235          * as llog or last_rcvd files. We needn't enforce quota on those
236          * objects, so always set lqi_space to 0. */
237         RETURN(osd_declare_quota(env, osd, obj->oo_attr.la_uid,
238                                  obj->oo_attr.la_gid, obj->oo_attr.la_projid,
239                                  0, oh, NULL, OSD_QID_BLK));
240 }
241
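/*
 * osd_get_dbuf()/osd_put_dbuf() maintain a small per-object cache of held
 * dbufs (obj->oo_dbs, OSD_MAX_DBUFS slots) used by the llog write path below:
 * a dbuf is looked up in the cache by block id, otherwise it is held from the
 * DMU. When a dbuf is returned, a cached dbuf with db_offset > 0 may be
 * replaced, so the dbuf covering the llog header (offset 0) tends to stay
 * cached.
 */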
242 static dmu_buf_t *osd_get_dbuf(struct osd_object *obj, uint64_t offset)
243 {
244         dmu_buf_t **dbs = obj->oo_dbs;
245         uint64_t blkid;
246         int i;
247
248         blkid = dbuf_whichblock(obj->oo_dn, 0, offset);
249         for (i = 0; i < OSD_MAX_DBUFS; i++) {
250                 dmu_buf_impl_t *dbi = (void *)dbs[i];
251                 if (!dbs[i])
252                         continue;
253                 if (dbi->db_blkid == blkid)
254                         return dbs[i];
255         }
256         return (dmu_buf_t *)dbuf_hold(obj->oo_dn, blkid, osd_0copy_tag);
257 }
258
259 static void osd_put_dbuf(struct osd_object *obj, dmu_buf_t *db)
260 {
261         dmu_buf_t **dbs = obj->oo_dbs;
262         int i;
263
264         for (i = 0; i < OSD_MAX_DBUFS; i++) {
265                 if (dbs[i] == db)
266                         return;
267         }
268         /* get rid of a dbuf with blkid > 0 */
269         for (i = 0; i < OSD_MAX_DBUFS; i++) {
270                 if (dbs[i] == NULL) {
271                         dbs[i] = db;
272                         return;
273                 }
274                 if (dbs[i]->db_offset > 0) {
275                         /* replace this one */
276                         dbuf_rele((dmu_buf_impl_t *)dbs[i], osd_0copy_tag);
277                         dbs[i] = db;
278                         return;
279                 }
280         }
281         LBUG();
282 }
283
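/*
 * Write llog data through the cached dbufs: for every block covered by the
 * buffer, mark the dbuf as filled (whole block) or dirtied (partial block),
 * copy the data in place and return the dbuf to the per-object cache.
 */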
284 static ssize_t osd_write_llog_header(struct osd_object *obj,
285                                      const struct lu_buf *buf, loff_t *pos,
286                                      struct osd_thandle *oh)
287 {
288         int bufoff, tocpy;
289         int len = buf->lb_len;
290         loff_t offset = *pos;
291         char *data = buf->lb_buf;
292
293         while (len > 0) {
294                 dmu_buf_t *db = osd_get_dbuf(obj, offset);
295
296                 bufoff = offset - db->db_offset;
297                 tocpy = MIN(db->db_size - bufoff, len);
298                 if (tocpy == db->db_size)
299                         dmu_buf_will_fill(db, oh->ot_tx);
300                 else
301                         dmu_buf_will_dirty(db, oh->ot_tx);
302                 LASSERT(offset >= db->db_offset);
303                 LASSERT(offset + tocpy <= db->db_offset + db->db_size);
304                 (void) memcpy((char *)db->db_data + bufoff, data, tocpy);
305
306                 if (tocpy == db->db_size)
307                         dmu_buf_fill_done(db, oh->ot_tx);
308
309                 offset += tocpy;
310                 data += tocpy;
311                 len -= tocpy;
312
313                 osd_put_dbuf(obj, db);
314         }
315
316         return 0;
317 }
318
319 static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
320                         const struct lu_buf *buf, loff_t *pos,
321                         struct thandle *th)
322 {
323         struct osd_object  *obj  = osd_dt_obj(dt);
324         struct osd_device  *osd = osd_obj2dev(obj);
325         struct osd_thandle *oh;
326         uint64_t            offset = *pos;
327         int                 rc;
328
329         ENTRY;
330
331         LASSERT(dt_object_exists(dt));
332         LASSERT(obj->oo_dn);
333
334         LASSERT(th != NULL);
335         oh = container_of(th, struct osd_thandle, ot_super);
336
337         down_read(&obj->oo_guard);
338         if (obj->oo_destroyed)
339                 GOTO(out, rc = -ENOENT);
340
341         if (fid_is_llog(lu_object_fid(&dt->do_lu))) {
342                 osd_write_llog_header(obj, buf, pos, oh);
343         } else {
344                 osd_dmu_write(osd, obj->oo_dn, offset, (uint64_t)buf->lb_len,
345                               buf->lb_buf, oh->ot_tx);
346         }
347         write_lock(&obj->oo_attr_lock);
348         if (obj->oo_attr.la_size < offset + buf->lb_len) {
349                 obj->oo_attr.la_size = offset + buf->lb_len;
350                 write_unlock(&obj->oo_attr_lock);
351                 /* osd_object_sa_update() will be copying directly from oo_attr
352                  * into the dbuf.  any update within a single txg will copy the
353                  * most recent value */
354                 rc = osd_object_sa_update(obj, SA_ZPL_SIZE(osd),
355                                         &obj->oo_attr.la_size, 8, oh);
356                 if (unlikely(rc))
357                         GOTO(out, rc);
358         } else {
359                 write_unlock(&obj->oo_attr_lock);
360         }
361
362         *pos += buf->lb_len;
363         rc = buf->lb_len;
364
365 out:
366         up_read(&obj->oo_guard);
367         RETURN(rc);
368 }
369
370 /*
371  * XXX: for the moment I don't want to use lnb_flags for osd-internal
372  *      purposes as it's not very well defined ...
373  *      instead I use the lowest bit of the address so that:
374  *        arc buffer:  .lnb_data = abuf          (arc we loan for write)
375  *        dbuf buffer: .lnb_data = dbuf | 1      (dbuf we get for read)
376  *        copy buffer: .lnb_page->mapping = obj (page we allocate for write)
377  *
378  *      bzzz, to blame
379  */
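/*
 * For illustration, the encoding used by the functions below is:
 *   read:  lnb_data = (void *)((unsigned long)dbuf | 1)  - tagged dbuf
 *   write: lnb_data = abuf                               - loaned arcbuf
 *   copy:  lnb_data = NULL, lnb_page->mapping = obj      - allocated page
 * so osd_bufs_put() checks the lowest bit to decide how to release a buffer.
 */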
380 static int osd_bufs_put(const struct lu_env *env, struct dt_object *dt,
381                         struct niobuf_local *lnb, int npages)
382 {
383         struct osd_object *obj  = osd_dt_obj(dt);
384         struct osd_device *osd = osd_obj2dev(obj);
385         unsigned long      ptr;
386         int                i;
387
388         LASSERT(dt_object_exists(dt));
389         LASSERT(obj->oo_dn);
390
391         for (i = 0; i < npages; i++) {
392                 if (lnb[i].lnb_page == NULL)
393                         continue;
394                 if (lnb[i].lnb_page->mapping == (void *)obj) {
395                         /* this is an anonymous page allocated for copy-write */
396                         lnb[i].lnb_page->mapping = NULL;
397                         __free_page(lnb[i].lnb_page);
398                         atomic_dec(&osd->od_zerocopy_alloc);
399                 } else {
400                         /* see comment in osd_bufs_get_read() */
401                         ptr = (unsigned long)lnb[i].lnb_data;
402                         if (ptr & 1UL) {
403                                 ptr &= ~1UL;
404                                 dmu_buf_rele((void *)ptr, osd_0copy_tag);
405                                 atomic_dec(&osd->od_zerocopy_pin);
406                         } else if (lnb[i].lnb_data != NULL) {
407                                 int j, apages, abufsz;
408                                 abufsz = arc_buf_size(lnb[i].lnb_data);
409                                 apages = abufsz >> PAGE_SHIFT;
410                                 /* these references to pages must be invalidated
411                                  * to prevent access in osd_bufs_put() */
412                                 for (j = 0; j < apages; j++)
413                                         lnb[i + j].lnb_page = NULL;
414                                 dmu_return_arcbuf(lnb[i].lnb_data);
415                                 atomic_dec(&osd->od_zerocopy_loan);
416                         }
417                 }
418                 lnb[i].lnb_page = NULL;
419                 lnb[i].lnb_data = NULL;
420         }
421
422         return 0;
423 }
424
425 static inline struct page *kmem_to_page(void *addr)
426 {
427         LASSERT(!((unsigned long)addr & ~PAGE_MASK));
428         if (is_vmalloc_addr(addr))
429                 return vmalloc_to_page(addr);
430         else
431                 return virt_to_page(addr);
432 }
433
434 /**
435  * Prepare buffers for read.
436  *
437  * The function maps the range described by \a off and \a len to \a lnb array.
438  * dmu_buf_hold_array_by_bonus() finds/creates appropriate ARC buffers, then
439  * we fill the \a lnb array with the pages storing ARC buffers. Notice that the
440  * current implementation passes TRUE to dmu_buf_hold_array_by_bonus() to fill
441  * the ARC buffers with actual data, so I/O ends up done in osd_bufs_get_read().
442  * A better implementation would just return the buffers (potentially unfilled)
443  * and subsequent osd_read_prep() would do I/O for many ranges concurrently.
444  *
445  * \param[in] env       environment
446  * \param[in] obj       object
447  * \param[in] off       offset in bytes
448  * \param[in] len       the number of bytes to access
449  * \param[out] lnb      array of local niobufs pointing to the buffers with data
450  *
451  * \retval              0 for success
452  * \retval              negative error number on failure
453  */
454 static int osd_bufs_get_read(const struct lu_env *env, struct osd_object *obj,
455                              loff_t off, ssize_t len, struct niobuf_local *lnb,
456                              int maxlnb)
457 {
458         struct osd_device *osd = osd_obj2dev(obj);
459         int rc, i, numbufs, npages = 0, drop_cache = 0;
460         hrtime_t start = gethrtime();
461         dmu_buf_t **dbp;
462         s64 delta_ms;
463
464         ENTRY;
465         record_start_io(osd, READ, 0);
466
467         if (obj->oo_attr.la_size >= osd->od_readcache_max_filesize)
468                 drop_cache = 1;
469
470         /* grab buffers for read:
471          * the OSD API lets us grab the buffers first and then initiate the
472          * I/Os so that all required I/Os are done in parallel, but at the
473          * moment the DMU doesn't provide a method to just grab buffers.
474          * If we discover this is vital for good performance we
475          * can implement our own replacement for dmu_buf_hold_array_by_bonus().
476          */
477         while (len > 0 &&
478                (obj->oo_dn->dn_datablkshift != 0 ||
479                 off < obj->oo_dn->dn_datablksz)) {
480                 if (obj->oo_dn->dn_datablkshift == 0 &&
481                     off + len > obj->oo_dn->dn_datablksz)
482                         len = obj->oo_dn->dn_datablksz - off;
483
484                 dbp = NULL;
485                 if (unlikely(npages >= maxlnb))
486                         GOTO(err, rc = -EOVERFLOW);
487
488                 rc = -dmu_buf_hold_array_by_bonus(&obj->oo_dn->dn_bonus->db,
489                                                   off, len, TRUE, osd_0copy_tag,
490                                                   &numbufs, &dbp);
491                 if (unlikely(rc))
492                         GOTO(err, rc);
493
494                 for (i = 0; i < numbufs; i++) {
495                         int bufoff, tocpy, thispage;
496                         void *dbf = dbp[i];
497
498                         LASSERT(len > 0);
499
500                         atomic_inc(&osd->od_zerocopy_pin);
501
502                         bufoff = off - dbp[i]->db_offset;
503                         tocpy = min_t(int, dbp[i]->db_size - bufoff, len);
504
505                         /* a trick to differentiate a dbuf from an arcbuf */
506                         LASSERT(((unsigned long)dbp[i] & 1) == 0);
507                         dbf = (void *) ((unsigned long)dbp[i] | 1);
508
509                         while (tocpy > 0) {
510                                 if (unlikely(npages >= maxlnb))
511                                         GOTO(err, rc = -EOVERFLOW);
512
513                                 thispage = PAGE_SIZE;
514                                 thispage -= bufoff & (PAGE_SIZE - 1);
515                                 thispage = min(tocpy, thispage);
516
517                                 lnb->lnb_rc = 0;
518                                 lnb->lnb_file_offset = off;
519                                 lnb->lnb_page_offset = bufoff & ~PAGE_MASK;
520                                 lnb->lnb_len = thispage;
521                                 lnb->lnb_page = kmem_to_page(dbp[i]->db_data +
522                                                              bufoff);
523                                 /* mark just a single slot: we need this
524                                  * dbuf reference to be released only once */
525                                 lnb->lnb_data = dbf;
526                                 dbf = NULL;
527
528                                 tocpy -= thispage;
529                                 len -= thispage;
530                                 bufoff += thispage;
531                                 off += thispage;
532
533                                 npages++;
534                                 lnb++;
535                         }
536
537                         if (drop_cache)
538                                 dbuf_set_pending_evict(dbp[i]);
539
540                         /* steal dbuf so dmu_buf_rele_array() can't release
541                          * it */
542                         dbp[i] = NULL;
543                 }
544
545                 dmu_buf_rele_array(dbp, numbufs, osd_0copy_tag);
546         }
547
548         delta_ms = gethrtime() - start;
549         do_div(delta_ms, NSEC_PER_MSEC);
550         record_end_io(osd, READ, delta_ms, npages * PAGE_SIZE, npages);
551
552         RETURN(npages);
553
554 err:
555         LASSERT(rc < 0);
556         if (dbp)
557                 dmu_buf_rele_array(dbp, numbufs, osd_0copy_tag);
558         osd_bufs_put(env, &obj->oo_dt, lnb - npages, npages);
559         RETURN(rc);
560 }
561
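/*
 * Request a loaned ARC buffer of one block for zerocopy writes. Returns
 * ERR_PTR(-ENOMEM) on allocation failure, or NULL on old ZFS releases when
 * the buffer is not page-aligned, in which case the caller falls back to
 * copy buffers.
 */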
562 static inline arc_buf_t *osd_request_arcbuf(dnode_t *dn, size_t bs)
563 {
564         arc_buf_t *abuf;
565
566         abuf = dmu_request_arcbuf(&dn->dn_bonus->db, bs);
567         if (unlikely(!abuf))
568                 return ERR_PTR(-ENOMEM);
569
570 #if ZFS_VERSION_CODE < OBD_OCD_VERSION(0, 7, 0, 0)
571         /**
572          * ZFS prior to 0.7.0 doesn't guarantee PAGE_SIZE alignment for zio
573          * blocks smaller than (PAGE_SIZE << 2). This poses a problem of
574          * setting up page array for RDMA transfer. See LU-9305.
575          */
576         if ((unsigned long)abuf->b_data & ~PAGE_MASK) {
577                 dmu_return_arcbuf(abuf);
578                 return NULL;
579         }
580 #endif
581
582         return abuf;
583 }
584
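/*
 * Prepare buffers for write: full blocks are backed by loaned ARC buffers
 * (zerocopy), while partial blocks get anonymous pages (marked by setting
 * lnb_page->mapping to the object) that osd_write_commit() later copies
 * into ZFS via osd_dmu_write().
 */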
585 static int osd_bufs_get_write(const struct lu_env *env, struct osd_object *obj,
586                               loff_t off, ssize_t len, struct niobuf_local *lnb,
587                               int maxlnb)
588 {
589         struct osd_device *osd = osd_obj2dev(obj);
590         int                poff, plen, off_in_block, sz_in_block;
591         int                rc, i = 0, npages = 0;
592         dnode_t *dn = obj->oo_dn;
593         arc_buf_t *abuf;
594         uint32_t bs = dn->dn_datablksz;
595         ENTRY;
596
597         /*
598          * currently only full blocks are subject to the zerocopy approach,
599          * so that we're sure nobody is trying to update the same block
600          */
601         while (len > 0) {
602                 if (unlikely(npages >= maxlnb))
603                         GOTO(out_err, rc = -EOVERFLOW);
604
605                 off_in_block = off & (bs - 1);
606                 sz_in_block = min_t(int, bs - off_in_block, len);
607
608                 abuf = NULL;
609                 if (sz_in_block == bs) {
610                         /* full block, try to use zerocopy */
611                         abuf = osd_request_arcbuf(dn, bs);
612                         if (unlikely(IS_ERR(abuf)))
613                                 GOTO(out_err, rc = PTR_ERR(abuf));
614                 }
615
616                 if (abuf != NULL) {
617                         atomic_inc(&osd->od_zerocopy_loan);
618
619                         /* go over the pages the arcbuf contains, exposing
620                          * them as local niobufs for ptlrpc's bulk I/O */
621                         while (sz_in_block > 0) {
622                                 plen = min_t(int, sz_in_block, PAGE_SIZE);
623
624                                 if (unlikely(npages >= maxlnb))
625                                         GOTO(out_err, rc = -EOVERFLOW);
626
627                                 lnb[i].lnb_file_offset = off;
628                                 lnb[i].lnb_page_offset = 0;
629                                 lnb[i].lnb_len = plen;
630                                 lnb[i].lnb_rc = 0;
631                                 if (sz_in_block == bs)
632                                         lnb[i].lnb_data = abuf;
633                                 else
634                                         lnb[i].lnb_data = NULL;
635
636                                 /* this one is not supposed to fail */
637                                 lnb[i].lnb_page = kmem_to_page(abuf->b_data +
638                                                         off_in_block);
639                                 LASSERT(lnb[i].lnb_page);
640
641                                 lprocfs_counter_add(osd->od_stats,
642                                                 LPROC_OSD_ZEROCOPY_IO, 1);
643
644                                 sz_in_block -= plen;
645                                 len -= plen;
646                                 off += plen;
647                                 off_in_block += plen;
648                                 i++;
649                                 npages++;
650                         }
651                 } else {
652                         if (off_in_block == 0 && len < bs &&
653                                         off + len >= obj->oo_attr.la_size)
654                                 lprocfs_counter_add(osd->od_stats,
655                                                 LPROC_OSD_TAIL_IO, 1);
656
657                         /* can't use zerocopy, allocate temp. buffers */
658                         poff = off & (PAGE_SIZE - 1);
659                         while (sz_in_block > 0) {
660                                 plen = min_t(int, poff + sz_in_block,
661                                              PAGE_SIZE);
662                                 plen -= poff;
663
664                                 if (unlikely(npages >= maxlnb))
665                                         GOTO(out_err, rc = -EOVERFLOW);
666
667                                 lnb[i].lnb_file_offset = off;
668                                 lnb[i].lnb_page_offset = poff;
669                                 poff = 0;
670
671                                 lnb[i].lnb_len = plen;
672                                 lnb[i].lnb_rc = 0;
673                                 lnb[i].lnb_data = NULL;
674
675                                 lnb[i].lnb_page = alloc_page(OSD_GFP_IO);
676                                 if (unlikely(lnb[i].lnb_page == NULL))
677                                         GOTO(out_err, rc = -ENOMEM);
678
679                                 LASSERT(lnb[i].lnb_page->mapping == NULL);
680                                 lnb[i].lnb_page->mapping = (void *)obj;
681
682                                 atomic_inc(&osd->od_zerocopy_alloc);
683                                 lprocfs_counter_add(osd->od_stats,
684                                                 LPROC_OSD_COPY_IO, 1);
685
686                                 sz_in_block -= plen;
687                                 len -= plen;
688                                 off += plen;
689                                 i++;
690                                 npages++;
691                         }
692                 }
693         }
694
695         RETURN(npages);
696
697 out_err:
698         osd_bufs_put(env, &obj->oo_dt, lnb, npages);
699         RETURN(rc);
700 }
701
702 static int osd_bufs_get(const struct lu_env *env, struct dt_object *dt,
703                         loff_t offset, ssize_t len, struct niobuf_local *lnb,
704                         int maxlnb, enum dt_bufs_type rw)
705 {
706         struct osd_object *obj  = osd_dt_obj(dt);
707         int                rc;
708
709         down_read(&obj->oo_guard);
710
711         if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
712                 GOTO(out, rc = -ENOENT);
713
714         if (rw & DT_BUFS_TYPE_WRITE)
715                 rc = osd_bufs_get_write(env, obj, offset, len, lnb, maxlnb);
716         else
717                 rc = osd_bufs_get_read(env, obj, offset, len, lnb, maxlnb);
718
719 out:
720         up_read(&obj->oo_guard);
721         return rc;
722 }
723
724 static int osd_write_prep(const struct lu_env *env, struct dt_object *dt,
725                         struct niobuf_local *lnb, int npages)
726 {
727         struct osd_object *obj = osd_dt_obj(dt);
728
729         LASSERT(dt_object_exists(dt));
730         LASSERT(obj->oo_dn);
731
732         return 0;
733 }
734
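/*
 * Round a byte range up to whole blocks for quota estimation; the offset
 * within the first block is added so that an unaligned range accounts for
 * every block it touches. For example (assuming blksz = 128KiB), a 128KiB
 * write starting 4KiB into a block spans two blocks and rounds up to 256KiB.
 */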
735 static inline uint64_t osd_roundup2blocksz(uint64_t size,
736                                            uint64_t offset,
737                                            uint32_t blksz)
738 {
739         LASSERT(blksz > 0);
740
741         size += offset % blksz;
742
743         if (likely(is_power_of_2(blksz)))
744                 return round_up(size, blksz);
745         else
746                 return DIV_ROUND_UP_ULL(size, blksz) * blksz;
747 }
748
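/*
 * Declare the write of prepared niobufs: contiguous lnbs are merged into
 * single dmu_tx_hold_write() declarations, the quota space is estimated as
 * whole blocks multiplied by the dataset's copies, and, if the quota code
 * asks for it, the declaration may be retried once after dt_sync() on
 * -EDQUOT.
 */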
749 static int osd_declare_write_commit(const struct lu_env *env,
750                                     struct dt_object *dt,
751                                     struct niobuf_local *lnb, int npages,
752                                     struct thandle *th)
753 {
754         struct osd_object  *obj = osd_dt_obj(dt);
755         struct osd_device  *osd = osd_obj2dev(obj);
756         struct osd_thandle *oh;
757         uint64_t            offset = 0;
758         uint32_t            size = 0;
759         uint32_t blksz = obj->oo_dn->dn_datablksz;
760         int                 i, rc;
761         bool synced = false;
762         long long           space = 0;
763         struct page        *last_page = NULL;
764         unsigned long       discont_pages = 0;
765         enum osd_quota_local_flags local_flags = 0;
766         enum osd_qid_declare_flags declare_flags = OSD_QID_BLK;
767         ENTRY;
768
769         LASSERT(dt_object_exists(dt));
770         LASSERT(obj->oo_dn);
771
772         LASSERT(lnb);
773         LASSERT(npages > 0);
774
775         oh = container_of(th, struct osd_thandle, ot_super);
776
777         for (i = 0; i < npages; i++) {
778                 if (last_page && lnb[i].lnb_page->index != (last_page->index + 1))
779                         ++discont_pages;
780                 last_page = lnb[i].lnb_page;
781                 if (lnb[i].lnb_rc)
782                         /* ENOSPC, network RPC error, etc.
783                          * We don't want to book space for pages which will be
784                          * skipped in osd_write_commit(). Hence we skip pages
785                          * with lnb_rc != 0 here too */
786                         continue;
787                 /* ignore quota for the whole request if any page is from
788                  * client cache or written by root.
789                  *
790                  * XXX we could handle this on per-lnb basis as done by
791                  * grant. */
792                 if ((lnb[i].lnb_flags & OBD_BRW_NOQUOTA) ||
793                     (lnb[i].lnb_flags & OBD_BRW_SYS_RESOURCE) ||
794                     !(lnb[i].lnb_flags & OBD_BRW_SYNC))
795                         declare_flags |= OSD_QID_FORCE;
796
797                 if (size == 0) {
798                         /* first valid lnb */
799                         offset = lnb[i].lnb_file_offset;
800                         size = lnb[i].lnb_len;
801                         continue;
802                 }
803                 if (offset + size == lnb[i].lnb_file_offset) {
804                         /* this lnb is contiguous to the previous one */
805                         size += lnb[i].lnb_len;
806                         continue;
807                 }
808
809                 osd_tx_hold_write(oh->ot_tx, obj->oo_dn->dn_object,
810                                   obj->oo_dn, offset, size);
811                 /* Estimating space to be consumed by a write is rather
812                  * complicated with ZFS. As a consequence, we don't account for
813                  * indirect blocks and just use as a rough estimate the worst
814                  * case where the old space is being held by a snapshot. Quota
815                  * overrun will be adjusted once the operation is committed, if
816                  * required. */
817                 space += osd_roundup2blocksz(size, offset, blksz);
818
819                 offset = lnb[i].lnb_file_offset;
820                 size = lnb[i].lnb_len;
821         }
822
823         if (size) {
824                 osd_tx_hold_write(oh->ot_tx, obj->oo_dn->dn_object, obj->oo_dn,
825                                   offset, size);
826                 space += osd_roundup2blocksz(size, offset, blksz);
827         }
828
829         /* backend zfs filesystem might be configured to store multiple data
830          * copies */
831         space  *= osd->od_os->os_copies;
832         space   = toqb(space);
833         CDEBUG(D_QUOTA, "writing %d pages, reserving %lldK of quota space\n",
834                npages, space);
835
836         record_start_io(osd, WRITE, discont_pages);
837 retry:
838         /* acquire quota space if needed */
839         rc = osd_declare_quota(env, osd, obj->oo_attr.la_uid,
840                                obj->oo_attr.la_gid, obj->oo_attr.la_projid,
841                                space, oh, &local_flags, declare_flags);
842
843         if (!synced && rc == -EDQUOT &&
844             (local_flags & QUOTA_FL_SYNC) != 0) {
845                 dt_sync(env, th->th_dev);
846                 synced = true;
847                 CDEBUG(D_QUOTA, "retry after sync\n");
848                 local_flags = 0;
849                 goto retry;
850         }
851
852         /* for now we only need to store the overquota flags in the first
853          * lnb; once we support multi-object BRW, this code needs to be
854          * revised. */
855         if (local_flags & QUOTA_FL_OVER_USRQUOTA)
856                 lnb[0].lnb_flags |= OBD_BRW_OVER_USRQUOTA;
857         if (local_flags & QUOTA_FL_OVER_GRPQUOTA)
858                 lnb[0].lnb_flags |= OBD_BRW_OVER_GRPQUOTA;
859 #ifdef ZFS_PROJINHERIT
860         if (local_flags & QUOTA_FL_OVER_PRJQUOTA)
861                 lnb[0].lnb_flags |= OBD_BRW_OVER_PRJQUOTA;
862         if (local_flags & QUOTA_FL_ROOT_PRJQUOTA)
863                 lnb[0].lnb_flags |= OBD_BRW_ROOT_PRJQUOTA;
864 #endif
865
866         RETURN(rc);
867 }
868
869 /**
870  * Policy to grow the ZFS block size based on the write pattern.
871  * For sequential writes, it grows the block size gradually until it reaches
872  * the maximum blocksize the dataset can support. Otherwise, it picks a
873  * block size based on the region written by this I/O.
874  */
875 static int osd_grow_blocksize(struct osd_object *obj, struct osd_thandle *oh,
876                               uint64_t start, uint64_t end)
877 {
878         struct osd_device       *osd = osd_obj2dev(obj);
879         dnode_t *dn = obj->oo_dn;
880         uint32_t                 blksz;
881         int                      rc = 0;
882
883         ENTRY;
884
885         if (dn->dn_maxblkid > 0) /* can't change block size */
886                 GOTO(out, rc);
887
888         if (dn->dn_datablksz >= osd->od_max_blksz)
889                 GOTO(out, rc);
890
891         down_write(&obj->oo_guard);
892
893         blksz = dn->dn_datablksz;
894         if (blksz >= osd->od_max_blksz) /* check again after grabbing lock */
895                 GOTO(out_unlock, rc);
896
897         /* now ZFS can support up to 16MB block size, and if the write
898          * is sequential, it just increases the block size gradually */
899         if (start <= blksz) { /* sequential */
900                 blksz = (uint32_t)min_t(uint64_t, osd->od_max_blksz, end);
901         } else { /* sparse, pick a block size by write region */
902                 blksz = (uint32_t)min_t(uint64_t, osd->od_max_blksz,
903                                         end - start);
904         }
905
906         if (!is_power_of_2(blksz))
907                 blksz = size_roundup_power2(blksz);
908
909         if (blksz > dn->dn_datablksz) {
910                 rc = -dmu_object_set_blocksize(osd->od_os, dn->dn_object,
911                                                blksz, 0, oh->ot_tx);
912                 LASSERT(ergo(rc == 0, dn->dn_datablksz >= blksz));
913                 if (rc < 0)
914                         CDEBUG(D_INODE, "object "DFID": change block size "
915                                "%u -> %u error rc = %d\n",
916                                PFID(lu_object_fid(&obj->oo_dt.do_lu)),
917                                dn->dn_datablksz, blksz, rc);
918         }
919         EXIT;
920 out_unlock:
921         up_write(&obj->oo_guard);
922 out:
923         return rc;
924 }
925
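/*
 * Re-hold the dbufs covering a just-written range and mark them for
 * eviction, so that large writes do not stay cached (see drop_cache in
 * osd_write_commit()).
 */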
926 static void osd_evict_dbufs_after_write(struct osd_object *obj,
927                                         loff_t off, ssize_t len)
928 {
929         dmu_buf_t **dbp;
930         int i, rc, numbufs;
931
932         rc = -dmu_buf_hold_array_by_bonus(&obj->oo_dn->dn_bonus->db, off, len,
933                                           TRUE, osd_0copy_tag, &numbufs, &dbp);
934         if (unlikely(rc))
935                 return;
936
937         for (i = 0; i < numbufs; i++)
938                 dbuf_set_pending_evict(dbp[i]);
939
940         dmu_buf_rele_array(dbp, numbufs, osd_0copy_tag);
941 }
942
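/*
 * Apply the prepared niobufs to ZFS: grow the block size first if possible,
 * copy anonymous pages via osd_dmu_write(), assign loaned ARC buffers via
 * dmu_assign_arcbuf(), and finally update the cached la_size through the SA
 * if the file grew.
 */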
943 static int osd_write_commit(const struct lu_env *env, struct dt_object *dt,
944                         struct niobuf_local *lnb, int npages,
945                         struct thandle *th, __u64 user_size)
946 {
947         struct osd_object  *obj  = osd_dt_obj(dt);
948         struct osd_device  *osd = osd_obj2dev(obj);
949         struct osd_thandle *oh;
950         uint64_t            new_size = 0;
951         int                 i, abufsz, rc = 0, drop_cache = 0;
952         unsigned long      iosize = 0;
953         ENTRY;
954
955         LASSERT(dt_object_exists(dt));
956         LASSERT(obj->oo_dn);
957
958         LASSERT(th != NULL);
959         oh = container_of(th, struct osd_thandle, ot_super);
960
961         /* adjust block size. Assume the buffers are sorted. */
962         (void)osd_grow_blocksize(obj, oh, lnb[0].lnb_file_offset,
963                                  lnb[npages - 1].lnb_file_offset +
964                                  lnb[npages - 1].lnb_len);
965
966         if (obj->oo_attr.la_size >= osd->od_readcache_max_filesize ||
967             lnb[npages - 1].lnb_file_offset + lnb[npages - 1].lnb_len >=
968             osd->od_readcache_max_filesize)
969                 drop_cache = 1;
970
971         if (OBD_FAIL_CHECK(OBD_FAIL_OST_MAPBLK_ENOSPC))
972                 RETURN(-ENOSPC);
973
974         /* if la_size is already bigger than specified user_size,
975          * ignore user_size
976          */
977         if (obj->oo_attr.la_size > user_size)
978                 user_size = 0;
979
980         /* LU-8791: take oo_guard to avoid the deadlock that changing block
981          * size and assigning arcbuf take place at the same time.
982          *
983          * Thread 1:
984          * osd_write_commit()
985          *  -> osd_grow_blocksize() with osd_object::oo_guard held
986          *   -> dmu_object_set_blocksize()
987          *    -> dnode_set_blksz(), with dnode_t::dn_struct_rwlock
988          *       write lock held
989          *     -> dbuf_new_size()
990          *      -> dmu_buf_will_dirty()
991          *       -> dbuf_read()
992          *        -> wait for the dbuf state to change
993          * Thread 2:
994          * osd_write_commit()
995          *  -> dmu_assign_arcbuf()
996          *   -> dbuf_assign_arcbuf(), set dbuf state to DB_FILL
997          *    -> dbuf_dirty()
998          *     -> try to hold the read lock of dnode_t::dn_struct_rwlock
999          *
1000          * By taking the read lock, it can avoid thread 2 to enter into the
1001          * critical section of assigning the arcbuf, while thread 1 is
1002          * changing the block size.
1003          */
1004         down_read(&obj->oo_guard);
1005         if (obj->oo_destroyed) {
1006                 up_read(&obj->oo_guard);
1007                 RETURN(-ENOENT);
1008         }
1009
1010         for (i = 0; i < npages; i++) {
1011                 CDEBUG(D_INODE, "write %u bytes at %u\n",
1012                         (unsigned) lnb[i].lnb_len,
1013                         (unsigned) lnb[i].lnb_file_offset);
1014
1015                 if (lnb[i].lnb_rc) {
1016                         /* ENOSPC, network RPC error, etc.
1017                          * Unlike ldiskfs, zfs allocates new blocks on rewrite,
1018                          * so we skip this page if lnb_rc is set to -ENOSPC */
1019                         CDEBUG(D_INODE, "obj "DFID": skipping lnb[%u]: rc=%d\n",
1020                                 PFID(lu_object_fid(&dt->do_lu)), i,
1021                                 lnb[i].lnb_rc);
1022                         continue;
1023                 }
1024
1025                 if (new_size < lnb[i].lnb_file_offset + lnb[i].lnb_len)
1026                         new_size = lnb[i].lnb_file_offset + lnb[i].lnb_len;
1027                 if (lnb[i].lnb_page == NULL)
1028                         continue;
1029
1030                 if (lnb[i].lnb_page->mapping == (void *)obj) {
1031                         osd_dmu_write(osd, obj->oo_dn, lnb[i].lnb_file_offset,
1032                                       lnb[i].lnb_len, kmap(lnb[i].lnb_page) +
1033                                       lnb[i].lnb_page_offset, oh->ot_tx);
1034                         kunmap(lnb[i].lnb_page);
1035                         iosize += lnb[i].lnb_len;
1036                         abufsz = lnb[i].lnb_len; /* to drop cache below */
1037                 } else if (lnb[i].lnb_data) {
1038                         int j, apages;
1039                         LASSERT(((unsigned long)lnb[i].lnb_data & 1) == 0);
1040                         /* buffer loaned for zerocopy, try to use it.
1041                          * notice that dmu_assign_arcbuf() is smart
1042                          * enough to recognize a changed blocksize,
1043                          * in which case it falls back to dmu_write() */
1044                         abufsz = arc_buf_size(lnb[i].lnb_data);
1045                         LASSERT(abufsz & PAGE_MASK);
1046                         apages = abufsz >> PAGE_SHIFT;
1047                         LASSERT(i + apages <= npages);
1048                         /* these references to pages must be invalidated
1049                          * to prevent access in osd_bufs_put() */
1050                         for (j = 0; j < apages; j++)
1051                                 lnb[i + j].lnb_page = NULL;
1052                         dmu_assign_arcbuf(&obj->oo_dn->dn_bonus->db,
1053                                           lnb[i].lnb_file_offset,
1054                                           lnb[i].lnb_data, oh->ot_tx);
1055                         /* drop the reference, otherwise osd_bufs_put()
1056                          * would release it - bad! */
1057                         lnb[i].lnb_data = NULL;
1058                         atomic_dec(&osd->od_zerocopy_loan);
1059                         iosize += abufsz;
1060                 } else {
1061                         /* we don't want to deal with cache if nothing
1062                          * has been sent to ZFS at this step */
1063                         continue;
1064                 }
1065
1066                 if (!drop_cache)
1067                         continue;
1068
1069                 /* we have to mark dbufs for eviction here because
1070                  * dmu_assign_arcbuf() may create a new dbuf for
1071                  * loaned abuf */
1072                 osd_evict_dbufs_after_write(obj, lnb[i].lnb_file_offset,
1073                                             abufsz);
1074         }
1075
1076         if (unlikely(new_size == 0)) {
1077                 /* no pages to write, no transno is needed */
1078                 th->th_local = 1;
1079                 /* it is important to return 0 even when all lnb_rc == -ENOSPC
1080                  * since ofd_commitrw_write() retries several times on ENOSPC */
1081                 up_read(&obj->oo_guard);
1082                 record_end_io(osd, WRITE, 0, 0, 0);
1083                 RETURN(0);
1084         }
1085
1086         /* if file has grown, take user_size into account */
1087         if (user_size && new_size > user_size)
1088                 new_size = user_size;
1089         write_lock(&obj->oo_attr_lock);
1090         if (obj->oo_attr.la_size < new_size) {
1091                 obj->oo_attr.la_size = new_size;
1092                 write_unlock(&obj->oo_attr_lock);
1093                 /* osd_object_sa_update() will be copying directly from
1094                  * oo_attr into dbuf. any update within a single txg will copy
1095                  * the most recent value */
1096                 rc = osd_object_sa_update(obj, SA_ZPL_SIZE(osd),
1097                                           &obj->oo_attr.la_size, 8, oh);
1098         } else {
1099                 write_unlock(&obj->oo_attr_lock);
1100         }
1101
1102         up_read(&obj->oo_guard);
1103
1104         record_end_io(osd, WRITE, 0, iosize, npages);
1105
1106         RETURN(rc);
1107 }
1108
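/*
 * Finalize the read buffers filled by osd_bufs_get_read(): set lnb_rc to the
 * page length, zero it for pages starting at or beyond EOF, and keep the
 * full length for the page that crosses EOF so complete pages are always
 * sent back.
 */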
1109 static int osd_read_prep(const struct lu_env *env, struct dt_object *dt,
1110                         struct niobuf_local *lnb, int npages)
1111 {
1112         struct osd_object *obj  = osd_dt_obj(dt);
1113         int                i;
1114         loff_t             eof;
1115
1116         LASSERT(dt_object_exists(dt));
1117         LASSERT(obj->oo_dn);
1118
1119         read_lock(&obj->oo_attr_lock);
1120         eof = obj->oo_attr.la_size;
1121         read_unlock(&obj->oo_attr_lock);
1122
1123         for (i = 0; i < npages; i++) {
1124                 if (unlikely(lnb[i].lnb_rc < 0))
1125                         continue;
1126
1127                 lnb[i].lnb_rc = lnb[i].lnb_len;
1128
1129                 if (lnb[i].lnb_file_offset + lnb[i].lnb_len >= eof) {
1130                         /* send complete pages all the time */
1131                         if (eof <= lnb[i].lnb_file_offset)
1132                                 lnb[i].lnb_rc = 0;
1133
1134                         /* all subsequent rc should be 0 */
1135                         while (++i < npages)
1136                                 lnb[i].lnb_rc = 0;
1137                         break;
1138                 }
1139         }
1140
1141         return 0;
1142 }
1143
1144 /*
1145  * Punch/truncate an object
1146  *
1147  *      IN:     obj - osd object to free data in.
1148  *              off - start of section to free.
1149  *              len - length of section to free (DMU_OBJECT_END => to EOF).
1150  *
1151  *      RETURN: 0 if success
1152  *              error code if failure
1153  *
1154  * The transaction passed to this routine must have
1155  * dmu_tx_hold_sa() and if off < size, dmu_tx_hold_free()
1156  * called and then assigned to a transaction group.
1157  */
1158 static int __osd_object_punch(struct osd_object *obj, objset_t *os,
1159                               dmu_tx_t *tx, uint64_t off, uint64_t len)
1160 {
1161         dnode_t *dn = obj->oo_dn;
1162         uint64_t size = obj->oo_attr.la_size;
1163         int rc = 0;
1164
1165         /* Assert that the transaction has been assigned to a
1166            transaction group. */
1167         LASSERT(tx->tx_txg != 0);
1168         /*
1169          * Nothing to do if file already at desired length.
1170          */
1171         if (len == DMU_OBJECT_END && size == off)
1172                 return 0;
1173
1174         /* if object holds encrypted content, we need to make sure we truncate
1175          * on an encryption unit boundary, or subsequent reads will get
1176          * corrupted content
1177          */
1178         if (len != DMU_OBJECT_END)
1179                 len -= LUSTRE_ENCRYPTION_UNIT_SIZE -
1180                         (off & ~LUSTRE_ENCRYPTION_MASK);
1181         if (obj->oo_lma_flags & LUSTRE_ENCRYPT_FL &&
1182             off & ~LUSTRE_ENCRYPTION_MASK)
1183                 off = (off & LUSTRE_ENCRYPTION_MASK) +
1184                         LUSTRE_ENCRYPTION_UNIT_SIZE;
1185
1186
1187         /* XXX: dnode_free_range() can be used to save on dnode lookup */
1188         if (off < size)
1189                 dmu_free_range(os, dn->dn_object, off, len, tx);
1190
1191         return rc;
1192 }
1193
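/*
 * Punch or truncate: an 'end' of OBD_OBJECT_EOF, or one at/past the current
 * size, is treated as a truncate (len = DMU_OBJECT_END) and also updates the
 * cached la_size; otherwise only the [start, end) range is freed.
 */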
1194 static int osd_punch(const struct lu_env *env, struct dt_object *dt,
1195                         __u64 start, __u64 end, struct thandle *th)
1196 {
1197         struct osd_object  *obj = osd_dt_obj(dt);
1198         struct osd_device  *osd = osd_obj2dev(obj);
1199         struct osd_thandle *oh;
1200         __u64               len;
1201         int                 rc = 0;
1202         ENTRY;
1203
1204         LASSERT(dt_object_exists(dt));
1205         LASSERT(osd_invariant(obj));
1206
1207         LASSERT(th != NULL);
1208         oh = container_of(th, struct osd_thandle, ot_super);
1209
1210         write_lock(&obj->oo_attr_lock);
1211         /* truncate */
1212         if (end == OBD_OBJECT_EOF || end >= obj->oo_attr.la_size)
1213                 len = DMU_OBJECT_END;
1214         else
1215                 len = end - start;
1216         write_unlock(&obj->oo_attr_lock);
1217
1218         down_read(&obj->oo_guard);
1219         if (obj->oo_destroyed)
1220                 GOTO(out, rc = -ENOENT);
1221
1222         rc = __osd_object_punch(obj, osd->od_os, oh->ot_tx, start, len);
1223
1224         /* set new size */
1225         if (len == DMU_OBJECT_END) {
1226                 write_lock(&obj->oo_attr_lock);
1227                 obj->oo_attr.la_size = start;
1228                 write_unlock(&obj->oo_attr_lock);
1229                 rc = osd_object_sa_update(obj, SA_ZPL_SIZE(osd),
1230                                           &obj->oo_attr.la_size, 8, oh);
1231         }
1232 out:
1233         up_read(&obj->oo_guard);
1234         RETURN(rc);
1235 }
1236
1237 static int osd_declare_punch(const struct lu_env *env, struct dt_object *dt,
1238                         __u64 start, __u64 end, struct thandle *handle)
1239 {
1240         struct osd_object  *obj = osd_dt_obj(dt);
1241         struct osd_device  *osd = osd_obj2dev(obj);
1242         struct osd_thandle *oh;
1243         __u64               len;
1244         ENTRY;
1245
1246         oh = container_of(handle, struct osd_thandle, ot_super);
1247
1248         read_lock(&obj->oo_attr_lock);
1249         if (end == OBD_OBJECT_EOF || end >= obj->oo_attr.la_size)
1250                 len = DMU_OBJECT_END;
1251         else
1252                 len = end - start;
1253
1254         /* declare we'll free some blocks ... */
1255         /* if object holds encrypted content, we need to make sure we truncate
1256          * on an encryption unit boundary, or subsequent reads will get
1257          * corrupted content
1258          */
1259         if (obj->oo_lma_flags & LUSTRE_ENCRYPT_FL &&
1260             start & ~LUSTRE_ENCRYPTION_MASK)
1261                 start = (start & LUSTRE_ENCRYPTION_MASK) +
1262                         LUSTRE_ENCRYPTION_UNIT_SIZE;
1263         if (start < obj->oo_attr.la_size) {
1264                 read_unlock(&obj->oo_attr_lock);
1265                 dmu_tx_mark_netfree(oh->ot_tx);
1266                 dmu_tx_hold_free(oh->ot_tx, obj->oo_dn->dn_object, start, len);
1267         } else {
1268                 read_unlock(&obj->oo_attr_lock);
1269         }
1270
1271         RETURN(osd_declare_quota(env, osd, obj->oo_attr.la_uid,
1272                                  obj->oo_attr.la_gid, obj->oo_attr.la_projid,
1273                                  0, oh, NULL, OSD_QID_BLK));
1274 }
1275
1276 static int osd_ladvise(const struct lu_env *env, struct dt_object *dt,
1277                        __u64 start, __u64 end, enum lu_ladvise_type advice)
1278 {
1279         int     rc;
1280         ENTRY;
1281
1282         switch (advice) {
1283         default:
1284                 rc = -ENOTSUPP;
1285                 break;
1286         }
1287
1288         RETURN(rc);
1289 }
1290
1291 static int osd_fallocate(const struct lu_env *env, struct dt_object *dt,
1292                          __u64 start, __u64 end, int mode, struct thandle *th)
1293 {
1294         int rc = -EOPNOTSUPP;
1295         ENTRY;
1296
1297          /*
1298           * space preallocation is not supported by ZFS,
1299           * so return -EOPNOTSUPP for now
1300           */
1301         RETURN(rc);
1302 }
1303
1304 static int osd_declare_fallocate(const struct lu_env *env,
1305                                  struct dt_object *dt, __u64 start, __u64 end,
1306                                  int mode, struct thandle *th)
1307 {
1308         int rc = -EOPNOTSUPP;
1309         ENTRY;
1310
1311          /*
1312           * space preallocation is not supported by ZFS,
1313           * so return -EOPNOTSUPP for now
1314           */
1315         RETURN(rc);
1316 }
1317
1318 static loff_t osd_lseek(const struct lu_env *env, struct dt_object *dt,
1319                         loff_t offset, int whence)
1320 {
1321         struct osd_object *obj = osd_dt_obj(dt);
1322         struct osd_device *osd = osd_obj2dev(obj);
1323         uint64_t size = obj->oo_attr.la_size;
1324         uint64_t result = offset;
1325         int rc;
1326         boolean_t hole = whence == SEEK_HOLE;
1327
1328         ENTRY;
1329
1330         LASSERT(dt_object_exists(dt));
1331         LASSERT(osd_invariant(obj));
1332         LASSERT(offset >= 0);
1333
1334         /* for SEEK_HOLE treat an 'offset' beyond the end of file as a real
1335          * hole. It is up to LOV to decide whether that is a real hole or not.
1336          */
1337         if (offset >= size)
1338                 RETURN(hole ? offset : -ENXIO);
1339
1340         /* Currently ZFS reports no valid DATA offset if the object has dirty
1341          * data, and we cannot just switch to the generic way of reporting
1342          * DATA at all file offsets and HOLE beyond the end of file, because
1343          * we may get HOLE reported correctly at some offset inside the file
1344          * while the generic approach would also report DATA at that same
1345          * offset due to the dirty state. This is because ZFS doesn't check
1346          * the dirty state for a HOLE report but does for DATA.
1347          * The only way to get reliable results is to call txg_wait_synced()
1348          * when ZFS reports EBUSY, repeat the lseek call, and that is
1349          * controlled via the od_sync_on_lseek option.
1350          */
1351         if (!osd->od_sync_on_lseek)
1352                 result = hole ? size : offset;
1353
1354 again:
1355         rc = osd_dmu_offset_next(osd->od_os, obj->oo_dn->dn_object, hole,
1356                                  &result);
1357         /* dirty inode, lseek result is unreliable without sync */
1358         if (rc == EBUSY) {
1359                 txg_wait_synced(dmu_objset_pool(osd->od_os), 0ULL);
1360                 goto again;
1361         }
1362
1363         if (rc == ESRCH)
1364                 RETURN(-ENXIO);
1365
1366         /* ZFS does not export all the needed functions, so fall back to the
1367          * generic logic: for HOLE return the file size, for DATA return
1368          * the current offset
1369          */
1370         if (rc == EOPNOTSUPP)
1371                 result = hole ? size : offset;
1372         else if (rc)
1373                 return -rc;
1374
1375         /* dmu_offset_next() only works on whole blocks, so it may return the
1376          * SEEK_HOLE result as the end of the last block instead of the logical EOF we need
1377          */
1378         if (result > size)
1379                 result = size;
1380
1381         RETURN(result);
1382 }
1383
1384 const struct dt_body_operations osd_body_ops = {
1385         .dbo_read                       = osd_read,
1386         .dbo_declare_write              = osd_declare_write,
1387         .dbo_write                      = osd_write,
1388         .dbo_bufs_get                   = osd_bufs_get,
1389         .dbo_bufs_put                   = osd_bufs_put,
1390         .dbo_write_prep                 = osd_write_prep,
1391         .dbo_declare_write_commit       = osd_declare_write_commit,
1392         .dbo_write_commit               = osd_write_commit,
1393         .dbo_read_prep                  = osd_read_prep,
1394         .dbo_declare_punch              = osd_declare_punch,
1395         .dbo_punch                      = osd_punch,
1396         .dbo_ladvise                    = osd_ladvise,
1397         .dbo_declare_fallocate          = osd_declare_fallocate,
1398         .dbo_fallocate                  = osd_fallocate,
1399         .dbo_lseek                      = osd_lseek,
1400 };
1401
1402 const struct dt_body_operations osd_body_scrub_ops = {
1403         .dbo_read                       = osd_read_no_record,
1404         .dbo_declare_write              = osd_declare_write,
1405         .dbo_write                      = osd_write,
1406 };