() {
- @Override
- public Boolean call() {
- logger.debug("AsyncWriteCache (" + name + "): flush started");
- long s = System.currentTimeMillis();
- boolean ret = flushingCache.flush();
- logger.debug("AsyncWriteCache (" + name + "): flush completed in " + (System.currentTimeMillis() - s) + " ms");
- return ret;
- }
+ lastFlush = flushExecutor.submit(() -> {
+ logger.debug("AsyncWriteCache (" + name + "): flush started");
+ long s = System.currentTimeMillis();
+ boolean ret = flushingCache.flush();
+ logger.debug("AsyncWriteCache (" + name + "): flush completed in " + (System.currentTimeMillis() - s) + " ms");
+ return ret;
});
return lastFlush;
}
diff --git a/ethereumj-core/src/main/java/org/ethereum/datasource/CountingQuotientFilter.java b/ethereumj-core/src/main/java/org/ethereum/datasource/CountingQuotientFilter.java
new file mode 100644
index 0000000000..2a61430425
--- /dev/null
+++ b/ethereumj-core/src/main/java/org/ethereum/datasource/CountingQuotientFilter.java
@@ -0,0 +1,98 @@
+package org.ethereum.datasource;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Supplies {@link QuotientFilter} with a collision counter map.
+ *
+ * <p>
+ * Hence it can handle any number of hard and/or soft collisions without performance degradation,
+ * while plain {@link QuotientFilter} runs into performance problems when the number of collisions
+ * approaches 10_000.
+ *
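+ * <p>
+ * A minimal usage sketch (values are illustrative, not part of this change):
+ * <pre>{@code
+ *     CountingQuotientFilter filter = CountingQuotientFilter.create(50_000_000, 10_000_000);
+ *     long hash = 0x123456789ABCDEFL;  // any 64-bit hash
+ *     filter.insert(hash);             // first insert lands in the underlying filter
+ *     filter.insert(hash);             // duplicate insert only bumps the counter
+ *     filter.remove(hash);             // decrements the counter, entry is kept
+ *     filter.remove(hash);             // last reference gone, entry is removed
+ * }</pre>
+ *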
+ * @author Mikhail Kalinin
+ * @since 14.02.2018
+ */
+public class CountingQuotientFilter extends QuotientFilter {
+
+ long FINGERPRINT_MASK;
+
+ private Map<Long, Counter> counters = new HashMap<>();
+
+ private CountingQuotientFilter(int quotientBits, int remainderBits) {
+ super(quotientBits, remainderBits);
+ this.FINGERPRINT_MASK = LOW_MASK(QUOTIENT_BITS + REMAINDER_BITS);
+ }
+
+ public static CountingQuotientFilter create(long largestNumberOfElements, long startingElements) {
+ QuotientFilter filter = QuotientFilter.create(largestNumberOfElements, startingElements);
+ return new CountingQuotientFilter(filter.QUOTIENT_BITS, filter.REMAINDER_BITS);
+ }
+
+ @Override
+ public synchronized void insert(long hash) {
+ if (super.maybeContains(hash)) {
+ addRef(hash);
+ } else {
+ super.insert(hash);
+ }
+ }
+
+ @Override
+ public synchronized void remove(long hash) {
+ if (super.maybeContains(hash) && delRef(hash) < 0) {
+ super.remove(hash);
+ }
+ }
+
+ @Override
+ protected long hash(byte[] bytes) {
+ long hash = 1;
+ for (byte b : bytes) {
+ hash = 31 * hash + b;
+ }
+ return hash;
+ }
+
+ public synchronized int getCollisionNumber() {
+ return counters.size();
+ }
+
+ public long getEntryNumber() {
+ return entries;
+ }
+
+ public long getMaxInsertions() {
+ return MAX_INSERTIONS;
+ }
+
+ private void addRef(long hash) {
+ long fp = fingerprint(hash);
+ Counter cnt = counters.get(fp);
+ if (cnt == null) {
+ counters.put(fp, new Counter());
+ } else {
+ cnt.refs++;
+ }
+ }
+
+ private int delRef(long hash) {
+ long fp = fingerprint(hash);
+ Counter cnt = counters.get(fp);
+ if (cnt == null) {
+ return -1;
+ }
+ if (--cnt.refs < 1) {
+ counters.remove(fp);
+ }
+ return cnt.refs;
+ }
+
+ private long fingerprint(long hash) {
+ return hash & FINGERPRINT_MASK;
+ }
+
+ private static class Counter {
+ int refs = 1;
+ }
+}
diff --git a/ethereumj-core/src/main/java/org/ethereum/datasource/DbSource.java b/ethereumj-core/src/main/java/org/ethereum/datasource/DbSource.java
index 5dbd411883..28c297b4bb 100644
--- a/ethereumj-core/src/main/java/org/ethereum/datasource/DbSource.java
+++ b/ethereumj-core/src/main/java/org/ethereum/datasource/DbSource.java
@@ -55,4 +55,20 @@ public interface DbSource<V> extends BatchSource<byte[], V> {
* @throws RuntimeException if the method is not supported
*/
Set<byte[]> keys() throws RuntimeException;
+
+ /**
+ * Closes the database, destroys its data and finally runs init()
+ */
+ void reset();
+
+ /**
+ * If supported, retrieves a value using a key prefix.
+ * Prefix extraction is meant to be done on the implementing side.
+ *
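+ * <p>
+ * An illustrative sketch of a lookup by composed key (the 16-byte prefix length
+ * mirrors {@code NodeKeyCompositor.PREFIX_BYTES}):
+ * <pre>{@code
+ *     byte[] node = db.prefixLookup(composedKey, NodeKeyCompositor.PREFIX_BYTES);
+ * }</pre>
+ *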
+ * @param key a key for the lookup
+ * @param prefixBytes prefix length in bytes
+ * @return first value picked by prefix lookup over DB or null if there is no match
+ * @throws RuntimeException if operation is not supported
+ */
+ V prefixLookup(byte[] key, int prefixBytes);
}
diff --git a/ethereumj-core/src/main/java/org/ethereum/datasource/JournalSource.java b/ethereumj-core/src/main/java/org/ethereum/datasource/JournalSource.java
index ad6783699e..3c62b6f158 100644
--- a/ethereumj-core/src/main/java/org/ethereum/datasource/JournalSource.java
+++ b/ethereumj-core/src/main/java/org/ethereum/datasource/JournalSource.java
@@ -18,10 +18,10 @@
package org.ethereum.datasource;
import org.ethereum.datasource.inmem.HashMapDB;
+import org.ethereum.db.prune.Pruner;
import org.ethereum.util.RLP;
import org.ethereum.util.RLPElement;
import org.ethereum.util.RLPList;
-import org.spongycastle.util.encoders.Hex;
import java.util.ArrayList;
import java.util.List;
@@ -29,28 +29,23 @@
/**
* The JournalSource records all the changes which were made before each commitUpdate
* Unlike 'put' deletes are not propagated to the backing Source immediately but are
- * delayed until 'persistUpdate' is called for the corresponding hash.
- * Also 'revertUpdate' might be called for a hash, in this case all inserts are removed
- * from the database.
+ * delayed until {@link Pruner} accepts and persists changes for the corresponding hash.
*
- * Normally this class is used for State pruning: we need all the state nodes for last N
+ * Normally this class is used together with State pruning: we need all the state nodes for the last N
* blocks to be able to get back to previous state for applying fork block
* however we would like to delete 'zombie' nodes which are not referenced anymore by
- * calling 'persistUpdate' for the block CurrentBlockNumber - N and we would
+ * persisting the update for the block CurrentBlockNumber - N, and we would
* also like to remove the updates made by the blocks which weren't too lucky
- * to remain on the main chain by calling revertUpdate for such blocks
+ * to remain on the main chain, by reverting updates for such blocks
*
- * NOTE: the backing Source should be counting for this class to work correctly
- * if e.g. some key is deleted in block 100 then added in block 200
- * then pruning of the block 100 would delete this key from the backing store
- * if it was non-counting
+ * @see Pruner
*
* Created by Anton Nashatyrev on 08.11.2016.
*/
public class JournalSource<V> extends AbstractChainedSource<byte[], V, byte[], V>
implements HashedKeySource<byte[], V> {
- private static class Update {
+ public static class Update {
byte[] updateHash;
List<byte[]> insertedKeys = new ArrayList<>();
List<byte[]> deletedKeys = new ArrayList<>();
@@ -86,6 +81,14 @@ private void parse(byte[] encoded) {
deletedKeys.add(aRDeleted.getRLPData());
}
}
+
+ public List<byte[]> getInsertedKeys() {
+ return insertedKeys;
+ }
+
+ public List<byte[]> getDeletedKeys() {
+ return deletedKeys;
+ }
}
private Update currentUpdate = new Update();
@@ -94,8 +97,6 @@ private void parse(byte[] encoded) {
/**
* Constructs instance with the underlying backing Source
- * @param src the Source must implement counting semantics
- * see e.g. {@link CountingBytesSource} or {@link WriteCache.CacheType#COUNTING}
*/
public JournalSource(Source<byte[], V> src) {
super(src);
@@ -112,7 +113,7 @@ public void setJournalStore(Source journalSource) {
/**
* Inserts are immediately propagated to the backing Source
* though are still recorded to the current update
- * The insert might later be reverted due to revertUpdate call
+ * The insert might later be reverted by {@link Pruner}
*/
@Override
public synchronized void put(byte[] key, V val) {
@@ -121,14 +122,14 @@ public synchronized void put(byte[] key, V val) {
return;
}
- currentUpdate.insertedKeys.add(key);
getSource().put(key, val);
+ currentUpdate.insertedKeys.add(key);
}
/**
* Deletes are not propagated to the backing Source immediately
* but instead they are recorded to the current Update and
- * might be later persisted with persistUpdate call
+ * might be later persisted
*/
@Override
public synchronized void delete(byte[] key) {
@@ -144,45 +145,18 @@ public synchronized V get(byte[] key) {
* Records all the changes made prior to this call to a single chunk
* with supplied hash.
* Later those updates could be either persisted to backing Source (deletes only)
- * via persistUpdate call
* or reverted from the backing Source (inserts only)
- * via revertUpdate call
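+ * <p>
+ * An illustrative sketch of the new flow (assuming a {@link Pruner} is wired up elsewhere):
+ * <pre>{@code
+ *     JournalSource.Update update = journal.commitUpdates(block.getHash());
+ *     pruner.feed(update); // the Pruner now decides when to persist or revert it
+ * }</pre>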
*/
- public synchronized void commitUpdates(byte[] updateHash) {
+ public synchronized Update commitUpdates(byte[] updateHash) {
currentUpdate.updateHash = updateHash;
journal.put(updateHash, currentUpdate);
+ Update committed = currentUpdate;
currentUpdate = new Update();
+ return committed;
}
- /**
- * Checks if the update with this hash key exists
- */
- public synchronized boolean hasUpdate(byte[] updateHash) {
- return journal.get(updateHash) != null;
- }
-
- /**
- * Persists all deletes to the backing store made under this hash key
- */
- public synchronized void persistUpdate(byte[] updateHash) {
- Update update = journal.get(updateHash);
- if (update == null) throw new RuntimeException("No update found: " + Hex.toHexString(updateHash));
- for (byte[] key : update.deletedKeys) {
- getSource().delete(key);
- }
- journal.delete(updateHash);
- }
-
- /**
- * Deletes all inserts to the backing store made under this hash key
- */
- public synchronized void revertUpdate(byte[] updateHash) {
- Update update = journal.get(updateHash);
- if (update == null) throw new RuntimeException("No update found: " + Hex.toHexString(updateHash));
- for (byte[] key : update.insertedKeys) {
- getSource().delete(key);
- }
- journal.delete(updateHash);
+ public Source<byte[], Update> getJournal() {
+ return journal;
}
@Override
diff --git a/ethereumj-core/src/main/java/org/ethereum/datasource/MemSizeEstimator.java b/ethereumj-core/src/main/java/org/ethereum/datasource/MemSizeEstimator.java
index 94f947bcd9..0434bdb3e9 100644
--- a/ethereumj-core/src/main/java/org/ethereum/datasource/MemSizeEstimator.java
+++ b/ethereumj-core/src/main/java/org/ethereum/datasource/MemSizeEstimator.java
@@ -29,10 +29,9 @@ public interface MemSizeEstimator<T> {
/**
* byte[] type size estimator
*/
- MemSizeEstimator<byte[]> ByteArrayEstimator = new MemSizeEstimator<byte[]>() {
- @Override
- public long estimateSize(byte[] bytes) {
- return bytes == null ? 0 : bytes.length + 4; // 4 - compressed ref size
- }
+ MemSizeEstimator<byte[]> ByteArrayEstimator = bytes -> {
+ return bytes == null ? 0 : bytes.length + 16; // 4 - compressed ref size, 12 - Object header
};
+
+
}
diff --git a/ethereumj-core/src/main/java/org/ethereum/datasource/NodeKeyCompositor.java b/ethereumj-core/src/main/java/org/ethereum/datasource/NodeKeyCompositor.java
new file mode 100644
index 0000000000..5f0dc5bcf9
--- /dev/null
+++ b/ethereumj-core/src/main/java/org/ethereum/datasource/NodeKeyCompositor.java
@@ -0,0 +1,71 @@
+package org.ethereum.datasource;
+
+import org.ethereum.config.CommonConfig;
+import org.ethereum.db.RepositoryRoot;
+
+import static java.lang.System.arraycopy;
+import static org.ethereum.crypto.HashUtil.sha3;
+
+/**
+ * Composes keys for contract storage nodes.
+ *
+ * Input: 32-bytes node key, 20-bytes contract address
+ * Output: 32-bytes composed key [first 16-bytes of node key : first 16-bytes of address hash]
+ *
+ * Example:
+ * Contract address hash a9539c810cc2e8fa20785bdd78ec36ccb25e1b5be78dbadf6c4e817c6d170bbb
+ * Key of one of the storage nodes bbbbbb5be78dbadf6c4e817c6d170bbb47e9916f8f6cc4607c5f3819ce98497b
+ * Composed key will be a9539c810cc2e8fa20785bdd78ec36ccbbbbbb5be78dbadf6c4e817c6d170bbb
+ *
+ * This mechanism is a part of flat storage source which is free from reference counting
+ *
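+ * <p>
+ * An illustrative sketch of both usages (names are hypothetical):
+ * <pre>{@code
+ *     // as a key codec wrapped around a trie node source
+ *     NodeKeyCompositor codec = new NodeKeyCompositor(contractAddr);
+ *
+ *     // or as a one-off static composition
+ *     byte[] composed = NodeKeyCompositor.compose(nodeKey, contractAddr);
+ * }</pre>
+ *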
+ * @see CommonConfig#trieNodeSource()
+ * @see RepositoryRoot#RepositoryRoot(Source, byte[])
+ *
+ * @author Mikhail Kalinin
+ * @since 05.12.2017
+ */
+public class NodeKeyCompositor implements Serializer<byte[], byte[]> {
+
+ public static final int HASH_LEN = 32;
+ public static final int PREFIX_BYTES = 16;
+ private byte[] addrHash;
+
+ public NodeKeyCompositor(byte[] addrOrHash) {
+ this.addrHash = addrHash(addrOrHash);
+ }
+
+ @Override
+ public byte[] serialize(byte[] key) {
+ return composeInner(key, addrHash);
+ }
+
+ @Override
+ public byte[] deserialize(byte[] stream) {
+ return stream;
+ }
+
+ public static byte[] compose(byte[] key, byte[] addrOrHash) {
+ return composeInner(key, addrHash(addrOrHash));
+ }
+
+ private static byte[] composeInner(byte[] key, byte[] addrHash) {
+
+ validateKey(key);
+
+ byte[] derivative = new byte[key.length];
+ arraycopy(key, 0, derivative, 0, PREFIX_BYTES);
+ arraycopy(addrHash, 0, derivative, PREFIX_BYTES, PREFIX_BYTES);
+
+ return derivative;
+ }
+
+ private static void validateKey(byte[] key) {
+ if (key.length != HASH_LEN)
+ throw new IllegalArgumentException("Key is not a hash code");
+ }
+
+ private static byte[] addrHash(byte[] addrOrHash) {
+ return addrOrHash.length == HASH_LEN ? addrOrHash : sha3(addrOrHash);
+ }
+}
diff --git a/ethereumj-core/src/main/java/org/ethereum/datasource/PrefixLookupSource.java b/ethereumj-core/src/main/java/org/ethereum/datasource/PrefixLookupSource.java
new file mode 100644
index 0000000000..256c4e2cfc
--- /dev/null
+++ b/ethereumj-core/src/main/java/org/ethereum/datasource/PrefixLookupSource.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) [2016] [ <ether.camp> ]
+ * This file is part of the ethereumJ library.
+ *
+ * The ethereumJ library is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * The ethereumJ library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with the ethereumJ library. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.ethereum.datasource;
+
+/**
+ * A kind of source which executes {@link #get(byte[])} queries as
+ * {@link DbSource#prefixLookup(byte[], int)} queries against the backing source.
+ *
+ * Other operations are simply propagated to backing {@link DbSource}.
+ *
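+ * <p>
+ * An illustrative wiring sketch (assumes 16-byte prefixes, as composed by {@link NodeKeyCompositor}):
+ * <pre>{@code
+ *     PrefixLookupSource<byte[]> source = new PrefixLookupSource<>(dbSource, NodeKeyCompositor.PREFIX_BYTES);
+ *     byte[] node = source.get(composedKey); // resolved via prefixLookup()
+ * }</pre>
+ *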
+ * @author Mikhail Kalinin
+ * @since 01.12.2017
+ */
+public class PrefixLookupSource<V> implements Source<byte[], V> {
+
+ // prefix length in bytes
+ private int prefixBytes;
+ private DbSource<V> source;
+
+ public PrefixLookupSource(DbSource<V> source, int prefixBytes) {
+ this.source = source;
+ this.prefixBytes = prefixBytes;
+ }
+
+ @Override
+ public V get(byte[] key) {
+ return source.prefixLookup(key, prefixBytes);
+ }
+
+ @Override
+ public void put(byte[] key, V val) {
+ source.put(key, val);
+ }
+
+ @Override
+ public void delete(byte[] key) {
+ source.delete(key);
+ }
+
+ @Override
+ public boolean flush() {
+ return source.flush();
+ }
+}
diff --git a/ethereumj-core/src/main/java/org/ethereum/datasource/QuotientFilter.java b/ethereumj-core/src/main/java/org/ethereum/datasource/QuotientFilter.java
index e0019e08c0..54fa2b539a 100644
--- a/ethereumj-core/src/main/java/org/ethereum/datasource/QuotientFilter.java
+++ b/ethereumj-core/src/main/java/org/ethereum/datasource/QuotientFilter.java
@@ -135,7 +135,7 @@ static long LOW_MASK(long n) {
}
static int TABLE_SIZE(int quotientBits, int remainderBits) {
- long bits = (1 << quotientBits) * (remainderBits + 3);
+ long bits = (1L << quotientBits) * (remainderBits + 3);
long longs = bits / 64;
return Ints.checkedCast((bits % 64) > 0 ? (longs + 1) : longs);
}
@@ -192,7 +192,7 @@ public QuotientFilter(int quotientBits, int remainderBits) {
INDEX_MASK = LOW_MASK(QUOTIENT_BITS);
REMAINDER_MASK = LOW_MASK(REMAINDER_BITS);
ELEMENT_MASK = LOW_MASK(ELEMENT_BITS);
- MAX_SIZE = 1 << QUOTIENT_BITS;
+ MAX_SIZE = 1L << QUOTIENT_BITS;
MAX_INSERTIONS = (long) (MAX_SIZE * .75);
table = new long[TABLE_SIZE(QUOTIENT_BITS, REMAINDER_BITS)];
entries = 0;
@@ -364,7 +364,7 @@ public boolean overflowed() {
// insert(hashFactory.hash64().hash(data, offset, length, 0));
// }
- private long hash(byte[] bytes) {
+ protected long hash(byte[] bytes) {
return (bytes[0] & 0xFFL) << 56 |
(bytes[1] & 0xFFL) << 48 |
(bytes[2] & 0xFFL) << 40 |
diff --git a/ethereumj-core/src/main/java/org/ethereum/datasource/SourceCodec.java b/ethereumj-core/src/main/java/org/ethereum/datasource/SourceCodec.java
index 34527e41e2..f21dc4ca16 100644
--- a/ethereumj-core/src/main/java/org/ethereum/datasource/SourceCodec.java
+++ b/ethereumj-core/src/main/java/org/ethereum/datasource/SourceCodec.java
@@ -72,6 +72,15 @@ public ValueOnly(Source src, Serializer va
}
}
+ /**
+ * Shortcut class when only key conversion is required
+ */
+ public static class KeyOnly<Key, Value> extends SourceCodec<Key, Value, byte[], Value> {
+ public KeyOnly(Source<byte[], Value> src, Serializer<Key, byte[]> keySerializer) {
+ super(src, keySerializer, new Serializers.Identity<Value>());
+ }
+ }
+
/**
* Shortcut class when only value conversion is required and keys are of byte[] type
*/
diff --git a/ethereumj-core/src/main/java/org/ethereum/datasource/inmem/HashMapDB.java b/ethereumj-core/src/main/java/org/ethereum/datasource/inmem/HashMapDB.java
index 088605cdf0..5aeeaa5b86 100644
--- a/ethereumj-core/src/main/java/org/ethereum/datasource/inmem/HashMapDB.java
+++ b/ethereumj-core/src/main/java/org/ethereum/datasource/inmem/HashMapDB.java
@@ -20,6 +20,7 @@
import org.ethereum.datasource.DbSource;
import org.ethereum.util.ALock;
import org.ethereum.util.ByteArrayMap;
+import org.ethereum.util.FastByteComparisons;
import java.util.Map;
import java.util.Set;
@@ -102,6 +103,25 @@ public Set keys() {
}
}
+ @Override
+ public void reset() {
+ try (ALock l = writeLock.lock()) {
+ storage.clear();
+ }
+ }
+
+ @Override
+ public V prefixLookup(byte[] key, int prefixBytes) {
+ try (ALock l = readLock.lock()) {
+ for (Map.Entry<byte[], V> e : storage.entrySet())
+ if (FastByteComparisons.compareTo(key, 0, prefixBytes, e.getKey(), 0, prefixBytes) == 0) {
+ return e.getValue();
+ }
+
+ return null;
+ }
+ }
+
@Override
public void updateBatch(Map<byte[], V> rows) {
try (ALock l = writeLock.lock()) {
diff --git a/ethereumj-core/src/main/java/org/ethereum/datasource/inmem/HashMapDBSimple.java b/ethereumj-core/src/main/java/org/ethereum/datasource/inmem/HashMapDBSimple.java
index 8ac544d7ed..44e667248e 100644
--- a/ethereumj-core/src/main/java/org/ethereum/datasource/inmem/HashMapDBSimple.java
+++ b/ethereumj-core/src/main/java/org/ethereum/datasource/inmem/HashMapDBSimple.java
@@ -20,6 +20,7 @@
import org.ethereum.datasource.DbSource;
import org.ethereum.util.ALock;
import org.ethereum.util.ByteArrayMap;
+import org.ethereum.util.FastByteComparisons;
import java.util.Map;
import java.util.Set;
@@ -89,6 +90,22 @@ public Set keys() {
return getStorage().keySet();
}
+ @Override
+ public void reset() {
+ storage.clear();
+ }
+
+ @Override
+ public V prefixLookup(byte[] key, int prefixBytes) {
+
+ for (Map.Entry<byte[], V> e : storage.entrySet())
+ if (FastByteComparisons.compareTo(key, 0, prefixBytes, e.getKey(), 0, prefixBytes) == 0) {
+ return e.getValue();
+ }
+
+ return null;
+ }
+
@Override
public void updateBatch(Map<byte[], V> rows) {
for (Map.Entry<byte[], V> entry : rows.entrySet()) {
diff --git a/ethereumj-core/src/main/java/org/ethereum/datasource/leveldb/LevelDbDataSource.java b/ethereumj-core/src/main/java/org/ethereum/datasource/leveldb/LevelDbDataSource.java
index 0494c00f1c..1c96296298 100644
--- a/ethereumj-core/src/main/java/org/ethereum/datasource/leveldb/LevelDbDataSource.java
+++ b/ethereumj-core/src/main/java/org/ethereum/datasource/leveldb/LevelDbDataSource.java
@@ -131,12 +131,18 @@ private Path getPath() {
return Paths.get(config.databaseDir(), name);
}
+ @Override
public void reset() {
close();
FileUtil.recursiveDelete(getPath().toString());
init();
}
+ @Override
+ public byte[] prefixLookup(byte[] key, int prefixBytes) {
+ throw new RuntimeException("LevelDbDataSource.prefixLookup() is not supported");
+ }
+
@Override
public boolean isAlive() {
return alive;
diff --git a/ethereumj-core/src/main/java/org/ethereum/datasource/rocksdb/RocksDbDataSource.java b/ethereumj-core/src/main/java/org/ethereum/datasource/rocksdb/RocksDbDataSource.java
new file mode 100644
index 0000000000..fb0c7289d6
--- /dev/null
+++ b/ethereumj-core/src/main/java/org/ethereum/datasource/rocksdb/RocksDbDataSource.java
@@ -0,0 +1,368 @@
+/*
+ * Copyright (c) [2016] [ <ether.camp> ]
+ * This file is part of the ethereumJ library.
+ *
+ * The ethereumJ library is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * The ethereumJ library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with the ethereumJ library. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.ethereum.datasource.rocksdb;
+
+import org.ethereum.config.SystemProperties;
+import org.ethereum.datasource.DbSource;
+import org.ethereum.datasource.NodeKeyCompositor;
+import org.ethereum.util.FileUtil;
+import org.rocksdb.*;
+import org.rocksdb.CompressionType;
+import org.rocksdb.Options;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.spongycastle.util.encoders.Hex;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static java.lang.System.arraycopy;
+
+/**
+ * @author Mikhail Kalinin
+ * @since 28.11.2017
+ */
+public class RocksDbDataSource implements DbSource<byte[]> {
+
+ private static final Logger logger = LoggerFactory.getLogger("db");
+
+ @Autowired
+ SystemProperties config = SystemProperties.getDefault(); // initialized for standalone test
+
+ String name;
+ RocksDB db;
+ ReadOptions readOpts;
+ boolean alive;
+
+ // The native RocksDB insert/update/delete operations are normally thread-safe,
+ // however the close operation is not.
+ // This ReadWriteLock still permits concurrent execution of insert/delete/update operations
+ // however blocks them on init/close/delete operations
+ private ReadWriteLock resetDbLock = new ReentrantReadWriteLock();
+
+ static {
+ RocksDB.loadLibrary();
+ }
+
+ public RocksDbDataSource() {
+ }
+
+ public RocksDbDataSource(String name) {
+ this.name = name;
+ logger.debug("New RocksDbDataSource: " + name);
+ }
+
+ @Override
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public void init() {
+ resetDbLock.writeLock().lock();
+ try {
+ logger.debug("~> RocksDbDataSource.init(): " + name);
+
+ if (isAlive()) return;
+
+ if (name == null) throw new NullPointerException("no name set to the db");
+
+ try (Options options = new Options()) {
+
+ // most of these options are suggested by https://github.com/facebook/rocksdb/wiki/Set-Up-Options
+
+ // general options
+ options.setCreateIfMissing(true);
+ options.setCompressionType(CompressionType.LZ4_COMPRESSION);
+ options.setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION);
+ options.setLevelCompactionDynamicLevelBytes(true);
+ options.setMaxBackgroundCompactions(4);
+ options.setMaxBackgroundFlushes(2);
+ options.setMaxOpenFiles(32);
+
+ // key prefix for state node lookups
+ options.useFixedLengthPrefixExtractor(NodeKeyCompositor.PREFIX_BYTES);
+
+ // table options
+ final BlockBasedTableConfig tableCfg;
+ options.setTableFormatConfig(tableCfg = new BlockBasedTableConfig());
+ tableCfg.setBlockSize(16 * 1024);
+ tableCfg.setBlockCacheSize(32 * 1024 * 1024);
+ tableCfg.setCacheIndexAndFilterBlocks(true);
+ tableCfg.setPinL0FilterAndIndexBlocksInCache(true);
+ tableCfg.setFilter(new BloomFilter(10, false));
+
+ // read options
+ readOpts = new ReadOptions().setPrefixSameAsStart(true)
+ .setVerifyChecksums(false);
+
+ try {
+ logger.debug("Opening database");
+ final Path dbPath = getPath();
+ if (!Files.isSymbolicLink(dbPath.getParent())) Files.createDirectories(dbPath.getParent());
+
+ if (config.databaseFromBackup() && backupPath().toFile().canWrite()) {
+ logger.debug("Restoring database from backup: '{}'", name);
+ try (BackupableDBOptions backupOptions = new BackupableDBOptions(backupPath().toString());
+ RestoreOptions restoreOptions = new RestoreOptions(false);
+ BackupEngine backups = BackupEngine.open(Env.getDefault(), backupOptions)) {
+
+ if (!backups.getBackupInfo().isEmpty()) {
+ backups.restoreDbFromLatestBackup(getPath().toString(), getPath().toString(),
+ restoreOptions);
+ }
+
+ } catch (RocksDBException e) {
+ logger.error("Failed to restore database '{}' from backup", name, e);
+ }
+ }
+
+ logger.debug("Initializing new or existing database: '{}'", name);
+ try {
+ db = RocksDB.open(options, dbPath.toString());
+ } catch (RocksDBException e) {
+ logger.error(e.getMessage(), e);
+ throw new RuntimeException("Failed to initialize database", e);
+ }
+
+ alive = true;
+
+ } catch (IOException ioe) {
+ logger.error(ioe.getMessage(), ioe);
+ throw new RuntimeException("Failed to initialize database", ioe);
+ }
+
+ logger.debug("<~ RocksDbDataSource.init(): " + name);
+ }
+ } finally {
+ resetDbLock.writeLock().unlock();
+ }
+ }
+
+ public void backup() {
+ resetDbLock.readLock().lock();
+ if (logger.isTraceEnabled()) logger.trace("~> RocksDbDataSource.backup(): " + name);
+ Path path = backupPath();
+ path.toFile().mkdirs();
+ try (BackupableDBOptions backupOptions = new BackupableDBOptions(path.toString());
+ BackupEngine backups = BackupEngine.open(Env.getDefault(), backupOptions)) {
+
+ backups.createNewBackup(db, true);
+
+ if (logger.isTraceEnabled()) logger.trace("<~ RocksDbDataSource.backup(): " + name + " done");
+ } catch (RocksDBException e) {
+ logger.error("Failed to backup database '{}'", name, e);
+ throw new RuntimeException(e);
+ } finally {
+ resetDbLock.readLock().unlock();
+ }
+ }
+
+ private Path backupPath() {
+ return Paths.get(config.databaseDir(), "backup", name);
+ }
+
+ @Override
+ public boolean isAlive() {
+ return alive;
+ }
+
+ @Override
+ public void close() {
+ resetDbLock.writeLock().lock();
+ try {
+ if (!isAlive()) return;
+
+ logger.debug("Close db: {}", name);
+ db.close();
+
+ alive = false;
+
+ } catch (Exception e) {
+ logger.error("Error closing db '{}'", name, e);
+ } finally {
+ resetDbLock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public Set<byte[]> keys() throws RuntimeException {
+ resetDbLock.readLock().lock();
+ try {
+ if (logger.isTraceEnabled()) logger.trace("~> RocksDbDataSource.keys(): " + name);
+ try (RocksIterator iterator = db.newIterator()) {
+ Set<byte[]> result = new HashSet<>();
+ for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
+ result.add(iterator.key());
+ }
+ if (logger.isTraceEnabled()) logger.trace("<~ RocksDbDataSource.keys(): " + name + ", " + result.size());
+ return result;
+ } catch (Exception e) {
+ logger.error("Error iterating db '{}'", name, e);
+ throw new RuntimeException(e);
+ }
+ } finally {
+ resetDbLock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public void reset() {
+ close();
+ FileUtil.recursiveDelete(getPath().toString());
+ init();
+ }
+
+ private Path getPath() {
+ return Paths.get(config.databaseDir(), name);
+ }
+
+ @Override
+ public void updateBatch(Map<byte[], byte[]> rows) {
+ resetDbLock.readLock().lock();
+ try {
+ if (logger.isTraceEnabled()) logger.trace("~> RocksDbDataSource.updateBatch(): " + name + ", " + rows.size());
+ try {
+
+ try (WriteBatch batch = new WriteBatch();
+ WriteOptions writeOptions = new WriteOptions()) {
+ for (Map.Entry<byte[], byte[]> entry : rows.entrySet()) {
+ if (entry.getValue() == null) {
+ batch.remove(entry.getKey());
+ } else {
+ batch.put(entry.getKey(), entry.getValue());
+ }
+ }
+ db.write(writeOptions, batch);
+ }
+
+ if (logger.isTraceEnabled()) logger.trace("<~ RocksDbDataSource.updateBatch(): " + name + ", " + rows.size());
+ } catch (RocksDBException e) {
+ logger.error("Error in batch update on db '{}'", name, e);
+ throw new RuntimeException(e);
+ }
+ } finally {
+ resetDbLock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public void put(byte[] key, byte[] val) {
+ resetDbLock.readLock().lock();
+ try {
+ if (logger.isTraceEnabled()) logger.trace("~> RocksDbDataSource.put(): " + name + ", key: " + Hex.toHexString(key) + ", " + (val == null ? "null" : val.length));
+ if (val != null) {
+ db.put(key, val);
+ } else {
+ db.delete(key);
+ }
+ if (logger.isTraceEnabled()) logger.trace("<~ RocksDbDataSource.put(): " + name + ", key: " + Hex.toHexString(key) + ", " + (val == null ? "null" : val.length));
+ } catch (RocksDBException e) {
+ logger.error("Failed to put into db '{}'", name, e);
+ throw new RuntimeException(e);
+ } finally {
+ resetDbLock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public byte[] get(byte[] key) {
+ resetDbLock.readLock().lock();
+ try {
+ if (logger.isTraceEnabled()) logger.trace("~> RocksDbDataSource.get(): " + name + ", key: " + Hex.toHexString(key));
+ byte[] ret = db.get(readOpts, key);
+ if (logger.isTraceEnabled()) logger.trace("<~ RocksDbDataSource.get(): " + name + ", key: " + Hex.toHexString(key) + ", " + (ret == null ? "null" : ret.length));
+ return ret;
+ } catch (RocksDBException e) {
+ logger.error("Failed to get from db '{}'", name, e);
+ throw new RuntimeException(e);
+ } finally {
+ resetDbLock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public void delete(byte[] key) {
+ resetDbLock.readLock().lock();
+ try {
+ if (logger.isTraceEnabled()) logger.trace("~> RocksDbDataSource.delete(): " + name + ", key: " + Hex.toHexString(key));
+ db.delete(key);
+ if (logger.isTraceEnabled()) logger.trace("<~ RocksDbDataSource.delete(): " + name + ", key: " + Hex.toHexString(key));
+ } catch (RocksDBException e) {
+ logger.error("Failed to delete from db '{}'", name, e);
+ throw new RuntimeException(e);
+ } finally {
+ resetDbLock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public byte[] prefixLookup(byte[] key, int prefixBytes) {
+
+ if (prefixBytes != NodeKeyCompositor.PREFIX_BYTES)
+ throw new RuntimeException("RocksDbDataSource.prefixLookup() supports only " + prefixBytes + "-bytes prefix");
+
+ resetDbLock.readLock().lock();
+ try {
+
+ if (logger.isTraceEnabled()) logger.trace("~> RocksDbDataSource.prefixLookup(): " + name + ", key: " + Hex.toHexString(key));
+
+ // RocksDB sets the initial position of the iterator to the first key which is greater than or equal to the seek key;
+ // since keys in RocksDB are kept in ascending order, the iterator must be seeded with the lowest key matching the prefix,
+ // thus bytes with indexes greater than PREFIX_BYTES must be nullified
+ byte[] prefix = new byte[NodeKeyCompositor.PREFIX_BYTES];
+ arraycopy(key, 0, prefix, 0, NodeKeyCompositor.PREFIX_BYTES);
+
+ byte[] ret = null;
+ try (RocksIterator it = db.newIterator(readOpts)) {
+
+ it.seek(prefix);
+ if (it.isValid())
+ ret = it.value();
+
+ } catch (Exception e) {
+ logger.error("Failed to seek by prefix in db '{}'", name, e);
+ throw new RuntimeException(e);
+ }
+
+ if (logger.isTraceEnabled()) logger.trace("<~ RocksDbDataSource.prefixLookup(): " + name + ", key: " + Hex.toHexString(key) + ", " + (ret == null ? "null" : ret.length));
+
+ return ret;
+
+ } finally {
+ resetDbLock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public boolean flush() {
+ return false;
+ }
+}
diff --git a/ethereumj-core/src/main/java/org/ethereum/db/DbFlushManager.java b/ethereumj-core/src/main/java/org/ethereum/db/DbFlushManager.java
index 4bbceac368..be544eb05f 100644
--- a/ethereumj-core/src/main/java/org/ethereum/db/DbFlushManager.java
+++ b/ethereumj-core/src/main/java/org/ethereum/db/DbFlushManager.java
@@ -23,10 +23,7 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.ethereum.config.CommonConfig;
import org.ethereum.config.SystemProperties;
-import org.ethereum.datasource.AbstractCachedSource;
-import org.ethereum.datasource.AsyncFlushable;
-import org.ethereum.datasource.DbSource;
-import org.ethereum.datasource.WriteCache;
+import org.ethereum.datasource.*;
import org.ethereum.listener.CompositeEthereumListener;
import org.ethereum.listener.EthereumListenerAdapter;
import org.slf4j.Logger;
@@ -45,7 +42,8 @@
public class DbFlushManager {
private static final Logger logger = LoggerFactory.getLogger("db");
- List<AbstractCachedSource<byte[], byte[]>> writeCaches = new ArrayList<>();
+ List<AbstractCachedSource<byte[], ?>> writeCaches = new ArrayList<>();
+ List<Source<byte[], ?>> sources = new ArrayList<>();
Set<DbSource> dbSources = new HashSet<>();
AbstractCachedSource<byte[], byte[]> stateDbCache;
@@ -90,13 +88,17 @@ public void setSizeThreshold(long sizeThreshold) {
this.sizeThreshold = sizeThreshold;
}
- public void addCache(AbstractCachedSource<byte[], byte[]> cache) {
+ public void addCache(AbstractCachedSource<byte[], ?> cache) {
writeCaches.add(cache);
}
+ public void addSource(Source<byte[], ?> src) {
+ sources.add(src);
+ }
+
public long getCacheSize() {
long ret = 0;
- for (AbstractCachedSource<byte[], byte[]> writeCache : writeCaches) {
+ for (AbstractCachedSource<byte[], ?> writeCache : writeCaches) {
ret += writeCache.estimateCacheSize();
}
return ret;
@@ -141,7 +143,7 @@ public synchronized Future flush() {
}
}
logger.debug("Flipping async storages");
- for (AbstractCachedSource<byte[], byte[]> writeCache : writeCaches) {
+ for (AbstractCachedSource<byte[], ?> writeCache : writeCaches) {
try {
if (writeCache instanceof AsyncFlushable) {
((AsyncFlushable) writeCache).flipStorage();
@@ -152,32 +154,31 @@ public synchronized Future flush() {
}
logger.debug("Submitting flush task");
- return lastFlush = flushThread.submit(new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- boolean ret = false;
- long s = System.nanoTime();
- logger.info("Flush started");
-
- for (AbstractCachedSource<byte[], byte[]> writeCache : writeCaches) {
- if (writeCache instanceof AsyncFlushable) {
- try {
- ret |= ((AsyncFlushable) writeCache).flushAsync().get();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- } else {
- ret |= writeCache.flush();
+ return lastFlush = flushThread.submit(() -> {
+ boolean ret = false;
+ long s = System.nanoTime();
+ logger.info("Flush started");
+
+ sources.forEach(Source::flush);
+
+ for (AbstractCachedSource<byte[], ?> writeCache : writeCaches) {
+ if (writeCache instanceof AsyncFlushable) {
+ try {
+ ret |= ((AsyncFlushable) writeCache).flushAsync().get();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
}
+ } else {
+ ret |= writeCache.flush();
}
- if (stateDbCache != null) {
- logger.debug("Flushing to DB");
- stateDbCache.flush();
- }
- logger.info("Flush completed in " + (System.nanoTime() - s) / 1000000 + " ms");
-
- return ret;
}
+ if (stateDbCache != null) {
+ logger.debug("Flushing to DB");
+ stateDbCache.flush();
+ }
+ logger.info("Flush completed in " + (System.nanoTime() - s) / 1000000 + " ms");
+
+ return ret;
});
}
diff --git a/ethereumj-core/src/main/java/org/ethereum/db/PeerSource.java b/ethereumj-core/src/main/java/org/ethereum/db/PeerSource.java
index cf94879a68..4147b48c0f 100644
--- a/ethereumj-core/src/main/java/org/ethereum/db/PeerSource.java
+++ b/ethereumj-core/src/main/java/org/ethereum/db/PeerSource.java
@@ -19,11 +19,12 @@
import org.apache.commons.lang3.tuple.Pair;
import org.ethereum.datasource.DataSourceArray;
+import org.ethereum.datasource.DbSource;
import org.ethereum.datasource.ObjectDataSource;
import org.ethereum.datasource.Serializer;
import org.ethereum.datasource.Source;
-import org.ethereum.datasource.leveldb.LevelDbDataSource;
import org.ethereum.net.rlpx.Node;
+import org.ethereum.util.ByteUtil;
import org.ethereum.util.RLP;
import org.ethereum.util.RLPList;
import org.slf4j.Logger;
@@ -65,11 +66,10 @@ public Pair deserialize(byte[] bytes) {
Node node = new Node(nodeRlp);
node.setDiscoveryNode(nodeIsDiscovery != null);
- return Pair.of(node, savedReputation == null ? 0 : (new BigInteger(1, savedReputation)).intValue());
+ return Pair.of(node, ByteUtil.byteArrayToInt(savedReputation));
}
};
-
public PeerSource(Source<byte[], byte[]> src) {
this.src = src;
INST = this;
@@ -82,8 +82,8 @@ public DataSourceArray> getNodes() {
}
public void clear() {
- if (src instanceof LevelDbDataSource) {
- ((LevelDbDataSource) src).reset();
+ if (src instanceof DbSource) {
+ ((DbSource) src).reset();
this.nodes = new DataSourceArray<>(
new ObjectDataSource<>(src, NODE_SERIALIZER, 512));
} else {
diff --git a/ethereumj-core/src/main/java/org/ethereum/db/PruneManager.java b/ethereumj-core/src/main/java/org/ethereum/db/PruneManager.java
index 4f186afaca..6cf04dafc7 100644
--- a/ethereumj-core/src/main/java/org/ethereum/db/PruneManager.java
+++ b/ethereumj-core/src/main/java/org/ethereum/db/PruneManager.java
@@ -21,55 +21,136 @@
import org.ethereum.core.Block;
import org.ethereum.core.BlockHeader;
import org.ethereum.datasource.JournalSource;
+import org.ethereum.datasource.Source;
+import org.ethereum.db.prune.Segment;
+import org.ethereum.db.prune.Pruner;
import org.springframework.beans.factory.annotation.Autowired;
+import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
/**
+ * Manages the state pruning part of block processing.
+ *
+ * <p>
+ * Constructs chain segments and prunes them when they are complete.
+ *
* Created by Anton Nashatyrev on 10.11.2016.
+ *
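+ * <p>
+ * An illustrative setup (constructor arguments as introduced by this change):
+ * <pre>{@code
+ *     PruneManager pm = new PruneManager(blockStore, journalSource,
+ *             stateSource.getNoJournalSource(), 192); // prune depth of 192 blocks
+ *     pm.blockCommitted(block.getHeader());           // on each connected block
+ * }</pre>
+ *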
+ * @see Segment
+ * @see Pruner
*/
public class PruneManager {
- private JournalSource journal;
+ private static final int LONGEST_CHAIN = 192;
+
+ private JournalSource<?> journalSource;
@Autowired
private IndexedBlockStore blockStore;
private int pruneBlocksCnt;
+ private Segment segment;
+ private Pruner pruner;
+
@Autowired
private PruneManager(SystemProperties config) {
pruneBlocksCnt = config.databasePruneDepth();
}
- public PruneManager(IndexedBlockStore blockStore, JournalSource journal, int pruneBlocksCnt) {
+ public PruneManager(IndexedBlockStore blockStore, JournalSource<?> journalSource,
+ Source<byte[], ?> pruneStorage, int pruneBlocksCnt) {
this.blockStore = blockStore;
- this.journal = journal;
+ this.journalSource = journalSource;
this.pruneBlocksCnt = pruneBlocksCnt;
+
+ if (journalSource != null && pruneStorage != null)
+ this.pruner = new Pruner(journalSource.getJournal(), pruneStorage);
}
@Autowired
public void setStateSource(StateSource stateSource) {
- journal = stateSource.getJournalSource();
+ journalSource = stateSource.getJournalSource();
+ if (journalSource != null)
+ pruner = new Pruner(journalSource.getJournal(), stateSource.getNoJournalSource());
}
public void blockCommitted(BlockHeader block) {
if (pruneBlocksCnt < 0) return; // pruning disabled
- journal.commitUpdates(block.getHash());
- long pruneBlockNum = block.getNumber() - pruneBlocksCnt;
- if (pruneBlockNum < 0) return;
-
- List<Block> pruneBlocks = blockStore.getBlocksByNumber(pruneBlockNum);
- Block chainBlock = blockStore.getChainBlockByNumber(pruneBlockNum);
- for (Block pruneBlock : pruneBlocks) {
- if (journal.hasUpdate(pruneBlock.getHash())) {
- if (chainBlock.isEqual(pruneBlock)) {
- journal.persistUpdate(pruneBlock.getHash());
- } else {
- journal.revertUpdate(pruneBlock.getHash());
+ JournalSource.Update update = journalSource.commitUpdates(block.getHash());
+ pruner.feed(update);
+
+ long forkBlockNum = block.getNumber() - getForkBlocksCnt();
+ if (forkBlockNum < 0) return;
+
+ List<Block> pruneBlocks = blockStore.getBlocksByNumber(forkBlockNum);
+ Block chainBlock = blockStore.getChainBlockByNumber(forkBlockNum);
+
+ if (segment == null) {
+ if (pruneBlocks.size() == 1) // wait for a single chain
+ segment = new Segment(chainBlock);
+ return;
+ }
+
+ Segment.Tracker tracker = segment.startTracking();
+ tracker.addMain(chainBlock);
+ tracker.addAll(pruneBlocks);
+ tracker.commit();
+
+ if (segment.isComplete()) {
+ if (!pruner.isReady()) {
+ List<byte[]> forkWindow = getAllChainsHashes(segment.getRootNumber() + 1, blockStore.getMaxNumber());
+ pruner.init(forkWindow, getForkBlocksCnt());
+
+ int mainChainWindowSize = pruneBlocksCnt - getForkBlocksCnt();
+ if (mainChainWindowSize > 0) {
+ List<byte[]> mainChainWindow = getMainChainHashes(Math.max(1, segment.getRootNumber() - mainChainWindowSize + 1),
+ segment.getRootNumber());
+ pruner.withSecondStep(mainChainWindow, mainChainWindowSize);
}
}
+ pruner.prune(segment);
+ segment = new Segment(chainBlock);
+ }
+
+ long mainBlockNum = block.getNumber() - getMainBlocksCnt();
+ if (mainBlockNum < 0) return;
+
+ byte[] hash = blockStore.getBlockHashByNumber(mainBlockNum);
+ pruner.persist(hash);
+ }
+
+ private int getForkBlocksCnt() {
+ return Math.min(pruneBlocksCnt, 2 * LONGEST_CHAIN);
+ }
+
+ private int getMainBlocksCnt() {
+ if (pruneBlocksCnt <= 2 * LONGEST_CHAIN) {
+ return Integer.MAX_VALUE;
+ } else {
+ return pruneBlocksCnt;
+ }
+ }
+
+ private List<byte[]> getAllChainsHashes(long fromBlock, long toBlock) {
+ List<byte[]> ret = new ArrayList<>();
+ for (long num = fromBlock; num <= toBlock; num++) {
+ List<Block> blocks = blockStore.getBlocksByNumber(num);
+ List<byte[]> hashes = blocks.stream().map(Block::getHash).collect(Collectors.toList());
+ ret.addAll(hashes);
+ }
+ return ret;
+ }
+
+ private List<byte[]> getMainChainHashes(long fromBlock, long toBlock) {
+ List<byte[]> ret = new ArrayList<>();
+ for (long num = fromBlock; num <= toBlock; num++) {
+ byte[] hash = blockStore.getBlockHashByNumber(num);
+ ret.add(hash);
}
+ return ret;
}
}
diff --git a/ethereumj-core/src/main/java/org/ethereum/db/RepositoryImpl.java b/ethereumj-core/src/main/java/org/ethereum/db/RepositoryImpl.java
index 20e0f6d03a..4dba1e06be 100644
--- a/ethereumj-core/src/main/java/org/ethereum/db/RepositoryImpl.java
+++ b/ethereumj-core/src/main/java/org/ethereum/db/RepositoryImpl.java
@@ -127,7 +127,7 @@ public synchronized boolean hasContractDetails(byte[] addr) {
@Override
public synchronized void saveCode(byte[] addr, byte[] code) {
byte[] codeHash = HashUtil.sha3(code);
- codeCache.put(codeHash, code);
+ codeCache.put(codeKey(codeHash, addr), code);
AccountState accountState = getOrCreateAccountState(addr);
accountStateCache.put(addr, accountState.withCodeHash(codeHash));
}
@@ -136,7 +136,12 @@ public synchronized void saveCode(byte[] addr, byte[] code) {
public synchronized byte[] getCode(byte[] addr) {
byte[] codeHash = getCodeHash(addr);
return FastByteComparisons.equal(codeHash, HashUtil.EMPTY_DATA_HASH) ?
- ByteUtil.EMPTY_BYTE_ARRAY : codeCache.get(codeHash);
+ ByteUtil.EMPTY_BYTE_ARRAY : codeCache.get(codeKey(codeHash, addr));
+ }
+
+ // composing a key as there can be several contracts with the same code
+ private byte[] codeKey(byte[] codeHash, byte[] addr) {
+ return NodeKeyCompositor.compose(codeHash, addr);
}
@Override
diff --git a/ethereumj-core/src/main/java/org/ethereum/db/RepositoryRoot.java b/ethereumj-core/src/main/java/org/ethereum/db/RepositoryRoot.java
index 8eb09d07fe..f6902e8c5b 100644
--- a/ethereumj-core/src/main/java/org/ethereum/db/RepositoryRoot.java
+++ b/ethereumj-core/src/main/java/org/ethereum/db/RepositoryRoot.java
@@ -44,7 +44,9 @@ public MultiStorageCache() {
@Override
protected synchronized StorageCache create(byte[] key, StorageCache srcCache) {
AccountState accountState = accountStateCache.get(key);
- TrieImpl storageTrie = createTrie(trieCache, accountState == null ? null : accountState.getStateRoot());
+ Serializer<byte[], byte[]> keyCompositor = new NodeKeyCompositor(key);
+ Source<byte[], byte[]> composingSrc = new SourceCodec.KeyOnly<>(trieCache, keyCompositor);
+ TrieImpl storageTrie = createTrie(composingSrc, accountState == null ? null : accountState.getStateRoot());
return new StorageCache(storageTrie);
}
@@ -80,9 +82,9 @@ public RepositoryRoot(Source stateDS) {
/**
* Building the following structure for snapshot Repository:
*
- * stateDS --> trieCacheCodec --> trieCache --> stateTrie --> accountStateCodec --> accountStateCache
- * \ \
- * \ \-->>> contractStorageTrie --> storageCodec --> StorageCache
+ * stateDS --> trieCache --> stateTrie --> accountStateCodec --> accountStateCache
+ * \ \
+ * \ \-->>> storageKeyCompositor --> contractStorageTrie --> storageCodec --> storageCache
* \--> codeCache
*
*
@@ -142,7 +144,7 @@ public synchronized void syncToRoot(byte[] root) {
stateTrie.setRoot(root);
}
- protected TrieImpl createTrie(CachedSource.BytesKey<byte[]> trieCache, byte[] root) {
+ protected TrieImpl createTrie(Source<byte[], byte[]> trieCache, byte[] root) {
return new SecureTrie(trieCache, root);
}
diff --git a/ethereumj-core/src/main/java/org/ethereum/db/StateSource.java b/ethereumj-core/src/main/java/org/ethereum/db/StateSource.java
index 77459e592e..415cf05f0b 100644
--- a/ethereumj-core/src/main/java/org/ethereum/db/StateSource.java
+++ b/ethereumj-core/src/main/java/org/ethereum/db/StateSource.java
@@ -20,8 +20,6 @@
import org.ethereum.config.CommonConfig;
import org.ethereum.config.SystemProperties;
import org.ethereum.datasource.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
/**
@@ -29,7 +27,6 @@
*/
public class StateSource extends SourceChainBox<byte[], byte[], byte[], byte[]>
implements HashedKeySource<byte[], byte[]> {
- private static final Logger logger = LoggerFactory.getLogger("db");
// for debug purposes
public static StateSource INST;
@@ -37,28 +34,18 @@ public class StateSource extends SourceChainBox
JournalSource<byte[]> journalSource;
NoDeleteSource<byte[], byte[]> noDeleteSource;
- CountingBytesSource countingSource;
ReadCache<byte[], byte[]> readCache;
AbstractCachedSource<byte[], byte[]> writeCache;
- BloomedSource bloomedSource;
public StateSource(Source<byte[], byte[]> src, boolean pruningEnabled) {
- this(src, pruningEnabled, 0);
- }
-
- public StateSource(Source<byte[], byte[]> src, boolean pruningEnabled, int maxBloomSize) {
super(src);
INST = this;
- add(bloomedSource = new BloomedSource(src, maxBloomSize));
- bloomedSource.setFlushSource(false);
- add(readCache = new ReadCache.BytesKey<>(bloomedSource).withMaxCapacity(16 * 1024 * 1024 / 512)); // 512 - approx size of a node
+ add(readCache = new ReadCache.BytesKey<>(src).withMaxCapacity(16 * 1024 * 1024 / 512)); // 512 - approx size of a node
readCache.setFlushSource(true);
- add(countingSource = new CountingBytesSource(readCache, true));
- countingSource.setFlushSource(true);
- writeCache = new AsyncWriteCache<byte[], byte[]>(countingSource) {
+ writeCache = new AsyncWriteCache<byte[], byte[]>(readCache) {
@Override
protected WriteCache<byte[], byte[]> createCache(Source<byte[], byte[]> source) {
- WriteCache.BytesKey<byte[]> ret = new WriteCache.BytesKey<>(source, WriteCache.CacheType.COUNTING);
+ WriteCache.BytesKey<byte[]> ret = new WriteCache.BytesKey<>(source, WriteCache.CacheType.SIMPLE);
ret.withSizeEstimators(MemSizeEstimator.ByteArrayEstimator, MemSizeEstimator.ByteArrayEstimator);
ret.setFlushSource(true);
return ret;
@@ -91,10 +78,6 @@ public JournalSource getJournalSource() {
return journalSource;
}
- public BloomedSource getBloomedSource() {
- return bloomedSource;
- }
-
/**
* Returns the source behind JournalSource
*/
diff --git a/ethereumj-core/src/main/java/org/ethereum/db/prune/Chain.java b/ethereumj-core/src/main/java/org/ethereum/db/prune/Chain.java
new file mode 100644
index 0000000000..49ff2df469
--- /dev/null
+++ b/ethereumj-core/src/main/java/org/ethereum/db/prune/Chain.java
@@ -0,0 +1,110 @@
+package org.ethereum.db.prune;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * A single chain in a blockchain {@link Segment}.
+ * It could represent either a fork or the main chain.
+ *
+ * <p>
+ * A chain consists of a certain number of {@link ChainItem}s
+ * linked to each other by parent-child relationships
+ *
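+ * <p>
+ * An illustrative sketch (hashes are hypothetical):
+ * <pre>{@code
+ *     ChainItem a = new ChainItem(1, hashA, parentOfA);
+ *     ChainItem b = new ChainItem(2, hashB, hashA); // b is a child of a
+ *     Chain chain = Chain.fromItems(a, b);          // chain.topNumber() == 2
+ * }</pre>
+ *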
+ * @author Mikhail Kalinin
+ * @since 24.01.2018
+ */
+public class Chain {
+
+ static final Chain NULL = new Chain() {
+ @Override
+ boolean connect(ChainItem item) {
+ throw new RuntimeException("Not supported for null chain");
+ }
+ };
+
+ List<ChainItem> items = new ArrayList<>();
+
+ public List<byte[]> getHashes() {
+ return items.stream().map(item -> item.hash).collect(Collectors.toList());
+ }
+
+ private Chain() {
+ }
+
+ Chain(ChainItem item) {
+ this.items.add(item);
+ }
+
+ ChainItem top() {
+ return items.size() > 0 ? items.get(items.size() - 1) : null;
+ }
+
+ long topNumber() {
+ return top() != null ? top().number : 0;
+ }
+
+ long startNumber() {
+ return items.isEmpty() ? 0 : items.get(0).number;
+ }
+
+ boolean isHigher(Chain other) {
+ return other.topNumber() < this.topNumber();
+ }
+
+ boolean contains(ChainItem other) {
+ for (ChainItem item : items) {
+ if (item.equals(other))
+ return true;
+ }
+ return false;
+ }
+
+ boolean connect(ChainItem item) {
+ if (top().isParentOf(item)) {
+ items.add(item);
+ return true;
+ }
+
+ return false;
+ }
+
+ static Chain fromItems(ChainItem ... items) {
+ if (items.length == 0) {
+ return NULL;
+ }
+
+ Chain chain = null;
+ for (ChainItem item : items) {
+ if (chain == null) {
+ chain = new Chain(item);
+ } else {
+ chain.connect(item);
+ }
+ }
+
+ return chain;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ Chain chain = (Chain) o;
+
+ return !(items != null ? !items.equals(chain.items) : chain.items != null);
+ }
+
+ @Override
+ public String toString() {
+ if (items.isEmpty()) {
+ return "(empty)";
+ }
+ return "[" + items.get(0) +
+ " ~> " + items.get(items.size() - 1) +
+ ']';
+ }
+
+}
diff --git a/ethereumj-core/src/main/java/org/ethereum/db/prune/ChainItem.java b/ethereumj-core/src/main/java/org/ethereum/db/prune/ChainItem.java
new file mode 100644
index 0000000000..9d00778bb7
--- /dev/null
+++ b/ethereumj-core/src/main/java/org/ethereum/db/prune/ChainItem.java
@@ -0,0 +1,52 @@
+package org.ethereum.db.prune;
+
+import org.ethereum.core.Block;
+import org.ethereum.util.FastByteComparisons;
+
+import java.util.Arrays;
+
+/**
+ * Represents a block in the {@link Chain}
+ *
+ * @author Mikhail Kalinin
+ * @since 26.01.2018
+ */
+class ChainItem {
+ long number;
+ byte[] hash;
+ byte[] parentHash;
+
+ ChainItem(Block block) {
+ this.number = block.getNumber();
+ this.hash = block.getHash();
+ this.parentHash = block.getParentHash();
+ }
+
+ ChainItem(long number, byte[] hash, byte[] parentHash) {
+ this.number = number;
+ this.hash = hash;
+ this.parentHash = parentHash;
+ }
+
+ boolean isParentOf(ChainItem that) {
+ return FastByteComparisons.equal(hash, that.parentHash);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ChainItem that = (ChainItem) o;
+ return FastByteComparisons.equal(hash, that.hash);
+ }
+
+ @Override
+ public int hashCode() {
+ return hash != null ? Arrays.hashCode(hash) : 0;
+ }
+
+ @Override
+ public String toString() {
+ return String.valueOf(number);
+ }
+}
diff --git a/ethereumj-core/src/main/java/org/ethereum/db/prune/Pruner.java b/ethereumj-core/src/main/java/org/ethereum/db/prune/Pruner.java
new file mode 100644
index 0000000000..bd9d994249
--- /dev/null
+++ b/ethereumj-core/src/main/java/org/ethereum/db/prune/Pruner.java
@@ -0,0 +1,370 @@
+package org.ethereum.db.prune;
+
+import org.ethereum.crypto.HashUtil;
+import org.ethereum.datasource.CountingQuotientFilter;
+import org.ethereum.datasource.JournalSource;
+import org.ethereum.datasource.QuotientFilter;
+import org.ethereum.datasource.Source;
+import org.ethereum.util.ByteArraySet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.spongycastle.util.encoders.Hex;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class is responsible for state pruning.
+ *
+ * <p>
+ * Taking the information supplied by {@link #journal} (check {@link JournalSource} for details),
+ * it removes unused nodes from the {@link #storage}.
+ * There are two types of unused nodes:
+ * nodes which are no longer referenced in the trie after N blocks from the current one, and
+ * nodes which were inserted in forks that were finally not accepted
+ *
+ * <p>
+ * Each prune session uses a certain chain {@link Segment}
+ * which is going to be 'pruned'. To be confident that live nodes won't be removed,
+ * the pruner must be initialized with the top of the chain, see {@link #init(List, int)}.
+ * After that it must be fed with each newly processed block, see {@link #feed(JournalSource.Update)}.
+ * A {@link QuotientFilter} ({@link CountingQuotientFilter} implementation in particular) instance is used to
+ * efficiently keep upcoming inserts in memory and to protect newly inserted nodes from being deleted during
+ * a prune session. The filter is constantly recycled in the {@link #prune(Segment)} method.
+ *
+ * <p>
+ * When the 'prune.maxDepth' param is quite large, it becomes inefficient to keep reverted nodes around
+ * until their prune block number has come.
+ * Hence Pruner has a two-step mode to mitigate memory consumption; the second step is initiated by {@link #withSecondStep(List, int)}.
+ * In that mode nodes from non-accepted forks are deleted from storage immediately, but main chain deletions are
+ * postponed until the second step.
+ * The second step uses another QuotientFilter instance with a smaller memory footprint, check {@link #instantiateFilter(int, int)}.
+ *
+ * <p>
+ * Basically, a prune session initiated by the {@link #prune(Segment)} method
+ * consists of 3 steps: first, it reverts forks, then it persists the main chain,
+ * after that it recycles {@link #journal} by removing processed updates from it.
+ * During the session reverted and deleted nodes are propagated to the {@link #storage} immediately.
+ *
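+ * <p>
+ * A condensed sketch of the lifecycle as driven by {@code PruneManager} (illustrative):
+ * <pre>{@code
+ *     Pruner pruner = new Pruner(journalSource.getJournal(), storage);
+ *     pruner.init(forkWindow, forkDepth);                   // protect live nodes first
+ *     pruner.feed(journalSource.commitUpdates(blockHash));  // on every processed block
+ *     if (segment.isComplete()) pruner.prune(segment);      // revert forks, persist main chain
+ * }</pre>
+ *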
+ * @author Mikhail Kalinin
+ * @since 25.01.2018
+ */
+public class Pruner {
+
+ private static final Logger logger = LoggerFactory.getLogger("prune");
+
+ Source<byte[], JournalSource.Update> journal;
+ Source<byte[], ?> storage;
+ QuotientFilter filter;
+ QuotientFilter distantFilter;
+ boolean ready = false;
+
+ private static class Stats {
+ int collisions = 0;
+ int deleted = 0;
+ double load = 0;
+ @Override
+ public String toString() {
+ return String.format("load %.4f, collisions %d, deleted %d", load, collisions, deleted);
+ }
+ }
+ Stats maxLoad = new Stats();
+ Stats maxCollisions = new Stats();
+ int maxKeysInMemory = 0;
+ int statsTracker = 0;
+
+ Stats distantMaxLoad = new Stats();
+ Stats distantMaxCollisions = new Stats();
+
+ public Pruner(Source<byte[], JournalSource.Update> journal, Source<byte[], ?> storage) {
+ this.storage = storage;
+ this.journal = journal;
+ }
+
+ public boolean isReady() {
+ return ready;
+ }
+
+ public boolean init(List<byte[]> forkWindow, int sizeInBlocks) {
+ if (ready) return true;
+
+ if (!forkWindow.isEmpty() && journal.get(forkWindow.get(0)) == null) {
+ logger.debug("pruner init aborted: can't fetch update " + Hex.toHexString(forkWindow.get(0)));
+ return false;
+ }
+
+ QuotientFilter filter = instantiateFilter(sizeInBlocks, FILTER_ENTRIES_FORK);
+ for (byte[] hash : forkWindow) {
+ JournalSource.Update update = journal.get(hash);
+ if (update == null) {
+ logger.debug("pruner init aborted: can't fetch update " + Hex.toHexString(hash));
+ return false;
+ }
+ update.getInsertedKeys().forEach(filter::insert);
+ }
+
+ this.filter = filter;
+ return ready = true;
+ }
+
+ public boolean withSecondStep() {
+ return distantFilter != null;
+ }
+
+ public void withSecondStep(List<byte[]> mainChainWindow, int sizeInBlocks) {
+ if (!ready) return;
+
+ QuotientFilter filter = instantiateFilter(sizeInBlocks, FILTER_ENTRIES_DISTANT);
+
+ if (!mainChainWindow.isEmpty()) {
+ int i = mainChainWindow.size() - 1;
+ for (; i >= 0; i--) {
+ byte[] hash = mainChainWindow.get(i);
+ JournalSource.Update update = journal.get(hash);
+ if (update == null) {
+ break;
+ }
+ update.getInsertedKeys().forEach(filter::insert);
+ }
+ logger.debug("distant filter initialized with set of " + (i < 0 ? mainChainWindow.size() : mainChainWindow.size() - i) +
+ " hashes, last hash " + Hex.toHexString(mainChainWindow.get(i < 0 ? 0 : i)));
+ } else {
+ logger.debug("distant filter initialized with empty set");
+ }
+
+ this.distantFilter = filter;
+ }
+
+ private static final int FILTER_ENTRIES_FORK = 1 << 13; // approximate number of nodes per block
+ private static final int FILTER_ENTRIES_DISTANT = 1 << 11;
+ private static final int FILTER_MAX_SIZE = Integer.MAX_VALUE >> 1; // that filter will consume ~3g of mem
+ private QuotientFilter instantiateFilter(int blocksCnt, int entries) {
+ int size = Math.min(entries * blocksCnt, FILTER_MAX_SIZE);
+ return CountingQuotientFilter.create(size, size);
+ }
+
+ public boolean init(byte[] ... upcoming) {
+ return init(Arrays.asList(upcoming), 192);
+ }
+
+ public void feed(JournalSource.Update update) {
+ if (ready)
+ update.getInsertedKeys().forEach(filter::insert);
+ }
+
+ public void prune(Segment segment) {
+ if (!ready) return;
+ assert segment.isComplete();
+
+ logger.trace("prune " + segment);
+
+ long t = System.currentTimeMillis();
+ Pruning pruning = new Pruning();
+ // important for fork management, check Pruning#insertedInMainChain and Pruning#insertedInForks for details
+ segment.forks.sort((f1, f2) -> Long.compare(f1.startNumber(), f2.startNumber()));
+ segment.forks.forEach(pruning::revert);
+
+ // delete updates
+ for (Chain chain : segment.forks) {
+ chain.getHashes().forEach(journal::delete);
+ }
+
+ int nodesPostponed = 0;
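+ // with the second step enabled, main chain updates are postponed to the distant filter;
+ // otherwise their deleted keys are propagated to storage immediately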
+ if (withSecondStep()) {
+ nodesPostponed = postpone(segment.main);
+ } else {
+ pruning.nodesDeleted += persist(segment.main);
+ segment.main.getHashes().forEach(journal::delete);
+ }
+
+ if (logger.isTraceEnabled()) logger.trace("nodes {}, keys in mem: {}, filter load: {}/{}: {}, distinct collisions: {}",
+ (withSecondStep() ? "postponed: " + nodesPostponed : "deleted: " + pruning.nodesDeleted),
+ pruning.insertedInForks.size() + pruning.insertedInMainChain.size(),
+ ((CountingQuotientFilter) filter).getEntryNumber(), ((CountingQuotientFilter) filter).getMaxInsertions(),
+ String.format("%.4f", (double) ((CountingQuotientFilter) filter).getEntryNumber() /
+ ((CountingQuotientFilter) filter).getMaxInsertions()),
+ ((CountingQuotientFilter) filter).getCollisionNumber());
+
+ if (logger.isDebugEnabled()) {
+ int collisions = ((CountingQuotientFilter) filter).getCollisionNumber();
+ double load = (double) ((CountingQuotientFilter) filter).getEntryNumber() /
+ ((CountingQuotientFilter) filter).getMaxInsertions();
+ if (collisions > maxCollisions.collisions) {
+ maxCollisions.collisions = collisions;
+ maxCollisions.load = load;
+ maxCollisions.deleted = pruning.nodesDeleted;
+ }
+ if (load > maxLoad.load) {
+ maxLoad.load = load;
+ maxLoad.collisions = collisions;
+ maxLoad.deleted = pruning.nodesDeleted;
+ }
+ maxKeysInMemory = Math.max(maxKeysInMemory, pruning.insertedInForks.size() + pruning.insertedInMainChain.size());
+
+ if (++statsTracker % 100 == 0) {
+ logger.debug("fork filter: max load: " + maxLoad);
+ logger.debug("fork filter: max collisions: " + maxCollisions);
+ logger.debug("fork filter: max keys in mem: " + maxKeysInMemory);
+ }
+ }
+
+ logger.trace(segment + " pruned in {}ms", System.currentTimeMillis() - t);
+ }
+
+ public void persist(byte[] hash) {
+ if (!ready || !withSecondStep()) return;
+
+ logger.trace("persist [{}]", Hex.toHexString(hash));
+
+ long t = System.currentTimeMillis();
+ JournalSource.Update update = journal.get(hash);
+ if (update == null) {
+ logger.debug("skip [{}]: can't fetch update", HashUtil.shortHash(hash));
+ return;
+ }
+
+ // persist deleted keys
+ int nodesDeleted = 0;
+ for (byte[] key : update.getDeletedKeys()) {
+ if (!filter.maybeContains(key) && !distantFilter.maybeContains(key)) {
+ ++nodesDeleted;
+ storage.delete(key);
+ }
+ }
+ // clean up filter
+ update.getInsertedKeys().forEach(distantFilter::remove);
+ // delete update
+ journal.delete(hash);
+
+ if (logger.isDebugEnabled()) {
+ int collisions = ((CountingQuotientFilter) distantFilter).getCollisionNumber();
+ double load = (double) ((CountingQuotientFilter) distantFilter).getEntryNumber() /
+ ((CountingQuotientFilter) distantFilter).getMaxInsertions();
+ if (collisions > distantMaxCollisions.collisions) {
+ distantMaxCollisions.collisions = collisions;
+ distantMaxCollisions.load = load;
+ distantMaxCollisions.deleted = nodesDeleted;
+ }
+ if (load > distantMaxLoad.load) {
+ distantMaxLoad.load = load;
+ distantMaxLoad.collisions = collisions;
+ distantMaxLoad.deleted = nodesDeleted;
+ }
+ if (statsTracker % 100 == 0) {
+ logger.debug("distant filter: max load: " + distantMaxLoad);
+ logger.debug("distant filter: max collisions: " + distantMaxCollisions);
+ }
+ }
+
+ if (logger.isTraceEnabled()) logger.trace("[{}] persisted in {}ms: {}/{} ({}%) nodes deleted, filter load: {}/{}: {}, distinct collisions: {}",
+ HashUtil.shortHash(hash), System.currentTimeMillis() - t, nodesDeleted, update.getDeletedKeys().size(),
+ nodesDeleted * 100 / update.getDeletedKeys().size(),
+ ((CountingQuotientFilter) distantFilter).getEntryNumber(),
+ ((CountingQuotientFilter) distantFilter).getMaxInsertions(),
+ String.format("%.4f", (double) ((CountingQuotientFilter) distantFilter).getEntryNumber() /
+ ((CountingQuotientFilter) distantFilter).getMaxInsertions()),
+ ((CountingQuotientFilter) distantFilter).getCollisionNumber());
+ }
+
+ private int postpone(Chain chain) {
+ if (logger.isTraceEnabled())
+ logger.trace("<~ postponing " + chain + ": " + strSample(chain.getHashes()));
+
+ int nodesPostponed = 0;
+ for (byte[] hash : chain.getHashes()) {
+ JournalSource.Update update = journal.get(hash);
+ if (update == null) {
+ logger.debug("postponing: can't fetch update " + Hex.toHexString(hash));
+ continue;
+ }
+ // feed distant filter
+ update.getInsertedKeys().forEach(distantFilter::insert);
+ // clean up fork filter
+ update.getInsertedKeys().forEach(filter::remove);
+
+ nodesPostponed += update.getDeletedKeys().size();
+ }
+
+ return nodesPostponed;
+ }
+
+ private int persist(Chain chain) {
+ if (logger.isTraceEnabled())
+ logger.trace("<~ persisting " + chain + ": " + strSample(chain.getHashes()));
+
+ int nodesDeleted = 0;
+ for (byte[] hash : chain.getHashes()) {
+ JournalSource.Update update = journal.get(hash);
+ if (update == null) {
+ logger.debug("pruning aborted: can't fetch update of main chain " + Hex.toHexString(hash));
+ return 0;
+ }
+ // persist deleted keys
+ for (byte[] key : update.getDeletedKeys()) {
+ if (!filter.maybeContains(key)) {
+ ++nodesDeleted;
+ storage.delete(key);
+ }
+ }
+ // clean up filter
+ update.getInsertedKeys().forEach(filter::remove);
+ }
+
+ return nodesDeleted;
+ }
+
+ private String strSample(Collection<byte[]> hashes) {
+ String sample = hashes.stream().limit(3)
+ .map(HashUtil::shortHash).collect(Collectors.joining(", "));
+ if (hashes.size() > 3) {
+ sample += ", ... (" + hashes.size() + " total)";
+ }
+ return sample;
+ }
+
+ private class Pruning {
+
+ // track nodes inserted and deleted in forks
+ // to avoid deletion of those nodes which were originally inserted in the main chain
+ Set<byte[]> insertedInMainChain = new ByteArraySet();
+ Set<byte[]> insertedInForks = new ByteArraySet();
+ int nodesDeleted = 0;
+
+ private void revert(Chain chain) {
+ if (logger.isTraceEnabled())
+ logger.trace("<~ reverting " + chain + ": " + strSample(chain.getHashes()));
+
+ for (byte[] hash : chain.getHashes()) {
+ JournalSource.Update update = journal.get(hash);
+ if (update == null) {
+ logger.debug("reverting chain " + chain + " aborted: can't fetch update " + Hex.toHexString(hash));
+ return;
+ }
+ // clean up filter
+ update.getInsertedKeys().forEach(filter::remove);
+
+ // a node deleted in a fork is treated as a node that had earlier been inserted in the main chain
+ update.getDeletedKeys().forEach(key -> {
+ if (!insertedInForks.contains(key)) {
+ insertedInMainChain.add(key);
+ }
+ });
+ update.getInsertedKeys().forEach(key -> {
+ if (!insertedInMainChain.contains(key)) {
+ insertedInForks.add(key);
+ }
+ });
+
+ // revert inserted keys
+ for (byte[] key : update.getInsertedKeys()) {
+ if (!filter.maybeContains(key) && !insertedInMainChain.contains(key)) {
+ ++nodesDeleted;
+ storage.delete(key);
+ }
+ }
+ }
+ }
+ }
+}
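
A rough sketch of the intended Pruner lifecycle, inferred from the API added above; the
journal/storage sources, the hash lists and the window sizes are illustrative assumptions,
not values taken from this diff:

    Pruner pruner = new Pruner(journal, storage);

    // seed the fork filter with keys inserted by updates of the recent fork window
    if (pruner.init(forkWindowHashes, forkWindowBlocks)) {
        // optionally enable the second step backed by the distant filter
        pruner.withSecondStep(mainChainWindowHashes, mainChainWindowBlocks);
    }

    pruner.feed(update);       // on each imported block: register its inserted keys
    pruner.prune(segment);     // once a segment is complete: revert forks, then persist or postpone main
    pruner.persist(blockHash); // second step only: finally drop nodes of a block leaving the main window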
diff --git a/ethereumj-core/src/main/java/org/ethereum/db/prune/Segment.java b/ethereumj-core/src/main/java/org/ethereum/db/prune/Segment.java
new file mode 100644
index 0000000000..7cb3623324
--- /dev/null
+++ b/ethereumj-core/src/main/java/org/ethereum/db/prune/Segment.java
@@ -0,0 +1,168 @@
+package org.ethereum.db.prune;
+
+import org.ethereum.core.Block;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Provides an interface for building and tracking a chain segment.
+ *
+ * <p>
+ * A chain segment is a fragment of the blockchain; it includes both forks and the main chain.
+ * A segment always has a 'root' item which must belong to the main chain,
+ * though the 'root' item itself is not treated as a part of the segment.
+ *
+ * <p>
+ * A segment is complete when the top item of its main chain is the highest one (fork tops have lower numbers).
+ * Whether a segment is complete can be checked by a call to {@link #isComplete()}
+ *
+ * <p>
+ * Segment has a {@link Tracker} class which helps to update the segment with new blocks.
+ * Its usage is simple: add all blocks with {@link Tracker#addAll(List)},
+ * add main chain blocks with {@link Tracker#addMain(Block)},
+ * then, when all blocks are added, fire {@link Tracker#commit()}
+ * to connect the added blocks to the segment
+ *
+ * @author Mikhail Kalinin
+ * @since 24.01.2018
+ *
+ * @see Chain
+ * @see ChainItem
+ */
+public class Segment {
+
+ List<Chain> forks = new ArrayList<>();
+ Chain main = Chain.NULL;
+ ChainItem root;
+
+ public Segment(Block root) {
+ this.root = new ChainItem(root);
+ }
+
+ public Segment(long number, byte[] hash, byte[] parentHash) {
+ this.root = new ChainItem(number, hash, parentHash);
+ }
+
+ public boolean isComplete() {
+ if (main == Chain.NULL)
+ return false;
+
+ for (Chain fork : forks) {
+ if (!main.isHigher(fork))
+ return false;
+ }
+ return true;
+ }
+
+ public long getRootNumber() {
+ return root.number;
+ }
+
+ public long getMaxNumber() {
+ return main.topNumber();
+ }
+
+ public Tracker startTracking() {
+ return new Tracker(this);
+ }
+
+ public int size() {
+ return main.items.size();
+ }
+
+ private void branch(ChainItem item) {
+ forks.add(new Chain(item));
+ }
+
+ private void connectMain(ChainItem item) {
+ if (main == Chain.NULL) {
+ if (root.isParentOf(item))
+ main = new Chain(item); // start new
+ } else {
+ main.connect(item);
+ }
+ }
+
+ private void connectFork(ChainItem item) {
+
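+ // nothing to do if the item is already tracked by one of the forks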
+ for (Chain fork : forks) {
+ if (fork.contains(item))
+ return;
+ }
+
+ if (root.isParentOf(item)) {
+ branch(item);
+ } else {
+ for (ChainItem mainItem : main.items) {
+ if (mainItem.isParentOf(item)) {
+ branch(item);
+ }
+ }
+
+ for (Chain fork : forks) {
+ if (fork.connect(item)) {
+ return;
+ }
+ }
+
+ List<Chain> branchedForks = new ArrayList<>();
+ for (Chain fork : forks) {
+ for (ChainItem forkItem : fork.items) {
+ if (forkItem.isParentOf(item)) {
+ branchedForks.add(new Chain(item));
+ }
+ }
+ }
+ forks.addAll(branchedForks);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "" + main;
+ }
+
+ public static final class Tracker {
+
+ Segment segment;
+ List<ChainItem> main = new ArrayList<>();
+ List<ChainItem> items = new ArrayList<>();
+
+ Tracker(Segment segment) {
+ this.segment = segment;
+ }
+
+ public void addMain(Block block) {
+ main.add(new ChainItem(block));
+ }
+
+ public void addAll(List<Block> blocks) {
+ items.addAll(blocks.stream()
+ .map(ChainItem::new)
+ .collect(Collectors.toList()));
+ }
+
+ public Tracker addMain(long number, byte[] hash, byte[] parentHash) {
+ main.add(new ChainItem(number, hash, parentHash));
+ return this;
+ }
+
+ public Tracker addItem(long number, byte[] hash, byte[] parentHash) {
+ items.add(new ChainItem(number, hash, parentHash));
+ return this;
+ }
+
+ public void commit() {
+
+ items.removeAll(main);
+
+ main.sort((i1, i2) -> Long.compare(i1.number, i2.number));
+ items.sort((i1, i2) -> Long.compare(i1.number, i2.number));
+
+ main.forEach(segment::connectMain);
+ items.forEach(segment::connectFork);
+ }
+ }
+}
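
To make the Javadoc above concrete, a minimal sketch of driving Segment.Tracker; the block
numbers and hash variables (hash99 ... hash102, hash101b) are placeholders, not values from
this diff:

    Segment segment = new Segment(100, hash100, hash99);  // root: main chain block #100

    segment.startTracking()
            .addMain(101, hash101, hash100)   // child of the root on the main chain
            .addItem(101, hash101b, hash100)  // a fork sibling at the same height
            .addMain(102, hash102, hash101)
            .commit();                        // connects the added items to the segment

    // the main chain top (#102) is above the fork top (#101), so the segment is complete
    assert segment.isComplete();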
diff --git a/ethereumj-core/src/main/java/org/ethereum/jsonrpc/JsonRpc.java b/ethereumj-core/src/main/java/org/ethereum/jsonrpc/JsonRpc.java
deleted file mode 100644
index bf8f5a9fd7..0000000000
--- a/ethereumj-core/src/main/java/org/ethereum/jsonrpc/JsonRpc.java
+++ /dev/null
@@ -1,341 +0,0 @@
-/*
- * Copyright (c) [2016] [ <ether.camp> ]
- * This file is part of the ethereumJ library.
- *
- * The ethereumJ library is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * The ethereumJ library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with the ethereumJ library. If not, see <http://www.gnu.org/licenses/>.
- */
-package org.ethereum.jsonrpc;
-
-import org.ethereum.core.Block;
-import org.ethereum.core.CallTransaction;
-import org.ethereum.core.Transaction;
-import org.ethereum.vm.LogInfo;
-
-import java.util.Arrays;
-
-import static org.ethereum.jsonrpc.TypeConverter.toJsonHex;
-
-/**
- * Created by Anton Nashatyrev on 25.11.2015.
- */
-public interface JsonRpc {
-
- class SyncingResult {
- public String startingBlock;
- public String currentBlock;
- public String highestBlock;
-
- @Override
- public String toString() {
- return "SyncingResult{" +
- "startingBlock='" + startingBlock + '\'' +
- ", currentBlock='" + currentBlock + '\'' +
- ", highestBlock='" + highestBlock + '\'' +
- '}';
- }
- }
-
- class CallArguments {
- public String from;
- public String to;
- public String gas;
- public String gasPrice;
- public String value;
- public String data; // compiledCode
- public String nonce;
-
- @Override
- public String toString() {
- return "CallArguments{" +
- "from='" + from + '\'' +
- ", to='" + to + '\'' +
- ", gasLimit='" + gas + '\'' +
- ", gasPrice='" + gasPrice + '\'' +
- ", value='" + value + '\'' +
- ", data='" + data + '\'' +
- ", nonce='" + nonce + '\'' +
- '}';
- }
- }
-
- class BlockResult {
- public String number; // QUANTITY - the block number. null when its pending block.
- public String hash; // DATA, 32 Bytes - hash of the block. null when its pending block.
- public String parentHash; // DATA, 32 Bytes - hash of the parent block.
- public String nonce; // DATA, 8 Bytes - hash of the generated proof-of-work. null when its pending block.
- public String sha3Uncles; // DATA, 32 Bytes - SHA3 of the uncles data in the block.
- public String logsBloom; // DATA, 256 Bytes - the bloom filter for the logs of the block. null when its pending block.
- public String transactionsRoot; // DATA, 32 Bytes - the root of the transaction trie of the block.
- public String stateRoot; // DATA, 32 Bytes - the root of the final state trie of the block.
- public String receiptsRoot; // DATA, 32 Bytes - the root of the receipts trie of the block.
- public String miner; // DATA, 20 Bytes - the address of the beneficiary to whom the mining rewards were given.
- public String difficulty; // QUANTITY - integer of the difficulty for this block.
- public String totalDifficulty; // QUANTITY - integer of the total difficulty of the chain until this block.
- public String extraData; // DATA - the "extra data" field of this block
- public String size;//QUANTITY - integer the size of this block in bytes.
- public String gasLimit;//: QUANTITY - the maximum gas allowed in this block.
- public String gasUsed; // QUANTITY - the total used gas by all transactions in this block.
- public String timestamp; //: QUANTITY - the unix timestamp for when the block was collated.
- public Object[] transactions; //: Array - Array of transaction objects, or 32 Bytes transaction hashes depending on the last given parameter.
- public String[] uncles; //: Array - Array of uncle hashes.
-
- @Override
- public String toString() {
- return "BlockResult{" +
- "number='" + number + '\'' +
- ", hash='" + hash + '\'' +
- ", parentHash='" + parentHash + '\'' +
- ", nonce='" + nonce + '\'' +
- ", sha3Uncles='" + sha3Uncles + '\'' +
- ", logsBloom='" + logsBloom + '\'' +
- ", transactionsRoot='" + transactionsRoot + '\'' +
- ", stateRoot='" + stateRoot + '\'' +
- ", receiptsRoot='" + receiptsRoot + '\'' +
- ", miner='" + miner + '\'' +
- ", difficulty='" + difficulty + '\'' +
- ", totalDifficulty='" + totalDifficulty + '\'' +
- ", extraData='" + extraData + '\'' +
- ", size='" + size + '\'' +
- ", gasLimit='" + gasLimit + '\'' +
- ", gasUsed='" + gasUsed + '\'' +
- ", timestamp='" + timestamp + '\'' +
- ", transactions=" + Arrays.toString(transactions) +
- ", uncles=" + Arrays.toString(uncles) +
- '}';
- }
- }
-
- class CompilationResult {
- public String code;
- public CompilationInfo info;
-
- @Override
- public String toString() {
- return "CompilationResult{" +
- "code='" + code + '\'' +
- ", info=" + info +
- '}';
- }
- }
-
- class CompilationInfo {
- public String source;
- public String language;
- public String languageVersion;
- public String compilerVersion;
- public CallTransaction.Function[] abiDefinition;
- public String userDoc;
- public String developerDoc;
-
- @Override
- public String toString() {
- return "CompilationInfo{" +
- "source='" + source + '\'' +
- ", language='" + language + '\'' +
- ", languageVersion='" + languageVersion + '\'' +
- ", compilerVersion='" + compilerVersion + '\'' +
- ", abiDefinition=" + abiDefinition +
- ", userDoc='" + userDoc + '\'' +
- ", developerDoc='" + developerDoc + '\'' +
- '}';
- }
- }
-
- class FilterRequest {
- public String fromBlock;
- public String toBlock;
- public Object address;
- public Object[] topics;
-
- @Override
- public String toString() {
- return "FilterRequest{" +
- "fromBlock='" + fromBlock + '\'' +
- ", toBlock='" + toBlock + '\'' +
- ", address=" + address +
- ", topics=" + Arrays.toString(topics) +
- '}';
- }
- }
-
- class LogFilterElement {
- public String logIndex;
- public String blockNumber;
- public String blockHash;
- public String transactionHash;
- public String transactionIndex;
- public String address;
- public String data;
- public String[] topics;
-
- public LogFilterElement(LogInfo logInfo, Block b, int txIndex, Transaction tx, int logIdx) {
- logIndex = toJsonHex(logIdx);
- blockNumber = b == null ? null : toJsonHex(b.getNumber());
- blockHash = b == null ? null : toJsonHex(b.getHash());
- transactionIndex = b == null ? null : toJsonHex(txIndex);
- transactionHash = toJsonHex(tx.getHash());
- address = toJsonHex(tx.getReceiveAddress());
- data = toJsonHex(logInfo.getData());
- topics = new String[logInfo.getTopics().size()];
- for (int i = 0; i < topics.length; i++) {
- topics[i] = toJsonHex(logInfo.getTopics().get(i).getData());
- }
- }
-
- @Override
- public String toString() {
- return "LogFilterElement{" +
- "logIndex='" + logIndex + '\'' +
- ", blockNumber='" + blockNumber + '\'' +
- ", blockHash='" + blockHash + '\'' +
- ", transactionHash='" + transactionHash + '\'' +
- ", transactionIndex='" + transactionIndex + '\'' +
- ", address='" + address + '\'' +
- ", data='" + data + '\'' +
- ", topics=" + Arrays.toString(topics) +
- '}';
- }
- }
-
- String web3_clientVersion();
- String web3_sha3(String data) throws Exception;
- String net_version();
- String net_peerCount();
- boolean net_listening();
- String eth_protocolVersion();
- SyncingResult eth_syncing();
- String eth_coinbase();
- boolean eth_mining();
- String eth_hashrate();
- String eth_gasPrice();
- String[] eth_accounts();
- String eth_blockNumber();
- String eth_getBalance(String address, String block) throws Exception;
- String eth_getBalance(String address) throws Exception;
-
- String eth_getStorageAt(String address, String storageIdx, String blockId) throws Exception;
-
- String eth_getTransactionCount(String address, String blockId) throws Exception;
-
- String eth_getBlockTransactionCountByHash(String blockHash)throws Exception;
- String eth_getBlockTransactionCountByNumber(String bnOrId)throws Exception;
- String eth_getUncleCountByBlockHash(String blockHash)throws Exception;
- String eth_getUncleCountByBlockNumber(String bnOrId)throws Exception;
- String eth_getCode(String addr, String bnOrId)throws Exception;
- String eth_sign(String addr,String data) throws Exception;
- String eth_sendTransaction(CallArguments transactionArgs) throws Exception;
- // TODO: Remove, obsolete with this params
- String eth_sendTransaction(String from,String to, String gas,
- String gasPrice, String value,String data,String nonce) throws Exception;
- String eth_sendRawTransaction(String rawData) throws Exception;
- String eth_call(CallArguments args, String bnOrId) throws Exception;
- String eth_estimateGas(CallArguments args) throws Exception;
- BlockResult eth_getBlockByHash(String blockHash,Boolean fullTransactionObjects) throws Exception;
- BlockResult eth_getBlockByNumber(String bnOrId,Boolean fullTransactionObjects) throws Exception;
- TransactionResultDTO eth_getTransactionByHash(String transactionHash) throws Exception;
- TransactionResultDTO eth_getTransactionByBlockHashAndIndex(String blockHash,String index) throws Exception;
- TransactionResultDTO eth_getTransactionByBlockNumberAndIndex(String bnOrId,String index) throws Exception;
- TransactionReceiptDTO eth_getTransactionReceipt(String transactionHash) throws Exception;
-
- TransactionReceiptDTOExt ethj_getTransactionReceipt(String transactionHash) throws Exception;
-
- BlockResult eth_getUncleByBlockHashAndIndex(String blockHash, String uncleIdx) throws Exception;
-
- BlockResult eth_getUncleByBlockNumberAndIndex(String blockId, String uncleIdx) throws Exception;
-
- String[] eth_getCompilers();
- CompilationResult eth_compileLLL(String contract);
- CompilationResult eth_compileSolidity(String contract) throws Exception;
- CompilationResult eth_compileSerpent(String contract);
- String eth_resend();
- String eth_pendingTransactions();
-
- String eth_newFilter(FilterRequest fr) throws Exception;
-
-// String eth_newFilter(String fromBlock, String toBlock, String address, String[] topics) throws Exception;
-
- String eth_newBlockFilter();
- String eth_newPendingTransactionFilter();
- boolean eth_uninstallFilter(String id);
- Object[] eth_getFilterChanges(String id);
-
- Object[] eth_getFilterLogs(String id);
-
- Object[] eth_getLogs(FilterRequest fr) throws Exception;
-
- String eth_getWork();
- String eth_submitWork();
- String eth_submitHashrate();
- String db_putString();
- String db_getString();
- String db_putHex();
- String db_getHex();
- String shh_post();
- String shh_version();
- String shh_newIdentity();
- String shh_hasIdentity();
- String shh_newGroup();
- String shh_addToGroup();
- String shh_newFilter();
- String shh_uninstallFilter();
- String shh_getFilterChanges();
- String shh_getMessages();
-
-
- boolean admin_addPeer(String s);
-
- String admin_exportChain();
- String admin_importChain();
- String admin_sleepBlocks();
- String admin_verbosity();
- String admin_setSolc();
- String admin_startRPC();
- String admin_stopRPC();
- String admin_setGlobalRegistrar();
- String admin_setHashReg();
- String admin_setUrlHint();
- String admin_saveInfo();
- String admin_register();
- String admin_registerUrl();
- String admin_startNatSpec();
- String admin_stopNatSpec();
- String admin_getContractInfo();
- String admin_httpGet();
- String admin_nodeInfo();
- String admin_peers();
- String admin_datadir();
- String net_addPeer();
- boolean miner_start();
- boolean miner_stop();
- boolean miner_setEtherbase(String coinBase) throws Exception;
- boolean miner_setExtra(String data) throws Exception;
- boolean miner_setGasPrice(String newMinGasPrice);
- boolean miner_startAutoDAG();
- boolean miner_stopAutoDAG();
- boolean miner_makeDAG();
- String miner_hashrate();
- String debug_printBlock();
- String debug_getBlockRlp();
- String debug_setHead();
- String debug_processBlock();
- String debug_seedHash();
- String debug_dumpBlock();
- String debug_metrics();
-
- String personal_newAccount(String seed);
-
- boolean personal_unlockAccount(String addr, String pass, String duration);
-
- String[] personal_listAccounts();
-}
diff --git a/ethereumj-core/src/main/java/org/ethereum/jsonrpc/JsonRpcImpl.java b/ethereumj-core/src/main/java/org/ethereum/jsonrpc/JsonRpcImpl.java
deleted file mode 100644
index 789cedbfb7..0000000000
--- a/ethereumj-core/src/main/java/org/ethereum/jsonrpc/JsonRpcImpl.java
+++ /dev/null
@@ -1,1524 +0,0 @@
-/*
- * Copyright (c) [2016] [ <ether.camp> ]
- * This file is part of the ethereumJ library.
- *
- * The ethereumJ library is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * The ethereumJ library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with the ethereumJ library. If not, see <http://www.gnu.org/licenses/>.
- */
-package org.ethereum.jsonrpc;
-
-import org.apache.commons.collections4.map.LRUMap;
-import org.ethereum.config.CommonConfig;
-import org.ethereum.config.SystemProperties;
-import org.ethereum.core.*;
-import org.ethereum.crypto.ECKey;
-import org.ethereum.crypto.HashUtil;
-import org.ethereum.db.BlockStore;
-import org.ethereum.db.ByteArrayWrapper;
-import org.ethereum.core.TransactionInfo;
-import org.ethereum.db.TransactionStore;
-import org.ethereum.facade.Ethereum;
-import org.ethereum.listener.CompositeEthereumListener;
-import org.ethereum.listener.EthereumListenerAdapter;
-import org.ethereum.manager.WorldManager;
-import org.ethereum.mine.BlockMiner;
-import org.ethereum.net.client.Capability;
-import org.ethereum.net.client.ConfigCapabilities;
-import org.ethereum.net.rlpx.Node;
-import org.ethereum.net.server.ChannelManager;
-import org.ethereum.net.server.PeerServer;
-import org.ethereum.solidity.compiler.SolidityCompiler;
-import org.ethereum.sync.SyncManager;
-import org.ethereum.util.BuildInfo;
-import org.ethereum.util.ByteUtil;
-import org.ethereum.util.RLP;
-import org.ethereum.vm.DataWord;
-import org.ethereum.vm.LogInfo;
-import org.ethereum.vm.program.invoke.ProgramInvokeFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.annotation.Lazy;
-import org.springframework.stereotype.Component;
-
-import java.math.BigInteger;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static java.lang.Math.max;
-import static org.ethereum.crypto.HashUtil.sha3;
-import static org.ethereum.jsonrpc.TypeConverter.*;
-import static org.ethereum.jsonrpc.TypeConverter.StringHexToByteArray;
-import static org.ethereum.util.ByteUtil.EMPTY_BYTE_ARRAY;
-import static org.ethereum.util.ByteUtil.bigIntegerToBytes;
-
-/**
- * Created by Anton Nashatyrev on 25.11.2015.
- */
-@Component
-@Lazy
-public class JsonRpcImpl implements JsonRpc {
- private static final Logger logger = LoggerFactory.getLogger("jsonrpc");
-
-
-
- public class BinaryCallArguments {
- public long nonce;
- public long gasPrice;
- public long gasLimit;
- public String toAddress;
- public long value;
- public byte[] data;
- public void setArguments(CallArguments args) throws Exception {
- nonce = 0;
- if (args.nonce != null && args.nonce.length() != 0)
- nonce = JSonHexToLong(args.nonce);
-
- gasPrice = 0;
- if (args.gasPrice != null && args.gasPrice.length()!=0)
- gasPrice = JSonHexToLong(args.gasPrice);
-
- gasLimit = 4_000_000;
- if (args.gas != null && args.gas.length()!=0)
- gasLimit = JSonHexToLong(args.gas);
-
- toAddress = null;
- if (args.to != null && !args.to.isEmpty())
- toAddress = JSonHexToHex(args.to);
-
- value=0;
- if (args.value != null && args.value.length()!=0)
- value = JSonHexToLong(args.value);
-
- data = null;
-
- if (args.data != null && args.data.length()!=0)
- data = TypeConverter.StringHexToByteArray(args.data);
- }
- }
-
- @Autowired
- SystemProperties config;
-
- @Autowired
- ConfigCapabilities configCapabilities;
-
- @Autowired
- public WorldManager worldManager;
-
- @Autowired
- public Repository repository;
-
- @Autowired
- Ethereum eth;
-
- @Autowired
- PeerServer peerServer;
-
- @Autowired
- SyncManager syncManager;
-
- @Autowired
- TransactionStore txStore;
-
- @Autowired
- ChannelManager channelManager;
-
- @Autowired
- BlockMiner blockMiner;
-
- @Autowired
- TransactionStore transactionStore;
-
- @Autowired
- PendingStateImpl pendingState;
-
- @Autowired
- SolidityCompiler solidityCompiler;
-
- @Autowired
- ProgramInvokeFactory programInvokeFactory;
-
- @Autowired
- CommonConfig commonConfig = CommonConfig.getDefault();
-
- BlockchainImpl blockchain;
-
- CompositeEthereumListener compositeEthereumListener;
-
-
- long initialBlockNumber;
-
- Map<ByteArrayWrapper, Account> accounts = new HashMap<>();
- AtomicInteger filterCounter = new AtomicInteger(1);
- Map<Integer, Filter> installedFilters = new Hashtable<>();
- Map<ByteArrayWrapper, TransactionReceipt> pendingReceipts = Collections.synchronizedMap(new LRUMap<>(1024));
-
- @Autowired
- public JsonRpcImpl(final BlockchainImpl blockchain, final CompositeEthereumListener compositeEthereumListener) {
- this.blockchain = blockchain;
- this.compositeEthereumListener = compositeEthereumListener;
- initialBlockNumber = blockchain.getBestBlock().getNumber();
-
- compositeEthereumListener.addListener(new EthereumListenerAdapter() {
- @Override
- public void onBlock(Block block, List<TransactionReceipt> receipts) {
- for (Filter filter : installedFilters.values()) {
- filter.newBlockReceived(block);
- }
- }
-
- @Override
- public void onPendingTransactionsReceived(List<Transaction> transactions) {
- for (Filter filter : installedFilters.values()) {
- for (Transaction tx : transactions) {
- filter.newPendingTx(tx);
- }
- }
- }
-
- @Override
- public void onPendingTransactionUpdate(TransactionReceipt txReceipt, PendingTransactionState state, Block block) {
- ByteArrayWrapper txHashW = new ByteArrayWrapper(txReceipt.getTransaction().getHash());
- if (state.isPending() || state == PendingTransactionState.DROPPED) {
- pendingReceipts.put(txHashW, txReceipt);
- } else {
- pendingReceipts.remove(txHashW);
- }
- }
- });
-
- }
-
- public long JSonHexToLong(String x) throws Exception {
- if (!x.startsWith("0x"))
- throw new Exception("Incorrect hex syntax");
- x = x.substring(2);
- return Long.parseLong(x, 16);
- }
-
- public int JSonHexToInt(String x) throws Exception {
- if (!x.startsWith("0x"))
- throw new Exception("Incorrect hex syntax");
- x = x.substring(2);
- return Integer.parseInt(x, 16);
- }
-
- public String JSonHexToHex(String x) throws Exception {
- if (!x.startsWith("0x"))
- throw new Exception("Incorrect hex syntax");
- x = x.substring(2);
- return x;
- }
-
- public Block getBlockByJSonHash(String blockHash) throws Exception {
- byte[] bhash = TypeConverter.StringHexToByteArray(blockHash);
- return worldManager.getBlockchain().getBlockByHash(bhash);
- }
-
- private Block getByJsonBlockId(String id) {
- if ("earliest".equalsIgnoreCase(id)) {
- return blockchain.getBlockByNumber(0);
- } else if ("latest".equalsIgnoreCase(id)) {
- return blockchain.getBestBlock();
- } else if ("pending".equalsIgnoreCase(id)) {
- return null;
- } else {
- long blockNumber = StringHexToBigInteger(id).longValue();
- return blockchain.getBlockByNumber(blockNumber);
- }
- }
-
- private Repository getRepoByJsonBlockId(String id) {
- if ("pending".equalsIgnoreCase(id)) {
- return pendingState.getRepository();
- } else {
- Block block = getByJsonBlockId(id);
- return this.repository.getSnapshotTo(block.getStateRoot());
- }
- }
-
- private List<Transaction> getTransactionsByJsonBlockId(String id) {
- if ("pending".equalsIgnoreCase(id)) {
- return pendingState.getPendingTransactions();
- } else {
- Block block = getByJsonBlockId(id);
- return block != null ? block.getTransactionsList() : null;
- }
- }
-
- protected Account getAccount(String address) throws Exception {
- return accounts.get(new ByteArrayWrapper(StringHexToByteArray(address)));
- }
-
- protected Account addAccount(String seed) {
- return addAccount(ECKey.fromPrivate(sha3(seed.getBytes())));
- }
-
- protected Account addAccount(ECKey key) {
- Account account = new Account();
- account.init(key);
- accounts.put(new ByteArrayWrapper(account.getAddress()), account);
- return account;
- }
-
- public String web3_clientVersion() {
-
- String s = "EthereumJ" + "/v" + config.projectVersion() + "/" +
- System.getProperty("os.name") + "/Java1.7/" + config.projectVersionModifier() + "-" + BuildInfo.buildHash;
- if (logger.isDebugEnabled()) logger.debug("web3_clientVersion(): " + s);
- return s;
- };
-
- public String web3_sha3(String data) throws Exception {
- String s = null;
- try {
- byte[] result = HashUtil.sha3(TypeConverter.StringHexToByteArray(data));
- return s = TypeConverter.toJsonHex(result);
- } finally {
- if (logger.isDebugEnabled()) logger.debug("web3_sha3(" + data + "): " + s);
- }
- }
-
- public String net_version() {
- String s = null;
- try {
- return s = eth_protocolVersion();
- } finally {
- if (logger.isDebugEnabled()) logger.debug("net_version(): " + s);
- }
- }
-
- public String net_peerCount(){
- String s = null;
- try {
- int n = channelManager.getActivePeers().size();
- return s = TypeConverter.toJsonHex(n);
- } finally {
- if (logger.isDebugEnabled()) logger.debug("net_peerCount(): " + s);
- }
- }
-
- public boolean net_listening() {
- Boolean s = null;
- try {
- return s = peerServer.isListening();
- }finally {
- if (logger.isDebugEnabled()) logger.debug("net_listening(): " + s);
- }
- }
-
- public String eth_protocolVersion(){
- String s = null;
- try {
- int version = 0;
- for (Capability capability : configCapabilities.getConfigCapabilities()) {
- if (capability.isEth()) {
- version = max(version, capability.getVersion());
- }
- }
- return s = Integer.toString(version);
- } finally {
- if (logger.isDebugEnabled()) logger.debug("eth_protocolVersion(): " + s);
- }
- }
-
- public SyncingResult eth_syncing(){
- SyncingResult s = new SyncingResult();
- try {
- s.startingBlock = TypeConverter.toJsonHex(initialBlockNumber);
- s.currentBlock = TypeConverter.toJsonHex(blockchain.getBestBlock().getNumber());
- s.highestBlock = TypeConverter.toJsonHex(syncManager.getLastKnownBlockNumber());
-
- return s;
- }finally {
- if (logger.isDebugEnabled()) logger.debug("eth_syncing(): " + s);
- }
- };
-
- public String eth_coinbase() {
- String s = null;
- try {
- return s = toJsonHex(blockchain.getMinerCoinbase());
- } finally {
- if (logger.isDebugEnabled()) logger.debug("eth_coinbase(): " + s);
- }
- }
-
- public boolean eth_mining() {
- Boolean s = null;
- try {
- return s = blockMiner.isMining();
- } finally {
- if (logger.isDebugEnabled()) logger.debug("eth_mining(): " + s);
- }
- }
-
-
- public String eth_hashrate() {
- String s = null;
- try {
- return s = null;
- } finally {
- if (logger.isDebugEnabled()) logger.debug("eth_hashrate(): " + s);
- }
- }
-
- public String eth_gasPrice(){
- String s = null;
- try {
- return s = TypeConverter.toJsonHex(eth.getGasPrice());
- } finally {
- if (logger.isDebugEnabled()) logger.debug("eth_gasPrice(): " + s);
- }
- }
-
- public String[] eth_accounts() {
- String[] s = null;
- try {
- return s = personal_listAccounts();
- } finally {
- if (logger.isDebugEnabled()) logger.debug("eth_accounts(): " + Arrays.toString(s));
- }
- }
-
- public String eth_blockNumber(){
- String s = null;
- try {
- Block bestBlock = blockchain.getBestBlock();
- long b = 0;
- if (bestBlock != null) {
- b = bestBlock.getNumber();
- }
- return s = TypeConverter.toJsonHex(b);
- } finally {
- if (logger.isDebugEnabled()) logger.debug("eth_blockNumber(): " + s);
- }
- }
-
-
- public String eth_getBalance(String address, String blockId) throws Exception {
- String s = null;
- try {
- byte[] addressAsByteArray = TypeConverter.StringHexToByteArray(address);
- BigInteger balance = getRepoByJsonBlockId(blockId).getBalance(addressAsByteArray);
- return s = TypeConverter.toJsonHex(balance);
- } finally {
- if (logger.isDebugEnabled()) logger.debug("eth_getBalance(" + address + ", " + blockId + "): " + s);
- }
- }
-
- public String eth_getBalance(String address) throws Exception {
- String s = null;
- try {
- return s = eth_getBalance(address, "latest");
- } finally {
- if (logger.isDebugEnabled()) logger.debug("eth_getBalance(" + address + "): " + s);
- }
- }
-
- @Override
- public String eth_getStorageAt(String address, String storageIdx, String blockId) throws Exception {
- String s = null;
- try {
- byte[] addressAsByteArray = StringHexToByteArray(address);
- DataWord storageValue = getRepoByJsonBlockId(blockId).
- getStorageValue(addressAsByteArray, new DataWord(StringHexToByteArray(storageIdx)));
- return s = TypeConverter.toJsonHex(storageValue.getData());
- } finally {
- if (logger.isDebugEnabled()) logger.debug("eth_getStorageAt(" + address + ", " + storageIdx + ", " + blockId + "): " + s);
- }
- }
-
- @Override
- public String eth_getTransactionCount(String address, String blockId) throws Exception {
- String s = null;
- try {
- byte[] addressAsByteArray = TypeConverter.StringHexToByteArray(address);
- BigInteger nonce = getRepoByJsonBlockId(blockId).getNonce(addressAsByteArray);
- return s = TypeConverter.toJsonHex(nonce);
- } finally {
- if (logger.isDebugEnabled()) logger.debug("eth_getTransactionCount(" + address + ", " + blockId + "): " + s);
- }
- }
-
- public String eth_getBlockTransactionCountByHash(String blockHash) throws Exception {
- String s = null;
- try {
- Block b = getBlockByJSonHash(blockHash);
- if (b == null) return null;
- long n = b.getTransactionsList().size();
- return s = TypeConverter.toJsonHex(n);
- } finally {
- if (logger.isDebugEnabled()) logger.debug("eth_getBlockTransactionCountByHash(" + blockHash + "): " + s);
- }
- }
-
- public String eth_getBlockTransactionCountByNumber(String bnOrId) throws Exception {
- String s = null;
- try {
- List<Transaction> list = getTransactionsByJsonBlockId(bnOrId);
- if (list == null) return null;
- long n = list.size();
- return s = TypeConverter.toJsonHex(n);
- } finally {
- if (logger.isDebugEnabled()) logger.debug("eth_getBlockTransactionCountByNumber(" + bnOrId + "): " + s);
- }
- }
-
- public String eth_getUncleCountByBlockHash(String blockHash) throws Exception {
- String s = null;
- try {
- Block b = getBlockByJSonHash(blockHash);
- if (b == null) return null;
- long n = b.getUncleList().size();
- return s = TypeConverter.toJsonHex(n);
- } finally {
- if (logger.isDebugEnabled()) logger.debug("eth_getUncleCountByBlockHash(" + blockHash + "): " + s);
- }
- }
-
- public String eth_getUncleCountByBlockNumber(String bnOrId) throws Exception {
- String s = null;
- try {
- Block b = getByJsonBlockId(bnOrId);
- if (b == null) return null;
- long n = b.getUncleList().size();
- return s = TypeConverter.toJsonHex(n);
- } finally {
- if (logger.isDebugEnabled()) logger.debug("eth_getUncleCountByBlockNumber(" + bnOrId + "): " + s);
- }
- }
-
- public String eth_getCode(String address, String blockId) throws Exception {
- String s = null;
- try {
- byte[] addressAsByteArray = TypeConverter.StringHexToByteArray(address);
- byte[] code = getRepoByJsonBlockId(blockId).getCode(addressAsByteArray);
- return s = TypeConverter.toJsonHex(code);
- } finally {
- if (logger.isDebugEnabled()) logger.debug("eth_getCode(" + address + ", " + blockId + "): " + s);
- }
- }
-
- public String eth_sign(String addr,String data) throws Exception {
- String s = null;
- try {
- String ha = JSonHexToHex(addr);
- Account account = getAccount(ha);
-
- if (account==null)
- throw new Exception("Inexistent account");
-
- // Todo: is not clear from the spec what hash function must be used to sign
- byte[] masgHash= HashUtil.sha3(TypeConverter.StringHexToByteArray(data));
- ECKey.ECDSASignature signature = account.getEcKey().sign(masgHash);
- // Todo: is not clear if result should be RlpEncoded or serialized by other means
- byte[] rlpSig = RLP.encode(signature);
- return s = TypeConverter.toJsonHex(rlpSig);
- } finally {
- if (logger.isDebugEnabled()) logger.debug("eth_sign(" + addr + ", " + data + "): " + s);
- }
- }
-
- public String eth_sendTransaction(CallArguments args) throws Exception {
-
- String s = null;
- try {
- Account account = getAccount(JSonHexToHex(args.from));
-
- if (account == null)
- throw new Exception("From address private key could not be found in this node");
-
- if (args.data != null && args.data.startsWith("0x"))
- args.data = args.data.substring(2);
-
- Transaction tx = new Transaction(
- args.nonce != null ? StringHexToByteArray(args.nonce) : bigIntegerToBytes(pendingState.getRepository().getNonce(account.getAddress())),
- args.gasPrice != null ? StringHexToByteArray(args.gasPrice) : ByteUtil.longToBytesNoLeadZeroes(eth.getGasPrice()),
- args.gas != null ? StringHexToByteArray(args.gas) : ByteUtil.longToBytes(90_000),
- args.to != null ? StringHexToByteArray(args.to) : EMPTY_BYTE_ARRAY,
- args.value != null ? StringHexToByteArray(args.value) : EMPTY_BYTE_ARRAY,
- args.data != null ? StringHexToByteArray(args.data) : EMPTY_BYTE_ARRAY,
- eth.getChainIdForNextBlock());
- tx.sign(account.getEcKey().getPrivKeyBytes());
-
- eth.submitTransaction(tx);
-
- return s = TypeConverter.toJsonHex(tx.getHash());
- } finally {
- if (logger.isDebugEnabled()) logger.debug("eth_sendTransaction(" + args + "): " + s);
- }
- }
-
- public String eth_sendTransaction(String from, String to, String gas,
- String gasPrice, String value,String data,String nonce) throws Exception {
- String s = null;
- try {
- Transaction tx = new Transaction(
- TypeConverter.StringHexToByteArray(nonce),
- TypeConverter.StringHexToByteArray(gasPrice),
- TypeConverter.StringHexToByteArray(gas),
- TypeConverter.StringHexToByteArray(to), /*receiveAddress*/
- TypeConverter.StringHexToByteArray(value),
- TypeConverter.StringHexToByteArray(data),
- eth.getChainIdForNextBlock());
-
- Account account = getAccount(from);
- if (account == null) throw new RuntimeException("No account " + from);
-
- tx.sign(account.getEcKey());
-
- eth.submitTransaction(tx);
-
- return s = TypeConverter.toJsonHex(tx.getHash());
- } finally {
- if (logger.isDebugEnabled()) logger.debug("eth_sendTransaction(" +
- "from = [" + from + "], to = [" + to + "], gas = [" + gas + "], gasPrice = [" + gasPrice +
- "], value = [" + value + "], data = [" + data + "], nonce = [" + nonce + "]" + "): " + s);
- }
- }
-
- public String eth_sendRawTransaction(String rawData) throws Exception {
- String s = null;
- try {
- Transaction tx = new Transaction(StringHexToByteArray(rawData));
- tx.verify();
-
- eth.submitTransaction(tx);
-
- return s = TypeConverter.toJsonHex(tx.getHash());
- } finally {
- if (logger.isDebugEnabled()) logger.debug("eth_sendRawTransaction(" + rawData + "): " + s);
- }
- }
-
- public TransactionReceipt createCallTxAndExecute(CallArguments args, Block block) throws Exception {
- Repository repository = ((Repository) worldManager.getRepository())
- .getSnapshotTo(block.getStateRoot())
- .startTracking();
-
- return createCallTxAndExecute(args, block, repository, worldManager.getBlockStore());
- }
-
- public TransactionReceipt createCallTxAndExecute(CallArguments args, Block block, Repository repository, BlockStore blockStore) throws Exception {
- BinaryCallArguments bca = new BinaryCallArguments();
- bca.setArguments(args);
- Transaction tx = CallTransaction.createRawTransaction(0,
- bca.gasPrice,
- bca.gasLimit,
- bca.toAddress,
- bca.value,
- bca.data);
-
- // put mock signature if not present
- if (tx.getSignature() == null) {
- tx.sign(ECKey.fromPrivate(new byte[32]));
- }
-
- try {
- TransactionExecutor executor = new TransactionExecutor
- (tx, block.getCoinbase(), repository, blockStore,
- programInvokeFactory, block, new EthereumListenerAdapter(), 0)
- .withCommonConfig(commonConfig)
- .setLocalCall(true);
-
- executor.init();
- executor.execute();
- executor.go();
- executor.finalization();
-
- return executor.getReceipt();
- } finally {
- repository.rollback();
- }
- }
-
- public String eth_call(CallArguments args, String bnOrId) throws Exception {
-
- String s = null;
- try {
- TransactionReceipt res;
- if ("pending".equals(bnOrId)) {
- Block pendingBlock = blockchain.createNewBlock(blockchain.getBestBlock(), pendingState.getPendingTransactions(), Collections.emptyList());
- res = createCallTxAndExecute(args, pendingBlock, pendingState.getRepository(), worldManager.getBlockStore());
- } else {
- res = createCallTxAndExecute(args, getByJsonBlockId(bnOrId));
- }
- return s = TypeConverter.toJsonHex(res.getExecutionResult());
- } finally {
- if (logger.isDebugEnabled()) logger.debug("eth_call(" + args + "): " + s);
- }
- }
-
- public String eth_estimateGas(CallArguments args) throws Exception {
- String s = null;
- try {
- TransactionReceipt res = createCallTxAndExecute(args, blockchain.getBestBlock());
- return s = TypeConverter.toJsonHex(res.getGasUsed());
- } finally {
- if (logger.isDebugEnabled()) logger.debug("eth_estimateGas(" + args + "): " + s);
- }
- }
-
-
- public BlockResult getBlockResult(Block b, boolean fullTx) {
- if (b==null)
- return null;
- boolean isPending = ByteUtil.byteArrayToLong(b.getNonce()) == 0;
- BlockResult br = new BlockResult();
- br.number = isPending ? null : TypeConverter.toJsonHex(b.getNumber());
- br.hash = isPending ? null : TypeConverter.toJsonHex(b.getHash());
- br.parentHash = TypeConverter.toJsonHex(b.getParentHash());
- br.nonce = isPending ? null : TypeConverter.toJsonHex(b.getNonce());
- br.sha3Uncles= TypeConverter.toJsonHex(b.getUnclesHash());
- br.logsBloom = isPending ? null : TypeConverter.toJsonHex(b.getLogBloom());
- br.transactionsRoot =TypeConverter.toJsonHex(b.getTxTrieRoot());
- br.stateRoot = TypeConverter.toJsonHex(b.getStateRoot());
- br.receiptsRoot =TypeConverter.toJsonHex(b.getReceiptsRoot());
- br.miner = isPending ? null : TypeConverter.toJsonHex(b.getCoinbase());
- br.difficulty = TypeConverter.toJsonHex(b.getDifficulty());
- br.totalDifficulty = TypeConverter.toJsonHex(blockchain.getTotalDifficulty());
- if (b.getExtraData() != null)
- br.extraData =TypeConverter.toJsonHex(b.getExtraData());
- br.size = TypeConverter.toJsonHex(b.getEncoded().length);
- br.gasLimit =TypeConverter.toJsonHex(b.getGasLimit());
- br.gasUsed =TypeConverter.toJsonHex(b.getGasUsed());
- br.timestamp =TypeConverter.toJsonHex(b.getTimestamp());
-
- List