From f8fa1a8a572f7668058bc29de23da586dc40c855 Mon Sep 17 00:00:00 2001 From: lnbest0707 <106711887+lnbest0707-uber@users.noreply.github.com> Date: Thu, 2 Jan 2025 10:47:17 -0800 Subject: [PATCH] Expose msg length info to metadata (#14688) * Expose msg length info to metadata * Address comment --- .../org/apache/pinot/spi/stream/StreamDataDecoderImpl.java | 2 ++ .../org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java index 127ecfe12156..35721fcb826a 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java @@ -30,6 +30,7 @@ public class StreamDataDecoderImpl implements StreamDataDecoder { public static final String KEY = "__key"; public static final String HEADER_KEY_PREFIX = "__header$"; public static final String METADATA_KEY_PREFIX = "__metadata$"; + public static final String RECORD_SERIALIZED_VALUE_SIZE_KEY = METADATA_KEY_PREFIX + "recordSerializedValueSize"; private final StreamMessageDecoder _valueDecoder; private final GenericRow _reuse = new GenericRow(); @@ -65,6 +66,7 @@ public StreamDataDecoderResult decode(StreamMessage message) { if (metadata.getRecordMetadata() != null) { metadata.getRecordMetadata().forEach((key, value) -> row.putValue(METADATA_KEY_PREFIX + key, value)); } + row.putValue(RECORD_SERIALIZED_VALUE_SIZE_KEY, message.getLength()); } return new StreamDataDecoderResult(row, null); } else { diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java index f9f6aafc11d7..a2ddec6d99b2 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java @@ -71,11 +71,12 @@ public void testDecodeKeyAndHeaders() Assert.assertNotNull(result.getResult()); GenericRow row = result.getResult(); - Assert.assertEquals(row.getFieldToValueMap().size(), 4); + Assert.assertEquals(row.getFieldToValueMap().size(), 5); Assert.assertEquals(row.getValue(NAME_FIELD), value); Assert.assertEquals(row.getValue(StreamDataDecoderImpl.KEY), key, "Failed to decode record key"); Assert.assertEquals(row.getValue(StreamDataDecoderImpl.HEADER_KEY_PREFIX + AGE_HEADER_KEY), 3); Assert.assertEquals(row.getValue(StreamDataDecoderImpl.METADATA_KEY_PREFIX + SEQNO_RECORD_METADATA), "1"); + Assert.assertEquals(row.getValue(StreamDataDecoderImpl.RECORD_SERIALIZED_VALUE_SIZE_KEY), value.length()); } @Test