Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose peek next commit to python #1

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/deltalake-core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ pub enum DeltaTableError {
#[error("Invalid table version: {0}")]
InvalidVersion(i64),

/// Error returned when the DeltaTable has no delta log version.
#[error("Delta log not found for table version: {0}")]
DeltaLogNotFound(i64),

/// Error returned when the DeltaTable has no data files.
#[error("Corrupted table, cannot read data file {}: {}", .path, .source)]
MissingDataFile {
Expand Down
18 changes: 18 additions & 0 deletions crates/deltalake-core/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::fmt;
use std::fmt::Formatter;
use std::{cmp::max, cmp::Ordering, collections::HashSet};

use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::StreamExt;
use lazy_static::lazy_static;
Expand Down Expand Up @@ -455,6 +456,23 @@ impl DeltaTable {
self.update().await
}

/// Get the commit obj from the version
pub async fn get_obj_from_version(
&self,
current_version: i64,
) -> Result<Bytes, DeltaTableError> {
let commit_log_bytes = match self.log_store.read_commit_entry(current_version).await {
Ok(bytes) => Ok(bytes),
Err(DeltaTableError::ObjectStore {
source: ObjectStoreError::NotFound { .. },
}) => {
return Err(DeltaTableError::DeltaLogNotFound(current_version));
}
Err(err) => Err(err),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why return Ok(Err(err)) instead of Err(err)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, where is Ok(Err(err))?
Here we use the same method as peek_next_commit below to get the commit_log_bytest, then we return the full commit_log_bytes.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this line, we assign commit_log_bytes to Err(err). Then on line 470, we return Ok(commit_log_bytes). I believe as a result we will return Ok(Err(err)) if we reach line 468.

Am I interpreting this correctly? If so, this seems wrong. I think you'd want to return Err(err) here and on line 462 you'd want to assign commit_log_bytes to bytes, not Ok(bytes) (which would thus return Ok(Ok(bytes)) which is weird as well).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes seems weird we need to fix this, also need to add a test for error case.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as mentioned by ohan, rust ? seems to tidy the results, https://doc.rust-lang.org/rust-by-example/std/result/question_mark.html

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flat out missed the question mark at the end, ok great.

}?;
Ok(commit_log_bytes)
}

/// Get the list of actions for the next commit
pub async fn peek_next_commit(
&self,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"commitInfo":{"timestamp":1587968586154,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"5fba94ed-9794-4965-ba6e-6ee3c0d22af9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1587968585495}}
{"add":{"path":"part-00000-a72b1fb3-f2df-41fe-a8f0-e65b746382dd-c000.snappy.parquet","partitionValues":{},"size":262,"modificationTime":1587968586000,"dataChange":true}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1587968596254,"operation":"MERGE","operationParameters":{"predicate":"(oldData.`id` = newData.`id`)"},"readVersion":0,"isBlindAppend":false}}
{"add":{"path":"part-00004-95c9bc2c-ac85-4581-b3cc-84502b0c314f-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587968596000,"dataChange":true}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1587968614187,"operation":"UPDATE","operationParameters":{"predicate":"((id#697L % cast(2 as bigint)) = cast(0 as bigint))"},"readVersion":2,"isBlindAppend":false}}
{"add":{"path":"part-00001-bb70d2ba-c196-4df2-9c85-f34969ad3aa9-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587968614000,"dataChange":true}}
Binary file not shown.
Binary file not shown.
Binary file not shown.
1 change: 1 addition & 0 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class RawDeltaTable:
) -> str: ...
def table_uri(self) -> str: ...
def version(self) -> int: ...
def get_obj(self, version: int) -> bytes: ...
def get_latest_version(self) -> int: ...
def metadata(self) -> RawDeltaTableMetaData: ...
def protocol_versions(self) -> List[int]: ...
Expand Down
33 changes: 32 additions & 1 deletion python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from deltalake._internal import create_deltalake as _create_deltalake
from deltalake._util import encode_partition_value
from deltalake.data_catalog import DataCatalog
from deltalake.exceptions import DeltaProtocolError
from deltalake.exceptions import DeltaError, DeltaProtocolError
from deltalake.fs import DeltaStorageHandler
from deltalake.schema import Schema as DeltaSchema

Expand Down Expand Up @@ -256,6 +256,7 @@ def __init__(

"""
self._storage_options = storage_options
self._latest_version = -1
self._table = RawDeltaTable(
str(table_uri),
version=version,
Expand Down Expand Up @@ -897,6 +898,36 @@ def update_incremental(self) -> None:
"""
self._table.update_incremental()

def get_latest_version(self) -> int:
"""
Get latest version of commit.
"""
return self._table.get_latest_version()

def peek_next_commit(
self, version: int
) -> Tuple[Optional[List[Dict[Any, Any]]], int]:
"""
Peek next commit of the input version.
"""
actions = []
next_version = version + 1
if next_version > self._latest_version:
self._latest_version = self.get_latest_version()
while next_version <= self._latest_version:
try:
commit_log_bytes = self._table.get_obj(next_version)
for commit_action in commit_log_bytes.split(b"\n"):
if commit_action:
actions.append(json.loads(commit_action))
return actions, next_version
except DeltaError as e:
if str(e) == f"Delta log not found for table version: {next_version}":
next_version += 1
else:
raise
return None, version

def create_checkpoint(self) -> None:
self._table.create_checkpoint()

Expand Down
9 changes: 8 additions & 1 deletion python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use deltalake::DeltaOps;
use deltalake::DeltaTableBuilder;
use pyo3::exceptions::{PyIOError, PyRuntimeError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::{PyFrozenSet, PyType};
use pyo3::types::{PyBytes, PyFrozenSet, PyType};
use serde_json::{Map, Value};

use crate::error::DeltaProtocolError;
Expand Down Expand Up @@ -154,6 +154,13 @@ impl RawDeltaTable {
Ok(self._table.version())
}

pub fn get_obj<'py>(&self, py: Python<'py>, version: i64) -> PyResult<&'py PyBytes> {
let commit_log_bytes = rt()?
.block_on(self._table.get_obj_from_version(version))
.map_err(PythonError::from)?;
return Ok(PyBytes::new(py, &commit_log_bytes));
}

pub fn metadata(&self) -> PyResult<RawDeltaTableMetaData> {
let metadata = self._table.metadata().map_err(PythonError::from)?;
Ok(RawDeltaTableMetaData {
Expand Down
23 changes: 23 additions & 0 deletions python/tests/test_table_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,29 @@ def test_writer_fails_on_protocol():
dt.to_pandas()


@pytest.mark.parametrize("version, expected", [(2, (5, 3))])
def test_peek_next_commit(version, expected):
table_path = "../crates/deltalake-core/tests/data/simple_table"
dt = DeltaTable(table_path)
actions, next_version = dt.peek_next_commit(version=version)
assert (len(actions), next_version) == expected

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we could add a test to cover DeltaLogNotFound.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess there's no way to test the loop? I'm thinking. dt.peek_next_commit(version) where version does not have an object but version+1 does and _latest_version is currently version.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little confused here, do you mean use version=-1 as input or something else?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking there'd be a unit test in which the underlying delta table has actual latest version 10, the DeltaTable thinks the latest version is 6, and version 7 doesn't actually exist. So we call dt.peek_next_commit(6) and expect the next commit to be 8, which requires

  1. realizing 7 is bigger than the thought-to-be latest version 6
  2. updating the latest version which is now 10
  3. checking if version 7 is there, which it isn't
  4. trying again with version 8 and finding version 8 to return.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added data that missed a commit and a test for it. get_latest_version function can get the actual latest version when there is a missed commit (e.g., if version 7 doesn't exist and the actual latest version is 10, DeltaTable.get_latest_version will return 10), so we don't need to update the latest version.


def test_delta_log_not_found():
table_path = "../crates/deltalake-core/tests/data/simple_table"
dt = DeltaTable(table_path)
latest_version = dt.get_latest_version()
_, version = dt.peek_next_commit(version=latest_version)
assert version == latest_version


def test_delta_log_missed():
table_path = "../crates/deltalake-core/tests/data/simple_table_missing_commit"
dt = DeltaTable(table_path)
_, version = dt.peek_next_commit(version=1)
assert version == 3 # Missed commit version 2, should return version 3


class ExcPassThroughThread(Thread):
"""Wrapper around `threading.Thread` that propagates exceptions."""

Expand Down
Loading