ENTRY;
pg.pg = page;
- pg.disk_offset = pg.page_offset = ((obd_off)page->index) << PAGE_SHIFT;
+ pg.off = ((obd_off)page->index) << PAGE_SHIFT;
- if (cmd == OBD_BRW_WRITE &&
- (pg.disk_offset + PAGE_SIZE > inode->i_size))
+ if (cmd == OBD_BRW_WRITE && (pg.off + PAGE_SIZE > inode->i_size))
pg.count = inode->i_size % PAGE_SIZE;
else
pg.count = PAGE_SIZE;
CDEBUG(D_PAGE, "%s %d bytes ino %lu at "LPU64"/"LPX64"\n",
cmd & OBD_BRW_WRITE ? "write" : "read", pg.count, inode->i_ino,
- pg.disk_offset, pg.disk_offset);
+ pg.off, pg.off);
if (pg.count == 0) {
CERROR("ZERO COUNT: ino %lu: size %p:%Lu(%p:%Lu) idx %lu off "
- LPU64"\n", inode->i_ino, inode, inode->i_size,
- page->mapping->host, page->mapping->host->i_size,
- page->index, pg.disk_offset);
+ LPU64"\n",
+ inode->i_ino, inode, inode->i_size, page->mapping->host,
+ page->mapping->host->i_size, page->index, pg.off);
}
pg.flag = flags;
}
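
For reference, the per-page sizing above as a stand-alone, user-space sketch; PAGE_SHIFT, PAGE_SIZE and obd_off here are local stand-ins for the kernel definitions, not the real ones:

/* Sketch of the per-page I/O sizing in ll_brw() above; runnable in
 * user space, all definitions are local stand-ins. */
#include <stdio.h>

#define PAGE_SHIFT 12
#define PAGE_SIZE  (1UL << PAGE_SHIFT)
typedef unsigned long long obd_off;

/* bytes to transfer for the page at 'index' of an 'i_size'-byte file */
static unsigned long brw_page_count(unsigned long index,
                                    unsigned long long i_size, int write)
{
        obd_off off = (obd_off)index << PAGE_SHIFT;

        if (write && off + PAGE_SIZE > i_size)
                return (unsigned long)(i_size % PAGE_SIZE);
        return PAGE_SIZE;
}

int main(void)
{
        /* tail page of a 10000-byte file: 10000 % 4096 == 1808 */
        printf("%lu\n", brw_page_count(2, 10000, 1));
        /* a page at a page-aligned EOF computes count 0: the ZERO COUNT
         * case the CERROR above complains about */
        printf("%lu\n", brw_page_count(2, 8192, 1));
        return 0;
}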
oa.o_id = lsm->lsm_object_id;
- oa.o_valid = OBD_MD_FLID;
+ oa.o_gr = lsm->lsm_object_gr;
+ oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
obdo_from_inode(&oa, inode, OBD_MD_FLTYPE|OBD_MD_FLMODE|OBD_MD_FLATIME|
OBD_MD_FLMTIME | OBD_MD_FLCTIME);
int rc = 0;
ENTRY;
- LASSERT(PageLocked(page));
- (void)llap_cast_private(page); /* assertion */
+ if (!PageLocked(page))
+ LBUG();
/* Check to see if we should return -EIO right away */
pga.pg = page;
- pga.disk_offset = pga.page_offset = offset;
+ pga.off = offset;
pga.count = PAGE_SIZE;
pga.flag = 0;
oa.o_id = lsm->lsm_object_id;
+ oa.o_gr = lsm->lsm_object_gr;
oa.o_mode = inode->i_mode;
- oa.o_valid = OBD_MD_FLID | OBD_MD_FLMODE | OBD_MD_FLTYPE;
+ oa.o_valid = OBD_MD_FLID | OBD_MD_FLMODE |
+ OBD_MD_FLTYPE | OBD_MD_FLGROUP;
rc = obd_brw(OBD_BRW_CHECK, ll_i2obdexp(inode), &oa, lsm, 1, &pga,
NULL);
return rc;
}
+int ll_write_count(struct page *page)
+{
+ struct inode *inode = page->mapping->host;
+
+ /* catch race with truncate */
+ if (((loff_t)page->index << PAGE_SHIFT) >= inode->i_size)
+ return 0;
+
+ /* catch sub-page write at end of file */
+ if (((loff_t)page->index << PAGE_SHIFT) + PAGE_SIZE > inode->i_size)
+ return inode->i_size % PAGE_SIZE;
+
+ return PAGE_SIZE;
+}
+
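The three cases ll_write_count() distinguishes, exercised in a stand-alone sketch (struct fake_inode and the constants are local stand-ins for the kernel types):

/* Stand-alone check of the three ll_write_count() cases. */
#include <assert.h>

#define PAGE_SHIFT 12
#define PAGE_SIZE  (1L << PAGE_SHIFT)

struct fake_inode { long long i_size; };

static long write_count(unsigned long index, struct fake_inode *inode)
{
        if (((long long)index << PAGE_SHIFT) >= inode->i_size)
                return 0;                          /* raced with truncate */
        if (((long long)index << PAGE_SHIFT) + PAGE_SIZE > inode->i_size)
                return inode->i_size % PAGE_SIZE;  /* partial tail page */
        return PAGE_SIZE;                          /* full page */
}

int main(void)
{
        struct fake_inode inode = { .i_size = 10000 };

        assert(write_count(0, &inode) == PAGE_SIZE); /* fully inside file */
        assert(write_count(2, &inode) == 1808);      /* 10000 % 4096 */
        assert(write_count(3, &inode) == 0);         /* beyond EOF */
        return 0;
}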
struct ll_async_page *llap_from_cookie(void *cookie)
{
struct ll_async_page *llap = cookie;
page = llap->llap_page;
- LASSERT(cmd != OBD_BRW_READ);
+ if (cmd == OBD_BRW_READ)
+ RETURN(0);
/* we're trying to write, but the page is locked.. come back later */
if (TryLockPage(page))
RETURN(0);
}
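
The make-ready path relies on a non-blocking TryLockPage() and simply defers contended pages to a later pass. The same try-lock-or-defer shape in user-space pthreads, purely as an illustration (nothing here is Lustre API):

/* Illustration only: the try-lock-or-defer pattern from the make-ready
 * path above, in pthreads terms. */
#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t page_lock = PTHREAD_MUTEX_INITIALIZER;

/* returns 0 ("come back later") if someone else holds the lock */
static int make_ready(void)
{
        if (pthread_mutex_trylock(&page_lock) != 0)
                return 0;       /* contended: retry on a later pass */

        /* ... the real code keeps the page locked until I/O completion;
         * this sketch just releases it again ... */
        pthread_mutex_unlock(&page_lock);
        return 1;
}

int main(void)
{
        printf("made ready: %d\n", make_ready());
        return 0;
}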
-/* We have two reasons for giving llite the opportunity to change the
- * write length of a given queued page as it builds the RPC containing
- * the page:
- *
- * 1) Further extending writes may have landed in the page cache
- * since a partial write first queued this page requiring us
- * to write more from the page cache.
- * 2) We might have raced with truncate and want to avoid performing
- * write RPCs that are just going to be thrown away by the
- * truncate's punch on the storage targets.
- *
- * The kms serves these purposes as it is set at both truncate and extending
- * writes.
- */
static int ll_ap_refresh_count(void *data, int cmd)
{
struct ll_async_page *llap;
- struct lov_stripe_md *lsm;
- struct page *page;
- __u64 kms;
ENTRY;
/* readpage queues with _COUNT_STABLE, shouldn't get here. */
if (IS_ERR(llap))
RETURN(PTR_ERR(llap));
- page = llap->llap_page;
- lsm = ll_i2info(page->mapping->host)->lli_smd;
- kms = lov_merge_size(lsm, 1);
-
- /* catch race with truncate */
- if (((__u64)page->index << PAGE_SHIFT) >= kms)
- return 0;
-
- /* catch sub-page write at end of file */
- if (((__u64)page->index << PAGE_SHIFT) + PAGE_SIZE > kms)
- return kms % PAGE_SIZE;
-
- return PAGE_SIZE;
+ return ll_write_count(llap->llap_page);
}
void ll_inode_fill_obdo(struct inode *inode, int cmd, struct obdo *oa)
lsm = ll_i2info(inode)->lli_smd;
oa->o_id = lsm->lsm_object_id;
- oa->o_valid = OBD_MD_FLID;
+ oa->o_gr = lsm->lsm_object_gr;
+ oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
valid_flags = OBD_MD_FLTYPE | OBD_MD_FLATIME;
if (cmd == OBD_BRW_WRITE) {
oa->o_valid |= OBD_MD_FLIFID | OBD_MD_FLEPOCH;
mdc_pack_fid(obdo_fid(oa), inode->i_ino, 0, inode->i_mode);
+ obdo_fid(oa)->mds = ll_i2info(inode)->lli_mds;
oa->o_easize = ll_i2info(inode)->lli_io_epoch;
valid_flags |= OBD_MD_FLMTIME | OBD_MD_FLCTIME;
.ap_completion = ll_ap_completion,
};
-struct ll_async_page *llap_cast_private(struct page *page)
-{
- struct ll_async_page *llap = (struct ll_async_page *)page->private;
-
- LASSERTF(llap == NULL || llap->llap_magic == LLAP_MAGIC,
- "page %p private %lu gave magic %d which != %d\n",
- page, page->private, llap->llap_magic, LLAP_MAGIC);
-
- return llap;
-}
-
/* XXX have the exp be an argument? */
struct ll_async_page *llap_from_page(struct page *page)
{
int rc;
ENTRY;
- llap = llap_cast_private(page);
- if (llap != NULL)
+ llap = (struct ll_async_page *)page->private;
+ if (llap != NULL) {
+ if (llap->llap_magic != LLAP_MAGIC)
+ RETURN(ERR_PTR(-EINVAL));
RETURN(llap);
+ }
exp = ll_i2obdexp(page->mapping->host);
if (exp == NULL)
page->private = (unsigned long)llap;
llap->llap_page = page;
- spin_lock(&sbi->ll_lock);
+ spin_lock(&sbi->ll_pglist_lock);
sbi->ll_pglist_gen++;
list_add_tail(&llap->llap_proc_item, &sbi->ll_pglist);
- spin_unlock(&sbi->ll_lock);
+ spin_unlock(&sbi->ll_pglist_lock);
RETURN(llap);
}
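
llap_from_page() now open-codes the magic check that llap_cast_private() used to assert. A generic user-space sketch of that validate-the-opaque-cookie pattern; all names here are invented for illustration:

/* Sketch of validating an opaque per-object cookie with a magic number,
 * as llap_from_page() does with page->private.  Illustrative only. */
#include <stdio.h>

#define COOKIE_MAGIC 0x98764321u   /* any improbable bit pattern */

struct cookie {
        unsigned int magic;        /* checked before trusting the pointer */
        int data;
};

/* reuse a valid attached cookie, reject anything that looks corrupted */
static struct cookie *cookie_from_private(unsigned long private)
{
        struct cookie *c = (struct cookie *)private;

        if (c == NULL)
                return NULL;       /* nothing attached yet */
        if (c->magic != COOKIE_MAGIC) {
                fprintf(stderr, "corrupt cookie %p\n", (void *)c);
                return NULL;       /* the kernel code returns ERR_PTR */
        }
        return c;
}

int main(void)
{
        struct cookie c = { .magic = COOKIE_MAGIC, .data = 42 };

        printf("%d\n", cookie_from_private((unsigned long)&c)->data);
        return 0;
}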
RETURN(rc);
}
-static unsigned long ll_ra_count_get(struct ll_sb_info *sbi, unsigned long len)
-{
- unsigned long ret;
- ENTRY;
-
- spin_lock(&sbi->ll_lock);
- ret = min(sbi->ll_max_read_ahead_pages - sbi->ll_read_ahead_pages,
- len);
- sbi->ll_read_ahead_pages += ret;
- spin_unlock(&sbi->ll_lock);
-
- RETURN(ret);
-}
-
-static void ll_ra_count_put(struct ll_sb_info *sbi, unsigned long len)
-{
- spin_lock(&sbi->ll_lock);
- LASSERTF(sbi->ll_read_ahead_pages >= len, "r_a_p %lu len %lu\n",
- sbi->ll_read_ahead_pages, len);
- sbi->ll_read_ahead_pages -= len;
- spin_unlock(&sbi->ll_lock);
-}
-
-/* called for each page in a completed rpc.*/
-void ll_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
-{
- struct ll_async_page *llap;
- struct page *page;
- ENTRY;
-
- llap = llap_from_cookie(data);
- if (IS_ERR(llap)) {
- EXIT;
- return;
- }
-
- page = llap->llap_page;
- LASSERT(PageLocked(page));
-
- LL_CDEBUG_PAGE(D_PAGE, page, "completing cmd %d with %d\n", cmd, rc);
-
- if (cmd == OBD_BRW_READ && llap->llap_defer_uptodate)
- ll_ra_count_put(ll_i2sbi(page->mapping->host), 1);
-
- if (rc == 0) {
- if (cmd == OBD_BRW_READ) {
- if (!llap->llap_defer_uptodate)
- SetPageUptodate(page);
- } else {
- llap->llap_write_queued = 0;
- }
- ClearPageError(page);
- } else {
- if (cmd == OBD_BRW_READ)
- llap->llap_defer_uptodate = 0;
- SetPageError(page);
- }
-
- unlock_page(page);
-
- if (0 && cmd == OBD_BRW_WRITE) {
- llap_write_complete(page->mapping->host, llap);
- ll_try_done_writing(page->mapping->host);
- }
-
- page_cache_release(page);
- EXIT;
-}
-
/* the kernel calls us here when a page is unhashed from the page cache.
* the page will be locked and the kernel is holding a spinlock, so
* we need to be careful. we're just tearing down our book-keeping
* is providing exclusivity to memory pressure/truncate/writeback..*/
page->private = 0;
- spin_lock(&sbi->ll_lock);
+ spin_lock(&sbi->ll_pglist_lock);
if (!list_empty(&llap->llap_proc_item))
list_del_init(&llap->llap_proc_item);
sbi->ll_pglist_gen++;
- spin_unlock(&sbi->ll_lock);
+ spin_unlock(&sbi->ll_pglist_lock);
OBD_FREE(llap, sizeof(*llap));
EXIT;
}
int flags, matches;
ENTRY;
- if (fd_flags & LL_FILE_GROUP_LOCKED)
+ if (fd_flags & LL_FILE_CW_LOCKED)
RETURN(1);
page_extent.l_extent.start = (__u64)page->index << PAGE_CACHE_SHIFT;
llap->llap_defer_uptodate = defer;
rc = obd_queue_group_io(exp, ll_i2info(page->mapping->host)->lli_smd,
NULL, oig, llap->llap_cookie, OBD_BRW_READ, 0,
- PAGE_SIZE, 0, ASYNC_COUNT_STABLE | ASYNC_READY
- | ASYNC_URGENT);
+ PAGE_SIZE, 0, ASYNC_COUNT_STABLE);
if (rc) {
LL_CDEBUG_PAGE(D_ERROR, page, "read queue failed: rc %d\n", rc);
page_cache_release(page);
RETURN(rc);
}
-#define RAS_CDEBUG(ras) \
- CDEBUG(D_READA, "lrp %lu c %lu ws %lu wl %lu nra %lu\n", \
- ras->ras_last_readpage, ras->ras_consecutive, \
- ras->ras_window_start, ras->ras_window_len, \
- ras->ras_next_readahead);
+#define LL_RA_MIN(inode) ((unsigned long)PTLRPC_MAX_BRW_PAGES / 2)
+#define LL_RA_MAX(inode) ((ll_i2info(inode)->lli_smd->lsm_xfersize * 3) >> \
+ PAGE_CACHE_SHIFT)
-static int ll_readahead(struct ll_readahead_state *ras,
+static void ll_readahead(struct ll_readahead_state *ras,
struct obd_export *exp, struct address_space *mapping,
struct obd_io_group *oig, int flags)
{
- unsigned long i, start = 0, end = 0, reserved;
+ unsigned long i, start, end;
struct ll_async_page *llap;
struct page *page;
- int rc, ret = 0;
- __u64 kms;
- ENTRY;
+ int rc;
- kms = lov_merge_size(ll_i2info(mapping->host)->lli_smd, 1);
- if (kms == 0)
- RETURN(0);
+ if (mapping->host->i_size == 0)
+ return;
spin_lock(&ras->ras_lock);
- if (ras->ras_window_len) {
- start = ras->ras_next_readahead;
- end = ras->ras_window_start + ras->ras_window_len - 1;
- end = min(end, (unsigned long)(kms >> PAGE_CACHE_SHIFT));
- ras->ras_next_readahead = max(end, end + 1);
+ /* make sure to issue a window's worth of read-ahead pages */
+ end = ras->ras_last;
+ start = end - ras->ras_window;
+ if (start > end)
+ start = 0;
- RAS_CDEBUG(ras);
- }
+ /* but don't iterate over pages that we've already issued. this
+ * will set start to end + 1 if we've already read-ahead up to
+ * ras_last, so the for() loop won't be entered */
+ if (ras->ras_next_index > start)
+ start = ras->ras_next_index;
+ if (end != ~0UL)
+ ras->ras_next_index = end + 1;
- spin_unlock(&ras->ras_lock);
+ CDEBUG(D_READA, "ni %lu last %lu win %lu: reading from %lu to %lu\n",
+ ras->ras_next_index, ras->ras_last, ras->ras_window,
+ start, end);
- if (end == 0)
- RETURN(0);
+ spin_unlock(&ras->ras_lock);
- reserved = ll_ra_count_get(ll_i2sbi(mapping->host), end - start + 1);
+ /* clamp to filesize */
+ i = (mapping->host->i_size - 1) >> PAGE_CACHE_SHIFT;
+ end = min(end, i);
- for (i = start; reserved > 0 && i <= end; i++) {
- /* skip locked pages from previous readpage calls */
+ for (i = start; i <= end; i++) {
+ /* grab_cache_page_nowait returns null if this races with
+ * truncating the page (page->mapping == NULL) */
page = grab_cache_page_nowait(mapping, i);
- if (page == NULL) {
- CDEBUG(D_READA, "g_c_p_n failed\n");
- continue;
- }
-
- /* we do this first so that we can see the page in the /proc
- * accounting */
- llap = llap_from_page(page);
- if (IS_ERR(llap) || llap->llap_defer_uptodate)
- goto next_page;
+ if (page == NULL)
+ break;
- /* skip completed pages */
+ /* the book-keeping above promises that we've tried
+ * all the indices from start to end, so we don't
+ * stop when a single page fails. This may not be good. */
if (Page_Uptodate(page))
goto next_page;
- /* bail when we hit the end of the lock. */
if ((rc = ll_page_matches(page, flags)) <= 0) {
LL_CDEBUG_PAGE(D_READA | D_PAGE, page,
"lock match failed: rc %d\n", rc);
- i = end;
goto next_page;
}
+ llap = llap_from_page(page);
+ if (IS_ERR(llap) || llap->llap_defer_uptodate)
+ goto next_page;
+
rc = ll_issue_page_read(exp, llap, oig, 1);
- if (rc == 0) {
- reserved--;
- ret++;
- LL_CDEBUG_PAGE(D_READA| D_PAGE, page,
- "started read-ahead\n");
- }
+ if (rc == 0)
+ LL_CDEBUG_PAGE(D_PAGE, page, "started read-ahead\n");
if (rc) {
next_page:
- LL_CDEBUG_PAGE(D_READA | D_PAGE, page,
- "skipping read-ahead\n");
+ LL_CDEBUG_PAGE(D_PAGE, page, "skipping read-ahead\n");
unlock_page(page);
}
page_cache_release(page);
}
-
- LASSERTF(reserved >= 0, "reserved %lu\n", reserved);
- if (reserved != 0)
- ll_ra_count_put(ll_i2sbi(mapping->host), reserved);
- RETURN(ret);
-}
-
-static void ras_set_start(struct ll_readahead_state *ras,
- unsigned long index)
-{
- ras->ras_window_start = index & (~(PTLRPC_MAX_BRW_PAGES - 1));
- ras->ras_next_readahead = max(ras->ras_window_start,
- ras->ras_next_readahead);
}
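
The window bookkeeping at the top of ll_readahead() is easiest to follow with concrete numbers. A stand-alone sketch of just the start/end arithmetic; the struct is a local stand-in, only the arithmetic is taken from the patch:

/* Sketch of ll_readahead()'s window arithmetic: pick the page indices
 * to issue, skipping ones issued on earlier passes. */
#include <stdio.h>

struct ras {
        unsigned long ras_next_index;   /* first index not yet issued */
        unsigned long ras_last;         /* last index the window covers */
        unsigned long ras_window;       /* current window size in pages */
};

static void issue_range(struct ras *ras, unsigned long *startp,
                        unsigned long *endp)
{
        unsigned long start, end;

        /* issue a window's worth of pages ending at ras_last */
        end = ras->ras_last;
        start = end - ras->ras_window;
        if (start > end)                /* underflow near index 0 */
                start = 0;

        /* don't re-issue pages from earlier passes; when fully caught
         * up this leaves start == end + 1 and the caller's loop is empty */
        if (ras->ras_next_index > start)
                start = ras->ras_next_index;
        if (end != ~0UL)
                ras->ras_next_index = end + 1;

        *startp = start;
        *endp = end;
}

int main(void)
{
        struct ras ras = { .ras_next_index = 0, .ras_last = 32,
                           .ras_window = 32 };
        unsigned long start, end;

        issue_range(&ras, &start, &end);
        printf("pass 1: %lu..%lu\n", start, end);   /* 0..32 */
        issue_range(&ras, &start, &end);
        printf("pass 2: %lu..%lu\n", start, end);   /* 33..32: empty */
        return 0;
}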
/* called with the ras_lock held or from places where it doesn't matter */
-static void ras_reset(struct ll_readahead_state *ras,
- unsigned long index)
+static void ll_readahead_set(struct inode *inode,
+ struct ll_readahead_state *ras,
+ unsigned long index)
{
- ras->ras_last_readpage = index;
- ras->ras_consecutive = 1;
- ras->ras_window_len = 0;
- ras_set_start(ras, index);
- ras->ras_next_readahead = ras->ras_window_start;
-
- RAS_CDEBUG(ras);
+ ras->ras_next_index = index;
+ if (ras->ras_next_index != ~0UL)
+ ras->ras_next_index++;
+ ras->ras_window = LL_RA_MIN(inode);
+ ras->ras_last = ras->ras_next_index + ras->ras_window;
+ if (ras->ras_last < ras->ras_next_index)
+ ras->ras_last = ~0UL;
+ CDEBUG(D_READA, "ni %lu last %lu win %lu: set %lu\n",
+ ras->ras_next_index, ras->ras_last, ras->ras_window,
+ index);
}
void ll_readahead_init(struct inode *inode, struct ll_readahead_state *ras)
{
spin_lock_init(&ras->ras_lock);
- ras_reset(ras, 0);
+ ll_readahead_set(inode, ras, 0);
}
-static void ras_update(struct ll_readahead_state *ras,
- unsigned long index, unsigned long max)
+static void ll_readahead_update(struct inode *inode,
+ struct ll_readahead_state *ras,
+ unsigned long index, int hit)
{
- ENTRY;
+ unsigned long issued_start, new_last;
spin_lock(&ras->ras_lock);
- if (index != ras->ras_last_readpage + 1) {
- ras_reset(ras, index);
- GOTO(out_unlock, 0);
- }
-
- ras->ras_last_readpage = index;
- ras->ras_consecutive++;
- ras_set_start(ras, index);
-
- if (ras->ras_consecutive == 2) {
- ras->ras_window_len = PTLRPC_MAX_BRW_PAGES;
- GOTO(out_unlock, 0);
+ /* we're interested in noticing the index's relation to the
+ * previously issued read-ahead pages */
+ issued_start = ras->ras_next_index - ras->ras_window - 1;
+ if (issued_start > ras->ras_next_index)
+ issued_start = 0;
+
+ CDEBUG(D_READA, "ni %lu last %lu win %lu: %s ind %lu start %lu\n",
+ ras->ras_next_index, ras->ras_last, ras->ras_window,
+ hit ? "hit" : "miss", index, issued_start);
+ if (!hit &&
+ index == ras->ras_next_index && index == ras->ras_last + 1) {
+ /* special case the kernel's read-ahead running into the
+ * page just beyond our read-ahead window as an extension
+ * of our read-ahead. sigh. wishing it was easier to
+ * turn off 2.4's read-ahead. */
+ ras->ras_window = min(LL_RA_MAX(inode), ras->ras_window + 1);
+ if (index != ~0UL)
+ ras->ras_next_index = index + 1;
+ ras->ras_last = index;
+ } else if (!hit &&
+ (index < issued_start || index > ras->ras_next_index)) {
+ /* deal with a miss way out of the window. we interpret
+ * this as a seek and restart the window */
+ ll_readahead_set(inode, ras, index);
+
+ } else if (!hit &&
+ issued_start <= index && index < ras->ras_next_index) {
+ /* a miss inside the window? surely it's memory pressure
+ * evicting our read pages before the app can see them.
+ * we shrink the window aggressively */
+ unsigned long old_window = ras->ras_window;
+
+ ras->ras_window = max(ras->ras_window / 2, LL_RA_MIN(inode));
+ ras->ras_last -= old_window - ras->ras_window;
+ if (ras->ras_next_index > ras->ras_last)
+ ras->ras_next_index = ras->ras_last + 1;
+ CDEBUG(D_READA, "ni %lu last %lu win %lu: miss inside\n",
+ ras->ras_next_index, ras->ras_last, ras->ras_window);
+
+ } else if (hit &&
+ issued_start <= index && index < ras->ras_next_index) {
+ /* a hit inside the window. grow the window by twice the
+ * number of pages that are satisfied within the window. */
+ ras->ras_window = min(LL_RA_MAX(inode), ras->ras_window + 2);
+
+ /* we want the next readahead pass to issue a window's worth
+ * beyond where the app currently is */
+ new_last = index + ras->ras_window;
+ if (new_last > ras->ras_last)
+ ras->ras_last = new_last;
+
+ CDEBUG(D_READA, "ni %lu last %lu win %lu: extended window/last\n",
+ ras->ras_next_index, ras->ras_last, ras->ras_window);
}
- /* we need to increase the window sometimes. we'll arbitrarily
- * do it half-way through the pages in an rpc */
- if ((index & (PTLRPC_MAX_BRW_PAGES - 1)) ==
- (PTLRPC_MAX_BRW_PAGES >> 1)) {
- ras->ras_window_len += PTLRPC_MAX_BRW_PAGES;
- ras->ras_window_len = min(ras->ras_window_len, max);
- }
-
- EXIT;
-out_unlock:
- RAS_CDEBUG(ras);
spin_unlock(&ras->ras_lock);
- return;
}
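
Taken together, ll_readahead_set() and ll_readahead_update() form a small grow/shrink state machine. This user-space sketch runs the same heuristics against a synthetic access pattern; RA_MIN and RA_MAX replace LL_RA_MIN()/LL_RA_MAX(), and the branch structure mirrors the function above:

/* Sketch of the read-ahead grow/shrink heuristics with the kernel
 * plumbing removed.  Constants and struct are local stand-ins. */
#include <stdio.h>

#define RA_MIN 32UL     /* stands in for LL_RA_MIN() */
#define RA_MAX 768UL    /* stands in for LL_RA_MAX() */

struct ras { unsigned long next, last, window; };

static void ras_set(struct ras *r, unsigned long index)
{
        r->next = (index == ~0UL) ? index : index + 1;
        r->window = RA_MIN;
        r->last = r->next + r->window;
        if (r->last < r->next)          /* overflow clamp, as in the patch */
                r->last = ~0UL;
}

static void ras_update(struct ras *r, unsigned long index, int hit)
{
        unsigned long issued_start = r->next - r->window - 1;

        if (issued_start > r->next)     /* underflow near index 0 */
                issued_start = 0;

        if (!hit && index == r->next && index == r->last + 1) {
                /* kernel read-ahead ran just past our window: extend it */
                if (r->window < RA_MAX)
                        r->window++;
                r->next = (index == ~0UL) ? index : index + 1;
                r->last = index;
        } else if (!hit && (index < issued_start || index > r->next)) {
                /* a miss far outside the window: treat it as a seek */
                ras_set(r, index);
        } else if (!hit && issued_start <= index && index < r->next) {
                /* a miss inside the window: likely memory pressure; shrink */
                unsigned long old = r->window;

                r->window = (r->window / 2 < RA_MIN) ? RA_MIN : r->window / 2;
                r->last -= old - r->window;
                if (r->next > r->last)
                        r->next = r->last + 1;
        } else if (hit && issued_start <= index && index < r->next) {
                /* a hit inside the window: grow it and push the goal out */
                r->window += 2;
                if (r->window > RA_MAX)
                        r->window = RA_MAX;
                if (index + r->window > r->last)
                        r->last = index + r->window;
        }
}

int main(void)
{
        struct ras r;
        unsigned long i;

        ras_set(&r, 0);
        r.next = r.last + 1;    /* pretend ll_readahead() issued the window */
        for (i = 1; i <= 8; i++)
                ras_update(&r, i, 1);   /* sequential hits grow the window */
        printf("after hits: window %lu last %lu\n", r.window, r.last);
        ras_update(&r, 10000, 0);       /* a long seek resets the window */
        printf("after seek: window %lu next %lu\n", r.window, r.next);
        return 0;
}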
/*
if (IS_ERR(llap))
GOTO(out, rc = PTR_ERR(llap));
- if (ll_i2sbi(inode)->ll_flags & LL_SBI_READAHEAD)
- ras_update(&fd->fd_ras, page->index,
- ll_i2sbi(inode)->ll_max_read_ahead_pages);
-
if (llap->llap_defer_uptodate) {
- rc = ll_readahead(&fd->fd_ras, exp, page->mapping, oig,
- fd->fd_flags);
- if (rc > 0)
- obd_trigger_group_io(exp, ll_i2info(inode)->lli_smd,
- NULL, oig);
+ ll_readahead_update(inode, &fd->fd_ras, page->index, 1);
+ ll_readahead(&fd->fd_ras, exp, page->mapping, oig, fd->fd_flags);
+ obd_trigger_group_io(exp, ll_i2info(inode)->lli_smd, NULL,
+ oig);
LL_CDEBUG_PAGE(D_PAGE, page, "marking uptodate from defer\n");
SetPageUptodate(page);
unlock_page(page);
GOTO(out_oig, rc = 0);
}
+ ll_readahead_update(inode, &fd->fd_ras, page->index, 0);
+
rc = ll_page_matches(page, fd->fd_flags);
if (rc < 0) {
LL_CDEBUG_PAGE(D_ERROR, page, "lock match failed: rc %d\n", rc);
inode->i_ino, page->index,
(long long)page->index << PAGE_CACHE_SHIFT);
if (time_after(jiffies, next_print)) {
- CWARN("ino %lu page %lu (%llu) not covered by "
+ CERROR("ino %lu page %lu (%llu) not covered by "
"a lock (mmap?). check debug logs.\n",
inode->i_ino, page->index,
(long long)page->index << PAGE_CACHE_SHIFT);
+ ldlm_dump_all_namespaces();
+ if (next_print == 0) {
+ CERROR("%s\n", portals_debug_dumpstack());
+ portals_debug_dumplog();
+ }
next_print = jiffies + 30 * HZ;
}
}
GOTO(out, rc);
LL_CDEBUG_PAGE(D_PAGE, page, "queued readpage\n");
- if (ll_i2sbi(inode)->ll_flags & LL_SBI_READAHEAD)
- ll_readahead(&fd->fd_ras, exp, page->mapping, oig,
- fd->fd_flags);
+ if (ll_i2sbi(inode)->ll_flags & LL_SBI_READAHEAD)
+ ll_readahead(&fd->fd_ras, exp, page->mapping, oig, fd->fd_flags);
rc = obd_trigger_group_io(exp, ll_i2info(inode)->lli_smd, NULL, oig);
oig_release(oig);
RETURN(rc);
}
+
+#if 0
+/* this is for read pages. we issue them as ready but not urgent. when
+ * someone waits on them we fire them off, hopefully merged with adjacent
+ * reads that were queued by read-ahead. */
+int ll_sync_page(struct page *page)
+{
+ struct obd_export *exp;
+ struct ll_async_page *llap;
+ int rc;
+ ENTRY;
+
+ /* we're using a low bit flag to signify that a queued read should
+ * be issued once someone goes to lock it. it is also cleared
+ * as the page is built into an RPC */
+ if (!test_and_clear_bit(LL_PRIVBITS_READ, &page->private))
+ RETURN(0);
+
+ /* careful to only deref page->mapping after checking the bit */
+ exp = ll_i2obdexp(page->mapping->host);
+ if (exp == NULL)
+ RETURN(-EINVAL);
+
+ llap = llap_from_page(page);
+ if (IS_ERR(llap))
+ RETURN(PTR_ERR(llap));
+
+ LL_CDEBUG_PAGE(D_PAGE, page, "setting ready|urgent\n");
+
+ rc = obd_set_async_flags(exp, ll_i2info(page->mapping->host)->lli_smd,
+ NULL, llap->llap_cookie,
+ ASYNC_READY|ASYNC_URGENT);
+ return rc;
+}
+#endif
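
The LL_PRIVBITS_READ scheme in the disabled block stores a flag in a low bit of page->private alongside the llap pointer, relying on pointer alignment keeping that bit free. A user-space sketch of the tagged-pointer idea; all names are invented for illustration:

/* Sketch of stashing a flag in the low bit of an aligned pointer, the
 * idea behind LL_PRIVBITS_READ above.  Names are invented. */
#include <assert.h>
#include <stdio.h>

#define PRIV_READ_BIT 0x1UL     /* free because the pointer is aligned */

static unsigned long priv_pack(void *ptr, int read_pending)
{
        assert(((unsigned long)ptr & PRIV_READ_BIT) == 0);
        return (unsigned long)ptr | (read_pending ? PRIV_READ_BIT : 0);
}

/* like test_and_clear_bit(): returns the old flag and clears it */
static int priv_test_and_clear(unsigned long *priv)
{
        int was_set = (*priv & PRIV_READ_BIT) != 0;

        *priv &= ~PRIV_READ_BIT;
        return was_set;
}

static void *priv_ptr(unsigned long priv)
{
        return (void *)(priv & ~PRIV_READ_BIT);
}

int main(void)
{
        int obj = 42;           /* stands in for the async page struct */
        unsigned long priv = priv_pack(&obj, 1);
        int was_pending = priv_test_and_clear(&priv);

        printf("flag was %d, now %lu, ptr ok %d\n",
               was_pending, priv & PRIV_READ_BIT,
               priv_ptr(priv) == (void *)&obj);
        return 0;
}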