From 1dc65b3b7846ad5e1996baba3692cf3feac82bf3 Mon Sep 17 00:00:00 2001
From: Jan Schweizer
Date: Tue, 7 Nov 2023 18:11:53 +0100
Subject: [PATCH 01/16] Add handling for unmanaged files to vacuum command
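Vacuum previously only considered files tracked by the Delta log; objects that were
never added to the log were left behind forever. This patch splits the scan into
managed files (referenced by an add action or a tombstone) and unmanaged files, and
deletes unmanaged files once they fall out of the retention window. A minimal sketch
of that retention rule, with illustrative names rather than the crate's API:

```rust
/// Whether an object that is not referenced by the Delta log may be
/// vacuumed. Illustrative only; the real check lives in `VacuumBuilder`
/// and additionally consults expired tombstones and hidden paths.
fn unmanaged_file_expired(
    now_millis: i64,
    retention_millis: i64,
    last_modified_millis: i64,
) -> bool {
    // Eligible once the retention cutoff has moved past the file's mtime.
    now_millis - retention_millis >= last_modified_millis
}
```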
---
.../deltalake-core/src/operations/vacuum.rs | 40 +++++++++++++++----
crates/deltalake-core/tests/command_vacuum.rs | 3 +-
2 files changed, 33 insertions(+), 10 deletions(-)
diff --git a/crates/deltalake-core/src/operations/vacuum.rs b/crates/deltalake-core/src/operations/vacuum.rs
index 47f7c1d5c9..0f0a4e39ff 100644
--- a/crates/deltalake-core/src/operations/vacuum.rs
+++ b/crates/deltalake-core/src/operations/vacuum.rs
@@ -180,7 +180,6 @@ impl VacuumBuilder {
};
let expired_tombstones = get_stale_files(&self.snapshot, retention_period, now_millis);
- let valid_files = self.snapshot.file_paths_iter().collect::<HashSet<Path>>();
let mut files_to_delete = vec![];
let mut file_sizes = vec![];
@@ -192,14 +191,35 @@ impl VacuumBuilder {
.ok_or(DeltaTableError::NoMetadata)?
.partition_columns;
+ let managed_files = self
+ .snapshot
+ .files()
+ .iter()
+ .map(|a| a.path.as_str())
+ .chain(
+ self.snapshot
+ .all_tombstones()
+ .iter()
+ .map(|r| r.path.as_str()),
+ )
+ .collect::<HashSet<&str>>();
+
while let Some(obj_meta) = all_files.next().await {
// TODO should we allow NotFound here in case we have a temporary commit file in the list
let obj_meta = obj_meta.map_err(DeltaTableError::from)?;
- if valid_files.contains(&obj_meta.location) // file is still being tracked in table
- || !expired_tombstones.contains(obj_meta.location.as_ref()) // file is not an expired tombstone
- || is_hidden_directory(partition_columns, &obj_meta.location)?
- {
- continue;
+ let is_hidden = is_hidden_directory(partition_columns, &obj_meta.location)?;
+
+ if managed_files.contains(obj_meta.location.as_ref()) {
+ if !expired_tombstones.contains(obj_meta.location.as_ref()) || is_hidden {
+ continue;
+ }
+ } else {
+ if now_millis - retention_period.num_milliseconds()
+ < obj_meta.last_modified.timestamp_millis()
+ || is_hidden
+ {
+ continue;
+ }
}
files_to_delete.push(obj_meta.location);
@@ -357,8 +377,12 @@ impl VacuumPlan {
/// deleted even if they'd normally be hidden. The _db_index directory contains (bloom filter)
/// indexes and these must be deleted when the data they are tied to is deleted.
fn is_hidden_directory(partition_columns: &[String], path: &Path) -> Result<bool, DeltaTableError> {
- let path_name = path.to_string();
- Ok((path_name.starts_with('.') || path_name.starts_with('_'))
+ let is_hidden = path
+ .parts()
+ .any(|p| p.as_ref().starts_with('.') || p.as_ref().starts_with('_'));
+
+ let path_name = path.as_ref();
+ Ok(is_hidden
&& !path_name.starts_with("_delta_index")
&& !path_name.starts_with("_change_data")
&& !partition_columns
diff --git a/crates/deltalake-core/tests/command_vacuum.rs b/crates/deltalake-core/tests/command_vacuum.rs
index 0007f479d5..e21315e796 100644
--- a/crates/deltalake-core/tests/command_vacuum.rs
+++ b/crates/deltalake-core/tests/command_vacuum.rs
@@ -216,7 +216,6 @@ async fn test_partitions_included() {
);
}
-#[ignore]
#[tokio::test]
// files that are not managed by the delta log and whose last_modified is older
// than the retention period should be deleted. Unmanaged files and directories
@@ -276,7 +275,7 @@ async fn test_non_managed_files() {
// Validate unmanaged files are deleted after the retention period
let res = {
- clock.tick(Duration::hours(1));
+ clock.tick(Duration::days(7));
let (_, metrics) = DeltaOps(table)
.vacuum()
.with_clock(Arc::new(clock.clone()))
From e59bb34587e85ca00024223fc61d7f28f884fe75 Mon Sep 17 00:00:00 2001
From: Jan Schweizer
Date: Tue, 7 Nov 2023 18:55:40 +0100
Subject: [PATCH 02/16] collapse nested if block
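This is the shape clippy's `collapsible_else_if` lint flags; a schematic
before/after of the flattened control flow (illustrative, not the vacuum code
itself):

```rust
fn keep(managed: bool, expired: bool) -> bool {
    // Before: the second branch was written `else { if expired { ... } }`,
    // nesting a block one level deep. `else if` expresses the same control
    // flow without the extra indentation.
    if managed {
        true
    } else if expired {
        false
    } else {
        true
    }
}
```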
---
crates/deltalake-core/src/operations/vacuum.rs | 12 +++++-------
1 file changed, 5 insertions(+), 7 deletions(-)
diff --git a/crates/deltalake-core/src/operations/vacuum.rs b/crates/deltalake-core/src/operations/vacuum.rs
index 0f0a4e39ff..5499c0be92 100644
--- a/crates/deltalake-core/src/operations/vacuum.rs
+++ b/crates/deltalake-core/src/operations/vacuum.rs
@@ -213,13 +213,11 @@ impl VacuumBuilder {
if !expired_tombstones.contains(obj_meta.location.as_ref()) || is_hidden {
continue;
}
- } else {
- if now_millis - retention_period.num_milliseconds()
- < obj_meta.last_modified.timestamp_millis()
- || is_hidden
- {
- continue;
- }
+ } else if now_millis - retention_period.num_milliseconds()
+ < obj_meta.last_modified.timestamp_millis()
+ || is_hidden
+ {
+ continue;
}
files_to_delete.push(obj_meta.location);
From 90b774127d6933000a127fe62c77b68fa4dd03b5 Mon Sep 17 00:00:00 2001
From: "R. Tyler Croy"
Date: Tue, 7 Nov 2023 08:35:26 -0800
Subject: [PATCH 03/16] chore: upgrade to the latest dynamodb-lock crate
The new version of this crate properly sets a lease duration such that the locks
can actually expire
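Without a lease duration, a lock row written by a crashed process looks held
forever and other writers spin until they exhaust their retries. A rough sketch of
the expiry check a lease enables (field names are illustrative; the actual logic
lives in the dynamodb_lock crate, not in delta-rs):

```rust
/// Illustrative lock record; the real `LockItem` in dynamodb_lock carries
/// more state (owner, record version number, payload).
struct LockRecord {
    acquired_at_millis: i64,
    lease_duration_millis: Option<i64>,
}

fn is_expired(lock: &LockRecord, now_millis: i64) -> bool {
    match lock.lease_duration_millis {
        // No lease: the lock can never be observed as expired, which is
        // the failure mode the upgraded crate fixes.
        None => false,
        Some(lease) => now_millis - lock.acquired_at_millis >= lease,
    }
}
```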
---
crates/deltalake-core/Cargo.toml | 2 +-
crates/deltalake-core/src/storage/s3.rs | 12 +++++++-----
2 files changed, 8 insertions(+), 6 deletions(-)
diff --git a/crates/deltalake-core/Cargo.toml b/crates/deltalake-core/Cargo.toml
index ce1c7490ad..b3a6178203 100644
--- a/crates/deltalake-core/Cargo.toml
+++ b/crates/deltalake-core/Cargo.toml
@@ -117,7 +117,7 @@ sqlparser = { version = "0.38", optional = true }
fs_extra = { version = "1.3.0", optional = true }
tempdir = { version = "0", optional = true }
-dynamodb_lock = { version = "0", default-features = false, optional = true }
+dynamodb_lock = { version = "0.6.0", default-features = false, optional = true }
[dev-dependencies]
dotenvy = "0"
diff --git a/crates/deltalake-core/src/storage/s3.rs b/crates/deltalake-core/src/storage/s3.rs
index 23e091dea5..7594d1b06c 100644
--- a/crates/deltalake-core/src/storage/s3.rs
+++ b/crates/deltalake-core/src/storage/s3.rs
@@ -3,7 +3,7 @@
use super::utils::str_is_truthy;
use crate::table::builder::{s3_storage_options, str_option};
use bytes::Bytes;
-use dynamodb_lock::{DynamoError, LockClient, LockItem, DEFAULT_MAX_RETRY_ACQUIRE_LOCK_ATTEMPTS};
+use dynamodb_lock::{DynamoError, LockClient, LockItem};
use futures::stream::BoxStream;
use object_store::path::Path;
use object_store::{
@@ -23,6 +23,7 @@ use std::time::Duration;
use tokio::io::AsyncWrite;
const STORE_NAME: &str = "DeltaS3ObjectStore";
+const DEFAULT_MAX_RETRY_ACQUIRE_LOCK_ATTEMPTS: u32 = 1_000;
/// Error raised by storage lock client
#[derive(thiserror::Error, Debug)]
@@ -535,10 +536,11 @@ fn try_create_lock_client(options: &S3StorageOptions) -> Result
@@ -105,7 +105,7 @@ You can also try Delta Lake docker at [DockerHub](https://go.delta.io/dockerhub)
We encourage you to reach out, and are [committed](https://github.com/delta-io/delta-rs/blob/main/CODE_OF_CONDUCT.md)
to provide a welcoming community.
-- [Join us in our Slack workspace](https://join.slack.com/t/delta-users/shared_invite/zt-23h0xwez7-wDTm43ZVEW2ZcbKn6Bc8Fg)
+- [Join us in our Slack workspace](https://go.delta.io/slack)
- [Report an issue](https://github.com/delta-io/delta-rs/issues/new?template=bug_report.md)
- Looking to contribute? See our [good first issues](https://github.com/delta-io/delta-rs/contribute).
From a327fa80060bf5c1997ba59992c8110b2fafdac6 Mon Sep 17 00:00:00 2001
From: Jan Schweizer
Date: Sat, 11 Nov 2023 17:52:29 +0100
Subject: [PATCH 08/16] Correctly handle hidden files in _change_data and
_delta_index & deletion vector files
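Files that live directly under `_change_data` or `_delta_index` must not be treated
as hidden merely because their parent directory starts with an underscore, while a
dot- or underscore-prefixed file nested below them still is. A compact restatement
of the rule the new `is_hidden_file` applies (illustrative, operating on pre-split
path components; the separate partition-column exemption from the diff is omitted):

```rust
/// True when any path component marks the file as hidden, except that a
/// leading `_change_data` / `_delta_index` component is exempt.
fn looks_hidden(parts: &[&str]) -> bool {
    let exempt_root = matches!(parts.first(), Some(&"_change_data") | Some(&"_delta_index"));
    parts
        .iter()
        .skip(exempt_root as usize) // skip the exempt root component, if any
        .any(|p| p.starts_with('.') || p.starts_with('_'))
}
```

So `looks_hidden(&["_change_data", "cdc-00000.parquet"])` is false, while
`looks_hidden(&["_change_data", "_tmp", "x"])` and `looks_hidden(&[".crc"])` are true.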
---
.../deltalake-core/src/operations/vacuum.rs | 53 +++++++++++++++----
1 file changed, 43 insertions(+), 10 deletions(-)
diff --git a/crates/deltalake-core/src/operations/vacuum.rs b/crates/deltalake-core/src/operations/vacuum.rs
index 2c4e00793c..015af693a9 100644
--- a/crates/deltalake-core/src/operations/vacuum.rs
+++ b/crates/deltalake-core/src/operations/vacuum.rs
@@ -32,6 +32,7 @@ use object_store::Error;
use object_store::{path::Path, ObjectStore};
use serde::Serialize;
use serde_json::Value;
+use url::Url;
use super::transaction::commit;
use crate::crate_version;
@@ -198,27 +199,44 @@ impl VacuumBuilder {
.snapshot
.files()
.iter()
- .map(|a| a.path.as_str())
+ .map(|a| a.path.clone())
.chain(
self.snapshot
.all_tombstones()
.iter()
- .map(|r| r.path.as_str()),
+ .map(|r| r.path.clone()),
)
- .collect::<HashSet<&str>>();
+ .chain(self.snapshot.files().iter().filter_map(|a| {
+ return if let Some(deletion_vector) = &a.deletion_vector {
+ if let Ok(parent) = &Url::parse(self.log_store.root_uri().as_str()) {
+ if let Ok(dv_absolut_path) = deletion_vector.absolute_path(&parent) {
+ Some(dv_absolut_path?.to_string())
+ } else {
+ None
+ }
+ } else {
+ None
+ }
+ } else {
+ None
+ };
+ }))
+ .collect::<HashSet<String>>();
while let Some(obj_meta) = all_files.next().await {
// TODO should we allow NotFound here in case we have a temporary commit file in the list
let obj_meta = obj_meta.map_err(DeltaTableError::from)?;
- let is_hidden = is_hidden_directory(partition_columns, &obj_meta.location)?;
+
+ if is_hidden_file(partition_columns, &obj_meta.location)? {
+ continue;
+ }
if managed_files.contains(obj_meta.location.as_ref()) {
- if !expired_tombstones.contains(obj_meta.location.as_ref()) || is_hidden {
+ if !expired_tombstones.contains(obj_meta.location.as_ref()) {
continue;
}
} else if now_millis - retention_period.num_milliseconds()
< obj_meta.last_modified.timestamp_millis()
- || is_hidden
{
continue;
}
@@ -380,15 +398,15 @@ impl VacuumPlan {
/// Names of the form partitionCol=[value] are partition directories, and should be
/// deleted even if they'd normally be hidden. The _db_index directory contains (bloom filter)
/// indexes and these must be deleted when the data they are tied to is deleted.
-fn is_hidden_directory(partition_columns: &[String], path: &Path) -> Result {
+fn is_hidden_file(partition_columns: &[String], path: &Path) -> Result<bool, DeltaTableError> {
+ let path_name = path.as_ref();
+ let skip = path_name.starts_with("_delta_index") || path_name.starts_with("_change_data");
let is_hidden = path
.parts()
+ .skip(skip as usize)
.any(|p| p.as_ref().starts_with('.') || p.as_ref().starts_with('_'));
- let path_name = path.as_ref();
Ok(is_hidden
- && !path_name.starts_with("_delta_index")
- && !path_name.starts_with("_change_data")
&& !partition_columns
.iter()
.any(|partition_column| path_name.starts_with(partition_column)))
@@ -468,4 +486,19 @@ mod tests {
assert_eq!(result.files_deleted, empty);
}
+
+ #[tokio::test]
+ async fn vacuum_table_with_dv_small() {
+ let table = open_table("./tests/data/table-with-dv-small")
+ .await
+ .unwrap();
+
+ let (_table, result) = VacuumBuilder::new(table.log_store, table.state)
+ .with_dry_run(true)
+ .await
+ .unwrap();
+
+ let empty: Vec<String> = Vec::new();
+ assert_eq!(result.files_deleted, empty);
+ }
}
From 752773acbd06ad58561545b25b4e1f2e83a6ac92 Mon Sep 17 00:00:00 2001
From: Jan Schweizer
Date: Wed, 15 Nov 2023 16:26:14 +0100
Subject: [PATCH 09/16] Fix paths for managed files
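Add actions and tombstones may record either an absolute path or a path relative to
the table root, so membership tests against `managed_files` only work if both sides
are normalized the same way. A sketch of the normalization the patch applies on both
insert and lookup (illustrative helper; the diff inlines this logic):

```rust
/// Normalize a log-entry path to an absolute form under the table root.
fn to_absolute(root_uri: &str, path: &str) -> String {
    if std::path::Path::new(path).is_absolute() {
        path.to_string()
    } else {
        // Relative paths are anchored at the table root, mirroring
        // `format!("{}{}", self.log_store.root_uri(), path)` in the diff.
        format!("{}{}", root_uri, path)
    }
}
```

The same shape is applied when building the set and again inside `is_file_managed`,
so a relative listing entry still matches an absolute log entry.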
---
.../deltalake-core/src/operations/vacuum.rs | 47 ++++++++++++++-----
1 file changed, 36 insertions(+), 11 deletions(-)
diff --git a/crates/deltalake-core/src/operations/vacuum.rs b/crates/deltalake-core/src/operations/vacuum.rs
index 015af693a9..27408fe48c 100644
--- a/crates/deltalake-core/src/operations/vacuum.rs
+++ b/crates/deltalake-core/src/operations/vacuum.rs
@@ -195,22 +195,32 @@ impl VacuumBuilder {
.ok_or(DeltaTableError::NoMetadata)?
.partition_columns;
+ // A set containing the absolute paths to managed files
let managed_files = self
.snapshot
.files()
.iter()
- .map(|a| a.path.clone())
- .chain(
- self.snapshot
- .all_tombstones()
- .iter()
- .map(|r| r.path.clone()),
- )
+ .map(|a| {
+ if is_absolute_path(&a.path) {
+ a.path.clone()
+ } else {
+ format!("{}{}", self.log_store.root_uri(), a.path)
+ }
+ })
+ .chain(self.snapshot.all_tombstones().iter().map(|r| {
+ if is_absolute_path(&r.path) {
+ r.path.clone()
+ } else {
+ format!("{}{}", self.log_store.root_uri(), r.path)
+ }
+ }))
.chain(self.snapshot.files().iter().filter_map(|a| {
return if let Some(deletion_vector) = &a.deletion_vector {
- if let Ok(parent) = &Url::parse(self.log_store.root_uri().as_str()) {
- if let Ok(dv_absolut_path) = deletion_vector.absolute_path(&parent) {
- Some(dv_absolut_path?.to_string())
+ if let Ok(parent) =
+ &Url::parse(&format!("file://{}", self.log_store.root_uri().as_str()))
+ {
+ if let Ok(dv_absolute_path) = deletion_vector.absolute_path(&parent) {
+ Some(dv_absolute_path?.path().to_string())
} else {
None
}
@@ -231,7 +241,7 @@ impl VacuumBuilder {
continue;
}
- if managed_files.contains(obj_meta.location.as_ref()) {
+ if self.is_file_managed(&managed_files, &obj_meta.location) {
if !expired_tombstones.contains(obj_meta.location.as_ref()) {
continue;
}
@@ -253,6 +263,16 @@ impl VacuumBuilder {
specified_retention_millis: Some(retention_period.num_milliseconds()),
})
}
+
+ /// Whether a file is contained within the set of managed files.
+ fn is_file_managed(&self, managed_files: &HashSet<String>, file: &Path) -> bool {
+ return if is_absolute_path(file.as_ref()) {
+ managed_files.contains(file.as_ref())
+ } else {
+ let path = format!("{}{}", self.log_store.root_uri(), file.as_ref());
+ managed_files.contains(&path)
+ };
+ }
}
impl std::future::IntoFuture for VacuumBuilder {
@@ -285,6 +305,11 @@ impl std::future::IntoFuture for VacuumBuilder {
}
}
+fn is_absolute_path(path: &str) -> bool {
+ let path = std::path::Path::new(path);
+ return path.is_absolute();
+}
+
/// Encapsulate which files are to be deleted and the parameters used to make that decision
struct VacuumPlan {
/// What files are to be deleted
From 48b4e3ca996d2f53ed97f74498abffc93bc3d0c8 Mon Sep 17 00:00:00 2001
From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
Date: Fri, 1 Dec 2023 18:16:56 +0100
Subject: [PATCH 10/16] docs: fix all examples and change overall structure
(#1931)
# Description
I have made a number of improvements to the overall structure, since the
example sections were not consistent. I've also enabled some extra mkdocs
features and fixed an issue where some classes/functions were not rendered
properly.
---
.github/workflows/python_build.yml | 5 -
docs/api/catalog.md | 1 +
docs/api/delta_table.md | 10 -
docs/api/delta_table/delta_table_merger.md | 5 +
docs/api/delta_table/delta_table_optimizer.md | 5 +
docs/api/delta_table/index.md | 5 +
docs/api/delta_table/metadata.md | 6 +
docs/api/delta_writer.md | 7 +
docs/api/exceptions.md | 6 +
docs/api/schema.md | 23 +-
docs/api/storage.md | 6 +-
.../appending-overwriting-delta-lake-table.md | 2 +-
.../small-file-compaction-with-optimize.md | 24 +-
mkdocs.yml | 28 +-
python/deltalake/_internal.pyi | 290 ++++++----
python/deltalake/table.py | 538 ++++++++++--------
python/deltalake/writer.py | 14 +-
17 files changed, 544 insertions(+), 431 deletions(-)
create mode 100644 docs/api/catalog.md
delete mode 100644 docs/api/delta_table.md
create mode 100644 docs/api/delta_table/delta_table_merger.md
create mode 100644 docs/api/delta_table/delta_table_optimizer.md
create mode 100644 docs/api/delta_table/index.md
create mode 100644 docs/api/delta_table/metadata.md
create mode 100644 docs/api/delta_writer.md
create mode 100644 docs/api/exceptions.md
diff --git a/.github/workflows/python_build.yml b/.github/workflows/python_build.yml
index 51626310be..bc2f20cc9a 100644
--- a/.github/workflows/python_build.yml
+++ b/.github/workflows/python_build.yml
@@ -127,11 +127,6 @@ jobs:
python -m pytest -m "not pandas and not integration and not benchmark"
pip install pandas
- - name: Build Sphinx documentation
- run: |
- source venv/bin/activate
- make build-documentation
-
benchmark:
name: Python Benchmark
runs-on: ubuntu-latest
diff --git a/docs/api/catalog.md b/docs/api/catalog.md
new file mode 100644
index 0000000000..d75dd648db
--- /dev/null
+++ b/docs/api/catalog.md
@@ -0,0 +1 @@
+::: deltalake.data_catalog.DataCatalog
\ No newline at end of file
diff --git a/docs/api/delta_table.md b/docs/api/delta_table.md
deleted file mode 100644
index 75284664fe..0000000000
--- a/docs/api/delta_table.md
+++ /dev/null
@@ -1,10 +0,0 @@
-# DeltaTable
-
-::: deltalake.table
- options:
- show_root_heading: false
-
-## Writing Delta Tables
-
-::: deltalake.write_deltalake
-
diff --git a/docs/api/delta_table/delta_table_merger.md b/docs/api/delta_table/delta_table_merger.md
new file mode 100644
index 0000000000..7d707a16f2
--- /dev/null
+++ b/docs/api/delta_table/delta_table_merger.md
@@ -0,0 +1,5 @@
+# TableMerger
+
+::: deltalake.table.TableMerger
+ options:
+ show_root_heading: true
\ No newline at end of file
diff --git a/docs/api/delta_table/delta_table_optimizer.md b/docs/api/delta_table/delta_table_optimizer.md
new file mode 100644
index 0000000000..2275cbd0ca
--- /dev/null
+++ b/docs/api/delta_table/delta_table_optimizer.md
@@ -0,0 +1,5 @@
+# TableOptimizer
+
+::: deltalake.table.TableOptimizer
+ options:
+ show_root_heading: true
\ No newline at end of file
diff --git a/docs/api/delta_table/index.md b/docs/api/delta_table/index.md
new file mode 100644
index 0000000000..46a65af8d3
--- /dev/null
+++ b/docs/api/delta_table/index.md
@@ -0,0 +1,5 @@
+# DeltaTable
+
+::: deltalake.DeltaTable
+ options:
+ show_root_heading: true
\ No newline at end of file
diff --git a/docs/api/delta_table/metadata.md b/docs/api/delta_table/metadata.md
new file mode 100644
index 0000000000..92ff62370a
--- /dev/null
+++ b/docs/api/delta_table/metadata.md
@@ -0,0 +1,6 @@
+# Metadata
+
+::: deltalake.Metadata
+ options:
+ show_root_heading: true
+
diff --git a/docs/api/delta_writer.md b/docs/api/delta_writer.md
new file mode 100644
index 0000000000..71c31534b0
--- /dev/null
+++ b/docs/api/delta_writer.md
@@ -0,0 +1,7 @@
+# Writer
+## Write to Delta Tables
+
+::: deltalake.write_deltalake
+
+## Convert to Delta Tables
+::: deltalake.convert_to_deltalake
\ No newline at end of file
diff --git a/docs/api/exceptions.md b/docs/api/exceptions.md
new file mode 100644
index 0000000000..afe99f92f1
--- /dev/null
+++ b/docs/api/exceptions.md
@@ -0,0 +1,6 @@
+# Exceptions
+
+::: deltalake.exceptions.DeltaError
+::: deltalake.exceptions.DeltaProtocolError
+::: deltalake.exceptions.TableNotFoundError
+::: deltalake.exceptions.CommitFailedError
diff --git a/docs/api/schema.md b/docs/api/schema.md
index 9a91f61062..9a6ba7b2e6 100644
--- a/docs/api/schema.md
+++ b/docs/api/schema.md
@@ -1,28 +1,29 @@
-## Delta Lake Schemas
-
+## Schema and field
Schemas, fields, and data types are provided in the ``deltalake.schema`` submodule.
-::: deltalake.schema.Schema
+::: deltalake.Schema
options:
show_root_heading: true
show_root_toc_entry: true
-::: deltalake.schema.PrimitiveType
+::: deltalake.Field
options:
show_root_heading: true
show_root_toc_entry: true
-::: deltalake.schema.ArrayType
+
+## Data types
+::: deltalake.schema.PrimitiveType
options:
show_root_heading: true
show_root_toc_entry: true
-::: deltalake.schema.MapType
+::: deltalake.schema.ArrayType
options:
show_root_heading: true
show_root_toc_entry: true
-::: deltalake.schema.Field
+::: deltalake.schema.MapType
options:
show_root_heading: true
show_root_toc_entry: true
@@ -30,10 +31,4 @@ Schemas, fields, and data types are provided in the ``deltalake.schema`` submodu
::: deltalake.schema.StructType
options:
show_root_heading: true
- show_root_toc_entry: true
-
-::: deltalake.data_catalog
-
-## Delta Storage Handler
-
-::: deltalake.fs
+ show_root_toc_entry: true
\ No newline at end of file
diff --git a/docs/api/storage.md b/docs/api/storage.md
index 77fd28c81a..ddb18250cf 100644
--- a/docs/api/storage.md
+++ b/docs/api/storage.md
@@ -1,3 +1,5 @@
-## Delta Storage Handler
+# Storage
-::: deltalake.fs
+The delta filesystem handler for the pyarrow engine writer.
+
+::: deltalake.fs.DeltaStorageHandler
diff --git a/docs/usage/appending-overwriting-delta-lake-table.md b/docs/usage/appending-overwriting-delta-lake-table.md
index 0930d8da1e..397edb9d0d 100644
--- a/docs/usage/appending-overwriting-delta-lake-table.md
+++ b/docs/usage/appending-overwriting-delta-lake-table.md
@@ -63,7 +63,7 @@ Here are the contents of the Delta table after the overwrite operation:
Overwriting just performs a logical delete. It doesn't physically remove the previous data from storage. Time travel back to the previous version to confirm that the old version of the table is still accessible.
-```
+```python
dt = DeltaTable("tmp/some-table", version=1)
+-------+----------+
diff --git a/docs/usage/optimize/small-file-compaction-with-optimize.md b/docs/usage/optimize/small-file-compaction-with-optimize.md
index ece15deea4..78d8778ff5 100644
--- a/docs/usage/optimize/small-file-compaction-with-optimize.md
+++ b/docs/usage/optimize/small-file-compaction-with-optimize.md
@@ -16,7 +16,7 @@ Let’s start by creating a Delta table with a lot of small files so we can demo
Start by writing a function that generates one thousand rows of random data given a timestamp.
-```
+```python
def record_observations(date: datetime) -> pa.Table:
"""Pulls data for a certain datetime"""
nrows = 1000
@@ -31,7 +31,7 @@ def record_observations(date: datetime) -> pa.Table:
Let’s run this function and observe the output:
-```
+```python
record_observations(datetime(2021, 1, 1, 12)).to_pandas()
date timestamp value
@@ -44,7 +44,7 @@ record_observations(datetime(2021, 1, 1, 12)).to_pandas()
Let’s write 100 hours worth of data to the Delta table.
-```
+```python
# Every hour starting at midnight on 2021-01-01
hours_iter = (datetime(2021, 1, 1) + timedelta(hours=i) for i in itertools.count())
@@ -60,7 +60,7 @@ for timestamp in itertools.islice(hours_iter, 100):
This data was appended to the Delta table in 100 separate transactions, so the table will contain 100 transaction log entries and 100 data files. You can see the number of files with the `files()` method.
-```
+```python
dt = DeltaTable("observation_data")
len(dt.files()) # 100
```
@@ -101,7 +101,7 @@ Each of these Parquet files are tiny - they’re only 10 KB. Let’s see how to
Let’s run the optimize command to compact the existing small files into larger files:
-```
+```python
dt = DeltaTable("observation_data")
dt.optimize()
@@ -109,7 +109,7 @@ dt.optimize()
Here’s the output of the command:
-```
+```python
{'numFilesAdded': 5,
'numFilesRemoved': 100,
'filesAdded': {'min': 39000,
@@ -137,7 +137,7 @@ Let’s append some more data to the Delta table and see how we can selectively
Let’s append another 24 hours of data to the Delta table:
-```
+```python
for timestamp in itertools.islice(hours_iter, 24):
write_deltalake(
dt,
@@ -149,7 +149,7 @@ for timestamp in itertools.islice(hours_iter, 24):
We can use `get_add_actions()` to introspect the table state. We can see that `2021-01-06` has only a few hours of data so far, so we don't want to optimize that yet. But `2021-01-05` has all 24 hours of data, so it's ready to be optimized.
-```
+```python
dt.get_add_actions(flatten=True).to_pandas()[
"partition.date"
].value_counts().sort_index()
@@ -164,7 +164,7 @@ dt.get_add_actions(flatten=True).to_pandas()[
To optimize a single partition, you can pass in a `partition_filters` argument specifying which partitions to optimize.
-```
+```python
dt.optimize(partition_filters=[("date", "=", "2021-01-05")])
{'numFilesAdded': 1,
@@ -188,7 +188,7 @@ dt.optimize(partition_filters=[("date", "=", "2021-01-05")])
This optimize operation tombstones 21 small data files and adds one file with all the existing data properly condensed. Let’s take a look at a portion of the `_delta_log/00000000000000000125.json` file, which is the transaction log entry that corresponds with this incremental optimize command.
-```
+```python
{
"remove": {
"path": "date=2021-01-05/part-00000-41178aab-2491-488f-943d-8f03867295ee-c000.snappy.parquet",
@@ -248,13 +248,13 @@ It’s normally a good idea to have a retention period of at least 7 days. For
Let’s run the vacuum command:
-```
+```python
dt.vacuum(retention_hours=0, enforce_retention_duration=False, dry_run=False)
```
The command returns a list of all the files that are removed from storage:
-```
+```python
['date=2021-01-02/39-a98680f2-0e0e-4f26-a491-18b183f9eb05-0.parquet',
'date=2021-01-02/41-e96bc8bb-c571-484c-b534-e897424fb7da-0.parquet',
…
diff --git a/mkdocs.yml b/mkdocs.yml
index 97b6e91b0e..a86257c932 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -10,10 +10,14 @@ theme:
features:
- navigation.tracking
- navigation.instant
+ - navigation.expand
- navigation.tabs
+ - navigation.indexes
- navigation.tabs.sticky
- navigation.footer
- content.tabs.link
+ - content.code.annotate
+ - content.code.copy
nav:
- Home: index.md
- Usage:
@@ -31,11 +35,17 @@ nav:
- Small file compaction: usage/optimize/small-file-compaction-with-optimize.md
- Z Order: usage/optimize/delta-lake-z-order.md
- API Reference:
- - api/delta_table.md
+ - api/delta_writer.md
+ - Table:
+ - api/delta_table/index.md
+ - api/delta_table/metadata.md
+ - api/delta_table/delta_table_merger.md
+ - api/delta_table/delta_table_optimizer.md
- api/schema.md
- api/storage.md
+ - api/catalog.md
+ - api/exceptions.md
- Integrations:
- - Arrow: integrations/delta-lake-arrow.md
- pandas: integrations/delta-lake-pandas.md
not_in_nav: |
/_build/
@@ -61,7 +71,7 @@ plugins:
show_source: false
show_symbol_type_in_heading: true
show_signature_annotations: true
- show_root_heading: false
+ show_root_heading: true
show_root_full_path: true
separate_signature: true
docstring_options:
@@ -81,6 +91,11 @@ plugins:
on_page_markdown: 'docs._build.hooks:on_page_markdown'
markdown_extensions:
+ - pymdownx.highlight:
+ anchor_linenums: true
+ line_spans: __span
+ pygments_lang_class: true
+ - pymdownx.inlinehilite
- admonition
- pymdownx.details
- attr_list
@@ -97,4 +112,9 @@ markdown_extensions:
- footnotes
extra:
- python_api_url: https://delta-io.github.io/delta-rs/api/
\ No newline at end of file
+ python_api_url: https://delta-io.github.io/delta-rs/api/
+ generator: false
+ social:
+ - icon: fontawesome/brands/slack
+ link: https://go.delta.io/slack
+ name: Delta slack channel
\ No newline at end of file
diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi
index d7c0e1a8f9..e1f5288b81 100644
--- a/python/deltalake/_internal.pyi
+++ b/python/deltalake/_internal.pyi
@@ -210,36 +210,39 @@ class PrimitiveType:
The JSON representation for a primitive type is just a quoted string: `PrimitiveType.from_json('"integer"')`
Args:
- json: A JSON string
+ json: a JSON string
- Returns a [PrimitiveType][deltalake.schema.PrimitiveType] type
+ Returns:
+ a PrimitiveType type
"""
def to_pyarrow(self) -> pyarrow.DataType:
"""Get the equivalent PyArrow type (pyarrow.DataType)"""
@staticmethod
def from_pyarrow(type: pyarrow.DataType) -> PrimitiveType:
- """Create a [PrimitiveType][deltalake.schema.PrimitiveType] from a PyArrow type
+ """Create a PrimitiveType from a PyArrow datatype
Will raise `TypeError` if the PyArrow type is not a primitive type.
Args:
- type: A PyArrow [DataType][pyarrow.DataType] type
+ type: A PyArrow DataType
- Returns: a [PrimitiveType][deltalake.schema.PrimitiveType] type
+ Returns:
+ a PrimitiveType
"""
class ArrayType:
"""An Array (List) DataType
- Can either pass the element type explicitly or can pass a string
- if it is a primitive type:
- ```
- ArrayType(PrimitiveType("integer"))
- # Returns ArrayType(PrimitiveType("integer"), contains_null=True)
+ Example:
+ Can either pass the element type explicitly or can pass a string
+ if it is a primitive type:
+ ```python
+ ArrayType(PrimitiveType("integer"))
+ # Returns ArrayType(PrimitiveType("integer"), contains_null=True)
- ArrayType("integer", contains_null=False)
- # Returns ArrayType(PrimitiveType("integer"), contains_null=False)
- ```
+ ArrayType("integer", contains_null=False)
+ # Returns ArrayType(PrimitiveType("integer"), contains_null=False)
+ ```
"""
def __init__(
@@ -269,23 +272,25 @@ class ArrayType:
def from_json(json: str) -> "ArrayType":
"""Create an ArrayType from a JSON string
- The JSON representation for an array type is an object with `type` (set to
- `"array"`), `elementType`, and `containsNull`:
- ```
- ArrayType.from_json(
- '''{
- "type": "array",
- "elementType": "integer",
- "containsNull": false
- }'''
- )
- # Returns ArrayType(PrimitiveType("integer"), contains_null=False)
- ```
-
Args:
- json: A JSON string
+ json: a JSON string
+
+ Returns:
+ an ArrayType
- Returns: an [ArrayType][deltalake.schema.ArrayType] type
+ Example:
+ The JSON representation for an array type is an object with `type` (set to
+ `"array"`), `elementType`, and `containsNull`.
+ ```python
+ ArrayType.from_json(
+ '''{
+ "type": "array",
+ "elementType": "integer",
+ "containsNull": false
+ }'''
+ )
+ # Returns ArrayType(PrimitiveType("integer"), contains_null=False)
+ ```
"""
def to_pyarrow(
self,
@@ -298,9 +303,10 @@ class ArrayType:
Will raise `TypeError` if a different PyArrow DataType is provided.
Args:
- type: The PyArrow [ListType][pyarrow.ListType]
+ type: The PyArrow ListType
- Returns: an [ArrayType][deltalake.schema.ArrayType] type
+ Returns:
+ an ArrayType
"""
class MapType:
@@ -310,13 +316,14 @@ class MapType:
or [StructType][deltalake.schema.StructType]. A string can also be passed, which will be
parsed as a primitive type:
- ```
- MapType(PrimitiveType("integer"), PrimitiveType("string"))
- # Returns MapType(PrimitiveType("integer"), PrimitiveType("string"), value_contains_null=True)
+ Example:
+ ```python
+ MapType(PrimitiveType("integer"), PrimitiveType("string"))
+ # Returns MapType(PrimitiveType("integer"), PrimitiveType("string"), value_contains_null=True)
- MapType("integer", "string", value_contains_null=False)
- # Returns MapType(PrimitiveType("integer"), PrimitiveType("string"), value_contains_null=False)
- ```
+ MapType("integer", "string", value_contains_null=False)
+ # Returns MapType(PrimitiveType("integer"), PrimitiveType("string"), value_contains_null=False)
+ ```
"""
def __init__(
@@ -352,29 +359,36 @@ class MapType:
"""
def to_json(self) -> str:
- """Get JSON string representation of map type."""
+ """Get JSON string representation of map type.
+
+ Returns:
+ a JSON string
+ """
@staticmethod
def from_json(json: str) -> MapType:
"""Create a MapType from a JSON string
- The JSON representation for a map type is an object with `type` (set to `map`),
- `keyType`, `valueType`, and `valueContainsNull`:
- ```
- MapType.from_json(
- '''{
- "type": "map",
- "keyType": "integer",
- "valueType": "string",
- "valueContainsNull": true
- }'''
- )
- # Returns MapType(PrimitiveType("integer"), PrimitiveType("string"), value_contains_null=True)
- ```
-
Args:
- json: A JSON string
+ json: a JSON string
- Returns: a [MapType][deltalake.schema.MapType] type
+ Returns:
+ a MapType
+
+ Example:
+ The JSON representation for a map type is an object with `type` (set to `map`),
+ `keyType`, `valueType`, and `valueContainsNull`:
+
+ ```python
+ MapType.from_json(
+ '''{
+ "type": "map",
+ "keyType": "integer",
+ "valueType": "string",
+ "valueContainsNull": true
+ }'''
+ )
+ # Returns MapType(PrimitiveType("integer"), PrimitiveType("string"), value_contains_null=True)
+ ```
"""
def to_pyarrow(self) -> pyarrow.MapType:
"""Get the equivalent PyArrow data type."""
@@ -387,25 +401,27 @@ class MapType:
Args:
type: the PyArrow MapType
- Returns: a [MapType][deltalake.schema.MapType] type
+ Returns:
+ a MapType
"""
class Field:
"""A field in a Delta StructType or Schema
- Can create with just a name and a type:
- ```
- Field("my_int_col", "integer")
- # Returns Field("my_int_col", PrimitiveType("integer"), nullable=True, metadata=None)
- ```
+ Example:
+ Can create with just a name and a type:
+ ```python
+ Field("my_int_col", "integer")
+ # Returns Field("my_int_col", PrimitiveType("integer"), nullable=True, metadata=None)
+ ```
- Can also attach metadata to the field. Metadata should be a dictionary with
- string keys and JSON-serializable values (str, list, int, float, dict):
+ Can also attach metadata to the field. Metadata should be a dictionary with
+ string keys and JSON-serializable values (str, list, int, float, dict):
- ```
- Field("my_col", "integer", metadata={"custom_metadata": {"test": 2}})
- # Returns Field("my_col", PrimitiveType("integer"), nullable=True, metadata={"custom_metadata": {"test": 2}})
- ```
+ ```python
+ Field("my_col", "integer", metadata={"custom_metadata": {"test": 2}})
+ # Returns Field("my_col", PrimitiveType("integer"), nullable=True, metadata={"custom_metadata": {"test": 2}})
+ ```
"""
def __init__(
@@ -440,10 +456,15 @@ class Field:
def to_json(self) -> str:
"""Get the field as JSON string.
- ```
- Field("col", "integer").to_json()
- # Returns '{"name":"col","type":"integer","nullable":true,"metadata":{}}'
- ```
+
+ Returns:
+ a JSON string
+
+ Example:
+ ```python
+ Field("col", "integer").to_json()
+ # Returns '{"name":"col","type":"integer","nullable":true,"metadata":{}}'
+ ```
"""
@staticmethod
def from_json(json: str) -> Field:
@@ -452,25 +473,27 @@ class Field:
Args:
json: the JSON string.
- Returns: Field
+ Returns:
+ Field
Example:
- ```
- Field.from_json('''{
- "name": "col",
- "type": "integer",
- "nullable": true,
- "metadata": {}
- }'''
- )
- # Returns Field(col, PrimitiveType("integer"), nullable=True)
- ```
+ ```python
+ Field.from_json('''{
+ "name": "col",
+ "type": "integer",
+ "nullable": true,
+ "metadata": {}
+ }'''
+ )
+ # Returns Field(col, PrimitiveType("integer"), nullable=True)
+ ```
"""
def to_pyarrow(self) -> pyarrow.Field:
"""Convert to an equivalent PyArrow field
Note: This currently doesn't preserve field metadata.
- Returns: a [pyarrow.Field][pyarrow.Field] type
+ Returns:
+ a pyarrow Field
"""
@staticmethod
def from_pyarrow(field: pyarrow.Field) -> Field:
@@ -478,21 +501,21 @@ class Field:
Note: This currently doesn't preserve field metadata.
Args:
- field: a PyArrow Field type
+ field: a PyArrow Field
- Returns: a [Field][deltalake.schema.Field] type
+ Returns:
+ a Field
"""
class StructType:
"""A struct datatype, containing one or more subfields
Example:
-
- Create with a list of :class:`Field`:
- ```
- StructType([Field("x", "integer"), Field("y", "string")])
- # Creates: StructType([Field(x, PrimitiveType("integer"), nullable=True), Field(y, PrimitiveType("string"), nullable=True)])
- ```
+ Create with a list of :class:`Field`:
+ ```python
+ StructType([Field("x", "integer"), Field("y", "string")])
+ # Creates: StructType([Field(x, PrimitiveType("integer"), nullable=True), Field(y, PrimitiveType("string"), nullable=True)])
+ ```
"""
def __init__(self, fields: List[Field]) -> None: ...
@@ -503,33 +526,42 @@ class StructType:
def to_json(self) -> str:
"""Get the JSON representation of the type.
- ```
- StructType([Field("x", "integer")]).to_json()
- # Returns '{"type":"struct","fields":[{"name":"x","type":"integer","nullable":true,"metadata":{}}]}'
- ```
+
+ Returns:
+ a JSON string
+
+ Example:
+ ```python
+ StructType([Field("x", "integer")]).to_json()
+ # Returns '{"type":"struct","fields":[{"name":"x","type":"integer","nullable":true,"metadata":{}}]}'
+ ```
"""
@staticmethod
def from_json(json: str) -> StructType:
"""Create a new StructType from a JSON string.
- ```
- StructType.from_json(
- '''{
- "type": "struct",
- "fields": [{"name": "x", "type": "integer", "nullable": true, "metadata": {}}]
- }'''
- )
- # Returns StructType([Field(x, PrimitiveType("integer"), nullable=True)])
- ```
Args:
json: a JSON string
- Returns: a [StructType][deltalake.schema.StructType] type
+ Returns:
+ a StructType
+
+ Example:
+ ```python
+ StructType.from_json(
+ '''{
+ "type": "struct",
+ "fields": [{"name": "x", "type": "integer", "nullable": true, "metadata": {}}]
+ }'''
+ )
+ # Returns StructType([Field(x, PrimitiveType("integer"), nullable=True)])
+ ```
"""
def to_pyarrow(self) -> pyarrow.StructType:
"""Get the equivalent PyArrow StructType
- Returns: a PyArrow [StructType][pyarrow.StructType] type
+ Returns:
+ a PyArrow StructType
"""
@staticmethod
def from_pyarrow(type: pyarrow.StructType) -> StructType:
@@ -540,7 +572,8 @@ class StructType:
Args:
type: a PyArrow struct type.
- Returns: a [StructType][deltalake.schema.StructType] type
+ Returns:
+ a StructType
"""
class Schema:
@@ -553,38 +586,44 @@ class Schema:
"""
def to_json(self) -> str:
"""Get the JSON string representation of the Schema.
- A schema has the same JSON format as a StructType.
- ```
- Schema([Field("x", "integer")]).to_json()
- # Returns '{"type":"struct","fields":[{"name":"x","type":"integer","nullable":true,"metadata":{}}]}'
- ```
- Returns: a JSON string
+
+ Returns:
+ a JSON string
+
+ Example:
+ A schema has the same JSON format as a StructType.
+ ```python
+ Schema([Field("x", "integer")]).to_json()
+ # Returns '{"type":"struct","fields":[{"name":"x","type":"integer","nullable":true,"metadata":{}}]}'
+ ```
"""
@staticmethod
def from_json(json: str) -> Schema:
"""Create a new Schema from a JSON string.
- A schema has the same JSON format as a StructType.
- ```
- Schema.from_json('''{
- "type": "struct",
- "fields": [{"name": "x", "type": "integer", "nullable": true, "metadata": {}}]
- }
- )'''
- # Returns Schema([Field(x, PrimitiveType("integer"), nullable=True)])
- ```
-
Args:
json: a JSON string
+
+ Example:
+ A schema has the same JSON format as a StructType.
+ ```python
+ Schema.from_json('''{
+ "type": "struct",
+ "fields": [{"name": "x", "type": "integer", "nullable": true, "metadata": {}}]
+ }'''
+ )
+ # Returns Schema([Field(x, PrimitiveType("integer"), nullable=True)])
+ ```
"""
def to_pyarrow(self, as_large_types: bool = False) -> pyarrow.Schema:
"""Return equivalent PyArrow schema
Args:
- as_large_types: get schema with all variable size types (list, binary, string) as large variants (with int64 indices). This is for compatibility with systems like Polars that only support the large versions of Arrow types.
+ as_large_types: get schema with all variable size types (list, binary, string) as large variants (with int64 indices).
+ This is for compatibility with systems like Polars that only support the large versions of Arrow types.
Returns:
- a PyArrow [Schema][pyarrow.Schema] type
+ a PyArrow Schema
"""
@staticmethod
def from_pyarrow(type: pyarrow.Schema) -> Schema:
@@ -593,9 +632,10 @@ class Schema:
Will raise `TypeError` if the PyArrow type is not a primitive type.
Args:
- type: A PyArrow [Schema][pyarrow.Schema] type
+ type: A PyArrow Schema
- Returns: a [Schema][deltalake.schema.Schema] type
+ Returns:
+ a Schema
"""
class ObjectInputFile:
diff --git a/python/deltalake/table.py b/python/deltalake/table.py
index b238af7929..3ac28acf88 100644
--- a/python/deltalake/table.py
+++ b/python/deltalake/table.py
@@ -209,13 +209,13 @@ def _filters_to_expression(filters: FilterType) -> Expression:
the op is in or not in, the value must be a collection such as a list, a set or a tuple.
The supported type for value is str. Use empty string `''` for Null partition value.
-Examples:
-```
-("x", "=", "a")
-("x", "!=", "a")
-("y", "in", ["a", "b", "c"])
-("z", "not in", ["a","b"])
-```
+Example:
+ ```
+ ("x", "=", "a")
+ ("x", "!=", "a")
+ ("y", "in", ["a", "b", "c"])
+ ("z", "not in", ["a","b"])
+ ```
"""
@@ -329,13 +329,13 @@ def files(
the op is in or not in, the value must be a collection such as a list, a set or a tuple.
The supported type for value is str. Use empty string `''` for Null partition value.
- Examples:
- ```
- ("x", "=", "a")
- ("x", "!=", "a")
- ("y", "in", ["a", "b", "c"])
- ("z", "not in", ["a","b"])
- ```
+ Example:
+ ```
+ ("x", "=", "a")
+ ("x", "!=", "a")
+ ("y", "in", ["a", "b", "c"])
+ ("z", "not in", ["a","b"])
+ ```
"""
return self._table.files(self.__stringify_partition_values(partition_filters))
@@ -366,13 +366,13 @@ def file_uris(
the op is in or not in, the value must be a collection such as a list, a set or a tuple.
The supported type for value is str. Use empty string `''` for Null partition value.
- Examples:
- ```
- ("x", "=", "a")
- ("x", "!=", "a")
- ("y", "in", ["a", "b", "c"])
- ("z", "not in", ["a","b"])
- ```
+ Example:
+ ```
+ ("x", "=", "a")
+ ("x", "!=", "a")
+ ("y", "in", ["a", "b", "c"])
+ ("z", "not in", ["a","b"])
+ ```
"""
return self._table.file_uris(
self.__stringify_partition_values(partition_filters)
@@ -397,12 +397,12 @@ def load_with_datetime(self, datetime_string: str) -> None:
Args:
datetime_string: the identifier of the datetime point of the DeltaTable to load
- Examples:
- ```
- "2018-01-26T18:30:09Z"
- "2018-12-19T16:39:57-08:00"
- "2018-01-26T18:30:09.453+00:00"
- ```
+ Example:
+ ```
+ "2018-01-26T18:30:09Z"
+ "2018-12-19T16:39:57-08:00"
+ "2018-01-26T18:30:09.453+00:00"
+ ```
"""
self._table.load_with_datetime(datetime_string)
@@ -511,7 +511,7 @@ def update(
Args:
updates: a mapping of column name to update SQL expression.
new_values: a mapping of column name to python datatype.
- predicate: a logical expression, defaults to None
+ predicate: a logical expression.
writer_properties: Pass writer properties to the Rust parquet writer, see options https://arrow.apache.org/rust/parquet/file/properties/struct.WriterProperties.html,
only the following fields are supported: `data_page_size_limit`, `dictionary_page_size_limit`,
`data_page_row_count_limit`, `write_batch_size`, `max_row_group_size`.
@@ -520,34 +520,43 @@ def update(
Returns:
the metrics from update
- Examples:
-
- Update some row values with SQL predicate. This is equivalent to `UPDATE table SET deleted = true WHERE id = '3'`
+ Example:
+ **Update some row values with SQL predicate**
+
+ This is equivalent to `UPDATE table SET deleted = true WHERE id = '3'`
+ ```py
+ from deltalake import write_deltalake, DeltaTable
+ import pandas as pd
+ df = pd.DataFrame(
+ {"id": ["1", "2", "3"],
+ "deleted": [False, False, False],
+ "price": [10., 15., 20.]
+ })
+ write_deltalake("tmp", df)
+ dt = DeltaTable("tmp")
+ dt.update(predicate="id = '3'", updates = {"deleted": 'True'})
- >>> from deltalake import write_deltalake, DeltaTable
- >>> import pandas as pd
- >>> df = pd.DataFrame({"id": ["1", "2", "3"], "deleted": [False, False, False], "price": [10., 15., 20.]})
- >>> write_deltalake("tmp", df)
- >>> dt = DeltaTable("tmp")
- >>> dt.update(predicate="id = '3'", updates = {"deleted": 'True'})
-
{'num_added_files': 1, 'num_removed_files': 1, 'num_updated_rows': 1, 'num_copied_rows': 2, 'execution_time_ms': ..., 'scan_time_ms': ...}
+ ```
+
+ **Update all row values**
- Update all row values. This is equivalent to
- ``UPDATE table SET deleted = true, id = concat(id, '_old')``.
+ This is equivalent to ``UPDATE table SET deleted = true, id = concat(id, '_old')``.
+ ```py
+ dt.update(updates = {"deleted": 'True', "id": "concat(id, '_old')"})
- >>> dt.update(updates = {"deleted": 'True', "id": "concat(id, '_old')"})
-
-
{'num_added_files': 1, 'num_removed_files': 1, 'num_updated_rows': 3, 'num_copied_rows': 0, 'execution_time_ms': ..., 'scan_time_ms': ...}
+ ```
- To use Python objects instead of SQL strings, use the `new_values` parameter
- instead of the `updates` parameter. For example, this is equivalent to
- ``UPDATE table SET price = 150.10 WHERE id = '1'``
+ **Use Python objects instead of SQL strings**
- >>> dt.update(predicate="id = '1_old'", new_values = {"price": 150.10})
- {'num_added_files': 1, 'num_removed_files': 1, 'num_updated_rows': 1, 'num_copied_rows': 2, 'execution_time_ms': ..., 'scan_time_ms': ...}
+ Use the `new_values` parameter instead of the `updates` parameter. For example,
+ this is equivalent to ``UPDATE table SET price = 150.10 WHERE id = '1'``
+ ```py
+ dt.update(predicate="id = '1_old'", new_values = {"price": 150.10})
+ {'num_added_files': 1, 'num_removed_files': 1, 'num_updated_rows': 1, 'num_copied_rows': 2, 'execution_time_ms': ..., 'scan_time_ms': ...}
+ ```
"""
if updates is None and new_values is not None:
updates = {}
@@ -614,11 +623,11 @@ def merge(
match the underlying table.
Args:
- source (pyarrow.Table | pyarrow.RecordBatch | pyarrow.RecordBatchReader ): source data
- predicate (str): SQL like predicate on how to merge
- source_alias (str): Alias for the source table
- target_alias (str): Alias for the target table
- error_on_type_mismatch (bool): specify if merge will return error if data types are mismatching :default = True
+ source: source data
+ predicate: SQL like predicate on how to merge
+ source_alias: Alias for the source table
+ target_alias: Alias for the target table
+ error_on_type_mismatch: specify if merge will return an error if data types mismatch. Defaults to True.
Returns:
TableMerger: TableMerger Object
@@ -858,26 +867,27 @@ def get_add_actions(self, flatten: bool = False) -> pyarrow.RecordBatch:
a PyArrow RecordBatch containing the add action data.
Example:
-
- >>> from pprint import pprint
- >>> from deltalake import DeltaTable, write_deltalake
- >>> import pyarrow as pa
- >>> data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
- >>> write_deltalake("tmp", data, partition_by=["x"])
- >>> dt = DeltaTable("tmp")
- >>> df = dt.get_add_actions().to_pandas()
- >>> df["path"].sort_values(ignore_index=True)
- 0 x=1/0-...
- 1 x=2/0-...
- 2 x=3/0-...
- ...
- >>> df = dt.get_add_actions(flatten=True).to_pandas()
- >>> df["partition.x"].sort_values(ignore_index=True)
+ ```python
+ from pprint import pprint
+ from deltalake import DeltaTable, write_deltalake
+ import pyarrow as pa
+ data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
+ write_deltalake("tmp", data, partition_by=["x"])
+ dt = DeltaTable("tmp")
+ df = dt.get_add_actions().to_pandas()
+ df["path"].sort_values(ignore_index=True)
+ 0 x=1/0
+ 1 x=2/0
+ 2 x=3/0
+ ```
+
+ ```python
+ df = dt.get_add_actions(flatten=True).to_pandas()
+ df["partition.x"].sort_values(ignore_index=True)
0 1
1 2
2 3
- ...
-
+ ```
"""
return self._table.get_add_actions(flatten)
@@ -911,16 +921,16 @@ def repair(self, dry_run: bool = False) -> Dict[str, Any]:
Returns:
The metrics from repair (FSCK) action.
- Examples:
- ```
- from deltalake import DeltaTable
- dt = DeltaTable('TEST')
- dt.repair(dry_run=False)
- ```
- Results in
- ```
- {'dry_run': False, 'files_removed': ['6-0d084325-6885-4847-b008-82c1cf30674c-0.parquet', 5-4fba1d3e-3e20-4de1-933d-a8e13ac59f53-0.parquet']}
- ```
+ Example:
+ ```python
+ from deltalake import DeltaTable
+ dt = DeltaTable('TEST')
+ dt.repair(dry_run=False)
+ ```
+ Results in
+ ```
+ {'dry_run': False, 'files_removed': ['6-0d084325-6885-4847-b008-82c1cf30674c-0.parquet', '5-4fba1d3e-3e20-4de1-933d-a8e13ac59f53-0.parquet']}
+ ```
"""
metrics = self._table.repair(dry_run)
return json.loads(metrics)
@@ -969,11 +979,11 @@ def with_writer_properties(
"""Pass writer properties to the Rust parquet writer, see options https://arrow.apache.org/rust/parquet/file/properties/struct.WriterProperties.html:
Args:
- data_page_size_limit (int|None, Optional): Limit DataPage size to this in bytes. Defaults to None.
- dictionary_page_size_limit (int|None, Optional): Limit the size of each DataPage to store dicts to this amount in bytes. Defaults to None.
- data_page_row_count_limit (int|None, Optional): Limit the number of rows in each DataPage. Defaults to None.
- write_batch_size (int|None, Optional): Splits internally to smaller batch size. Defaults to None.
- max_row_group_size (int|None, Optional): Max number of rows in row group. Defaults to None.
+ data_page_size_limit: Limit DataPage size to this in bytes.
+ dictionary_page_size_limit: Limit the size of each DataPage to store dicts to this amount in bytes.
+ data_page_row_count_limit: Limit the number of rows in each DataPage.
+ write_batch_size: Splits internally to smaller batch size.
+ max_row_group_size: Max number of rows in row group.
Returns:
TableMerger: TableMerger Object
@@ -995,36 +1005,39 @@ def when_matched_update(
If a ``predicate`` is specified, then it must evaluate to true for the row to be updated.
Args:
- updates (dict): a mapping of column name to update SQL expression.
- predicate (str | None, Optional): SQL like predicate on when to update. Defaults to None.
+ updates: a mapping of column name to update SQL expression.
+ predicate: SQL like predicate on when to update.
Returns:
TableMerger: TableMerger Object
- Examples:
-
- >>> from deltalake import DeltaTable, write_deltalake
- >>> import pyarrow as pa
- >>> data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
- >>> write_deltalake("tmp", data)
- >>> dt = DeltaTable("tmp")
- >>> new_data = pa.table({"x": [1], "y": [7]})
- >>> (
- ... dt.merge(
- ... source=new_data,
- ... predicate="target.x = source.x",
- ... source_alias="source",
- ... target_alias="target")
- ... .when_matched_update(updates={"x": "source.x", "y": "source.y"})
- ... .execute()
- ... )
+ Example:
+ ```python
+ from deltalake import DeltaTable, write_deltalake
+ import pyarrow as pa
+
+ data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
+ write_deltalake("tmp", data)
+ dt = DeltaTable("tmp")
+ new_data = pa.table({"x": [1], "y": [7]})
+
+ (
+ dt.merge(
+ source=new_data,
+ predicate="target.x = source.x",
+ source_alias="source",
+ target_alias="target")
+ .when_matched_update(updates={"x": "source.x", "y": "source.y"})
+ .execute()
+ )
{'num_source_rows': 1, 'num_target_rows_inserted': 0, 'num_target_rows_updated': 1, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 2, 'num_output_rows': 3, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...}
- >>> dt.to_pandas()
+
+ dt.to_pandas()
x y
0 1 7
1 2 5
2 3 6
-
+ ```
"""
if isinstance(self.matched_update_updates, list) and isinstance(
self.matched_update_predicate, list
@@ -1041,35 +1054,38 @@ def when_matched_update_all(self, predicate: Optional[str] = None) -> "TableMerg
If a ``predicate`` is specified, then it must evaluate to true for the row to be updated.
Args:
- predicate (str | None, Optional): SQL like predicate on when to update all columns. Defaults to None.
+ predicate: SQL like predicate on when to update all columns.
Returns:
TableMerger: TableMerger Object
- Examples:
-
- >>> from deltalake import DeltaTable, write_deltalake
- >>> import pyarrow as pa
- >>> data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
- >>> write_deltalake("tmp", data)
- >>> dt = DeltaTable("tmp")
- >>> new_data = pa.table({"x": [1], "y": [7]})
- >>> (
- ... dt.merge(
- ... source=new_data,
- ... predicate="target.x = source.x",
- ... source_alias="source",
- ... target_alias="target")
- ... .when_matched_update_all()
- ... .execute()
- ... )
+ Example:
+ ```python
+ from deltalake import DeltaTable, write_deltalake
+ import pyarrow as pa
+
+ data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
+ write_deltalake("tmp", data)
+ dt = DeltaTable("tmp")
+ new_data = pa.table({"x": [1], "y": [7]})
+
+ (
+ dt.merge(
+ source=new_data,
+ predicate="target.x = source.x",
+ source_alias="source",
+ target_alias="target")
+ .when_matched_update_all()
+ .execute()
+ )
{'num_source_rows': 1, 'num_target_rows_inserted': 0, 'num_target_rows_updated': 1, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 2, 'num_output_rows': 3, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...}
- >>> dt.to_pandas()
+
+ dt.to_pandas()
x y
0 1 7
1 2 5
2 3 6
-
+ ```
"""
src_alias = (self.source_alias + ".") if self.source_alias is not None else ""
@@ -1096,54 +1112,59 @@ def when_matched_delete(self, predicate: Optional[str] = None) -> "TableMerger":
true for the matched row. If not specified it deletes all matches.
Args:
- predicate (str | None, Optional): SQL like predicate on when to delete. Defaults to None.
+ predicate: SQL like predicate on when to delete.
Returns:
TableMerger: TableMerger Object
- Examples:
-
- Delete on a predicate
-
- >>> from deltalake import DeltaTable, write_deltalake
- >>> import pyarrow as pa
- >>> data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
- >>> write_deltalake("tmp", data)
- >>> dt = DeltaTable("tmp")
- >>> new_data = pa.table({"x": [2, 3], "deleted": [False, True]})
- >>> (
- ... dt.merge(
- ... source=new_data,
- ... predicate='target.x = source.x',
- ... source_alias='source',
- ... target_alias='target')
- ... .when_matched_delete(
- ... predicate="source.deleted = true")
- ... .execute()
- ... )
+ Example:
+ **Delete on a predicate**
+
+ ```python
+ from deltalake import DeltaTable, write_deltalake
+ import pyarrow as pa
+
+ data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
+ write_deltalake("tmp", data)
+ dt = DeltaTable("tmp")
+ new_data = pa.table({"x": [2, 3], "deleted": [False, True]})
+
+ (
+ dt.merge(
+ source=new_data,
+ predicate='target.x = source.x',
+ source_alias='source',
+ target_alias='target')
+ .when_matched_delete(
+ predicate="source.deleted = true")
+ .execute()
+ )
{'num_source_rows': 2, 'num_target_rows_inserted': 0, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 1, 'num_target_rows_copied': 2, 'num_output_rows': 2, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...}
- >>> dt.to_pandas().sort_values("x", ignore_index=True)
+
+ dt.to_pandas().sort_values("x", ignore_index=True)
x y
0 1 4
1 2 5
-
- Delete all records that were matched
-
- >>> dt = DeltaTable("tmp")
- >>> (
- ... dt.merge(
- ... source=new_data,
- ... predicate='target.x = source.x',
- ... source_alias='source',
- ... target_alias='target')
- ... .when_matched_delete()
- ... .execute()
- ... )
+ ```
+
+ **Delete all records that were matched**
+ ```python
+ dt = DeltaTable("tmp")
+ (
+ dt.merge(
+ source=new_data,
+ predicate='target.x = source.x',
+ source_alias='source',
+ target_alias='target')
+ .when_matched_delete()
+ .execute()
+ )
{'num_source_rows': 2, 'num_target_rows_inserted': 0, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 1, 'num_target_rows_copied': 1, 'num_output_rows': 1, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...}
- >>> dt.to_pandas()
+
+ dt.to_pandas()
x y
0 1 4
-
+ ```
"""
if self.matched_delete_all is not None:
raise ValueError(
@@ -1168,40 +1189,43 @@ def when_not_matched_insert(
Args:
updates (dict): a mapping of column name to insert SQL expression.
- predicate (str | None, Optional): SQL like predicate on when to insert. Defaults to None.
+ predicate: SQL like predicate on when to insert.
Returns:
TableMerger: TableMerger Object
- Examples:
-
- >>> from deltalake import DeltaTable, write_deltalake
- >>> import pyarrow as pa
- >>> data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
- >>> write_deltalake("tmp", data)
- >>> dt = DeltaTable("tmp")
- >>> new_data = pa.table({"x": [4], "y": [7]})
- >>> (
- ... dt.merge(
- ... source=new_data,
- ... predicate='target.x = source.x',
- ... source_alias='source',
- ... target_alias='target')
- ... .when_not_matched_insert(
- ... updates = {
- ... "x": "source.x",
- ... "y": "source.y",
- ... })
- ... .execute()
- ... )
+ Example:
+ ```python
+ from deltalake import DeltaTable, write_deltalake
+ import pyarrow as pa
+
+ data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
+ write_deltalake("tmp", data)
+ dt = DeltaTable("tmp")
+ new_data = pa.table({"x": [4], "y": [7]})
+
+ (
+ dt.merge(
+ source=new_data,
+ predicate="target.x = source.x",
+ source_alias="source",
+ target_alias="target",)
+ .when_not_matched_insert(
+ updates={
+ "x": "source.x",
+ "y": "source.y",
+ })
+ .execute()
+ )
{'num_source_rows': 1, 'num_target_rows_inserted': 1, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 3, 'num_output_rows': 4, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...}
- >>> dt.to_pandas().sort_values("x", ignore_index=True)
+
+ dt.to_pandas().sort_values("x", ignore_index=True)
x y
0 1 4
1 2 5
2 3 6
3 4 7
-
+ ```
"""
if isinstance(self.not_matched_insert_updates, list) and isinstance(
@@ -1223,36 +1247,39 @@ def when_not_matched_insert_all(
the new row to be inserted.
Args:
- predicate (str | None, Optional): SQL like predicate on when to insert. Defaults to None.
+ predicate: SQL like predicate on when to insert.
Returns:
TableMerger: TableMerger Object
- Examples:
-
- >>> from deltalake import DeltaTable, write_deltalake
- >>> import pyarrow as pa
- >>> data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
- >>> write_deltalake("tmp", data)
- >>> dt = DeltaTable("tmp")
- >>> new_data = pa.table({"x": [4], "y": [7]})
- >>> (
- ... dt.merge(
- ... source=new_data,
- ... predicate='target.x = source.x',
- ... source_alias='source',
- ... target_alias='target')
- ... .when_not_matched_insert_all()
- ... .execute()
- ... )
+ Example:
+ ```python
+ from deltalake import DeltaTable, write_deltalake
+ import pyarrow as pa
+
+ data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
+ write_deltalake("tmp", data)
+ dt = DeltaTable("tmp")
+ new_data = pa.table({"x": [4], "y": [7]})
+
+ (
+ dt.merge(
+ source=new_data,
+ predicate='target.x = source.x',
+ source_alias='source',
+ target_alias='target')
+ .when_not_matched_insert_all()
+ .execute()
+ )
{'num_source_rows': 1, 'num_target_rows_inserted': 1, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 3, 'num_output_rows': 4, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...}
- >>> dt.to_pandas().sort_values("x", ignore_index=True)
+
+ dt.to_pandas().sort_values("x", ignore_index=True)
x y
0 1 4
1 2 5
2 3 6
3 4 7
-
+ ```
"""
src_alias = (self.source_alias + ".") if self.source_alias is not None else ""
@@ -1279,38 +1306,41 @@ def when_not_matched_by_source_update(
If a ``predicate`` is specified, then it must evaluate to true for the row to be updated.
Args:
- updates (dict): a mapping of column name to update SQL expression.
- predicate (str | None, Optional): SQL like predicate on when to update. Defaults to None.
+ updates: a mapping of column name to update SQL expression.
+ predicate: SQL like predicate on when to update.
Returns:
TableMerger: TableMerger Object
- Examples:
-
- >>> from deltalake import DeltaTable, write_deltalake
- >>> import pyarrow as pa
- >>> data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
- >>> write_deltalake("tmp", data)
- >>> dt = DeltaTable("tmp")
- >>> new_data = pa.table({"x": [2, 3, 4]})
- >>> (
- ... dt.merge(
- ... source=new_data,
- ... predicate='target.x = source.x',
- ... source_alias='source',
- ... target_alias='target')
- ... .when_not_matched_by_source_update(
- ... predicate = "y > 3",
- ... updates = {"y": "0"})
- ... .execute()
- ... )
+ Example:
+ ```python
+ from deltalake import DeltaTable, write_deltalake
+ import pyarrow as pa
+
+ data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
+ write_deltalake("tmp", data)
+ dt = DeltaTable("tmp")
+ new_data = pa.table({"x": [2, 3, 4]})
+
+ (
+ dt.merge(
+ source=new_data,
+ predicate='target.x = source.x',
+ source_alias='source',
+ target_alias='target')
+ .when_not_matched_by_source_update(
+ predicate = "y > 3",
+ updates = {"y": "0"})
+ .execute()
+ )
{'num_source_rows': 3, 'num_target_rows_inserted': 0, 'num_target_rows_updated': 1, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 2, 'num_output_rows': 3, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...}
- >>> dt.to_pandas().sort_values("x", ignore_index=True)
+
+ dt.to_pandas().sort_values("x", ignore_index=True)
x y
0 1 0
1 2 5
2 3 6
-
+ ```
"""
if isinstance(self.not_matched_by_source_update_updates, list) and isinstance(
@@ -1330,7 +1360,7 @@ def when_not_matched_by_source_delete(
``predicate`` (if specified) is true for the target row.
Args:
- predicate (str | None, Optional): SQL like predicate on when to delete when not matched by source. Defaults to None.
+ predicate: SQL like predicate on when to delete when not matched by source.
Returns:
TableMerger: TableMerger Object
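Unlike the clauses above, this docstring gains no example in the patch; for symmetry, a minimal sketch in the same style (the `tmp` table and column names are the same illustrative fixtures, not part of the patch):

```python
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa

data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
write_deltalake("tmp", data)
dt = DeltaTable("tmp")
new_data = pa.table({"x": [2, 3]})

(
    dt.merge(
        source=new_data,
        predicate="target.x = source.x",
        source_alias="source",
        target_alias="target")
    .when_not_matched_by_source_delete()
    .execute()
)
# Target rows with no match in the source (here x == 1) are deleted.
```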
@@ -1354,7 +1384,7 @@ def execute(self) -> Dict[str, Any]:
"""Executes `MERGE` with the previously provided settings in Rust with Apache Datafusion query engine.
Returns:
- Dict[str, Any]: metrics
+ Dict: metrics
"""
metrics = self.table._table.merge_execute(
source=self.source,
@@ -1434,19 +1464,21 @@ def compact(
Returns:
the metrics from optimize
- Examples:
+ Example:
Use a timedelta object to specify the seconds, minutes or hours of the interval.
+ ```python
+ from deltalake import DeltaTable, write_deltalake
+ from datetime import timedelta
+ import pyarrow as pa
- >>> from deltalake import DeltaTable, write_deltalake
- >>> from datetime import timedelta
- >>> import pyarrow as pa
- >>> write_deltalake("tmp", pa.table({"x": [1], "y": [4]}))
- >>> write_deltalake("tmp", pa.table({"x": [2], "y": [5]}), mode="append")
- >>> dt = DeltaTable("tmp")
- >>> time_delta = timedelta(minutes=10)
- >>> dt.optimize.compact(min_commit_interval=time_delta)
- {'numFilesAdded': 1, 'numFilesRemoved': 2, 'filesAdded': ..., 'filesRemoved': ..., 'partitionsOptimized': 1, 'numBatches': 2, 'totalConsideredFiles': 2, 'totalFilesSkipped': 0, 'preserveInsertionOrder': True}
+ write_deltalake("tmp", pa.table({"x": [1], "y": [4]}))
+ write_deltalake("tmp", pa.table({"x": [2], "y": [5]}), mode="append")
+ dt = DeltaTable("tmp")
+ time_delta = timedelta(minutes=10)
+ dt.optimize.compact(min_commit_interval=time_delta)
+ {'numFilesAdded': 1, 'numFilesRemoved': 2, 'filesAdded': ..., 'filesRemoved': ..., 'partitionsOptimized': 1, 'numBatches': 2, 'totalConsideredFiles': 2, 'totalFilesSkipped': 0, 'preserveInsertionOrder': True}
+ ```
"""
if isinstance(min_commit_interval, timedelta):
min_commit_interval = int(min_commit_interval.total_seconds())
@@ -1488,19 +1520,21 @@ def z_order(
Returns:
the metrics from optimize
- Examples:
+ Example:
Use a timedelta object to specify the seconds, minutes or hours of the interval.
+ ```python
+ from deltalake import DeltaTable, write_deltalake
+ from datetime import timedelta
+ import pyarrow as pa
- >>> from deltalake import DeltaTable, write_deltalake
- >>> from datetime import timedelta
- >>> import pyarrow as pa
- >>> write_deltalake("tmp", pa.table({"x": [1], "y": [4]}))
- >>> write_deltalake("tmp", pa.table({"x": [2], "y": [5]}), mode="append")
- >>> dt = DeltaTable("tmp")
- >>> time_delta = timedelta(minutes=10)
- >>> dt.optimize.z_order(["x"], min_commit_interval=time_delta)
- {'numFilesAdded': 1, 'numFilesRemoved': 2, 'filesAdded': ..., 'filesRemoved': ..., 'partitionsOptimized': 0, 'numBatches': 1, 'totalConsideredFiles': 2, 'totalFilesSkipped': 0, 'preserveInsertionOrder': True}
+ write_deltalake("tmp", pa.table({"x": [1], "y": [4]}))
+ write_deltalake("tmp", pa.table({"x": [2], "y": [5]}), mode="append")
+ dt = DeltaTable("tmp")
+ time_delta = timedelta(minutes=10)
+ dt.optimize.z_order(["x"], min_commit_interval=time_delta)
+ {'numFilesAdded': 1, 'numFilesRemoved': 2, 'filesAdded': ..., 'filesRemoved': ..., 'partitionsOptimized': 0, 'numBatches': 1, 'totalConsideredFiles': 2, 'totalFilesSkipped': 0, 'preserveInsertionOrder': True}
+ ```
"""
if isinstance(min_commit_interval, timedelta):
min_commit_interval = int(min_commit_interval.total_seconds())
diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py
index 2b4814f98b..bb69fee457 100644
--- a/python/deltalake/writer.py
+++ b/python/deltalake/writer.py
@@ -21,7 +21,7 @@
)
from urllib.parse import unquote
-from deltalake import Schema
+from deltalake import Schema as DeltaSchema
from deltalake.fs import DeltaStorageHandler
from ._util import encode_partition_value
@@ -82,7 +82,7 @@ def write_deltalake(
RecordBatchReader,
],
*,
- schema: Optional[Union[pa.Schema, Schema]] = ...,
+ schema: Optional[Union[pa.Schema, DeltaSchema]] = ...,
partition_by: Optional[Union[List[str], str]] = ...,
filesystem: Optional[pa_fs.FileSystem] = None,
mode: Literal["error", "append", "overwrite", "ignore"] = ...,
@@ -116,7 +116,7 @@ def write_deltalake(
RecordBatchReader,
],
*,
- schema: Optional[Union[pa.Schema, Schema]] = ...,
+ schema: Optional[Union[pa.Schema, DeltaSchema]] = ...,
partition_by: Optional[Union[List[str], str]] = ...,
mode: Literal["error", "append", "overwrite", "ignore"] = ...,
max_rows_per_group: int = ...,
@@ -143,7 +143,7 @@ def write_deltalake(
RecordBatchReader,
],
*,
- schema: Optional[Union[pa.Schema, Schema]] = None,
+ schema: Optional[Union[pa.Schema, DeltaSchema]] = None,
partition_by: Optional[Union[List[str], str]] = None,
filesystem: Optional[pa_fs.FileSystem] = None,
mode: Literal["error", "append", "overwrite", "ignore"] = "error",
@@ -231,7 +231,9 @@ def write_deltalake(
storage_options: options passed to the native delta filesystem. Unused if 'filesystem' is defined.
predicate: When using `Overwrite` mode, replace data that matches a predicate. Only used in rust engine.
partition_filters: the partition filters that will be used for partition overwrite. Only used in pyarrow engine.
- large_dtypes: If True, the data schema is kept in large_dtypes, has no effect on pandas dataframe input
+ large_dtypes: If True, the data schema is kept in large_dtypes; this has no effect on pandas DataFrame input.
+ engine: writer engine used to write the delta table. The `rust` engine is still experimental, but you may
+ see up to 4x performance improvements over pyarrow.
"""
table, table_uri = try_get_table_and_table_uri(table_or_uri, storage_options)
if table is not None:
@@ -245,7 +247,7 @@ def write_deltalake(
if isinstance(partition_by, str):
partition_by = [partition_by]
- if isinstance(schema, Schema):
+ if isinstance(schema, DeltaSchema):
schema = schema.to_pyarrow()
if isinstance(data, RecordBatchReader):
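The alias avoids a name clash between `pa.Schema` and the deltalake `Schema` in the `isinstance` check above. A rough sketch of what the signature and the new `engine` flag allow, assuming the 0.14-era keyword names (`engine="rust"` opts in to the experimental writer):

```python
import pyarrow as pa
from deltalake import DeltaTable, write_deltalake

data = pa.table({"x": [1, 2, 3]})

# A pyarrow schema is accepted directly...
write_deltalake("tmp", data, schema=data.schema, mode="overwrite")

# ...and so is a deltalake Schema, converted via to_pyarrow() internally.
delta_schema = DeltaTable("tmp").schema()
write_deltalake("tmp", data, schema=delta_schema, mode="overwrite")

# Opt in to the experimental Rust writer:
write_deltalake("tmp", data, mode="append", engine="rust")
```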
From f90b48cfe09cdee24e7f78c6707dbed9903efbb8 Mon Sep 17 00:00:00 2001
From: haruband
Date: Sat, 2 Dec 2023 09:41:06 +0900
Subject: [PATCH 11/16] fix: prune each merge bin with only 1 file (#1902)
# Description
This PR prunes every merge bin that contains only a single file, even when
a partition has multiple merge bins; previously such a bin was only skipped
when it was the sole bin for its partition.
# Related Issue(s)
- closes #1901
# Documentation
This PR adds test_idempotent_with_multiple_bins() for testing.
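The effect is easiest to see on the file sizes used by the new test. A hypothetical first-fit sketch in plain Python (not the delta-rs implementation, whose packing lives in `build_compaction_plan`):

```python
# Pack files into bins up to a target size, then skip any bin that ends up
# with a single file, since rewriting one file alone is a no-op.
target = 10_000_000
sizes = [6_000_000, 3_000_000, 6_000_000, 3_000_000, 9_900_000]

bins = []
for size in sizes:
    for b in bins:
        if sum(b) + size <= target:
            b.append(size)
            break
    else:
        bins.append([size])

skipped = sum(1 for b in bins if len(b) == 1)
bins = [b for b in bins if len(b) > 1]
print(bins, "skipped:", skipped)
# [[6000000, 3000000], [6000000, 3000000]] skipped: 1
```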
---
.../deltalake-core/src/operations/optimize.rs | 12 ++--
.../deltalake-core/tests/command_optimize.rs | 65 +++++++++++++++++++
2 files changed, 73 insertions(+), 4 deletions(-)
diff --git a/crates/deltalake-core/src/operations/optimize.rs b/crates/deltalake-core/src/operations/optimize.rs
index ef8905e0c9..09f9de087e 100644
--- a/crates/deltalake-core/src/operations/optimize.rs
+++ b/crates/deltalake-core/src/operations/optimize.rs
@@ -900,10 +900,14 @@ fn build_compaction_plan(
// Prune merge bins with only 1 file, since they have no effect
for (_, bins) in operations.iter_mut() {
- if bins.len() == 1 && bins[0].len() == 1 {
- metrics.total_files_skipped += 1;
- bins.clear();
- }
+ bins.retain(|bin| {
+ if bin.len() == 1 {
+ metrics.total_files_skipped += 1;
+ false
+ } else {
+ true
+ }
+ })
}
operations.retain(|_, files| !files.is_empty());
diff --git a/crates/deltalake-core/tests/command_optimize.rs b/crates/deltalake-core/tests/command_optimize.rs
index 14f9d4c410..b91558ce08 100644
--- a/crates/deltalake-core/tests/command_optimize.rs
+++ b/crates/deltalake-core/tests/command_optimize.rs
@@ -508,6 +508,71 @@ async fn test_idempotent_metrics() -> Result<(), Box<dyn Error>> {
Ok(())
}
+#[tokio::test]
+/// Validate that packing files into multiple bins is idempotent.
+async fn test_idempotent_with_multiple_bins() -> Result<(), Box<dyn Error>> {
+ //TODO: Compression makes it hard to get the target file size...
+ //Maybe just commit files with a known size
+ let context = setup_test(true).await?;
+ let mut dt = context.table;
+ let mut writer = RecordBatchWriter::for_table(&dt)?;
+
+ write(
+ &mut writer,
+ &mut dt,
+ generate_random_batch(records_for_size(6_000_000), "2022-05-22")?,
+ )
+ .await?;
+ write(
+ &mut writer,
+ &mut dt,
+ generate_random_batch(records_for_size(3_000_000), "2022-05-22")?,
+ )
+ .await?;
+ write(
+ &mut writer,
+ &mut dt,
+ generate_random_batch(records_for_size(6_000_000), "2022-05-22")?,
+ )
+ .await?;
+ write(
+ &mut writer,
+ &mut dt,
+ generate_random_batch(records_for_size(3_000_000), "2022-05-22")?,
+ )
+ .await?;
+ write(
+ &mut writer,
+ &mut dt,
+ generate_random_batch(records_for_size(9_900_000), "2022-05-22")?,
+ )
+ .await?;
+
+ let version = dt.version();
+
+ let filter = vec![PartitionFilter::try_from(("date", "=", "2022-05-22"))?];
+
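+ // With a 10 MB target the five files pack into bins [6 MB + 3 MB],
+ // [6 MB + 3 MB] and [9.9 MB]; the single-file bin is pruned, so this
+ // pass rewrites four files into two and the next pass finds nothing to do.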
+ let optimize = DeltaOps(dt)
+ .optimize()
+ .with_filters(&filter)
+ .with_target_size(10_000_000);
+ let (dt, metrics) = optimize.await?;
+ assert_eq!(metrics.num_files_added, 2);
+ assert_eq!(metrics.num_files_removed, 4);
+ assert_eq!(dt.version(), version + 1);
+
+ let optimize = DeltaOps(dt)
+ .optimize()
+ .with_filters(&filter)
+ .with_target_size(10_000_000);
+ let (dt, metrics) = optimize.await?;
+ assert_eq!(metrics.num_files_added, 0);
+ assert_eq!(metrics.num_files_removed, 0);
+ assert_eq!(dt.version(), version + 1);
+
+ Ok(())
+}
+
#[tokio::test]
/// Validate operation data and metadata was written
async fn test_commit_info() -> Result<(), Box<dyn Error>> {
From d518f40c7bea2cd6cdecc123af28395dbd0e44b4 Mon Sep 17 00:00:00 2001
From: Will Jones
Date: Sat, 2 Dec 2023 00:24:41 -0800
Subject: [PATCH 12/16] chore: update python version (#1934)
# Description
Prepare for next release
# Related Issue(s)
# Documentation
---
python/Cargo.toml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/python/Cargo.toml b/python/Cargo.toml
index 5194a2fc22..a9936a483c 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "deltalake-python"
-version = "0.13.0"
+version = "0.14.0"
authors = ["Qingping Hou ", "Will Jones "]
homepage = "https://github.com/delta-io/delta-rs"
license = "Apache-2.0"
From 2733f3d34a593f4fb23f498969d73b0d83a1f05f Mon Sep 17 00:00:00 2001
From: bolkedebruin
Date: Mon, 6 Nov 2023 15:39:47 +0100
Subject: [PATCH 13/16] Support os.PathLike for table references
---
python/deltalake/table.py | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/python/deltalake/table.py b/python/deltalake/table.py
index 3ac28acf88..f05a02fb85 100644
--- a/python/deltalake/table.py
+++ b/python/deltalake/table.py
@@ -1,5 +1,6 @@
import json
import operator
+import os
import warnings
from dataclasses import dataclass
from datetime import datetime, timedelta
@@ -225,7 +226,7 @@ class DeltaTable:
def __init__(
self,
- table_uri: Union[str, Path],
+ table_uri: Union[str, Path, os.PathLike],
version: Optional[int] = None,
storage_options: Optional[Dict[str, str]] = None,
without_files: bool = False,
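A minimal sketch of what the widened `table_uri` type permits, assuming a table already exists at the path:

```python
import pathlib
from deltalake import DeltaTable

# str and pathlib.Path (or any other os.PathLike) are now both accepted:
dt = DeltaTable(pathlib.Path("tmp"))
print(dt.version())
```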
From f4b9e9178906563ae1e076fbfb50105594930b53 Mon Sep 17 00:00:00 2001
From: Will Jones
Date: Fri, 1 Dec 2023 18:51:27 -0800
Subject: [PATCH 14/16] add type param
---
python/deltalake/table.py | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/python/deltalake/table.py b/python/deltalake/table.py
index f05a02fb85..adf3ca92af 100644
--- a/python/deltalake/table.py
+++ b/python/deltalake/table.py
@@ -1,6 +1,5 @@
import json
import operator
-import os
import warnings
from dataclasses import dataclass
from datetime import datetime, timedelta
@@ -32,6 +31,8 @@
)
if TYPE_CHECKING:
+ import os
+
import pandas
from deltalake._internal import DeltaDataChecker as _DeltaDataChecker
@@ -226,7 +227,7 @@ class DeltaTable:
def __init__(
self,
- table_uri: Union[str, Path, os.PathLike],
+ table_uri: Union[str, Path, "os.PathLike[str]"],
version: Optional[int] = None,
storage_options: Optional[Dict[str, str]] = None,
without_files: bool = False,
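Quoting the annotation and importing `os` only under `TYPE_CHECKING` keeps the import out of the runtime graph while type checkers still resolve it. The general pattern, sketched standalone with a hypothetical helper:

```python
from typing import TYPE_CHECKING, Union

if TYPE_CHECKING:
    # Seen by type checkers only; never imported at runtime.
    import os

def normalize_path(path: Union[str, "os.PathLike[str]"]) -> str:
    # The quoted forward reference is resolved lazily, so `os` need not
    # be imported when this module loads.
    import os as _os
    return _os.fspath(path)
```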
From 83f2f9905df9c9535b7088ed437fd8db42e95d77 Mon Sep 17 00:00:00 2001
From: Dmytro Suvorov
Date: Sat, 2 Dec 2023 13:47:41 +0200
Subject: [PATCH 15/16] fix: get rid of panic during table open (#1928)
# Description
This is a continuation of the discussion in
https://github.com/delta-io/delta-rs/pull/1917: it gets rid of a panic in
the library crate in favor of returning an error, so library users can
handle it in whatever way they see fit. Tests are changed accordingly.
Co-authored-by: Robert Pack <42610831+roeap@users.noreply.github.com>
---
crates/deltalake-core/src/lib.rs | 27 +++++++++-------------
crates/deltalake-core/src/table/builder.rs | 6 ++++-
2 files changed, 16 insertions(+), 17 deletions(-)
diff --git a/crates/deltalake-core/src/lib.rs b/crates/deltalake-core/src/lib.rs
index a14a89c33e..fda54ccc01 100644
--- a/crates/deltalake-core/src/lib.rs
+++ b/crates/deltalake-core/src/lib.rs
@@ -710,28 +710,23 @@ mod tests {
}
#[tokio::test()]
- #[should_panic(expected = "does not exist or you don't have access!")]
async fn test_fail_fast_on_not_existing_path() {
use std::path::Path as FolderPath;
- let path_str = "./tests/data/folder_doesnt_exist";
+ let non_existing_path_str = "./tests/data/folder_doesnt_exist";
// Check that there is no such path at the beginning
- let path_doesnt_exist = !FolderPath::new(path_str).exists();
+ let path_doesnt_exist = !FolderPath::new(non_existing_path_str).exists();
assert!(path_doesnt_exist);
- match crate::open_table(path_str).await {
- Ok(table) => Ok(table),
- Err(e) => {
- let path_still_doesnt_exist = !FolderPath::new(path_str).exists();
- assert!(
- path_still_doesnt_exist,
- "Path exists for some reason, but it shouldn't"
- );
-
- Err(e)
- }
- }
- .unwrap();
+ let error = crate::open_table(non_existing_path_str).await.unwrap_err();
+ let expected_error_msg = format!(
+ "Local path \"{}\" does not exist or you don't have access!",
+ non_existing_path_str
+ );
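+ // In `matches!`, a bare identifier is a binding pattern that matches any
+ // value, so a guard is needed to actually compare the message.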
+ assert!(matches!(
+ error,
+ DeltaTableError::InvalidTableLocation(ref msg) if msg == &expected_error_msg
+ ))
}
}
diff --git a/crates/deltalake-core/src/table/builder.rs b/crates/deltalake-core/src/table/builder.rs
index 89962ed518..f453c895f6 100644
--- a/crates/deltalake-core/src/table/builder.rs
+++ b/crates/deltalake-core/src/table/builder.rs
@@ -166,7 +166,11 @@ impl DeltaTableBuilder {
if let UriType::LocalPath(path) = resolve_uri_type(table_uri)? {
if !path.exists() {
- panic!("Path \"{table_uri}\" does not exist or you don't have access!");
+ let msg = format!(
+ "Local path \"{}\" does not exist or you don't have access!",
+ table_uri
+ );
+ return Err(DeltaTableError::InvalidTableLocation(msg));
}
}
From b946d075d12a78e12a5d1a43debfd425ef34419f Mon Sep 17 00:00:00 2001
From: Jan Schweizer
Date: Sat, 2 Dec 2023 13:29:00 +0100
Subject: [PATCH 16/16] Happify linter
---
crates/deltalake-core/src/operations/vacuum.rs | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/crates/deltalake-core/src/operations/vacuum.rs b/crates/deltalake-core/src/operations/vacuum.rs
index 27408fe48c..93ad44f153 100644
--- a/crates/deltalake-core/src/operations/vacuum.rs
+++ b/crates/deltalake-core/src/operations/vacuum.rs
@@ -219,7 +219,7 @@ impl VacuumBuilder {
if let Ok(parent) =
&Url::parse(&format!("file://{}", self.log_store.root_uri().as_str()))
{
- if let Ok(dv_absolute_path) = deletion_vector.absolute_path(&parent) {
+ if let Ok(dv_absolute_path) = deletion_vector.absolute_path(parent) {
Some(dv_absolute_path?.path().to_string())
} else {
None
@@ -307,7 +307,7 @@ impl std::future::IntoFuture for VacuumBuilder {
fn is_absolute_path(path: &str) -> bool {
let path = std::path::Path::new(path);
- return path.is_absolute();
+ path.is_absolute()
}
/// Encapsulate which files are to be deleted and the parameters used to make that decision