-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
* GPL HEADER START
*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
}
#endif
-static void osd_init_iobuf(struct osd_device *d, struct osd_iobuf *iobuf,int rw)
+static void __osd_init_iobuf(struct osd_device *d, struct osd_iobuf *iobuf,
+ int rw, int line)
{
+ LASSERTF(iobuf->dr_elapsed_valid == 0,
+ "iobuf %p, reqs %d, rw %d, line %d\n", iobuf,
+ cfs_atomic_read(&iobuf->dr_numreqs), iobuf->dr_rw,
+ iobuf->dr_init_at);
+
cfs_waitq_init(&iobuf->dr_wait);
cfs_atomic_set(&iobuf->dr_numreqs, 0);
iobuf->dr_max_pages = PTLRPC_MAX_BRW_PAGES;
iobuf->dr_frags = 0;
iobuf->dr_elapsed = 0;
/* an elapsed time still flagged valid from a previous use of this iobuf
* must be counted before the iobuf is re-initialized, so this function
* asserts dr_elapsed_valid == 0; the wait queue, request count, fragment
* count and elapsed time are reset above, and the rw value passed by the
* caller is recorded below */
- LASSERT(iobuf->dr_elapsed_valid == 0);
iobuf->dr_rw = rw;
+ iobuf->dr_init_at = line;
}
+#define osd_init_iobuf(dev,iobuf,rw) __osd_init_iobuf(dev, iobuf, rw, __LINE__)
static void osd_iobuf_add_page(struct osd_iobuf *iobuf, struct page *page)
{
if (error != 0 && iobuf->dr_error == 0)
iobuf->dr_error = error;
- if (cfs_atomic_dec_and_test(&iobuf->dr_numreqs)) {
- iobuf->dr_elapsed = jiffies - iobuf->dr_start_time;
- iobuf->dr_elapsed_valid = 1;
- cfs_waitq_signal(&iobuf->dr_wait);
- }
+ /*
+ * set dr_elapsed before dr_numreqs turns to 0, otherwise
+ * it's possible that service thread will see dr_numreqs
+ * is zero, but dr_elapsed is not set yet, leading to lost
+ * data in this processing and an assertion in a subsequent
+ * call to OSD.
+ */
+ if (cfs_atomic_read(&iobuf->dr_numreqs) == 1) {
+ iobuf->dr_elapsed = jiffies - iobuf->dr_start_time;
+ iobuf->dr_elapsed_valid = 1;
+ }
+ if (cfs_atomic_dec_and_test(&iobuf->dr_numreqs))
+ cfs_waitq_signal(&iobuf->dr_wait);
/* Completed bios used to be chained off iobuf->dr_bios and freed in
* filter_clear_dreq(). It was then possible to exhaust the biovec-256
bio->bi_bdev = inode->i_sb->s_bdev;
bio->bi_sector = sector;
+ bio->bi_rw = (iobuf->dr_rw == 0) ? READ : WRITE;
bio->bi_end_io = dio_complete_routine;
bio->bi_private = iobuf;
if (plen > len)
plen = len;
- lnb->offset = offset;
- /* lnb->lnb_page_offset = poff; */
+ lnb->lnb_file_offset = offset;
+ lnb->lnb_page_offset = poff;
lnb->len = plen;
/* lb->flags = rnb->flags; */
lnb->flags = 0;
/*
* there are following "locks":
* journal_start
- * i_alloc_sem
* i_mutex
* page lock
* needs to keep the pages all aligned properly. */
lnb->dentry = (void *) obj;
- lnb->page = osd_get_page(d, lnb->offset, rw);
+ lnb->page = osd_get_page(d, lnb->lnb_file_offset, rw);
if (lnb->page == NULL)
GOTO(cleanup, rc = -ENOMEM);
long off;
char *p = kmap(lnb[i].page);
- off = lnb[i].offset;
- if (off)
- memset(p, 0, off);
- off = lnb[i].offset + lnb[i].len;
- off &= ~CFS_PAGE_MASK;
+ off = lnb[i].lnb_page_offset;
+ if (off)
+ memset(p, 0, off);
+ off = (lnb[i].lnb_page_offset + lnb[i].len) &
+ ~CFS_PAGE_MASK;
if (off)
memset(p + off, 0, CFS_PAGE_SIZE - off);
kunmap(lnb[i].page);
RETURN(rc);
}
+/* Check if a block is allocated or not.
+ *
+ * inode:  inode whose address space is queried through a_ops->bmap
+ * offset: byte offset in the file; it is converted to a block index by
+ *         shifting right by inode->i_blkbits
+ *
+ * Returns 1 when bmap reports a non-zero block for the block containing
+ * offset. Returns 0 when the mapping has no bmap method, when the file
+ * size is 0, when offset falls in a block after the one holding the last
+ * byte of the file, or when bmap itself returns 0.
+ */
+static int osd_is_mapped(struct inode *inode, obd_size offset)
+{
+ sector_t (*fs_bmap)(struct address_space *, sector_t);
+
+ fs_bmap = inode->i_mapping->a_ops->bmap;
+
+ /* Without a bmap method we can't know if we are overwriting or not,
+ * so report the block as not mapped */
+ if (unlikely(fs_bmap == NULL))
+ return 0;
+
+ if (i_size_read(inode) == 0)
+ return 0;
+
+ /* The block index of offset is larger than the block index of the
+ * last byte (i_size - 1): beyond EOF, must not be mapped */
+ if (((i_size_read(inode) - 1) >> inode->i_blkbits) <
+ (offset >> inode->i_blkbits))
+ return 0;
+
+ if (fs_bmap(inode->i_mapping, offset >> inode->i_blkbits) == 0)
+ return 0;
+
+ return 1;
+}
+
static int osd_declare_write_commit(const struct lu_env *env,
struct dt_object *dt,
struct niobuf_local *lnb, int npages,
int depth;
int i;
int newblocks;
- int old;
+ int rc = 0;
+ int flags = 0;
+ bool ignore_quota = false;
+ long long quota_space = 0;
+ ENTRY;
LASSERT(handle != NULL);
oh = container_of0(handle, struct osd_thandle, ot_super);
LASSERT(oh->ot_handle == NULL);
- old = oh->ot_credits;
newblocks = npages;
/* calculate number of extents (probably better to pass nb) */
- for (i = 1; i < npages; i++)
- if (lnb[i].offset !=
- lnb[i - 1].offset + lnb[i - 1].len)
- extents++;
+ for (i = 0; i < npages; i++) {
+ if (i && lnb[i].lnb_file_offset !=
+ lnb[i - 1].lnb_file_offset + lnb[i - 1].len)
+ extents++;
+
+ if (!osd_is_mapped(inode, lnb[i].lnb_file_offset))
+ quota_space += CFS_PAGE_SIZE;
+
+ /* ignore quota for the whole request if any page is from
+ * client cache or written by root.
+ *
+ * XXX we could handle this on per-lnb basis as done by
+ * grant. */
+ if ((lnb[i].flags & OBD_BRW_NOQUOTA) ||
+ !(lnb[i].flags & OBD_BRW_SYNC))
+ ignore_quota = true;
+ }
/*
* each extent can go into new leaf causing a split
oh->ot_credits += depth * extents;
}
+ /* quota space for metadata blocks */
+ quota_space += depth * extents * LDISKFS_BLOCK_SIZE(osd_sb(osd));
+
+ /* quota space should be reported in 1K blocks */
+ quota_space = toqb(quota_space);
+
/* each new block can go in different group (bitmap + gd) */
/* we can't dirty more bitmap blocks than exist */
else
oh->ot_credits += newblocks;
- RETURN(0);
-}
+ /* make sure the over quota flags were not set */
+ lnb[0].flags &= ~(OBD_BRW_OVER_USRQUOTA | OBD_BRW_OVER_GRPQUOTA);
-/* Check if a block is allocated or not */
-static int osd_is_mapped(struct inode *inode, obd_size offset)
-{
- sector_t (*fs_bmap)(struct address_space *, sector_t);
+ rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid,
+ quota_space, oh, true, true, &flags,
+ ignore_quota);
- fs_bmap = inode->i_mapping->a_ops->bmap;
+ /* we only need to store the overquota flags in the first lnb for
+ * now; once we support multiple-object BRW, this code needs to be
+ * revised. */
+ if (flags & QUOTA_FL_OVER_USRQUOTA)
+ lnb[0].flags |= OBD_BRW_OVER_USRQUOTA;
+ if (flags & QUOTA_FL_OVER_GRPQUOTA)
+ lnb[0].flags |= OBD_BRW_OVER_GRPQUOTA;
- /* We can't know if we are overwriting or not */
- if (fs_bmap == NULL)
- return 0;
-
- if (fs_bmap(inode->i_mapping, offset >> inode->i_blkbits) == 0)
- return 0;
-
- return 1;
+ RETURN(rc);
}
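+/* NOTE(review): "Check if a block is allocated or not" describes
+ * osd_is_mapped(), not the osd_write_commit() that follows; this comment
+ * appears to have been left behind when osd_is_mapped() was moved. */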
static int osd_write_commit(const struct lu_env *env, struct dt_object *dt,
struct niobuf_local *lnb, int npages,
struct thandle *thandle)
osd_init_iobuf(osd, iobuf, 1);
isize = i_size_read(inode);
+ ll_vfs_dq_init(inode);
for (i = 0; i < npages; i++) {
if (lnb[i].rc == -ENOSPC &&
- osd_is_mapped(inode, lnb[i].offset)) {
+ osd_is_mapped(inode, lnb[i].lnb_file_offset)) {
/* Allow the write to proceed if overwriting an
* existing block */
lnb[i].rc = 0;
LASSERT(PageLocked(lnb[i].page));
LASSERT(!PageWriteback(lnb[i].page));
- if (lnb[i].offset + lnb[i].len > isize)
- isize = lnb[i].offset + lnb[i].len;
+ if (lnb[i].lnb_file_offset + lnb[i].len > isize)
+ isize = lnb[i].lnb_file_offset + lnb[i].len;
/*
* Since write and truncate are serialized by oo_sem, even
cfs_gettimeofday(&start);
for (i = 0; i < npages; i++) {
- if (i_size_read(inode) <= lnb[i].offset)
+ if (i_size_read(inode) <= lnb[i].lnb_file_offset)
/* If there's no more data, abort early.
* lnb->rc == 0, so it's easy to detect later. */
break;
if (i_size_read(inode) <
- lnb[i].offset + lnb[i].len - 1)
- lnb[i].rc = i_size_read(inode) - lnb[i].offset;
+ lnb[i].lnb_file_offset + lnb[i].len - 1)
+ lnb[i].rc = i_size_read(inode) - lnb[i].lnb_file_offset;
else
lnb[i].rc = lnb[i].len;
m += lnb[i].len;
/* prevent reading after eof */
cfs_spin_lock(&inode->i_lock);
if (i_size_read(inode) < *offs + size) {
- size = i_size_read(inode) - *offs;
- cfs_spin_unlock(&inode->i_lock);
- if (size < 0) {
- CDEBUG(D_EXT2, "size %llu is too short to read @%llu\n",
- i_size_read(inode), *offs);
- return -EBADR;
- } else if (size == 0) {
- return 0;
- }
+ loff_t diff = i_size_read(inode) - *offs;
+ cfs_spin_unlock(&inode->i_lock);
+ if (diff < 0) {
+ CDEBUG(D_EXT2, "size %llu is too short to read @%llu\n",
+ i_size_read(inode), *offs);
+ return -EBADR;
+ } else if (diff == 0) {
+ return 0;
+ } else {
+ size = diff;
+ }
} else {
cfs_spin_unlock(&inode->i_lock);
}
{
struct osd_thandle *oh;
int credits;
+ struct inode *inode;
+ int rc;
+ ENTRY;
LASSERT(handle != NULL);
else
credits = osd_dto_credits_noquota[DTO_WRITE_BLOCK];
- OSD_DECLARE_OP(oh, write);
- oh->ot_credits += credits;
+ OSD_DECLARE_OP(oh, write, credits);
- if (osd_dt_obj(dt)->oo_inode == NULL)
- return 0;
+ inode = osd_dt_obj(dt)->oo_inode;
- osd_declare_qid(dt, oh, USRQUOTA, osd_dt_obj(dt)->oo_inode->i_uid,
- osd_dt_obj(dt)->oo_inode);
- osd_declare_qid(dt, oh, GRPQUOTA, osd_dt_obj(dt)->oo_inode->i_gid,
- osd_dt_obj(dt)->oo_inode);
- return 0;
+ /* we may declare a write to a non-existent llog */
+ if (inode == NULL)
+ RETURN(0);
+
+ /* dt_declare_write() is usually called for system objects, such
+ * as llog or last_rcvd files. We needn't enforce quota on those
+ * objects, so always set the lqi_space as 0. */
+ rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0, oh,
+ true, true, NULL, false);
+ RETURN(rc);
}
static int osd_ldiskfs_writelink(struct inode *inode, char *buffer, int buflen)
return 0;
}
-static int osd_ldiskfs_write_record(struct inode *inode, void *buf, int bufsize,
- loff_t *offs, handle_t *handle)
+int osd_ldiskfs_write_record(struct inode *inode, void *buf, int bufsize,
+ int write_NUL, loff_t *offs, handle_t *handle)
{
struct buffer_head *bh = NULL;
loff_t offset = *offs;
int boffs;
int dirty_inode = 0;
+ if (write_NUL) {
+ /*
+ * a long symlink write does not count the NUL terminator in
+ * bufsize; we write it anyway, and the inode's file size does
+ * not count the NUL terminator either.
+ */
+ ((char *)buf)[bufsize] = '\0';
+ ++bufsize;
+ }
while (bufsize > 0) {
if (bh != NULL)
brelse(bh);
if (bh)
brelse(bh);
+ if (write_NUL)
+ --new_size;
/* correct in-core and on-disk sizes */
if (new_size > i_size_read(inode)) {
cfs_spin_lock(&inode->i_lock);
struct thandle *handle, struct lustre_capa *capa,
int ignore_quota)
{
- struct inode *inode = osd_dt_obj(dt)->oo_inode;
- struct osd_thandle *oh;
- ssize_t result;
-#ifdef HAVE_QUOTA_SUPPORT
- cfs_cap_t save = cfs_curproc_cap_pack();
-#endif
+ struct inode *inode = osd_dt_obj(dt)->oo_inode;
+ struct osd_thandle *oh;
+ ssize_t result;
+ int is_link;
LASSERT(dt_object_exists(dt));
return -EACCES;
LASSERT(handle != NULL);
+ LASSERT(inode != NULL);
+ ll_vfs_dq_init(inode);
/* XXX: don't check: one declared chunk can be used many times */
/* OSD_EXEC_OP(handle, write); */
oh = container_of(handle, struct osd_thandle, ot_super);
LASSERT(oh->ot_handle->h_transaction != NULL);
-#ifdef HAVE_QUOTA_SUPPORT
- if (ignore_quota)
- cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
- else
- cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
-#endif
- /* Write small symlink to inode body as we need to maintain correct
- * on-disk symlinks for ldiskfs.
- */
- if (S_ISLNK(dt->do_lu.lo_header->loh_attr) &&
- (buf->lb_len < sizeof(LDISKFS_I(inode)->i_data)))
- result = osd_ldiskfs_writelink(inode, buf->lb_buf, buf->lb_len);
- else
- result = osd_ldiskfs_write_record(inode, buf->lb_buf,
- buf->lb_len, pos,
- oh->ot_handle);
-#ifdef HAVE_QUOTA_SUPPORT
- cfs_curproc_cap_unpack(save);
-#endif
+ /* Write small symlink to inode body as we need to maintain correct
+ * on-disk symlinks for ldiskfs.
+ * Note: the buf->lb_buf contains a NUL terminator while buf->lb_len
+ * does not count it in.
+ */
+ is_link = S_ISLNK(dt->do_lu.lo_header->loh_attr);
+ if (is_link && (buf->lb_len < sizeof(LDISKFS_I(inode)->i_data)))
+ result = osd_ldiskfs_writelink(inode, buf->lb_buf, buf->lb_len);
+ else
+ result = osd_ldiskfs_write_record(inode, buf->lb_buf,
+ buf->lb_len, is_link, pos,
+ oh->ot_handle);
if (result == 0)
result = buf->lb_len;
return result;
__u64 start, __u64 end, struct thandle *th)
{
struct osd_thandle *oh;
+ struct inode *inode;
+ int rc;
ENTRY;
LASSERT(th);
oh = container_of(th, struct osd_thandle, ot_super);
- OSD_DECLARE_OP(oh, punch);
-
/*
* we don't need to reserve credits for whole truncate
* it's not possible as truncate may need to free too many
* orphan list. if needed truncate will extend or restart
* transaction
*/
- oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE];
- oh->ot_credits += 3;
+ OSD_DECLARE_OP(oh, punch,
+ osd_dto_credits_noquota[DTO_ATTR_SET_BASE] + 3);
- RETURN(0);
+ inode = osd_dt_obj(dt)->oo_inode;
+ LASSERT(inode);
+
+ rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0, oh,
+ true, true, NULL, false);
+ RETURN(rc);
}
static int osd_punch(const struct lu_env *env, struct dt_object *dt,
LASSERT(end == OBD_OBJECT_EOF);
LASSERT(dt_object_exists(dt));
LASSERT(osd_invariant(obj));
+ LASSERT(inode != NULL);
+ ll_vfs_dq_init(inode);
LASSERT(th);
oh = container_of(th, struct osd_thandle, ot_super);