Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore:bump arrow and parquet to 54 #274

Merged
merged 2 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,14 @@ path = "benches/criterion/writes.rs"
required-features = ["sled"]

[dependencies]
arrow = "53"
arrow = "54"
async-lock = "3"
async-stream = "0.3"
async-trait = { version = "0.1", optional = true }
bytes = { version = "1.7", optional = true }
crc32fast = "1"
crossbeam-skiplist = "0.1"
datafusion = { version = "42.2.0", optional = true }
datafusion = { version = "45", optional = true }
flume = { version = "0.11", features = ["async"] }
fusio = { version = "0.3.5", package = "fusio", features = [
"dyn",
Expand All @@ -95,7 +95,7 @@ futures-io = "0.3"
futures-util = "0.3"
lockable = "0.1.1"
once_cell = "1"
parquet = { version = "53", default-features = false, features = [
parquet = { version = "54", default-features = false, features = [
"async",
"base64",
"brotli",
Expand Down
19 changes: 13 additions & 6 deletions examples/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use datafusion::{
execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext},
physical_expr::EquivalenceProperties,
physical_plan::{
execute_stream, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties,
execute_stream, execution_plan::Boundedness, DisplayAs, DisplayFormatType, ExecutionPlan,
PlanProperties,
},
prelude::*,
sql::parser::DFParser,
Expand All @@ -28,10 +29,7 @@ use futures_core::Stream;
use futures_util::StreamExt;
use tokio::fs;
use tonbo::{
executor::tokio::TokioExecutor,
inmem::immutable::ArrowArrays,
record::{Record, Schema},
DbOption, DB,
executor::tokio::TokioExecutor, inmem::immutable::ArrowArrays, record::Schema, DbOption, DB,
};
use tonbo_macros::Record;

Expand Down Expand Up @@ -62,6 +60,14 @@ struct MusicStream {
stream: Pin<Box<dyn Stream<Item = Result<RecordBatch, DataFusionError>> + Send>>,
}

impl Debug for MusicProvider {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MusicProvider")
.field("db", &"Music")
.finish()
}
}

#[async_trait]
impl TableProvider for MusicProvider {
fn as_any(&self) -> &dyn Any {
Expand Down Expand Up @@ -113,7 +119,8 @@ impl MusicExec {
cache: PlanProperties::new(
EquivalenceProperties::new_with_orderings(schema, &[]),
datafusion::physical_expr::Partitioning::UnknownPartitioning(1),
ExecutionMode::Unbounded,
datafusion::physical_plan::execution_plan::EmissionType::Incremental,
Boundedness::Bounded,
),
db,
projection: None,
Expand Down
2 changes: 1 addition & 1 deletion parquet-lru/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ bytes = { version = "1.8.0", features = ["serde"] }
foyer = { version = "0.12.2", optional = true }
futures-core = "0.3.31"
futures-util = "0.3.31"
parquet = { version = "53.2.0", default-features = false, features = [
parquet = { version = "54", default-features = false, features = [
"arrow",
"async",
] }
Expand Down
10 changes: 7 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ use magic::USER_COLUMN_OFFSET;
pub use once_cell;
pub use parquet;
use parquet::{
arrow::{arrow_to_parquet_schema, ProjectionMask},
arrow::{ArrowSchemaConverter, ProjectionMask},
errors::ParquetError,
};
use parquet_lru::{DynLruCache, NoCache};
Expand Down Expand Up @@ -629,7 +629,9 @@ where
fixed_projection.dedup();

ProjectionMask::roots(
&arrow_to_parquet_schema(self.record_schema.arrow_schema()).unwrap(),
&ArrowSchemaConverter::new()
.convert(self.record_schema.arrow_schema())
.unwrap(),
fixed_projection,
)
}
Expand Down Expand Up @@ -764,7 +766,9 @@ where
fixed_projection.dedup();

let mask = ProjectionMask::roots(
&arrow_to_parquet_schema(self.schema.record_schema.arrow_schema()).unwrap(),
&ArrowSchemaConverter::new()
.convert(self.schema.record_schema.arrow_schema())
.unwrap(),
fixed_projection.clone(),
);

Expand Down
26 changes: 19 additions & 7 deletions src/ondisk/sstable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
ts: Timestamp,
limit: Option<usize>,
projection_mask: ProjectionMask,
) -> Result<SsTableScan<R>, parquet::errors::ParquetError> {

Check warning on line 97 in src/ondisk/sstable.rs

View workflow job for this annotation

GitHub Actions / check

elided lifetime has a name

Check warning on line 97 in src/ondisk/sstable.rs

View workflow job for this annotation

GitHub Actions / check

elided lifetime has a name

Check warning on line 97 in src/ondisk/sstable.rs

View workflow job for this annotation

GitHub Actions / check

elided lifetime has a name

Check warning on line 97 in src/ondisk/sstable.rs

View workflow job for this annotation

GitHub Actions / check

elided lifetime has a name

Check warning on line 97 in src/ondisk/sstable.rs

View workflow job for this annotation

GitHub Actions / Rust project check (macos-latest)

elided lifetime has a name

Check warning on line 97 in src/ondisk/sstable.rs

View workflow job for this annotation

GitHub Actions / Rust project check (macos-latest)

elided lifetime has a name

Check warning on line 97 in src/ondisk/sstable.rs

View workflow job for this annotation

GitHub Actions / Rust project check (ubuntu-latest)

elided lifetime has a name

Check warning on line 97 in src/ondisk/sstable.rs

View workflow job for this annotation

GitHub Actions / Rust project check (ubuntu-latest)

elided lifetime has a name
let builder = self
.into_parquet_builder(limit, projection_mask.clone())
.await?;
Expand Down Expand Up @@ -125,7 +125,7 @@
use futures_util::StreamExt;
use parquet::{
arrow::{
arrow_to_parquet_schema, arrow_writer::ArrowWriterOptions, AsyncArrowWriter,
arrow_writer::ArrowWriterOptions, ArrowSchemaConverter, AsyncArrowWriter,
ProjectionMask,
},
basic::{Compression, ZstdLevel},
Expand Down Expand Up @@ -218,7 +218,9 @@
.get(
key.borrow(),
ProjectionMask::roots(
&arrow_to_parquet_schema(TestSchema {}.arrow_schema()).unwrap(),
&ArrowSchemaConverter::new()
.convert(TestSchema {}.arrow_schema())
.unwrap(),
[0, 1, 2, 3],
),
)
Expand All @@ -235,7 +237,9 @@
.get(
key.borrow(),
ProjectionMask::roots(
&arrow_to_parquet_schema(TestSchema {}.arrow_schema()).unwrap(),
&ArrowSchemaConverter::new()
.convert(TestSchema {}.arrow_schema())
.unwrap(),
[0, 1, 2, 4],
),
)
Expand All @@ -252,7 +256,9 @@
.get(
key.borrow(),
ProjectionMask::roots(
&arrow_to_parquet_schema(TestSchema {}.arrow_schema()).unwrap(),
&ArrowSchemaConverter::new()
.convert(TestSchema {}.arrow_schema())
.unwrap(),
[0, 1, 2],
),
)
Expand Down Expand Up @@ -296,7 +302,9 @@
1_u32.into(),
None,
ProjectionMask::roots(
&arrow_to_parquet_schema(TestSchema {}.arrow_schema()).unwrap(),
&ArrowSchemaConverter::new()
.convert(TestSchema {}.arrow_schema())
.unwrap(),
[0, 1, 2, 3],
),
)
Expand All @@ -321,7 +329,9 @@
1_u32.into(),
None,
ProjectionMask::roots(
&arrow_to_parquet_schema(TestSchema {}.arrow_schema()).unwrap(),
&ArrowSchemaConverter::new()
.convert(TestSchema {}.arrow_schema())
.unwrap(),
[0, 1, 2, 4],
),
)
Expand All @@ -346,7 +356,9 @@
1_u32.into(),
None,
ProjectionMask::roots(
&arrow_to_parquet_schema(TestSchema {}.arrow_schema()).unwrap(),
&ArrowSchemaConverter::new()
.convert(TestSchema {}.arrow_schema())
.unwrap(),
[0, 1, 2],
),
)
Expand Down
14 changes: 10 additions & 4 deletions src/stream/level.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ mod tests {
use fusio::path::Path;
use fusio_dispatch::FsOptions;
use futures_util::StreamExt;
use parquet::arrow::{arrow_to_parquet_schema, ProjectionMask};
use parquet::arrow::{ArrowSchemaConverter, ProjectionMask};
use parquet_lru::NoCache;
use tempfile::TempDir;

Expand Down Expand Up @@ -265,7 +265,9 @@ mod tests {
1_u32.into(),
None,
ProjectionMask::roots(
&arrow_to_parquet_schema(TestSchema {}.arrow_schema()).unwrap(),
&ArrowSchemaConverter::new()
.convert(TestSchema {}.arrow_schema())
.unwrap(),
[0, 1, 2, 3],
),
manager.base_fs().clone(),
Expand Down Expand Up @@ -302,7 +304,9 @@ mod tests {
1_u32.into(),
None,
ProjectionMask::roots(
&arrow_to_parquet_schema(TestSchema {}.arrow_schema()).unwrap(),
&ArrowSchemaConverter::new()
.convert(TestSchema {}.arrow_schema())
.unwrap(),
[0, 1, 2, 4],
),
manager.base_fs().clone(),
Expand Down Expand Up @@ -339,7 +343,9 @@ mod tests {
1_u32.into(),
None,
ProjectionMask::roots(
&arrow_to_parquet_schema(TestSchema {}.arrow_schema()).unwrap(),
&ArrowSchemaConverter::new()
.convert(TestSchema {}.arrow_schema())
.unwrap(),
[0, 1, 2],
),
manager.base_fs().clone(),
Expand Down
6 changes: 4 additions & 2 deletions src/stream/mem_projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ mod tests {

use fusio::{disk::TokioFs, path::Path, DynFs};
use futures_util::StreamExt;
use parquet::arrow::{arrow_to_parquet_schema, ProjectionMask};
use parquet::arrow::{ArrowSchemaConverter, ProjectionMask};

use crate::{
inmem::{immutable::tests::TestSchema, mutable::Mutable},
Expand Down Expand Up @@ -127,7 +127,9 @@ mod tests {
.unwrap();

let mask = ProjectionMask::roots(
&arrow_to_parquet_schema(TestSchema.arrow_schema()).unwrap(),
&ArrowSchemaConverter::new()
.convert(TestSchema.arrow_schema())
.unwrap(),
vec![0, 1, 2, 4],
);

Expand Down
26 changes: 19 additions & 7 deletions tests/macros_correctness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ mod tests {
use arrow::array::{BooleanArray, RecordBatch, StringArray, UInt32Array, UInt8Array};
use fusio_log::{Decode, Encode};
use parquet::{
arrow::{arrow_to_parquet_schema, ProjectionMask},
arrow::{ArrowSchemaConverter, ProjectionMask},
format::SortingColumn,
schema::types::ColumnPath,
};
Expand Down Expand Up @@ -63,7 +63,9 @@ mod tests {
let mut user_ref = user.as_record_ref();

user_ref.projection(&ProjectionMask::roots(
&arrow_to_parquet_schema(UserSchema {}.arrow_schema()).unwrap(),
&ArrowSchemaConverter::new()
.convert(UserSchema {}.arrow_schema())
.unwrap(),
vec![2, 3],
));

Expand All @@ -75,7 +77,9 @@ mod tests {
let mut user_ref = user.as_record_ref();

user_ref.projection(&ProjectionMask::roots(
&arrow_to_parquet_schema(UserSchema {}.arrow_schema()).unwrap(),
&ArrowSchemaConverter::new()
.convert(UserSchema {}.arrow_schema())
.unwrap(),
vec![],
));

Expand All @@ -87,7 +91,9 @@ mod tests {
let mut user_ref = user.as_record_ref();

user_ref.projection(&ProjectionMask::roots(
&arrow_to_parquet_schema(UserSchema {}.arrow_schema()).unwrap(),
&ArrowSchemaConverter::new()
.convert(UserSchema {}.arrow_schema())
.unwrap(),
vec![2],
));

Expand All @@ -99,7 +105,9 @@ mod tests {
let mut user_ref = user.as_record_ref();

user_ref.projection(&ProjectionMask::roots(
&arrow_to_parquet_schema(UserSchema {}.arrow_schema()).unwrap(),
&ArrowSchemaConverter::new()
.convert(UserSchema {}.arrow_schema())
.unwrap(),
vec![3],
));

Expand Down Expand Up @@ -130,7 +138,9 @@ mod tests {
.unwrap();

let project_mask = ProjectionMask::roots(
&arrow_to_parquet_schema(UserSchema {}.arrow_schema()).unwrap(),
&ArrowSchemaConverter::new()
.convert(UserSchema {}.arrow_schema())
.unwrap(),
vec![0, 1, 2, 3, 4],
);
let record_ref = UserRef::from_record_batch(
Expand Down Expand Up @@ -167,7 +177,9 @@ mod tests {
.unwrap();

let project_mask = ProjectionMask::roots(
&arrow_to_parquet_schema(UserSchema {}.arrow_schema()).unwrap(),
&ArrowSchemaConverter::new()
.convert(UserSchema {}.arrow_schema())
.unwrap(),
vec![0, 1, 3, 4],
);
let record_ref = UserRef::from_record_batch(
Expand Down
Loading