diff --git a/Cargo.toml b/Cargo.toml index 8b37b11..ce138d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -74,14 +74,14 @@ path = "benches/criterion/writes.rs" required-features = ["sled"] [dependencies] -arrow = "53" +arrow = "54" async-lock = "3" async-stream = "0.3" async-trait = { version = "0.1", optional = true } bytes = { version = "1.7", optional = true } crc32fast = "1" crossbeam-skiplist = "0.1" -datafusion = { version = "42.2.0", optional = true } +datafusion = { version = "45", optional = true } flume = { version = "0.11", features = ["async"] } fusio = { version = "0.3.5", package = "fusio", features = [ "dyn", @@ -95,7 +95,7 @@ futures-io = "0.3" futures-util = "0.3" lockable = "0.1.1" once_cell = "1" -parquet = { version = "53", default-features = false, features = [ +parquet = { version = "54", default-features = false, features = [ "async", "base64", "brotli", diff --git a/examples/datafusion.rs b/examples/datafusion.rs index 34a9c45..defc9d3 100644 --- a/examples/datafusion.rs +++ b/examples/datafusion.rs @@ -18,7 +18,8 @@ use datafusion::{ execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}, physical_expr::EquivalenceProperties, physical_plan::{ - execute_stream, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties, + execute_stream, execution_plan::Boundedness, DisplayAs, DisplayFormatType, ExecutionPlan, + PlanProperties, }, prelude::*, sql::parser::DFParser, @@ -28,10 +29,7 @@ use futures_core::Stream; use futures_util::StreamExt; use tokio::fs; use tonbo::{ - executor::tokio::TokioExecutor, - inmem::immutable::ArrowArrays, - record::{Record, Schema}, - DbOption, DB, + executor::tokio::TokioExecutor, inmem::immutable::ArrowArrays, record::Schema, DbOption, DB, }; use tonbo_macros::Record; @@ -62,6 +60,14 @@ struct MusicStream { stream: Pin> + Send>>, } +impl Debug for MusicProvider { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MusicProvider") + .field("db", &"Music") + .finish() + } +} + #[async_trait] impl TableProvider for MusicProvider { fn as_any(&self) -> &dyn Any { @@ -113,7 +119,8 @@ impl MusicExec { cache: PlanProperties::new( EquivalenceProperties::new_with_orderings(schema, &[]), datafusion::physical_expr::Partitioning::UnknownPartitioning(1), - ExecutionMode::Unbounded, + datafusion::physical_plan::execution_plan::EmissionType::Incremental, + Boundedness::Bounded, ), db, projection: None, diff --git a/parquet-lru/Cargo.toml b/parquet-lru/Cargo.toml index c82fc15..e85a5ec 100644 --- a/parquet-lru/Cargo.toml +++ b/parquet-lru/Cargo.toml @@ -18,7 +18,7 @@ bytes = { version = "1.8.0", features = ["serde"] } foyer = { version = "0.12.2", optional = true } futures-core = "0.3.31" futures-util = "0.3.31" -parquet = { version = "53.2.0", default-features = false, features = [ +parquet = { version = "54", default-features = false, features = [ "arrow", "async", ] } diff --git a/src/lib.rs b/src/lib.rs index 2ede699..8611589 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -150,7 +150,7 @@ use magic::USER_COLUMN_OFFSET; pub use once_cell; pub use parquet; use parquet::{ - arrow::{arrow_to_parquet_schema, ProjectionMask}, + arrow::{ArrowSchemaConverter, ProjectionMask}, errors::ParquetError, }; use parquet_lru::{DynLruCache, NoCache}; @@ -629,7 +629,9 @@ where fixed_projection.dedup(); ProjectionMask::roots( - &arrow_to_parquet_schema(self.record_schema.arrow_schema()).unwrap(), + &ArrowSchemaConverter::new() + .convert(self.record_schema.arrow_schema()) + .unwrap(), fixed_projection, ) } @@ -764,7 +766,9 @@ where fixed_projection.dedup(); let mask = ProjectionMask::roots( - &arrow_to_parquet_schema(self.schema.record_schema.arrow_schema()).unwrap(), + &ArrowSchemaConverter::new() + .convert(self.schema.record_schema.arrow_schema()) + .unwrap(), fixed_projection.clone(), ); diff --git a/src/ondisk/sstable.rs b/src/ondisk/sstable.rs index b92acd6..38e1774 100644 --- a/src/ondisk/sstable.rs +++ b/src/ondisk/sstable.rs @@ -125,7 +125,7 @@ pub(crate) mod tests { use futures_util::StreamExt; use parquet::{ arrow::{ - arrow_to_parquet_schema, arrow_writer::ArrowWriterOptions, AsyncArrowWriter, + arrow_writer::ArrowWriterOptions, ArrowSchemaConverter, AsyncArrowWriter, ProjectionMask, }, basic::{Compression, ZstdLevel}, @@ -218,7 +218,9 @@ pub(crate) mod tests { .get( key.borrow(), ProjectionMask::roots( - &arrow_to_parquet_schema(TestSchema {}.arrow_schema()).unwrap(), + &ArrowSchemaConverter::new() + .convert(TestSchema {}.arrow_schema()) + .unwrap(), [0, 1, 2, 3], ), ) @@ -235,7 +237,9 @@ pub(crate) mod tests { .get( key.borrow(), ProjectionMask::roots( - &arrow_to_parquet_schema(TestSchema {}.arrow_schema()).unwrap(), + &ArrowSchemaConverter::new() + .convert(TestSchema {}.arrow_schema()) + .unwrap(), [0, 1, 2, 4], ), ) @@ -252,7 +256,9 @@ pub(crate) mod tests { .get( key.borrow(), ProjectionMask::roots( - &arrow_to_parquet_schema(TestSchema {}.arrow_schema()).unwrap(), + &ArrowSchemaConverter::new() + .convert(TestSchema {}.arrow_schema()) + .unwrap(), [0, 1, 2], ), ) @@ -296,7 +302,9 @@ pub(crate) mod tests { 1_u32.into(), None, ProjectionMask::roots( - &arrow_to_parquet_schema(TestSchema {}.arrow_schema()).unwrap(), + &ArrowSchemaConverter::new() + .convert(TestSchema {}.arrow_schema()) + .unwrap(), [0, 1, 2, 3], ), ) @@ -321,7 +329,9 @@ pub(crate) mod tests { 1_u32.into(), None, ProjectionMask::roots( - &arrow_to_parquet_schema(TestSchema {}.arrow_schema()).unwrap(), + &ArrowSchemaConverter::new() + .convert(TestSchema {}.arrow_schema()) + .unwrap(), [0, 1, 2, 4], ), ) @@ -346,7 +356,9 @@ pub(crate) mod tests { 1_u32.into(), None, ProjectionMask::roots( - &arrow_to_parquet_schema(TestSchema {}.arrow_schema()).unwrap(), + &ArrowSchemaConverter::new() + .convert(TestSchema {}.arrow_schema()) + .unwrap(), [0, 1, 2], ), ) diff --git a/src/stream/level.rs b/src/stream/level.rs index 24f1c90..f31692d 100644 --- a/src/stream/level.rs +++ b/src/stream/level.rs @@ -224,7 +224,7 @@ mod tests { use fusio::path::Path; use fusio_dispatch::FsOptions; use futures_util::StreamExt; - use parquet::arrow::{arrow_to_parquet_schema, ProjectionMask}; + use parquet::arrow::{ArrowSchemaConverter, ProjectionMask}; use parquet_lru::NoCache; use tempfile::TempDir; @@ -265,7 +265,9 @@ mod tests { 1_u32.into(), None, ProjectionMask::roots( - &arrow_to_parquet_schema(TestSchema {}.arrow_schema()).unwrap(), + &ArrowSchemaConverter::new() + .convert(TestSchema {}.arrow_schema()) + .unwrap(), [0, 1, 2, 3], ), manager.base_fs().clone(), @@ -302,7 +304,9 @@ mod tests { 1_u32.into(), None, ProjectionMask::roots( - &arrow_to_parquet_schema(TestSchema {}.arrow_schema()).unwrap(), + &ArrowSchemaConverter::new() + .convert(TestSchema {}.arrow_schema()) + .unwrap(), [0, 1, 2, 4], ), manager.base_fs().clone(), @@ -339,7 +343,9 @@ mod tests { 1_u32.into(), None, ProjectionMask::roots( - &arrow_to_parquet_schema(TestSchema {}.arrow_schema()).unwrap(), + &ArrowSchemaConverter::new() + .convert(TestSchema {}.arrow_schema()) + .unwrap(), [0, 1, 2], ), manager.base_fs().clone(), diff --git a/src/stream/mem_projection.rs b/src/stream/mem_projection.rs index fda5e60..23dabca 100644 --- a/src/stream/mem_projection.rs +++ b/src/stream/mem_projection.rs @@ -60,7 +60,7 @@ mod tests { use fusio::{disk::TokioFs, path::Path, DynFs}; use futures_util::StreamExt; - use parquet::arrow::{arrow_to_parquet_schema, ProjectionMask}; + use parquet::arrow::{ArrowSchemaConverter, ProjectionMask}; use crate::{ inmem::{immutable::tests::TestSchema, mutable::Mutable}, @@ -127,7 +127,9 @@ mod tests { .unwrap(); let mask = ProjectionMask::roots( - &arrow_to_parquet_schema(TestSchema.arrow_schema()).unwrap(), + &ArrowSchemaConverter::new() + .convert(TestSchema.arrow_schema()) + .unwrap(), vec![0, 1, 2, 4], ); diff --git a/tests/macros_correctness.rs b/tests/macros_correctness.rs index a1361ab..a7022da 100644 --- a/tests/macros_correctness.rs +++ b/tests/macros_correctness.rs @@ -15,7 +15,7 @@ mod tests { use arrow::array::{BooleanArray, RecordBatch, StringArray, UInt32Array, UInt8Array}; use fusio_log::{Decode, Encode}; use parquet::{ - arrow::{arrow_to_parquet_schema, ProjectionMask}, + arrow::{ArrowSchemaConverter, ProjectionMask}, format::SortingColumn, schema::types::ColumnPath, }; @@ -63,7 +63,9 @@ mod tests { let mut user_ref = user.as_record_ref(); user_ref.projection(&ProjectionMask::roots( - &arrow_to_parquet_schema(UserSchema {}.arrow_schema()).unwrap(), + &ArrowSchemaConverter::new() + .convert(UserSchema {}.arrow_schema()) + .unwrap(), vec![2, 3], )); @@ -75,7 +77,9 @@ mod tests { let mut user_ref = user.as_record_ref(); user_ref.projection(&ProjectionMask::roots( - &arrow_to_parquet_schema(UserSchema {}.arrow_schema()).unwrap(), + &ArrowSchemaConverter::new() + .convert(UserSchema {}.arrow_schema()) + .unwrap(), vec![], )); @@ -87,7 +91,9 @@ mod tests { let mut user_ref = user.as_record_ref(); user_ref.projection(&ProjectionMask::roots( - &arrow_to_parquet_schema(UserSchema {}.arrow_schema()).unwrap(), + &ArrowSchemaConverter::new() + .convert(UserSchema {}.arrow_schema()) + .unwrap(), vec![2], )); @@ -99,7 +105,9 @@ mod tests { let mut user_ref = user.as_record_ref(); user_ref.projection(&ProjectionMask::roots( - &arrow_to_parquet_schema(UserSchema {}.arrow_schema()).unwrap(), + &ArrowSchemaConverter::new() + .convert(UserSchema {}.arrow_schema()) + .unwrap(), vec![3], )); @@ -130,7 +138,9 @@ mod tests { .unwrap(); let project_mask = ProjectionMask::roots( - &arrow_to_parquet_schema(UserSchema {}.arrow_schema()).unwrap(), + &ArrowSchemaConverter::new() + .convert(UserSchema {}.arrow_schema()) + .unwrap(), vec![0, 1, 2, 3, 4], ); let record_ref = UserRef::from_record_batch( @@ -167,7 +177,9 @@ mod tests { .unwrap(); let project_mask = ProjectionMask::roots( - &arrow_to_parquet_schema(UserSchema {}.arrow_schema()).unwrap(), + &ArrowSchemaConverter::new() + .convert(UserSchema {}.arrow_schema()) + .unwrap(), vec![0, 1, 3, 4], ); let record_ref = UserRef::from_record_batch(