-#define LLAP_MAGIC 12346789
-
-struct ll_async_page {
- int llap_magic;
- void *llap_cookie;
- int llap_queued;
- struct page *llap_page;
- struct inode *llap_inode;
-};
-
-static void llu_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
-{
- struct ll_async_page *llap;
- struct inode *inode;
- struct lov_stripe_md *lsm;
- obd_flag valid_flags;
- ENTRY;
-
- llap = LLAP_FROM_COOKIE(data);
- inode = llap->llap_inode;
- lsm = llu_i2info(inode)->lli_smd;
-
- oa->o_id = lsm->lsm_object_id;
- oa->o_valid = OBD_MD_FLID;
- valid_flags = OBD_MD_FLTYPE | OBD_MD_FLATIME;
- if (cmd & OBD_BRW_WRITE)
- valid_flags |= OBD_MD_FLMTIME | OBD_MD_FLCTIME |
- OBD_MD_FLUID | OBD_MD_FLGID |
- OBD_MD_FLFID | OBD_MD_FLGENER;
-
- obdo_from_inode(oa, inode, valid_flags);
- EXIT;
-}
-
-static void llu_ap_update_obdo(void *data, int cmd, struct obdo *oa,
- obd_valid valid)
-{
- struct ll_async_page *llap;
- ENTRY;
-
- llap = LLAP_FROM_COOKIE(data);
- obdo_from_inode(oa, llap->llap_inode, valid);
-
- EXIT;
-}
-
-/* called for each page in a completed rpc.*/
-static int llu_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
-{
- struct ll_async_page *llap;
- struct page *page;
- ENTRY;
-
- llap = LLAP_FROM_COOKIE(data);
- llap->llap_queued = 0;
- page = llap->llap_page;
-
- if (rc != 0) {
- if (cmd & OBD_BRW_WRITE)
- CERROR("writeback error on page %p index %ld: %d\n",
- page, page->index, rc);
- }
- RETURN(0);
-}
-
-static struct obd_async_page_ops llu_async_page_ops = {
- .ap_make_ready = NULL,
- .ap_refresh_count = NULL,
- .ap_fill_obdo = llu_ap_fill_obdo,
- .ap_update_obdo = llu_ap_update_obdo,
- .ap_completion = llu_ap_completion,
-};
-
-static int llu_queue_pio(int cmd, struct llu_io_group *group,
- char *buf, size_t count, loff_t pos)
-{
- struct llu_inode_info *lli = llu_i2info(group->lig_inode);
- struct intnl_stat *st = llu_i2stat(group->lig_inode);
- struct lov_stripe_md *lsm = lli->lli_smd;
- struct obd_export *exp = llu_i2obdexp(group->lig_inode);
- struct page *pages = &group->lig_pages[group->lig_npages],*page = pages;
- struct ll_async_page *llap = &group->lig_llaps[group->lig_npages];
- void *llap_cookie = group->lig_llap_cookies +
- llap_cookie_size * group->lig_npages;
- int i, rc, npages = 0, ret_bytes = 0;
- int local_lock;
- ENTRY;
-
- if (!exp)
- RETURN(-EINVAL);
-
- local_lock = group->lig_params->lrp_lock_mode != LCK_NL;
- /* prepare the pages array */
- do {
- unsigned long index, offset, bytes;
-
- offset = (pos & ~CFS_PAGE_MASK);
- index = pos >> CFS_PAGE_SHIFT;
- bytes = CFS_PAGE_SIZE - offset;
- if (bytes > count)
- bytes = count;
-
- /* prevent read beyond file range */
- if (/* local_lock && */
- cmd == OBD_BRW_READ && pos + bytes >= st->st_size) {
- if (pos >= st->st_size)
- break;
- bytes = st->st_size - pos;
- }
-
- /* prepare page for this index */
- page->index = index;
- page->addr = buf - offset;
-
- page->_offset = offset;
- page->_count = bytes;
-
- page++;
- npages++;
- count -= bytes;
- pos += bytes;
- buf += bytes;
-
- group->lig_rwcount += bytes;
- ret_bytes += bytes;
- } while (count);
-
- group->lig_npages += npages;
-
- for (i = 0, page = pages; i < npages;
- i++, page++, llap++, llap_cookie += llap_cookie_size){
- llap->llap_magic = LLAP_MAGIC;
- llap->llap_cookie = llap_cookie;
- rc = obd_prep_async_page(exp, lsm, NULL, page,
- (obd_off)page->index << CFS_PAGE_SHIFT,
- &llu_async_page_ops,
- llap, &llap->llap_cookie);
- if (rc) {
- LASSERT(rc < 0);
- llap->llap_cookie = NULL;
- RETURN(rc);
- }
- CDEBUG(D_CACHE, "llap %p page %p group %p obj off "LPU64"\n",
- llap, page, llap->llap_cookie,
- (obd_off)pages->index << CFS_PAGE_SHIFT);
- page->private = (unsigned long)llap;
- llap->llap_page = page;
- llap->llap_inode = group->lig_inode;
-
- rc = obd_queue_group_io(exp, lsm, NULL, group->lig_oig,
- llap->llap_cookie, cmd,
- page->_offset, page->_count,
- group->lig_params->lrp_brw_flags,
- ASYNC_READY | ASYNC_URGENT |
- ASYNC_COUNT_STABLE | ASYNC_GROUP_SYNC);
- if (!local_lock && cmd == OBD_BRW_READ) {
- /*
- * In OST-side locking case short reads cannot be
- * detected properly.
- *
- * The root of the problem is that
- *
- * kms = lov_merge_size(lsm, 1);
- * if (end >= kms)
- * glimpse_size(inode);
- * else
- * st->st_size = kms;
- *
- * logic in the read code (both llite and liblustre)
- * only works correctly when client holds DLM lock on
- * [start, end]. Without DLM lock KMS can be
- * completely out of date, and client can either make
- * spurious short-read (missing concurrent write), or
- * return stale data (missing concurrent
- * truncate). For llite client this is fatal, because
- * incorrect data are cached and can be later sent
- * back to the server (vide bug 5047). This is hard to
- * fix by handling short-reads on the server, as there
- * is no easy way to communicate file size (or amount
- * of bytes read/written) back to the client,
- * _especially_ because OSC pages can be sliced and
- * dices into multiple RPCs arbitrary. Fortunately,
- * liblustre doesn't cache data and the worst case is
- * that we get race with concurrent write or truncate.
- */
- }
- if (rc) {
- LASSERT(rc < 0);
- RETURN(rc);
- }
-
- llap->llap_queued = 1;
- }
-
- RETURN(ret_bytes);
-}
-
-static
-struct llu_io_group * get_io_group(struct inode *inode, int maxpages,
- struct lustre_rw_params *params)
-{
- struct llu_io_group *group;
- int rc;
-
- if (!llap_cookie_size)
- llap_cookie_size = obd_prep_async_page(llu_i2obdexp(inode),
- NULL, NULL, NULL, 0,
- NULL, NULL, NULL);
-
- OBD_ALLOC(group, LLU_IO_GROUP_SIZE(maxpages));
- if (!group)
- return ERR_PTR(-ENOMEM);
-
- I_REF(inode);
- group->lig_inode = inode;
- group->lig_maxpages = maxpages;
- group->lig_params = params;
- group->lig_llaps = (struct ll_async_page *)(group + 1);
- group->lig_pages = (struct page *)(&group->lig_llaps[maxpages]);
- group->lig_llap_cookies = (void *)(&group->lig_pages[maxpages]);
-
- rc = oig_init(&group->lig_oig);
- if (rc) {
- OBD_FREE(group, LLU_IO_GROUP_SIZE(maxpages));
- return ERR_PTR(rc);
- }
-
- return group;
-}
-
-static int max_io_pages(ssize_t len, int iovlen)
-{
- return (((len + CFS_PAGE_SIZE -1) / CFS_PAGE_SIZE) + 2 + iovlen - 1);
-}
-
-static
-void put_io_group(struct llu_io_group *group)
-{
- struct lov_stripe_md *lsm = llu_i2info(group->lig_inode)->lli_smd;
- struct obd_export *exp = llu_i2obdexp(group->lig_inode);
- struct ll_async_page *llap = group->lig_llaps;
- int i;
-
- for (i = 0; i < group->lig_npages; i++, llap++) {
- if (llap->llap_cookie)
- obd_teardown_async_page(exp, lsm, NULL,
- llap->llap_cookie);
- }
-
- I_RELE(group->lig_inode);
-
- oig_release(group->lig_oig);
- OBD_FREE(group, LLU_IO_GROUP_SIZE(group->lig_maxpages));
-}
-