struct obdo *oa);
int (*o_getattr)(const struct lu_env *env, struct obd_export *exp,
struct obdo *oa);
+ int (*o_decompress_read)(struct obd_export *exp,
+ struct niobuf_remote *rnb,
+ struct niobuf_local *lnb,
+ struct obd_ioobj *obj, int npages,
+ int chunk_bits);
int (*o_preprw)(const struct lu_env *env, int cmd,
struct obd_export *exp, struct obdo *oa, int objcount,
struct obd_ioobj *obj, struct niobuf_remote *remote,
RETURN(rc);
}
+static inline int obd_decompress_read(struct obd_export *exp,
+ struct niobuf_remote *rnb,
+ struct niobuf_local *lnb,
+ struct obd_ioobj *obj, int npages,
+ int chunk_bits)
+{
+ int rc;
+
+ ENTRY;
+
+ rc = exp_check_ops(exp);
+ if (rc)
+ RETURN(rc);
+
+ if (!exp->exp_obd->obd_type->typ_dt_ops->o_decompress_read) {
+ CERROR("%s: no %s operation\n",
+ (exp)->exp_obd->obd_name, __func__);
+ RETURN(-EOPNOTSUPP);
+ }
+
+ rc = OBP(exp->exp_obd, decompress_read)(exp, rnb, lnb, obj, npages,
+ chunk_bits);
+
+ RETURN(rc);
+}
+
static inline int obd_preprw(const struct lu_env *env, int cmd,
struct obd_export *exp, struct obdo *oa,
int objcount, struct obd_ioobj *obj,
struct obdo *oa);
int ofd_verify_layout_version(const struct lu_env *env,
struct ofd_object *fo, const struct obdo *oa);
+int ofd_decompress_read(struct obd_export *exp, struct niobuf_remote *rnb,
+ struct niobuf_local *lnb, struct obd_ioobj *obj,
+ int npages, int chunk_bits);
int ofd_preprw(const struct lu_env *env,int cmd, struct obd_export *exp,
struct obdo *oa, int objcount, struct obd_ioobj *obj,
struct niobuf_remote *rnb, int *nr_local,
return rc;
}
+/* this function handles decompression for the local niobufs for a chunk
+ * unaligned read
+ *
+ * it takes a set of local niobufs which contain raw data from disk, then uses
+ * the remote niobuf to identify unaligned reads and passes the corresponding
+ * local niobufs to a function which decompresses them in place
+ *
+ * the result is a set of lnbs containing decompressed data for the unaligned
+ * portion of client reads and raw data for the aligned portion
+ */
+int ofd_decompress_read(struct obd_export *exp, struct niobuf_remote *rnb,
+ struct niobuf_local *lnb, struct obd_ioobj *obj,
+ int npages, int chunk_bits)
+{
+ int chunk_size = 1 << chunk_bits;
+ int niocount = obj->ioo_bufcnt;
+ int buf_bits = chunk_bits + 1;
+ void *bounce_src = NULL;
+ void *bounce_dst = NULL;
+ int rc = 0;
+ int i;
+ ENTRY;
+
+ for (i = 0; i < niocount; i++) {
+ __u64 prev_rnb_end = 0;
+ __u64 chunk_start;
+ __u64 rnb_start;
+ __u64 chunk_end;
+ __u64 rnb_end;
+
+ rnb_start = rnb[i].rnb_offset;
+ rnb_end = rnb[i].rnb_offset + rnb[i].rnb_len;
+
+ chunk_start = rnb_start;
+ chunk_end = rnb_end;
+ chunk_round(&chunk_start, &chunk_end, chunk_size);
+ /* if the iobuf is not aligned, it has an unaligned read and we
+ * must decompress data for that locally
+ */
+ CDEBUG(D_SEC,
+ "checking: rnb %d rnb_start %llu, rnb_end %llu\n", i,
+ rnb_start, rnb_end);
+ if (chunk_start != rnb_start || chunk_end != rnb_end) {
+ /* rounded rnbs can overlap at the chunk level, but if
+ * they do, we've already decompressed that chunk, so
+ * start at the end of that chunk
+ */
+ if (rnb_start < prev_rnb_end) {
+ rnb_start = prev_rnb_end;
+ /* it's possible both rnbs are in the same
+ * chunk, and in that case, we've already
+ * decompressed the chunk, so skip
+ */
+ if (rnb_start == rnb_end) {
+ CDEBUG(D_SEC,
+ "skipping rnb %d, already decompressed\n",
+ i);
+ prev_rnb_end = rnb_end;
+ continue;
+ }
+ }
+ /* only allocate buffers if they haven't been allocated
+ * yet
+ */
+ if (!bounce_src) {
+ sptlrpc_pool_get_pages(&bounce_src, buf_bits);
+ if (bounce_src == NULL)
+ GOTO(out, rc = -ENOMEM);
+ }
+
+ if (!bounce_dst) {
+ sptlrpc_pool_get_pages(&bounce_dst, buf_bits);
+ if (bounce_dst == NULL)
+ GOTO(out, rc = -ENOMEM);
+ }
+
+ CDEBUG(D_SEC,
+ "decompressing: rnb %d rnb_start %llu, rnb_end %llu\n",
+ i, rnb_start, rnb_end);
+ /*rc = decompress_rnb(exp->exp_obd->obd_name, lnb, npages,
+ rnb_start, rnb_end, chunk_size,
+ bounce_src, bounce_dst);*/
+ if (rc)
+ GOTO(out, rc);
+ }
+ prev_rnb_end = rnb_end;
+ }
+
+out:
+ if (bounce_dst)
+ sptlrpc_pool_put_pages(&bounce_dst, buf_bits);
+ if (bounce_src)
+ sptlrpc_pool_put_pages(&bounce_src, buf_bits);
+
+ /* implementation is incomplete, return EINVAL */
+ rc = -EINVAL;
+
+ RETURN(rc);
+}
+
/**
* Prepare bulk IO requests for processing.
*
.o_create = ofd_echo_create,
.o_statfs = ofd_statfs,
.o_setattr = ofd_echo_setattr,
+ .o_decompress_read = ofd_decompress_read,
.o_preprw = ofd_preprw,
.o_commitrw = ofd_commitrw,
.o_destroy = ofd_echo_destroy,
enum ll_compr_type compr_type;
int npages_remote = 0;
int chunk_size = 0;
+ int chunk_bits = 0;
int no_reply = 0;
int npages_local;
int npages_read;
ktime_t kstart;
- int compr_lvl;
int niocount;
int nob = 0;
int rc;
olc = &body->oa.o_layout_compr;
compr_type = olc->ol_compr_type;
- compr_lvl = olc->ol_compr_lvl;
if (compr_type != LL_COMPR_TYPE_NONE) {
unsigned int chunk_log_bits;
__u64 chunk_start;
__u64 io_end;
chunk_log_bits = olc->ol_compr_chunk_log_bits;
+ chunk_bits = chunk_log_bits + COMPR_CHUNK_MIN_BITS;
chunk_size = COMPR_GET_CHUNK_SIZE(chunk_log_bits);
/* rnbs are in offset order, so we get the start of IO from the
if (rc != 0)
GOTO(out_lock, rc);
- if (compr_type == LL_COMPR_TYPE_NONE) {
+ /* the server is responsible for decompressing partial chunk reads */
+ if (npages_local != npages_remote) {
+ rc = obd_decompress_read(exp, remote_nb, local_io_nb, ioo,
+ npages_local, chunk_bits);
+ if (rc != 0)
+ GOTO(out_commitrw, rc);
+ } else {
/* if there's no compression, the local page count should be
* identical to that requested by the client
*/