Skip to content
This repository has been archived by the owner on Mar 3, 2024. It is now read-only.

Commit

Permalink
Merge branch 'main' into expiration
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexeyShik authored Jan 9, 2024
2 parents f9f5780 + 94e5d9b commit 1b327a5
Show file tree
Hide file tree
Showing 15 changed files with 911 additions and 512 deletions.
Original file line number Diff line number Diff line change
@@ -1,223 +1,148 @@
package ru.vk.itmo.kovalchukvladislav;

import ru.vk.itmo.Config;
import ru.vk.itmo.Dao;
import ru.vk.itmo.Entry;
import ru.vk.itmo.kovalchukvladislav.model.DaoIterator;
import ru.vk.itmo.kovalchukvladislav.model.EntryExtractor;
import ru.vk.itmo.kovalchukvladislav.model.TableInfo;
import ru.vk.itmo.kovalchukvladislav.model.SimpleDaoLoggerUtility;
import ru.vk.itmo.kovalchukvladislav.storage.InMemoryStorage;
import ru.vk.itmo.kovalchukvladislav.storage.InMemoryStorageImpl;
import ru.vk.itmo.kovalchukvladislav.storage.SSTableStorage;
import ru.vk.itmo.kovalchukvladislav.storage.SSTableStorageImpl;

import java.io.IOException;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.logging.Level;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;

public abstract class AbstractBasedOnSSTableDao<D, E extends Entry<D>> extends AbstractInMemoryDao<D, E> {
// ===================================
// Constants
// ===================================
public abstract class AbstractBasedOnSSTableDao<D, E extends Entry<D>> implements Dao<D, E> {
private final Logger logger = SimpleDaoLoggerUtility.createLogger(getClass());
private static final String DB_FILENAME_PREFIX = "db_";
private static final String METADATA_FILENAME = "metadata";
private static final String OFFSETS_FILENAME_PREFIX = "offsets_";
private static final String DB_FILENAME_PREFIX = "db_";

// ===================================
// Variables
// ===================================

private final Path basePath;
private final Arena arena = Arena.ofShared();
private final long flushThresholdBytes;
private final EntryExtractor<D, E> extractor;
private final SSTableMemorySegmentWriter<D, E> writer;

// ===================================
// Storages
// ===================================

private int storagesCount;
private volatile boolean closed;
private final List<MemorySegment> dbMappedSegments;
private final List<MemorySegment> offsetMappedSegments;
private final Logger logger = Logger.getLogger(getClass().getSimpleName());
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final AtomicBoolean isFlushingOrCompacting = new AtomicBoolean(false);
private final ExecutorService flushOrCompactQueue = Executors.newSingleThreadExecutor();

/**
* В get(), upsert() и compact() для inMemoryStorage и ssTableStorage не требуется синхронизация между собой.
* Исключение составляет только flush() и compact().
* Следует проследить что на любом этапе оба стораджа в сумме будут иметь полные данные.
*/
private final InMemoryStorage<D, E> inMemoryStorage;
private final SSTableStorage<D, E> ssTableStorage;

protected AbstractBasedOnSSTableDao(Config config, EntryExtractor<D, E> extractor) throws IOException {
super(extractor);
this.closed = false;
this.storagesCount = 0;
this.extractor = extractor;
this.flushThresholdBytes = config.flushThresholdBytes();
this.basePath = Objects.requireNonNull(config.basePath());
this.dbMappedSegments = new ArrayList<>();
this.offsetMappedSegments = new ArrayList<>();
reloadFilesAndMapToSegment();
this.writer = new SSTableMemorySegmentWriter<>(basePath, DB_FILENAME_PREFIX, OFFSETS_FILENAME_PREFIX,
METADATA_FILENAME, extractor);
logger.setLevel(Level.OFF); // чтобы не засорять вывод в гитхабе, если такое возможно
}

// ===================================
// Restoring state
// ===================================

private void reloadFilesAndMapToSegment() throws IOException {
if (!Files.exists(basePath)) {
Files.createDirectory(basePath);
}
logger.info(() -> String.format("Reloading files from %s", basePath));
List<String> ssTableIds = getSSTableIds();
for (String ssTableId : ssTableIds) {
readFileAndMapToSegment(ssTableId);
}
logger.info(() -> String.format("Reloaded %d files", storagesCount));
}

private void readFileAndMapToSegment(String timestamp) throws IOException {
Path dbPath = basePath.resolve(DB_FILENAME_PREFIX + timestamp);
Path offsetsPath = basePath.resolve(OFFSETS_FILENAME_PREFIX + timestamp);
if (!Files.exists(dbPath) || !Files.exists(offsetsPath)) {
logger.severe(() -> String.format("File under path %s or %s doesn't exists", dbPath, offsetsPath));
return;
}

logger.info(() -> String.format("Reading files with timestamp %s", timestamp));

try (FileChannel dbChannel = FileChannel.open(dbPath, StandardOpenOption.READ);
FileChannel offsetChannel = FileChannel.open(offsetsPath, StandardOpenOption.READ)) {

MemorySegment db = dbChannel.map(FileChannel.MapMode.READ_ONLY, 0, Files.size(dbPath), arena);
MemorySegment offsets = offsetChannel.map(FileChannel.MapMode.READ_ONLY, 0, Files.size(offsetsPath), arena);
dbMappedSegments.add(db);
offsetMappedSegments.add(offsets);
storagesCount++;
}
logger.info(() -> String.format("Successfully read files with %s timestamp", timestamp));
}

private List<String> getSSTableIds() throws IOException {
Path metadataPath = basePath.resolve(METADATA_FILENAME);
if (!Files.exists(metadataPath)) {
return Collections.emptyList();
}
return Files.readAllLines(metadataPath, StandardCharsets.UTF_8);
}

private Path[] getAllTablesPath() throws IOException {
List<String> ssTableIds = getSSTableIds();
int size = ssTableIds.size();
Path[] files = new Path[2 * size];

for (int i = 0; i < size; i++) {
String id = ssTableIds.get(i);
files[2 * i] = basePath.resolve(DB_FILENAME_PREFIX + id);
files[2 * i + 1] = basePath.resolve(OFFSETS_FILENAME_PREFIX + id);
}
return files;
this.inMemoryStorage = new InMemoryStorageImpl<>(extractor, config.flushThresholdBytes());
this.ssTableStorage = new SSTableStorageImpl<>(basePath, METADATA_FILENAME,
DB_FILENAME_PREFIX, OFFSETS_FILENAME_PREFIX, extractor);
}

// ===================================
// Finding in storage
// ===================================
@Override
public Iterator<E> get(D from, D to) {
Iterator<E> inMemotyIterator = super.get(from, to);
return new DaoIterator<>(from, to, inMemotyIterator, dbMappedSegments, offsetMappedSegments, extractor);
List<Iterator<E>> iterators = new ArrayList<>();
iterators.addAll(inMemoryStorage.getIterators(from, to));
iterators.addAll(ssTableStorage.getIterators(from, to));
return new DaoIterator<>(iterators, extractor);
}

@Override
public E get(D key) {
E e = dao.get(key);
E e = inMemoryStorage.get(key);
if (e != null) {
return e.value() == null ? null : e;
}
E fromFile = findInStorages(key);
E fromFile = ssTableStorage.get(key);
return (fromFile == null || fromFile.value() == null) ? null : fromFile;
}

private E findInStorages(D key) {
for (int i = storagesCount - 1; i >= 0; i--) {
MemorySegment storage = dbMappedSegments.get(i);
MemorySegment offsets = offsetMappedSegments.get(i);

long offset = extractor.findLowerBoundValueOffset(key, storage, offsets);
if (offset == -1) {
continue;
}
D lowerBoundKey = extractor.readValue(storage, offset);

if (comparator.compare(lowerBoundKey, key) == 0) {
long valueOffset = offset + extractor.size(lowerBoundKey);
D value = extractor.readValue(storage, valueOffset);
return extractor.createEntry(lowerBoundKey, value);
}
@Override
public void upsert(E entry) {
long size = inMemoryStorage.upsertAndGetSize(entry);
if (size >= flushThresholdBytes) {
flush();
}
return null;
}

// ===================================
// Some utils
// ===================================

private TableInfo getInMemoryDaoSizeInfo() {
long size = 0;
for (E entry : dao.values()) {
size += extractor.size(entry);
@Override
public void flush() {
if (!isFlushingOrCompacting.compareAndSet(false, true)) {
logger.info("Flush or compact already in process");
return;
}
return new TableInfo(dao.size(), size);
}

private TableInfo getSSTableDaoSizeInfo() {
Iterator<E> allIterator = all();
long entriesCount = 0;
long daoSize = 0;

while (allIterator.hasNext()) {
E next = allIterator.next();
entriesCount++;
daoSize += extractor.size(next);
Callable<String> flushCallable = inMemoryStorage.prepareFlush(
basePath,
DB_FILENAME_PREFIX,
OFFSETS_FILENAME_PREFIX);
if (flushCallable == null) {
isFlushingOrCompacting.set(false);
return;
}

return new TableInfo(entriesCount, daoSize);
submitFlushAndAddSSTable(flushCallable);
}

// ===================================
// Flush and close
// ===================================

@Override
public synchronized void flush() throws IOException {
if (dao.isEmpty()) {
return;
}
writer.flush(dao.values().iterator(), getInMemoryDaoSizeInfo());
private void submitFlushAndAddSSTable(Callable<String> flushCallable) {
flushOrCompactQueue.execute(() -> {
try {
String newTimestamp = flushCallable.call();
ssTableStorage.addSSTableId(newTimestamp, true);
inMemoryStorage.completeFlush();
} catch (Exception e) {
inMemoryStorage.failFlush();
} finally {
isFlushingOrCompacting.set(false);
}
});
}

@Override
public synchronized void close() throws IOException {
if (closed) {
public void close() {
if (!isClosed.compareAndSet(false, true)) {
return;
}
flush();
if (arena.scope().isAlive()) {
arena.close();

flushOrCompactQueue.close();
try {
String newTimestamp = inMemoryStorage.close(basePath, DB_FILENAME_PREFIX, OFFSETS_FILENAME_PREFIX);
if (newTimestamp != null) {
ssTableStorage.addSSTableId(newTimestamp, false);
}
} catch (Exception e) {
logger.severe(() -> "Error while flushing on close: " + e.getMessage());
}
closed = true;
ssTableStorage.close();
}

@Override
public synchronized void compact() throws IOException {
if (storagesCount <= 1 && dao.isEmpty()) {
public void compact() {
if (!isFlushingOrCompacting.compareAndSet(false, true)) {
logger.info("Flush or compact already in process");
return;
}
Path[] oldTables = getAllTablesPath();
writer.compact(all(), getSSTableDaoSizeInfo());
writer.deleteUnusedFiles(oldTables);
flushOrCompactQueue.execute(() -> {
try {
ssTableStorage.compact();
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
isFlushingOrCompacting.set(false);
}
});
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ public long size(MemorySegment value) {

@Override
public long size(Entry<MemorySegment> entry) {
if (entry == null) {
return 0;
}
return size(entry.key()) + size(entry.value());
}

Expand Down
Loading

0 comments on commit 1b327a5

Please sign in to comment.