From 12b10fb7d30a91b153c66a6a8aebf8a523d05e37 Mon Sep 17 00:00:00 2001
From: Pavel Tcholakov
Date: Tue, 31 Dec 2024 17:44:26 +0200
Subject: [PATCH] Add trim gap handling end-to-end test

---
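To run just this test locally, something along these lines should work,
assuming the server crate is named `restate-server` in this workspace (the
package name is not visible in this diff):

    cargo test -p restate-server --test trim_gap_handling -- --nocapture
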
 Cargo.lock                        |   3 +
 server/Cargo.toml                 |   3 +
 server/tests/trim_gap_handling.rs | 288 ++++++++++++++++++++++++++++++
 3 files changed, 294 insertions(+)
 create mode 100644 server/tests/trim_gap_handling.rs

diff --git a/Cargo.lock b/Cargo.lock
index f2e0ed8b2..3dafbc87e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6735,8 +6735,10 @@ dependencies = [
  "googletest",
  "humantime",
  "hyper-util",
+ "mock-service-endpoint",
  "pin-project",
  "regex",
+ "reqwest",
  "restate-admin",
  "restate-bifrost",
  "restate-core",
@@ -6753,6 +6755,7 @@ dependencies = [
  "rust-rocksdb",
  "schemars",
  "serde",
+ "serde_json",
  "serde_with",
  "tempfile",
  "test-log",
diff --git a/server/Cargo.toml b/server/Cargo.toml
index e8485a7c1..8244fedbc 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -72,6 +72,7 @@ restate-core = { workspace = true, features = ["test-util"] }
 restate-local-cluster-runner = { workspace = true }
 restate-test-util = { workspace = true }
 restate-types = { workspace = true, features = ["test-util"] }
+mock-service-endpoint = { workspace = true }
 
 anyhow = { workspace = true }
 async-trait = { workspace = true }
@@ -83,6 +84,8 @@ test-log = { workspace = true }
 tonic = { workspace = true, features = ["transport", "prost"] }
 tower = { workspace = true }
 tracing-subscriber = { workspace = true }
+reqwest = { workspace = true }
+serde_json = { workspace = true }
 url = { workspace = true }
 
 [target.'cfg(not(target_env = "msvc"))'.dependencies]
diff --git a/server/tests/trim_gap_handling.rs b/server/tests/trim_gap_handling.rs
new file mode 100644
index 000000000..ca9feeca1
--- /dev/null
+++ b/server/tests/trim_gap_handling.rs
@@ -0,0 +1,288 @@
+// Copyright (c) 2024 - 2025 Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use std::net::SocketAddr;
+use std::time::Duration;
+
+use enumset::enum_set;
+use futures_util::StreamExt;
+use googletest::fail;
+use hyper_util::rt::TokioIo;
+use tempfile::TempDir;
+use test_log::test;
+use tokio::io;
+use tokio::net::UnixStream;
+use tonic::codec::CompressionEncoding;
+use tonic::transport::{Channel, Endpoint, Uri};
+use tower::service_fn;
+use tracing::{error, info};
+use url::Url;
+
+use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_client::ClusterCtrlSvcClient;
+use restate_admin::cluster_controller::protobuf::{
+    ClusterStateRequest, CreatePartitionSnapshotRequest, TrimLogRequest,
+};
+use restate_local_cluster_runner::{
+    cluster::Cluster,
+    node::{BinarySource, Node},
+};
+use restate_types::config::{LogFormat, MetadataStoreClient};
+use restate_types::logs::metadata::ProviderKind::Replicated;
+use restate_types::net::AdvertisedAddress;
+use restate_types::protobuf::cluster::node_state::State;
+use restate_types::retries::RetryPolicy;
+use restate_types::{config::Configuration, nodes_config::Role};
+
+mod common;
+
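+// Scenario: bring up a three-node cluster (a metadata/admin node plus two
+// worker/log-server nodes) with a single partition, register a mock service
+// deployment and invoke it, snapshot partition 0, then trim the log past the
+// snapshot. A third worker joining without a snapshot repository must stop at
+// the trim gap; restarted with the repository configured, it should import
+// the snapshot, fast-forward over the gap, and serve invocations again.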
+#[test(restate_core::test)]
+async fn fast_forward_over_trim_gap() -> googletest::Result<()> {
+    let mut base_config = Configuration::default();
+    base_config.common.bootstrap_num_partitions = 1.try_into()?;
+    base_config.bifrost.default_provider = Replicated;
+    base_config.common.log_filter = "restate=debug,warn".to_owned();
+    base_config.common.log_format = LogFormat::Compact;
+
+    let snapshots_dir = TempDir::new()?;
+    base_config.worker.snapshots.destination = Some(
+        Url::from_file_path(snapshots_dir.path())
+            .unwrap()
+            .to_string(),
+    );
+
+    let nodes = Node::new_test_nodes_with_metadata(
+        base_config.clone(),
+        BinarySource::CargoTest,
+        enum_set!(Role::Worker | Role::LogServer),
+        2,
+    );
+    let admin_node = &nodes[0];
+
+    let worker_1 = &nodes[1];
+    let worker_2 = &nodes[2];
+
+    let mut worker_1_ready = worker_1.lines("PartitionProcessor starting event loop".parse()?);
+    let mut worker_2_ready = worker_2.lines("PartitionProcessor starting event loop".parse()?);
+
+    let mut cluster = Cluster::builder()
+        .temp_base_dir()
+        .nodes(nodes.clone())
+        .build()
+        .start()
+        .await?;
+
+    cluster.wait_healthy(Duration::from_secs(10)).await?;
+    tokio::time::timeout(Duration::from_secs(10), worker_1_ready.next()).await?;
+    tokio::time::timeout(Duration::from_secs(10), worker_2_ready.next()).await?;
+
+    let mut client =
+        ClusterCtrlSvcClient::new(grpc_connect(cluster.nodes[0].node_address().clone()).await?)
+            .accept_compressed(CompressionEncoding::Gzip);
+
+    any_partition_active(&mut client, Duration::from_secs(5)).await?;
+
+    let addr: SocketAddr = "127.0.0.1:9080".parse()?;
+    tokio::spawn(async move {
+        info!("Starting mock service on http://{}", addr);
+        if let Err(e) = mock_service_endpoint::listener::run_listener(addr).await {
+            error!("Error running listener: {:?}", e);
+        }
+    });
+
+    let http_client = reqwest::Client::new();
+    let registration_response = http_client
+        .post(format!(
+            "http://{}/deployments",
+            admin_node.config().admin.bind_address
+        ))
+        .header("content-type", "application/json")
+        .json(&serde_json::json!({ "uri": "http://localhost:9080" }))
+        .send()
+        .await?;
+    info!("Registration response: {:?}", registration_response);
+    assert!(registration_response.status().is_success());
+
+    let ingress_url = format!(
+        "http://{}/Counter/0/get",
+        worker_1.config().ingress.bind_address
+    );
+
+    // It takes a little while for the service to become available for invocations.
+    let mut retry = RetryPolicy::fixed_delay(Duration::from_millis(500), Some(10)).into_iter();
+    loop {
+        let invoke_response = http_client.post(ingress_url.clone()).send().await?;
+        info!("Invoke response: {:?}", invoke_response);
+        if invoke_response.status().is_success() {
+            break;
+        }
+        if let Some(delay) = retry.next() {
+            tokio::time::sleep(delay).await;
+        } else {
+            fail!("Failed to invoke worker")?;
+        }
+    }
+
+    let snapshot_response = client
+        .create_partition_snapshot(CreatePartitionSnapshotRequest { partition_id: 0 })
+        .await?
+        .into_inner();
+
+    // todo(pavel): we don't have a confirmed trimmed LSN, and it takes a bit of
+    // time for the log tail info to propagate, so we repeat the trim request a
+    // few times
+    client
+        .trim_log(TrimLogRequest {
+            log_id: 0,
+            trim_point: 3,
+        })
+        .await?;
+
+    tokio::time::sleep(Duration::from_secs(1)).await;
+    client
+        .trim_log(TrimLogRequest {
+            log_id: 0,
+            trim_point: 3,
+        })
+        .await?;
+
+    tokio::time::sleep(Duration::from_secs(1)).await;
+    client
+        .trim_log(TrimLogRequest {
+            log_id: 0,
+            trim_point: 3,
+        })
+        .await?;
+
+    // todo(pavel): if create snapshot returned an LSN, we could trim the log to
+    // that specific LSN instead of guessing.
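+    // A sketch of what that could look like, assuming a hypothetical
+    // `min_applied_lsn` field on the CreatePartitionSnapshot response (no such
+    // field exists as of this patch):
+    //
+    //     client
+    //         .trim_log(TrimLogRequest {
+    //             log_id: 0,
+    //             trim_point: snapshot_response.min_applied_lsn,
+    //         })
+    //         .await?;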
+
+    let mut no_snapshot_config = base_config.clone();
+    no_snapshot_config.worker.snapshots.destination = None;
+
+    let node_3_name = "node-3";
+    let mut node_3 = Node::new_test_node(
+        node_3_name,
+        no_snapshot_config,
+        BinarySource::CargoTest,
+        enum_set!(Role::HttpIngress | Role::Worker),
+    );
+    *node_3.metadata_store_client_mut() = MetadataStoreClient::Embedded {
+        address: cluster.nodes[0].node_address().clone(),
+    };
+
+    let mut trim_gap_encountered = node_3.lines(
+        "Partition processor stopped due to a log trim gap, and no snapshot repository is configured"
+            .parse()?,
+    );
+
+    cluster.push_node(node_3).await?;
+    assert!(
+        tokio::time::timeout(Duration::from_secs(20), trim_gap_encountered.next())
+            .await
+            .is_ok()
+    );
+
+    let worker_3 = &mut cluster.nodes[3];
+    assert_eq!(worker_3.config().node_name(), node_3_name);
+    worker_3.graceful_shutdown(Duration::from_secs(2)).await?;
+
+    // Restart node 3 with a snapshot repository configured
+    let mut worker_3 = Node::new_test_node(
+        node_3_name,
+        base_config.clone(),
+        BinarySource::CargoTest,
+        enum_set!(Role::HttpIngress | Role::Worker),
+    );
+    *worker_3.metadata_store_client_mut() = MetadataStoreClient::Embedded {
+        address: cluster.nodes[0].node_address().clone(),
+    };
+
+    let worker_3_ingress_url = format!(
+        "http://{}/Counter/0/get",
+        worker_3.config().ingress.bind_address
+    );
+
+    let mut worker_3_ready = worker_3.lines(
+        format!(
+            "Importing partition store snapshot.*{}",
+            snapshot_response.snapshot_id
+        )
+        .parse()?,
+    );
+
+    let started_worker_3 = worker_3
+        .start_clustered(cluster.base_dir(), cluster.cluster_name())
+        .await?;
+
+    assert!(
+        tokio::time::timeout(Duration::from_secs(20), worker_3_ready.next())
+            .await
+            .is_ok()
+    );
+
+    let invoke_response = http_client
+        .post(worker_3_ingress_url.clone())
+        .send()
+        .await?;
+    assert!(invoke_response.status().is_success());
+
+    Ok(())
+}
+
+async fn any_partition_active(
+    client: &mut ClusterCtrlSvcClient<Channel>,
+    timeout: Duration,
+) -> googletest::Result<()> {
+    let deadline = tokio::time::Instant::now() + timeout;
+    loop {
+        let cluster_state = client
+            .get_cluster_state(ClusterStateRequest {})
+            .await?
+            .into_inner()
+            .cluster_state
+            .unwrap();
+
+        if cluster_state.nodes.values().any(|n| {
+            n.state.as_ref().is_some_and(|s| match s {
+                State::Alive(s) => s.partitions.values().any(|p| p.effective_mode == 1),
+                _ => false,
+            })
+        }) {
+            break; // a partition is ready; we can request a snapshot
+        }
+        if tokio::time::Instant::now() > deadline {
+            fail!(
+                "Partition processor did not become ready within {:?}",
+                timeout
+            )?;
+        }
+        tokio::time::sleep(Duration::from_millis(250)).await;
+    }
+    Ok(())
+}
+
+async fn grpc_connect(address: AdvertisedAddress) -> Result<Channel, tonic::transport::Error> {
+    match address {
+        AdvertisedAddress::Uds(uds_path) => {
+            // Dummy endpoint, required to construct the channel; the UDS
+            // connector below is what actually establishes the connection.
+            Endpoint::try_from("http://127.0.0.1")
+                .expect("http://127.0.0.1 should be a valid Uri")
+                .connect_with_connector(service_fn(move |_: Uri| {
+                    let uds_path = uds_path.clone();
+                    async move {
+                        Ok::<_, io::Error>(TokioIo::new(UnixStream::connect(uds_path).await?))
+                    }
+                }))
+                .await
+        }
+        AdvertisedAddress::Http(uri) => {
+            Channel::builder(uri)
+                .connect_timeout(Duration::from_secs(2))
+                .timeout(Duration::from_secs(2))
+                .http2_adaptive_window(true)
+                .connect()
+                .await
+        }
+    }
+}