X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;ds=sidebyside;f=lustre%2Fosd-zfs%2Fosd_io.c;h=e315fc188db6f2b3c3a3f42a27b883319a78a07f;hb=dcd2b724b7c1e90b5f71625657dc6fb6107cfbe0;hp=4dafcfb08bdb8e1569a1d80abc227e9a4df4dc46;hpb=88c5beb51d24423d31fbb1f41fa5f7ab501becd2;p=fs%2Flustre-release.git diff --git a/lustre/osd-zfs/osd_io.c b/lustre/osd-zfs/osd_io.c index 4dafcfb..e315fc1 100644 --- a/lustre/osd-zfs/osd_io.c +++ b/lustre/osd-zfs/osd_io.c @@ -28,7 +28,7 @@ * Use is subject to license terms. */ /* - * Copyright (c) 2011, 2012 Whamcloud, Inc. + * Copyright (c) 2012, 2013, Intel Corporation. * Use is subject to license terms. */ /* @@ -86,9 +86,9 @@ static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt, LASSERT(dt_object_exists(dt)); LASSERT(obj->oo_db); - cfs_read_lock(&obj->oo_attr_lock); + read_lock(&obj->oo_attr_lock); old_size = obj->oo_attr.la_size; - cfs_read_unlock(&obj->oo_attr_lock); + read_unlock(&obj->oo_attr_lock); if (*pos + size > old_size) { if (old_size < *pos) @@ -116,6 +116,7 @@ static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { struct osd_object *obj = osd_dt_obj(dt); + struct osd_device *osd = osd_obj2dev(obj); struct osd_thandle *oh; uint64_t oid; ENTRY; @@ -142,7 +143,12 @@ static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt, dmu_tx_hold_write(oh->ot_tx, oid, pos, size); - RETURN(0); + /* dt_declare_write() is usually called for system objects, such + * as llog or last_rcvd files. We needn't enforce quota on those + * objects, so always set the lqi_space as 0. */ + RETURN(osd_declare_quota(env, osd, obj->oo_attr.la_uid, + obj->oo_attr.la_gid, 0, oh, true, NULL, + false)); } static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt, @@ -166,10 +172,10 @@ static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt, dmu_write(osd->od_objset.os, obj->oo_db->db_object, offset, (uint64_t)buf->lb_len, buf->lb_buf, oh->ot_tx); - cfs_write_lock(&obj->oo_attr_lock); + write_lock(&obj->oo_attr_lock); if (obj->oo_attr.la_size < offset + buf->lb_len) { obj->oo_attr.la_size = offset + buf->lb_len; - cfs_write_unlock(&obj->oo_attr_lock); + write_unlock(&obj->oo_attr_lock); /* osd_object_sa_update() will be copying directly from oo_attr * into dbuf. any update within a single txg will copy the * most actual */ @@ -178,7 +184,7 @@ static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt, if (unlikely(rc)) GOTO(out, rc); } else { - cfs_write_unlock(&obj->oo_attr_lock); + write_unlock(&obj->oo_attr_lock); } *pos += buf->lb_len; @@ -265,9 +271,10 @@ static int osd_bufs_get_read(const struct lu_env *env, struct osd_object *obj, */ while (len > 0) { rc = -dmu_buf_hold_array_by_bonus(obj->oo_db, off, len, TRUE, - osd_zerocopy_tag, &numbufs, - &dbp); - LASSERT(rc == 0); + osd_zerocopy_tag, &numbufs, + &dbp); + if (unlikely(rc)) + GOTO(err, rc); for (i = 0; i < numbufs; i++) { int bufoff, tocpy, thispage; @@ -291,7 +298,7 @@ static int osd_bufs_get_read(const struct lu_env *env, struct osd_object *obj, lnb->rc = 0; lnb->lnb_file_offset = off; - lnb->offset = bufoff & ~CFS_PAGE_MASK; + lnb->lnb_page_offset = bufoff & ~CFS_PAGE_MASK; lnb->len = thispage; lnb->page = kmem_to_page(dbp[i]->db_data + bufoff); @@ -317,6 +324,11 @@ static int osd_bufs_get_read(const struct lu_env *env, struct osd_object *obj, } RETURN(npages); + +err: + LASSERT(rc < 0); + osd_bufs_put(env, &obj->oo_dt, lnb - npages, npages); + RETURN(rc); } static int osd_bufs_get_write(const struct lu_env *env, struct osd_object *obj, @@ -357,7 +369,7 @@ static int osd_bufs_get_write(const struct lu_env *env, struct osd_object *obj, plen = min_t(int, sz_in_block, CFS_PAGE_SIZE); lnb[i].lnb_file_offset = off; - lnb[i].offset = 0; + lnb[i].lnb_page_offset = 0; lnb[i].len = plen; lnb[i].rc = 0; if (sz_in_block == bs) @@ -391,7 +403,7 @@ static int osd_bufs_get_write(const struct lu_env *env, struct osd_object *obj, plen = min_t(int, sz_in_block, CFS_PAGE_SIZE); lnb[i].lnb_file_offset = off; - lnb[i].offset = 0; + lnb[i].lnb_page_offset = 0; lnb[i].len = plen; lnb[i].rc = 0; lnb[i].dentry = NULL; @@ -452,16 +464,86 @@ static int osd_write_prep(const struct lu_env *env, struct dt_object *dt, return 0; } +/* Return number of blocks that aren't mapped in the [start, start + size] + * region */ +static int osd_count_not_mapped(struct osd_object *obj, uint64_t start, + uint32_t size) +{ + dmu_buf_impl_t *dbi = (dmu_buf_impl_t *)obj->oo_db; + dmu_buf_impl_t *db; + dnode_t *dn; + uint32_t blkshift; + uint64_t end, blkid; + int rc; + ENTRY; + + DB_DNODE_ENTER(dbi); + dn = DB_DNODE(dbi); + + if (dn->dn_maxblkid == 0) { + if (start + size <= dn->dn_datablksz) + GOTO(out, size = 0); + if (start < dn->dn_datablksz) + start = dn->dn_datablksz; + /* assume largest block size */ + blkshift = SPA_MAXBLOCKSHIFT; + } else { + /* blocksize can't change */ + blkshift = dn->dn_datablkshift; + } + + /* compute address of last block */ + end = (start + size - 1) >> blkshift; + /* align start on block boundaries */ + start >>= blkshift; + + /* size is null, can't be mapped */ + if (obj->oo_attr.la_size == 0 || dn->dn_maxblkid == 0) + GOTO(out, size = (end - start + 1) << blkshift); + + /* beyond EOF, can't be mapped */ + if (start > dn->dn_maxblkid) + GOTO(out, size = (end - start + 1) << blkshift); + + size = 0; + for (blkid = start; blkid <= end; blkid++) { + if (blkid == dn->dn_maxblkid) + /* this one is mapped for sure */ + continue; + if (blkid > dn->dn_maxblkid) { + size += (end - blkid + 1) << blkshift; + GOTO(out, size); + } + + rc = dbuf_hold_impl(dn, 0, blkid, TRUE, FTAG, &db); + if (rc) { + /* for ENOENT (block not mapped) and any other errors, + * assume the block isn't mapped */ + size += 1 << blkshift; + continue; + } + dbuf_rele(db, FTAG); + } + + GOTO(out, size); +out: + DB_DNODE_EXIT(dbi); + return size; +} + static int osd_declare_write_commit(const struct lu_env *env, struct dt_object *dt, struct niobuf_local *lnb, int npages, struct thandle *th) { struct osd_object *obj = osd_dt_obj(dt); + struct osd_device *osd = osd_obj2dev(obj); struct osd_thandle *oh; uint64_t offset = 0; uint32_t size = 0; - int i; + int i, rc, flags = 0; + bool ignore_quota = false, synced = false; + long long space = 0; ENTRY; LASSERT(dt_object_exists(dt)); @@ -479,6 +561,19 @@ static int osd_declare_write_commit(const struct lu_env *env, * skipped in osd_write_commit(). Hence we skip pages * with lnb_rc != 0 here too */ continue; + /* ignore quota for the whole request if any page is from + * client cache or written by root. + * + * XXX once we drop the 1.8 client support, the checking + * for whether page is from cache can be simplified as: + * !(lnb[i].flags & OBD_BRW_SYNC) + * + * XXX we could handle this on per-lnb basis as done by + * grant. */ + if ((lnb[i].flags & OBD_BRW_NOQUOTA) || + (lnb[i].flags & (OBD_BRW_FROM_GRANT | OBD_BRW_SYNC)) == + OBD_BRW_FROM_GRANT) + ignore_quota = true; if (size == 0) { /* first valid lnb */ offset = lnb[i].lnb_file_offset; @@ -493,18 +588,55 @@ static int osd_declare_write_commit(const struct lu_env *env, dmu_tx_hold_write(oh->ot_tx, obj->oo_db->db_object, offset,size); + /* estimating space that will be consumed by a write is rather + * complicated with ZFS. As a consequence, we don't account for + * indirect blocks and quota overrun will be adjusted once the + * operation is committed, if required. */ + space += osd_count_not_mapped(obj, offset, size); + offset = lnb->lnb_file_offset; size = lnb->len; } - if (size) + if (size) { dmu_tx_hold_write(oh->ot_tx, obj->oo_db->db_object, offset,size); + space += osd_count_not_mapped(obj, offset, size); + } dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0); oh->ot_write_commit = 1; /* used in osd_trans_start() for fail_loc */ - RETURN(0); + /* backend zfs filesystem might be configured to store multiple data + * copies */ + space *= osd->od_objset.os->os_copies; + space = toqb(space); + CDEBUG(D_QUOTA, "writting %d pages, reserving "LPD64"K of quota " + "space\n", npages, space); + +retry: + /* acquire quota space if needed */ + rc = osd_declare_quota(env, osd, obj->oo_attr.la_uid, + obj->oo_attr.la_gid, space, oh, true, &flags, + ignore_quota); + + if (!synced && rc == -EDQUOT && (flags & QUOTA_FL_SYNC) != 0) { + dt_sync(env, th->th_dev); + synced = true; + CDEBUG(D_QUOTA, "retry after sync\n"); + flags = 0; + goto retry; + } + + /* we need only to store the overquota flags in the first lnb for + * now, once we support multiple objects BRW, this code needs be + * revised. */ + if (flags & QUOTA_FL_OVER_USRQUOTA) + lnb[0].flags |= OBD_BRW_OVER_USRQUOTA; + if (flags & QUOTA_FL_OVER_GRPQUOTA) + lnb[0].flags |= OBD_BRW_OVER_GRPQUOTA; + + RETURN(rc); } static int osd_write_commit(const struct lu_env *env, struct dt_object *dt, @@ -571,17 +703,17 @@ static int osd_write_commit(const struct lu_env *env, struct dt_object *dt, RETURN(0); } - cfs_write_lock(&obj->oo_attr_lock); + write_lock(&obj->oo_attr_lock); if (obj->oo_attr.la_size < new_size) { obj->oo_attr.la_size = new_size; - cfs_write_unlock(&obj->oo_attr_lock); + write_unlock(&obj->oo_attr_lock); /* osd_object_sa_update() will be copying directly from * oo_attr into dbuf. any update within a single txg will copy * the most actual */ rc = osd_object_sa_update(obj, SA_ZPL_SIZE(uos), &obj->oo_attr.la_size, 8, oh); } else { - cfs_write_unlock(&obj->oo_attr_lock); + write_unlock(&obj->oo_attr_lock); } RETURN(rc); @@ -672,21 +804,21 @@ static int osd_punch(const struct lu_env *env, struct dt_object *dt, LASSERT(th != NULL); oh = container_of0(th, struct osd_thandle, ot_super); - cfs_write_lock(&obj->oo_attr_lock); + write_lock(&obj->oo_attr_lock); /* truncate */ if (end == OBD_OBJECT_EOF || end >= obj->oo_attr.la_size) len = DMU_OBJECT_END; else len = end - start; - cfs_write_unlock(&obj->oo_attr_lock); + write_unlock(&obj->oo_attr_lock); rc = __osd_object_punch(osd->od_objset.os, obj->oo_db, oh->ot_tx, obj->oo_attr.la_size, start, len); /* set new size */ if (len == DMU_OBJECT_END) { - cfs_write_lock(&obj->oo_attr_lock); + write_lock(&obj->oo_attr_lock); obj->oo_attr.la_size = start; - cfs_write_unlock(&obj->oo_attr_lock); + write_unlock(&obj->oo_attr_lock); rc = osd_object_sa_update(obj, SA_ZPL_SIZE(uos), &obj->oo_attr.la_size, 8, oh); } @@ -697,13 +829,14 @@ static int osd_declare_punch(const struct lu_env *env, struct dt_object *dt, __u64 start, __u64 end, struct thandle *handle) { struct osd_object *obj = osd_dt_obj(dt); + struct osd_device *osd = osd_obj2dev(obj); struct osd_thandle *oh; __u64 len; ENTRY; oh = container_of0(handle, struct osd_thandle, ot_super); - cfs_read_lock(&obj->oo_attr_lock); + read_lock(&obj->oo_attr_lock); if (end == OBD_OBJECT_EOF || end >= obj->oo_attr.la_size) len = DMU_OBJECT_END; else @@ -711,16 +844,18 @@ static int osd_declare_punch(const struct lu_env *env, struct dt_object *dt, /* declare we'll free some blocks ... */ if (start < obj->oo_attr.la_size) { - cfs_read_unlock(&obj->oo_attr_lock); + read_unlock(&obj->oo_attr_lock); dmu_tx_hold_free(oh->ot_tx, obj->oo_db->db_object, start, len); } else { - cfs_read_unlock(&obj->oo_attr_lock); + read_unlock(&obj->oo_attr_lock); } /* ... and we'll modify size attribute */ dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0); - RETURN(0); + RETURN(osd_declare_quota(env, osd, obj->oo_attr.la_uid, + obj->oo_attr.la_gid, 0, oh, true, NULL, + false)); } @@ -734,7 +869,7 @@ struct dt_body_operations osd_body_ops = { .dbo_declare_write_commit = osd_declare_write_commit, .dbo_write_commit = osd_write_commit, .dbo_read_prep = osd_read_prep, - .do_declare_punch = osd_declare_punch, - .do_punch = osd_punch, + .dbo_declare_punch = osd_declare_punch, + .dbo_punch = osd_punch, };