+
+/*
+ * Release every page held in a lockless-I/O page array, then free the
+ * array itself.
+ *
+ * pages    - array of page pointers; NULL slots are skipped, which lets the
+ *            error path of ll_file_prepare_pages() pass a partly filled array
+ * numpages - number of slots in the array (also used to size the OBD_FREE)
+ *
+ * Each page has its llap dropped and its mapping cleared before it is handed
+ * back with __free_pages().  Unexpected state (llap still attached, page
+ * count other than 1) is only reported via CERROR; the page is freed anyway.
+ */
+static void ll_file_put_pages(struct page **pages, int numpages)
+{
+        int idx;
+        ENTRY;
+
+        for (idx = 0; idx < numpages; idx++) {
+                struct page *page = pages[idx];
+
+                if (page == NULL)
+                        continue;
+
+                LL_CDEBUG_PAGE(D_PAGE, page, "free\n");
+                __ll_put_llap(page);
+                if (page_private(page))
+                        CERROR("the llap wasn't freed\n");
+                /* detach from the file's mapping before freeing */
+                page->mapping = NULL;
+                if (page_count(page) != 1)
+                        CERROR("page %p, flags %#lx, count %i, private %p\n",
+                               page, (unsigned long)page->flags,
+                               page_count(page),
+                               (void *)page_private(page));
+                __free_pages(page, 0);
+        }
+        OBD_FREE(pages, numpages * sizeof(struct page *));
+        EXIT;
+}
+
+/*
+ * Allocate an array of private pages for lockless I/O and attach an llap to
+ * each of them.
+ *
+ * numpages - number of pages to allocate
+ * inode    - inode whose i_mapping is recorded in every page
+ * first    - file page index given to the first page; following pages get
+ *            consecutive indices
+ *
+ * Returns the page array on success, or ERR_PTR(-ENOMEM) / ERR_PTR(error
+ * from llap_from_page()) on failure.  On failure everything allocated so far
+ * is released through ll_file_put_pages().
+ */
+static struct page **ll_file_prepare_pages(int numpages, struct inode *inode,
+                                           unsigned long first)
+{
+        struct page **pages;
+        int rc = 0;
+        int n;
+        ENTRY;
+
+        OBD_ALLOC(pages, sizeof(struct page *) * numpages);
+        if (pages == NULL)
+                RETURN(ERR_PTR(-ENOMEM));
+
+        for (n = 0; n < numpages; n++) {
+                struct ll_async_page *llap;
+                struct page *page = alloc_pages(GFP_HIGHUSER, 0);
+
+                if (page == NULL)
+                        GOTO(err, rc = -ENOMEM);
+                pages[n] = page;
+
+                /* llap_from_page needs page index and mapping to be set */
+                page->index = first + n;
+                page->mapping = inode->i_mapping;
+
+                llap = llap_from_page(page, LLAP_ORIGIN_LOCKLESS_IO);
+                if (IS_ERR(llap))
+                        GOTO(err, rc = PTR_ERR(llap));
+                llap->llap_lockless_io_page = 1;
+        }
+        RETURN(pages);
+
+err:
+        /* NOTE(review): relies on the not-yet-filled slots being NULL,
+         * i.e. presumably OBD_ALLOC zero-fills the array - confirm */
+        ll_file_put_pages(pages, numpages);
+        RETURN(ERR_PTR(rc));
+}
+
+/*
+ * Copy data between a user buffer and an array of kernel pages.
+ *
+ * pages    - pages covering the file range; pages[0] must be valid, as its
+ *            mapping is used to look up the superblock checksum flag
+ * numpages - number of entries in pages
+ * buf      - user-space buffer
+ * pos      - file offset matching the start of buf; only its in-page part
+ *            is used to place data inside each page
+ * count    - maximum number of bytes to copy
+ * rw       - WRITE copies from user space into the pages, anything else
+ *            copies from the pages out to user space
+ *
+ * Copying stops at the first page for which the user copy is incomplete.
+ * Returns the number of bytes actually copied, or -EFAULT if that is zero.
+ */
+static ssize_t ll_file_copy_pages(struct page **pages, int numpages,
+                                  char *buf, loff_t pos, size_t count, int rw)
+{
+        int updatechecksum = ll_i2sbi(pages[0]->mapping->host)->ll_flags &
+                             LL_SBI_CHECKSUM;
+        ssize_t copied = 0;
+        int idx;
+        ENTRY;
+
+        for (idx = 0; idx < numpages; idx++) {
+                struct page *page = pages[idx];
+                char *kaddr = kmap(page);
+                unsigned in_page = pos & (CFS_PAGE_SIZE - 1);
+                unsigned chunk = min_t(unsigned, CFS_PAGE_SIZE - in_page,
+                                       count);
+                unsigned uncopied;
+
+                LL_CDEBUG_PAGE(D_PAGE, page, "op = %s, addr = %p, "
+                               "buf = %p, bytes = %u\n",
+                               (rw == WRITE) ? "CFU" : "CTU",
+                               kaddr + in_page, buf, chunk);
+
+                if (rw != WRITE) {
+                        uncopied = copy_to_user(buf, kaddr + in_page, chunk);
+                } else {
+                        uncopied = copy_from_user(kaddr + in_page, buf, chunk);
+                        if (updatechecksum) {
+                                struct ll_async_page *llap;
+
+                                /* checksum covers the whole page, not just
+                                 * the bytes written in this call */
+                                llap = llap_cast_private(page);
+                                llap->llap_checksum = crc32_le(0, kaddr,
+                                                               CFS_PAGE_SIZE);
+                        }
+                }
+                kunmap(page);
+
+                copied += chunk;
+                if (uncopied) {
+                        /* partial copy: discount the tail and give up */
+                        copied -= uncopied;
+                        break;
+                }
+
+                pos += chunk;
+                count -= chunk;
+                buf += chunk;
+        }
+
+        if (copied != 0)
+                RETURN(copied);
+        RETURN(-EFAULT);
+}
+
+/*
+ * Submit a set of prepared pages as one synchronous group I/O and wait for
+ * it to finish.
+ *
+ * inode    - inode the I/O is against
+ * pages    - pages carrying an llap (see ll_file_prepare_pages())
+ * numpages - number of pages to queue
+ * pos      - file offset of the first byte; must fall in pages[0]
+ * count    - total number of bytes to transfer starting at pos
+ * rw       - WRITE selects OBD_BRW_WRITE, anything else OBD_BRW_READ
+ *
+ * Every page is locked before it is queued and unlocked again (in reverse
+ * order) before returning, whether or not the I/O succeeded.
+ *
+ * Returns 0 on success, -EINVAL if the inode has no data export, or the
+ * error from oig_init() / obd_queue_group_io() / obd_trigger_group_io() /
+ * oig_wait().
+ */
+static int ll_file_oig_pages(struct inode * inode, struct page **pages,
+                             int numpages, loff_t pos, size_t count, int rw)
+{
+        struct ll_inode_info *lli = ll_i2info(inode);
+        struct obd_io_group *oig;
+        struct obd_export *exp;
+        loff_t org_pos = pos;
+        obd_flag brw_flags;
+        int locked = 0;         /* number of pages currently locked */
+        int rc;
+        int i;
+        ENTRY;
+
+        exp = ll_i2dtexp(inode);
+        if (exp == NULL)
+                RETURN(-EINVAL);
+
+        rc = oig_init(&oig);
+        if (rc)
+                RETURN(rc);
+
+        brw_flags = OBD_BRW_SRVLOCK;
+        if (capable(CAP_SYS_RESOURCE))
+                brw_flags |= OBD_BRW_NOQUOTA;
+
+        for (i = 0; i < numpages; i++) {
+                struct ll_async_page *llap = llap_cast_private(pages[i]);
+                unsigned from = pos & (CFS_PAGE_SIZE - 1);
+                /* bytes left in this page, capped by bytes left overall */
+                unsigned bytes = min_t(unsigned, CFS_PAGE_SIZE - from,
+                                       count - pos + org_pos);
+
+                LASSERT(llap);
+
+                lock_page(pages[i]);
+                locked++;
+
+                LL_CDEBUG_PAGE(D_PAGE, pages[i], "offset "LPU64","
+                               " from %u, bytes = %u\n",
+                               pos, from, bytes);
+                LASSERTF(pos >> CFS_PAGE_SHIFT == pages[i]->index,
+                         "wrong page index %lu (%lu)\n",
+                         pages[i]->index,
+                         (unsigned long)(pos >> CFS_PAGE_SHIFT));
+
+                rc = obd_queue_group_io(exp, lli->lli_smd, NULL, oig,
+                                        llap->llap_cookie,
+                                        (rw == WRITE) ?
+                                        OBD_BRW_WRITE:OBD_BRW_READ,
+                                        from, bytes, brw_flags,
+                                        ASYNC_READY | ASYNC_URGENT |
+                                        ASYNC_COUNT_STABLE | ASYNC_GROUP_SYNC);
+                if (rc)
+                        GOTO(out, rc);
+                pos += bytes;
+        }
+
+        rc = obd_trigger_group_io(exp, lli->lli_smd, NULL, oig);
+        if (rc)
+                GOTO(out, rc);
+        rc = oig_wait(oig);
+out:
+        /* drop the page locks, last locked first */
+        while (locked > 0)
+                unlock_page(pages[--locked]);
+        oig_release(oig);
+        RETURN(rc);
+}
+
+/*
+ * Perform a read or write that bypasses the page cache: data is staged in
+ * privately allocated pages and transferred with synchronous group I/O
+ * flagged OBD_BRW_SRVLOCK.
+ *
+ * file  - file being accessed
+ * buf   - user-space buffer
+ * count - number of bytes requested
+ * ppos  - in/out file position; advanced by the number of bytes transferred
+ * rw    - READ or WRITE
+ *
+ * The request is processed in chunks of at most
+ * PTLRPC_MAX_BRW_PAGES * stripe_count pages.  Processing stops at the first
+ * error or short transfer.
+ *
+ * Returns the number of bytes transferred if that is non-zero; otherwise 0
+ * (read at/after the cached i_size, or zero-length request) or a negative
+ * error code.
+ */
+ssize_t ll_file_lockless_io(struct file *file, char *buf, size_t count,
+                            loff_t *ppos, int rw)
+{
+        loff_t pos;
+        struct inode *inode = file->f_dentry->d_inode;
+        ssize_t rc = 0;
+        int max_pages;
+        size_t amount = 0;
+        unsigned long first, last;
+        ENTRY;
+
+        if (rw == READ) {
+                loff_t isize;
+
+                ll_inode_size_lock(inode, 0);
+                isize = i_size_read(inode);
+                ll_inode_size_unlock(inode, 0);
+                if (*ppos >= isize)
+                        GOTO(out, rc = 0);
+                /* clamp the request to the known file size */
+                if (*ppos + count >= isize)
+                        count -= *ppos + count - isize;
+                if (count == 0)
+                        GOTO(out, rc);
+        } else {
+                rc = generic_write_checks(file, ppos, &count, 0);
+                if (rc)
+                        GOTO(out, rc);
+                /* A zero-length write must be a no-op.  Without this check
+                 * "pos + count - 1" below either wraps (pos == 0), giving a
+                 * zero-page allocation, or selects one page into which
+                 * nothing is copied, making the write fail with -EFAULT. */
+                if (count == 0)
+                        GOTO(out, rc = 0);
+                rc = remove_suid(file->f_dentry);
+                if (rc)
+                        GOTO(out, rc);
+        }
+        pos = *ppos;
+        first = pos >> CFS_PAGE_SHIFT;
+        last = (pos + count - 1) >> CFS_PAGE_SHIFT;
+        max_pages = PTLRPC_MAX_BRW_PAGES *
+                ll_i2info(inode)->lli_smd->lsm_stripe_count;
+        CDEBUG(D_INFO, "%u, stripe_count = %u\n",
+               PTLRPC_MAX_BRW_PAGES /* max_pages_per_rpc */,
+               ll_i2info(inode)->lli_smd->lsm_stripe_count);
+
+        while (first <= last && rc >= 0) {
+                int pages_for_io;       /* pages allocated for this chunk */
+                int pages_to_queue;     /* pages actually submitted */
+                struct page **pages;
+                size_t bytes = count - amount;
+
+                pages_for_io = min_t(int, last - first + 1, max_pages);
+                pages = ll_file_prepare_pages(pages_for_io, inode, first);
+                if (IS_ERR(pages)) {
+                        rc = PTR_ERR(pages);
+                        break;
+                }
+                pages_to_queue = pages_for_io;
+                if (rw == WRITE) {
+                        rc = ll_file_copy_pages(pages, pages_for_io, buf,
+                                                pos + amount, bytes, rw);
+                        if (rc < 0)
+                                GOTO(put_pages, rc);
+                        bytes = rc;
+                        /* The copy from user space may have stopped early.
+                         * Queue only the pages that received data: passing
+                         * the trailing empty pages to ll_file_oig_pages()
+                         * would queue zero-length I/O and trip its page
+                         * index assertion.  bytes > 0 here, and for a full
+                         * copy this evaluates to pages_for_io. */
+                        pages_to_queue = (int)((unsigned long)
+                                ((pos + amount + bytes - 1) >>
+                                 CFS_PAGE_SHIFT) - first) + 1;
+                }
+                rc = ll_file_oig_pages(inode, pages, pages_to_queue,
+                                       pos + amount, bytes, rw);
+                if (rc)
+                        GOTO(put_pages, rc);
+                if (rw == READ) {
+                        rc = ll_file_copy_pages(pages, pages_for_io, buf,
+                                                pos + amount, bytes, rw);
+                        if (rc < 0)
+                                GOTO(put_pages, rc);
+                        bytes = rc;
+                }
+                amount += bytes;
+                buf += bytes;
+put_pages:
+                /* always release every allocated page, queued or not */
+                ll_file_put_pages(pages, pages_for_io);
+                first += pages_for_io;
+                /* a short read/write check */
+                if (pos + amount < ((loff_t)first << CFS_PAGE_SHIFT))
+                        break;
+        }
+        /* NOTE: don't update i_size and KMS in the absence of LDLM locks,
+         * even if the write makes the file larger */
+        file_accessed(file);
+        if (rw == READ && amount < count && rc == 0) {
+                unsigned long not_cleared;
+
+                /* zero the part of the user buffer that was not filled */
+                not_cleared = clear_user(buf, count - amount);
+                amount = count - not_cleared;
+                if (not_cleared)
+                        rc = -EFAULT;
+        }
+        if (amount > 0) {
+                lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
+                                    (rw == WRITE) ?
+                                    LPROC_LL_LOCKLESS_WRITE :
+                                    LPROC_LL_LOCKLESS_READ,
+                                    (long)amount);
+                *ppos += amount;
+                RETURN(amount);
+        }
+out:
+        RETURN(rc);
+}