return end_page - start_page + 1;
}
+void io_lnb_to_tx_lnb(struct niobuf_local *io_lnb, struct niobuf_local *tx_lnb,
+ struct niobuf_remote *rnb, int niocount, int npages_local)
+{
+ int lnb_index = 0;
+ int tx_lnb_pg = 0;
+ int i;
+
+ /* for each io in the remote niobuf, find the starting page in the
+ * io lnb, then point the tx lnbs at the matching pages in the io lnb
+ * so the transfer uses just those pages
+ */
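+ /* For example (hypothetical sizes): if a 2-page client read was
+ * rounded up to a full 16-page chunk so the server could decompress
+ * it, the rnb still describes only those 2 pages, so the tx lnb ends
+ * up pointing at just the 2 matching pages inside the 16-page io lnb.
+ */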
+ for (i = 0; i < niocount; i++) {
+ ssize_t len = rnb[i].rnb_len;
+ int npages_remote;
+ int j;
+
+ while (lnb_index < npages_local) {
+ struct niobuf_local *lnb = &io_lnb[lnb_index];
+ __u64 lnb_offset = lnb->lnb_file_offset;
+ __u64 lnb_len = lnb->lnb_len;
+
+ /* if this rnb starts inside this lnb, then this lnb is the
+ * start of the transfer
+ *
+ * NB: We can't just compare offsets for equality because the
+ * lnbs are server pages and might be larger than client
+ * pages.
+ */
+ if (rnb[i].rnb_offset >= lnb_offset &&
+ rnb[i].rnb_offset < lnb_offset + lnb_len)
+ break;
+ lnb_index++;
+ }
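+ /* the search above leaves lnb_index at the first io lnb page
+ * covering this rnb
+ */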
+ npages_remote = range_to_page_count(rnb[i].rnb_offset,
+ rnb[i].rnb_len);
+ CDEBUG(D_SEC, "nio %d npages_remote %d, lnb_index %d\n", i,
+ npages_remote, lnb_index);
+ for (j = 0; j < npages_remote; j++) {
+ /* point the tx_lnb member at the io_lnb member */
+ tx_lnb[tx_lnb_pg] = io_lnb[lnb_index + j];
+
+ /* NB: This does not handle the case where the first
+ * page is incomplete. But this is impossible until we
+ * have unaligned DIO, so don't worry about it now.
+ */
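+ /* cap each tx page at PAGE_SIZE; the final page of this rnb
+ * carries whatever remains of the rnb length
+ */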
+ if (len > PAGE_SIZE)
+ tx_lnb[tx_lnb_pg].lnb_len = PAGE_SIZE;
+ else
+ tx_lnb[tx_lnb_pg].lnb_len = len;
+ len -= tx_lnb[tx_lnb_pg].lnb_len;
+ tx_lnb_pg++;
+ }
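+ /* every byte of this rnb must now be covered by a tx page */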
+ LASSERT(len == 0);
+ }
+}
+
int tgt_brw_read(struct tgt_session_info *tsi)
{
struct ptlrpc_request *req = tgt_ses_req(tsi);
struct lustre_handle lockh = { 0 };
struct niobuf_remote chunk_lock_rnb;
struct ptlrpc_bulk_desc *desc = NULL;
+ struct niobuf_local *local_io_nb;
+ struct niobuf_local *local_tx_nb;
struct niobuf_remote *remote_nb;
- struct niobuf_local *local_nb;
struct ost_body *repbody;
struct ost_body *body;
struct obd_ioobj *ioo;
remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
LASSERT(remote_nb != NULL); /* must exist after tgt_ost_body_unpack */
- local_nb = tbc->tbc_lnb;
+ local_io_nb = tbc->tbc_lnb;
+ /* by default, the same local niobuf is used for both io and transfer.
+ * compression sometimes changes this
+ */
+ local_tx_nb = tbc->tbc_lnb;
olc = &body->oa.o_layout_compr;
compr_type = olc->ol_compr_type;
kstart = ktime_get();
rc = obd_preprw(tsi->tsi_env, OBD_BRW_READ, exp, &repbody->oa, 1,
- ioo, remote_nb, &npages_local, local_nb, chunk_size);
+ ioo, remote_nb, &npages_local, local_io_nb,
+ chunk_size);
if (rc != 0)
GOTO(out_lock, rc);
- /* the server is responsible for decompressing partial chunk reads */
if (compr_type == LL_COMPR_TYPE_NONE) {
/* if there's no compression, the local page count should be
* identical to that requested by the client
GOTO(out_commitrw, rc = -ENOMEM);
}
+ /* if we had to do rounding for compression, then the local io niobuf is
+ * no longer the same as what we send back to the client, so we have to
+ * create the tx niobuf from a subset of the io niobuf
+ */
+ if (npages_local != npages_remote) {
+ LASSERT(compr_type != LL_COMPR_TYPE_NONE);
+ local_tx_nb = tbc->tbc_lnb2;
+ io_lnb_to_tx_lnb(local_io_nb, local_tx_nb, remote_nb,
+ niocount, npages_local);
+ }
+
npages_read = npages_remote;
for (i = 0; i < npages_remote; i++) {
- int page_rc = local_nb[i].lnb_rc;
+ int page_rc = local_tx_nb[i].lnb_rc;
if (page_rc < 0) {
rc = page_rc;
}
CDEBUG(D_SEC, "i %d, lnb_file_offset %llu, lnb_len %d, rc %d\n",
- i, local_nb[i].lnb_file_offset, local_nb[i].lnb_len,
- page_rc);
+ i, local_tx_nb[i].lnb_file_offset,
+ local_tx_nb[i].lnb_len, page_rc);
nob += page_rc;
if (page_rc != 0 && desc != NULL) { /* some data! */
- LASSERT(local_nb[i].lnb_page != NULL);
+ LASSERT(local_tx_nb[i].lnb_page != NULL);
desc->bd_frag_ops->add_kiov_frag
- (desc, local_nb[i].lnb_page,
- local_nb[i].lnb_page_offset & ~PAGE_MASK,
+ (desc, local_tx_nb[i].lnb_page,
+ local_tx_nb[i].lnb_page_offset & ~PAGE_MASK,
page_rc);
}
- if (page_rc != local_nb[i].lnb_len) { /* short read */
- local_nb[i].lnb_len = page_rc;
+ if (page_rc != local_tx_nb[i].lnb_len) { /* short read */
+ local_tx_nb[i].lnb_len = page_rc;
npages_read = i + (page_rc != 0 ? 1 : 0);
/* All subsequent pages should be 0 */
while (++i < npages_remote)
- LASSERT(local_nb[i].lnb_rc == 0);
+ LASSERT(local_tx_nb[i].lnb_rc == 0);
break;
}
}
repbody->oa.o_valid = OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
rc = tgt_checksum_niobuf_rw(tsi->tsi_tgt, cksum_type,
- local_nb, npages_read, OST_READ,
+ local_tx_nb, npages_read, OST_READ,
&repbody->oa.o_cksum, resend);
if (rc < 0)
GOTO(out_commitrw, rc);
* cksum with returned Client cksum (this should even cover
* zero-cksum case) */
if (resend)
- check_read_checksum(local_nb, npages_read, exp,
+ check_read_checksum(local_tx_nb, npages_read, exp,
&body->oa, &req->rq_peer,
body->oa.o_cksum,
repbody->oa.o_cksum, cksum_type);
short_io_size = req_capsule_get_size(&req->rq_pill,
&RMF_SHORT_IO,
RCL_SERVER);
- rc = tgt_pages2shortio(local_nb, npages_read,
+ rc = tgt_pages2shortio(local_tx_nb, npages_read,
short_io_buf, short_io_size);
if (rc >= 0)
req_capsule_shrink(&req->rq_pill,
out_commitrw:
/* Must commit after prep above in all cases */
rc = obd_commitrw(tsi->tsi_env, OBD_BRW_READ, exp, &repbody->oa, 1, ioo,
- remote_nb, npages_local, local_nb, rc, nob, kstart);
+ remote_nb, npages_local, local_io_nb, rc, nob,
+ kstart);
out_lock:
tgt_brw_unlock(exp, ioo,
compr_rounded_read_lock ? &chunk_lock_rnb : remote_nb,
GOTO(skip_transfer, rc = -ENOMEM);
/* NB Having prepped, we must commit... */
- for (i = 0; i < npages; i++)
+ for (i = 0; i < npages; i++) {
+ CDEBUG(D_SEC, "adding frag, page %d, offset %lu, len %d\n",
+ i, local_nb[i].lnb_page_offset & ~PAGE_MASK,
+ local_nb[i].lnb_len);
desc->bd_frag_ops->add_kiov_frag(desc,
local_nb[i].lnb_page,
local_nb[i].lnb_page_offset & ~PAGE_MASK,
local_nb[i].lnb_len);
+ }
rc = sptlrpc_svc_prep_bulk(req, desc);
if (rc != 0)