Skip to content

Commit

Permalink
Prevent invalid buffer-state if writing the buffer-file fails once
Browse files Browse the repository at this point in the history
We need to move the write position even if writing fails
Improve preconditions and exception text
Add a reproducing test-case
  • Loading branch information
centic9 committed Jan 4, 2023
1 parent 799011f commit b2de6fa
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,9 @@ public synchronized void add(Chunk chunk) {
Preconditions.checkNotNull(chunk);
Preconditions.checkState(nextAdd - diskBufferWritePosition >= 0 &&
nextAdd - diskBufferWritePosition < numberOfChunks,
"Did have invalid positions: write-pos: %s, nextAdd: %s, numberOfChunks: %s",
"Did have invalid positions: %s needs to be in range [0, %s[, " +
"but had write-pos: %s, nextAdd: %s, numberOfChunks: %s",
nextAdd - diskBufferWritePosition, numberOfChunks,
diskBufferWritePosition, nextAdd, numberOfChunks);

diskBufferWrite[nextAdd - diskBufferWritePosition] = chunk;
Expand Down Expand Up @@ -309,21 +311,27 @@ private void checkReadBuffer() {
}
}

/**
* Check if the next add position is outside the chunks that we have available
*/
private void checkWriteBuffer() {
// check if the next get position is outside the chunk that we have available
if (nextAdd < diskBufferWritePosition ||
(nextAdd >= (diskBufferWritePosition + numberOfChunks))) {
try {
// make sure a dirty buffer is persisted
persistBuffer();

diskBufferWritePosition = getDiskPosition(nextAdd);
try {
persistBuffer();
} finally {
// make sure to adjust disk-buffer write position even if writing does fail
// e.g. reading thread may be interrupted by switching streams at the same time
diskBufferWritePosition = getDiskPosition(nextAdd);
}

// we read the previous chunk to keep the previous data if
// we seek away and thus flush the memory-buffer to disk again
diskBufferWrite = readBuffer(dataDir, diskBufferWritePosition, numberOfChunks);
} catch (IOException e) {
throw new IllegalStateException("Could not fetch current buffer for writing at position " +
throw new IllegalStateException("Could not update current buffers for writing at position " +
diskBufferWritePosition + " from " + dataDir, e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.dstadler.audio.stream.Stream;
import org.dstadler.commons.logging.jdk.LoggerFactory;
import org.dstadler.commons.testing.TestHelpers;
import org.dstadler.commons.testing.ThreadTestHelper;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;

import static org.junit.Assert.*;
Expand Down Expand Up @@ -278,6 +281,21 @@ public void testInvalidConstructorValuesOutOfRange() throws IOException {
DiskBasedBlockingSeekableRingBuffer.fromPersistence(dto));
}

@Test
public void testConstructFromPersistence() throws IOException {
Stream stream = new Stream();
stream.setUrl("url1");
stream.setStreamType(Stream.StreamType.live);

final BufferPersistenceDTO dto = new BufferPersistenceDTO(100, 20,
new File(getDataDir(), "test.bson"),
2, 99, 0, stream, false, false);

try (DiskBasedBlockingSeekableRingBuffer localBuffer = DiskBasedBlockingSeekableRingBuffer.fromPersistence(dto)) {
assertNotNull(localBuffer.peek());
}
}

@Test
public void testConcurrentAddRemove() {
for (int i = 0; i < 100; i++) {
Expand All @@ -286,4 +304,40 @@ public void testConcurrentAddRemove() {
assertNotNull(buffer.next());
}
}

@Test
public void testFailingWrite() throws IOException {
Stream stream = new Stream();
stream.setUrl("url1");
stream.setStreamType(Stream.StreamType.live);

File dir = new File(getDataDir(), "test.bson");
final BufferPersistenceDTO dto = new BufferPersistenceDTO(100, 20,
dir, 99, 0, 0, stream, false, false);

// create a directory so that writing the buffer-file fails
File bufferFile = new File(dir, "AudioBuffer-5.bson");
assertTrue(bufferFile.mkdirs());
bufferFile = new File(dir, "AudioBuffer-25.bson");
assertTrue(bufferFile.mkdirs());

try (DiskBasedBlockingSeekableRingBuffer localBuffer = DiskBasedBlockingSeekableRingBuffer.fromPersistence(dto)) {
for (int i = 0; i < 150; i++) {
try {
localBuffer.add(new Chunk(new byte[0], "", 1));
} catch (IllegalStateException e) {
assertTrue("Had: " + ExceptionUtils.getStackTrace(e),
e.getCause() instanceof FileNotFoundException);
assertTrue("Had: " + ExceptionUtils.getStackTrace(e),
e.getMessage().contains("Could not update current buffers for writing at position") ||
e.getMessage().contains("Could not fetch buffer for reading at position"));
assertTrue("Had: " + ExceptionUtils.getStackTrace(e),
e.getMessage().contains("position 5 ") ||
e.getMessage().contains("position 10 ") ||
e.getMessage().contains("position 25") ||
e.getMessage().contains("position 30"));
}
}
}
}
}

0 comments on commit b2de6fa

Please sign in to comment.