#include <lustre_compr.h>
#include <lustre_sec.h>
+static int decompress_chunk_in_lnb(const char *obd_name,
+ struct niobuf_local *lnbs, int lnb_start,
+ void *bounce_src, void *bounce_dst,
+ enum ll_compr_type type, int lvl,
+ int chunk_size)
+{
+ struct ll_compr_hdr *llch = NULL;
+ struct crypto_comp *cc = NULL;
+ int pages_in_chunk = chunk_size / PAGE_SIZE;
+ /* dst_size must be initialized to the writable size of bounce_dst
+ * for the kernel decompression code; the bounce buffers hold two
+ * chunks (buf_bits = chunk_bits + 1 in the caller)
+ */
+ unsigned int dst_size = 2 * chunk_size;
+ unsigned int src_size;
+ int hdr_size;
+ int rc = 0;
+
+ ENTRY;
+
+ CDEBUG(D_SEC, "lnb start %d file offset %llu, page offset %u len %u\n",
+ lnb_start, lnbs[lnb_start].lnb_file_offset,
+ lnbs[lnb_start].lnb_page_offset, lnbs[lnb_start].lnb_len);
+
+ /* if this chunk isn't compressed, there is nothing to decompress */
+ if (!is_chunk_start(lnbs[lnb_start].lnb_page, &llch))
+ RETURN(0);
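+ /* llch now points at the on-disk compression header at the start of
+ * the chunk's first page, set by is_chunk_start() above
+ */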
+
+ /* the compression type and level in the compressed data can differ
+ * from those set in the layout, because the client can select a
+ * different compression type as an optimization, but the chunk size
+ * must agree
+ */
+ CDEBUG(D_SEC,
+ "chunk_size %d, layout: type %d, lvl %d, disk: type %d, lvl %d\n",
+ chunk_size, type, lvl, llch->llch_compr_type,
+ llch->llch_compr_level);
+ if (chunk_size !=
+ COMPR_GET_CHUNK_SIZE(llch->llch_chunk_log_bits)) {
+ CERROR("%s: chunk size disagreement, layout %d, from disk %d\n",
+ obd_name, chunk_size,
+ COMPR_GET_CHUNK_SIZE(llch->llch_chunk_log_bits));
+ /* the compression type and level can disagree with the layout; we
+ * just log them for debugging
+ */
+ CERROR("layout: type %d, lvl %d, disk: type %d, lvl %d\n",
+ type, lvl, llch->llch_compr_type, llch->llch_compr_level);
+ GOTO(out, rc = -EINVAL);
+ }
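+ /* decompress with the type/level actually recorded on disk, not the
+ * layout values
+ */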
+ type = llch->llch_compr_type;
+ lvl = llch->llch_compr_level;
+ hdr_size = llch->llch_header_size;
+ rc = alloc_compr(obd_name, &type, lvl, &cc, true);
+ if (rc) {
+ CERROR("%s: Setup for decompression failed, type %i, lvl %d, rc = %d\n",
+ obd_name, type, lvl, rc);
+ GOTO(out, rc);
+ }
+
+ /* place the raw compressed data in a contiguous buffer */
+ merge_chunk(NULL, lnbs, lnb_start, pages_in_chunk, (char *) bounce_src,
+ &src_size);
+ LASSERT(src_size <= chunk_size);
+ CDEBUG(D_SEC, "merged size: %u\n", src_size);
+
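+ /* decompress the llch_compr_size bytes of payload that follow the
+ * hdr_size-byte header; on success dst_size is updated to the
+ * decompressed length
+ */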
+ rc = decompress_chunk(obd_name, cc,
+ ((char *) bounce_src) + hdr_size,
+ llch->llch_compr_size,
+ (char *) bounce_dst, &dst_size, type,
+ lvl);
+ if (rc != 0) {
+ CERROR("%s: Failed to decompress %d byte chunk at %llu, rc: %d\n",
+ obd_name, llch->llch_compr_size,
+ lnbs[lnb_start].lnb_file_offset, rc);
+ GOTO(out, rc);
+ }
+ LASSERT(dst_size <= chunk_size);
+
+ /* now that we've successfully decompressed this chunk, we copy
+ * it back to the read lnbs
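+ * (((dst_size - 1) >> PAGE_SHIFT) + 1 is DIV_ROUND_UP(dst_size,
+ * PAGE_SIZE), the number of pages holding the decompressed data)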
+ */
+ unmerge_chunk(NULL, lnbs, lnb_start,
+ ((dst_size - 1) >> PAGE_SHIFT) + 1,
+ (char *) bounce_dst, dst_size);
+
+out:
+ if (cc)
+ crypto_free_comp(cc);
+ RETURN(rc);
+}
+
+/* this function handles decompression for an unaligned read
+ *
+ * the beginning and end of the read may be unaligned, so we check and if
+ * necessary decompress (in place) the data in those locations
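+ *
+ * e.g. (hypothetical sizes): with 64KiB chunks, a read of bytes
+ * [10000, 150000) gets the leading chunk [0, 65536) and the trailing
+ * chunk [131072, 196608) decompressed here, while the fully covered
+ * middle chunk is returned compressed for the client to decompress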
+ */
+int decompress_rnb(const char *obd_name, struct niobuf_local *lnbs,
+ int lnb_npages, __u64 rnb_start, __u64 rnb_end,
+ int *lnb_start, void *bounce_src, void *bounce_dst,
+ enum ll_compr_type type, int lvl, int chunk_size)
+{
+ struct niobuf_local *lnb = NULL;
+ int pages_per_chunk = chunk_size / PAGE_SIZE;
+ bool chunk_found = false;
+ /* start looking where the previous call left off */
+ int i = *lnb_start;
+ __u64 chunk_start;
+ __u64 chunk_end;
+ int rc = 0;
+
+ ENTRY;
+
+ LASSERT(*lnb_start < lnb_npages);
+
+ CDEBUG(D_SEC,
+ "rnb_start %llu, rnb_end %llu, lnb_start %d, chunk_size %d\n",
+ rnb_start, rnb_end, *lnb_start, chunk_size);
+
+ /* if the start of the read is not chunk aligned, we find the start of
+ * the lnbs for that chunk and decompress it
+ */
+ chunk_start = round_down(rnb_start, chunk_size);
+ chunk_end = chunk_start + chunk_size;
+ if (chunk_start != rnb_start) {
+ chunk_found = false;
+ for (; i < lnb_npages; i++) {
+ lnb = lnbs + i;
+ if (lnb->lnb_file_offset == chunk_start) {
+ chunk_found = true;
+ break;
+ }
+ }
+
+ CDEBUG(D_SEC,
+ "leading chunk lnb %d lnb file offset %llu, chunk start %llu, chunk end %llu\n",
+ i, lnb ? lnb->lnb_file_offset : 0, chunk_start,
+ chunk_end);
+
+ if (!chunk_found)
+ RETURN(-EINVAL);
+
+ rc = decompress_chunk_in_lnb(obd_name, lnbs, i, bounce_src,
+ bounce_dst, type, lvl, chunk_size);
+ if (rc)
+ GOTO(out, rc);
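+ /* skip past the pages of the chunk we just decompressed */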
+ i += pages_per_chunk;
+ }
+
+ /* we've decompressed a leading chunk and this read fits entirely
+ * inside that chunk, so we're done
+ */
+ if (chunk_found && rnb_end <= chunk_end)
+ GOTO(out, rc);
+
+ /* if the end of read is unaligned, find and decompress the
+ * corresponding chunk in the lnbs
+ */
+ chunk_start = round_down(rnb_end, chunk_size);
+ chunk_end = chunk_start + chunk_size;
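+ /* if rnb_end is exactly chunk aligned, round_down() returns rnb_end
+ * and there is no trailing chunk to handle here
+ */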
+ if (chunk_start != rnb_end) {
+ struct niobuf_local *prev = NULL;
+ int j;
+
+ chunk_found = false;
+ for (; i < lnb_npages; i++) {
+ lnb = lnbs + i;
+ if (lnb->lnb_file_offset == chunk_start) {
+ chunk_found = true;
+ break;
+ }
+ }
+
+ CDEBUG(D_SEC,
+ "trailing chunk lnb %d lnb file offset %llu, chunk start %llu, chunk end %llu\n",
+ i, lnb ? lnb->lnb_file_offset : 0, chunk_start,
+ chunk_end);
+
+ if (!chunk_found)
+ RETURN(-EINVAL);
+
+ /* the read is not chunk aligned at the end, but it's possible
+ * the last part of this read is an incomplete chunk, and if so,
+ * we may be able to send it to the client as-is
+ */
+ for (j = i; j < lnb_npages; j++) {
+ CDEBUG(D_SEC, "page %d, lnb_rc %d\n", j, lnbs[j].lnb_rc);
+ /* we have a complete chunk, proceed to decompression */
+ if (j - i == pages_per_chunk - 1) {
+ CDEBUG(D_SEC, "complete chunk, from %d to %d\n",
+ i, j);
+ break;
+ }
+ /* we've hit the end of the data in this lnb; if the
+ * end of data is before the end of the read, then we
+ * hit a hole, and we can skip decompression - this is
+ * a short chunk, so the read will return the complete
+ * chunk to the client and the client will decompress it
+ */
+ if (lnbs[j].lnb_rc == 0) {
+ CDEBUG(D_SEC, "Hit EOF in lnb %d at %llu\n",
+ j, lnbs[j].lnb_file_offset);
+ if (prev &&
+ prev->lnb_file_offset + prev->lnb_len <= rnb_end) {
+ CDEBUG(D_SEC,
+ "read ends at %llu, beyond EOF, client will decompress chunk\n",
+ rnb_end);
+ GOTO(out, rc = 0);
+ }
+ break;
+ }
+ prev = lnbs + j;
+ }
+
+ rc = decompress_chunk_in_lnb(obd_name, lnbs, i, bounce_src,
+ bounce_dst, type, lvl, chunk_size);
+ if (rc)
+ GOTO(out, rc);
+ i += pages_per_chunk;
+ }
+
+out:
+ /* future rnbs and chunks in this IO will start later in the lnbs, so
+ * save the index so the next call can start searching there
+ */
+ *lnb_start = i;
+
+ /* we were given this rnb because it's unaligned, so we must have
+ * found a chunk for the unaligned read, or something is wrong
+ */
+ LASSERT(chunk_found);
+
+ RETURN(rc);
+}
CDEBUG(D_SEC, "buf_start %llu, buf_end %llu\n", buf_start,
buf_end);
- /* compressd reads must be rounded to cover whole chunks */
+ /* compressed reads must be rounded to cover whole chunks */
if (chunk_size) {
chunk_round(&buf_start, &buf_end, chunk_size);
- /* unaligned reads on compressed files are not supported
- * yet
- */
- if (buf_start != rnb[i].rnb_offset ||
- buf_end != rnb[i].rnb_offset + rnb[i].rnb_len)
- GOTO(buf_put, rc = -EINVAL);
/* if we rounded the chunk, then we're going to do
* decompression and dt_read_prep needs to know this
*/
int ofd_decompress_read(const struct lu_env *env, struct obd_export *exp,
struct obdo *oa, struct niobuf_remote *rnb,
struct niobuf_local *lnb, struct obd_ioobj *obj,
- int npages, int chunk_bits)
+ int npages, enum ll_compr_type type, int lvl,
+ int chunk_bits)
{
struct ofd_device *ofd = ofd_exp(exp);
struct lu_fid *fid = &oa->o_oi.oi_fid;
int buf_bits = chunk_bits + 1;
void *bounce_src = NULL;
void *bounce_dst = NULL;
+ int lnb_start = 0;
int rc = 0;
int i;
ENTRY;
__u64 chunk_end;
__u64 rnb_end;
+ CDEBUG(D_SEC, "lnb_start %d, npages %d\n", lnb_start, npages);
+ if (lnb_start == npages) {
+ CDEBUG(D_SEC, "lnb_start %d, reached end of read pages - any further IOs are past EOF, so no need for decompression\n",
+ lnb_start);
+ break;
+ }
rnb_start = rnb[i].rnb_offset;
rnb_end = rnb[i].rnb_offset + rnb[i].rnb_len;
chunk_start = rnb_start;
chunk_end = rnb_end;
- chunk_round(&chunk_start, &chunk_end, chunk_size);
- /* if the iobuf is not aligned, it has an unaligned read and we
- * must decompress data for that locally
+
+ /* if the iobuf is not chunk aligned, it has an unaligned read
+ * and we must decompress data for that locally
*/
CDEBUG(D_SEC,
"checking: rnb %d rnb_start %llu, rnb_end %llu\n", i,
rnb_start, rnb_end);
+ chunk_round(&chunk_start, &chunk_end, chunk_size);
if (chunk_start != rnb_start || chunk_end != rnb_end) {
/* rounded rnbs can overlap at the chunk level, but if
* they do, we've already decompressed that chunk, so
CDEBUG(D_SEC,
"decompressing: rnb %d rnb_start %llu, rnb_end %llu\n",
i, rnb_start, rnb_end);
- /*rc = decompress_rnb(exp->exp_obd->obd_name, lnb, npages,
- rnb_start, rnb_end, chunk_size,
- bounce_src, bounce_dst);*/
+ rc = decompress_rnb(exp->exp_obd->obd_name, lnb, npages,
+ rnb_start, rnb_end, &lnb_start,
+ bounce_src, bounce_dst, type, lvl,
+ chunk_size);
if (rc)
GOTO(out, rc);
}
if (bounce_src)
sptlrpc_pool_put_pages(&bounce_src, buf_bits);
- /* implementation is incomplete, return EINVAL */
- rc = -EINVAL;
-
ofd_object_put(env, fo);
RETURN(rc);