struct dt_object *dt_obj = arg->object;
int rc;
+ CDEBUG(D_INFO, "write "DFID" pos "LPU64" buf %p, len %lu\n",
+ PFID(lu_object_fid(&dt_obj->do_lu)), arg->u.write.pos,
+ arg->u.write.buf.lb_buf, (unsigned long)arg->u.write.buf.lb_len);
+
dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
rc = dt_record_write(env, dt_obj, &arg->u.write.buf,
&arg->u.write.pos, th);
RETURN(rc);
}
+static int out_read(struct tgt_session_info *tsi)
+{
+ const struct lu_env *env = tsi->tsi_env;
+ struct tgt_thread_info *tti = tgt_th_info(env);
+ struct object_update *update = tti->tti_u.update.tti_update;
+ struct dt_object *obj = tti->tti_u.update.tti_dt_object;
+ struct object_update_reply *reply = tti->tti_u.update.tti_update_reply;
+ int index = tti->tti_u.update.tti_update_reply_index;
+ struct object_update_result *update_result;
+ struct lu_buf *lbuf = &tti->tti_buf;
+ struct out_read_reply *orr;
+ void *tmp;
+ size_t size;
+ __u64 pos;
+ int rc;
+ ENTRY;
+
+ update_result = object_update_result_get(reply, index, NULL);
+ LASSERT(update_result != NULL);
+ update_result->our_datalen = sizeof(*orr);
+
+ if (!lu_object_exists(&obj->do_lu))
+ GOTO(out, rc = -ENOENT);
+
+ tmp = object_update_param_get(update, 0, NULL);
+ if (tmp == NULL) {
+ CERROR("%s: empty size for read: rc = %d\n",
+ tgt_name(tsi->tsi_tgt), -EPROTO);
+ GOTO(out, rc = err_serious(-EPROTO));
+ }
+ size = le64_to_cpu(*(size_t *)(tmp));
+
+ tmp = object_update_param_get(update, 1, NULL);
+ if (tmp == NULL) {
+ CERROR("%s: empty pos for read: rc = %d\n",
+ tgt_name(tsi->tsi_tgt), -EPROTO);
+ GOTO(out, rc = err_serious(-EPROTO));
+ }
+ pos = le64_to_cpu(*(__u64 *)(tmp));
+
+ if (size > OUT_UPDATE_REPLY_SIZE -
+ cfs_size_round((unsigned long)update_result->our_data -
+ (unsigned long)update_result) - sizeof(pos)) {
+ CERROR("%s: get %zu the biggest read size is %d: rc = %d\n",
+ tgt_name(tsi->tsi_tgt), size, OUT_UPDATE_REPLY_SIZE,
+ -EPROTO);
+ GOTO(out, rc = err_serious(-EPROTO));
+ }
+
+ /* Put the offset into the begining of the buffer in reply */
+ orr = (struct out_read_reply *)update_result->our_data;
+
+ lbuf->lb_buf = orr->orr_data;
+ lbuf->lb_len = size;
+
+ dt_read_lock(env, obj, MOR_TGT_CHILD);
+ rc = dt_read(env, obj, lbuf, &pos);
+ dt_read_unlock(env, obj);
+ orr->orr_size = rc < 0 ? 0 : rc;
+ orr->orr_offset = pos;
+
+ orr_cpu_to_le(orr, orr);
+ update_result->our_datalen += orr->orr_size;
+out:
+ /* Insert read buffer */
+ update_result->our_rc = ptlrpc_status_hton(rc);
+ reply->ourp_lens[index] = cfs_size_round(update_result->our_datalen +
+ sizeof(*update_result));
+ RETURN(rc);
+}
+
#define DEF_OUT_HNDL(opc, name, flags, fn) \
[opc - OUT_CREATE] = { \
.th_name = name, \
DEF_OUT_HNDL(OUT_INDEX_DELETE, "out_index_delete",
MUTABOR | HABEO_REFERO, out_index_delete),
DEF_OUT_HNDL(OUT_WRITE, "out_write", MUTABOR | HABEO_REFERO, out_write),
+ DEF_OUT_HNDL(OUT_READ, "out_read", HABEO_REFERO, out_read),
+
};
static struct tgt_handler *out_handler_find(__u32 opc)
dt_obd_name(ta->ta_handle->th_dev), i, ta->ta_argno, rc);
}
- /* Only fail for real update */
- tsi->tsi_reply_fail_id = OBD_FAIL_OUT_UPDATE_NET_REP;
+ /* Only fail for real updates, XXX right now llog updates will be
+ * ignore, whose updates count is usually 1, so failover test
+ * case will spot this FAIL_UPDATE_NET_REP precisely, and it will
+ * be removed after async update patch is landed. */
+ if (ta->ta_argno > 1)
+ tsi->tsi_reply_fail_id = OBD_FAIL_OUT_UPDATE_NET_REP;
+
stop:
rc1 = out_trans_stop(env, ta, rc);
if (rc == 0)