X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fosd-zfs%2Fosd_io.c;h=e0b07485ec15add72f05a0400d8429ba3e78b3ef;hb=6fc066434a3b8d4cfee0e038eb5da1cf4c8b5c74;hp=8825892c52df7053db26ec068d564397c12828f7;hpb=687273b7bff7c8bf5ae6a5c912d46f529f4b0d1a;p=fs%2Flustre-release.git

diff --git a/lustre/osd-zfs/osd_io.c b/lustre/osd-zfs/osd_io.c
index 8825892..e0b0748 100644
--- a/lustre/osd-zfs/osd_io.c
+++ b/lustre/osd-zfs/osd_io.c
@@ -28,7 +28,7 @@
  * Use is subject to license terms.
  */
 /*
- * Copyright (c) 2011, 2012 Whamcloud, Inc.
+ * Copyright (c) 2012, 2013, Intel Corporation.
  * Use is subject to license terms.
  */
 /*
@@ -41,20 +41,17 @@
  * Author: Mike Pershin
  */
 
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
 #define DEBUG_SUBSYSTEM S_OSD
 
 #include
 #include
-#include
 #include
 #include
 #include
 #include
 #include
 #include
+#include        /* LLOG_CHUNK_SIZE definition */
 
 #include "osd_internal.h"
 
@@ -86,9 +83,9 @@ static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
         LASSERT(dt_object_exists(dt));
         LASSERT(obj->oo_db);
 
-        cfs_read_lock(&obj->oo_attr_lock);
+        read_lock(&obj->oo_attr_lock);
         old_size = obj->oo_attr.la_size;
-        cfs_read_unlock(&obj->oo_attr_lock);
+        read_unlock(&obj->oo_attr_lock);
 
         if (*pos + size > old_size) {
                 if (old_size < *pos)
@@ -112,7 +109,7 @@ static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
 }
 
 static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt,
-                                 const loff_t size, loff_t pos,
+                                 const struct lu_buf *buf, loff_t pos,
                                  struct thandle *th)
 {
         struct osd_object *obj = osd_dt_obj(dt);
@@ -141,7 +138,13 @@ static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt,
                 dmu_tx_hold_sa_create(oh->ot_tx, ZFS_SA_BASE_ATTR_SIZE);
         }
 
-        dmu_tx_hold_write(oh->ot_tx, oid, pos, size);
+        /* XXX: we still miss for append declaration support in ZFS
+         *      -1 means append which is used by llog mostly, llog
+         *      can grow upto LLOG_CHUNK_SIZE*8 records */
+        if (pos == -1)
+                pos = max_t(loff_t, 256 * 8 * LLOG_CHUNK_SIZE,
+                            obj->oo_attr.la_size + (2 << 20));
+        dmu_tx_hold_write(oh->ot_tx, oid, pos, buf->lb_len);
 
         /* dt_declare_write() is usually called for system objects, such
          * as llog or last_rcvd files. We needn't enforce quota on those
@@ -172,10 +175,10 @@ static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
         dmu_write(osd->od_objset.os, obj->oo_db->db_object, offset,
                   (uint64_t)buf->lb_len, buf->lb_buf, oh->ot_tx);
 
-        cfs_write_lock(&obj->oo_attr_lock);
+        write_lock(&obj->oo_attr_lock);
         if (obj->oo_attr.la_size < offset + buf->lb_len) {
                 obj->oo_attr.la_size = offset + buf->lb_len;
-                cfs_write_unlock(&obj->oo_attr_lock);
+                write_unlock(&obj->oo_attr_lock);
                 /* osd_object_sa_update() will be copying directly from oo_attr
                  * into dbuf. any update within a single txg will copy the
                  * most actual */
@@ -184,7 +187,7 @@ static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
                 if (unlikely(rc))
                         GOTO(out, rc);
         } else {
-                cfs_write_unlock(&obj->oo_attr_lock);
+                write_unlock(&obj->oo_attr_lock);
         }
 
         *pos += buf->lb_len;
@@ -216,42 +219,38 @@ static int osd_bufs_put(const struct lu_env *env, struct dt_object *dt,
         LASSERT(obj->oo_db);
 
         for (i = 0; i < npages; i++) {
-                if (lnb[i].page == NULL)
+                if (lnb[i].lnb_page == NULL)
                         continue;
 
-                if (lnb[i].page->mapping == (void *)obj) {
+                if (lnb[i].lnb_page->mapping == (void *)obj) {
                         /* this is anonymous page allocated for copy-write */
-                        lnb[i].page->mapping = NULL;
-                        __free_page(lnb[i].page);
-                        cfs_atomic_dec(&osd->od_zerocopy_alloc);
+                        lnb[i].lnb_page->mapping = NULL;
+                        __free_page(lnb[i].lnb_page);
+                        atomic_dec(&osd->od_zerocopy_alloc);
                 } else {
                         /* see comment in osd_bufs_get_read() */
-                        ptr = (unsigned long)lnb[i].dentry;
+                        ptr = (unsigned long)lnb[i].lnb_data;
                         if (ptr & 1UL) {
                                 ptr &= ~1UL;
                                 dmu_buf_rele((void *)ptr, osd_zerocopy_tag);
-                                cfs_atomic_dec(&osd->od_zerocopy_pin);
-                        } else if (lnb[i].dentry != NULL) {
-                                dmu_return_arcbuf((void *)lnb[i].dentry);
-                                cfs_atomic_dec(&osd->od_zerocopy_loan);
+                                atomic_dec(&osd->od_zerocopy_pin);
+                        } else if (lnb[i].lnb_data != NULL) {
+                                dmu_return_arcbuf(lnb[i].lnb_data);
+                                atomic_dec(&osd->od_zerocopy_loan);
                         }
                 }
-                lnb[i].page = NULL;
-                lnb[i].dentry = NULL;
+                lnb[i].lnb_page = NULL;
+                lnb[i].lnb_data = NULL;
         }
 
         return 0;
 }
 
-static struct page *kmem_to_page(void *addr)
+static inline struct page *kmem_to_page(void *addr)
 {
-        struct page *page;
-
-        if (kmem_virt(addr))
-                page = vmalloc_to_page(addr);
+        if (is_vmalloc_addr(addr))
+                return vmalloc_to_page(addr);
         else
-                page = virt_to_page(addr);
-
-        return page;
+                return virt_to_page(addr);
 }
 
@@ -271,9 +270,10 @@ static int osd_bufs_get_read(const struct lu_env *env, struct osd_object *obj,
          */
         while (len > 0) {
                 rc = -dmu_buf_hold_array_by_bonus(obj->oo_db, off, len, TRUE,
-                                                  osd_zerocopy_tag, &numbufs,
-                                                  &dbp);
-                LASSERT(rc == 0);
+                                                  osd_zerocopy_tag, &numbufs,
+                                                  &dbp);
+                if (unlikely(rc))
+                        GOTO(err, rc);
 
                 for (i = 0; i < numbufs; i++) {
                         int bufoff, tocpy, thispage;
@@ -281,7 +281,7 @@ static int osd_bufs_get_read(const struct lu_env *env, struct osd_object *obj,
                         LASSERT(len > 0);
 
-                        cfs_atomic_inc(&osd->od_zerocopy_pin);
+                        atomic_inc(&osd->od_zerocopy_pin);
 
                         bufoff = off - dbp[i]->db_offset;
                         tocpy = min_t(int, dbp[i]->db_size - bufoff, len);
@@ -291,19 +291,19 @@ static int osd_bufs_get_read(const struct lu_env *env, struct osd_object *obj,
                         dbf = (void *) ((unsigned long)dbp[i] | 1);
 
                         while (tocpy > 0) {
-                                thispage = CFS_PAGE_SIZE;
-                                thispage -= bufoff & (CFS_PAGE_SIZE - 1);
+                                thispage = PAGE_CACHE_SIZE;
+                                thispage -= bufoff & (PAGE_CACHE_SIZE - 1);
                                 thispage = min(tocpy, thispage);
 
-                                lnb->rc = 0;
+                                lnb->lnb_rc = 0;
                                 lnb->lnb_file_offset = off;
                                 lnb->lnb_page_offset = bufoff & ~CFS_PAGE_MASK;
-                                lnb->len = thispage;
-                                lnb->page = kmem_to_page(dbp[i]->db_data +
-                                                         bufoff);
+                                lnb->lnb_len = thispage;
+                                lnb->lnb_page = kmem_to_page(dbp[i]->db_data +
+                                                             bufoff);
                                 /* mark just a single slot: we need this
                                  * reference to dbuf to be release once */
-                                lnb->dentry = dbf;
+                                lnb->lnb_data = dbf;
                                 dbf = NULL;
 
                                 tocpy -= thispage;
@@ -323,6 +323,11 @@ static int osd_bufs_get_read(const struct lu_env *env, struct osd_object *obj,
         }
 
         RETURN(npages);
+
+err:
+        LASSERT(rc < 0);
+        osd_bufs_put(env, &obj->oo_dt, lnb - npages, npages);
+        RETURN(rc);
 }
 
 static int osd_bufs_get_write(const struct lu_env *env, struct osd_object *obj,
@@ -330,7 +335,7 @@ static int osd_bufs_get_write(const struct lu_env *env, struct osd_object *obj,
 {
         struct osd_device *osd = osd_obj2dev(obj);
         int                plen, off_in_block, sz_in_block;
-        int                i = 0, npages = 0;
+        int                rc, i = 0, npages = 0;
         arc_buf_t         *abuf;
         uint32_t           bs;
         uint64_t           dummy;
@@ -353,28 +358,28 @@ static int osd_bufs_get_write(const struct lu_env *env, struct osd_object *obj,
                         abuf = dmu_request_arcbuf(obj->oo_db, bs);
                         if (unlikely(abuf == NULL))
-                                GOTO(out_err, -ENOMEM);
+                                GOTO(out_err, rc = -ENOMEM);
 
-                        cfs_atomic_inc(&osd->od_zerocopy_loan);
+                        atomic_inc(&osd->od_zerocopy_loan);
 
                         /* go over pages arcbuf contains, put them as
                          * local niobufs for ptlrpc's bulks */
                         while (sz_in_block > 0) {
-                                plen = min_t(int, sz_in_block, CFS_PAGE_SIZE);
+                                plen = min_t(int, sz_in_block, PAGE_CACHE_SIZE);
 
                                 lnb[i].lnb_file_offset = off;
                                 lnb[i].lnb_page_offset = 0;
-                                lnb[i].len = plen;
-                                lnb[i].rc = 0;
+                                lnb[i].lnb_len = plen;
+                                lnb[i].lnb_rc = 0;
                                 if (sz_in_block == bs)
-                                        lnb[i].dentry = (void *)abuf;
+                                        lnb[i].lnb_data = abuf;
                                 else
-                                        lnb[i].dentry = NULL;
+                                        lnb[i].lnb_data = NULL;
 
                                 /* this one is not supposed to fail */
-                                lnb[i].page = kmem_to_page(abuf->b_data +
+                                lnb[i].lnb_page = kmem_to_page(abuf->b_data +
                                                         off_in_block);
-                                LASSERT(lnb[i].page);
+                                LASSERT(lnb[i].lnb_page);
 
                                 lprocfs_counter_add(osd->od_stats,
                                                     LPROC_OSD_ZEROCOPY_IO, 1);
@@ -394,22 +399,22 @@ static int osd_bufs_get_write(const struct lu_env *env, struct osd_object *obj,
                         /* can't use zerocopy, allocate temp. buffers */
                         while (sz_in_block > 0) {
-                                plen = min_t(int, sz_in_block, CFS_PAGE_SIZE);
+                                plen = min_t(int, sz_in_block, PAGE_CACHE_SIZE);
 
                                 lnb[i].lnb_file_offset = off;
                                 lnb[i].lnb_page_offset = 0;
-                                lnb[i].len = plen;
-                                lnb[i].rc = 0;
-                                lnb[i].dentry = NULL;
+                                lnb[i].lnb_len = plen;
+                                lnb[i].lnb_rc = 0;
+                                lnb[i].lnb_data = NULL;
 
-                                lnb[i].page = alloc_page(OSD_GFP_IO);
-                                if (unlikely(lnb[i].page == NULL))
-                                        GOTO(out_err, -ENOMEM);
+                                lnb[i].lnb_page = alloc_page(OSD_GFP_IO);
+                                if (unlikely(lnb[i].lnb_page == NULL))
+                                        GOTO(out_err, rc = -ENOMEM);
 
-                                LASSERT(lnb[i].page->mapping == NULL);
-                                lnb[i].page->mapping = (void *)obj;
+                                LASSERT(lnb[i].lnb_page->mapping == NULL);
+                                lnb[i].lnb_page->mapping = (void *)obj;
 
-                                cfs_atomic_inc(&osd->od_zerocopy_alloc);
+                                atomic_inc(&osd->od_zerocopy_alloc);
                                 lprocfs_counter_add(osd->od_stats,
                                                     LPROC_OSD_COPY_IO, 1);
@@ -426,7 +431,7 @@ static int osd_bufs_get_write(const struct lu_env *env, struct osd_object *obj,
 
 out_err:
         osd_bufs_put(env, &obj->oo_dt, lnb, npages);
-        RETURN(-ENOMEM);
+        RETURN(rc);
 }
 
 static int osd_bufs_get(const struct lu_env *env, struct dt_object *dt,
@@ -549,7 +554,7 @@ static int osd_declare_write_commit(const struct lu_env *env,
         oh = container_of0(th, struct osd_thandle, ot_super);
 
         for (i = 0; i < npages; i++) {
-                if (lnb[i].rc)
+                if (lnb[i].lnb_rc)
                         /* ENOSPC, network RPC error, etc.
                          * We don't want to book space for pages which will be
                          * skipped in osd_write_commit(). Hence we skip pages
@@ -558,37 +563,43 @@ static int osd_declare_write_commit(const struct lu_env *env,
                 /* ignore quota for the whole request if any page is from
                  * client cache or written by root.
                  *
+                 * XXX once we drop the 1.8 client support, the checking
+                 * for whether page is from cache can be simplified as:
+                 * !(lnb[i].flags & OBD_BRW_SYNC)
+                 *
                  * XXX we could handle this on per-lnb basis as done by
                  * grant. */
-                if ((lnb[i].flags & OBD_BRW_NOQUOTA) ||
-                    !(lnb[i].flags & OBD_BRW_SYNC))
+                if ((lnb[i].lnb_flags & OBD_BRW_NOQUOTA) ||
+                    (lnb[i].lnb_flags & (OBD_BRW_FROM_GRANT | OBD_BRW_SYNC)) ==
+                    OBD_BRW_FROM_GRANT)
                         ignore_quota = true;
                 if (size == 0) {
                         /* first valid lnb */
                         offset = lnb[i].lnb_file_offset;
-                        size = lnb[i].len;
+                        size = lnb[i].lnb_len;
                         continue;
                 }
                 if (offset + size == lnb[i].lnb_file_offset) {
                         /* this lnb is contiguous to the previous one */
-                        size += lnb[i].len;
+                        size += lnb[i].lnb_len;
                         continue;
                 }
 
-                dmu_tx_hold_write(oh->ot_tx, obj->oo_db->db_object, offset,size);
-
+                dmu_tx_hold_write(oh->ot_tx, obj->oo_db->db_object,
+                                  offset, size);
                 /* estimating space that will be consumed by a write is rather
                  * complicated with ZFS. As a consequence, we don't account for
                  * indirect blocks and quota overrun will be adjusted once the
                  * operation is committed, if required. */
                 space += osd_count_not_mapped(obj, offset, size);
 
-                offset = lnb->lnb_file_offset;
-                size = lnb->len;
+                offset = lnb[i].lnb_file_offset;
+                size = lnb[i].lnb_len;
         }
 
         if (size) {
-                dmu_tx_hold_write(oh->ot_tx, obj->oo_db->db_object, offset,size);
+                dmu_tx_hold_write(oh->ot_tx, obj->oo_db->db_object,
+                                  offset, size);
                 space += osd_count_not_mapped(obj, offset, size);
         }
 
@@ -621,9 +632,9 @@ retry:
          * now, once we support multiple objects BRW, this code needs be
          * revised. */
         if (flags & QUOTA_FL_OVER_USRQUOTA)
-                lnb[0].flags |= OBD_BRW_OVER_USRQUOTA;
+                lnb[0].lnb_flags |= OBD_BRW_OVER_USRQUOTA;
         if (flags & QUOTA_FL_OVER_GRPQUOTA)
-                lnb[0].flags |= OBD_BRW_OVER_GRPQUOTA;
+                lnb[0].lnb_flags |= OBD_BRW_OVER_GRPQUOTA;
 
         RETURN(rc);
 }
@@ -648,40 +659,40 @@ static int osd_write_commit(const struct lu_env *env, struct dt_object *dt,
 
         for (i = 0; i < npages; i++) {
                 CDEBUG(D_INODE, "write %u bytes at %u\n",
-                       (unsigned) lnb[i].len,
+                       (unsigned) lnb[i].lnb_len,
                        (unsigned) lnb[i].lnb_file_offset);
 
-                if (lnb[i].rc) {
+                if (lnb[i].lnb_rc) {
                         /* ENOSPC, network RPC error, etc.
                          * Unlike ldiskfs, zfs allocates new blocks on rewrite,
                          * so we skip this page if lnb_rc is set to -ENOSPC */
                         CDEBUG(D_INODE, "obj "DFID": skipping lnb[%u]: rc=%d\n",
                                PFID(lu_object_fid(&dt->do_lu)), i,
-                               lnb[i].rc);
+                               lnb[i].lnb_rc);
                         continue;
                 }
 
-                if (lnb[i].page->mapping == (void *)obj) {
+                if (lnb[i].lnb_page->mapping == (void *)obj) {
                         dmu_write(osd->od_objset.os, obj->oo_db->db_object,
-                                  lnb[i].lnb_file_offset, lnb[i].len,
-                                  kmap(lnb[i].page), oh->ot_tx);
-                        kunmap(lnb[i].page);
-                } else if (lnb[i].dentry) {
-                        LASSERT(((unsigned long)lnb[i].dentry & 1) == 0);
+                                  lnb[i].lnb_file_offset, lnb[i].lnb_len,
+                                  kmap(lnb[i].lnb_page), oh->ot_tx);
+                        kunmap(lnb[i].lnb_page);
+                } else if (lnb[i].lnb_data) {
+                        LASSERT(((unsigned long)lnb[i].lnb_data & 1) == 0);
                         /* buffer loaned for zerocopy, try to use it.
                          * notice that dmu_assign_arcbuf() is smart
                          * enough to recognize changed blocksize
                          * in this case it fallbacks to dmu_write() */
                         dmu_assign_arcbuf(obj->oo_db, lnb[i].lnb_file_offset,
-                                          (void *)lnb[i].dentry, oh->ot_tx);
+                                          lnb[i].lnb_data, oh->ot_tx);
                         /* drop the reference, otherwise osd_put_bufs()
                          * will be releasing it - bad! */
-                        lnb[i].dentry = NULL;
-                        cfs_atomic_dec(&osd->od_zerocopy_loan);
+                        lnb[i].lnb_data = NULL;
+                        atomic_dec(&osd->od_zerocopy_loan);
                 }
 
-                if (new_size < lnb[i].lnb_file_offset + lnb[i].len)
-                        new_size = lnb[i].lnb_file_offset + lnb[i].len;
+                if (new_size < lnb[i].lnb_file_offset + lnb[i].lnb_len)
+                        new_size = lnb[i].lnb_file_offset + lnb[i].lnb_len;
         }
 
         if (unlikely(new_size == 0)) {
@@ -692,17 +703,17 @@ static int osd_write_commit(const struct lu_env *env, struct dt_object *dt,
                 RETURN(0);
         }
 
-        cfs_write_lock(&obj->oo_attr_lock);
+        write_lock(&obj->oo_attr_lock);
         if (obj->oo_attr.la_size < new_size) {
                 obj->oo_attr.la_size = new_size;
-                cfs_write_unlock(&obj->oo_attr_lock);
+                write_unlock(&obj->oo_attr_lock);
                 /* osd_object_sa_update() will be copying directly from
                  * oo_attr into dbuf. any update within a single txg will copy
                  * the most actual */
                 rc = osd_object_sa_update(obj, SA_ZPL_SIZE(uos),
                                           &obj->oo_attr.la_size, 8, oh);
         } else {
-                cfs_write_unlock(&obj->oo_attr_lock);
+                write_unlock(&obj->oo_attr_lock);
         }
 
         RETURN(rc);
@@ -720,20 +731,20 @@ static int osd_read_prep(const struct lu_env *env, struct dt_object *dt,
         LASSERT(obj->oo_db);
 
         for (i = 0; i < npages; i++) {
-                buf.lb_buf = kmap(lnb[i].page);
-                buf.lb_len = lnb[i].len;
+                buf.lb_buf = kmap(lnb[i].lnb_page);
+                buf.lb_len = lnb[i].lnb_len;
                 offset = lnb[i].lnb_file_offset;
 
                 CDEBUG(D_OTHER, "read %u bytes at %u\n",
-                       (unsigned) lnb[i].len,
+                       (unsigned) lnb[i].lnb_len,
                        (unsigned) lnb[i].lnb_file_offset);
-                lnb[i].rc = osd_read(env, dt, &buf, &offset, NULL);
-                kunmap(lnb[i].page);
+                lnb[i].lnb_rc = osd_read(env, dt, &buf, &offset, NULL);
+                kunmap(lnb[i].lnb_page);
 
-                if (lnb[i].rc < buf.lb_len) {
+                if (lnb[i].lnb_rc < buf.lb_len) {
                         /* all subsequent rc should be 0 */
                         while (++i < npages)
-                                lnb[i].rc = 0;
+                                lnb[i].lnb_rc = 0;
                         break;
                 }
         }
@@ -793,21 +804,21 @@ static int osd_punch(const struct lu_env *env, struct dt_object *dt,
         LASSERT(th != NULL);
         oh = container_of0(th, struct osd_thandle, ot_super);
 
-        cfs_write_lock(&obj->oo_attr_lock);
+        write_lock(&obj->oo_attr_lock);
         /* truncate */
         if (end == OBD_OBJECT_EOF || end >= obj->oo_attr.la_size)
                 len = DMU_OBJECT_END;
         else
                 len = end - start;
-        cfs_write_unlock(&obj->oo_attr_lock);
+        write_unlock(&obj->oo_attr_lock);
 
         rc = __osd_object_punch(osd->od_objset.os, obj->oo_db, oh->ot_tx,
                                 obj->oo_attr.la_size, start, len);
         /* set new size */
         if (len == DMU_OBJECT_END) {
-                cfs_write_lock(&obj->oo_attr_lock);
+                write_lock(&obj->oo_attr_lock);
                 obj->oo_attr.la_size = start;
-                cfs_write_unlock(&obj->oo_attr_lock);
+                write_unlock(&obj->oo_attr_lock);
                 rc = osd_object_sa_update(obj, SA_ZPL_SIZE(uos),
                                           &obj->oo_attr.la_size, 8, oh);
         }
@@ -825,7 +836,7 @@ static int osd_declare_punch(const struct lu_env *env, struct dt_object *dt,
 
         oh = container_of0(handle, struct osd_thandle, ot_super);
 
-        cfs_read_lock(&obj->oo_attr_lock);
+        read_lock(&obj->oo_attr_lock);
         if (end == OBD_OBJECT_EOF || end >= obj->oo_attr.la_size)
                 len = DMU_OBJECT_END;
         else
@@ -833,10 +844,10 @@ static int osd_declare_punch(const struct lu_env *env, struct dt_object *dt,
                 len = end - start;
 
         /* declare we'll free some blocks ... */
         if (start < obj->oo_attr.la_size) {
-                cfs_read_unlock(&obj->oo_attr_lock);
+                read_unlock(&obj->oo_attr_lock);
                 dmu_tx_hold_free(oh->ot_tx, obj->oo_db->db_object, start, len);
         } else {
-                cfs_read_unlock(&obj->oo_attr_lock);
+                read_unlock(&obj->oo_attr_lock);
         }
 
         /* ... and we'll modify size attribute */
@@ -858,7 +869,7 @@ struct dt_body_operations osd_body_ops = {
         .dbo_declare_write_commit       = osd_declare_write_commit,
         .dbo_write_commit               = osd_write_commit,
         .dbo_read_prep                  = osd_read_prep,
-        .do_declare_punch               = osd_declare_punch,
-        .do_punch                       = osd_punch,
+        .dbo_declare_punch              = osd_declare_punch,
+        .dbo_punch                      = osd_punch,
 };
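
A few of the changes above are easier to follow with small standalone models. The sketches below are illustrative C, not code from the Lustre tree, and every constant and helper name in them is an assumption made for the example. First, the append workaround in osd_declare_write(): ZFS has no way to declare an append, so pos == -1 (append, used mostly by llog) is replaced by a pessimistic upper bound before dmu_tx_hold_write() is called. Assuming LLOG_CHUNK_SIZE is 8192 (its historical value; the real definition is in lustre_idl.h), the floor works out to 256 * 8 * 8192 = 16 MiB:

#include <stdio.h>

#define LLOG_CHUNK_SIZE 8192    /* assumed value; see lustre_idl.h */

/* Model of the pos == -1 (append) substitution in osd_declare_write():
 * declare a write reaching either a fixed llog ceiling or a bit past
 * the current object size, whichever is larger. */
static long long append_declare_pos(long long la_size)
{
        long long llog_max = 256LL * 8 * LLOG_CHUNK_SIZE;       /* 16 MiB */
        long long grown    = la_size + (2LL << 20);             /* size + 2 MiB */

        return llog_max > grown ? llog_max : grown;
}

int main(void)
{
        printf("empty object: declare up to %lld bytes\n",
               append_declare_pos(0));                  /* 16777216 */
        printf("20 MiB object: declare up to %lld bytes\n",
               append_declare_pos(20LL << 20));         /* 23068672 */
        return 0;
}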
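Next, the bookkeeping behind lnb_data in osd_bufs_get_read() and osd_bufs_put(). One dbuf may back several niobuf pages, but its hold must be released exactly once, and the same field can instead carry a loaned arcbuf. The patch distinguishes the cases by setting the low bit of the dbuf pointer in exactly one slot. A minimal model of that pointer tagging (all names here are invented for the demo):

#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/* Tag a pointer so the teardown path knows this slot owns a reference;
 * requires at least 2-byte alignment so bit 0 is free. */
static void *tag_release_once(void *p)
{
        assert(((uintptr_t)p & 1UL) == 0);
        return (void *)((uintptr_t)p | 1UL);
}

/* Mirrors the osd_bufs_put() decision: tagged means "release the dbuf
 * once"; untagged non-NULL means a loaned arcbuf to return; NULL means
 * the slot holds no zerocopy resource (e.g. an anonymous copy page). */
static void put_slot(int idx, void *data)
{
        uintptr_t ptr = (uintptr_t)data;

        if (ptr & 1UL)
                printf("slot %d: release dbuf %#lx exactly once\n", idx,
                       (unsigned long)(ptr & ~1UL));
        else if (data != NULL)
                printf("slot %d: return loaned arcbuf\n", idx);
        else
                printf("slot %d: nothing to drop\n", idx);
}

int main(void)
{
        int dbuf, arcbuf;       /* stand-ins for dmu_buf_t / arc_buf_t */
        void *slots[3] = { tag_release_once(&dbuf), &arcbuf, NULL };

        for (int i = 0; i < 3; i++)
                put_slot(i, slots[i]);
        return 0;
}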
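The reworked quota test in osd_declare_write_commit() uses the (flags & (A | B)) == A idiom: a page came from the client cache if OBD_BRW_FROM_GRANT is set while OBD_BRW_SYNC is clear, which stays correct for 1.8 clients that never set FROM_GRANT. A sketch with made-up flag values:

#include <stdbool.h>
#include <stdio.h>

/* Invented bit values for the demo; the real OBD_BRW_* flags come from
 * Lustre's wire protocol headers. */
#define BRW_NOQUOTA     0x1
#define BRW_SYNC        0x2
#define BRW_FROM_GRANT  0x4

static bool ignore_quota(unsigned int flags)
{
        /* NOQUOTA (written by root), or FROM_GRANT set while SYNC is
         * clear, i.e. the page was already accounted via grant */
        return (flags & BRW_NOQUOTA) ||
               (flags & (BRW_FROM_GRANT | BRW_SYNC)) == BRW_FROM_GRANT;
}

int main(void)
{
        printf("cached page: %d\n", ignore_quota(BRW_FROM_GRANT));              /* 1 */
        printf("sync page:   %d\n", ignore_quota(BRW_FROM_GRANT | BRW_SYNC));   /* 0 */
        printf("root page:   %d\n", ignore_quota(BRW_NOQUOTA | BRW_SYNC));      /* 1 */
        return 0;
}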
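Finally, the declaration loop in osd_declare_write_commit() merges contiguous niobufs so a single dmu_tx_hold_write() covers a whole extent rather than one hold per page; the patch also fixes the extent restart to use lnb[i] instead of lnb[0]. A standalone model of the fixed loop, with a simplified niobuf and a stand-in for dmu_tx_hold_write():

#include <stdio.h>

struct niobuf {                 /* simplified niobuf_local */
        long long       offset; /* file offset of this page */
        int             len;    /* bytes covered by this page */
        int             rc;     /* non-zero: page will be skipped */
};

static void hold_write(long long offset, long long size)
{
        /* stand-in for dmu_tx_hold_write(oh->ot_tx, oid, offset, size) */
        printf("declare extent: offset=%lld size=%lld\n", offset, size);
}

static void declare_extents(const struct niobuf *lnb, int npages)
{
        long long offset = 0, size = 0;

        for (int i = 0; i < npages; i++) {
                if (lnb[i].rc)                  /* failed page: book no space */
                        continue;
                if (size == 0) {                /* first valid niobuf */
                        offset = lnb[i].offset;
                        size = lnb[i].len;
                        continue;
                }
                if (offset + size == lnb[i].offset) {   /* contiguous: merge */
                        size += lnb[i].len;
                        continue;
                }
                hold_write(offset, size);       /* gap: flush current extent */
                offset = lnb[i].offset;         /* restart from lnb[i] (the fix) */
                size = lnb[i].len;
        }
        if (size)                               /* trailing extent */
                hold_write(offset, size);
}

int main(void)
{
        /* two adjacent 4 KiB pages, a hole, then one more page */
        struct niobuf lnb[] = {
                { 0,     4096, 0 },
                { 4096,  4096, 0 },
                { 16384, 4096, 0 },
        };
        declare_extents(lnb, 3);        /* prints two extents */
        return 0;
}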