RETURN(0);
}
-static int ll_file_get_tree_lock(struct ll_lock_tree *tree, struct file *file,
-                                 const char *buf, size_t count,
-                                 loff_t start, loff_t end, int rw)
+/* iovec variant of ll_file_get_tree_lock(): the user buffer is now described
+ * by (iov, nr_segs) and is passed through to ll_tree_lock_iov() instead of a
+ * flat (buf, count) pair.  Locks the extent [start, end] for rw. */
+static int ll_file_get_tree_lock_iov(struct ll_lock_tree *tree,
+                                     struct file *file, const struct iovec *iov,
+                                     unsigned long nr_segs,
+                                     loff_t start, loff_t end, int rw)
 {
         int append;
         int tree_locked = 0;
                 GOTO(out, rc);
         }
         tree->lt_fd = LUSTRE_FPRIVATE(file);
-        rc = ll_tree_lock(tree, node, buf, count, ast_flags);
+        /* lock acquisition now walks the iovec instead of a flat buffer */
+        rc = ll_tree_lock_iov(tree, node, iov, nr_segs, ast_flags);
         if (rc == 0)
                 tree_locked = 1;
         else if (rc == -EUSERS)
                 return rc;
 }
-static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
-                            loff_t *ppos)
+/* XXX: exact copy from kernel code (__generic_file_aio_write_nolock from rhel4)
+ */
+/* Validate (iov, *nr_segs) and return the total number of bytes covered.
+ * A segment that fails access_ok() truncates the request: *nr_segs is cut
+ * down to the leading usable segments and the bad segment's bytes are
+ * dropped from the count; if the very first segment is bad, -EFAULT is
+ * returned.  A negative segment length or a wrapping total yields -EINVAL.
+ *
+ * NOTE(review): the return type is size_t, so the -EINVAL/-EFAULT returns
+ * reach callers as huge positive counts — verify every caller checks for
+ * this via a signed cast.  Also VERIFY_WRITE is used even when called from
+ * the write path (which only reads the user buffer); presumably harmless,
+ * but VERIFY_READ would be the precise check there — confirm. */
+static size_t ll_file_get_iov_count(const struct iovec *iov,
+                                    unsigned long *nr_segs)
+{
+        size_t count = 0;
+        unsigned long seg;
+
+        for (seg = 0; seg < *nr_segs; seg++) {
+                const struct iovec *iv = &iov[seg];
+
+                /*
+                 * If any segment has a negative length, or the cumulative
+                 * length ever wraps negative then return -EINVAL.
+                 */
+                count += iv->iov_len;
+                if (unlikely((ssize_t)(count|iv->iov_len) < 0))
+                        return -EINVAL;
+                if (access_ok(VERIFY_WRITE, iv->iov_base, iv->iov_len))
+                        continue;
+                if (seg == 0)
+                        return -EFAULT;
+                /* truncate the request at the first inaccessible segment */
+                *nr_segs = seg;
+                count -= iv->iov_len;   /* This segment is no good */
+                break;
+        }
+        return count;
+}
+
+/* Build in iov_copy a sub-iovec describing the next `size` bytes of *iov_out,
+ * starting `*offset` bytes into its first segment.  On return, *iov_out and
+ * *nr_segs are advanced past the fully consumed segments, *nrsegs_copy holds
+ * the number of entries filled into iov_copy, and *offset is the position
+ * reached inside the (possibly partially consumed) last segment.
+ *
+ * NOTE(review): if `size` is not exhausted inside the loop (the break is
+ * never taken), i ends up equal to the original *nr_segs and
+ * *nrsegs_copy = i + 1 counts one entry too many — confirm all callers
+ * pass size strictly less than the bytes remaining in the iovec.  Also,
+ * `i` is a signed int compared against the unsigned long *nr_segs. */
+static int iov_copy_update(unsigned long *nr_segs, const struct iovec **iov_out,
+                           unsigned long *nrsegs_copy,
+                           struct iovec *iov_copy, size_t *offset,
+                           size_t size)
 {
+        int i;
+        const struct iovec *iov = *iov_out;
+        for (i = 0; i < *nr_segs;
+             i++) {
+                const struct iovec *iv = &iov[i];
+                struct iovec *ivc = &iov_copy[i];
+                *ivc = *iv;
+                /* first entry starts at the carried-over offset */
+                if (i == 0) {
+                        ivc->iov_len -= *offset;
+                        ivc->iov_base += *offset;
+                }
+                if (ivc->iov_len > size) {
+                        /* this segment satisfies the rest of the request:
+                         * clamp it and record where the next call resumes */
+                        ivc->iov_len = size;
+                        if (i == 0)
+                                *offset += size;
+                        else
+                                *offset = size;
+                        break;
+                }
+                size -= ivc->iov_len;
+        }
+        *iov_out += i;
+        *nr_segs -= i;
+        *nrsegs_copy = i + 1;
+
+        return 0;
+}
+
+#ifdef HAVE_FILE_READV
+static ssize_t ll_file_readv(struct file *file, const struct iovec *iov,
+                             unsigned long nr_segs, loff_t *ppos)
+{
+#else
+/* AIO-capable read entry point; the synchronous path reaches this through
+ * ll_file_read() with a single-segment iovec and a sync kiocb. */
+static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
+                                unsigned long nr_segs, loff_t pos)
+{
+        struct file *file = iocb->ki_filp;
+        loff_t *ppos = &iocb->ki_pos;
+#endif
         struct inode *inode = file->f_dentry->d_inode;
         struct ll_inode_info *lli = ll_i2info(inode);
         struct lov_stripe_md *lsm = lli->lli_smd;
         loff_t end;
         ssize_t retval, chunk, sum = 0;
         int tree_locked;
-
+        struct iovec *iov_copy = NULL;
+        unsigned long nrsegs_copy, nrsegs_orig = 0;
+        size_t count, iov_offset = 0;
         __u64 kms;
         ENTRY;
+
+        count = ll_file_get_iov_count(iov, &nr_segs);
+        /* ll_file_get_iov_count() smuggles -EINVAL/-EFAULT through a size_t
+         * return; recover them via a signed cast and propagate before the
+         * value is ever used as a length. */
+        if ((ssize_t)count < 0)
+                RETURN((ssize_t)count);
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
                inode->i_ino, inode->i_generation, inode, count, *ppos);
         /* "If nbyte is 0, read() will return 0 and have no other results."
         count = i_size_read(inode) - *ppos;
         /* Make sure to correctly adjust the file pos pointer for
          * EFAULT case */
-        notzeroed = clear_user(buf, count);
-        count -= notzeroed;
-        *ppos += count;
-        if (!count)
+        /* zero the user segments one by one, stopping at the first fault
+         * or once count is exhausted */
+        for (nrsegs_copy = 0; nrsegs_copy < nr_segs; nrsegs_copy++) {
+                const struct iovec *iv = &iov[nrsegs_copy];
+
+                if (count < iv->iov_len)
+                        chunk = count;
+                else
+                        chunk = iv->iov_len;
+                notzeroed = clear_user(iv->iov_base, chunk);
+                sum += (chunk - notzeroed);
+                count -= (chunk - notzeroed);
+                if (notzeroed || !count)
+                        break;
+        }
+        *ppos += sum;
+        if (!sum)
                 RETURN(-EFAULT);
-        RETURN(count);
+        RETURN(sum);
 }
repeat:
if (sbi->ll_max_rw_chunk != 0) {
/* and chunk shouldn't be too large even if striping is wide */
if (end - *ppos > sbi->ll_max_rw_chunk)
end = *ppos + sbi->ll_max_rw_chunk - 1;
+
+ chunk = end - *ppos + 1;
+ if ((count == chunk) && (iov_offset == 0)) {
+ if (iov_copy)
+ OBD_FREE(iov_copy, sizeof(iov) * nrsegs_orig);
+
+ iov_copy = (struct iovec *)iov;
+ nrsegs_copy = nr_segs;
+ } else {
+ if (!iov_copy) {
+ nrsegs_orig = nr_segs;
+ OBD_ALLOC(iov_copy, sizeof(iov) * nr_segs);
+ if (!iov_copy)
+ GOTO(out, retval = -ENOMEM);
+ }
+
+ iov_copy_update(&nr_segs, &iov, &nrsegs_copy, iov_copy,
+ &iov_offset, chunk);
+ }
+
} else {
end = *ppos + count - 1;
+ iov_copy = (struct iovec *)iov;
+ nrsegs_copy = nr_segs;
}
- tree_locked = ll_file_get_tree_lock(&tree, file, buf,
- count, *ppos, end, READ);
+ tree_locked = ll_file_get_tree_lock_iov(&tree, file, iov_copy,
+ nrsegs_copy, *ppos, end, READ);
if (tree_locked < 0)
GOTO(out, retval = tree_locked);
/* BUG: 5972 */
file_accessed(file);
- retval = generic_file_read(file, buf, chunk, ppos);
+#ifdef HAVE_FILE_READV
+ retval = generic_file_readv(file, iov_copy, nrsegs_copy, ppos);
+#else
+ retval = generic_file_aio_read(iocb, iov_copy, nrsegs_copy,
+ *ppos);
+#endif
ll_tree_unlock(&tree);
} else {
- retval = ll_file_lockless_io(file, buf, chunk, ppos, READ);
+ retval = ll_file_lockless_io(file, iov_copy, nrsegs_copy, ppos,
+ READ, chunk);
}
ll_rw_stats_tally(sbi, current->pid, file, count, 0);
if (retval > 0) {
- buf += retval;
count -= retval;
sum += retval;
if (retval == chunk && count > 0)
if (ra != 0)
ll_ra_read_ex(file, &bead);
retval = (sum > 0) ? sum : retval;
+
+ if (iov_copy && iov_copy != iov)
+ OBD_FREE(iov_copy, sizeof(iov) * nrsegs_orig);
+
RETURN(retval);
}
+/* read(2) entry point: wrap the flat user buffer in a single-segment iovec
+ * and hand it to the vectored implementation (readv on kernels that have
+ * file->f_op->readv, otherwise the AIO path via a synchronous kiocb). */
+static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
+                            loff_t *ppos)
+{
+        struct iovec local_iov = { .iov_base = (void __user *)buf,
+                                   .iov_len = count };
+#ifdef HAVE_FILE_READV
+        return ll_file_readv(file, &local_iov, 1, ppos);
+#else
+        struct kiocb kiocb;
+        ssize_t ret;
+
+        init_sync_kiocb(&kiocb, file);
+        kiocb.ki_pos = *ppos;
+        kiocb.ki_left = count;
+
+        ret = ll_file_aio_read(&kiocb, &local_iov, 1, kiocb.ki_pos);
+        /* ll_file_aio_read() advances kiocb.ki_pos; reflect it to *ppos */
+        *ppos = kiocb.ki_pos;
+        return ret;
+#endif
+}
+
/*
* Write to a file (through the page cache).
*/
-static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
- loff_t *ppos)
+#ifdef HAVE_FILE_WRITEV
+static ssize_t ll_file_writev(struct file *file, const struct iovec *iov,
+ unsigned long nr_segs, loff_t *ppos)
{
+#else /* AIO stuff */
+static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
+ unsigned long nr_segs, loff_t pos)
+{
+ struct file *file = iocb->ki_filp;
+ loff_t *ppos = &iocb->ki_pos;
+#endif
struct inode *inode = file->f_dentry->d_inode;
struct ll_sb_info *sbi = ll_i2sbi(inode);
struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
loff_t lock_start, lock_end, end;
ssize_t retval, chunk, sum = 0;
int tree_locked;
+ struct iovec *iov_copy = NULL;
+ unsigned long nrsegs_copy, nrsegs_orig = 0;
+ size_t count, iov_offset = 0;
ENTRY;
+ count = ll_file_get_iov_count(iov, &nr_segs);
+
CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
inode->i_ino, inode->i_generation, inode, count, *ppos);
if (file->f_flags & O_APPEND) {
lock_start = 0;
lock_end = OBD_OBJECT_EOF;
+ iov_copy = (struct iovec *)iov;
+ nrsegs_copy = nr_segs;
} else if (sbi->ll_max_rw_chunk != 0) {
/* first, let's know the end of the current stripe */
end = *ppos;
end = *ppos + sbi->ll_max_rw_chunk - 1;
lock_start = *ppos;
lock_end = end;
+ chunk = end - *ppos + 1;
+ if ((count == chunk) && (iov_offset == 0)) {
+ if (iov_copy)
+ OBD_FREE(iov_copy, sizeof(iov) * nrsegs_orig);
+
+ iov_copy = (struct iovec *)iov;
+ nrsegs_copy = nr_segs;
+ } else {
+ if (!iov_copy) {
+ nrsegs_orig = nr_segs;
+ OBD_ALLOC(iov_copy, sizeof(iov) * nr_segs);
+ if (!iov_copy)
+ GOTO(out, retval = -ENOMEM);
+ }
+
+ iov_copy_update(&nr_segs, &iov, &nrsegs_copy, iov_copy,
+ &iov_offset, chunk);
+ }
} else {
lock_start = *ppos;
- lock_end = *ppos + count - 1;
+ lock_end = end;
+ iov_copy = (struct iovec *)iov;
+ nrsegs_copy = nr_segs;
}
- tree_locked = ll_file_get_tree_lock(&tree, file, buf, count,
- lock_start, lock_end, WRITE);
+ tree_locked = ll_file_get_tree_lock_iov(&tree, file, iov_copy,
+ nrsegs_copy, lock_start,
+ lock_end, WRITE);
if (tree_locked < 0)
GOTO(out, retval = tree_locked);
CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
inode->i_ino, chunk, *ppos);
if (tree_locked)
- retval = generic_file_write(file, buf, chunk, ppos);
+#ifdef HAVE_FILE_WRITEV
+ retval = generic_file_writev(file, iov_copy, nrsegs_copy, ppos);
+#else
+ retval = generic_file_aio_write(iocb, iov_copy, nrsegs_copy,
+ *ppos);
+#endif
else
- retval = ll_file_lockless_io(file, (char*)buf, chunk,
- ppos, WRITE);
+ retval = ll_file_lockless_io(file, iov_copy, nrsegs_copy,
+ ppos, WRITE, chunk);
ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1);
out_unlock:
out:
if (retval > 0) {
- buf += retval;
count -= retval;
sum += retval;
if (retval == chunk && count > 0)
up(&ll_i2info(inode)->lli_write_sem);
+ if (iov_copy && iov_copy != iov)
+ OBD_FREE(iov_copy, sizeof(iov) * nrsegs_orig);
+
retval = (sum > 0) ? sum : retval;
ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
retval > 0 ? retval : 0);
RETURN(retval);
}
+/* write(2) entry point: wrap the flat user buffer in a single-segment iovec
+ * and hand it to the vectored implementation (writev on kernels that have
+ * file->f_op->writev, otherwise the AIO path via a synchronous kiocb). */
+static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
+                             loff_t *ppos)
+{
+        struct iovec local_iov = { .iov_base = (void __user *)buf,
+                                   .iov_len = count };
+
+#ifdef HAVE_FILE_WRITEV
+        return ll_file_writev(file, &local_iov, 1, ppos);
+#else
+        struct kiocb kiocb;
+        ssize_t ret;
+
+        init_sync_kiocb(&kiocb, file);
+        kiocb.ki_pos = *ppos;
+        kiocb.ki_left = count;
+
+        ret = ll_file_aio_write(&kiocb, &local_iov, 1, kiocb.ki_pos);
+        /* ll_file_aio_write() advances kiocb.ki_pos; reflect it to *ppos */
+        *ppos = kiocb.ki_pos;
+
+        return ret;
+#endif
+}
+
/*
* Send file content (through pagecache) somewhere with helper
*/
/* -o localflock - only provides locally consistent flock locks */
struct file_operations ll_file_operations = {
.read = ll_file_read,
+#ifdef HAVE_FILE_READV
+ .readv = ll_file_readv,
+#else
+ .aio_read = ll_file_aio_read,
+#endif
.write = ll_file_write,
+#ifdef HAVE_FILE_WRITEV
+ .writev = ll_file_writev,
+#else
+ .aio_write = ll_file_aio_write,
+#endif
.ioctl = ll_file_ioctl,
.open = ll_file_open,
.release = ll_file_release,
struct file_operations ll_file_operations_flock = {
.read = ll_file_read,
+#ifdef HAVE_FILE_READV
+ .readv = ll_file_readv,
+#else
+ .aio_read = ll_file_aio_read,
+#endif
.write = ll_file_write,
+#ifdef HAVE_FILE_WRITEV
+ .writev = ll_file_writev,
+#else
+ .aio_write = ll_file_aio_write,
+#endif
.ioctl = ll_file_ioctl,
.open = ll_file_open,
.release = ll_file_release,
/* These are for -o noflock - to return ENOSYS on flock calls */
struct file_operations ll_file_operations_noflock = {
.read = ll_file_read,
+#ifdef HAVE_FILE_READV
+ .readv = ll_file_readv,
+#else
+ .aio_read = ll_file_aio_read,
+#endif
.write = ll_file_write,
+#ifdef HAVE_FILE_WRITEV
+ .writev = ll_file_writev,
+#else
+ .aio_write = ll_file_aio_write,
+#endif
.ioctl = ll_file_ioctl,
.open = ll_file_open,
.release = ll_file_release,
}
 static ssize_t ll_file_copy_pages(struct page **pages, int numpages,
-                                  char *buf, loff_t pos, size_t count,
+                                  const struct iovec *iov, unsigned long nsegs,
+                                  ssize_t iov_offset, loff_t pos, size_t count,
                                   int rw)
 {
         ssize_t amount = 0;
         ENTRY;
+        /* Copies up to `count` bytes between the page array and the user
+         * iovec, starting iov_offset bytes into its first segment; `amount`
+         * accumulates the bytes actually transferred. */
         for (i = 0; i < numpages; i++) {
-                unsigned offset, bytes, left;
+                unsigned offset, bytes, left = 0;
                 char *vaddr;
                 vaddr = kmap(pages[i]);
                 offset = pos & (CFS_PAGE_SIZE - 1);
                 bytes = min_t(unsigned, CFS_PAGE_SIZE - offset, count);
                 LL_CDEBUG_PAGE(D_PAGE, pages[i], "op = %s, addr = %p, "
-                               "buf = %p, bytes = %u\n",
+                               "bytes = %u\n",
                                (rw == WRITE) ? "CFU" : "CTU",
-                               vaddr + offset, buf, bytes);
-                if (rw == WRITE) {
-                        left = copy_from_user(vaddr + offset, buf, bytes);
-                        if (updatechecksum) {
-                                struct ll_async_page *llap;
-
-                                llap = llap_cast_private(pages[i]);
-                                llap->llap_checksum =
-                                        init_checksum(OSC_DEFAULT_CKSUM);
-                                llap->llap_checksum =
-                                        compute_checksum(llap->llap_checksum,
-                                                         vaddr, CFS_PAGE_SIZE,
-                                                         OSC_DEFAULT_CKSUM);
+                               vaddr + offset, bytes);
+                /* copy piecewise across iovec segments until this page's
+                 * bytes are done, a fault occurs (left != 0), or the iovec
+                 * is exhausted */
+                while (bytes > 0 && !left && nsegs) {
+                        unsigned copy = min_t(ssize_t, bytes,
+                                              iov->iov_len - iov_offset);
+                        if (rw == WRITE) {
+                                left = copy_from_user(vaddr + offset,
+                                                      iov->iov_base +iov_offset,
+                                                      copy);
+                                if (updatechecksum) {
+                                        struct ll_async_page *llap;
+
+                                        llap = llap_cast_private(pages[i]);
+                                        llap->llap_checksum =
+                                                init_checksum(OSC_DEFAULT_CKSUM);
+                                        llap->llap_checksum =
+                                                compute_checksum(llap->llap_checksum,
+                                                                 vaddr,CFS_PAGE_SIZE,
+                                                                 OSC_DEFAULT_CKSUM);
+                                        }
+                        } else {
+                                left = copy_to_user(iov->iov_base + iov_offset,
+                                                    vaddr + offset, copy);
+                        }
+
+                        /* `left` bytes may not have landed; a partial copy is
+                         * corrected by the `amount -= left` below after the
+                         * loop exits */
+                        amount += copy;
+                        count -= copy;
+                        pos += copy;
+                        iov_offset += copy;
+                        bytes -= copy;
+                        /* current segment exhausted: advance to the next */
+                        if (iov_offset == iov->iov_len) {
+                                iov_offset = 0;
+                                iov++;
+                                nsegs--;
                         }
-                } else {
-                        left = copy_to_user(buf, vaddr + offset, bytes);
                 }
                 kunmap(pages[i]);
-                amount += bytes;
                 if (left) {
                         amount -= left;
                         break;
                 }
-                buf += bytes;
-                count -= bytes;
-                pos += bytes;
         }
         if (amount == 0)
                 RETURN(-EFAULT);
         RETURN(rc);
 }
-ssize_t ll_file_lockless_io(struct file *file, char *buf, size_t count,
-                            loff_t *ppos, int rw)
+/* Advance through passed iov, adjust iov pointer as necessary and return
+ * starting offset in individual entry we are pointing at. Also reduce
+ * nr_segs as needed */
+static ssize_t ll_iov_advance(const struct iovec **iov, unsigned long *nr_segs,
+                              ssize_t offset)
+{
+        while (*nr_segs > 0) {
+                if ((*iov)->iov_len > offset)
+                        /* the position lands inside this segment.  Callers
+                         * use the return value directly as the iov_base /
+                         * iov_len offset, so return the intra-segment offset
+                         * itself — returning iov_len - offset (the remaining
+                         * byte count) would make every subsequent copy and
+                         * clear_user address the wrong part of the buffer */
+                        return offset;
+                offset -= (*iov)->iov_len;
+                (*iov)++;
+                (*nr_segs)--;
+        }
+        return 0;
+}
+
+ssize_t ll_file_lockless_io(struct file *file, const struct iovec *iov,
+                            unsigned long nr_segs,
+                            loff_t *ppos, int rw, ssize_t count)
 {
         loff_t pos;
         struct inode *inode = file->f_dentry->d_inode;
         int max_pages;
         size_t amount = 0;
         unsigned long first, last;
+        /* cursor into the caller's iovec: current segment, segments left,
+         * and byte offset inside the current segment */
+        const struct iovec *iv = &iov[0];
+        unsigned long nsegs = nr_segs;
+        unsigned long offset = 0;
         ENTRY;
         if (rw == READ) {
                 if (rc)
                         GOTO(out, rc);
         }
+
         pos = *ppos;
         first = pos >> CFS_PAGE_SHIFT;
         last = (pos + count - 1) >> CFS_PAGE_SHIFT;
                         break;
                 }
                 if (rw == WRITE) {
-                        rc = ll_file_copy_pages(pages, pages_for_io, buf,
-                                                pos + amount, bytes, rw);
+                        rc = ll_file_copy_pages(pages, pages_for_io, iv, nsegs,
+                                                offset, pos + amount, bytes,
+                                                rw);
                         if (rc < 0)
                                 GOTO(put_pages, rc);
+                        /* step the iovec cursor past the rc bytes consumed */
+                        offset = ll_iov_advance(&iv, &nsegs, offset + rc);
                         bytes = rc;
                 }
                 rc = ll_file_oig_pages(inode, pages, pages_for_io,
                 if (rc)
                         GOTO(put_pages, rc);
                 if (rw == READ) {
-                        rc = ll_file_copy_pages(pages, pages_for_io, buf,
-                                                pos + amount, bytes, rw);
+                        rc = ll_file_copy_pages(pages, pages_for_io, iv, nsegs,
+                                                offset, pos + amount, bytes, rw);
                         if (rc < 0)
                                 GOTO(put_pages, rc);
+                        /* step the iovec cursor past the rc bytes consumed */
+                        offset = ll_iov_advance(&iv, &nsegs, offset + rc);
                         bytes = rc;
                 }
                 amount += bytes;
-                buf += bytes;
 put_pages:
                 ll_file_put_pages(pages, pages_for_io);
                 first += pages_for_io;
                 /* a short read/write check */
                 if (pos + amount < ((loff_t)first << CFS_PAGE_SHIFT))
                         break;
+                /* Check if we are out of userspace buffers. (how that could
+                   happen?) */
+                if (nsegs == 0)
+                        break;
         }
         /* NOTE: don't update i_size and KMS in absence of LDLM locks even
          * write makes the file large */
         file_accessed(file);
         if (rw == READ && amount < count && rc == 0) {
                 unsigned long not_cleared;
-
-                not_cleared = clear_user(buf, count - amount);
-                amount = count - not_cleared;
-                if (not_cleared)
-                        rc = -EFAULT;
+
+                /* short read: zero the unread remainder, segment by segment,
+                 * resuming at the current cursor (iv, offset) */
+                while (nsegs > 0) {
+                        ssize_t to_clear = min_t(ssize_t, count - amount,
+                                                 iv->iov_len - offset);
+                        not_cleared = clear_user(iv->iov_base + offset,
+                                                 to_clear);
+                        amount += to_clear - not_cleared;
+                        if (not_cleared) {
+                                rc = -EFAULT;
+                                break;
+                        }
+                        offset = 0;
+                        iv++;
+                        nsegs--;
+                }
         }
         if (amount > 0) {
                 lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,