Whamcloud - gitweb
EX-7601 ofd: add remote_pages
authorPatrick Farrell <pfarrell@whamcloud.com>
Sat, 28 Oct 2023 20:34:25 +0000 (16:34 -0400)
committerAndreas Dilger <adilger@whamcloud.com>
Thu, 30 Nov 2023 17:16:54 +0000 (17:16 +0000)
When we round a read to get all of the compressed chunks,
the number of local and the number of remote pages will
differ.  We need to make sure we do the checksum and data
transfer using the number of remote pages, not the number of
local pages.

This patch calculates the number of remote pages and uses it
accordingly.  This doesn't do anything yet, but when we
round the local read to include the whole compressed chunk,
this will be needed.

Signed-off-by: Patrick Farrell <pfarrell@whamcloud.com>
Change-Id: I4875b02016570d227b3b926efd117f0a7cda41b4
Reviewed-on: https://review.whamcloud.com/c/ex/lustre-release/+/52878
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Artem Blagodarenko <ablagodarenko@ddn.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
lustre/target/tgt_handler.c

index ff97954..dbb4483 100644 (file)
@@ -2297,6 +2297,14 @@ static int tgt_checksum_niobuf_rw(struct lu_target *tgt,
        RETURN(rc);
 }
 
+int range_to_page_count(loff_t offset, ssize_t len)
+{
+       int start_page = offset / PAGE_SIZE;
+       int end_page = (offset + len - 1) / PAGE_SIZE;
+
+       return end_page - start_page + 1;
+}
+
 int tgt_brw_read(struct tgt_session_info *tsi)
 {
        struct ptlrpc_request   *req = tgt_ses_req(tsi);
@@ -2313,13 +2321,16 @@ int tgt_brw_read(struct tgt_session_info *tsi)
        struct ost_layout_compr *olc;
        const char *obd_name = exp->exp_obd->obd_name;
        bool compr_rounded_read_lock = false;
-       enum ll_compr_type type;
+       enum ll_compr_type compr_type;
+       int npages_remote = 0;
        int chunk_size = 0;
        int no_reply = 0;
+       int npages_local;
        int npages_read;
        ktime_t kstart;
+       int compr_lvl;
+       int niocount;
        int nob = 0;
-       int npages;
        int rc;
        int i;
 
@@ -2381,15 +2392,17 @@ int tgt_brw_read(struct tgt_session_info *tsi)
        ioo = req_capsule_client_get(tsi->tsi_pill, &RMF_OBD_IOOBJ);
        LASSERT(ioo != NULL); /* must exists after tgt_ost_body_unpack */
 
+       niocount = ioo->ioo_bufcnt;
+
        remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
        LASSERT(remote_nb != NULL); /* must exists after tgt_ost_body_unpack */
 
        local_nb = tbc->tbc_lnb;
 
        olc = &body->oa.o_layout_compr;
-       type = olc->ol_compr_type;
-       if (type != LL_COMPR_TYPE_NONE) {
-               int nrbufs = ioo->ioo_bufcnt;
+       compr_type = olc->ol_compr_type;
+       compr_lvl = olc->ol_compr_lvl;
+       if (compr_type != LL_COMPR_TYPE_NONE) {
                unsigned int chunk_log_bits;
                __u64 chunk_start;
                __u64 chunk_end;
@@ -2403,8 +2416,8 @@ int tgt_brw_read(struct tgt_session_info *tsi)
                 * first and end of IO from the last
                 */
                io_start = remote_nb[0].rnb_offset;
-               io_end = remote_nb[nrbufs - 1].rnb_offset +
-                        remote_nb[nrbufs - 1].rnb_len;
+               io_end = remote_nb[niocount - 1].rnb_offset +
+                        remote_nb[niocount - 1].rnb_len;
 
                chunk_start = round_down(io_start, chunk_size);
                chunk_end = round_up(io_end, chunk_size);
@@ -2458,19 +2471,31 @@ int tgt_brw_read(struct tgt_session_info *tsi)
        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        repbody->oa = body->oa;
 
-       npages = PTLRPC_MAX_BRW_PAGES;
+       npages_local = PTLRPC_MAX_BRW_PAGES;
+       for (i = 0; i < niocount; i++)
+               npages_remote += range_to_page_count(remote_nb[i].rnb_offset,
+                                                    remote_nb[i].rnb_len);
        kstart = ktime_get();
 
        rc = obd_preprw(tsi->tsi_env, OBD_BRW_READ, exp, &repbody->oa, 1,
-                       ioo, remote_nb, &npages, local_nb, chunk_size);
+                       ioo, remote_nb, &npages_local, local_nb, chunk_size);
        if (rc != 0)
                GOTO(out_lock, rc);
 
+       /* the server is responsible for decompressing partial chunk reads */
+       if (compr_type == LL_COMPR_TYPE_NONE) {
+               /* if there's no compression, the local page count should be
+                * identical to that requested by the client
+                */
+               LASSERT(npages_local == npages_remote);
+       }
+
        if (body->oa.o_valid & OBD_MD_FLFLAGS &&
            body->oa.o_flags & OBD_FL_SHORT_IO) {
                desc = NULL;
        } else {
-               desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo),
+               desc = ptlrpc_prep_bulk_exp(req, npages_remote,
+                                           ioobj_max_brw_get(ioo),
                                            PTLRPC_BULK_PUT_SOURCE,
                                            OST_BULK_PORTAL,
                                            &ptlrpc_bulk_kiov_nopin_ops);
@@ -2478,8 +2503,8 @@ int tgt_brw_read(struct tgt_session_info *tsi)
                        GOTO(out_commitrw, rc = -ENOMEM);
        }
 
-       npages_read = npages;
-       for (i = 0; i < npages; i++) {
+       npages_read = npages_remote;
+       for (i = 0; i < npages_remote; i++) {
                int page_rc = local_nb[i].lnb_rc;
 
                if (page_rc < 0) {
@@ -2488,6 +2513,9 @@ int tgt_brw_read(struct tgt_session_info *tsi)
                        break;
                }
 
+               CDEBUG(D_SEC, "i %d, lnb_file_offset %llu, lnb_len %d, rc %d\n",
+                      i, local_nb[i].lnb_file_offset, local_nb[i].lnb_len,
+                      page_rc);
                nob += page_rc;
                if (page_rc != 0 && desc != NULL) { /* some data! */
                        LASSERT(local_nb[i].lnb_page != NULL);
@@ -2501,7 +2529,7 @@ int tgt_brw_read(struct tgt_session_info *tsi)
                        local_nb[i].lnb_len = page_rc;
                        npages_read = i + (page_rc != 0 ? 1 : 0);
                        /* All subsequent pages should be 0 */
-                       while (++i < npages)
+                       while (++i < npages_remote)
                                LASSERT(local_nb[i].lnb_rc == 0);
                        break;
                }
@@ -2576,7 +2604,7 @@ int tgt_brw_read(struct tgt_session_info *tsi)
 out_commitrw:
        /* Must commit after prep above in all cases */
        rc = obd_commitrw(tsi->tsi_env, OBD_BRW_READ, exp, &repbody->oa, 1, ioo,
-                         remote_nb, npages, local_nb, rc, nob, kstart);
+                         remote_nb, npages_local, local_nb, rc, nob, kstart);
 out_lock:
        tgt_brw_unlock(exp, ioo,
                       compr_rounded_read_lock ? &chunk_lock_rnb : remote_nb,