Whamcloud - gitweb
LU-11771 osd: avoid use of HZ in brw_stats
[fs/lustre-release.git] / lustre / osd-zfs / osd_io.c
index fef00d7..ed3c92c 100644 (file)
 static char *osd_0copy_tag = "zerocopy";
 
 
+static void dbuf_set_pending_evict(dmu_buf_t *db)
+{
+       dmu_buf_impl_t *dbi = (dmu_buf_impl_t *)db;
+       dbi->db_pending_evict = TRUE;
+}
+
 static void record_start_io(struct osd_device *osd, int rw, int discont_pages)
 {
        struct obd_histogram *h = osd->od_brw_stats.hist;
@@ -88,22 +94,16 @@ static void record_end_io(struct osd_device *osd, int rw,
 {
        struct obd_histogram *h = osd->od_brw_stats.hist;
 
-       if (rw == READ) {
+       if (rw == READ)
                atomic_dec(&osd->od_r_in_flight);
-               lprocfs_oh_tally_log2(&h[BRW_R_PAGES], npages);
-               if (disksize > 0)
-                       lprocfs_oh_tally_log2(&h[BRW_R_DISK_IOSIZE], disksize);
-               if (elapsed)
-                       lprocfs_oh_tally_log2(&h[BRW_R_IO_TIME], elapsed);
-
-       } else {
+       else
                atomic_dec(&osd->od_w_in_flight);
-               lprocfs_oh_tally_log2(&h[BRW_W_PAGES], npages);
-               if (disksize > 0)
-                       lprocfs_oh_tally_log2(&h[BRW_W_DISK_IOSIZE], disksize);
-               if (elapsed)
-                       lprocfs_oh_tally_log2(&h[BRW_W_IO_TIME], elapsed);
-       }
+
+       lprocfs_oh_tally_log2(&h[BRW_R_PAGES + rw], npages);
+       if (disksize > 0)
+               lprocfs_oh_tally_log2(&h[BRW_R_DISK_IOSIZE + rw], disksize);
+       if (elapsed)
+               lprocfs_oh_tally_log2(&h[BRW_R_IO_TIME + rw], elapsed);
 }
 
 static ssize_t __osd_read(const struct lu_env *env, struct dt_object *dt,
@@ -205,7 +205,7 @@ static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt,
 
 static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
                        const struct lu_buf *buf, loff_t *pos,
-                       struct thandle *th, int ignore_quota)
+                        struct thandle *th)
 {
        struct osd_object  *obj  = osd_dt_obj(dt);
        struct osd_device  *osd = osd_obj2dev(obj);
@@ -333,7 +333,7 @@ static int osd_bufs_get_read(const struct lu_env *env, struct osd_object *obj,
                             loff_t off, ssize_t len, struct niobuf_local *lnb)
 {
        struct osd_device *osd = osd_obj2dev(obj);
-       int rc, i, numbufs, npages = 0;
+       int rc, i, numbufs, npages = 0, drop_cache = 0;
        ktime_t start = ktime_get();
        dmu_buf_t **dbp;
        s64 delta_ms;
@@ -341,6 +341,9 @@ static int osd_bufs_get_read(const struct lu_env *env, struct osd_object *obj,
        ENTRY;
        record_start_io(osd, READ, 0);
 
+       if (obj->oo_attr.la_size >= osd->od_readcache_max_filesize)
+               drop_cache = 1;
+
        /* grab buffers for read:
         * OSD API let us to grab buffers first, then initiate IO(s)
         * so that all required IOs will be done in parallel, but at the
@@ -401,6 +404,9 @@ static int osd_bufs_get_read(const struct lu_env *env, struct osd_object *obj,
                                lnb++;
                        }
 
+                       if (drop_cache)
+                               dbuf_set_pending_evict(dbp[i]);
+
                        /* steal dbuf so dmu_buf_rele_array() can't release
                         * it */
                        dbp[i] = NULL;
@@ -447,7 +453,7 @@ static int osd_bufs_get_write(const struct lu_env *env, struct osd_object *obj,
                              loff_t off, ssize_t len, struct niobuf_local *lnb)
 {
        struct osd_device *osd = osd_obj2dev(obj);
-       int                plen, off_in_block, sz_in_block;
+       int                poff, plen, off_in_block, sz_in_block;
        int                rc, i = 0, npages = 0;
        dnode_t *dn = obj->oo_dn;
        arc_buf_t *abuf;
@@ -511,11 +517,16 @@ static int osd_bufs_get_write(const struct lu_env *env, struct osd_object *obj,
                                                LPROC_OSD_TAIL_IO, 1);
 
                        /* can't use zerocopy, allocate temp. buffers */
+                       poff = off & (PAGE_SIZE - 1);
                        while (sz_in_block > 0) {
-                               plen = min_t(int, sz_in_block, PAGE_SIZE);
+                               plen = min_t(int, poff + sz_in_block,
+                                            PAGE_SIZE);
+                               plen -= poff;
 
                                lnb[i].lnb_file_offset = off;
-                               lnb[i].lnb_page_offset = 0;
+                               lnb[i].lnb_page_offset = poff;
+                               poff = 0;
+
                                lnb[i].lnb_len = plen;
                                lnb[i].lnb_rc = 0;
                                lnb[i].lnb_data = NULL;
@@ -675,8 +686,6 @@ static int osd_declare_write_commit(const struct lu_env *env,
                space += osd_roundup2blocksz(size, offset, blksz);
        }
 
-       oh->ot_write_commit = 1; /* used in osd_trans_start() for fail_loc */
-
        /* backend zfs filesystem might be configured to store multiple data
         * copies */
        space  *= osd->od_os->os_copies;
@@ -771,6 +780,23 @@ out:
        return rc;
 }
 
+static void osd_evict_dbufs_after_write(struct osd_object *obj,
+                                       loff_t off, ssize_t len)
+{
+       dmu_buf_t **dbp;
+       int i, rc, numbufs;
+
+       rc = -dmu_buf_hold_array_by_bonus(&obj->oo_dn->dn_bonus->db, off, len,
+                                         TRUE, osd_0copy_tag, &numbufs, &dbp);
+       if (unlikely(rc))
+               return;
+
+       for (i = 0; i < numbufs; i++)
+               dbuf_set_pending_evict(dbp[i]);
+
+       dmu_buf_rele_array(dbp, numbufs, osd_0copy_tag);
+}
+
 static int osd_write_commit(const struct lu_env *env, struct dt_object *dt,
                        struct niobuf_local *lnb, int npages,
                        struct thandle *th)
@@ -779,7 +805,7 @@ static int osd_write_commit(const struct lu_env *env, struct dt_object *dt,
        struct osd_device  *osd = osd_obj2dev(obj);
        struct osd_thandle *oh;
        uint64_t            new_size = 0;
-       int                 i, rc = 0;
+       int                 i, abufsz, rc = 0, drop_cache = 0;
        unsigned long      iosize = 0;
        ENTRY;
 
@@ -794,6 +820,14 @@ static int osd_write_commit(const struct lu_env *env, struct dt_object *dt,
                                 lnb[npages - 1].lnb_file_offset +
                                 lnb[npages - 1].lnb_len);
 
+       if (obj->oo_attr.la_size >= osd->od_readcache_max_filesize ||
+           lnb[npages - 1].lnb_file_offset + lnb[npages - 1].lnb_len >=
+           osd->od_readcache_max_filesize)
+               drop_cache = 1;
+
+       if (OBD_FAIL_CHECK(OBD_FAIL_OST_MAPBLK_ENOSPC))
+               RETURN(-ENOSPC);
+
        /* LU-8791: take oo_guard to avoid the deadlock that changing block
         * size and assigning arcbuf take place at the same time.
         *
@@ -841,12 +875,13 @@ static int osd_write_commit(const struct lu_env *env, struct dt_object *dt,
 
                if (lnb[i].lnb_page->mapping == (void *)obj) {
                        osd_dmu_write(osd, obj->oo_dn, lnb[i].lnb_file_offset,
-                                     lnb[i].lnb_len, kmap(lnb[i].lnb_page),
-                                     oh->ot_tx);
+                                     lnb[i].lnb_len, kmap(lnb[i].lnb_page) +
+                                     lnb[i].lnb_page_offset, oh->ot_tx);
                        kunmap(lnb[i].lnb_page);
                        iosize += lnb[i].lnb_len;
+                       abufsz = lnb[i].lnb_len; /* to drop cache below */
                } else if (lnb[i].lnb_data) {
-                       int j, apages, abufsz;
+                       int j, apages;
                        LASSERT(((unsigned long)lnb[i].lnb_data & 1) == 0);
                        /* buffer loaned for zerocopy, try to use it.
                         * notice that dmu_assign_arcbuf() is smart
@@ -868,8 +903,20 @@ static int osd_write_commit(const struct lu_env *env, struct dt_object *dt,
                        lnb[i].lnb_data = NULL;
                        atomic_dec(&osd->od_zerocopy_loan);
                        iosize += abufsz;
+               } else {
+                       /* we don't want to deal with cache if nothing
+                        * has been send to ZFS at this step */
+                       continue;
                }
 
+               if (!drop_cache)
+                       continue;
+
+               /* we have to mark dbufs for eviction here because
+                * dmu_assign_arcbuf() may create a new dbuf for
+                * loaned abuf */
+               osd_evict_dbufs_after_write(obj, lnb[i].lnb_file_offset,
+                                           abufsz);
        }
        up_read(&obj->oo_guard);