From b59942cce0ed3b720ec38c3b5a25063d01b8d218 Mon Sep 17 00:00:00 2001 From: Alex Zhuravlev Date: Mon, 24 Mar 2014 19:30:19 +0400 Subject: [PATCH] LU-4820 osd: drop memcpy in zfs osd dmu_read() was called from osd_read_prep() copying from ARC bufs into the same ARC bufs. seem to be the remainings of pre-zerocopy age. Change-Id: I87c10a2d484b7fe0be370349a2bfeb857ddd74e9 Signed-off-by: Alex Zhuravlev Reviewed-on: http://review.whamcloud.com/12991 Reviewed-by: Andreas Dilger Tested-by: Jenkins Tested-by: Maloo Reviewed-by: Nathaniel Clark Reviewed-by: Oleg Drokin --- lustre/osd-zfs/osd_io.c | 88 ++++++++++++++++++++++++++++++------------------- 1 file changed, 54 insertions(+), 34 deletions(-) diff --git a/lustre/osd-zfs/osd_io.c b/lustre/osd-zfs/osd_io.c index 47058d2..c283cdc 100644 --- a/lustre/osd-zfs/osd_io.c +++ b/lustre/osd-zfs/osd_io.c @@ -69,8 +69,7 @@ static char *osd_zerocopy_tag = "zerocopy"; -static void record_start_io(struct osd_device *osd, int rw, int npages, - int discont_pages) +static void record_start_io(struct osd_device *osd, int rw, int discont_pages) { struct obd_histogram *h = osd->od_brw_stats.hist; @@ -78,26 +77,25 @@ static void record_start_io(struct osd_device *osd, int rw, int npages, atomic_inc(&osd->od_r_in_flight); lprocfs_oh_tally(&h[BRW_R_RPC_HIST], atomic_read(&osd->od_r_in_flight)); - lprocfs_oh_tally_log2(&h[BRW_R_PAGES], npages); lprocfs_oh_tally(&h[BRW_R_DISCONT_PAGES], discont_pages); } else { atomic_inc(&osd->od_w_in_flight); lprocfs_oh_tally(&h[BRW_W_RPC_HIST], atomic_read(&osd->od_w_in_flight)); - lprocfs_oh_tally_log2(&h[BRW_W_PAGES], npages); lprocfs_oh_tally(&h[BRW_W_DISCONT_PAGES], discont_pages); } } static void record_end_io(struct osd_device *osd, int rw, - unsigned long elapsed, int disksize) + unsigned long elapsed, int disksize, int npages) { struct obd_histogram *h = osd->od_brw_stats.hist; if (rw == READ) { atomic_dec(&osd->od_r_in_flight); + lprocfs_oh_tally_log2(&h[BRW_R_PAGES], npages); if (disksize > 0) lprocfs_oh_tally_log2(&h[BRW_R_DISK_IOSIZE], disksize); if (elapsed) @@ -105,6 +103,7 @@ static void record_end_io(struct osd_device *osd, int rw, } else { atomic_dec(&osd->od_w_in_flight); + lprocfs_oh_tally_log2(&h[BRW_W_PAGES], npages); if (disksize > 0) lprocfs_oh_tally_log2(&h[BRW_W_DISK_IOSIZE], disksize); if (elapsed) @@ -139,12 +138,13 @@ static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt, size = old_size - *pos; } - record_start_io(osd, READ, (size >> PAGE_CACHE_SHIFT), 0); + record_start_io(osd, READ, 0); rc = -dmu_read(osd->od_os, obj->oo_db->db_object, *pos, size, buf->lb_buf, DMU_READ_PREFETCH); - record_end_io(osd, READ, cfs_time_current() - start, size); + record_end_io(osd, READ, cfs_time_current() - start, size, + size >> PAGE_CACHE_SHIFT); if (rc == 0) { rc = size; *pos += size; @@ -217,7 +217,7 @@ static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt, LASSERT(th != NULL); oh = container_of0(th, struct osd_thandle, ot_super); - record_start_io(osd, WRITE, (buf->lb_len >> PAGE_CACHE_SHIFT), 0); + record_start_io(osd, WRITE, 0); dmu_write(osd->od_os, obj->oo_db->db_object, offset, (uint64_t)buf->lb_len, buf->lb_buf, oh->ot_tx); @@ -240,7 +240,8 @@ static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt, rc = buf->lb_len; out: - record_end_io(osd, WRITE, 0, buf->lb_len); + record_end_io(osd, WRITE, 0, buf->lb_len, + buf->lb_len >> PAGE_CACHE_SHIFT); RETURN(rc); } @@ -249,8 +250,8 @@ out: * XXX: for the moment I don't want to use lnb_flags for osd-internal * purposes as it's not very well defined ... * instead I use the lowest bit of the address so that: - * arc buffer: .lnb_obj = abuf (arc we loan for write) - * dbuf buffer: .lnb_obj = dbuf | 1 (dbuf we get for read) + * arc buffer: .lnb_data = abuf (arc we loan for write) + * dbuf buffer: .lnb_data = dbuf | 1 (dbuf we get for read) * copy buffer: .lnb_page->mapping = obj (page we allocate for write) * * bzzz, to blame @@ -301,14 +302,37 @@ static inline struct page *kmem_to_page(void *addr) return virt_to_page(addr); } +/** + * Prepare buffers for read. + * + * The function maps the range described by \a off and \a len to \a lnb array. + * dmu_buf_hold_array_by_bonus() finds/creates appropriate ARC buffers, then + * we fill \a lnb array with the pages storing ARC buffers. Notice the current + * implementationt passes TRUE to dmu_buf_hold_array_by_bonus() to fill ARC + * buffers with actual data, I/O is done in the conext of osd_bufs_get_read(). + * A better implementation would just return the buffers (potentially unfilled) + * and subsequent osd_read_prep() would do I/O for many ranges concurrently. + * + * \param[in] env environment + * \param[in] obj object + * \param[in] off offset in bytes + * \param[in] len the number of bytes to access + * \param[out] lnb array of local niobufs pointing to the buffers with data + * + * \retval 0 for success + * \retval negative error number of failure + */ static int osd_bufs_get_read(const struct lu_env *env, struct osd_object *obj, loff_t off, ssize_t len, struct niobuf_local *lnb) { struct osd_device *osd = osd_obj2dev(obj); - dmu_buf_t **dbp; + unsigned long start = cfs_time_current(); int rc, i, numbufs, npages = 0; + dmu_buf_t **dbp; ENTRY; + record_start_io(osd, READ, 0); + /* grab buffers for read: * OSD API let us to grab buffers first, then initiate IO(s) * so that all required IOs will be done in parallel, but at the @@ -370,6 +394,9 @@ static int osd_bufs_get_read(const struct lu_env *env, struct osd_object *obj, dmu_buf_rele_array(dbp, numbufs, osd_zerocopy_tag); } + record_end_io(osd, READ, cfs_time_current() - start, + npages * PAGE_SIZE, npages); + RETURN(npages); err: @@ -667,7 +694,7 @@ static int osd_declare_write_commit(const struct lu_env *env, CDEBUG(D_QUOTA, "writting %d pages, reserving "LPD64"K of quota " "space\n", npages, space); - record_start_io(osd, WRITE, npages, discont_pages); + record_start_io(osd, WRITE, discont_pages); retry: /* acquire quota space if needed */ rc = osd_declare_quota(env, osd, obj->oo_attr.la_uid, @@ -755,7 +782,7 @@ static int osd_write_commit(const struct lu_env *env, struct dt_object *dt, th->th_local = 1; /* it is important to return 0 even when all lnb_rc == -ENOSPC * since ofd_commitrw_write() retries several times on ENOSPC */ - record_end_io(osd, WRITE, 0, 0); + record_end_io(osd, WRITE, 0, 0, 0); RETURN(0); } @@ -772,7 +799,7 @@ static int osd_write_commit(const struct lu_env *env, struct dt_object *dt, write_unlock(&obj->oo_attr_lock); } - record_end_io(osd, WRITE, 0, iosize); + record_end_io(osd, WRITE, 0, iosize, npages); RETURN(rc); } @@ -781,34 +808,29 @@ static int osd_read_prep(const struct lu_env *env, struct dt_object *dt, struct niobuf_local *lnb, int npages) { struct osd_object *obj = osd_dt_obj(dt); - struct osd_device *osd = osd_obj2dev(obj); - struct lu_buf buf; - loff_t offset; int i; - unsigned long start; unsigned long size = 0; + loff_t eof; LASSERT(dt_object_exists(dt)); LASSERT(obj->oo_db); - start = cfs_time_current(); - - record_start_io(osd, READ, npages, 0); + read_lock(&obj->oo_attr_lock); + eof = obj->oo_attr.la_size; + read_unlock(&obj->oo_attr_lock); for (i = 0; i < npages; i++) { - buf.lb_buf = kmap(lnb[i].lnb_page); - buf.lb_len = lnb[i].lnb_len; - offset = lnb[i].lnb_file_offset; - - CDEBUG(D_OTHER, "read %u bytes at %u\n", - (unsigned) lnb[i].lnb_len, - (unsigned) lnb[i].lnb_file_offset); - lnb[i].lnb_rc = osd_read(env, dt, &buf, &offset, NULL); - kunmap(lnb[i].lnb_page); + if (unlikely(lnb[i].lnb_rc < 0)) + continue; + lnb[i].lnb_rc = lnb[i].lnb_len; size += lnb[i].lnb_rc; - if (lnb[i].lnb_rc < buf.lb_len) { + if (lnb[i].lnb_file_offset + lnb[i].lnb_len > eof) { + lnb[i].lnb_rc = eof - lnb[i].lnb_file_offset; + if (lnb[i].lnb_rc < 0) + lnb[i].lnb_rc = 0; + /* all subsequent rc should be 0 */ while (++i < npages) lnb[i].lnb_rc = 0; @@ -816,8 +838,6 @@ static int osd_read_prep(const struct lu_env *env, struct dt_object *dt, } } - record_end_io(osd, READ, cfs_time_current() - start, size); - return 0; } -- 1.8.3.1