+/** version recovery epoch */
+#define LR_EPOCH_BITS 32
+
+/* Allocate a bitmap for a chunk of reply data slots
+ *
+ * Returns 0 on success — including when another thread won the race and
+ * installed the chunk's bitmap first — or -ENOMEM if allocation fails.
+ */
+static int tgt_bitmap_chunk_alloc(struct lu_target *lut, int chunk)
+{
+ unsigned long *bm;
+
+ /* allocate before taking the spinlock; if we lose the race below,
+ * the buffer is simply freed again */
+ OBD_ALLOC_LARGE(bm, BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) *
+ sizeof(long));
+ if (bm == NULL)
+ return -ENOMEM;
+
+ spin_lock(&lut->lut_client_bitmap_lock);
+
+ if (lut->lut_reply_bitmap[chunk] != NULL) {
+ /* someone else already allocated the bitmap for this chunk */
+ spin_unlock(&lut->lut_client_bitmap_lock);
+ OBD_FREE_LARGE(bm, BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) *
+ sizeof(long));
+ return 0;
+ }
+
+ lut->lut_reply_bitmap[chunk] = bm;
+
+ spin_unlock(&lut->lut_client_bitmap_lock);
+
+ return 0;
+}
+
+/* Look for an available reply data slot in the bitmap
+ * of the target @lut
+ * Allocate bitmap chunk when first used
+ * XXX algo could be improved if this routine limits performance
+ *
+ * Returns the global slot index (chunk * LUT_REPLY_SLOTS_PER_CHUNK + bit)
+ * on success, a negative errno from chunk allocation, or -ENOSPC when
+ * every slot in every chunk is in use.
+ */
+static int tgt_find_free_reply_slot(struct lu_target *lut)
+{
+ unsigned long *bmp;
+ int chunk = 0;
+ int rc;
+ int b;
+
+ for (chunk = 0; chunk < LUT_REPLY_SLOTS_MAX_CHUNKS; chunk++) {
+ /* allocate the bitmap chunk if necessary */
+ if (unlikely(lut->lut_reply_bitmap[chunk] == NULL)) {
+ rc = tgt_bitmap_chunk_alloc(lut, chunk);
+ if (rc != 0)
+ return rc;
+ }
+ bmp = lut->lut_reply_bitmap[chunk];
+
+ /* look for an available slot in this chunk */
+ do {
+ b = find_first_zero_bit(bmp, LUT_REPLY_SLOTS_PER_CHUNK);
+ if (b >= LUT_REPLY_SLOTS_PER_CHUNK)
+ break;
+
+ /* found one; test_and_set_bit() can still fail if a
+ * concurrent caller claimed the bit between the scan
+ * and here, in which case we rescan the chunk */
+ if (test_and_set_bit(b, bmp) == 0)
+ return chunk * LUT_REPLY_SLOTS_PER_CHUNK + b;
+ } while (true);
+ }
+
+ return -ENOSPC;
+}
+
+/* Mark the reply data slot @idx 'used' in the corresponding bitmap chunk
+ * of the target @lut, allocating the bitmap chunk on first use.
+ *
+ * Returns 0 on success, a negative errno if the chunk cannot be
+ * allocated, or -EALREADY if the slot was already marked used.
+ */
+static int tgt_set_reply_slot(struct lu_target *lut, int idx)
+{
+ int chunk = idx / LUT_REPLY_SLOTS_PER_CHUNK;
+ int bit = idx % LUT_REPLY_SLOTS_PER_CHUNK;
+
+ LASSERT(chunk < LUT_REPLY_SLOTS_MAX_CHUNKS);
+ LASSERT(bit < LUT_REPLY_SLOTS_PER_CHUNK);
+
+ /* first use of this chunk: allocate its bitmap */
+ if (unlikely(lut->lut_reply_bitmap[chunk] == NULL)) {
+ int rc = tgt_bitmap_chunk_alloc(lut, chunk);
+
+ if (rc != 0)
+ return rc;
+ }
+
+ /* atomically mark the slot 'used'; complain if it already was */
+ if (test_and_set_bit(bit, lut->lut_reply_bitmap[chunk]) != 0) {
+ CERROR("%s: slot %d already set in bitmap\n",
+ tgt_name(lut), idx);
+ return -EALREADY;
+ }
+
+ return 0;
+}
+
+
+/* Mark the reply data slot @idx 'unused' in the corresponding bitmap chunk
+ * of the target @lut
+ *
+ * Returns 0 on success (or when skipped because the obd is stopping),
+ * -ENOENT if the chunk's bitmap was never allocated, or -EALREADY if the
+ * slot was already clear.
+ */
+static int tgt_clear_reply_slot(struct lu_target *lut, int idx)
+{
+ int chunk;
+ int b;
+
+ if (lut->lut_obd->obd_stopping)
+ /*
+ * in case of failover keep the bit set in order to
+ * avoid overwriting slots in reply_data which might
+ * be required by resent rpcs
+ */
+ return 0;
+ chunk = idx / LUT_REPLY_SLOTS_PER_CHUNK;
+ b = idx % LUT_REPLY_SLOTS_PER_CHUNK;
+
+ LASSERT(chunk < LUT_REPLY_SLOTS_MAX_CHUNKS);
+ LASSERT(b < LUT_REPLY_SLOTS_PER_CHUNK);
+
+ if (lut->lut_reply_bitmap[chunk] == NULL) {
+ CERROR("%s: slot %d not allocated\n",
+ tgt_name(lut), idx);
+ return -ENOENT;
+ }
+
+ if (test_and_clear_bit(b, lut->lut_reply_bitmap[chunk]) == 0) {
+ CERROR("%s: slot %d already clear in bitmap\n",
+ tgt_name(lut), idx);
+ return -EALREADY;
+ }
+
+ return 0;
+}
+
+
+/* Read header of reply_data file of target @tgt into structure @lrh
+ *
+ * The header is stored little-endian on disk; fields are converted to
+ * host byte order in @lrh. Returns 0 on success or a negative errno
+ * from dt_record_read().
+ */
+static int tgt_reply_header_read(const struct lu_env *env,
+ struct lu_target *tgt,
+ struct lsd_reply_header *lrh)
+{
+ int rc;
+ struct lsd_reply_header buf;
+ struct tgt_thread_info *tti = tgt_th_info(env);
+
+ /* header lives at offset 0 of the reply_data file */
+ tti->tti_off = 0;
+ tti->tti_buf.lb_buf = &buf;
+ tti->tti_buf.lb_len = sizeof(buf);
+
+ rc = dt_record_read(env, tgt->lut_reply_data, &tti->tti_buf,
+ &tti->tti_off);
+ if (rc != 0)
+ return rc;
+
+ lrh->lrh_magic = le32_to_cpu(buf.lrh_magic);
+ lrh->lrh_header_size = le32_to_cpu(buf.lrh_header_size);
+ lrh->lrh_reply_size = le32_to_cpu(buf.lrh_reply_size);
+
+ CDEBUG(D_HA, "%s: read %s header. magic=0x%08x "
+ "header_size=%d reply_size=%d\n",
+ tgt->lut_obd->obd_name, REPLY_DATA,
+ lrh->lrh_magic, lrh->lrh_header_size, lrh->lrh_reply_size);
+
+ return 0;
+}
+
+/* Write header into replay_data file of target @tgt from structure @lrh
+ *
+ * Fields are converted to little-endian before being written at offset 0.
+ * The write is performed in its own synchronous transaction. Returns 0
+ * on success (or immediately when the bottom device is read-only), or a
+ * negative errno from transaction setup or the record write.
+ */
+static int tgt_reply_header_write(const struct lu_env *env,
+ struct lu_target *tgt,
+ struct lsd_reply_header *lrh)
+{
+ int rc;
+ struct lsd_reply_header buf;
+ struct tgt_thread_info *tti = tgt_th_info(env);
+ struct thandle *th;
+ struct dt_object *dto;
+
+ CDEBUG(D_HA, "%s: write %s header. magic=0x%08x "
+ "header_size=%d reply_size=%d\n",
+ tgt->lut_obd->obd_name, REPLY_DATA,
+ lrh->lrh_magic, lrh->lrh_header_size, lrh->lrh_reply_size);
+
+ /* nothing to persist on a read-only device */
+ if (tgt->lut_bottom->dd_rdonly)
+ RETURN(0);
+
+ buf.lrh_magic = cpu_to_le32(lrh->lrh_magic);
+ buf.lrh_header_size = cpu_to_le32(lrh->lrh_header_size);
+ buf.lrh_reply_size = cpu_to_le32(lrh->lrh_reply_size);
+
+ th = dt_trans_create(env, tgt->lut_bottom);
+ if (IS_ERR(th))
+ return PTR_ERR(th);
+ /* force the header to stable storage when the transaction stops */
+ th->th_sync = 1;
+
+ tti->tti_off = 0;
+ tti->tti_buf.lb_buf = &buf;
+ tti->tti_buf.lb_len = sizeof(buf);
+
+ rc = dt_declare_record_write(env, tgt->lut_reply_data,
+ &tti->tti_buf, tti->tti_off, th);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = dt_trans_start(env, tgt->lut_bottom, th);
+ if (rc)
+ GOTO(out, rc);
+
+ dto = dt_object_locate(tgt->lut_reply_data, th->th_dev);
+ rc = dt_record_write(env, dto, &tti->tti_buf, &tti->tti_off, th);
+out:
+ dt_trans_stop(env, tgt->lut_bottom, th);
+ return rc;
+}
+
+/* Write the reply data @lrd into reply_data file of target @tgt
+ * at offset @off
+ *
+ * Fields are converted to little-endian in the per-thread scratch buffer
+ * tti_lrd; @lrd itself is left unmodified on return (lrd_result is
+ * converted to network representation in place for the copy and then
+ * restored). Must be called inside the started transaction @th.
+ */
+static int tgt_reply_data_write(const struct lu_env *env, struct lu_target *tgt,
+ struct lsd_reply_data *lrd, loff_t off,
+ struct thandle *th)
+{
+ struct tgt_thread_info *tti = tgt_th_info(env);
+ struct dt_object *dto;
+ struct lsd_reply_data *buf = &tti->tti_lrd;
+
+ /* store the wire (network) representation of the status on disk */
+ lrd->lrd_result = ptlrpc_status_hton(lrd->lrd_result);
+
+ buf->lrd_transno = cpu_to_le64(lrd->lrd_transno);
+ buf->lrd_xid = cpu_to_le64(lrd->lrd_xid);
+ buf->lrd_data = cpu_to_le64(lrd->lrd_data);
+ buf->lrd_result = cpu_to_le32(lrd->lrd_result);
+ buf->lrd_client_gen = cpu_to_le32(lrd->lrd_client_gen);
+
+ /* undo the in-place conversion so the caller's copy is unchanged */
+ lrd->lrd_result = ptlrpc_status_ntoh(lrd->lrd_result);
+
+ tti->tti_off = off;
+ tti->tti_buf.lb_buf = buf;
+ tti->tti_buf.lb_len = sizeof(*buf);
+
+ dto = dt_object_locate(tgt->lut_reply_data, th->th_dev);
+ return dt_record_write(env, dto, &tti->tti_buf, &tti->tti_off, th);
+}
+
+/* Read the reply data from reply_data file of target @tgt at offset @off
+ * into structure @lrd, converting fields from the little-endian on-disk
+ * format to host byte order.
+ *
+ * Returns 0 on success or a negative errno from dt_record_read().
+ */
+static int tgt_reply_data_read(const struct lu_env *env, struct lu_target *tgt,
+ struct lsd_reply_data *lrd, loff_t off)
+{
+ struct tgt_thread_info *tti = tgt_th_info(env);
+ struct lsd_reply_data *disk = &tti->tti_lrd;
+ int rc;
+
+ /* read the raw record into the per-thread scratch buffer */
+ tti->tti_off = off;
+ tti->tti_buf.lb_buf = disk;
+ tti->tti_buf.lb_len = sizeof(*disk);
+
+ rc = dt_record_read(env, tgt->lut_reply_data, &tti->tti_buf,
+ &tti->tti_off);
+ if (rc != 0)
+ return rc;
+
+ lrd->lrd_transno = le64_to_cpu(disk->lrd_transno);
+ lrd->lrd_xid = le64_to_cpu(disk->lrd_xid);
+ lrd->lrd_data = le64_to_cpu(disk->lrd_data);
+ lrd->lrd_result = le32_to_cpu(disk->lrd_result);
+ lrd->lrd_client_gen = le32_to_cpu(disk->lrd_client_gen);
+
+ return 0;
+}
+
+
+/* Free the in-memory reply data structure @trd and release
+ * the corresponding slot in the reply_data file of target @lut
+ * Called with ted_lcd_lock held
+ *
+ * @lut may be NULL, in which case the bitmap slot is not cleared and
+ * only the in-memory structure is removed and freed.
+ */
+static void tgt_free_reply_data(struct lu_target *lut,
+ struct tg_export_data *ted,
+ struct tg_reply_data *trd)
+{
+ CDEBUG(D_TRACE, "%s: free reply data %p: xid %llu, transno %llu, "
+ "client gen %u, slot idx %d\n",
+ lut == NULL ? "" : tgt_name(lut), trd, trd->trd_reply.lrd_xid,
+ trd->trd_reply.lrd_transno, trd->trd_reply.lrd_client_gen,
+ trd->trd_index);
+
+ LASSERT(mutex_is_locked(&ted->ted_lcd_lock));
+
+ list_del(&trd->trd_list);
+ ted->ted_reply_cnt--;
+ if (lut != NULL)
+ tgt_clear_reply_slot(lut, trd->trd_index);
+ OBD_FREE_PTR(trd);
+}
+
+/* Release the reply data @trd from target @lut
+ * The reply data with the highest transno for this export
+ * is retained to ensure correctness of target recovery
+ * Called with ted_lcd_lock held
+ *
+ * If @trd is retained it becomes ted_reply_last (replacing, and freeing,
+ * any previously retained entry); otherwise it is freed immediately.
+ */
+static void tgt_release_reply_data(struct lu_target *lut,
+ struct tg_export_data *ted,
+ struct tg_reply_data *trd)
+{
+ CDEBUG(D_TRACE, "%s: release reply data %p: xid %llu, transno %llu, "
+ "client gen %u, slot idx %d\n",
+ lut == NULL ? "" : tgt_name(lut), trd, trd->trd_reply.lrd_xid,
+ trd->trd_reply.lrd_transno, trd->trd_reply.lrd_client_gen,
+ trd->trd_index);
+
+ LASSERT(mutex_is_locked(&ted->ted_lcd_lock));
+
+ /* Do not free the reply data corresponding to the
+ * highest transno of this export.
+ * This ensures on-disk reply data is kept and
+ * last committed transno can be restored from disk in case
+ * of target recovery
+ */
+ if (trd->trd_reply.lrd_transno == ted->ted_lcd->lcd_last_transno) {
+ /* free previous retained reply */
+ if (ted->ted_reply_last != NULL)
+ tgt_free_reply_data(lut, ted, ted->ted_reply_last);
+ /* retain the reply */
+ list_del_init(&trd->trd_list);
+ ted->ted_reply_last = trd;
+ } else {
+ tgt_free_reply_data(lut, ted, trd);
+ }
+}