X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fosd-zfs%2Fosd_io.c;h=8825892c52df7053db26ec068d564397c12828f7;hb=6c3606b79a4c25946255a680f79ff2d58df66bbc;hp=4dafcfb08bdb8e1569a1d80abc227e9a4df4dc46;hpb=88c5beb51d24423d31fbb1f41fa5f7ab501becd2;p=fs%2Flustre-release.git diff --git a/lustre/osd-zfs/osd_io.c b/lustre/osd-zfs/osd_io.c index 4dafcfb..8825892 100644 --- a/lustre/osd-zfs/osd_io.c +++ b/lustre/osd-zfs/osd_io.c @@ -116,6 +116,7 @@ static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { struct osd_object *obj = osd_dt_obj(dt); + struct osd_device *osd = osd_obj2dev(obj); struct osd_thandle *oh; uint64_t oid; ENTRY; @@ -142,7 +143,12 @@ static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt, dmu_tx_hold_write(oh->ot_tx, oid, pos, size); - RETURN(0); + /* dt_declare_write() is usually called for system objects, such + * as llog or last_rcvd files. We needn't enforce quota on those + * objects, so always set the lqi_space as 0. */ + RETURN(osd_declare_quota(env, osd, obj->oo_attr.la_uid, + obj->oo_attr.la_gid, 0, oh, true, NULL, + false)); } static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt, @@ -291,7 +297,7 @@ static int osd_bufs_get_read(const struct lu_env *env, struct osd_object *obj, lnb->rc = 0; lnb->lnb_file_offset = off; - lnb->offset = bufoff & ~CFS_PAGE_MASK; + lnb->lnb_page_offset = bufoff & ~CFS_PAGE_MASK; lnb->len = thispage; lnb->page = kmem_to_page(dbp[i]->db_data + bufoff); @@ -357,7 +363,7 @@ static int osd_bufs_get_write(const struct lu_env *env, struct osd_object *obj, plen = min_t(int, sz_in_block, CFS_PAGE_SIZE); lnb[i].lnb_file_offset = off; - lnb[i].offset = 0; + lnb[i].lnb_page_offset = 0; lnb[i].len = plen; lnb[i].rc = 0; if (sz_in_block == bs) @@ -391,7 +397,7 @@ static int osd_bufs_get_write(const struct lu_env *env, struct osd_object *obj, plen = min_t(int, sz_in_block, CFS_PAGE_SIZE); lnb[i].lnb_file_offset = off; - lnb[i].offset = 0; + lnb[i].lnb_page_offset = 0; lnb[i].len = plen; lnb[i].rc = 0; lnb[i].dentry = NULL; @@ -452,16 +458,86 @@ static int osd_write_prep(const struct lu_env *env, struct dt_object *dt, return 0; } +/* Return number of blocks that aren't mapped in the [start, start + size] + * region */ +static int osd_count_not_mapped(struct osd_object *obj, uint64_t start, + uint32_t size) +{ + dmu_buf_impl_t *dbi = (dmu_buf_impl_t *)obj->oo_db; + dmu_buf_impl_t *db; + dnode_t *dn; + uint32_t blkshift; + uint64_t end, blkid; + int rc; + ENTRY; + + DB_DNODE_ENTER(dbi); + dn = DB_DNODE(dbi); + + if (dn->dn_maxblkid == 0) { + if (start + size <= dn->dn_datablksz) + GOTO(out, size = 0); + if (start < dn->dn_datablksz) + start = dn->dn_datablksz; + /* assume largest block size */ + blkshift = SPA_MAXBLOCKSHIFT; + } else { + /* blocksize can't change */ + blkshift = dn->dn_datablkshift; + } + + /* compute address of last block */ + end = (start + size - 1) >> blkshift; + /* align start on block boundaries */ + start >>= blkshift; + + /* size is null, can't be mapped */ + if (obj->oo_attr.la_size == 0 || dn->dn_maxblkid == 0) + GOTO(out, size = (end - start + 1) << blkshift); + + /* beyond EOF, can't be mapped */ + if (start > dn->dn_maxblkid) + GOTO(out, size = (end - start + 1) << blkshift); + + size = 0; + for (blkid = start; blkid <= end; blkid++) { + if (blkid == dn->dn_maxblkid) + /* this one is mapped for sure */ + continue; + if (blkid > dn->dn_maxblkid) { + size += (end - blkid + 1) << blkshift; + GOTO(out, size); + } + + rc = dbuf_hold_impl(dn, 0, blkid, TRUE, FTAG, &db); + if (rc) { + /* for ENOENT (block not mapped) and any other errors, + * assume the block isn't mapped */ + size += 1 << blkshift; + continue; + } + dbuf_rele(db, FTAG); + } + + GOTO(out, size); +out: + DB_DNODE_EXIT(dbi); + return size; +} + static int osd_declare_write_commit(const struct lu_env *env, struct dt_object *dt, struct niobuf_local *lnb, int npages, struct thandle *th) { struct osd_object *obj = osd_dt_obj(dt); + struct osd_device *osd = osd_obj2dev(obj); struct osd_thandle *oh; uint64_t offset = 0; uint32_t size = 0; - int i; + int i, rc, flags = 0; + bool ignore_quota = false, synced = false; + long long space = 0; ENTRY; LASSERT(dt_object_exists(dt)); @@ -479,6 +555,14 @@ static int osd_declare_write_commit(const struct lu_env *env, * skipped in osd_write_commit(). Hence we skip pages * with lnb_rc != 0 here too */ continue; + /* ignore quota for the whole request if any page is from + * client cache or written by root. + * + * XXX we could handle this on per-lnb basis as done by + * grant. */ + if ((lnb[i].flags & OBD_BRW_NOQUOTA) || + !(lnb[i].flags & OBD_BRW_SYNC)) + ignore_quota = true; if (size == 0) { /* first valid lnb */ offset = lnb[i].lnb_file_offset; @@ -493,18 +577,55 @@ static int osd_declare_write_commit(const struct lu_env *env, dmu_tx_hold_write(oh->ot_tx, obj->oo_db->db_object, offset,size); + /* estimating space that will be consumed by a write is rather + * complicated with ZFS. As a consequence, we don't account for + * indirect blocks and quota overrun will be adjusted once the + * operation is committed, if required. */ + space += osd_count_not_mapped(obj, offset, size); + offset = lnb->lnb_file_offset; size = lnb->len; } - if (size) + if (size) { dmu_tx_hold_write(oh->ot_tx, obj->oo_db->db_object, offset,size); + space += osd_count_not_mapped(obj, offset, size); + } dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0); oh->ot_write_commit = 1; /* used in osd_trans_start() for fail_loc */ - RETURN(0); + /* backend zfs filesystem might be configured to store multiple data + * copies */ + space *= osd->od_objset.os->os_copies; + space = toqb(space); + CDEBUG(D_QUOTA, "writting %d pages, reserving "LPD64"K of quota " + "space\n", npages, space); + +retry: + /* acquire quota space if needed */ + rc = osd_declare_quota(env, osd, obj->oo_attr.la_uid, + obj->oo_attr.la_gid, space, oh, true, &flags, + ignore_quota); + + if (!synced && rc == -EDQUOT && (flags & QUOTA_FL_SYNC) != 0) { + dt_sync(env, th->th_dev); + synced = true; + CDEBUG(D_QUOTA, "retry after sync\n"); + flags = 0; + goto retry; + } + + /* we need only to store the overquota flags in the first lnb for + * now, once we support multiple objects BRW, this code needs be + * revised. */ + if (flags & QUOTA_FL_OVER_USRQUOTA) + lnb[0].flags |= OBD_BRW_OVER_USRQUOTA; + if (flags & QUOTA_FL_OVER_GRPQUOTA) + lnb[0].flags |= OBD_BRW_OVER_GRPQUOTA; + + RETURN(rc); } static int osd_write_commit(const struct lu_env *env, struct dt_object *dt, @@ -697,6 +818,7 @@ static int osd_declare_punch(const struct lu_env *env, struct dt_object *dt, __u64 start, __u64 end, struct thandle *handle) { struct osd_object *obj = osd_dt_obj(dt); + struct osd_device *osd = osd_obj2dev(obj); struct osd_thandle *oh; __u64 len; ENTRY; @@ -720,7 +842,9 @@ static int osd_declare_punch(const struct lu_env *env, struct dt_object *dt, /* ... and we'll modify size attribute */ dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0); - RETURN(0); + RETURN(osd_declare_quota(env, osd, obj->oo_attr.la_uid, + obj->oo_attr.la_gid, 0, oh, true, NULL, + false)); }