diff --git a/README.md b/README.md index 56f7583..86dea31 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,10 @@ way to store and retrieve data from RocksDB. - [x] **MultiSet (RocksMultiSet) :** A basic implementation for set where you can add, remove and check if an item exists. It supports multiple sets under the same name, but different namespaces. - [x] **ZSet (RocksZSet) :** A basic implementation for sorted set where you can add, remove and check if an item exists. You can find score of an item and items within a score range. - [x] **MultiZSet (RocksMultiZSet) :** A basic implementation for sorted set where you can add, remove and check if an item exists. You can find score of an item and items within a score range. It supports multiple sorted sets under the same name, but different namespaces. -- [ ] **Map (RocksMap) :** Not implemented yet +- [x] **Map (RocksMap) :** A basic implementation for map where you can add, remove and check if an item exists. +- [x] **MultiMap (RocksMultiMap) :** A basic implementation for map where you can add, remove and check if an item exists. It supports multiple maps under the same name, but different namespaces. +- [x] **Bitmap (RocksBitmap) :** A basic implementation for bitmap where you can set, unset and check if a bit is set. +- [x] **MultiBitmap (RocksMultiBitmap) :** A basic implementation for bitmap where you can set, unset and check if a bit is set. It supports multiple bitmaps under the same name, but different namespaces. ## Pre-requisites diff --git a/build.gradle b/build.gradle index c062c20..f3b1024 100644 --- a/build.gradle +++ b/build.gradle @@ -105,6 +105,7 @@ subprojects { maven { url "https://oss.sonatype.org/content/repositories/snapshots" } + maven { url 'https://jitpack.io' } } archivesBaseName = 'rocks-types-' + project.name diff --git a/core/build.gradle b/core/build.gradle index 75b2d51..f7c59b3 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -2,6 +2,7 @@ dependencies { api libs.slf4j.api api libs.rocksdb + api libs.roaringbitmap implementation libs.messagepack.jackson } diff --git a/core/src/main/java/com/bloxbean/rocks/types/collection/BaseDataType.java b/core/src/main/java/com/bloxbean/rocks/types/collection/BaseDataType.java index edc4f45..87ffe31 100644 --- a/core/src/main/java/com/bloxbean/rocks/types/collection/BaseDataType.java +++ b/core/src/main/java/com/bloxbean/rocks/types/collection/BaseDataType.java @@ -57,6 +57,24 @@ protected void writeBatch(WriteBatch batch, byte[] key, byte[] value) { } } + @SneakyThrows + protected void merge(WriteBatch writeBatch, byte[] key, byte[] value) { + if (writeBatch != null) { + mergeBatch(writeBatch, key, value); + } else { + merge(key, value); + } + } + + @SneakyThrows + protected void mergeBatch(WriteBatch batch, byte[] key, byte[] value) { + if (columnFamilyHandle != null) { + batch.merge(columnFamilyHandle, key, value); + } else { + batch.merge(key, value); + } + } + @SneakyThrows protected void deleteBatch(WriteBatch batch, byte[] key) { if (columnFamilyHandle != null) { @@ -93,6 +111,15 @@ protected void delete(byte[] key) { } } + @SneakyThrows + protected void merge(byte[] key, byte[] value) { + if (columnFamilyHandle != null) { + db.merge(columnFamilyHandle, key, value); + } else { + db.merge(key, value); + } + } + protected RocksIterator iterator() { if (columnFamilyHandle != null) { return db.newIterator(columnFamilyHandle); diff --git a/core/src/main/java/com/bloxbean/rocks/types/collection/RocksBitmap.java b/core/src/main/java/com/bloxbean/rocks/types/collection/RocksBitmap.java new file mode 100644 index 0000000..3831a82 --- /dev/null +++ b/core/src/main/java/com/bloxbean/rocks/types/collection/RocksBitmap.java @@ -0,0 +1,72 @@ +package com.bloxbean.rocks.types.collection; + +import com.bloxbean.rocks.types.config.RocksDBConfig; +import org.roaringbitmap.RoaringBitmap; +import org.rocksdb.WriteBatch; + +/** + * Bitmap implementation using RocksDB + * RocksBitmap is a wrapper class for RocksMultiBitmap with default column family + */ +public class RocksBitmap extends RocksMultiBitmap { + + public RocksBitmap(RocksDBConfig rocksDBConfig, String name) { + super(rocksDBConfig, name); + } + + public RocksBitmap(RocksDBConfig rocksDBConfig, String name, int fragmentSize) { + super(rocksDBConfig, name, fragmentSize); + } + + public RocksBitmap(RocksDBConfig rocksDBConfig, String columnFamily, String name) { + super(rocksDBConfig, columnFamily, name); + } + + public RocksBitmap(RocksDBConfig rocksDBConfig, String columnFamily, String name, int fragmentSize) { + super(rocksDBConfig, columnFamily, name, fragmentSize); + } + + public void setBit(int index) { + super.setBit(null, index); + } + + public void setBitBatch(WriteBatch writeBatch, int... bitIndexes) { + super.setBitBatch(null, writeBatch, bitIndexes); + } + + public boolean getBit(int bitIndex) { + return super.getBit(null, bitIndex); + } + + public void clearBit(int bitIndex) { + super.clearBit(null, bitIndex); + } + + public void clearBitBatch(WriteBatch writeBatch, int... bitIndexes) { + super.clearBitBatch(null, writeBatch, bitIndexes); + } + + public long nextSetBit(int fromIndex) { + return super.nextSetBit(null, fromIndex); + } + + public long nextClearBit(int fromIndex) { + return super.nextClearBit(null, fromIndex); + } + + public long previousSetBit(int fromIndex) { + return super.previousSetBit(null, fromIndex); + } + + public long previousClearBit(int fromIndex) { + return super.previousClearBit(null, fromIndex); + } + + public RoaringBitmap getAllBits() { + return super.getAllBits(null); + } + + public RoaringBitmap getBits(int fromFragmentIndex, int toFragmentIndex) { + return super.getBits(null, fromFragmentIndex, toFragmentIndex); + } +} diff --git a/core/src/main/java/com/bloxbean/rocks/types/collection/RocksMultiBitmap.java b/core/src/main/java/com/bloxbean/rocks/types/collection/RocksMultiBitmap.java new file mode 100644 index 0000000..53d1a68 --- /dev/null +++ b/core/src/main/java/com/bloxbean/rocks/types/collection/RocksMultiBitmap.java @@ -0,0 +1,440 @@ +package com.bloxbean.rocks.types.collection; + +import com.bloxbean.rocks.types.collection.metadata.BitmapMetadata; +import com.bloxbean.rocks.types.common.KeyBuilder; +import com.bloxbean.rocks.types.config.RocksDBConfig; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.roaringbitmap.RoaringBitmap; +import org.rocksdb.WriteBatch; + +import java.io.*; +import java.util.Optional; + +/** + * Bitmap implementation using RocksDB. It supports multiple bitmaps under the same name but different namespaces. + * This implementation uses RoaringBitmap as the underlying bitmap implementation. + *
+ * This implementation is not thread safe.
+ */
+@Slf4j
+public class RocksMultiBitmap extends BaseDataType {
+ private static final int DEFAULT_FRAGMENT_SIZE = 1000; //1000 bits
+ private int fragmentSize;
+
+ public RocksMultiBitmap(RocksDBConfig rocksDBConfig, String name) {
+ super(rocksDBConfig, name, null);
+ this.fragmentSize = DEFAULT_FRAGMENT_SIZE;
+ }
+
+ public RocksMultiBitmap(RocksDBConfig rocksDBConfig, String name, int fragmentSize) {
+ super(rocksDBConfig, name, null);
+ this.fragmentSize = fragmentSize;
+ }
+
+ public RocksMultiBitmap(RocksDBConfig rocksDBConfig, String columnFamily, String name) {
+ super(rocksDBConfig, columnFamily, name, null);
+ }
+
+ public RocksMultiBitmap(RocksDBConfig rocksDBConfig, String columnFamily, String name, int fragmentSize) {
+ super(rocksDBConfig, columnFamily, name, null);
+ this.fragmentSize = fragmentSize;
+ }
+
+ public void setBit(String ns, int bitIndex) {
+ var metadata = createMetadata(ns).orElseThrow();
+ setBit(ns, null, metadata, bitIndex);
+ }
+
+ public void setBitBatch(String ns, WriteBatch writeBatch, int... bitIndexes) {
+ var metadata = createMetadata(ns).orElseThrow();
+ for (int bitIndex : bitIndexes) {
+ setBit(ns, writeBatch, metadata, bitIndex);
+ }
+ }
+
+ @SneakyThrows
+ private void setBit(String ns, WriteBatch writeBatch, BitmapMetadata metadata, int bitIndex) {
+
+ int fragmentIndex = bitIndex / fragmentSize;
+ int fragmentBitIndex = bitIndex % fragmentSize;
+
+ byte[] keyBytes = getKey(metadata, ns, fragmentIndex);
+
+
+ RoaringBitmap bitSet = null;
+ byte[] valueBytes = get(keyBytes);
+ if (valueBytes == null || valueBytes.length == 0) {
+ bitSet = new RoaringBitmap();
+ } else {
+ bitSet = getRoaringBitmap(valueBytes);
+ }
+
+ bitSet.add(fragmentBitIndex);
+
+ byte[] bytes = serializeRoaringBitmap(bitSet);
+
+ write(writeBatch, keyBytes, bytes);
+ if (fragmentIndex > metadata.getMaxFragmentIndex()) {
+ metadata.setMaxFragmentIndex(fragmentIndex);
+ updateMetadata(writeBatch, metadata, ns);
+ log.info("Max fragment index updated to {}", metadata);
+ }
+ }
+
+ private static byte[] serializeRoaringBitmap(RoaringBitmap bitSet) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ bitSet.serialize(new DataOutputStream(baos));
+ return baos.toByteArray();
+ }
+
+ private static RoaringBitmap getRoaringBitmap(byte[] valueBytes) throws IOException {
+ RoaringBitmap bitSet;
+ bitSet = new RoaringBitmap();//BitSet.valueOf(valueBytes);
+ ByteArrayInputStream bais = new ByteArrayInputStream(valueBytes);
+ bitSet.deserialize(new DataInputStream(bais));
+ return bitSet;
+ }
+
+ public void clearBit(String ns, int bitIndex) {
+ var metadata = createMetadata(ns).orElseThrow();
+ clearBit(ns, null, metadata, bitIndex);
+ }
+
+ public void clearBitBatch(String ns, WriteBatch writeBatch, int... bitIndexes) {
+ var metadata = createMetadata(ns).orElseThrow();
+ for (int bitIndex : bitIndexes) {
+ clearBit(ns, writeBatch, metadata, bitIndex);
+ }
+ }
+
+ @SneakyThrows
+ private void clearBit(String ns, WriteBatch writeBatch, BitmapMetadata metadata, int bitIndex) {
+
+ int fragmentIndex = bitIndex / fragmentSize;
+ int fragmentBitIndex = bitIndex % fragmentSize;
+
+ byte[] keyBytes = getKey(metadata, ns, fragmentIndex);
+
+ byte[] valueBytes = get(keyBytes);
+ if (valueBytes == null || valueBytes.length == 0) {
+ return; //nothing to clear
+ }
+
+ var bitSet = getRoaringBitmap(valueBytes);
+ bitSet.remove(fragmentBitIndex);
+
+ byte[] serializedBytes = serializeRoaringBitmap(bitSet);
+
+ write(writeBatch, keyBytes, serializedBytes);
+ }
+
+ public boolean getBit(String ns, int bitIndex) {
+ var metadata = getMetadata(ns).orElseThrow();
+ return getBit(ns, metadata, bitIndex);
+ }
+
+ @SneakyThrows
+ private boolean getBit(String ns, BitmapMetadata metadata, int bitIndex) {
+
+ int fragmentIndex = bitIndex / fragmentSize;
+ int fragmentBitIndex = bitIndex % fragmentSize;
+
+ byte[] keyBytes = getKey(metadata, ns, fragmentIndex);
+
+ byte[] valueBytes = get(keyBytes);
+ if (valueBytes == null || valueBytes.length == 0) {
+ return false;
+ }
+
+ var bitSet = getRoaringBitmap(valueBytes);
+ return bitSet.contains(fragmentBitIndex);
+ }
+
+ //Get all the bits set in the bitmap in all fragments
+ public RoaringBitmap getAllBits(String ns) {
+ var metadata = getMetadata(ns).orElseThrow();
+ return getAllBits(ns, metadata);
+ }
+
+ @SneakyThrows
+ private RoaringBitmap getAllBits(String ns, BitmapMetadata metadata) {
+ RoaringBitmap roaringBitmap = null;
+ for (int i = 0; i <= metadata.getMaxFragmentIndex(); i++) {
+ byte[] keyBytes = getKey(metadata, ns, i);
+ byte[] valueBytes = get(keyBytes);
+ RoaringBitmap fragmentBitSet = null;
+ if (valueBytes == null || valueBytes.length == 0) {
+ fragmentBitSet = new RoaringBitmap();
+ } else {
+ fragmentBitSet = getRoaringBitmap(valueBytes);
+ }
+
+ fragmentBitSet = RoaringBitmap.addOffset(fragmentBitSet, i * fragmentSize);
+
+ if (roaringBitmap == null) {
+ roaringBitmap = fragmentBitSet;
+ } else {
+ roaringBitmap.or(fragmentBitSet);
+ }
+ }
+
+ return roaringBitmap;
+ }
+
+ public RoaringBitmap getBits(String ns, int fromFragmentIndex, int toFragmentIndex) {
+ var metadata = getMetadata(ns).orElseThrow();
+ return getBits(ns, metadata, fromFragmentIndex, toFragmentIndex);
+ }
+
+ @SneakyThrows
+ private RoaringBitmap getBits(String ns, BitmapMetadata metadata, int fromFragmentIndex, int toFragmentIndex) {
+ RoaringBitmap roaringBitmap = null;
+ for (int i = fromFragmentIndex; i <= toFragmentIndex; i++) {
+ byte[] keyBytes = getKey(metadata, ns, i);
+ byte[] valueBytes = get(keyBytes);
+ RoaringBitmap fragmentBitSet = null;
+ if (valueBytes == null || valueBytes.length == 0) {
+ fragmentBitSet = new RoaringBitmap();
+ } else {
+ fragmentBitSet = getRoaringBitmap(valueBytes);
+ }
+
+ fragmentBitSet = RoaringBitmap.addOffset(fragmentBitSet, i * fragmentSize);
+
+ if (roaringBitmap == null) {
+ roaringBitmap = fragmentBitSet;
+ } else {
+ roaringBitmap.or(fragmentBitSet);
+ }
+ }
+
+ return roaringBitmap;
+ }
+
+ public long nextSetBit(String ns, int fromIndex) {
+ var metadata = getMetadata(ns).orElseThrow();
+ return nextSetBit(ns, metadata, fromIndex);
+ }
+
+ @SneakyThrows
+ private long nextSetBit(String ns, BitmapMetadata metadata, int fromIndex) {
+ int fragmentIndex = fromIndex / fragmentSize;
+ int fragmentBitIndex = fromIndex % fragmentSize;
+
+ byte[] keyBytes = getKey(metadata, ns, fragmentIndex);
+
+ byte[] valueBytes = get(keyBytes);
+ RoaringBitmap bitSet = null;
+ if (valueBytes == null || valueBytes.length == 0) {
+ bitSet = new RoaringBitmap();
+ } else {
+ bitSet = getRoaringBitmap(valueBytes);
+ }
+
+ long nextSetBit = bitSet.nextValue(fragmentBitIndex);
+ if (nextSetBit == -1) {
+ for (int i = fragmentIndex + 1; i <= metadata.getMaxFragmentIndex(); i++) {
+ byte[] nextKeyBytes = getKey(metadata, ns, i);
+ byte[] nextValueBytes = get(nextKeyBytes);
+ if (nextValueBytes == null || nextValueBytes.length == 0) {
+ continue;
+ }
+
+ var nextBitSet = getRoaringBitmap(nextValueBytes);
+ nextSetBit = nextBitSet.nextValue(0);
+ if (nextSetBit != -1) {
+ return nextSetBit + (i * fragmentSize);
+ }
+ }
+ } else {
+ return nextSetBit + (fragmentIndex * fragmentSize);
+ }
+
+ return -1;
+ }
+
+ public long nextClearBit(String ns, int fromIndex) {
+ var metadata = getMetadata(ns).orElseThrow();
+ return nextClearBit(ns, metadata, fromIndex);
+ }
+
+ @SneakyThrows
+ private long nextClearBit(String ns, BitmapMetadata metadata, int fromIndex) {
+ int fragmentIndex = fromIndex / fragmentSize;
+ int fragmentBitIndex = fromIndex % fragmentSize;
+
+ byte[] keyBytes = getKey(metadata, ns, fragmentIndex);
+
+ RoaringBitmap bitSet = null;
+ byte[] valueBytes = get(keyBytes);
+ if (valueBytes == null || valueBytes.length == 0) {
+ bitSet = new RoaringBitmap();
+ } else {
+ bitSet = getRoaringBitmap(valueBytes);
+ }
+
+ long nextClearBit = bitSet.nextAbsentValue(fragmentBitIndex);
+ if (nextClearBit == -1) {
+ for (int i = fragmentIndex + 1; i <= metadata.getMaxFragmentIndex(); i++) {
+ byte[] nextKeyBytes = getKey(metadata, ns, i);
+ byte[] nextValueBytes = get(nextKeyBytes);
+ if (nextValueBytes == null || nextValueBytes.length == 0) {
+ continue;
+ }
+
+ var nextBitSet = getRoaringBitmap(nextValueBytes);
+ nextClearBit = nextBitSet.nextAbsentValue(0);
+ if (nextClearBit != -1) {
+ return nextClearBit + (i * fragmentSize);
+ }
+ }
+ } else {
+ return nextClearBit + (fragmentIndex * fragmentSize);
+ }
+
+ return -1;
+ }
+
+ public long previousSetBit(String ns, int fromIndex) {
+ var metadata = getMetadata(ns).orElseThrow();
+ return previousSetBit(ns, metadata, fromIndex);
+ }
+
+ @SneakyThrows
+ private long previousSetBit(String ns, BitmapMetadata metadata, int fromIndex) {
+ int fragmentIndex = fromIndex / fragmentSize;
+ int fragmentBitIndex = fromIndex % fragmentSize;
+
+ byte[] keyBytes = getKey(metadata, ns, fragmentIndex);
+
+ RoaringBitmap bitSet = null;
+ byte[] valueBytes = get(keyBytes);
+ if (valueBytes == null || valueBytes.length == 0) {
+ bitSet = new RoaringBitmap();
+ } else {
+ bitSet = getRoaringBitmap(valueBytes);
+ }
+
+ long previousSetBit = bitSet.previousValue(fragmentBitIndex);
+ if (previousSetBit == -1) {
+ for (int i = fragmentIndex - 1; i >= 0; i--) {
+ byte[] nextKeyBytes = getKey(metadata, ns, i);
+ byte[] nextValueBytes = get(nextKeyBytes);
+ RoaringBitmap nextBitSet = null;
+ if (nextValueBytes == null || nextValueBytes.length == 0) {
+ nextBitSet = new RoaringBitmap();
+ } else {
+ nextBitSet = getRoaringBitmap(nextValueBytes);
+ }
+
+ previousSetBit = nextBitSet.previousValue(fragmentSize - 1);
+ if (previousSetBit != -1) {
+ return previousSetBit + (i * fragmentSize);
+ }
+ }
+ } else {
+ return previousSetBit + (fragmentIndex * fragmentSize);
+ }
+
+ return -1;
+ }
+
+ public long previousClearBit(String ns, int fromIndex) {
+ var metadata = getMetadata(ns).orElseThrow();
+ return previousClearBit(ns, metadata, fromIndex);
+ }
+
+ @SneakyThrows
+ private long previousClearBit(String ns, BitmapMetadata metadata, int fromIndex) {
+ int fragmentIndex = fromIndex / fragmentSize;
+ int fragmentBitIndex = fromIndex % fragmentSize;
+
+ byte[] keyBytes = getKey(metadata, ns, fragmentIndex);
+
+ RoaringBitmap bitSet = null;
+ byte[] valueBytes = get(keyBytes);
+ if (valueBytes == null || valueBytes.length == 0) {
+ bitSet = new RoaringBitmap();
+ } else {
+ bitSet = getRoaringBitmap(valueBytes);
+ }
+
+ long previousClearBit = bitSet.previousAbsentValue(fragmentBitIndex);
+ if (previousClearBit == -1) {
+ for (int i = fragmentIndex - 1; i >= 0; i--) {
+ byte[] nextKeyBytes = getKey(metadata, ns, i);
+ byte[] nextValueBytes = get(nextKeyBytes);
+ RoaringBitmap nextBitSet = null;
+ if (nextValueBytes == null || nextValueBytes.length == 0) {
+ nextBitSet = new RoaringBitmap();
+ } else {
+ nextBitSet = getRoaringBitmap(nextValueBytes);
+ }
+
+ previousClearBit = nextBitSet.previousAbsentValue(fragmentSize - 1);
+ if (previousClearBit != -1) {
+ return previousClearBit + (i * fragmentSize);
+ }
+ }
+ } else {
+ return previousClearBit + (fragmentIndex * fragmentSize);
+ }
+
+ return -1;
+ }
+
+ @SneakyThrows
+ private BitmapMetadata updateMetadata(WriteBatch writeBatch, BitmapMetadata metadata, String ns) {
+ var metadataKeyName = getMetadataKey(ns);
+ write(writeBatch, metadataKeyName, valueSerializer.serialize(metadata));
+ return metadata;
+ }
+
+ @SneakyThrows
+ protected Optional