X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosd-ldiskfs%2Fosd_io.c;h=5626f5f1bb134a43104928d48c3c62822f2926f3;hp=b7ad5a035d67e587b592b94e701243bc716d5900;hb=94b6f0980fecb1ee08049e0f1da544e4c99f3fc3;hpb=7bb78fa519cd404758c67811c116744bc755b2cf diff --git a/lustre/osd-ldiskfs/osd_io.c b/lustre/osd-ldiskfs/osd_io.c index b7ad5a0..5626f5f 100644 --- a/lustre/osd-ldiskfs/osd_io.c +++ b/lustre/osd-ldiskfs/osd_io.c @@ -93,8 +93,14 @@ int generic_error_remove_page(struct address_space *mapping, struct page *page) } #endif -static void osd_init_iobuf(struct osd_device *d, struct osd_iobuf *iobuf,int rw) +static void __osd_init_iobuf(struct osd_device *d, struct osd_iobuf *iobuf, + int rw, int line) { + LASSERTF(iobuf->dr_elapsed_valid == 0, + "iobuf %p, reqs %d, rw %d, line %d\n", iobuf, + cfs_atomic_read(&iobuf->dr_numreqs), iobuf->dr_rw, + iobuf->dr_init_at); + cfs_waitq_init(&iobuf->dr_wait); cfs_atomic_set(&iobuf->dr_numreqs, 0); iobuf->dr_max_pages = PTLRPC_MAX_BRW_PAGES; @@ -104,9 +110,10 @@ static void osd_init_iobuf(struct osd_device *d, struct osd_iobuf *iobuf,int rw) iobuf->dr_frags = 0; iobuf->dr_elapsed = 0; /* must be counted before, so assert */ - LASSERT(iobuf->dr_elapsed_valid == 0); iobuf->dr_rw = rw; + iobuf->dr_init_at = line; } +#define osd_init_iobuf(dev,iobuf,rw) __osd_init_iobuf(dev, iobuf, rw, __LINE__) static void osd_iobuf_add_page(struct osd_iobuf *iobuf, struct page *page) { @@ -188,11 +195,19 @@ static int dio_complete_routine(struct bio *bio, unsigned int done, int error) if (error != 0 && iobuf->dr_error == 0) iobuf->dr_error = error; - if (cfs_atomic_dec_and_test(&iobuf->dr_numreqs)) { - iobuf->dr_elapsed = jiffies - iobuf->dr_start_time; - iobuf->dr_elapsed_valid = 1; - cfs_waitq_signal(&iobuf->dr_wait); - } + /* + * set dr_elapsed before dr_numreqs turns to 0, otherwise + * it's possible that service thread will see dr_numreqs + * is zero, but dr_elapsed is not set yet, leading to lost + * data in this processing and an assertion in a subsequent + * call to OSD. + */ + if (cfs_atomic_read(&iobuf->dr_numreqs) == 1) { + iobuf->dr_elapsed = jiffies - iobuf->dr_start_time; + iobuf->dr_elapsed_valid = 1; + } + if (cfs_atomic_dec_and_test(&iobuf->dr_numreqs)) + cfs_waitq_signal(&iobuf->dr_wait); /* Completed bios used to be chained off iobuf->dr_bios and freed in * filter_clear_dreq(). It was then possible to exhaust the biovec-256 @@ -351,6 +366,7 @@ static int osd_do_bio(struct osd_device *osd, struct inode *inode, bio->bi_bdev = inode->i_sb->s_bdev; bio->bi_sector = sector; + bio->bi_rw = (iobuf->dr_rw == 0) ? READ : WRITE; bio->bi_end_io = dio_complete_routine; bio->bi_private = iobuf; @@ -394,8 +410,8 @@ static int osd_map_remote_to_local(loff_t offset, ssize_t len, int *nrpages, if (plen > len) plen = len; - lnb->offset = offset; - /* lnb->lnb_page_offset = poff; */ + lnb->lnb_file_offset = offset; + lnb->lnb_page_offset = poff; lnb->len = plen; /* lb->flags = rnb->flags; */ lnb->flags = 0; @@ -466,7 +482,7 @@ int osd_bufs_get(const struct lu_env *env, struct dt_object *d, loff_t pos, * needs to keep the pages all aligned properly. */ lnb->dentry = (void *) obj; - lnb->page = osd_get_page(d, lnb->offset, rw); + lnb->page = osd_get_page(d, lnb->lnb_file_offset, rw); if (lnb->page == NULL) GOTO(cleanup, rc = -ENOMEM); @@ -564,11 +580,11 @@ static int osd_write_prep(const struct lu_env *env, struct dt_object *dt, long off; char *p = kmap(lnb[i].page); - off = lnb[i].offset; - if (off) - memset(p, 0, off); - off = lnb[i].offset + lnb[i].len; - off &= ~CFS_PAGE_MASK; + off = lnb[i].lnb_page_offset; + if (off) + memset(p, 0, off); + off = (lnb[i].lnb_page_offset + lnb[i].len) & + ~CFS_PAGE_MASK; if (off) memset(p + off, 0, CFS_PAGE_SIZE - off); kunmap(lnb[i].page); @@ -593,6 +609,31 @@ static int osd_write_prep(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } +/* Check if a block is allocated or not */ +static int osd_is_mapped(struct inode *inode, obd_size offset) +{ + sector_t (*fs_bmap)(struct address_space *, sector_t); + + fs_bmap = inode->i_mapping->a_ops->bmap; + + /* We can't know if we are overwriting or not */ + if (unlikely(fs_bmap == NULL)) + return 0; + + if (i_size_read(inode) == 0) + return 0; + + /* Beyond EOF, must not be mapped */ + if (((i_size_read(inode) - 1) >> inode->i_blkbits) < + (offset >> inode->i_blkbits)) + return 0; + + if (fs_bmap(inode->i_mapping, offset >> inode->i_blkbits) == 0) + return 0; + + return 1; +} + static int osd_declare_write_commit(const struct lu_env *env, struct dt_object *dt, struct niobuf_local *lnb, int npages, @@ -605,20 +646,36 @@ static int osd_declare_write_commit(const struct lu_env *env, int depth; int i; int newblocks; - int old; + int rc = 0; + int flags = 0; + bool ignore_quota = false; + long long quota_space = 0; + ENTRY; LASSERT(handle != NULL); oh = container_of0(handle, struct osd_thandle, ot_super); LASSERT(oh->ot_handle == NULL); - old = oh->ot_credits; newblocks = npages; /* calculate number of extents (probably better to pass nb) */ - for (i = 1; i < npages; i++) - if (lnb[i].offset != - lnb[i - 1].offset + lnb[i - 1].len) - extents++; + for (i = 0; i < npages; i++) { + if (i && lnb[i].lnb_file_offset != + lnb[i - 1].lnb_file_offset + lnb[i - 1].len) + extents++; + + if (!osd_is_mapped(inode, lnb[i].lnb_file_offset)) + quota_space += CFS_PAGE_SIZE; + + /* ignore quota for the whole request if any page is from + * client cache or written by root. + * + * XXX we could handle this on per-lnb basis as done by + * grant. */ + if ((lnb[i].flags & OBD_BRW_NOQUOTA) || + !(lnb[i].flags & OBD_BRW_SYNC)) + ignore_quota = true; + } /* * each extent can go into new leaf causing a split @@ -642,6 +699,12 @@ static int osd_declare_write_commit(const struct lu_env *env, oh->ot_credits += depth * extents; } + /* quota space for metadata blocks */ + quota_space += depth * extents * LDISKFS_BLOCK_SIZE(osd_sb(osd)); + + /* quota space should be reported in 1K blocks */ + quota_space = toqb(quota_space); + /* each new block can go in different group (bitmap + gd) */ /* we can't dirty more bitmap blocks than exist */ @@ -656,26 +719,25 @@ static int osd_declare_write_commit(const struct lu_env *env, else oh->ot_credits += newblocks; - RETURN(0); -} - -/* Check if a block is allocated or not */ -static int osd_is_mapped(struct inode *inode, obd_size offset) -{ - sector_t (*fs_bmap)(struct address_space *, sector_t); - - fs_bmap = inode->i_mapping->a_ops->bmap; + /* make sure the over quota flags were not set */ + lnb[0].flags &= ~(OBD_BRW_OVER_USRQUOTA | OBD_BRW_OVER_GRPQUOTA); - /* We can't know if we are overwriting or not */ - if (fs_bmap == NULL) - return 0; + rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, + quota_space, oh, true, true, &flags, + ignore_quota); - if (fs_bmap(inode->i_mapping, offset >> inode->i_blkbits) == 0) - return 0; + /* we need only to store the overquota flags in the first lnb for + * now, once we support multiple objects BRW, this code needs be + * revised. */ + if (flags & QUOTA_FL_OVER_USRQUOTA) + lnb[0].flags |= OBD_BRW_OVER_USRQUOTA; + if (flags & QUOTA_FL_OVER_GRPQUOTA) + lnb[0].flags |= OBD_BRW_OVER_GRPQUOTA; - return 1; + RETURN(rc); } +/* Check if a block is allocated or not */ static int osd_write_commit(const struct lu_env *env, struct dt_object *dt, struct niobuf_local *lnb, int npages, struct thandle *thandle) @@ -691,10 +753,11 @@ static int osd_write_commit(const struct lu_env *env, struct dt_object *dt, osd_init_iobuf(osd, iobuf, 1); isize = i_size_read(inode); + ll_vfs_dq_init(inode); for (i = 0; i < npages; i++) { if (lnb[i].rc == -ENOSPC && - osd_is_mapped(inode, lnb[i].offset)) { + osd_is_mapped(inode, lnb[i].lnb_file_offset)) { /* Allow the write to proceed if overwriting an * existing block */ lnb[i].rc = 0; @@ -711,8 +774,8 @@ static int osd_write_commit(const struct lu_env *env, struct dt_object *dt, LASSERT(PageLocked(lnb[i].page)); LASSERT(!PageWriteback(lnb[i].page)); - if (lnb[i].offset + lnb[i].len > isize) - isize = lnb[i].offset + lnb[i].len; + if (lnb[i].lnb_file_offset + lnb[i].len > isize) + isize = lnb[i].lnb_file_offset + lnb[i].len; /* * Since write and truncate are serialized by oo_sem, even @@ -787,14 +850,14 @@ static int osd_read_prep(const struct lu_env *env, struct dt_object *dt, cfs_gettimeofday(&start); for (i = 0; i < npages; i++) { - if (i_size_read(inode) <= lnb[i].offset) + if (i_size_read(inode) <= lnb[i].lnb_file_offset) /* If there's no more data, abort early. * lnb->rc == 0, so it's easy to detect later. */ break; if (i_size_read(inode) < - lnb[i].offset + lnb[i].len - 1) - lnb[i].rc = i_size_read(inode) - lnb[i].offset; + lnb[i].lnb_file_offset + lnb[i].len - 1) + lnb[i].rc = i_size_read(inode) - lnb[i].lnb_file_offset; else lnb[i].rc = lnb[i].len; m += lnb[i].len; @@ -862,15 +925,17 @@ int osd_ldiskfs_read(struct inode *inode, void *buf, int size, loff_t *offs) /* prevent reading after eof */ cfs_spin_lock(&inode->i_lock); if (i_size_read(inode) < *offs + size) { - size = i_size_read(inode) - *offs; - cfs_spin_unlock(&inode->i_lock); - if (size < 0) { - CDEBUG(D_EXT2, "size %llu is too short to read @%llu\n", - i_size_read(inode), *offs); - return -EBADR; - } else if (size == 0) { - return 0; - } + loff_t diff = i_size_read(inode) - *offs; + cfs_spin_unlock(&inode->i_lock); + if (diff < 0) { + CDEBUG(D_EXT2, "size %llu is too short to read @%llu\n", + i_size_read(inode), *offs); + return -EBADR; + } else if (diff == 0) { + return 0; + } else { + size = diff; + } } else { cfs_spin_unlock(&inode->i_lock); } @@ -927,6 +992,9 @@ static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt, { struct osd_thandle *oh; int credits; + struct inode *inode; + int rc; + ENTRY; LASSERT(handle != NULL); @@ -948,14 +1016,18 @@ static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt, OSD_DECLARE_OP(oh, write); oh->ot_credits += credits; - if (osd_dt_obj(dt)->oo_inode == NULL) - return 0; + inode = osd_dt_obj(dt)->oo_inode; - osd_declare_qid(dt, oh, USRQUOTA, osd_dt_obj(dt)->oo_inode->i_uid, - osd_dt_obj(dt)->oo_inode); - osd_declare_qid(dt, oh, GRPQUOTA, osd_dt_obj(dt)->oo_inode->i_gid, - osd_dt_obj(dt)->oo_inode); - return 0; + /* we may declare write to non-exist llog */ + if (inode == NULL) + RETURN(0); + + /* dt_declare_write() is usually called for system objects, such + * as llog or last_rcvd files. We needn't enforce quota on those + * objects, so always set the lqi_space as 0. */ + rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0, oh, + true, true, NULL, false); + RETURN(rc); } static int osd_ldiskfs_writelink(struct inode *inode, char *buffer, int buflen) @@ -1058,9 +1130,6 @@ static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt, struct inode *inode = osd_dt_obj(dt)->oo_inode; struct osd_thandle *oh; ssize_t result; -#ifdef HAVE_QUOTA_SUPPORT - cfs_cap_t save = cfs_curproc_cap_pack(); -#endif int is_link; LASSERT(dt_object_exists(dt)); @@ -1069,18 +1138,14 @@ static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt, return -EACCES; LASSERT(handle != NULL); + LASSERT(inode != NULL); + ll_vfs_dq_init(inode); /* XXX: don't check: one declared chunk can be used many times */ /* OSD_EXEC_OP(handle, write); */ oh = container_of(handle, struct osd_thandle, ot_super); LASSERT(oh->ot_handle->h_transaction != NULL); -#ifdef HAVE_QUOTA_SUPPORT - if (ignore_quota) - cfs_cap_raise(CFS_CAP_SYS_RESOURCE); - else - cfs_cap_lower(CFS_CAP_SYS_RESOURCE); -#endif /* Write small symlink to inode body as we need to maintain correct * on-disk symlinks for ldiskfs. * Note: the buf->lb_buf contains a NUL terminator while buf->lb_len @@ -1093,9 +1158,6 @@ static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt, result = osd_ldiskfs_write_record(inode, buf->lb_buf, buf->lb_len, is_link, pos, oh->ot_handle); -#ifdef HAVE_QUOTA_SUPPORT - cfs_curproc_cap_unpack(save); -#endif if (result == 0) result = buf->lb_len; return result; @@ -1105,6 +1167,8 @@ static int osd_declare_punch(const struct lu_env *env, struct dt_object *dt, __u64 start, __u64 end, struct thandle *th) { struct osd_thandle *oh; + struct inode *inode; + int rc; ENTRY; LASSERT(th); @@ -1123,7 +1187,12 @@ static int osd_declare_punch(const struct lu_env *env, struct dt_object *dt, oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE]; oh->ot_credits += 3; - RETURN(0); + inode = osd_dt_obj(dt)->oo_inode; + LASSERT(inode); + + rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0, oh, + true, true, NULL, false); + RETURN(rc); } static int osd_punch(const struct lu_env *env, struct dt_object *dt, @@ -1141,6 +1210,8 @@ static int osd_punch(const struct lu_env *env, struct dt_object *dt, LASSERT(end == OBD_OBJECT_EOF); LASSERT(dt_object_exists(dt)); LASSERT(osd_invariant(obj)); + LASSERT(inode != NULL); + ll_vfs_dq_init(inode); LASSERT(th); oh = container_of(th, struct osd_thandle, ot_super);