Skip to content

Commit

Permalink
Expose msg length info to metadata (#14688)
Browse files Browse the repository at this point in the history
* Expose msg length info to metadata

* Address comment
  • Loading branch information
lnbest0707-uber authored Jan 2, 2025
1 parent 97cbbfc commit f8fa1a8
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class StreamDataDecoderImpl implements StreamDataDecoder {
public static final String KEY = "__key";
public static final String HEADER_KEY_PREFIX = "__header$";
public static final String METADATA_KEY_PREFIX = "__metadata$";
public static final String RECORD_SERIALIZED_VALUE_SIZE_KEY = METADATA_KEY_PREFIX + "recordSerializedValueSize";

private final StreamMessageDecoder _valueDecoder;
private final GenericRow _reuse = new GenericRow();
Expand Down Expand Up @@ -65,6 +66,7 @@ public StreamDataDecoderResult decode(StreamMessage message) {
if (metadata.getRecordMetadata() != null) {
metadata.getRecordMetadata().forEach((key, value) -> row.putValue(METADATA_KEY_PREFIX + key, value));
}
row.putValue(RECORD_SERIALIZED_VALUE_SIZE_KEY, message.getLength());
}
return new StreamDataDecoderResult(row, null);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,12 @@ public void testDecodeKeyAndHeaders()
Assert.assertNotNull(result.getResult());

GenericRow row = result.getResult();
Assert.assertEquals(row.getFieldToValueMap().size(), 4);
Assert.assertEquals(row.getFieldToValueMap().size(), 5);
Assert.assertEquals(row.getValue(NAME_FIELD), value);
Assert.assertEquals(row.getValue(StreamDataDecoderImpl.KEY), key, "Failed to decode record key");
Assert.assertEquals(row.getValue(StreamDataDecoderImpl.HEADER_KEY_PREFIX + AGE_HEADER_KEY), 3);
Assert.assertEquals(row.getValue(StreamDataDecoderImpl.METADATA_KEY_PREFIX + SEQNO_RECORD_METADATA), "1");
Assert.assertEquals(row.getValue(StreamDataDecoderImpl.RECORD_SERIALIZED_VALUE_SIZE_KEY), value.length());
}

@Test
Expand Down

0 comments on commit f8fa1a8

Please sign in to comment.