+ sbi = ll_i2sbi(inode);
+ LASSERT(ll_async_page_slab);
+ LASSERTF(origin < LLAP__ORIGIN_MAX, "%u\n", origin);
+
+ llap = llap_cast_private(page);
+ if (llap != NULL) {
+ /* move to end of LRU list, except when page is just about to
+ * die */
+ if (origin != LLAP_ORIGIN_REMOVEPAGE) {
+ spin_lock(&sbi->ll_lock);
+ sbi->ll_pglist_gen++;
+ list_del_init(&llap->llap_pglist_item);
+ list_add_tail(&llap->llap_pglist_item, &sbi->ll_pglist);
+ spin_unlock(&sbi->ll_lock);
+ }
+ GOTO(out, llap);
+ }
+
+ exp = ll_i2dtexp(page->mapping->host);
+ if (exp == NULL)
+ RETURN(ERR_PTR(-EINVAL));
+
+ /* limit the number of lustre-cached pages */
+ if (sbi->ll_async_page_count >= sbi->ll_async_page_max)
+ llap_shrink_cache(sbi, 0);
+
+ OBD_SLAB_ALLOC(llap, ll_async_page_slab, CFS_ALLOC_STD,
+ ll_async_page_slab_size);
+ if (llap == NULL)
+ RETURN(ERR_PTR(-ENOMEM));
+ llap->llap_magic = LLAP_MAGIC;
+ llap->llap_cookie = (void *)llap + size_round(sizeof(*llap));
+
+ rc = obd_prep_async_page(exp, ll_i2info(inode)->lli_smd, NULL, page,
+ (obd_off)page->index << CFS_PAGE_SHIFT,
+ &ll_async_page_ops, llap, &llap->llap_cookie);
+ if (rc) {
+ OBD_SLAB_FREE(llap, ll_async_page_slab,
+ ll_async_page_slab_size);
+ RETURN(ERR_PTR(rc));
+ }
+
+ CDEBUG(D_CACHE, "llap %p page %p cookie %p obj off "LPU64"\n", llap,
+ page, llap->llap_cookie, (obd_off)page->index << CFS_PAGE_SHIFT);
+ /* also zeroing the PRIVBITS low order bitflags */
+ __set_page_ll_data(page, llap);
+ llap->llap_page = page;
+ spin_lock(&sbi->ll_lock);
+ sbi->ll_pglist_gen++;
+ sbi->ll_async_page_count++;
+ list_add_tail(&llap->llap_pglist_item, &sbi->ll_pglist);
+ INIT_LIST_HEAD(&llap->llap_pending_write);
+ spin_unlock(&sbi->ll_lock);
+
+ out:
+ if (unlikely(sbi->ll_flags & LL_SBI_CHECKSUM)) {
+ __u32 csum = 0;
+ csum = crc32_le(csum, kmap(page), CFS_PAGE_SIZE);
+ kunmap(page);
+ if (origin == LLAP_ORIGIN_READAHEAD ||
+ origin == LLAP_ORIGIN_READPAGE) {
+ llap->llap_checksum = 0;
+ } else if (origin == LLAP_ORIGIN_COMMIT_WRITE ||
+ llap->llap_checksum == 0) {
+ llap->llap_checksum = csum;
+ CDEBUG(D_PAGE, "page %p cksum %x\n", page, csum);
+ } else if (llap->llap_checksum == csum) {
+ /* origin == LLAP_ORIGIN_WRITEPAGE */
+ CDEBUG(D_PAGE, "page %p cksum %x confirmed\n",
+ page, csum);
+ } else {
+ /* origin == LLAP_ORIGIN_WRITEPAGE */
+ LL_CDEBUG_PAGE(D_ERROR, page, "old cksum %x != new "
+ "%x!\n", llap->llap_checksum, csum);
+ }
+ }
+
+ llap->llap_origin = origin;
+ RETURN(llap);
+}
+
+static int queue_or_sync_write(struct obd_export *exp, struct inode *inode,
+ struct ll_async_page *llap,
+ unsigned to, obd_flag async_flags)
+{
+ unsigned long size_index = i_size_read(inode) >> CFS_PAGE_SHIFT;
+ struct obd_io_group *oig;
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+ int rc, noquot = llap->llap_ignore_quota ? OBD_BRW_NOQUOTA : 0;
+ ENTRY;
+
+ /* _make_ready only sees llap once we've unlocked the page */
+ llap->llap_write_queued = 1;
+ rc = obd_queue_async_io(exp, ll_i2info(inode)->lli_smd, NULL,
+ llap->llap_cookie, OBD_BRW_WRITE | noquot,
+ 0, 0, 0, async_flags);
+ if (rc == 0) {
+ LL_CDEBUG_PAGE(D_PAGE, llap->llap_page, "write queued\n");
+ GOTO(out, 0);
+ }
+
+ llap->llap_write_queued = 0;
+ /* Do not pass llap here as it is sync write. */
+ llap_write_pending(inode, NULL);
+
+ rc = oig_init(&oig);
+ if (rc)
+ GOTO(out, rc);
+
+ /* make full-page requests if we are not at EOF (bug 4410) */
+ if (to != CFS_PAGE_SIZE && llap->llap_page->index < size_index) {
+ LL_CDEBUG_PAGE(D_PAGE, llap->llap_page,
+ "sync write before EOF: size_index %lu, to %d\n",
+ size_index, to);
+ to = CFS_PAGE_SIZE;
+ } else if (to != CFS_PAGE_SIZE && llap->llap_page->index == size_index) {
+ int size_to = i_size_read(inode) & ~CFS_PAGE_MASK;
+ LL_CDEBUG_PAGE(D_PAGE, llap->llap_page,
+ "sync write at EOF: size_index %lu, to %d/%d\n",
+ size_index, to, size_to);
+ if (to < size_to)
+ to = size_to;
+ }
+
+ /* compare the checksum once before the page leaves llite */
+ if (unlikely((sbi->ll_flags & LL_SBI_CHECKSUM) &&
+ llap->llap_checksum != 0)) {
+ __u32 csum = 0;
+ struct page *page = llap->llap_page;
+ csum = crc32_le(csum, kmap(page), CFS_PAGE_SIZE);
+ kunmap(page);
+ if (llap->llap_checksum == csum) {
+ CDEBUG(D_PAGE, "page %p cksum %x confirmed\n",
+ page, csum);
+ } else {
+ CERROR("page %p old cksum %x != new cksum %x!\n",
+ page, llap->llap_checksum, csum);
+ }
+ }
+
+ rc = obd_queue_group_io(exp, ll_i2info(inode)->lli_smd, NULL, oig,
+ llap->llap_cookie, OBD_BRW_WRITE | noquot,
+ 0, to, 0, ASYNC_READY | ASYNC_URGENT |
+ ASYNC_COUNT_STABLE | ASYNC_GROUP_SYNC);
+ if (rc)
+ GOTO(free_oig, rc);
+
+ rc = obd_trigger_group_io(exp, ll_i2info(inode)->lli_smd, NULL, oig);
+ if (rc)
+ GOTO(free_oig, rc);
+
+ rc = oig_wait(oig);
+
+ if (!rc && async_flags & ASYNC_READY) {
+ unlock_page(llap->llap_page);
+ if (PageWriteback(llap->llap_page)) {
+ end_page_writeback(llap->llap_page);
+ }
+ }
+
+ if (rc == 0 && llap_write_complete(inode, llap))
+ ll_queue_done_writing(inode, 0);
+
+ LL_CDEBUG_PAGE(D_PAGE, llap->llap_page, "sync write returned %d\n", rc);
+
+free_oig:
+ oig_release(oig);
+out:
+ RETURN(rc);