diff --git a/Cargo.lock b/Cargo.lock index 436138462..ea6b0636a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,7 +23,7 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "cipher", "cpufeatures", ] @@ -45,7 +45,7 @@ version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "const-random", "getrandom", "once_cell", @@ -946,7 +946,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a" dependencies = [ "addr2line", - "cfg-if", + "cfg-if 1.0.0", "libc", "miniz_oxide", "object", @@ -1114,7 +1114,7 @@ dependencies = [ "arrayref", "arrayvec", "cc", - "cfg-if", + "cfg-if 1.0.0", "constant_time_eq 0.3.1", ] @@ -1335,6 +1335,12 @@ dependencies = [ "nom", ] +[[package]] +name = "cfg-if" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" + [[package]] name = "cfg-if" version = "1.0.0" @@ -1740,7 +1746,7 @@ version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96e58d342ad113c2b878f16d5d034c03be492ae460cdbc02b7f0f2284d310c7d" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", ] [[package]] @@ -1773,7 +1779,7 @@ version = "1.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a97769d94ddab943e4510d138150169a2758b5ef3eb191a9ee688de3e23ef7b3" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", ] [[package]] @@ -2025,7 +2031,7 @@ version = "6.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "crossbeam-utils", "hashbrown 0.14.5", "lock_api", @@ -2074,7 +2080,7 @@ dependencies = [ "parking_lot", "paste", "pin-project-lite", - "rand", + "rand 0.8.5", "sqlparser", "tempfile", "tokio", @@ -2146,7 +2152,7 @@ dependencies = [ "log", "object_store", "parking_lot", - "rand", + "rand 0.8.5", "tempfile", "url", ] @@ -2204,7 +2210,7 @@ dependencies = [ "itertools 0.13.0", "log", "md-5", - "rand", + "rand 0.8.5", "regex", "sha2", "unicode-segmentation", @@ -2243,7 +2249,7 @@ dependencies = [ "datafusion-common", "datafusion-expr-common", "datafusion-physical-expr-common", - "rand", + "rand 0.8.5", ] [[package]] @@ -2321,7 +2327,7 @@ dependencies = [ "datafusion-common", "datafusion-expr-common", "hashbrown 0.14.5", - "rand", + "rand 0.8.5", ] [[package]] @@ -2369,7 +2375,7 @@ dependencies = [ "once_cell", "parking_lot", "pin-project-lite", - "rand", + "rand 0.8.5", "tokio", ] @@ -2756,6 +2762,28 @@ dependencies = [ "regex-syntax 0.8.5", ] +[[package]] +name = "fasthash" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "032213946b4eaae09117ec63f020322b78ca7a31d8aa2cf64df3032e1579690f" +dependencies = [ + "cfg-if 0.1.10", + "fasthash-sys", + "num-traits", + "seahash 3.0.7", + "xoroshiro128", +] + +[[package]] +name = "fasthash-sys" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6de941abfe2e715cdd34009d90546f850597eb69ca628ddfbf616e53dda28f8" +dependencies = [ + "gcc", +] + [[package]] name = "fastrand" version = "2.3.0" @@ -2782,7 +2810,7 @@ version = "0.2.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35c0522e981e68cbfa8c3f978441a5f34b30b96e146b33cd3359176b50fe8586" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "libc", "libredox", "windows-sys 0.59.0", @@ -2905,6 +2933,12 @@ dependencies = [ "libc", ] +[[package]] +name = "fuchsia-cprng" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" + [[package]] name = "funty" version = "2.0.0" @@ -3006,6 +3040,12 @@ dependencies = [ "slab", ] +[[package]] +name = "gcc" +version = "0.3.55" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f5f3913fa0bfe7ee1fd8248b6b9f42a5af4b9d65ec2dd2c3c26132b950ecfc2" + [[package]] name = "generic-array" version = "0.14.7" @@ -3022,7 +3062,7 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "js-sys", "libc", "wasi", @@ -3108,7 +3148,7 @@ version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6dd08c532ae367adf81c312a4580bc67f1d0fe8bc9c460520283f4c0ff277888" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "crunchy", "num-traits", ] @@ -3200,7 +3240,7 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f9c7c7c8ac16c798734b8a24560c1362120597c40d5e1459f09498f8f6c8f2ba" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "libc", "windows", ] @@ -3707,7 +3747,7 @@ version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "js-sys", "wasm-bindgen", "web-sys", @@ -4038,7 +4078,7 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "windows-targets 0.52.6", ] @@ -4168,7 +4208,7 @@ version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "digest", ] @@ -4339,7 +4379,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63a11d485edf0f3f04a508615d36c7d50d299cf61a7ee6d3e2530651e0a31771" dependencies = [ "cc", - "cfg-if", + "cfg-if 1.0.0", "pkg-config", ] @@ -4388,7 +4428,7 @@ version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39a6bfcc6c8c7eed5ee98b9c3e33adc726054389233e201c95dab2d41a3839d2" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "downcast", "fragile", "mockall_derive", @@ -4402,7 +4442,7 @@ version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25ca3004c2efe9011bd4e461bd8256445052b9615405b4f7ea43fc8ca5c20898" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "proc-macro2", "quote", "syn 2.0.90", @@ -4451,7 +4491,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "598beaf3cc6fdd9a5dfb1630c2800c7acd31df7aaf0f565796fba2b53ca1af1b" dependencies = [ "bitflags 1.3.2", - "cfg-if", + "cfg-if 1.0.0", "libc", ] @@ -4462,7 +4502,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" dependencies = [ "bitflags 2.6.0", - "cfg-if", + "cfg-if 1.0.0", "cfg_aliases", "libc", ] @@ -4705,7 +4745,7 @@ dependencies = [ "parking_lot", "percent-encoding", "quick-xml 0.36.2", - "rand", + "rand 0.8.5", "reqwest", "ring", "serde", @@ -4727,7 +4767,7 @@ dependencies = [ "async-trait", "base64 0.21.7", "bytes", - "cfg-if", + "cfg-if 1.0.0", "chrono", "either", "futures", @@ -4933,7 +4973,7 @@ dependencies = [ "glob", "opentelemetry", "percent-encoding", - "rand", + "rand 0.8.5", "serde_json", "thiserror 1.0.69", "tokio", @@ -4990,7 +5030,7 @@ version = "0.9.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "libc", "redox_syscall", "smallvec", @@ -5013,7 +5053,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7676374caaee8a325c9e7a2ae557f216c5563a171d6997b0ef8a65af35147700" dependencies = [ "base64ct", - "rand_core", + "rand_core 0.6.4", "subtle", ] @@ -5115,7 +5155,7 @@ dependencies = [ "lazy-regex", "md5", "postgres-types", - "rand", + "rand 0.8.5", "ring", "rust_decimal", "thiserror 1.0.69", @@ -5150,7 +5190,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48e4cc64c2ad9ebe670cb8fd69dd50ae301650392e81c05f9bfcb2d5bdbc24b0" dependencies = [ "phf_shared", - "rand", + "rand 0.8.5", ] [[package]] @@ -5247,7 +5287,7 @@ dependencies = [ "hmac", "md-5", "memchr", - "rand", + "rand 0.8.5", "sha2", "stringprep", ] @@ -5279,7 +5319,7 @@ checksum = "ebbe2f8898beba44815fdc9e5a4ae9c929e21c5dc29b0c774a15555f7f58d6d0" dependencies = [ "aligned-vec", "backtrace", - "cfg-if", + "cfg-if 1.0.0", "criterion", "findshlibs", "inferno", @@ -5626,7 +5666,7 @@ checksum = "a2fe5ef3495d7d2e377ff17b1a8ce2ee2ec2a18cde8b6ad6619d65d0701c135d" dependencies = [ "bytes", "getrandom", - "rand", + "rand 0.8.5", "ring", "rustc-hash 2.1.0", "rustls 0.23.19", @@ -5677,6 +5717,19 @@ dependencies = [ "nibble_vec", ] +[[package]] +name = "rand" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293" +dependencies = [ + "fuchsia-cprng", + "libc", + "rand_core 0.3.1", + "rdrand", + "winapi", +] + [[package]] name = "rand" version = "0.8.5" @@ -5685,7 +5738,7 @@ checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", "rand_chacha", - "rand_core", + "rand_core 0.6.4", ] [[package]] @@ -5695,9 +5748,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_core" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b" +dependencies = [ + "rand_core 0.4.2", ] +[[package]] +name = "rand_core" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc" + [[package]] name = "rand_core" version = "0.6.4" @@ -5766,6 +5834,15 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "rdrand" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2" +dependencies = [ + "rand_core 0.3.1", +] + [[package]] name = "redox_syscall" version = "0.5.7" @@ -5979,7 +6056,7 @@ dependencies = [ "prost", "prost-dto", "prost-types", - "rand", + "rand 0.8.5", "restate-admin-rest-model", "restate-bifrost", "restate-core", @@ -6049,7 +6126,7 @@ dependencies = [ "futures-util", "http 1.2.0", "pprof", - "rand", + "rand 0.8.5", "reqwest", "restate-core", "restate-node", @@ -6086,7 +6163,7 @@ dependencies = [ "pin-project", "pprof", "prost", - "rand", + "rand 0.8.5", "restate-core", "restate-log-server", "restate-metadata-store", @@ -6238,7 +6315,7 @@ dependencies = [ "pin-project-lite", "prost", "prost-types", - "rand", + "rand 0.8.5", "restate-core-derive", "restate-test-util", "restate-types", @@ -6458,7 +6535,7 @@ dependencies = [ "http 1.2.0", "itertools 0.13.0", "nix 0.29.0", - "rand", + "rand 0.8.5", "regex", "reqwest", "restate-metadata-store", @@ -6639,7 +6716,7 @@ dependencies = [ "prost", "prost-build", "prost-types", - "rand", + "rand 0.8.5", "restate-core", "restate-errors", "restate-rocksdb", @@ -7014,6 +7091,7 @@ dependencies = [ "dyn-clone", "enum-map", "enumset", + "fasthash", "figment", "flexbuffers", "googletest", @@ -7023,6 +7101,7 @@ dependencies = [ "humantime", "itertools 0.13.0", "jsonptr", + "md5", "moka", "notify", "notify-debouncer-mini", @@ -7035,7 +7114,7 @@ dependencies = [ "prost-build", "prost-dto", "prost-types", - "rand", + "rand 0.8.5", "regex", "regress 0.10.1", "restate-base64-util", @@ -7144,7 +7223,7 @@ dependencies = [ "parking_lot", "pin-project", "prost", - "rand", + "rand 0.8.5", "restate-bifrost", "restate-core", "restate-errors", @@ -7207,7 +7286,7 @@ dependencies = [ "itertools 0.13.0", "json-patch", "prost-types", - "rand", + "rand 0.8.5", "restate-admin", "restate-bifrost", "restate-cli-util", @@ -7262,7 +7341,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d" dependencies = [ "cc", - "cfg-if", + "cfg-if 1.0.0", "getrandom", "libc", "spin", @@ -7283,7 +7362,7 @@ dependencies = [ "ptr_meta", "rend", "rkyv_derive", - "seahash", + "seahash 4.1.0", "tinyvec", "uuid", ] @@ -7326,7 +7405,7 @@ version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "825ea780781b15345a146be27eaefb05085e337e869bff01b4306a4fd4a9ad5a" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "glob", "proc-macro-crate 3.2.0", "proc-macro2", @@ -7377,7 +7456,7 @@ dependencies = [ "bytes", "num-traits", "postgres-types", - "rand", + "rand 0.8.5", "rkyv", "serde", "serde_json", @@ -7595,6 +7674,12 @@ dependencies = [ "untrusted", ] +[[package]] +name = "seahash" +version = "3.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58f57ca1d128a43733fd71d583e837b1f22239a37ebea09cde11d8d9a9080f47" + [[package]] name = "seahash" version = "4.1.0" @@ -7788,7 +7873,7 @@ version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "cpufeatures", "digest", ] @@ -7799,7 +7884,7 @@ version = "0.10.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "cpufeatures", "digest", ] @@ -8178,7 +8263,7 @@ version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28cce251fcbc87fac86a866eeb0d6c2d536fc16d06f184bb61aeae11aa4cee0c" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "fastrand", "once_cell", "rustix", @@ -8299,7 +8384,7 @@ version = "1.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "once_cell", ] @@ -8646,7 +8731,7 @@ dependencies = [ "indexmap 1.9.3", "pin-project", "pin-project-lite", - "rand", + "rand 0.8.5", "slab", "tokio", "tokio-util", @@ -8876,7 +8961,7 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "static_assertions", ] @@ -8960,7 +9045,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04f903f293d11f31c0c29e4148f6dc0d033a7f80cebc0282bea147611667d289" dependencies = [ "getrandom", - "rand", + "rand 0.8.5", "web-time", ] @@ -9113,7 +9198,7 @@ checksum = "2990d9ea5967266ea0ccf413a4aa5c42a93dbcfda9cb49a97de6931726b12566" dependencies = [ "anyhow", "cargo_metadata", - "cfg-if", + "cfg-if 1.0.0", "regex", "rustversion", "time", @@ -9162,7 +9247,7 @@ version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a474f6281d1d70c17ae7aa6a613c87fce69a127e2624002df63dcb39d6cf6396" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "once_cell", "wasm-bindgen-macro", ] @@ -9187,7 +9272,7 @@ version = "0.4.49" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38176d9b44ea84e9184eff0bc34cc167ed044f816accfe5922e54d84cf48eca2" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "js-sys", "once_cell", "wasm-bindgen", @@ -9538,6 +9623,15 @@ version = "0.13.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" +[[package]] +name = "xoroshiro128" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0eeda34baec49c4f1eb2c04d59b761582fd6330010f9330ca696ca1a355dfcd" +dependencies = [ + "rand 0.4.6", +] + [[package]] name = "xtask" version = "1.1.6" @@ -9719,7 +9813,7 @@ dependencies = [ "lzma-rs", "memchr", "pbkdf2 0.12.2", - "rand", + "rand 0.8.5", "sha1", "thiserror 2.0.6", "time", diff --git a/Cargo.toml b/Cargo.toml index 12f59736b..980d3c143 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -112,6 +112,7 @@ downcast-rs = { version = "1.2.1" } enum-map = { version = "2.7.3" } enumset = { version = "1.1.3" } etcd-client = { version = "0.14" } +fasthash = { version = "0.4" } flexbuffers = { version = "2.0.0" } futures = "0.3.25" futures-sink = "0.3.25" @@ -135,6 +136,7 @@ hyper-rustls = { version = "0.27.2", default-features = false, features = [ hyper-util = { version = "0.1" } itertools = "0.13.0" jsonschema = "0.26.0" +md5 = { version = "0.7" } metrics = { version = "0.24" } metrics-tracing-context = { version = "0.17.0" } metrics-exporter-prometheus = { version = "0.16", default-features = false, features = [ diff --git a/crates/types/Cargo.toml b/crates/types/Cargo.toml index a200af27d..314a4bcc7 100644 --- a/crates/types/Cargo.toml +++ b/crates/types/Cargo.toml @@ -37,6 +37,7 @@ downcast-rs = { workspace = true } dyn-clone = { version = "1.0" } enum-map = { workspace = true } enumset = { workspace = true, features = ["serde"] } +fasthash = { workspace = true } figment = { version = "0.10.8", features = ["env", "toml"] } flexbuffers = { workspace = true } hostname = { workspace = true } @@ -44,6 +45,7 @@ http = { workspace = true } http-serde = { workspace = true } humantime = { workspace = true } itertools = { workspace = true } +md5 = { workspace = true} moka = { workspace = true, features = ["sync", "logging"] } notify = { version = "7.0.0" } notify-debouncer-mini = { version = "0.5.0" } diff --git a/crates/types/src/partition_table.rs b/crates/types/src/partition_table.rs index 699f998b5..8cc1e273d 100644 --- a/crates/types/src/partition_table.rs +++ b/crates/types/src/partition_table.rs @@ -8,19 +8,24 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashSet}; use std::fmt::Display; +use std::hash::{Hash, Hasher}; +use std::io::Write; use std::num::{NonZero, NonZeroU32}; use std::ops::RangeInclusive; use std::str::FromStr; use std::sync::LazyLock; use anyhow::Context; +use fasthash::{murmur, FastHasher}; use regex::Regex; +use serde_with::{serde_as, FromInto}; use crate::identifiers::{PartitionId, PartitionKey}; +use crate::nodes_config::{NodesConfiguration, Role}; use crate::protobuf::cluster_configuration::ReplicationStrategy as ProtoReplicationStrategy; -use crate::{flexbuffers_storage_encode_decode, Version, Versioned}; +use crate::{flexbuffers_storage_encode_decode, PlainNodeId, Version, Versioned}; static REPLICATION_STRATEGY_FACTOR_PATTERN: LazyLock = LazyLock::new(|| { Regex::new(r"^(?i)factor\(\s*(?\d+)\s*\)$").expect("is valid pattern") @@ -53,6 +58,7 @@ impl From for RangeInclusive { #[serde(try_from = "PartitionTableShadow", into = "PartitionTableShadow")] pub struct PartitionTable { version: Version, + nodes_version: Version, partitions: BTreeMap, // Interval-map like structure which maps the inclusive end partition key of a partition to its // [`PartitionId`]. To validate that a partition key falls into a partition one also needs to @@ -67,6 +73,7 @@ impl Default for PartitionTable { fn default() -> Self { Self { version: Version::INVALID, + nodes_version: Version::INVALID, partitions: BTreeMap::default(), partition_key_index: BTreeMap::default(), replication_strategy: ReplicationStrategy::default(), @@ -166,14 +173,34 @@ impl FindPartition for PartitionTable { } } +#[serde_as] +#[derive( + Debug, + Clone, + Default, + Eq, + PartialEq, + serde::Serialize, + serde::Deserialize, + derive_more::Deref, + derive_more::DerefMut, + derive_more::From, +)] +pub struct ReplicaGroup(#[serde_as(as = "HashSet>")] HashSet); + #[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)] pub struct Partition { pub key_range: RangeInclusive, + #[serde(default)] + pub replica_group: ReplicaGroup, } impl Partition { pub fn new(key_range: RangeInclusive) -> Self { - Self { key_range } + Self { + key_range, + replica_group: ReplicaGroup::default(), + } } } @@ -282,6 +309,63 @@ impl PartitionTableBuilder { } } + pub fn set_partitions_placements(&mut self, nodes_config: &NodesConfiguration) { + // sanity check: if the nodes config version is older than the partition table nodes_version, + if nodes_config.version() <= self.inner.nodes_version { + return; + } + + let workers: Vec = nodes_config + .iter() + .filter_map(|(node_id, node_config)| { + node_config.has_role(Role::Worker).then_some(node_id) + }) + .collect(); + + if workers.is_empty() { + return; + } + + // Note: + // The replication strategy does not influence how partitions are assigned to worker nodes. + // It's the worker that determines whether it should start processing a partition based on the replication strategy. + // The first active node in the worker set will act as the leader for the partition, while all other nodes + // will assume the role of followers according to the strategy. + + // Note: We use the murmur hash function to deterministically assign partitions to workers. + // the murmur(md5(start_key:node_id)) shows the most stable distribution of partitions across workers. + // and avoids the need to shuffle partitions when the number of workers changes. + + for (_, partition) in self.inner.partitions_mut() { + let mut nodes = vec![]; + for node in workers.iter() { + let mut md5_ctx = md5::Context::new(); + write!( + &mut md5_ctx, + "{}:{}", + partition.key_range.start(), + u32::from(*node) + ) + .unwrap(); + + let mut state = murmur::Hasher32::new(); + md5_ctx.compute().hash(&mut state); + let hash = state.finish(); + nodes.push((*node, hash)); + } + + // sort the nodes in a descending order based on the hash value + nodes.sort_by(|a, b| b.1.cmp(&a.1)); + + let replica_group = ReplicaGroup(nodes.into_iter().map(|n| n.0).collect()); + + if replica_group != partition.replica_group { + self.modified = true; + partition.replica_group = replica_group; + } + } + } + /// Builds the new [`PartitionTable`] with an incremented version. pub fn build(mut self) -> PartitionTable { self.inner.version = Version::MIN.max(self.inner.version.next()); @@ -295,6 +379,7 @@ impl PartitionTableBuilder { None } + /// Builds the new [`PartitionTable`] with the same version. fn build_with_same_version(self) -> PartitionTable { self.inner