- unsigned long index, offset;
- char *kaddr;
-
- /*
- * Try to find the page in the cache. If it isn't there,
- * allocate a free page.
- */
- offset = (pos & (PAGE_CACHE_SIZE -1)); /* Within page */
- index = pos >> PAGE_CACHE_SHIFT;
- bytes = PAGE_CACHE_SIZE - offset;
- if (bytes > count) {
- bytes = count;
- }
-
- status = -ENOMEM; /* we'll assign it later anyway */
- page = __grab_cache_page(index);
- if (!page)
- break;
-
- kaddr = kmap(page);
- status = llu_prepare_write(inode, page, offset, offset+bytes);
- if (status)
- goto sync_failure;
-
- memcpy(kaddr+offset, buf, bytes);
-
- status = llu_commit_write(inode, page, offset, offset+bytes);
- if (!status)
- status = bytes;
-
- if (status >= 0) {
- written += status;
- count -= status;
- pos += status;
- buf += status;
- }
-unlock:
- kunmap(page);
- page_cache_release(page);
-
- if (status < 0)
- break;
- } while (count);
-done:
- err = written ? written : status;
-
-#if 0
- up(&inode->i_sem);
-#endif
- return err;
-
- status = -EFAULT;
- goto unlock;
-
-sync_failure:
- /*
- * If blocksize < pagesize, prepare_write() may have instantiated a
- * few blocks outside i_size. Trim these off again.
- */
- kunmap(page);
- page_cache_release(page);
- goto done;
+ unsigned long index, offset, bytes;
+
+ offset = (pos & ~CFS_PAGE_MASK);
+ index = pos >> CFS_PAGE_SHIFT;
+ bytes = CFS_PAGE_SIZE - offset;
+ if (bytes > count)
+ bytes = count;
+
+ /* prevent read beyond file range */
+ if (/* local_lock && */
+ cmd == OBD_BRW_READ && pos + bytes >= st->st_size) {
+ if (pos >= st->st_size)
+ break;
+ bytes = st->st_size - pos;
+ }
+
+ /* prepare page for this index */
+ page->index = index;
+ page->addr = buf - offset;
+
+ page->_offset = offset;
+ page->_count = bytes;
+
+ page++;
+ npages++;
+ count -= bytes;
+ pos += bytes;
+ buf += bytes;
+
+ group->lig_rwcount += bytes;
+ ret_bytes += bytes;
+ } while (count);
+
+ group->lig_npages += npages;
+
+ for (i = 0, page = pages; i < npages;
+ i++, page++, llap++, llap_cookie += llap_cookie_size){
+ llap->llap_magic = LLAP_MAGIC;
+ llap->llap_cookie = llap_cookie;
+ rc = obd_prep_async_page(exp, lsm, NULL, page,
+ (obd_off)page->index << CFS_PAGE_SHIFT,
+ &llu_async_page_ops,
+ llap, &llap->llap_cookie);
+ if (rc) {
+ LASSERT(rc < 0);
+ llap->llap_cookie = NULL;
+ RETURN(rc);
+ }
+
+ CDEBUG(D_CACHE, "llap %p page %p group %p obj off "LPU64"\n",
+ llap, page, llap->llap_cookie,
+ (obd_off)pages->index << CFS_PAGE_SHIFT);
+ page->private = (unsigned long)llap;
+ llap->llap_page = page;
+ llap->llap_inode = group->lig_inode;
+
+ rc = obd_queue_group_io(exp, lsm, NULL, group->lig_oig,
+ llap->llap_cookie, cmd,
+ page->_offset, page->_count,
+ group->lig_params->lrp_brw_flags,
+ ASYNC_READY | ASYNC_URGENT |
+ ASYNC_COUNT_STABLE | ASYNC_GROUP_SYNC);
+ if (!local_lock && cmd == OBD_BRW_READ) {
+ /*
+ * In OST-side locking case short reads cannot be
+ * detected properly.
+ *
+ * The root of the problem is that
+ *
+ * kms = lov_merge_size(lsm, 1);
+ * if (end >= kms)
+ * glimpse_size(inode);
+ * else
+ * st->st_size = kms;
+ *
+ * logic in the read code (both llite and liblustre)
+ * only works correctly when client holds DLM lock on
+ * [start, end]. Without DLM lock KMS can be
+ * completely out of date, and client can either make
+ * spurious short-read (missing concurrent write), or
+ * return stale data (missing concurrent
+ * truncate). For llite client this is fatal, because
+ * incorrect data are cached and can be later sent
+ * back to the server (vide bug 5047). This is hard to
+ * fix by handling short-reads on the server, as there
+ * is no easy way to communicate file size (or amount
+ * of bytes read/written) back to the client,
+ * _especially_ because OSC pages can be sliced and
+ * dices into multiple RPCs arbitrary. Fortunately,
+ * liblustre doesn't cache data and the worst case is
+ * that we get race with concurrent write or truncate.
+ */
+ }
+ if (rc) {
+ LASSERT(rc < 0);
+ RETURN(rc);
+ }
+
+ llap->llap_queued = 1;
+ }
+
+ RETURN(ret_bytes);