Skip to content

Commit

Permalink
Remove unnecessary Block getLogicalSizeInBytes
Browse files Browse the repository at this point in the history
This was only used in a few places, and was not implement correctly in
several container block implementations.
  • Loading branch information
dain committed Oct 8, 2024
1 parent 596b31f commit 934caa4
Show file tree
Hide file tree
Showing 11 changed files with 76 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,14 @@
import io.trino.spi.ErrorCode;
import io.trino.spi.Page;
import io.trino.spi.QueryId;
import io.trino.spi.block.ArrayBlock;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockEncodingSerde;
import io.trino.spi.block.DictionaryBlock;
import io.trino.spi.block.MapBlock;
import io.trino.spi.block.RowBlock;
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.spi.block.ValueBlock;
import io.trino.spi.exchange.ExchangeId;
import io.trino.spi.security.SelectedRole;
import io.trino.spi.type.Type;
Expand Down Expand Up @@ -562,7 +569,9 @@ private synchronized QueryResultRows removePagesFromExchange(ResultQueryInfo que
}

Page page = deserializer.deserialize(serializedPage);
bytes += page.getLogicalSizeInBytes();
// page should already be loaded since it was just deserialized
page = page.getLoadedPage();
bytes += estimateJsonSize(page);
resultBuilder.addPage(page);
}
if (exchangeDataSource.isFinished()) {
Expand All @@ -577,6 +586,38 @@ private synchronized QueryResultRows removePagesFromExchange(ResultQueryInfo que
return resultBuilder.build();
}

private static long estimateJsonSize(Page page)
{
long estimatedSize = 0;
for (int i = 0; i < page.getChannelCount(); i++) {
estimatedSize += estimateJsonSize(page.getBlock(i));
}
return estimatedSize;
}

private static long estimateJsonSize(Block block)
{
switch (block) {
case RunLengthEncodedBlock rleBlock:
return estimateJsonSize(rleBlock.getValue()) * rleBlock.getPositionCount();
case DictionaryBlock dictionaryBlock:
ValueBlock dictionary = dictionaryBlock.getDictionary();
double averageSizePerEntry = (double) estimateJsonSize(dictionary) / dictionary.getPositionCount();
return (long) (averageSizePerEntry * block.getPositionCount());
case RowBlock rowBlock:
return rowBlock.getFieldBlocks().stream()
.mapToLong(Query::estimateJsonSize)
.sum();
case ArrayBlock arrayBlock:
return estimateJsonSize(arrayBlock.getElementsBlock());
case MapBlock mapBlock:
return estimateJsonSize(mapBlock.getKeyBlock()) +
estimateJsonSize(mapBlock.getValueBlock());
default:
return block.getSizeInBytes();
}
}

private void closeExchangeIfNecessary(ResultQueryInfo queryInfo)
{
if (queryInfo.state() != FAILED && queryInfo.outputStage().isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,31 +84,6 @@ public void testSizeInBytes()
assertThat(dictionaryBlock.getSizeInBytes()).isEqualTo(dictionaryBlock.getDictionary().getSizeInBytes() + (100 * SIZE_OF_INT));
}

@Test
public void testLogicalSizeInBytes()
{
// The 10 Slices in the array will be of lengths 0 to 9.
Slice[] expectedValues = createExpectedValues(10);

// The dictionary within the dictionary block is expected to be a VariableWidthBlock of size 95 bytes.
// 45 bytes for the expectedValues Slices (sum of seq(0,9)) and 50 bytes for the position and isNull array (total 10 positions).
DictionaryBlock dictionaryBlock = createDictionaryBlock(expectedValues, 100);
assertThat(dictionaryBlock.getDictionary().getLogicalSizeInBytes()).isEqualTo(95);

// The 100 positions in the dictionary block index to 10 positions in the underlying dictionary (10 each).
// Logical size calculation accounts for 4 bytes of offset and 1 byte of isNull. Therefore the expected unoptimized
// size is 10 times the size of the underlying dictionary (VariableWidthBlock).
assertThat(dictionaryBlock.getLogicalSizeInBytes()).isEqualTo(95 * 10);

// With alternating nulls, we have 21 positions, with the same size calculation as above.
dictionaryBlock = createDictionaryBlock(alternatingNullValues(expectedValues), 210);
assertThat(dictionaryBlock.getDictionary().getPositionCount()).isEqualTo(21);
assertThat(dictionaryBlock.getDictionary().getLogicalSizeInBytes()).isEqualTo(150);

// The null positions should be included in the logical size.
assertThat(dictionaryBlock.getLogicalSizeInBytes()).isEqualTo(150 * 10);
}

@Test
public void testCopyRegionCreatesCompactBlock()
{
Expand Down
20 changes: 20 additions & 0 deletions core/trino-spi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,26 @@
<old>interface io.trino.spi.protocol.SpoolingManagerFactory</old>
<justification>Spooling SPI marked as experimental</justification>
</item>
<item>
<ignore>true</ignore>
<code>java.method.removed</code>
<old>method long io.trino.spi.Page::getLogicalSizeInBytes()</old>
</item>
<item>
<ignore>true</ignore>
<code>java.method.removed</code>
<old>method long io.trino.spi.block.Block::getLogicalSizeInBytes()</old>
</item>
<item>
<ignore>true</ignore>
<code>java.method.removed</code>
<old>method long io.trino.spi.block.DictionaryBlock::getLogicalSizeInBytes()</old>
</item>
<item>
<ignore>true</ignore>
<code>java.method.removed</code>
<old>method long io.trino.spi.block.RunLengthEncodedBlock::getLogicalSizeInBytes()</old>
</item>
</differences>
</revapi.differences>
</analysisConfiguration>
Expand Down
15 changes: 0 additions & 15 deletions core/trino-spi/src/main/java/io/trino/spi/Page.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ static Page wrapBlocksWithoutCopy(int positionCount, Block[] blocks)
private final int positionCount;
private volatile long sizeInBytes = -1;
private volatile long retainedSizeInBytes = -1;
private volatile long logicalSizeInBytes = -1;

public Page(Block... blocks)
{
Expand All @@ -78,7 +77,6 @@ private Page(boolean blocksCopyRequired, int positionCount, Block[] blocks)
if (blocks.length == 0) {
this.blocks = EMPTY_BLOCKS;
this.sizeInBytes = 0;
this.logicalSizeInBytes = 0;
// Empty blocks are not considered "retained" by any particular page
this.retainedSizeInBytes = INSTANCE_SIZE;
}
Expand Down Expand Up @@ -114,19 +112,6 @@ public long getSizeInBytes()
return sizeInBytes;
}

public long getLogicalSizeInBytes()
{
long logicalSizeInBytes = this.logicalSizeInBytes;
if (logicalSizeInBytes < 0) {
logicalSizeInBytes = 0;
for (Block block : blocks) {
logicalSizeInBytes += block.getLogicalSizeInBytes();
}
this.logicalSizeInBytes = logicalSizeInBytes;
}
return logicalSizeInBytes;
}

public long getRetainedSizeInBytes()
{
long retainedSizeInBytes = this.retainedSizeInBytes;
Expand Down
14 changes: 0 additions & 14 deletions core/trino-spi/src/main/java/io/trino/spi/block/Block.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,6 @@ public sealed interface Block
*/
long getSizeInBytes();

/**
* Returns the size of the block contents, regardless of internal representation.
* The same logical data values should always have the same size, no matter
* what block type is used or how they are represented within a specific block.
* <p>
* This can differ substantially from {@link #getSizeInBytes} for certain block
* types. For RLE, it will be {@code N} times larger. For dictionary, it will be
* larger based on how many times dictionary entries are reused.
*/
default long getLogicalSizeInBytes()
{
return getSizeInBytes();
}

/**
* Returns the size of {@code block.getRegion(position, length)}.
* The method can be expensive. Do not use it outside an implementation of Block.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ public final class DictionaryBlock
private final int[] ids;
private final long retainedSizeInBytes;
private volatile long sizeInBytes = -1;
private volatile long logicalSizeInBytes = -1;
private volatile int uniqueIds = -1;
// isSequentialIds is only valid when uniqueIds is computed
private volatile boolean isSequentialIds;
Expand Down Expand Up @@ -192,36 +191,6 @@ private void calculateCompactSize()
this.isSequentialIds = isSequentialIds;
}

@Override
public long getLogicalSizeInBytes()
{
if (logicalSizeInBytes >= 0) {
return logicalSizeInBytes;
}

OptionalInt dictionarySizePerPosition = dictionary.fixedSizeInBytesPerPosition();
if (dictionarySizePerPosition.isPresent()) {
logicalSizeInBytes = dictionarySizePerPosition.getAsInt() * (long) getPositionCount();
return logicalSizeInBytes;
}

// Calculation of logical size can be performed as part of calculateCompactSize() with minor modifications.
// Keeping this calculation separate as this is a little more expensive and may not be called as often.
long sizeInBytes = 0;
long[] seenSizes = new long[dictionary.getPositionCount()];
Arrays.fill(seenSizes, -1L);
for (int i = 0; i < getPositionCount(); i++) {
int position = getId(i);
if (seenSizes[position] < 0) {
seenSizes[position] = dictionary.getRegionSizeInBytes(position, 1);
}
sizeInBytes += seenSizes[position];
}

logicalSizeInBytes = sizeInBytes;
return sizeInBytes;
}

@Override
public long getRegionSizeInBytes(int positionOffset, int length)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,6 @@ public long getSizeInBytes()
return value.getSizeInBytes();
}

@Override
public long getLogicalSizeInBytes()
{
return positionCount * value.getLogicalSizeInBytes();
}

@Override
public long getRetainedSizeInBytes()
{
Expand Down
2 changes: 0 additions & 2 deletions core/trino-spi/src/test/java/io/trino/spi/TestPage.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ public void testSizesForNoColumnPage()
{
Page page = new Page(100);
assertThat(page.getSizeInBytes()).isEqualTo(0);
assertThat(page.getLogicalSizeInBytes()).isEqualTo(0);
assertThat(page.getRetainedSizeInBytes()).isEqualTo(Page.INSTANCE_SIZE); // does not include the blocks array
}

@Test
Expand Down
10 changes: 6 additions & 4 deletions lib/trino-orc/src/main/java/io/trino/orc/OrcWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public final class OrcWriter
private final List<Type> types;
private final CompressionKind compression;
private final int stripeMaxBytes;
private final int chunkMaxLogicalBytes;
private final int chunkMaxBytes;
private final int stripeMaxRowCount;
private final int rowGroupMaxRowCount;
private final int maxCompressionBufferSize;
Expand Down Expand Up @@ -153,7 +153,7 @@ public OrcWriter(
checkArgument(options.getStripeMaxSize().compareTo(options.getStripeMinSize()) >= 0, "stripeMaxSize must be greater than or equal to stripeMinSize");
int stripeMinBytes = toIntExact(requireNonNull(options.getStripeMinSize(), "stripeMinSize is null").toBytes());
this.stripeMaxBytes = toIntExact(requireNonNull(options.getStripeMaxSize(), "stripeMaxSize is null").toBytes());
this.chunkMaxLogicalBytes = Math.max(1, stripeMaxBytes / 2);
this.chunkMaxBytes = Math.max(1, stripeMaxBytes / 2);
this.stripeMaxRowCount = options.getStripeMaxRowCount();
this.rowGroupMaxRowCount = options.getRowGroupMaxRowCount();
recordValidation(validation -> validation.setRowGroupMaxRowCount(rowGroupMaxRowCount));
Expand Down Expand Up @@ -254,6 +254,8 @@ public void write(Page page)
}

checkArgument(page.getChannelCount() == columnWriters.size());
// page should already be loaded, but double check
page = page.getLoadedPage();

if (validationBuilder != null) {
validationBuilder.addPage(page);
Expand All @@ -264,8 +266,8 @@ public void write(Page page)
// align page to row group boundaries
Page chunk = page.getRegion(writeOffset, min(page.getPositionCount() - writeOffset, min(rowGroupMaxRowCount - rowGroupRowCount, stripeMaxRowCount - stripeRowCount)));

// avoid chunk with huge logical size
while (chunk.getPositionCount() > 1 && chunk.getLogicalSizeInBytes() > chunkMaxLogicalBytes) {
// avoid chunk with huge size
while (chunk.getPositionCount() > 1 && chunk.getSizeInBytes() > chunkMaxBytes) {
chunk = page.getRegion(writeOffset, chunk.getPositionCount() / 2);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public class SliceDictionaryColumnWriter
implements ColumnWriter, DictionaryColumn
{
private static final int INSTANCE_SIZE = instanceSize(SliceDictionaryColumnWriter.class);
private static final int DIRECT_CONVERSION_CHUNK_MAX_LOGICAL_BYTES = toIntExact(DataSize.of(32, MEGABYTE).toBytes());
private static final int DIRECT_CONVERSION_CHUNK_MAX_BYTES = toIntExact(DataSize.of(32, MEGABYTE).toBytes());

private final OrcColumnId columnId;
private final Type type;
Expand Down Expand Up @@ -224,7 +224,7 @@ private boolean writeDictionaryRowGroup(Block dictionary, int valueCount, IntBig
Block chunk = block.getRegion(0, chunkPositionCount);

// avoid chunk with huge logical size
while (chunkPositionCount > 1 && chunk.getLogicalSizeInBytes() > DIRECT_CONVERSION_CHUNK_MAX_LOGICAL_BYTES) {
while (chunkPositionCount > 1 && chunk.getSizeInBytes() > DIRECT_CONVERSION_CHUNK_MAX_BYTES) {
chunkPositionCount /= 2;
chunk = chunk.getRegion(0, chunkPositionCount);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public class ParquetWriter
private final OutputStreamSliceOutput outputStream;
private final ParquetWriterOptions writerOption;
private final MessageType messageType;
private final int chunkMaxLogicalBytes;
private final int chunkMaxBytes;
private final Map<List<String>, Type> primitiveTypes;
private final CompressionCodec compressionCodec;
private final Optional<DateTimeZone> parquetTimeZone;
Expand Down Expand Up @@ -142,7 +142,7 @@ public ParquetWriter(
recordValidation(validation -> validation.setColumns(messageType.getColumns()));
recordValidation(validation -> validation.setCreatedBy(createdBy));
initColumnWriters();
this.chunkMaxLogicalBytes = max(1, writerOption.getMaxRowGroupSize() / 2);
this.chunkMaxBytes = max(1, writerOption.getMaxRowGroupSize() / 2);
}

public long getWrittenBytes()
Expand Down Expand Up @@ -174,6 +174,9 @@ public void write(Page page)

checkArgument(page.getChannelCount() == columnWriters.size());

// page should already be loaded, but double check
page = page.getLoadedPage();

Page validationPage = page;
recordValidation(validation -> validation.addPage(validationPage));

Expand All @@ -182,7 +185,7 @@ public void write(Page page)
Page chunk = page.getRegion(writeOffset, min(page.getPositionCount() - writeOffset, writerOption.getBatchSize()));

// avoid chunk with huge logical size
while (chunk.getPositionCount() > 1 && chunk.getLogicalSizeInBytes() > chunkMaxLogicalBytes) {
while (chunk.getPositionCount() > 1 && chunk.getSizeInBytes() > chunkMaxBytes) {
chunk = page.getRegion(writeOffset, chunk.getPositionCount() / 2);
}

Expand Down

0 comments on commit 934caa4

Please sign in to comment.