struct niobuf_remote *rnb, int *nr_local,
struct niobuf_local *lnb, int chunk_bits)
{
+ struct range_lock *range = &ofd_info(env)->fti_write_range;
+ struct dt_object *dt_obj = NULL;
struct ofd_object *fo;
- int i, j, k, rc = 0, tot_bytes = 0;
enum dt_bufs_type dbt = DT_BUFS_TYPE_WRITE;
int chunk_size = chunk_bits ? 1 << chunk_bits : 0;
int maxlnb = *nr_local;
- __u64 begin, end;
- struct range_lock *range = &ofd_info(env)->fti_write_range;
+ __u64 prev_buf_end = 0;
+ int tot_bytes = 0;
+ __u64 begin;
+ int rc = 0;
+ __u64 end;
+ int i;
+ int j;
+ int k;
ENTRY;
LASSERT(env != NULL);
/* parse remote buffers to local buffers and prepare the latter */
for (*nr_local = 0, i = 0, j = 0; i < obj->ioo_bufcnt; i++) {
+ __u64 orig_start;
+ __u64 buf_start;
+ __u64 orig_end;
+ __u64 buf_end;
+ int buf_len;
+
begin = min_t(__u64, begin, rnb[i].rnb_offset);
end = max_t(__u64, end, rnb[i].rnb_offset + rnb[i].rnb_len);
+ CDEBUG(D_SEC, "begin %llu, end %llu\n", begin, end);
if (OBD_FAIL_CHECK(OBD_FAIL_OST_2BIG_NIOBUF))
rnb[i].rnb_len += PAGE_SIZE;
- rc = dt_bufs_get(env, ofd_object_child(fo), lnb + j,
- rnb[i].rnb_offset, rnb[i].rnb_len, maxlnb,
- dbt);
+
+ buf_start = rnb[i].rnb_offset;
+ buf_end = rnb[i].rnb_offset + rnb[i].rnb_len;
+ orig_start = buf_start;
+ orig_end = buf_end;
+
+ CDEBUG(D_SEC, "buf_start %llu, buf_end %llu\n", buf_start,
+ buf_end);
+
+ /* when writing to a compressed file, we have to round the write
+ * to cover full chunks so we can read-modify-write full chunks
+ *
+ * we know the client will not compress unaligned writes
+ * unless they are at or beyond EOF, in which case there is no
+ * need to do read-modify-write. So if a write is compressed,
+ * we can ignore it.
+ *
+ * There's a gap here, which is if we had incompressible data
+ * being written beyond EOF, we will do read-modify-write for
+ * that data. This shouldn't be too bad, since read beyond EOF
+ * is basically free.
+ */
+ if (chunk_size && !(rnb[i].rnb_flags & OBD_BRW_COMPRESSED)) {
+ chunk_round(&buf_start, &buf_end, chunk_size);
+
+ /* rounded rnbs can overlap at the chunk level, but it's
+ * important we don't allocate multiple buffers for the
+ * same page, so move the start of this buffer to the
+ * end of the previous one
+ */
+ if (buf_start < prev_buf_end) {
+ buf_start = prev_buf_end;
+ /* two rnbs may be entirely inside the same
+ * chunk, in which case we're already doing IO
+ * for that chunk, so skip it
+ */
+ prev_buf_end = buf_end;
+ if (buf_start == buf_end)
+ continue;
+ }
+
+ if (buf_start != orig_start || buf_end != orig_end) {
+ /* get attr only once for each IO */
+ if (!dt_obj) {
+ dt_obj = ofd_object_child(fo);
+ rc = dt_attr_get(env, dt_obj, la);
+ if (rc)
+ GOTO(err_nolock, rc);
+ }
+ /* if this write is beyond EOF, there's no
+ * compressed data under it, so no need to do
+ * read-modify-write, so no rounding required
+ */
+ if (buf_start >= la->la_size) {
+ buf_start = orig_start;
+ buf_end = orig_end;
+ }
+ }
+ prev_buf_end = buf_end;
+ }
+
+ buf_len = buf_end - buf_start;
+
+ CDEBUG(D_SEC, "buf_start %llu, buf_end %llu\n", buf_start,
+ buf_end);
+
+ rc = dt_bufs_get(env, ofd_object_child(fo), lnb + j, buf_start,
+ buf_len, maxlnb, dbt);
if (unlikely(rc < 0))
GOTO(err_nolock, rc);
LASSERT(rc <= PTLRPC_MAX_BRW_PAGES);