diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index a61bc2d7..6e77254a 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -10,12 +10,20 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - - name: Build documentation - run: cargo doc --all --no-deps + - name: Install latest nightly + uses: actions-rs/toolchain@v1 + with: + toolchain: nightly + override: true + - name: Generate documentation + uses: actions-rs/cargo@v1 + with: + command: doc + args: --all --no-deps - name: Deploy to GitHub Pages uses: peaceiris/actions-gh-pages@v3 with: github_token: ${{ secrets.GITHUB_TOKEN }} publish_branch: gh-pages - publish_dir: ./target/doc - force_orphan: true \ No newline at end of file + publish_dir: target/doc + force_orphan: true diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 25b91b53..8d29f56a 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -10,21 +10,39 @@ jobs: name: Build Linux runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 - - name: Build - run: cargo build --release --verbose - - uses: actions/upload-artifact@v1 - with: - name: linux - path: target/release/spartan + - uses: actions/checkout@v2 + - name: Install latest nightly + uses: actions-rs/toolchain@v1 + with: + toolchain: nightly + override: true + - name: Build + uses: actions-rs/cargo@v1 + with: + command: build + args: --release + - name: Upload binary + uses: actions/upload-artifact@v1 + with: + name: linux + path: target/release/spartan win: - name: Build Win + name: Build Windows runs-on: windows-latest steps: - - uses: actions/checkout@v2 - - name: Build - run: cargo build --release --verbose - - uses: actions/upload-artifact@v1 - with: - name: win - path: target/release/spartan.exe \ No newline at end of file + - uses: actions/checkout@v2 + - name: Install latest nightly + uses: actions-rs/toolchain@v1 + with: + toolchain: nightly + override: true + - name: Build + uses: actions-rs/cargo@v1 + with: + command: build + args: --release + - name: Upload binary + uses: actions/upload-artifact@v1 + with: + name: windows + path: target/release/spartan.exe diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 0c2543a4..4de9273c 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -12,12 +12,26 @@ jobs: linux: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 - - name: Run tests - run: cargo test --verbose + - uses: actions/checkout@v2 + - name: Install latest nightly + uses: actions-rs/toolchain@v1 + with: + toolchain: nightly + override: true + - name: Run tests + uses: actions-rs/cargo@v1 + with: + command: test win: runs-on: windows-latest steps: - - uses: actions/checkout@v2 - - name: Run tests - run: cargo test --verbose \ No newline at end of file + - uses: actions/checkout@v2 + - name: Install latest nightly + uses: actions-rs/toolchain@v1 + with: + toolchain: nightly + override: true + - name: Run tests + uses: actions-rs/cargo@v1 + with: + command: test diff --git a/Cargo.lock b/Cargo.lock index e743d5c0..7141f625 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -261,18 +261,18 @@ dependencies = [ [[package]] name = "addr2line" -version = "0.12.2" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "602d785912f476e480434627e8732e6766b760c045bbf897d9dfaa9f4fbd399c" +checksum = "1b6a2d3371669ab3ca9797670853d61402b03d0b4b9ebf33d677dfa720203072" dependencies = [ "gimli", ] [[package]] -name = "adler32" -version = "1.1.0" +name = "adler" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "567b077b825e468cc974f0020d4082ee6e03132512f207ef1a02fd5d00d1f32d" +checksum = "ee2a4ec343196209d6594e19543ae87a39f96d5534d7174822a3ad825dd6ed7e" [[package]] name = "aho-corasick" @@ -289,7 +289,7 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" dependencies = [ - "winapi 0.3.8", + "winapi 0.3.9", ] [[package]] @@ -323,7 +323,7 @@ checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" dependencies = [ "hermit-abi", "libc", - "winapi 0.3.8", + "winapi 0.3.9", ] [[package]] @@ -357,9 +357,9 @@ dependencies = [ [[package]] name = "backtrace" -version = "0.3.49" +version = "0.3.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05100821de9e028f12ae3d189176b41ee198341eb8f369956407fea2f5cc666c" +checksum = "46254cf2fdcdf1badb5934448c1bcbe046a56537b3987d96c51a7afc5d03f293" dependencies = [ "addr2line", "cfg-if", @@ -437,9 +437,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.54" +version = "1.0.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7bbb73db36c1246e9034e307d0fba23f9a2e251faa47ade70c1bd252220c8311" +checksum = "f9a06fb2e53271d7c279ec1efea6ab691c35a2ae67ec0d91d7acec0caf13b518" [[package]] name = "cfg-if" @@ -449,9 +449,9 @@ checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" [[package]] name = "chrono" -version = "0.4.11" +version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80094f509cf8b5ae86a4966a39b3ff66cd7e2a3e594accec3743ff3fabeab5b2" +checksum = "c74d84029116787153e02106bf53e66828452a4b325cc8652b788b5967c0a0b6" dependencies = [ "num-integer", "num-traits", @@ -511,9 +511,9 @@ dependencies = [ [[package]] name = "derive_more" -version = "0.99.8" +version = "0.99.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc655351f820d774679da6cdc23355a93de496867d8203496675162e17b1d671" +checksum = "298998b1cf6b5b2c8a7b023dfd45821825ce3ba8a8af55c921a0e734e4653f76" dependencies = [ "proc-macro2", "quote", @@ -590,9 +590,9 @@ dependencies = [ [[package]] name = "flate2" -version = "1.0.14" +version = "1.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2cfff41391129e0a856d6d822600b8d71179d46879e310417eb9c762eb178b42" +checksum = "68c90b0fc46cf89d227cc78b40e494ff81287a92dd07631e5af0d06fe3cf885e" dependencies = [ "cfg-if", "crc32fast", @@ -736,7 +736,7 @@ dependencies = [ "libc", "log", "rustc_version", - "winapi 0.3.8", + "winapi 0.3.9", ] [[package]] @@ -752,9 +752,9 @@ dependencies = [ [[package]] name = "gimli" -version = "0.21.0" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcc8e0c9bce37868955864dbecd2b1ab2bdf967e6f28066d65aaac620444b65c" +checksum = "aaf91faf136cb47367fa430cd46e37a788775e7fa104f8b4bcb3861dc389b724" [[package]] name = "h2" @@ -786,9 +786,9 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.1.14" +version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9586eedd4ce6b3c498bc3b4dd92fc9f11166aa908a914071953768066c67909" +checksum = "3deed196b6e7f9e44a2ae8d94225d80302d81208b1bb673fd21fe634645c85a9" dependencies = [ "libc", ] @@ -801,7 +801,7 @@ checksum = "3c731c3e10504cc8ed35cfe2f1db4c9274c3d35fa486e3b31df46f068ef3e867" dependencies = [ "libc", "match_cfg", - "winapi 0.3.8", + "winapi 0.3.9", ] [[package]] @@ -867,10 +867,19 @@ checksum = "f7e2f18aece9709094573a9f24f483c4f65caa4298e2f7ae1b71cc65d853fad7" dependencies = [ "socket2", "widestring", - "winapi 0.3.8", + "winapi 0.3.9", "winreg", ] +[[package]] +name = "itertools" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "284f18f85651fe11e8a991b2adb42cb078325c996ed026d994719efcfca1d54b" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "0.4.6" @@ -901,9 +910,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.71" +version = "0.2.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9457b06509d27052635f90d6466700c65095fdf75409b3fbdd903e988b886f49" +checksum = "a9f8082297d534141b30c8d39e9b1773713ab50fdbe4ff30f750d063b3bfd701" [[package]] name = "linked-hash-map" @@ -961,6 +970,15 @@ version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08" +[[package]] +name = "maybe-owned" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4facc753ae494aeb6e3c22f839b158aebd4f9270f55cd3c79906c45476c47ab4" +dependencies = [ + "serde", +] + [[package]] name = "memchr" version = "2.3.3" @@ -975,11 +993,11 @@ checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" [[package]] name = "miniz_oxide" -version = "0.3.7" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "791daaae1ed6889560f8c4359194f56648355540573244a5448a83ba1ecc7435" +checksum = "be0f75932c1f6cfae3c04000e40114adf955636e19040f9c0a2c380702aa1c7f" dependencies = [ - "adler32", + "adler", ] [[package]] @@ -1032,7 +1050,7 @@ checksum = "2ba7c918ac76704fb42afcbbb43891e72731f3dcca3bef2a19786297baf14af7" dependencies = [ "cfg-if", "libc", - "winapi 0.3.8", + "winapi 0.3.9", ] [[package]] @@ -1097,7 +1115,7 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "winapi 0.3.8", + "winapi 0.3.9", ] [[package]] @@ -1156,9 +1174,9 @@ dependencies = [ [[package]] name = "proc-macro-error" -version = "1.0.2" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98e9e4b82e0ef281812565ea4751049f1bdcdfccda7d3f459f2e138a40c08678" +checksum = "fc175e9777c3116627248584e8f8b3e2987405cabe1c0adf7d1dd28f09dc7880" dependencies = [ "proc-macro-error-attr", "proc-macro2", @@ -1169,9 +1187,9 @@ dependencies = [ [[package]] name = "proc-macro-error-attr" -version = "1.0.2" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f5444ead4e9935abd7f27dc51f7e852a0569ac888096d5ec2499470794e2e53" +checksum = "3cc9795ca17eb581285ec44936da7fc2335a3f34f2ddd13118b6f4d515435c50" dependencies = [ "proc-macro2", "quote", @@ -1259,9 +1277,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.1.56" +version = "0.1.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2439c63f3f6139d1b57529d16bc3b8bb855230c8efcc5d3a896c8bea7c3b1e84" +checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" [[package]] name = "regex" @@ -1287,7 +1305,7 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" dependencies = [ - "winapi 0.3.8", + "winapi 0.3.9", ] [[package]] @@ -1370,9 +1388,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.55" +version = "1.0.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec2c5d7e739bc07a3e73381a39d61fdb5f671c60c1df26a130690665803d8226" +checksum = "3433e879a558dde8b5e8feb2a04899cf34fdde1fafb894687e52105fc1162ac3" dependencies = [ "itoa", "ryu", @@ -1415,9 +1433,9 @@ checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" [[package]] name = "smallvec" -version = "1.4.0" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7cb5678e1615754284ec264d9bb5b4c27d2018577fd90ac0ceb578591ed5ee4" +checksum = "3757cb9d89161a2f24e1cf78efa0c1fcff485d18e3f55e0aa3480824ddaa0f3f" [[package]] name = "socket2" @@ -1428,7 +1446,7 @@ dependencies = [ "cfg-if", "libc", "redox_syscall", - "winapi 0.3.8", + "winapi 0.3.9", ] [[package]] @@ -1442,7 +1460,9 @@ dependencies = [ "bincode", "derive-new", "futures-util", + "itertools", "log", + "maybe-owned", "once_cell", "pretty_env_logger", "serde", @@ -1451,6 +1471,7 @@ dependencies = [ "tempfile", "thiserror", "tokio", + "tokio-util 0.3.1", "toml", "uuid", ] @@ -1540,7 +1561,7 @@ dependencies = [ "rand", "redox_syscall", "remove_dir_all", - "winapi 0.3.8", + "winapi 0.3.9", ] [[package]] @@ -1606,7 +1627,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438" dependencies = [ "libc", - "winapi 0.3.8", + "winapi 0.3.9", ] [[package]] @@ -1634,7 +1655,7 @@ dependencies = [ "signal-hook-registry", "slab", "tokio-macros", - "winapi 0.3.8", + "winapi 0.3.9", ] [[package]] @@ -1750,9 +1771,9 @@ checksum = "e83e153d1053cbb5a118eeff7fd5be06ed99153f00dbcd8ae310c5fb2b22edc0" [[package]] name = "unicode-width" -version = "0.1.7" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "caaa9d531767d1ff2150b9332433f32a24622147e5ebb1f26409d5da67afd479" +checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3" [[package]] name = "unicode-xid" @@ -1813,9 +1834,9 @@ checksum = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" [[package]] name = "winapi" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8093091eeb260906a183e6ae1abdba2ef5ef2257a21801128899c3fc699229c6" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" dependencies = [ "winapi-i686-pc-windows-gnu", "winapi-x86_64-pc-windows-gnu", @@ -1839,7 +1860,7 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" dependencies = [ - "winapi 0.3.8", + "winapi 0.3.9", ] [[package]] @@ -1854,7 +1875,7 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b2986deb581c4fe11b621998a5e53361efe6b48a151178d0cd9eeffa4dc6acc9" dependencies = [ - "winapi 0.3.8", + "winapi 0.3.9", ] [[package]] diff --git a/README.md b/README.md index 93ae98af..2579f686 100644 --- a/README.md +++ b/README.md @@ -63,6 +63,7 @@ cargo build --release * `persistence_timer` - Amount of seconds between each database write to disk (default: `900`). * `gc_timer` - Amount of seconds between each GC job wake (GC cycle times vary, default: `300`). * `access_keys` - Table of queue access keys. Anonymous access to queues will not be permitted if this key has any value. +* `replication` - Replication configuration for both primary and replica nodes. #### `access_keys` Spartan has authentication and authorization mechanism using access keys. @@ -90,3 +91,31 @@ Example of valid HTTP Authorization header: ``` Authorization: Bearer IHaveAccessToAllQueues ``` + +#### `replication` +Spartan also has support for queue replication. + +Replication process will be restarted in case of any minor error (protocol or queue config mismatch). + +If there is any problem with TCP socket, then connection will be dropped and re-opened with each replica. + +##### Primary + +The following config will start primary node that communicates with one replica every 180 seconds (default value): +```toml +replication = { Primary = { destination = ["127.0.0.1:12345"] } } +``` + +You may also use `replication_timer` key to change amount of seconds between each replication: +```toml +replication = { Primary = { destination = ["127.0.0.1:12345"], replication_timer = 30 } } +``` + +##### Replica + +Change your replication config to following example: +```toml +replication = { Replica = { host = "127.0.0.1:12345" } } +``` + +Then, start replica node with `startan replica` command. diff --git a/spartan/Cargo.toml b/spartan/Cargo.toml index 6196787f..b0435db8 100644 --- a/spartan/Cargo.toml +++ b/spartan/Cargo.toml @@ -18,7 +18,7 @@ version = "1.0" [dependencies.tokio] version = "0.2" -features = ["macros", "rt-threaded", "fs"] +features = ["macros", "rt-threaded", "fs", "tcp"] [dependencies.structopt] version = "0.3" @@ -57,5 +57,15 @@ version = "0.3" [dependencies.once_cell] version = "1.4" +[dependencies.maybe-owned] +version = "0.3" +features = ["serde"] + +[dependencies.tokio-util] +version = "0.3" + +[dependencies.itertools] +version = "0.9" + [dev-dependencies.tempfile] version = "3.1" \ No newline at end of file diff --git a/spartan/src/cli/commands/mod.rs b/spartan/src/cli/commands/mod.rs index ca091e63..2f988350 100644 --- a/spartan/src/cli/commands/mod.rs +++ b/spartan/src/cli/commands/mod.rs @@ -3,3 +3,6 @@ pub mod start; /// `init` command pub mod init; + +/// `replica` command +pub mod replica; diff --git a/spartan/src/cli/commands/replica.rs b/spartan/src/cli/commands/replica.rs new file mode 100644 index 00000000..112604d4 --- /dev/null +++ b/spartan/src/cli/commands/replica.rs @@ -0,0 +1,76 @@ +use crate::{ + cli::Server, + config::replication::Replication, + jobs::{ + exit::spawn_ctrlc_handler, + persistence::{load_from_fs, spawn_persistence}, + }, + node::{ + replication::{ + replica::{ + accept_connection, + error::{ReplicaError, ReplicaResult}, + ReplicaSocket, + }, + storage::{replica::ReplicaStorage, ReplicationStorage}, + }, + Manager, + }, +}; +use std::sync::Arc; +use structopt::StructOpt; +use tokio::{net::TcpListener, spawn}; + +#[derive(StructOpt)] +pub struct ReplicaCommand {} + +impl ReplicaCommand { + pub async fn dispatch(&self, server: &'static Server) -> ReplicaResult<()> { + let config = server.config().expect("Config not loaded"); + let mut manager = Manager::new(config); + + load_from_fs(&mut manager) + .await + .map_err(ReplicaError::PersistenceError)?; + + let manager = Arc::new(manager); + + let cloned_manager = manager.clone(); + spawn(async move { spawn_ctrlc_handler(&cloned_manager).await }); + + let cloned_manager = manager.clone(); + spawn(async move { spawn_persistence(&cloned_manager).await }); + + manager + .node + .prepare_replication( + |storage| matches!(storage, ReplicationStorage::Replica(_)), + || ReplicationStorage::Replica(ReplicaStorage::default()), + ) + .await; + + let config = match config + .replication + .as_ref() + .ok_or_else(|| ReplicaError::ReplicaConfigNotFound)? + { + Replication::Replica(config) => Ok(config), + _ => Err(ReplicaError::ReplicaConfigNotFound), + }?; + + let mut socket = TcpListener::bind(config.host) + .await + .map_err(ReplicaError::SocketError)?; + + loop { + match socket.accept().await { + Ok((socket, _)) => { + ReplicaSocket::new(&manager, config, socket) + .exchange(accept_connection) + .await + } + Err(e) => error!("Unable to accept TCP connection: {}", e), + } + } + } +} diff --git a/spartan/src/cli/commands/start.rs b/spartan/src/cli/commands/start.rs index c992c52e..fc9383b5 100644 --- a/spartan/src/cli/commands/start.rs +++ b/spartan/src/cli/commands/start.rs @@ -1,10 +1,13 @@ use crate::{ cli::Server, http::server::{start_http_server, ServerError}, - node::{ - gc::spawn_gc, load_from_fs, persistence::PersistenceError, spawn_ctrlc_handler, - spawn_persistence, Manager, + jobs::{ + exit::spawn_ctrlc_handler, + gc::spawn_gc, + persistence::{load_from_fs, spawn_persistence, PersistenceError}, + replication::spawn_replication, }, + node::Manager, }; use actix_rt::System; use actix_web::web::Data; @@ -64,17 +67,14 @@ impl StartCommand { let manager = Data::new(manager); - debug!("Spawning GC handler."); - let cloned_manager = manager.clone(); spawn(async move { spawn_gc(&cloned_manager).await }); - debug!("Spawning persistence job."); - let cloned_manager = manager.clone(); spawn(async move { spawn_persistence(&cloned_manager).await }); - debug!("Spawning Ctrl-C handler"); + let cloned_manager = manager.clone(); + spawn(async move { spawn_replication(&cloned_manager).await }); let cloned_manager = manager.clone(); spawn(async move { spawn_ctrlc_handler(&cloned_manager).await }); diff --git a/spartan/src/cli/mod.rs b/spartan/src/cli/mod.rs index 89125c5e..4304a933 100644 --- a/spartan/src/cli/mod.rs +++ b/spartan/src/cli/mod.rs @@ -2,7 +2,7 @@ mod commands; use crate::config::Config; -use commands::{init::InitCommand, start::StartCommand}; +use commands::{init::InitCommand, replica::ReplicaCommand, start::StartCommand}; use std::{ io::Error, path::{Path, PathBuf}, @@ -11,14 +11,18 @@ use structopt::StructOpt; use tokio::fs::read; use toml::from_slice; +/// Enum of all available CLI commands #[derive(StructOpt)] pub enum Command { #[structopt(about = "Start Spartan MQ server")] Start(StartCommand), #[structopt(about = "Initialize configuration file")] Init(InitCommand), + #[structopt(about = "Start replication server")] + Replica(ReplicaCommand), } +/// Server with config and selected command #[derive(StructOpt)] pub struct Server { /// Server configuration path diff --git a/spartan/src/config/mod.rs b/spartan/src/config/mod.rs index 1d86072f..eb90e7ae 100644 --- a/spartan/src/config/mod.rs +++ b/spartan/src/config/mod.rs @@ -1,6 +1,11 @@ +/// Queue access key pub mod key; +/// Replication config +pub mod replication; + use key::Key; +use replication::Replication; use serde::{Deserialize, Serialize}; use std::{ collections::HashSet, @@ -45,6 +50,9 @@ pub struct Config { /// Queue access keys pub access_keys: Option>, + + /// Replication config + pub replication: Option, } #[cfg(not(test))] @@ -57,6 +65,7 @@ impl Default for Config { queues: Box::new([]), encryption_key: None, access_keys: None, + replication: None, } } } @@ -71,6 +80,7 @@ impl Default for Config { queues: Box::new([String::from("test").into_boxed_str()]), encryption_key: None, access_keys: None, + replication: None, } } } diff --git a/spartan/src/config/replication.rs b/spartan/src/config/replication.rs new file mode 100644 index 00000000..81e6d4e9 --- /dev/null +++ b/spartan/src/config/replication.rs @@ -0,0 +1,42 @@ +use serde::{Deserialize, Serialize}; +use std::net::SocketAddr; + +/// Default amount of seconds between replication jobs +const fn default_replication_timer() -> u64 { + 180 +} + +/// Default amount of seconds between replication job restart tries +const fn default_primary_try_timer() -> u64 { + 10 +} + +/// Default amount of seconds between replica command restart tries +const fn default_replica_try_timer() -> u64 { + 5 +} + +#[derive(Serialize, Deserialize)] +pub struct Primary { + pub destination: Box<[SocketAddr]>, + + #[serde(default = "default_replication_timer")] + pub replication_timer: u64, + + #[serde(default = "default_primary_try_timer")] + pub try_timer: u64, +} + +#[derive(Serialize, Deserialize)] +pub struct Replica { + pub host: SocketAddr, + + #[serde(default = "default_replica_try_timer")] + pub try_timer: u64, +} + +#[derive(Serialize, Deserialize)] +pub enum Replication { + Primary(Primary), + Replica(Replica), +} diff --git a/spartan/src/node/exit.rs b/spartan/src/jobs/exit.rs similarity index 75% rename from spartan/src/node/exit.rs rename to spartan/src/jobs/exit.rs index 246e197a..de962689 100644 --- a/spartan/src/node/exit.rs +++ b/spartan/src/jobs/exit.rs @@ -1,4 +1,5 @@ -use super::{persist_manager, Manager}; +use super::persistence::persist_manager; +use crate::node::Manager; use actix_rt::signal::ctrl_c; use std::process::exit; @@ -6,6 +7,8 @@ use std::process::exit; /// /// Listens to Ctrl-C signal, and after receiving one starts persisting database. pub async fn spawn_ctrlc_handler(manager: &Manager<'_>) { + debug!("Spawning Ctrl-C handler"); + ctrl_c().await.expect("Unable to listen to Ctrl-C signal."); persist_manager(manager).await; diff --git a/spartan/src/node/gc.rs b/spartan/src/jobs/gc.rs similarity index 94% rename from spartan/src/node/gc.rs rename to spartan/src/jobs/gc.rs index ca2e460d..f3e65879 100644 --- a/spartan/src/node/gc.rs +++ b/spartan/src/jobs/gc.rs @@ -1,4 +1,4 @@ -use super::Manager; +use crate::node::Manager; use actix_rt::time::delay_for; use futures_util::stream::{iter, StreamExt}; use spartan_lib::core::dispatcher::SimpleDispatcher; @@ -6,7 +6,7 @@ use std::time::Duration; /// Concurrently iterates over all databases in node, and executes GC on them. async fn execute_gc(manager: &Manager<'_>) { - iter(manager.node.db.iter()) + iter(manager.node.iter()) .for_each_concurrent(None, |(name, db)| async move { let mut db = db.lock().await; @@ -23,6 +23,8 @@ async fn execute_gc(manager: &Manager<'_>) { /// /// Periodically iterates over all databases in node, and executes GC on them. pub async fn spawn_gc(manager: &Manager<'_>) { + debug!("Spawning GC handler."); + let timer = Duration::from_secs(manager.config.gc_timer); loop { diff --git a/spartan/src/jobs/mod.rs b/spartan/src/jobs/mod.rs new file mode 100644 index 00000000..be4f2124 --- /dev/null +++ b/spartan/src/jobs/mod.rs @@ -0,0 +1,11 @@ +/// Ctrl-C handler +pub mod exit; + +/// GC handler +pub mod gc; + +/// Persistence handler +pub mod persistence; + +/// Replication job +pub mod replication; diff --git a/spartan/src/node/persistence.rs b/spartan/src/jobs/persistence.rs similarity index 91% rename from spartan/src/node/persistence.rs rename to spartan/src/jobs/persistence.rs index 91172c7b..ba146b3c 100644 --- a/spartan/src/node/persistence.rs +++ b/spartan/src/jobs/persistence.rs @@ -1,9 +1,7 @@ -use super::Manager; +use crate::node::{Manager, MutexDB}; use actix_rt::time::delay_for; use bincode::{deserialize, serialize, Error as BincodeError}; -use futures_util::lock::Mutex; use futures_util::stream::{iter, StreamExt}; -use spartan_lib::core::{db::tree::TreeDatabase, message::Message}; use std::{io::Error, path::Path, time::Duration}; use thiserror::Error as ThisError; use tokio::fs::{read, write}; @@ -22,11 +20,7 @@ pub enum PersistenceError { type PersistenceResult = Result; /// Persist database to provided path -async fn persist_db( - name: &str, - db: &Mutex>, - path: &Path, -) -> Result<(), PersistenceError> { +async fn persist_db(name: &str, db: &MutexDB, path: &Path) -> Result<(), PersistenceError> { let db = db.lock().await; info!("Saving database \"{}\"", name); @@ -45,7 +39,7 @@ async fn persist_db( /// Persist all databases from manager pub async fn persist_manager(manager: &Manager<'_>) { - iter(manager.node.db.iter()) + iter(manager.node.iter()) .for_each_concurrent(None, |(name, db)| async move { match persist_db(name, db, &manager.config.path).await { Err(PersistenceError::SerializationError(e)) => { @@ -62,6 +56,8 @@ pub async fn persist_manager(manager: &Manager<'_>) { /// Persistence job handler, that persists all databases from manager pub async fn spawn_persistence(manager: &Manager<'_>) { + debug!("Spawning persistence job."); + let timer = Duration::from_secs(manager.config.persistence_timer); loop { @@ -77,7 +73,7 @@ pub async fn load_from_fs(manager: &mut Manager<'_>) -> PersistenceResult<()> { match read(manager.config.path.join(&**queue)).await { Ok(file_buf) => { let db = deserialize(&file_buf).map_err(PersistenceError::InvalidFileFormat)?; - manager.node.db.insert(queue, Mutex::new(db)); + manager.node.add_db(queue, db); } Err(e) => warn!("Unable to load database {}: {}", queue, e), } diff --git a/spartan/src/jobs/replication.rs b/spartan/src/jobs/replication.rs new file mode 100644 index 00000000..30023a4d --- /dev/null +++ b/spartan/src/jobs/replication.rs @@ -0,0 +1,90 @@ +use crate::{ + config::replication::{Primary, Replication}, + node::{ + replication::{ + primary::{ + error::{PrimaryError, PrimaryResult}, + stream::StreamPool, + }, + storage::{primary::PrimaryStorage, ReplicationStorage}, + }, + Manager, + }, +}; +use actix_rt::time::delay_for; +use std::time::Duration; +use tokio::io::Result as IoResult; + +async fn replicate_manager(manager: &Manager<'_>, pool: &mut StreamPool) -> PrimaryResult<()> { + debug!("Pinging stream pool."); + pool.ping().await?; + + debug!("Asking stream pool for indexes."); + let mut batch = pool.ask().await?; + + debug!("Starting event slice sync."); + batch.sync(manager).await?; + + debug!("Setting GC threshold."); + batch.set_gc(manager).await; + + Ok(()) +} + +async fn start_replication(manager: &Manager<'_>, pool: &mut StreamPool, config: &Primary) { + let timer = Duration::from_secs(config.replication_timer); + + loop { + delay_for(timer).await; + + info!("Starting database replication."); + + match replicate_manager(manager, pool).await { + Ok(_) => info!("Database replicated successfully!"), + Err(PrimaryError::EmptySocket) => { + error!("Empty TCP socket"); + return; + } + Err(PrimaryError::SocketError(e)) => { + error!("TCP socket error: {}", e); + return; + } + Err(e) => error!("Error happened during replication attempt: {}", e), + } + } +} + +pub async fn spawn_replication(manager: &Manager<'_>) -> IoResult<()> { + debug!("Spawning replication job."); + + if let Some(config) = manager.config.replication.as_ref() { + match config { + Replication::Primary(config) => { + manager + .node + .prepare_replication( + |storage| matches!(storage, ReplicationStorage::Primary(_)), + || ReplicationStorage::Primary(PrimaryStorage::default()), + ) + .await; + + let timer = Duration::from_secs(config.try_timer); + + loop { + delay_for(timer).await; + + match StreamPool::from_config(config).await { + Ok(mut pool) => start_replication(manager, &mut pool, config).await, + Err(e) => error!("Unable to open connection pool: {}", e), + } + } + } + Replication::Replica(_) => { + warn!("Primary node started with replica configuration!"); + warn!("Event log will be disabled for this session."); + } + }; + } + + Ok(()) +} diff --git a/spartan/src/main.rs b/spartan/src/main.rs index f4dc3a95..55f062ed 100644 --- a/spartan/src/main.rs +++ b/spartan/src/main.rs @@ -1,3 +1,5 @@ +#![feature(btree_drain_filter)] + #[macro_use] extern crate derive_new; @@ -22,6 +24,9 @@ mod routing; /// Configuration mod config; +/// Background jobs +mod jobs; + /// Utilities for easier development pub mod utils; @@ -46,6 +51,7 @@ async fn main() -> Result<(), Error> { match server.command() { Start(command) => command.dispatch(server).await?, Init(command) => command.dispatch(server).await?, + Replica(command) => command.dispatch(server).await?, }; Ok(()) diff --git a/spartan/src/node/mod.rs b/spartan/src/node/mod.rs index 82c0e33e..5c81c614 100644 --- a/spartan/src/node/mod.rs +++ b/spartan/src/node/mod.rs @@ -1,26 +1,22 @@ -/// Ctrl-C handler -pub mod exit; - -/// GC handler -pub mod gc; - /// Node manager pub mod manager; -/// Persistence handler -pub mod persistence; +/// Database replication +pub mod replication; -pub use exit::spawn_ctrlc_handler; pub use manager::Manager; -pub use persistence::{load_from_fs, persist_manager, spawn_persistence}; use crate::config::Config; use futures_util::lock::{Mutex, MutexGuard}; +use replication::{database::ReplicatedDatabase, storage::ReplicationStorage}; use spartan_lib::core::{db::tree::TreeDatabase, message::Message}; use std::collections::{hash_map::RandomState, HashMap}; -pub type DB = TreeDatabase; -type MutexDB = Mutex; +/// Conjucted type of replicated database based on tree database +pub type DB = ReplicatedDatabase>; + +/// Mutexed database +pub type MutexDB = Mutex; /// Key-value node implementation #[derive(Default)] @@ -36,19 +32,43 @@ impl<'a> Node<'a> { } /// Get locked queue instance - pub async fn get(&self, name: &str) -> Option>> { + pub async fn get(&self, name: &str) -> Option> { debug!("Obtaining queue \"{}\"", name); Some(self.queue(name)?.lock().await) } /// Add queue entry to node pub fn add(&mut self, name: &'a str) { + self.add_db(name, DB::default()) + } + + pub fn add_db(&mut self, name: &'a str, db: DB) { info!("Initializing queue \"{}\"", name); - self.db.insert(name, Mutex::new(TreeDatabase::default())); + self.db.insert(name, Mutex::new(db)); + } + + pub fn iter(&'a self) -> impl Iterator { + self.db.iter() } /// Load queues from config pub fn load_from_config(&mut self, config: &'a Config) { config.queues.iter().for_each(|queue| self.add(queue)); } + + pub async fn prepare_replication(&self, filter: F, replace: R) + where + F: Fn(&&ReplicationStorage) -> bool + Copy, + R: Fn() -> ReplicationStorage, + { + for (_, db) in self.iter() { + let mut db = db.lock().await; + + let storage = db.get_storage().as_ref().filter(filter); + + if storage.is_none() { + db.get_storage().replace(replace()); + } + } + } } diff --git a/spartan/src/node/replication/database.rs b/spartan/src/node/replication/database.rs new file mode 100644 index 00000000..3591d732 --- /dev/null +++ b/spartan/src/node/replication/database.rs @@ -0,0 +1,143 @@ +use super::storage::ReplicationStorage; +use crate::node::replication::event::Event; +use serde::{Deserialize, Serialize}; +use spartan_lib::core::{ + dispatcher::{ + simple::{Delete, PositionBasedDelete}, + SimpleDispatcher, StatusAwareDispatcher, + }, + message::Message, + payload::Identifiable, +}; +use std::ops::{Deref, DerefMut}; + +#[derive(Serialize, Deserialize)] +pub struct ReplicatedDatabase { + /// Proxied database + inner: DB, + + /// Replication storage + /// None if replication is not enabled + storage: Option, +} + +impl Default for ReplicatedDatabase +where + DB: Default, +{ + fn default() -> Self { + ReplicatedDatabase { + inner: DB::default(), + storage: None, + } + } +} + +impl Deref for ReplicatedDatabase { + type Target = DB; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl DerefMut for ReplicatedDatabase { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl ReplicatedDatabase { + fn call_storage(&mut self, f: F) + where + F: FnOnce(&mut ReplicationStorage), + { + self.storage.as_mut().map(f); + } + + fn push_event(&mut self, event: F) + where + F: FnOnce() -> Event, + { + self.call_storage(|storage| storage.map_primary(|storage| storage.push(event()))); + } + + fn gc(&mut self) { + self.call_storage(|storage| storage.map_primary(|storage| storage.gc())); + } + + pub fn get_storage(&mut self) -> &mut Option { + &mut self.storage + } +} + +impl SimpleDispatcher for ReplicatedDatabase +where + DB: SimpleDispatcher, +{ + fn push(&mut self, message: Message) { + self.push_event(|| Event::Push(message.clone())); + + self.inner.push(message) + } + + fn peek(&self) -> Option<&Message> { + self.inner.peek() + } + + fn gc(&mut self) { + self.push_event(|| Event::Gc); + + ReplicatedDatabase::gc(self); + self.inner.gc() + } + + fn size(&self) -> usize { + self.inner.size() + } + + fn clear(&mut self) { + self.push_event(|| Event::Clear); + + self.inner.clear() + } +} + +impl StatusAwareDispatcher for ReplicatedDatabase +where + DB: StatusAwareDispatcher, +{ + fn pop(&mut self) -> Option<&Message> { + self.push_event(|| Event::Pop); + + self.inner.pop() + } + + fn requeue(&mut self, id: ::Id) -> Option<()> { + self.push_event(|| Event::Requeue(id)); + + self.inner.requeue(id) + } +} + +impl Delete for ReplicatedDatabase +where + DB: Delete, +{ + fn delete(&mut self, id: ::Id) -> Option { + self.push_event(|| Event::Delete(id)); + + Delete::delete(&mut self.inner, id) + } +} + +impl PositionBasedDelete for ReplicatedDatabase +where + DB: PositionBasedDelete, +{ + fn delete(&mut self, id: ::Id) -> Option { + self.push_event(|| Event::Delete(id)); + + PositionBasedDelete::delete(&mut self.inner, id) + } +} diff --git a/spartan/src/node/replication/event.rs b/spartan/src/node/replication/event.rs new file mode 100644 index 00000000..501f5588 --- /dev/null +++ b/spartan/src/node/replication/event.rs @@ -0,0 +1,76 @@ +use crate::node::DB; +use maybe_owned::MaybeOwned; +use serde::{Deserialize, Serialize}; +use spartan_lib::core::{ + dispatcher::{simple::PositionBasedDelete, SimpleDispatcher, StatusAwareDispatcher}, + message::Message, + payload::Identifiable, +}; + +/// Database event +/// Only events that mutate database are present here +#[derive(Serialize, Deserialize)] +pub enum Event { + Push(Message), + Pop, + Requeue(::Id), + Delete(::Id), + Gc, + Clear, +} + +pub trait ApplyEvent { + /// Apply single event to database + fn apply_event(&mut self, event: Event); + + /// Apply slice of events to database + fn apply_events(&mut self, events: Box<[(MaybeOwned<'_, u64>, MaybeOwned<'_, Event>)]>); +} + +impl ApplyEvent for DB { + fn apply_event(&mut self, event: Event) { + let queue = &mut **self; + + match event { + Event::Push(message) => queue.push(message), + Event::Pop => { + queue.pop(); + } + Event::Requeue(id) => { + queue.requeue(id); + } + Event::Delete(id) => { + queue.delete(id); + } + Event::Gc => { + queue.gc(); + } + Event::Clear => { + queue.clear(); + } + } + } + + fn apply_events(&mut self, events: Box<[(MaybeOwned<'_, u64>, MaybeOwned<'_, Event>)]>) { + let index = events.last().map(|(index, _)| **index); + + // into_vec allows to use owned event + events + .into_vec() + .into_iter() + .for_each(|(_, event)| match event { + MaybeOwned::Owned(event) => self.apply_event(event), + MaybeOwned::Borrowed(_) => unreachable!(), + }); + + index.and_then(|index| { + self.get_storage() + .as_mut() + .expect("No storage provided") + .get_replica() + .confirm(index); + + Some(()) + }); + } +} diff --git a/spartan/src/node/replication/message.rs b/spartan/src/node/replication/message.rs new file mode 100644 index 00000000..d3e31946 --- /dev/null +++ b/spartan/src/node/replication/message.rs @@ -0,0 +1,24 @@ +//! See replication module documentation for messages description + +use crate::node::replication::event::Event; +use maybe_owned::MaybeOwned; +use serde::{Deserialize, Serialize}; +use std::borrow::Cow; + +#[derive(Serialize, Deserialize)] +pub enum PrimaryRequest<'a> { + Ping, + AskIndex, + SendRange( + Cow<'a, str>, + Box<[(MaybeOwned<'a, u64>, MaybeOwned<'a, Event>)]>, + ), +} + +#[derive(Serialize, Deserialize)] +pub enum ReplicaRequest<'a> { + Pong, + RecvIndex(Box<[(Box, u64)]>), + RecvRange, + QueueNotFound(Cow<'a, str>), +} diff --git a/spartan/src/node/replication/mod.rs b/spartan/src/node/replication/mod.rs new file mode 100644 index 00000000..0813ef31 --- /dev/null +++ b/spartan/src/node/replication/mod.rs @@ -0,0 +1,57 @@ +//! # General schema +//! +//! ``` +//! +-----------+ +---------------+ +//! | | | | +//! | Ping +----><----+ Pong | +//! | | | | +//! | AskIndex +----><----+ RecvIndex | +//! | | | | +//! | SendRange +----><--+-+ RecvRange | +//! | | ^ | | +//! +-----------+ +-+ QueueNotFound | +//! | | +//! +---------------+ +//! ``` +//! +//! This schema describes order, in which messages are sent between primary and replica nodes. +//! +//! Primary is on the left and always sends messages first, while replica node is on the right, and only responds to them. +//! +//! # Messages +//! +//! ## Ping and Pong +//! +//! These messages are used to check, if all replicas are still online. +//! +//! While just the TCP ping can be used, it's better to check if replica has the same replication protocol as primary. +//! +//! ## AskIndex and RecvIndex +//! +//! After we check replica health, we need to ask about last received index of each queue. +//! +//! Replica sends back RecvIndex message, that contains data about available queues and their indexes. +//! +//! ## SendRange and RecvRange (and probably QueueNotFound) +//! +//! With `queue = index` array of each replica available, primary node takes new events from event log of each queue, and sends it to replica. +//! +//! Replica responds with either RecvRange in case if everything is OK, or with QueueNotFound in case if primary node sent queue, that doesn't exist. + +/// Replication storage (event log) +pub mod storage; + +/// Database event +pub mod event; + +/// Replication proxy database +pub mod database; + +/// Replication TCP messages +pub mod message; + +/// Primary node +pub mod primary; + +/// Replica node +pub mod replica; diff --git a/spartan/src/node/replication/primary/error.rs b/spartan/src/node/replication/primary/error.rs new file mode 100644 index 00000000..29e5a48f --- /dev/null +++ b/spartan/src/node/replication/primary/error.rs @@ -0,0 +1,19 @@ +use bincode::ErrorKind; +use thiserror::Error; +use tokio::io::Error as IoError; + +#[derive(Error, Debug)] +pub enum PrimaryError { + #[error("Unable to serialize stream message: {0}")] + SerializationError(Box), + #[error("TCP connection error: {0}")] + SocketError(IoError), + #[error("TCP socket is empty")] + EmptySocket, + #[error("Protocol mismatch")] + ProtocolMismatch, + #[error("Queue configuration mismatch")] + QueueConfigMismatch, +} + +pub type PrimaryResult = Result; diff --git a/spartan/src/node/replication/primary/index.rs b/spartan/src/node/replication/primary/index.rs new file mode 100644 index 00000000..bcb2cdaf --- /dev/null +++ b/spartan/src/node/replication/primary/index.rs @@ -0,0 +1,105 @@ +use super::{ + error::{PrimaryError, PrimaryResult}, + stream::Stream, +}; +use crate::node::Manager; +use futures_util::{stream::iter, StreamExt, TryStreamExt}; +use itertools::Itertools; +use std::{ + collections::hash_map::DefaultHasher, + hash::{Hash, Hasher}, +}; + +pub struct RecvIndex<'a> { + stream: &'a mut Stream, + indexes: Box<[(Box, u64)]>, +} + +impl<'a> RecvIndex<'a> { + pub fn new(stream: &'a mut Stream, indexes: Box<[(Box, u64)]>) -> Self { + RecvIndex { stream, indexes } + } + + pub async fn sync(&mut self, manager: &Manager<'_>) -> PrimaryResult<()> { + for (name, start) in self.indexes.iter() { + self.stream + .send_range( + name, + manager + .queue(name) + .await + .map_err(|_| PrimaryError::QueueConfigMismatch)? + .get_storage() + .as_mut() + .expect("Replication storage is uninitialized") + .get_primary() + .slice(*start), + ) + .await?; + } + + Ok(()) + } +} + +pub struct BatchAskIndex<'a> { + batch: Vec>, +} + +impl<'a> BatchAskIndex<'a> { + pub fn with_capacity(capacity: usize) -> Self { + BatchAskIndex { + batch: Vec::with_capacity(capacity), + } + } + + pub fn push(&mut self, index: RecvIndex<'a>) { + self.batch.push(index); + } + + pub async fn sync(&mut self, manager: &Manager<'_>) -> PrimaryResult<()> { + iter(self.batch.iter_mut()) + .map(Ok) + .try_for_each_concurrent(None, |host| async move { host.sync(manager).await }) + .await?; + + Ok(()) + } + + /// Set GC threshold of each queue to minimal index of all replica's + /// + /// Example: + /// + /// ```no_run + /// First replica: [("TestQueue", 2), ("NextQueue", 3)] + /// Second replica: [("TestQueue", 1), ("AnotherQueue", 4)] + /// + /// Result: [("AnotherQueue", 4), ("NextQueue", 3), ("TestQueue", 1)] + /// ``` + pub async fn set_gc(&self, manager: &Manager<'_>) { + let iter = self + .batch + .iter() + .map(|index| index.indexes.iter()) + .flatten() + .sorted_by(|a, b| Ord::cmp(a, b)) + .unique_by(|(name, _)| { + let mut hasher = DefaultHasher::new(); + name.hash(&mut hasher); + hasher.finish() + }); + + for (queue, index) in iter { + let mut m = manager + .queue(&queue) + .await + .expect("set_gc called without sync before"); + + m.get_storage() + .as_mut() + .unwrap() + .get_primary() + .set_gc_threshold(*index); + } + } +} diff --git a/spartan/src/node/replication/primary/mod.rs b/spartan/src/node/replication/primary/mod.rs new file mode 100644 index 00000000..b579ea77 --- /dev/null +++ b/spartan/src/node/replication/primary/mod.rs @@ -0,0 +1,8 @@ +/// Primary error +pub mod error; + +/// Queue index exchange +pub mod index; + +/// Network stream +pub mod stream; diff --git a/spartan/src/node/replication/primary/stream.rs b/spartan/src/node/replication/primary/stream.rs new file mode 100644 index 00000000..b691388b --- /dev/null +++ b/spartan/src/node/replication/primary/stream.rs @@ -0,0 +1,115 @@ +use super::{ + error::{PrimaryError, PrimaryResult}, + index::{BatchAskIndex, RecvIndex}, +}; +use crate::{ + config::replication::Primary, + node::replication::{ + event::Event, + message::{PrimaryRequest, ReplicaRequest}, + }, +}; +use bincode::{deserialize, serialize}; +use futures_util::{stream::iter, SinkExt, StreamExt, TryStreamExt}; +use maybe_owned::MaybeOwned; +use std::borrow::Cow; +use tokio::net::TcpStream; +use tokio_util::codec::{BytesCodec, Decoder, Framed}; + +pub struct Stream(Framed); + +pub struct StreamPool(Box<[Stream]>); + +impl<'a> Stream { + fn serialize(message: &PrimaryRequest) -> PrimaryResult> { + serialize(&message).map_err(PrimaryError::SerializationError) + } + + pub async fn exchange( + &mut self, + message: &PrimaryRequest<'_>, + ) -> PrimaryResult> { + self.0 + .send(Self::serialize(message)?.into()) + .await + .map_err(PrimaryError::SocketError)?; + + self.0.flush().await.map_err(PrimaryError::SocketError)?; + + let buf = match self.0.next().await { + Some(r) => r.map_err(PrimaryError::SocketError)?, + None => return Err(PrimaryError::EmptySocket), + }; + + Ok(deserialize(&buf).map_err(PrimaryError::SerializationError)?) + } + + async fn ping(&mut self) -> PrimaryResult<()> { + match self.exchange(&PrimaryRequest::Ping).await? { + ReplicaRequest::Pong => Ok(()), + _ => Err(PrimaryError::ProtocolMismatch), + } + } + + async fn ask(&'a mut self) -> PrimaryResult> { + match self.exchange(&PrimaryRequest::AskIndex).await? { + ReplicaRequest::RecvIndex(recv) => Ok(RecvIndex::new(self, recv)), + _ => Err(PrimaryError::ProtocolMismatch), + } + } + + pub(super) async fn send_range( + &mut self, + queue: &str, + range: Box<[(MaybeOwned<'a, u64>, MaybeOwned<'a, Event>)]>, + ) -> PrimaryResult<()> { + match self + .exchange(&PrimaryRequest::SendRange(Cow::Borrowed(queue), range)) + .await? + { + ReplicaRequest::RecvRange => Ok(()), + ReplicaRequest::QueueNotFound(queue) => { + warn!("Queue {} not found on replica", queue); + Ok(()) + } + _ => Err(PrimaryError::ProtocolMismatch), + } + } +} + +impl<'a> StreamPool { + pub async fn from_config(config: &Primary) -> PrimaryResult { + let mut pool = Vec::with_capacity(config.destination.len()); + + for host in &*config.destination { + pool.push(Stream( + BytesCodec::new().framed( + TcpStream::connect(host) + .await + .map_err(PrimaryError::SocketError)?, + ), + )); + } + + Ok(StreamPool(pool.into_boxed_slice())) + } + + pub async fn ping(&mut self) -> PrimaryResult<()> { + iter(self.0.iter_mut()) + .map(Ok) + .try_for_each_concurrent(None, |stream| async move { stream.ping().await }) + .await?; + + Ok(()) + } + + pub async fn ask(&'a mut self) -> PrimaryResult> { + let mut batch = BatchAskIndex::with_capacity(self.0.len()); + + for host in &mut *self.0 { + batch.push(host.ask().await?); + } + + Ok(batch) + } +} diff --git a/spartan/src/node/replication/replica/error.rs b/spartan/src/node/replication/replica/error.rs new file mode 100644 index 00000000..573dac0c --- /dev/null +++ b/spartan/src/node/replication/replica/error.rs @@ -0,0 +1,20 @@ +use crate::jobs::persistence::PersistenceError; +use bincode::ErrorKind; +use thiserror::Error; +use tokio::io::Error as IoError; + +#[derive(Error, Debug)] +pub enum ReplicaError { + #[error("Manager persistence error: {0}")] + PersistenceError(PersistenceError), + #[error("Unable to find replica node config")] + ReplicaConfigNotFound, + #[error("TCP socket error: {0}")] + SocketError(IoError), + #[error("Empty TCP socket")] + EmptySocket, + #[error("Packet serialization error: {0}")] + SerializationError(Box), +} + +pub type ReplicaResult = Result; diff --git a/spartan/src/node/replication/replica/mod.rs b/spartan/src/node/replication/replica/mod.rs new file mode 100644 index 00000000..c877c491 --- /dev/null +++ b/spartan/src/node/replication/replica/mod.rs @@ -0,0 +1,126 @@ +/// Replica error +pub mod error; + +use crate::{ + config::replication::Replica, + node::{ + replication::{ + event::ApplyEvent, + message::{PrimaryRequest, ReplicaRequest}, + }, + Manager, + }, +}; +use actix_rt::time::delay_for; +use bincode::{deserialize, serialize}; +use error::{ReplicaError, ReplicaResult}; +use futures_util::{SinkExt, StreamExt}; +use std::{future::Future, time::Duration}; +use tokio::net::TcpStream; +use tokio_util::codec::{BytesCodec, Decoder, Framed}; + +pub struct ReplicaSocket<'a> { + manager: &'a Manager<'a>, + config: &'a Replica, + socket: Framed, +} + +impl<'a> ReplicaSocket<'a> { + pub fn new(manager: &'a Manager<'a>, config: &'a Replica, socket: TcpStream) -> Self { + ReplicaSocket { + manager, + config, + socket: BytesCodec::new().framed(socket), + } + } + + pub async fn exchange(&mut self, f: F) + where + F: Fn(PrimaryRequest<'static>, &'a Manager<'a>) -> Fut + Copy, + Fut: Future>, + { + let timer = Duration::from_secs(self.config.try_timer); + + loop { + delay_for(timer).await; + + match self.process(f).await { + Err(ReplicaError::EmptySocket) => { + error!("Empty TCP socket"); + return; + } + Err(e) => error!("Error occured during replication process: {}", e), + _ => (), + } + } + } + + async fn process(&mut self, f: F) -> ReplicaResult<()> + where + F: Fn(PrimaryRequest<'static>, &'a Manager<'a>) -> Fut, + Fut: Future>, + { + let buf = match self.socket.next().await { + Some(r) => r.map_err(ReplicaError::SocketError)?, + None => return Err(ReplicaError::EmptySocket), + }; + + let request = f( + deserialize(&buf).map_err(ReplicaError::SerializationError)?, + self.manager, + ) + .await; + + self.socket + .send( + serialize(&request) + .map_err(ReplicaError::SerializationError)? + .into(), + ) + .await + .map_err(ReplicaError::SocketError)?; + + self.socket + .flush() + .await + .map_err(ReplicaError::SocketError)?; + + Ok(()) + } +} + +pub async fn accept_connection<'a>( + request: PrimaryRequest<'static>, + manager: &Manager<'a>, +) -> ReplicaRequest<'a> { + match request { + PrimaryRequest::Ping => ReplicaRequest::Pong, + PrimaryRequest::AskIndex => { + debug!("Preparing indexes for primary node."); + let mut indexes = Vec::with_capacity(manager.config.queues.len()); + + for (name, db) in manager.node.iter() { + let index = db + .lock() + .await + .get_storage() + .as_mut() + .expect("No database present") + .get_replica() + .get_index(); + + indexes.push((name.to_string().into_boxed_str(), index)); + } + + ReplicaRequest::RecvIndex(indexes.into_boxed_slice()) + } + PrimaryRequest::SendRange(queue, range) => match manager.queue(&queue).await { + Ok(mut db) => { + debug!("Applying event slice."); + db.apply_events(range); + ReplicaRequest::RecvRange + } + Err(_) => ReplicaRequest::QueueNotFound(queue), + }, + } +} diff --git a/spartan/src/node/replication/storage/mod.rs b/spartan/src/node/replication/storage/mod.rs new file mode 100644 index 00000000..bb96f07e --- /dev/null +++ b/spartan/src/node/replication/storage/mod.rs @@ -0,0 +1,41 @@ +/// Storage for replication primary host +pub mod primary; + +/// Storage for replica's +pub mod replica; + +use primary::PrimaryStorage; +use replica::ReplicaStorage; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize)] +pub enum ReplicationStorage { + Primary(PrimaryStorage), + Replica(ReplicaStorage), +} + +impl ReplicationStorage { + pub fn get_primary(&mut self) -> &mut PrimaryStorage { + match self { + ReplicationStorage::Primary(storage) => storage, + _ => panic!("Replication storage is in replica mode."), + } + } + + pub fn get_replica(&mut self) -> &mut ReplicaStorage { + match self { + ReplicationStorage::Replica(storage) => storage, + _ => panic!("Replication storage is in primary mode."), + } + } + + pub fn map_primary(&mut self, f: F) + where + F: FnOnce(&mut PrimaryStorage), + { + match self { + ReplicationStorage::Primary(storage) => f(storage), + ReplicationStorage::Replica(_) => (), + } + } +} diff --git a/spartan/src/node/replication/storage/primary.rs b/spartan/src/node/replication/storage/primary.rs new file mode 100644 index 00000000..fc6f5d64 --- /dev/null +++ b/spartan/src/node/replication/storage/primary.rs @@ -0,0 +1,67 @@ +use crate::node::replication::event::Event; +use maybe_owned::MaybeOwned; +use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; + +#[derive(Serialize, Deserialize)] +pub struct PrimaryStorage { + next_index: u64, + gc_threshold: u64, + log: BTreeMap, +} + +impl Default for PrimaryStorage { + fn default() -> Self { + PrimaryStorage { + next_index: 1, + gc_threshold: 0, + log: BTreeMap::new(), + } + } +} + +impl PrimaryStorage { + pub fn push(&mut self, event: Event) { + self.log.insert(self.next_index, event); + self.next_index += 1; + } + + pub fn gc(&mut self) { + let gc_threshold = self.gc_threshold; + + self.log + .drain_filter(|index, _| *index <= gc_threshold) + .for_each(drop); + } + + pub fn slice(&self, start: u64) -> Box<[(MaybeOwned<'_, u64>, MaybeOwned<'_, Event>)]> { + self.log + .range(start..) + .map(|(k, v)| (MaybeOwned::Borrowed(k), MaybeOwned::Borrowed(v))) + .collect() + } + + pub fn set_gc_threshold(&mut self, threshold: u64) { + self.gc_threshold = threshold; + } +} + +#[cfg(test)] +mod tests { + use super::PrimaryStorage; + use crate::node::replication::event::Event; + + #[test] + fn test_gc() { + let mut storage = PrimaryStorage::default(); + + for _ in 0..6 { + storage.push(Event::Pop); + } + + storage.gc_threshold = 4; + storage.gc(); + + assert_eq!(storage.log.iter().map(|(k, _)| k).next(), Some(&5)); + } +} diff --git a/spartan/src/node/replication/storage/replica.rs b/spartan/src/node/replication/storage/replica.rs new file mode 100644 index 00000000..6098603f --- /dev/null +++ b/spartan/src/node/replication/storage/replica.rs @@ -0,0 +1,22 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize)] +pub struct ReplicaStorage { + confirmed_index: u64, +} + +impl Default for ReplicaStorage { + fn default() -> Self { + ReplicaStorage { confirmed_index: 0 } + } +} + +impl ReplicaStorage { + pub fn get_index(&self) -> u64 { + self.confirmed_index + 1 + } + + pub fn confirm(&mut self, index: u64) { + self.confirmed_index = index; + } +}