Whamcloud - gitweb
LU-3536 lod: write updates to update log
[fs/lustre-release.git] / lustre / target / out_handler.c
index 68439f1..635ea99 100644 (file)
@@ -1243,6 +1243,10 @@ static int out_tx_write_exec(const struct lu_env *env, struct thandle *th,
        struct dt_object *dt_obj = arg->object;
        int rc;
 
+       CDEBUG(D_INFO, "write "DFID" pos "LPU64" buf %p, len %lu\n",
+              PFID(lu_object_fid(&dt_obj->do_lu)), arg->u.write.pos,
+              arg->u.write.buf.lb_buf, (unsigned long)arg->u.write.buf.lb_len);
+
        dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
        rc = dt_record_write(env, dt_obj, &arg->u.write.buf,
                             &arg->u.write.pos, th);
@@ -1325,6 +1329,77 @@ static int out_write(struct tgt_session_info *tsi)
        RETURN(rc);
 }
 
+static int out_read(struct tgt_session_info *tsi)
+{
+       const struct lu_env     *env = tsi->tsi_env;
+       struct tgt_thread_info  *tti = tgt_th_info(env);
+       struct object_update    *update = tti->tti_u.update.tti_update;
+       struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
+       struct object_update_reply *reply = tti->tti_u.update.tti_update_reply;
+       int             index = tti->tti_u.update.tti_update_reply_index;
+       struct object_update_result *update_result;
+       struct lu_buf           *lbuf = &tti->tti_buf;
+       struct out_read_reply   *orr;
+       void                    *tmp;
+       size_t                  size;
+       __u64                   pos;
+       int                      rc;
+       ENTRY;
+
+       update_result = object_update_result_get(reply, index, NULL);
+       LASSERT(update_result != NULL);
+       update_result->our_datalen = sizeof(*orr);
+
+       if (!lu_object_exists(&obj->do_lu))
+               GOTO(out, rc = -ENOENT);
+
+       tmp = object_update_param_get(update, 0, NULL);
+       if (tmp == NULL) {
+               CERROR("%s: empty size for read: rc = %d\n",
+                      tgt_name(tsi->tsi_tgt), -EPROTO);
+               GOTO(out, rc = err_serious(-EPROTO));
+       }
+       size = le64_to_cpu(*(size_t *)(tmp));
+
+       tmp = object_update_param_get(update, 1, NULL);
+       if (tmp == NULL) {
+               CERROR("%s: empty pos for read: rc = %d\n",
+                      tgt_name(tsi->tsi_tgt), -EPROTO);
+               GOTO(out, rc = err_serious(-EPROTO));
+       }
+       pos = le64_to_cpu(*(__u64 *)(tmp));
+
+       if (size > OUT_UPDATE_REPLY_SIZE -
+                  cfs_size_round((unsigned long)update_result->our_data -
+                                 (unsigned long)update_result) - sizeof(pos)) {
+               CERROR("%s: get %zu the biggest read size is %d: rc = %d\n",
+                      tgt_name(tsi->tsi_tgt), size, OUT_UPDATE_REPLY_SIZE,
+                      -EPROTO);
+               GOTO(out, rc = err_serious(-EPROTO));
+       }
+
+       /* Put the offset into the begining of the buffer in reply */
+       orr = (struct out_read_reply *)update_result->our_data;
+
+       lbuf->lb_buf = orr->orr_data;
+       lbuf->lb_len = size;
+
+       dt_read_lock(env, obj, MOR_TGT_CHILD);
+       rc = dt_read(env, obj, lbuf, &pos);
+       dt_read_unlock(env, obj);
+       orr->orr_size = rc < 0 ? 0 : rc;
+       orr->orr_offset = pos;
+
+       orr_cpu_to_le(orr, orr);
+       update_result->our_datalen += orr->orr_size;
+out:
+       /* Insert read buffer */
+       update_result->our_rc = ptlrpc_status_hton(rc);
+       reply->ourp_lens[index] = cfs_size_round(update_result->our_datalen +
+                                                sizeof(*update_result));
+       RETURN(rc);
+}
+
 #define DEF_OUT_HNDL(opc, name, flags, fn)     \
 [opc - OUT_CREATE] = {                                 \
        .th_name    = name,                             \
@@ -1362,6 +1437,8 @@ static struct tgt_handler out_update_ops[] = {
        DEF_OUT_HNDL(OUT_INDEX_DELETE, "out_index_delete",
                     MUTABOR | HABEO_REFERO, out_index_delete),
        DEF_OUT_HNDL(OUT_WRITE, "out_write", MUTABOR | HABEO_REFERO, out_write),
+       DEF_OUT_HNDL(OUT_READ, "out_read", HABEO_REFERO, out_read),
+
 };
 
 static struct tgt_handler *out_handler_find(__u32 opc)
@@ -1479,8 +1556,13 @@ static int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta,
                       dt_obd_name(ta->ta_handle->th_dev), i, ta->ta_argno, rc);
        }
 
-       /* Only fail for real update */
-       tsi->tsi_reply_fail_id = OBD_FAIL_OUT_UPDATE_NET_REP;
+       /* Only fail for real updates, XXX right now llog updates will be
+       * ignore, whose updates count is usually 1, so failover test
+       * case will spot this FAIL_UPDATE_NET_REP precisely, and it will
+       * be removed after async update patch is landed. */
+       if (ta->ta_argno > 1)
+               tsi->tsi_reply_fail_id = OBD_FAIL_OUT_UPDATE_NET_REP;
+
 stop:
        rc1 = out_trans_stop(env, ta, rc);
        if (rc == 0)