int (*o_preprw)(const struct lu_env *env, int cmd,
struct obd_export *exp, struct obdo *oa, int objcount,
struct obd_ioobj *obj, struct niobuf_remote *remote,
- int *nr_pages, struct niobuf_local *local,
- enum ll_compr_type type, int lvl, int chunk_bits);
+ int *nr_pages, struct niobuf_local *lnb,
+ struct niobuf_local *lnb2, enum ll_compr_type type,
+ int lvl, int chunk_bits);
int (*o_commitrw)(const struct lu_env *env, int cmd,
struct obd_export *exp, struct obdo *oa,
int objcount, struct obd_ioobj *obj,
struct obd_export *exp, struct obdo *oa,
int objcount, struct obd_ioobj *obj,
struct niobuf_remote *remote, int *pages,
- struct niobuf_local *local,
+ struct niobuf_local *lnb,
+ struct niobuf_local *lnb2,
enum ll_compr_type type, int lvl, int chunk_bits)
{
int rc;
}
rc = OBP(exp->exp_obd, preprw)(env, cmd, exp, oa, objcount, obj, remote,
- pages, local, type, lvl, chunk_bits);
+ pages, lnb, lnb2, type, lvl, chunk_bits);
RETURN(rc);
}
int mdt_obd_preprw(const struct lu_env *env, int cmd, struct obd_export *exp,
struct obdo *oa, int objcount, struct obd_ioobj *obj,
struct niobuf_remote *rnb, int *nr_local,
- struct niobuf_local *lnb, enum ll_compr_type type, int lvl,
- int chunk_bits);
+ struct niobuf_local *lnb, struct niobuf_local *unused,
+ enum ll_compr_type type, int lvl, int chunk_bits);
int mdt_obd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
struct obdo *oa, int objcount, struct obd_ioobj *obj,
int mdt_obd_preprw(const struct lu_env *env, int cmd, struct obd_export *exp,
struct obdo *oa, int objcount, struct obd_ioobj *obj,
struct niobuf_remote *rnb, int *nr_local,
- struct niobuf_local *lnb, enum ll_compr_type type, int lvl,
- int chunk_bits)
+ struct niobuf_local *lnb, struct niobuf_local *unused,
+ enum ll_compr_type type, int lvl, int chunk_bits)
{
struct tgt_session_info *tsi = tgt_ses_info(env);
struct mdt_thread_info *info = tsi2mdt_info(tsi);
struct obd_export *export, struct obdo *oa,
int objcount, struct obd_ioobj *obj,
struct niobuf_remote *nb, int *pages,
- struct niobuf_local *res, enum ll_compr_type type,
+ struct niobuf_local *res,
+ struct niobuf_local *unused, enum ll_compr_type type,
int lvl, int chunk_bits)
{
struct obd_device *obd;
lpages = npages;
ret = obd_preprw(env, rw, exp, oa, 1, &ioo, &rnb, &lpages, lnb,
- 0, 0, 0);
+ NULL, 0, 0, 0);
if (ret != 0)
GOTO(out, ret);
int ofd_preprw(const struct lu_env *env, int cmd, struct obd_export *exp,
struct obdo *oa, int objcount, struct obd_ioobj *obj,
struct niobuf_remote *rnb, int *nr_local,
- struct niobuf_local *lnb, enum ll_compr_type type, int lvl,
- int chunk_bits);
+ struct niobuf_local *lnb, struct niobuf_local *lnb2,
+ enum ll_compr_type type, int lvl, int chunk_bits);
int ofd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
struct obdo *oa, int objcount, struct obd_ioobj *obj,
struct niobuf_remote *rnb, int npages,
struct lu_attr *la, struct obdo *oa,
int objcount, struct obd_ioobj *obj,
struct niobuf_remote *rnb, int *nr_local,
- struct niobuf_local *lnb, enum ll_compr_type type,
- int lvl, int chunk_bits)
+ struct niobuf_local *write_lnb,
+ struct niobuf_local *read_lnb,
+ enum ll_compr_type type, int lvl, int chunk_bits)
{
struct range_lock *range = &ofd_info(env)->fti_write_range;
struct dt_object *dt_obj = NULL;
CDEBUG(D_SEC, "buf_start %llu, buf_end %llu\n", buf_start,
buf_end);
- rc = dt_bufs_get(env, ofd_object_child(fo), lnb + j, buf_start,
+ rc = dt_bufs_get(env, ofd_object_child(fo), write_lnb + j, buf_start,
buf_len, maxlnb, dbt);
if (unlikely(rc < 0))
GOTO(err_nolock, rc);
LASSERT(rc <= PTLRPC_MAX_BRW_PAGES);
/* correct index for local buffers to continue with */
for (k = 0; k < rc; k++) {
- lnb[j+k].lnb_flags = rnb[i].rnb_flags;
- lnb[j+k].lnb_flags &= ~OBD_BRW_LOCALS;
+ write_lnb[j+k].lnb_flags = rnb[i].rnb_flags;
+ write_lnb[j+k].lnb_flags &= ~OBD_BRW_LOCALS;
if (!(rnb[i].rnb_flags & OBD_BRW_GRANTED))
- lnb[j+k].lnb_rc = -ENOSPC;
+ write_lnb[j+k].lnb_rc = -ENOSPC;
}
j += rc;
*nr_local += rc;
}
if (compr_unaligned_write) {
- rc = dt_read_prep(env, ofd_object_child(fo), lnb, *nr_local,
- true);
+ read_lnb = write_lnb;
+
+ rc = dt_read_prep(env, ofd_object_child(fo), read_lnb,
+ *nr_local, true);
if (unlikely(rc != 0))
GOTO(err, rc);
- rc = ofd_decompress_read(env, exp, oa, rnb, lnb, obj, *nr_local,
- type, lvl, chunk_bits, true);
+ rc = ofd_decompress_read(env, exp, oa, rnb, read_lnb, obj,
+ *nr_local, type, lvl, chunk_bits,
+ true);
if (unlikely(rc != 0))
GOTO(err, rc);
/* read_prep sets lnb_rc if it read data, or on error, but the
* on error
*/
for (i = 0; i < *nr_local; i++) {
- if (lnb[i].lnb_rc > 0)
- lnb[i].lnb_rc = 0;
+ if (read_lnb[i].lnb_rc > 0)
+ read_lnb[i].lnb_rc = 0;
}
}
- rc = dt_write_prep(env, ofd_object_child(fo), lnb, *nr_local);
+ rc = dt_write_prep(env, ofd_object_child(fo), write_lnb, *nr_local);
if (unlikely(rc != 0))
GOTO(err, rc);
err:
ofd_read_unlock(env, fo);
err_nolock:
- dt_bufs_put(env, ofd_object_child(fo), lnb, *nr_local);
+ dt_bufs_put(env, ofd_object_child(fo), write_lnb, *nr_local);
ofd_object_put(env, fo);
/* tgt_grant_prepare_write() was called, so we must commit */
tgt_grant_commit(exp, oa->o_grant_used, rc);
* \param[in] rnb remote buffers
* \param[in] nr_local number of local buffers
* \param[in] lnb local buffers
+ * \param[in] lnb2 second set of local buffers, used by preprw_write
* \param[in] chunk compression chunk size
*
* \retval 0 on successful prepare
int ofd_preprw(const struct lu_env *env, int cmd, struct obd_export *exp,
struct obdo *oa, int objcount, struct obd_ioobj *obj,
struct niobuf_remote *rnb, int *nr_local,
- struct niobuf_local *lnb, enum ll_compr_type type, int lvl,
- int chunk_bits)
+ struct niobuf_local *lnb, struct niobuf_local *lnb2,
+ enum ll_compr_type type, int lvl, int chunk_bits)
{
struct tgt_session_info *tsi = tgt_ses_info(env);
struct ofd_device *ofd = ofd_exp(exp);
if (cmd == OBD_BRW_WRITE) {
la_from_obdo(&info->fti_attr, oa, OBD_MD_FLGETATTR);
rc = ofd_preprw_write(env, exp, ofd, fid, &info->fti_attr, oa,
- objcount, obj, rnb, nr_local, lnb, type,
- lvl, chunk_bits);
+ objcount, obj, rnb, nr_local, lnb, lnb2,
+ type, lvl, chunk_bits);
} else if (cmd == OBD_BRW_READ) {
tgt_grant_prepare_read(env, exp, oa);
rc = ofd_preprw_read(env, exp, ofd, fid, &info->fti_attr, oa,
kstart = ktime_get();
rc = obd_preprw(tsi->tsi_env, OBD_BRW_READ, exp, &repbody->oa, 1,
- ioo, remote_nb, &npages_local, local_io_nb,
+ ioo, remote_nb, &npages_local, local_io_nb, NULL,
compr_type, compr_lvl, chunk_bits);
if (rc != 0)
GOTO(out_lock, rc);
struct niobuf_remote chunk_lock_rnb;
struct ptlrpc_bulk_desc *desc = NULL;
struct lustre_handle lockh = {0};
- struct niobuf_local *local_io_nb;
+ struct niobuf_local *local_write_nb;
+ struct niobuf_local *local_read_nb;
struct niobuf_local *local_tx_nb;
struct niobuf_remote *remote_nb;
struct ost_body *repbody;
CFS_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_PACK, cfs_fail_val);
rcs = req_capsule_server_get(&req->rq_pill, &RMF_RCS);
- local_io_nb = tbc->tbc_lnb;
+ local_write_nb = tbc->tbc_lnb;
/* by default, the same local iobuf is used for io and transfer.
* compression sometimes changes this
*/
local_tx_nb = tbc->tbc_lnb;
+ /* compressed writes need a second set of lnb pointers for tracking the
+ * read they must do for read-modify-write
+ */
+ local_read_nb = tbc->tbc_lnb2;
olc = &body->oa.o_layout_compr;
compr_type = olc->ol_compr_type;
remote_nb[i].rnb_len);
kstart = ktime_get();
rc = obd_preprw(tsi->tsi_env, OBD_BRW_WRITE, exp, &repbody->oa,
- objcount, ioo, remote_nb, &npages_local, local_io_nb,
- compr_type, compr_lvl, chunk_bits);
+ objcount, ioo, remote_nb, &npages_local, local_write_nb,
+ local_read_nb, compr_type, compr_lvl, chunk_bits);
if (rc < 0)
GOTO(out_lock, rc);
if (npages_local != npages_remote) {
LASSERT(compr_type != LL_COMPR_TYPE_NONE);
local_tx_nb = tbc->tbc_lnb2;
- io_lnb_to_tx_lnb(local_io_nb, local_tx_nb, remote_nb,
+ io_lnb_to_tx_lnb(local_write_nb, local_tx_nb, remote_nb,
niocount, npages_local);
}
/* Must commit after prep above in all cases */
rc = obd_commitrw(tsi->tsi_env, OBD_BRW_WRITE, exp, &repbody->oa,
- objcount, ioo, remote_nb, npages_local, local_io_nb,
- rc, nob, kstart);
+ objcount, ioo, remote_nb, npages_local,
+ local_write_nb, rc, nob, kstart);
if (rc == -ENOTCONN)
/* quota acquire process has been given up because
* either the client has been evicted or the client
no_reply = true;
for (i = 0; i < niocount; i++) {
- if (!(local_io_nb[i].lnb_flags & OBD_BRW_ASYNC)) {
+ if (!(local_write_nb[i].lnb_flags & OBD_BRW_ASYNC)) {
wait_sync = true;
break;
}