Chronicle Map Replication is part of Chronicle Map (Enterprise Edition), a commercially supported version of our successful open source Chronicle Map. Extended features include:
- Replication to ensure real-time backup of all your map data.
- Resilience support for robust fail-over and disaster recovery environments.
In addition, you will be fully supported by our technical experts.
For more information on Chronicle Map (Enterprise Edition), please contact sales@chronicle.software.
Chronicle Map Replication is a multi-master replicated data store. Chronicle Map supports replication over TCP/IP.
TCP/IP is a reliable protocol. This means that, unless you have a network failure or hardware outage, the data is guaranteed to arrive. TCP/IP provides point-to-point connectivity. For example, if a message were sent to 100 hosts, it would have to be sent 100 times.
We are careful not to swamp your network with too much TCP/IP traffic. We do this by providing a throttled version of TCP replication. This works because Chronicle Map only broadcasts the latest update of each entry.
Chronicle Map provides multi-master hash-map replication. This means that each remote map mirrors its changes over to another remote map. Neither map is considered to be the master store of data. Each map uses timestamps to reconcile changes.
We refer to an instance of a remote map as a node. Each node can be connected to up to 128 other nodes.
The data that is stored locally in each node becomes eventually consistent. So changes made to one node, for example by calling put(), will be replicated over to the other nodes.
To achieve a high level of performance and throughput, the call to put() will not block.
With ConcurrentHashMap, it is typical to check the return value of some methods to obtain the old value; for example, remove().
Due to the loose coupling, and lock-free nature of this multi-master implementation, the return value is only the old value on the node’s local data store. In other words, the nodes are only concurrent locally. Another node, performing exactly the same operation, may return a different value. However, reconciliation will ensure that all the maps will become eventually consistent.
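To illustrate, here is a hypothetical sketch; mapOnNodeA and mapOnNodeB are placeholder handles, standing for two replicated instances of the same map on different nodes:

// Hypothetical handles: two replicated instances of the same map,
// held on node A and node B respectively.
ChronicleMap<Integer, CharSequence> mapOnNodeA = ...;
ChronicleMap<Integer, CharSequence> mapOnNodeB = ...;

// Both nodes remove the same key at almost the same moment.
CharSequence oldOnA = mapOnNodeA.remove(42); // old value in node A's local store
CharSequence oldOnB = mapOnNodeB.remove(42); // may differ from oldOnA

// The two return values can legitimately differ: each reflects only the
// local store at the moment of the call. Reconciliation will later make
// both maps hold the same data again, but it cannot change these return values.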
If two or more nodes receive a change to their maps for the same key, but with different values (say, by users calling put(key, value) on each map), then each node will initially update its local store, and each local store will hold a different value.
The aim of multi-master replication is to provide eventual consistency across the nodes. Using multi-master replication, whenever a node is changed it will notify the other nodes of its change; we refer to this notification as an event.
The event will hold a timestamp indicating the time that the change occurred. It will also hold the state transition; in this case it was a put with a key and value.
Eventual consistency is achieved by looking at the timestamp from the remote node. If, for a given key, the remote node’s timestamp is newer than the local node’s timestamp, then the event from the remote node will be applied to the local node; otherwise, the event will be ignored.
Since none of the nodes is a primary, each node holds information about the other nodes. For a specific node, its own identifier is referred to as the 'localIdentifier'. The identifiers of other nodes are the 'remoteIdentifiers'.
On an update, or insert, of a key/value, the node pushes the information about the change to the remote nodes. The nodes use non-blocking Java NIO I/O, and all replication is done on a single thread.
However, there is a specific edge case. If two nodes update their map at precisely the same time with different values, we have to deterministically resolve which update wins. This is because eventual consistency mandates that both nodes should end up holding the same data locally.
Although it is rare that two nodes receive an update to their maps at exactly the same time for the same key, we have to handle this edge case; we cannot, therefore, rely on timestamps alone to reconcile the updates.
Typically, the update with the newest timestamp wins; but in this case both timestamps are the same, and the decision made on one node must be identical to the decision made on the other. This dilemma is resolved by using a node identifier: a unique byte value that is assigned to each node. When the timestamps are the same, the update from the node with the smaller identifier is preferred.
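The full reconciliation rule can be expressed as a simple comparison. The sketch below is purely illustrative; the class and method names are hypothetical and not part of the Chronicle Map API:

// Illustrative sketch of the reconciliation rule described above.
// These names are hypothetical; they are not part of the Chronicle Map API.
final class ReconciliationRule {

    static final class RemoteEvent {
        final long timestamp;   // time the change occurred on the remote node
        final byte identifier;  // unique identifier of the originating node

        RemoteEvent(long timestamp, byte identifier) {
            this.timestamp = timestamp;
            this.identifier = identifier;
        }
    }

    /** Returns true if the remote event should be applied to the local entry. */
    static boolean shouldApply(RemoteEvent remote, long localTimestamp, byte localIdentifier) {
        if (remote.timestamp != localTimestamp)
            return remote.timestamp > localTimestamp; // the newest timestamp wins
        // Timestamps are equal: break the tie deterministically using the
        // node identifier, so that every node reaches the same decision.
        return remote.identifier < localIdentifier;   // smaller identifier is preferred
    }
}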
On a single server, if you have a number of Java processes, and within each Java process you create an instance of a Chronicle Map which binds to the same underlying file, they exchange data via shared memory, rather than by TCP or UDP replication.
If an instance of Chronicle Map, which is not performing TCP Replication, is updated, then this update can be picked up by another instance of Chronicle Map. This other Chronicle Map instance could be TCP replicated. In such an example, the TCP replicated Chronicle Map instance would then push the update to the remote nodes.
Likewise, if the TCP replicated Chronicle Map instance received an update from a remote node, then this update would be immediately available to all the instances of Chronicle Map on the server.
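As a minimal sketch of the shared-memory case, assuming the open-source Chronicle Map API (createPersistedTo), two instances bound to the same file see each other's updates without any network replication; the file path below is a placeholder:

import net.openhft.chronicle.map.ChronicleMap;
import java.io.File;
import java.io.IOException;

public class SharedMemoryExample {
    public static void main(String[] args) throws IOException {
        // Two instances bound to the same file; in practice these would
        // typically live in separate Java processes on the same server.
        File shared = new File("/tmp/shared-map.dat"); // placeholder path
        try (ChronicleMap<Integer, CharSequence> mapA = ChronicleMap
                     .of(Integer.class, CharSequence.class)
                     .averageValueSize(64)
                     .entries(10_000)
                     .createPersistedTo(shared);
             ChronicleMap<Integer, CharSequence> mapB = ChronicleMap
                     .of(Integer.class, CharSequence.class)
                     .averageValueSize(64)
                     .entries(10_000)
                     .createPersistedTo(shared)) {
            mapA.put(1, "hello");
            // The update is exchanged via shared memory, not the network.
            System.out.println(mapB.get(1)); // prints "hello"
        }
    }
}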
If you are only replicating your Chronicle Map instances on the same server, then you do not have to set up TCP and UDP replication. You also do not have to set the identifiers, as identifiers are only used for the resolution of conflicts amongst remote servers.
If, however, you wish to replicate data between two or more servers, then all of the Chronicle Map instances, including those not actively participating in TCP or UDP replication, must have their identifiers set.
The identifier must be unique to each server, and each Chronicle Map instance on the same server must have the same identifier. The reason that all instances must have the identifier set is that memory is laid out slightly differently when replication is in use; even if a map is not actively performing TCP or UDP replication itself, it must have its memory laid out in the same way to be compatible with one that is.
If the identifiers are not set up uniquely, then updates will be ignored. For example, a Chronicle Map instance that is set up with an identifier of '1' will ignore all events which contain the remote identifier '1'. In other words, Chronicle Map replication ignores updates which originated from itself; this avoids the circularity of events.
When setting up the identifier, you can use values from 1 to 127.
The identifier is set up on the builder as follows:
TcpTransportAndNetworkConfig tcpConfig = ...

ChronicleMap<Integer, CharSequence> map = ChronicleMapBuilder
        .of(Integer.class, CharSequence.class)
        .replication(identifier, tcpConfig) // identifier: a unique byte from 1 to 127
        .create();
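For reference, a TcpTransportAndNetworkConfig is typically built from the local server port and the socket addresses of the remote nodes, along the lines of the sketch below; the host name and port numbers are placeholders:

import net.openhft.chronicle.hash.replication.TcpTransportAndNetworkConfig;
import java.net.InetSocketAddress;

// This node listens on port 8076 and connects out to one remote node.
// The host name and ports are placeholder values for illustration.
TcpTransportAndNetworkConfig tcpConfig = TcpTransportAndNetworkConfig
        .of(8076, new InetSocketAddress("remote.host", 8077));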
Configuration of map nodes is done either by creating the configuration programmatically, or through YAML configuration files.
The following example uses a basic YAML configuration file to define clustered replication for the map named fx:
!MapReplicationCfg {
  cluster: {
    host1: {
      hostId: 1,
      connectUri: hostport1,
    },
    host2: {
      hostId: 2,
      connectUri: hostport2,
    },
    host3: {
      hostId: 3,
      connectUri: hostport3,
    }
  },
  maps: {
    fx: {
      entries: 10000,
      keyClass: !type String,
      valueClass: !type software.chronicle.enterprise.map.ValueObject,
      averageKeySize: 64,
      averageValueSize: 128,
      mapFileDataDirectory: data/$hostId/,
      mapLogDirectory: logs/$hostId/,
      enableReplicationLogging: true
    },
  }
}
Below is an example that uses this configuration file to start up a cluster and insert entries into different maps, verifying that all maps are eventually in sync:
try (ReplicatedMap clusterOnHost1 = createCluster(CLUSTER_YAML, 1);
     ReplicatedMap clusterOnHost3 = createCluster(CLUSTER_YAML, 3);
     ReplicatedMap clusterOnHost2 = createCluster(CLUSTER_YAML, 2)) {

    final ChronicleMap<String, ValueObject> mapOnHost1 = clusterOnHost1.getReplicatedMap("fx");
    final ChronicleMap<String, ValueObject> mapOnHost2 = clusterOnHost2.getReplicatedMap("fx");
    final ChronicleMap<String, ValueObject> mapOnHost3 = clusterOnHost3.getReplicatedMap("fx");

    // Each host inserts a different entry into its local copy of "fx".
    mapOnHost1.put("USD/GBP", new ValueObject("BATS", System.currentTimeMillis(), 0.767957));
    mapOnHost2.put("GBP/USD", new ValueObject("BATS", System.currentTimeMillis(), 1.30216));
    mapOnHost3.put("EUR/USD", new ValueObject("LXN", System.currentTimeMillis(), 1.16337));

    // Allow time for the updates to replicate between the hosts.
    Jvm.pause(500L);

    // Each map should now contain all three entries.
    printMap("one", mapOnHost1);
    printMap("two", mapOnHost2);
    printMap("three", mapOnHost3);
}
This example is available in the repository.
Chronicle Map Enterprise can be configured to log all replication events to a Chronicle Queue for auditing purposes.
Currently, a map can be configured to log all outgoing events that it sends to remote peers.
The example below shows the message flow for a map with a single remote peer receiving replication events:
.. header omitted
== encoded replication update sent to remote peer
targetHostId: 2
replicatedEntry: !!binary AYChq5LqwKXqFAFOEwAAAAAAAAD/////////fw==
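Because the replication log is written as a Chronicle Queue, it can be read back with the standard Chronicle Queue API. Below is a rough sketch, assuming the log is stored under the mapLogDirectory configured above and that each logged event carries the fields shown in the dump; the path and field handling are assumptions, not a documented contract:

import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.wire.DocumentContext;

public class ReadReplicationLog {
    public static void main(String[] args) {
        // The path is an assumption based on mapLogDirectory: logs/$hostId/
        // in the YAML above; check your configuration for the real location.
        try (ChronicleQueue queue = ChronicleQueue.singleBuilder("logs/1").build()) {
            ExcerptTailer tailer = queue.createTailer();
            while (true) {
                try (DocumentContext dc = tailer.readingDocument()) {
                    if (!dc.isPresent())
                        break; // no more logged replication events
                    // Each event carries the fields shown in the dump above.
                    int targetHostId = dc.wire().read("targetHostId").int32();
                    System.out.println("replication update sent to host " + targetHostId);
                }
            }
        }
    }
}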