diff --git a/src/compute/tests/cdc_tests.rs b/src/compute/tests/cdc_tests.rs index 34c8205c05f01..c733f78bf0b59 100644 --- a/src/compute/tests/cdc_tests.rs +++ b/src/compute/tests/cdc_tests.rs @@ -40,13 +40,12 @@ use risingwave_storage::memory::MemoryStateStore; use risingwave_storage::table::batch_table::storage_table::StorageTable; use risingwave_stream::common::table::state_table::StateTable; use risingwave_stream::error::StreamResult; -use risingwave_stream::executor::external::ExternalStorageTable; use risingwave_stream::executor::monitor::StreamingMetrics; use risingwave_stream::executor::test_utils::MockSource; use risingwave_stream::executor::{ expect_first_barrier, ActorContext, AddMutation, Barrier, BoxedExecutor as StreamBoxedExecutor, - BoxedMessageStream, CdcBackfillExecutor, Executor, ExecutorInfo, MaterializeExecutor, Message, - Mutation, PkIndices, PkIndicesRef, StreamExecutorError, + BoxedMessageStream, CdcBackfillExecutor, Executor, ExecutorInfo, ExternalStorageTable, + MaterializeExecutor, Message, Mutation, PkIndices, PkIndicesRef, StreamExecutorError, }; // mock upstream binlog offset starting from "1.binlog, pos=0" diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index 347cfb7f0efbe..98a3803494202 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -35,8 +35,8 @@ use risingwave_storage::StateStore; use crate::common::table::state_table::StateTable; use crate::executor::backfill::cdc::state::CdcBackfillState; -use crate::executor::backfill::upstream_table::external::ExternalStorageTable; -use crate::executor::backfill::upstream_table::snapshot::{ +use crate::executor::backfill::cdc::upstream_table::external::ExternalStorageTable; +use crate::executor::backfill::cdc::upstream_table::snapshot::{ SnapshotReadArgs, UpstreamTableRead, UpstreamTableReader, }; use crate::executor::backfill::utils::{ @@ -417,7 +417,7 @@ impl CdcBackfillExecutor { tracing::info!( upstream_table_id, - "CdcBackfill has already finished and forward messages directly to the downstream" + "CdcBackfill has already finished and will forward messages directly to the downstream" ); // Wait for first barrier to come after backfill is finished. diff --git a/src/stream/src/executor/backfill/cdc/mod.rs b/src/stream/src/executor/backfill/cdc/mod.rs index 3f0c48879e544..e86e42201e977 100644 --- a/src/stream/src/executor/backfill/cdc/mod.rs +++ b/src/stream/src/executor/backfill/cdc/mod.rs @@ -12,5 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod cdc_backfill; +mod cdc_backfill; mod state; +mod upstream_table; + +pub use cdc_backfill::CdcBackfillExecutor; +pub use upstream_table::external::ExternalStorageTable; diff --git a/src/stream/src/executor/backfill/upstream_table/external.rs b/src/stream/src/executor/backfill/cdc/upstream_table/external.rs similarity index 100% rename from src/stream/src/executor/backfill/upstream_table/external.rs rename to src/stream/src/executor/backfill/cdc/upstream_table/external.rs diff --git a/src/stream/src/executor/backfill/upstream_table/mod.rs b/src/stream/src/executor/backfill/cdc/upstream_table/mod.rs similarity index 100% rename from src/stream/src/executor/backfill/upstream_table/mod.rs rename to src/stream/src/executor/backfill/cdc/upstream_table/mod.rs diff --git a/src/stream/src/executor/backfill/upstream_table/snapshot.rs b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs similarity index 91% rename from src/stream/src/executor/backfill/upstream_table/snapshot.rs rename to src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs index 0e17ba7e722c4..6c5ba1affe212 100644 --- a/src/stream/src/executor/backfill/upstream_table/snapshot.rs +++ b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs @@ -22,7 +22,7 @@ use risingwave_common::row::OwnedRow; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_connector::source::external::{CdcOffset, ExternalTableReader}; -use crate::executor::backfill::upstream_table::external::ExternalStorageTable; +use super::external::ExternalStorageTable; use crate::executor::backfill::utils::iter_chunks; use crate::executor::{StreamExecutorError, StreamExecutorResult, INVALID_EPOCH}; @@ -46,20 +46,6 @@ pub struct SnapshotReadArgs { } impl SnapshotReadArgs { - pub fn new( - epoch: u64, - current_pos: Option, - ordered: bool, - chunk_size: usize, - ) -> Self { - Self { - epoch, - current_pos, - ordered, - chunk_size, - } - } - pub fn new_for_cdc(current_pos: Option, chunk_size: usize) -> Self { Self { epoch: INVALID_EPOCH, diff --git a/src/stream/src/executor/backfill/mod.rs b/src/stream/src/executor/backfill/mod.rs index 0c782596813ff..2f3b95f58074f 100644 --- a/src/stream/src/executor/backfill/mod.rs +++ b/src/stream/src/executor/backfill/mod.rs @@ -15,5 +15,4 @@ pub mod arrangement_backfill; pub mod cdc; pub mod no_shuffle_backfill; -pub mod upstream_table; pub mod utils; diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index 6fa4618e57a20..004b47559d8f7 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -106,9 +106,8 @@ mod utils; pub use actor::{Actor, ActorContext, ActorContextRef}; use anyhow::Context; -pub use backfill::cdc::cdc_backfill::CdcBackfillExecutor; +pub use backfill::cdc::{CdcBackfillExecutor, ExternalStorageTable}; pub use backfill::no_shuffle_backfill::*; -pub use backfill::upstream_table::*; pub use barrier_recv::BarrierRecvExecutor; pub use batch_query::BatchQueryExecutor; pub use chain::ChainExecutor; diff --git a/src/stream/src/from_proto/stream_cdc_scan.rs b/src/stream/src/from_proto/stream_cdc_scan.rs index 1736a38cecc1a..d988e0a75704e 100644 --- a/src/stream/src/from_proto/stream_cdc_scan.rs +++ b/src/stream/src/from_proto/stream_cdc_scan.rs @@ -23,8 +23,7 @@ use risingwave_pb::stream_plan::StreamCdcScanNode; use super::*; use crate::common::table::state_table::StateTable; -use crate::executor::external::ExternalStorageTable; -use crate::executor::CdcBackfillExecutor; +use crate::executor::{CdcBackfillExecutor, ExternalStorageTable}; pub struct StreamCdcScanExecutorBuilder;