Skip to content

Commit

Permalink
Update DiskBasedBuffer some more
Browse files Browse the repository at this point in the history
Rename tempDir to dataDir
Add more JavaDoc and refactor slightly
Add a bit more tests
  • Loading branch information
centic9 committed Dec 8, 2021
1 parent 4f933ee commit 119c8de
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ public class BufferPersistenceDTO {

private final int numberOfDiskChunks;
private final int numberOfDiskFiles;
private final File tempDir;
private final File dataDir;

// default constructor for persistence
@SuppressWarnings("unused")
private BufferPersistenceDTO() {
this.numberOfDiskChunks = 0;
this.numberOfDiskFiles = 0;
this.tempDir = null;
this.dataDir = null;
this.buffer = null;
this.nextGet = 0;
this.nextAdd = 0;
Expand All @@ -51,7 +51,7 @@ private BufferPersistenceDTO() {
public BufferPersistenceDTO(long nextDownloadPosition, Stream stream, boolean playing, boolean downloadWhilePaused) {
this.numberOfDiskChunks = 0;
this.numberOfDiskFiles = 0;
this.tempDir = null;
this.dataDir = null;
this.buffer = null;
this.nextGet = 0;
this.nextAdd = 0;
Expand All @@ -66,7 +66,7 @@ public BufferPersistenceDTO(Chunk[] buffer, int nextGet, int nextAdd, int fill,
boolean downloadWhilePaused) {
this.numberOfDiskChunks = 0;
this.numberOfDiskFiles = 0;
this.tempDir = null;
this.dataDir = null;
// copy the array to be able to continue adding items to the buffer
// while the data is written
this.buffer = ArrayUtils.clone(buffer);
Expand All @@ -80,11 +80,11 @@ public BufferPersistenceDTO(Chunk[] buffer, int nextGet, int nextAdd, int fill,

}

public BufferPersistenceDTO(int numberOfDiskChunks, int numberOfDiskFiles, File tempDir, int nextGet, int nextAdd, int fill,
public BufferPersistenceDTO(int numberOfDiskChunks, int numberOfDiskFiles, File dataDir, int nextGet, int nextAdd, int fill,
Stream stream, boolean playing, boolean downloadWhilePaused) {
this.numberOfDiskChunks = numberOfDiskChunks;
this.numberOfDiskFiles = numberOfDiskFiles;
this.tempDir = tempDir;
this.dataDir = dataDir;
this.buffer = null;
this.nextGet = nextGet;
this.nextAdd = nextAdd;
Expand Down Expand Up @@ -135,8 +135,8 @@ public int getNumberOfDiskFiles() {
return numberOfDiskFiles;
}

public File getTempDir() {
return tempDir;
public File getDataDir() {
return dataDir;
}

/*@Override
Expand All @@ -162,7 +162,7 @@ public String toString() {
", downloadWhilePaused=" + downloadWhilePaused +
(numberOfDiskChunks == 0 ? "" : ", numberOfDiskChunks=" + numberOfDiskChunks) +
(numberOfDiskFiles == 0 ? "" : ", numberOfDiskFiles=" + numberOfDiskFiles) +
(tempDir == null ? "" : ", tempDir=" + tempDir) +
(dataDir == null ? "" : ", dataDir=" + dataDir) +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@
* data to the caller, but internally only parts of it are loaded
* into memory and most of the data is persisted to disk and only
* fetched when needed.
*
* There are separate memory buffers for reading and writing, as
* the current position for reading and writing can be anywhere
* across the full virtual buffer.
*
*
*/
public class DiskBasedBlockingSeekableRingBuffer implements SeekableRingBuffer<Chunk>, Persistable {
private final static Logger log = LoggerFactory.make();
Expand All @@ -39,46 +45,51 @@ public class DiskBasedBlockingSeekableRingBuffer implements SeekableRingBuffer<C
private final int numberOfDiskChunks;
private final int numberOfDiskFiles;
private final int numberOfChunks;
private final File tempDir;
private final File dataDir;

/**
* Chunk-position of the in-memory data in the overall virtual buffer
* Chunk-position of the current in-memory buffers in the overall virtual buffer.
*/
private int diskBufferReadPosition = 0;
private int diskBufferWritePosition = 0;

/**
* List of chunks that were either fetched from disk or written to the buffer.
* Chunks that are available for reading or writing.
*
* The data is at the diskBufferPosition
* The diskBufferRead/WritePosition indicates where in the virtual buffer these
* arrays are located.
* Whenever the reading or writing position moves out of these in-memory buffers,
* the data is flushed to disk if necessary and then replaced with the proper data
* from disk again.
*/
private Chunk[] diskBufferRead;
private Chunk[] diskBufferWrite;

/**
* Indicates if the in-memory diskBuffer needs to be flushed
* before switching to another buffer
* Indicates if there were writes to the in-memory diskBuffer and it therefore
* needs to be flushed before switching to another buffer
*/
private boolean isDirty;


/**
* indicates the next position to read in the virtual buffer,
* there is no more data to read if nextGet == nextAdd
* Indicates the next position to read in the virtual buffer.
*
* There is no more data to read if nextGet == nextAdd
* this is always in the range [0, numberOfChunks[
*/
private int nextGet = 0;

/**
* indicates the next position to write,
* this is always in the range [0, numberOfChunks[
* Indicates the next position to write in the virtual buffer.
*
* This is always in the range [0, numberOfChunks[
*/
private int nextAdd = 0;

private int fill = 0;

/**
* This enables breaking the blocking wait in next(),
* This enables breaking a blocking wait in next(),
* set via calling close()
*/
private boolean stop = false;
Expand All @@ -93,17 +104,17 @@ public class DiskBasedBlockingSeekableRingBuffer implements SeekableRingBuffer<C
* @param numberOfDiskChunks Number of byte-array chunks that are stored on disk
* @param numberOfDiskFiles Into how many files the disk-buffer is split. This also
* controls how big the in-memory area needs to be
* @param tempDir The directory where buffers can be persisted.
* @param dataDir The directory where buffers can be persisted.
*/
public DiskBasedBlockingSeekableRingBuffer(int numberOfDiskChunks, int numberOfDiskFiles, File tempDir) {
public DiskBasedBlockingSeekableRingBuffer(int numberOfDiskChunks, int numberOfDiskFiles, File dataDir) {
this.numberOfDiskChunks = numberOfDiskChunks;
this.numberOfDiskFiles = numberOfDiskFiles;
this.tempDir = tempDir;
this.dataDir = dataDir;

Preconditions.checkNotNull(tempDir, "Need a valid temporary directory");
Preconditions.checkState((tempDir.exists() || tempDir.mkdirs()) && tempDir.isDirectory(),
Preconditions.checkNotNull(dataDir, "Need a valid temporary directory");
Preconditions.checkState((dataDir.exists() || dataDir.mkdirs()) && dataDir.isDirectory(),
"Invalid temporary directory provided: %s, exists: %s, isDirectory: %s",
tempDir, tempDir.exists(), tempDir.isDirectory());
dataDir, dataDir.exists(), dataDir.isDirectory());

Preconditions.checkArgument(numberOfDiskChunks > 0, "Had disk chunks: %s", numberOfDiskChunks);
Preconditions.checkArgument(numberOfDiskFiles > 0, "Had disk blocks: %s", numberOfDiskFiles);
Expand All @@ -112,14 +123,9 @@ public DiskBasedBlockingSeekableRingBuffer(int numberOfDiskChunks, int numberOfD

this.numberOfChunks = numberOfDiskChunks / numberOfDiskFiles;

this.diskBufferRead = new Chunk[numberOfChunks];
this.diskBufferWrite = new Chunk[numberOfChunks];

// initialize buffer with empty chunks
for(int i = 0;i < numberOfChunks;i++) {
this.diskBufferRead[i] = new Chunk(EMPTY, "", 0);
this.diskBufferWrite[i] = new Chunk(EMPTY, "", 0);
}
// initialize buffers with empty data
this.diskBufferRead = createEmptyBuffer(numberOfChunks);
this.diskBufferWrite = createEmptyBuffer(numberOfChunks);
}

/**
Expand All @@ -129,22 +135,22 @@ public DiskBasedBlockingSeekableRingBuffer(int numberOfDiskChunks, int numberOfD
* @param numberOfDiskChunks Number of byte-array chunks that are stored on disk
* @param numberOfDiskFiles Into how many files the disk-buffer is split. This also
* controls how big the in-memory area needs to be
* @param tempDir The directory where buffers can be persisted.
* @param dataDir The directory where buffers can be persisted.
* @param nextGet The position for the next get operation
* @param nextAdd The position for the next add operation
* @param fill The current fill value
*/
private DiskBasedBlockingSeekableRingBuffer(int numberOfDiskChunks, int numberOfDiskFiles, File tempDir,
private DiskBasedBlockingSeekableRingBuffer(int numberOfDiskChunks, int numberOfDiskFiles, File dataDir,
int nextGet, int nextAdd, int fill)
throws IOException {
this.numberOfDiskChunks = numberOfDiskChunks;
this.numberOfDiskFiles = numberOfDiskFiles;
this.tempDir = tempDir;
this.dataDir = dataDir;

Preconditions.checkNotNull(tempDir, "Need a valid temporary directory");
Preconditions.checkState((tempDir.exists() || tempDir.mkdirs()) && tempDir.isDirectory(),
Preconditions.checkNotNull(dataDir, "Need a valid temporary directory");
Preconditions.checkState((dataDir.exists() || dataDir.mkdirs()) && dataDir.isDirectory(),
"Invalid temporary directory provided: %s, exists: %s, isDirectory: %s",
tempDir, tempDir.exists(), tempDir.isDirectory());
dataDir, dataDir.exists(), dataDir.isDirectory());

Preconditions.checkArgument(numberOfDiskChunks > 0, "Had disk chunks: %s", numberOfDiskChunks);
Preconditions.checkArgument(numberOfDiskFiles > 0, "Had disk blocks: %s", numberOfDiskFiles);
Expand All @@ -162,8 +168,8 @@ private DiskBasedBlockingSeekableRingBuffer(int numberOfDiskChunks, int numberOf
this.diskBufferWritePosition = getDiskPosition(nextAdd);

// try to read the buffer from disk based on these positions
this.diskBufferRead = readBuffer(tempDir, diskBufferReadPosition, numberOfChunks);
this.diskBufferWrite = readBuffer(tempDir, diskBufferWritePosition, numberOfChunks);
this.diskBufferRead = readBuffer(dataDir, diskBufferReadPosition, numberOfChunks);
this.diskBufferWrite = readBuffer(dataDir, diskBufferWritePosition, numberOfChunks);
}

private int getDiskPosition(int pos) {
Expand All @@ -179,7 +185,7 @@ private int getDiskPosition(int pos) {
*/
private void persistBuffer() throws IOException {
if (isDirty) {
File bufferFile = new File(tempDir, "PiRdadio-" + diskBufferWritePosition + ".bson");
File bufferFile = new File(dataDir, "PiRdadio-" + diskBufferWritePosition + ".bson");

log.info("Writing buffer for position " + diskBufferWritePosition + " to file " + bufferFile);
try (OutputStream stream = new BufferedOutputStream(new FileOutputStream(bufferFile))) {
Expand All @@ -192,21 +198,17 @@ private void persistBuffer() throws IOException {
}

/**
* Fetches data for the current position into the buffer
* Fetches data for the given position into the buffer
*
* @throws IOException If reading from the file fails
*/
private static Chunk[] readBuffer(File tempDir, int diskBufferPosition, int numberOfChunks) throws IOException {
File bufferFile = new File(tempDir, "PiRdadio-" + diskBufferPosition + ".bson");

if (!bufferFile.exists()) {
Chunk[] buffer = new Chunk[numberOfChunks];

log.info("Could not read disk-buffer from " + tempDir);
for(int i = 0;i < numberOfChunks;i++) {
buffer[i] = new Chunk(EMPTY, "", 0);
}
return buffer;

return createEmptyBuffer(numberOfChunks);
} else {
log.info("Reading buffer from file " + bufferFile);
try (InputStream stream = new BufferedInputStream(new FileInputStream(bufferFile))) {
Expand All @@ -217,6 +219,13 @@ private static Chunk[] readBuffer(File tempDir, int diskBufferPosition, int numb
}
}

private static Chunk[] createEmptyBuffer(int numberOfChunks) {
Chunk[] buffer = new Chunk[numberOfChunks];
for(int i = 0; i < numberOfChunks; i++) {
buffer[i] = new Chunk(EMPTY, "", 0);
}
return buffer;
}

@SuppressForbidden(reason = "Uses Object.notify() on purpose here")
@Override
Expand All @@ -230,7 +239,8 @@ public synchronized void add(Chunk chunk) {
diskBufferWrite[nextAdd - diskBufferWritePosition] = chunk;
isDirty = true;

// we may also need to update the read-buffer
// we may also need to update the read-buffer if we write into the
// area that we have in-memory for reading
if (nextAdd >= diskBufferReadPosition &&
nextAdd < (diskBufferReadPosition + numberOfChunks)) {

Expand All @@ -239,6 +249,9 @@ public synchronized void add(Chunk chunk) {
}

nextAdd = (nextAdd + 1) % numberOfDiskChunks;

// check if we are overflowing and thus need to move the read-position forward
// and discard this item
if(nextAdd == nextGet) {
Preconditions.checkState(nextGet - diskBufferReadPosition >= 0 &&
nextGet - diskBufferReadPosition < numberOfChunks,
Expand All @@ -262,17 +275,38 @@ public synchronized void add(Chunk chunk) {
notify();
}

/**
* Check if the next get position is outside the chunks that we have available
*/
private void checkReadBuffer() {
// check if the next get position is outside of the chunk that we have
// available
if (nextGet < diskBufferReadPosition ||
(nextGet >= (diskBufferReadPosition + numberOfChunks))) {
diskBufferReadPosition = getDiskPosition(nextGet);
try {
diskBufferRead = readBuffer(tempDir, diskBufferReadPosition, numberOfChunks);
diskBufferRead = readBuffer(dataDir, diskBufferReadPosition, numberOfChunks);
} catch (IOException e) {
throw new IllegalStateException("Could not fetch buffer for reading at position " +
diskBufferReadPosition + " from " + tempDir, e);
diskBufferReadPosition + " from " + dataDir, e);
}
}
}

private void checkWriteBuffer() {
// check if the next get position is outside the chunk that we have available
if (nextAdd < diskBufferWritePosition ||
(nextAdd >= (diskBufferWritePosition + numberOfChunks))) {
try {
// make sure a dirty buffer is persisted
persistBuffer();

diskBufferWritePosition = getDiskPosition(nextAdd);

// we read the previous chunk to keep the previous data if
// we seek away and thus flush the memory-buffer to disk again
diskBufferWrite = readBuffer(dataDir, diskBufferWritePosition, numberOfChunks);
} catch (IOException e) {
throw new IllegalStateException("Could not fetch current buffer for writing at position " +
diskBufferWritePosition + " from " + dataDir, e);
}
}
}
Expand Down Expand Up @@ -325,26 +359,6 @@ public synchronized Chunk peek() {
return diskBufferRead[nextGet - diskBufferReadPosition];
}

private void checkWriteBuffer() {
// check if the next get position is outside the chunk that we have available
if (nextAdd < diskBufferWritePosition ||
(nextAdd >= (diskBufferWritePosition + numberOfChunks))) {
try {
// make sure a dirty buffer is persisted
persistBuffer();

diskBufferWritePosition = getDiskPosition(nextAdd);

// we read the previous chunk to keep the previous data if
// we seek away and thus flush the memory-buffer to disk again
diskBufferWrite = readBuffer(tempDir, diskBufferWritePosition, numberOfChunks);
} catch (IOException e) {
throw new IllegalStateException("Could not fetch current buffer for writing at position " +
diskBufferWritePosition + " from " + tempDir, e);
}
}
}

@Override
public synchronized int seek(int nrOfChunks) {
// this is a very naive initial implementation which actually loops
Expand Down Expand Up @@ -493,20 +507,20 @@ public synchronized BufferPersistenceDTO toPersistence(Stream stream, boolean pl
try {
persistBuffer();
} catch (IOException e) {
throw new IllegalStateException("With temp-dir: " + tempDir, e);
throw new IllegalStateException("With temp-dir: " + dataDir, e);
}

// only persist nextGet/nextAdd, we can re-create the positions and buffers from that
return new BufferPersistenceDTO(numberOfDiskChunks, numberOfDiskFiles, tempDir,
return new BufferPersistenceDTO(numberOfDiskChunks, numberOfDiskFiles, dataDir,
nextGet, nextAdd, fill, stream, playing, downloadWhilePaused);
}

public static DiskBasedBlockingSeekableRingBuffer fromPersistence(BufferPersistenceDTO dto) throws IOException {
if(dto.getTempDir() == null || dto.getNumberOfDiskFiles() <= 0 || dto.getNumberOfDiskChunks() <= 0) {
if(dto.getDataDir() == null || dto.getNumberOfDiskFiles() <= 0 || dto.getNumberOfDiskChunks() <= 0) {
throw new IOException("Could not read buffer from persistent file, having: " + dto);
}

return new DiskBasedBlockingSeekableRingBuffer(dto.getNumberOfDiskChunks(), dto.getNumberOfDiskFiles(), dto.getTempDir(),
return new DiskBasedBlockingSeekableRingBuffer(dto.getNumberOfDiskChunks(), dto.getNumberOfDiskFiles(), dto.getDataDir(),
dto.getNextGet(), dto.getNextAdd(), dto.getFill());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void testPersistence() throws IOException {
assertEquals(10, dto.getBuffer().length);
assertEquals(0, dto.getNumberOfDiskFiles());
assertEquals(0, dto.getNumberOfDiskChunks());
assertNull(dto.getTempDir());
assertNull(dto.getDataDir());

// then convert the DTO back into a buffer and do a next() as well
BlockingSeekableRingBuffer back = BlockingSeekableRingBuffer.fromPersistence(dto);
Expand Down
Loading

0 comments on commit 119c8de

Please sign in to comment.