Skip to content

Commit

Permalink
[improve] Reduce the performance loss of additional buffer expansion.
Browse files Browse the repository at this point in the history
  • Loading branch information
CodeCooker17 committed Sep 19, 2023
1 parent ad5d62f commit dc0fef4
Showing 1 changed file with 29 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ public class RecordBatchInputStream extends InputStream {

public static final Logger LOG = LoggerFactory.getLogger(RecordBatchInputStream.class);

private static final int DEFAULT_BUF_SIZE = 4096;

/**
* Load record batch
*/
Expand All @@ -55,7 +53,12 @@ public class RecordBatchInputStream extends InputStream {
/**
* record buffer
*/
private ByteBuffer buffer = ByteBuffer.allocate(0);

/**
 * Bytes of the row currently being served; replaced by a wrap of each new row's bytes.
 */
private ByteBuffer lineBuf = ByteBuffer.allocate(0);

/**
 * Delimiter bytes still to be emitted ahead of the current row. Set to {@code null} for the
 * first row and once the batch is exhausted.
 */
private ByteBuffer delimBuf = ByteBuffer.allocate(0);

/**
 * Row delimiter, taken from the record batch in the constructor.
 */
private final byte[] delim;

/**
* record count has been read
Expand All @@ -70,31 +73,42 @@ public class RecordBatchInputStream extends InputStream {
/**
 * Creates a stream over the rows of the given record batch.
 *
 * @param recordBatch batch to read rows from; its delimiter is captured here
 * @param passThrough stored as-is (its effect is not visible in this excerpt)
 */
public RecordBatchInputStream(RecordBatch recordBatch, boolean passThrough) {
    this.passThrough = passThrough;
    this.recordBatch = recordBatch;
    // Fetch the delimiter once up front instead of once per row.
    this.delim = this.recordBatch.getDelim();
}

// NOTE(review): this method is shown as rendered in a diff, so statements that use `buffer`
// and statements that use `lineBuf`/`delimBuf` appear interleaved; they look like the
// before/after versions of the same logic -- TODO confirm against the committed file.
@Override
public int read() throws IOException {
try {
if (buffer.remaining() == 0 && endOfBatch()) {
return -1; // End of stream
// Current row fully consumed: endOfBatch() either loads the next row or reports the end,
// in which case -1 signals end of stream.
if (lineBuf.remaining() == 0 && endOfBatch()) {
return -1;
}

// Serve any pending delimiter bytes before the row bytes.
if (delimBuf != null && delimBuf.remaining() > 0) {
// Mask with 0xff so the byte is returned as an unsigned value in 0..255.
return delimBuf.get() & 0xff;
}
} catch (DorisException e) {
// Rethrown as IOException, keeping the DorisException as the cause.
throw new IOException(e);
}
return buffer.get() & 0xFF;
return lineBuf.get() & 0xFF;
}

// NOTE(review): this method is shown as rendered in a diff, so statements that use `buffer`
// and statements that use `lineBuf`/`delimBuf` appear interleaved; they look like the
// before/after versions of the same logic -- TODO confirm against the committed file.
@Override
public int read(byte[] b, int off, int len) throws IOException {
try {
if (buffer.remaining() == 0 && endOfBatch()) {
return -1; // End of stream
// Current row fully consumed: endOfBatch() either loads the next row or reports the end,
// in which case -1 signals end of stream.
if (lineBuf.remaining() == 0 && endOfBatch()) {
return -1;
}

// Pending delimiter bytes are copied out first; this call then returns without touching
// the row bytes, so the row is delivered by later calls.
if (delimBuf != null && delimBuf.remaining() > 0) {
int bytesRead = Math.min(len, delimBuf.remaining());
delimBuf.get(b, off, bytesRead);
return bytesRead;
}
} catch (DorisException e) {
// Rethrown as IOException, keeping the DorisException as the cause.
throw new IOException(e);
}
int bytesRead = Math.min(len, buffer.remaining());
buffer.get(b, off, bytesRead);
// Copy at most len bytes, limited by what is left of the current row.
int bytesRead = Math.min(len, lineBuf.remaining());
lineBuf.get(b, off, bytesRead);
return bytesRead;
}

Expand All @@ -109,6 +123,7 @@ public int read(byte[] b, int off, int len) throws IOException {
public boolean endOfBatch() throws DorisException {
Iterator<InternalRow> iterator = recordBatch.getIterator();
if (readCount >= recordBatch.getBatchSize() || !iterator.hasNext()) {
delimBuf = null;
return true;
}
readNext(iterator);
Expand All @@ -125,62 +140,18 @@ private void readNext(Iterator<InternalRow> iterator) throws DorisException {
if (!iterator.hasNext()) {
throw new ShouldNeverHappenException();
}
byte[] delim = recordBatch.getDelim();
byte[] rowBytes = rowToByte(iterator.next());
if (isFirst) {
ensureCapacity(rowBytes.length);
buffer.put(rowBytes);
buffer.flip();
delimBuf = null;
lineBuf = ByteBuffer.wrap(rowBytes);
isFirst = false;
} else {
ensureCapacity(delim.length + rowBytes.length);
buffer.put(delim);
buffer.put(rowBytes);
buffer.flip();
delimBuf = ByteBuffer.wrap(delim);
lineBuf = ByteBuffer.wrap(rowBytes);
}
readCount++;
}

/**
 * Makes the record buffer ready to accept {@code need} bytes.
 * <p>
 * When the current buffer is already large enough it is cleared for reuse; otherwise it is
 * replaced by a newly allocated buffer whose size comes from
 * {@code calculateNewCapacity}.
 *
 * @param need required buffer space
 */
private void ensureCapacity(int need) {
    final int current = buffer.capacity();

    if (need > current) {
        // Too small: allocate a replacement buffer.
        final int expanded = calculateNewCapacity(current, need);
        LOG.info("expand buffer, min cap: {}, now cap: {}, new cap: {}", need, current, expanded);
        buffer = ByteBuffer.allocate(expanded);
    } else {
        // Large enough: reset position/limit so the buffer can be refilled.
        buffer.clear();
    }
}

/**
 * Calculate new capacity for buffer expansion.
 * <p>
 * Starts from the current capacity (or {@code DEFAULT_BUF_SIZE} when the buffer is still
 * empty) and doubles until {@code minCapacity} is reached. Seeding from the current capacity
 * matters: starting the doubling from zero would never terminate, because {@code 0 << 1}
 * stays zero.
 *
 * @param capacity current buffer capacity
 * @param minCapacity required min buffer space
 * @return new capacity, at least {@code minCapacity} whenever {@code minCapacity} is positive
 */
private int calculateNewCapacity(int capacity, int minCapacity) {
    int newCapacity = capacity > 0 ? capacity : DEFAULT_BUF_SIZE;
    while (newCapacity < minCapacity) {
        if (newCapacity > (Integer.MAX_VALUE >> 1)) {
            // Another doubling would overflow int; fall back to an exact fit.
            return minCapacity;
        }
        newCapacity <<= 1;
    }
    return newCapacity;
}

/**
* Convert Spark row data to byte array
*
Expand Down Expand Up @@ -220,5 +191,4 @@ private byte[] rowToByte(InternalRow row) throws DorisException {

}


}

0 comments on commit dc0fef4

Please sign in to comment.