RETURN(rc);
}
+int range_to_page_count(loff_t offset, ssize_t len)
+{
+ int start_page = offset / PAGE_SIZE;
+ int end_page = (offset + len - 1) / PAGE_SIZE;
+
+ return end_page - start_page + 1;
+}
+
int tgt_brw_read(struct tgt_session_info *tsi)
{
struct ptlrpc_request *req = tgt_ses_req(tsi);
struct ost_layout_compr *olc;
const char *obd_name = exp->exp_obd->obd_name;
bool compr_rounded_read_lock = false;
- enum ll_compr_type type;
+ enum ll_compr_type compr_type;
+ int npages_remote = 0;
int chunk_size = 0;
int no_reply = 0;
+ int npages_local;
int npages_read;
ktime_t kstart;
+ int compr_lvl;
+ int niocount;
int nob = 0;
- int npages;
int rc;
int i;
ioo = req_capsule_client_get(tsi->tsi_pill, &RMF_OBD_IOOBJ);
LASSERT(ioo != NULL); /* must exists after tgt_ost_body_unpack */
+ niocount = ioo->ioo_bufcnt;
+
remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
LASSERT(remote_nb != NULL); /* must exists after tgt_ost_body_unpack */
local_nb = tbc->tbc_lnb;
olc = &body->oa.o_layout_compr;
- type = olc->ol_compr_type;
- if (type != LL_COMPR_TYPE_NONE) {
- int nrbufs = ioo->ioo_bufcnt;
+ compr_type = olc->ol_compr_type;
+ compr_lvl = olc->ol_compr_lvl;
+ if (compr_type != LL_COMPR_TYPE_NONE) {
unsigned int chunk_log_bits;
__u64 chunk_start;
__u64 chunk_end;
* first and end of IO from the last
*/
io_start = remote_nb[0].rnb_offset;
- io_end = remote_nb[nrbufs - 1].rnb_offset +
- remote_nb[nrbufs - 1].rnb_len;
+ io_end = remote_nb[niocount - 1].rnb_offset +
+ remote_nb[niocount - 1].rnb_len;
chunk_start = round_down(io_start, chunk_size);
chunk_end = round_up(io_end, chunk_size);
repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
repbody->oa = body->oa;
- npages = PTLRPC_MAX_BRW_PAGES;
+ npages_local = PTLRPC_MAX_BRW_PAGES;
+ for (i = 0; i < niocount; i++)
+ npages_remote += range_to_page_count(remote_nb[i].rnb_offset,
+ remote_nb[i].rnb_len);
kstart = ktime_get();
rc = obd_preprw(tsi->tsi_env, OBD_BRW_READ, exp, &repbody->oa, 1,
- ioo, remote_nb, &npages, local_nb, chunk_size);
+ ioo, remote_nb, &npages_local, local_nb, chunk_size);
if (rc != 0)
GOTO(out_lock, rc);
+ /* the server is responsible for decompressing partial chunk reads */
+ if (compr_type == LL_COMPR_TYPE_NONE) {
+ /* if there's no compression, the local page count should be
+ * identical to that requested by the client
+ */
+ LASSERT(npages_local == npages_remote);
+ }
+
if (body->oa.o_valid & OBD_MD_FLFLAGS &&
body->oa.o_flags & OBD_FL_SHORT_IO) {
desc = NULL;
} else {
- desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo),
+ desc = ptlrpc_prep_bulk_exp(req, npages_remote,
+ ioobj_max_brw_get(ioo),
PTLRPC_BULK_PUT_SOURCE,
OST_BULK_PORTAL,
&ptlrpc_bulk_kiov_nopin_ops);
GOTO(out_commitrw, rc = -ENOMEM);
}
- npages_read = npages;
- for (i = 0; i < npages; i++) {
+ npages_read = npages_remote;
+ for (i = 0; i < npages_remote; i++) {
int page_rc = local_nb[i].lnb_rc;
if (page_rc < 0) {
break;
}
+ CDEBUG(D_SEC, "i %d, lnb_file_offset %llu, lnb_len %d, rc %d\n",
+ i, local_nb[i].lnb_file_offset, local_nb[i].lnb_len,
+ page_rc);
nob += page_rc;
if (page_rc != 0 && desc != NULL) { /* some data! */
LASSERT(local_nb[i].lnb_page != NULL);
local_nb[i].lnb_len = page_rc;
npages_read = i + (page_rc != 0 ? 1 : 0);
/* All subsequent pages should be 0 */
- while (++i < npages)
+ while (++i < npages_remote)
LASSERT(local_nb[i].lnb_rc == 0);
break;
}
out_commitrw:
/* Must commit after prep above in all cases */
rc = obd_commitrw(tsi->tsi_env, OBD_BRW_READ, exp, &repbody->oa, 1, ioo,
- remote_nb, npages, local_nb, rc, nob, kstart);
+ remote_nb, npages_local, local_nb, rc, nob, kstart);
out_lock:
tgt_brw_unlock(exp, ioo,
compr_rounded_read_lock ? &chunk_lock_rnb : remote_nb,