Skip to content

Commit

Permalink
feat: expose peek next commit to python
Browse files Browse the repository at this point in the history
  • Loading branch information
PengLiVectra committed Nov 7, 2023
1 parent b3f478e commit ab5e698
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 1 deletion.
4 changes: 4 additions & 0 deletions crates/deltalake-core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ pub enum DeltaTableError {
#[error("Invalid table version: {0}")]
InvalidVersion(i64),

/// Error returned when no delta log is found for the requested table version.
#[error("Delta log not found for table version: {0}")]
DeltaLogNotFound(i64),

/// Error returned when the DeltaTable has no data files.
#[error("Corrupted table, cannot read data file {}: {}", .path, .source)]
MissingDataFile {
Expand Down
17 changes: 17 additions & 0 deletions crates/deltalake-core/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,23 @@ impl DeltaTable {
self.update().await
}

/// Fetch the raw bytes of the commit object for the given table version.
///
/// The object location is derived from `current_version` through
/// `commit_uri_from_version` and read via `self.storage`.
///
/// # Errors
///
/// Returns [`DeltaTableError::DeltaLogNotFound`] carrying `current_version` when the
/// store reports the commit object as not found. Any other store error, including
/// one raised while reading the object's bytes, is converted into a
/// [`DeltaTableError`] and propagated.
pub async fn get_obj_from_version(
    &self,
    current_version: i64,
) -> Result<Bytes, DeltaTableError> {
    let commit_uri = commit_uri_from_version(current_version);
    match self.storage.get(&commit_uri).await {
        Ok(result) => Ok(result.bytes().await?),
        // A missing commit object gets its own variant so callers can tell it
        // apart from every other storage failure.
        Err(ObjectStoreError::NotFound { .. }) => {
            Err(DeltaTableError::DeltaLogNotFound(current_version))
        }
        Err(err) => Err(err.into()),
    }
}

/// Get the list of actions for the next commit
pub async fn peek_next_commit(
&self,
Expand Down
29 changes: 28 additions & 1 deletion python/deltalake/table.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import operator
import warnings
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from functools import reduce
Expand Down Expand Up @@ -36,7 +37,7 @@
from ._internal import RawDeltaTable
from ._util import encode_partition_value
from .data_catalog import DataCatalog
from .exceptions import DeltaProtocolError
from .exceptions import DeltaError, DeltaProtocolError
from .fs import DeltaStorageHandler
from .schema import Schema

Expand Down Expand Up @@ -250,6 +251,7 @@ def __init__(
"""
self._storage_options = storage_options
self._latest_version = -1
self._table = RawDeltaTable(
str(table_uri),
version=version,
Expand Down Expand Up @@ -796,6 +798,31 @@ def update_incremental(self) -> None:
"""
self._table.update_incremental()

def peek_next_commit(
self, version: int
) -> Tuple[Optional[List[Dict[Any, Any]]], int]:
"""
Peek next commit of the input version.
"""
actions = []
next_version = version + 1
if next_version > self._latest_version:
self._latest_version = self._table.get_latest_version()
while next_version <= self._latest_version:
try:
commit_log_bytes = self._table.get_obj(next_version)
for commit_action in commit_log_bytes.split(b"\n"):
if commit_action:
actions.append(json.loads(commit_action))
return actions, next_version
except DeltaError as e:
if str(e) == f"Delta log not found for table version: {next_version}":
next_version += 1
else:
raise
logging.info(f"Provided Delta Version is up to date. Version: {version}")
return None, version

def create_checkpoint(self) -> None:
self._table.create_checkpoint()

Expand Down
14 changes: 14 additions & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,20 @@ impl RawDeltaTable {
Ok(self._table.version())
}

/// Return the raw commit object for `version` as a Python `bytes` object.
///
/// The async `get_obj_from_version` call on the underlying table is driven to
/// completion with `block_on` on the runtime returned by `rt()`.
///
/// # Errors
///
/// A failure to obtain the runtime is propagated as-is; an error from
/// `get_obj_from_version` is converted through `PythonError`.
pub fn get_obj<'py>(&self, py: Python<'py>, version: i64) -> PyResult<&'py PyBytes> {
    let commit_log_bytes = rt()?
        .block_on(self._table.get_obj_from_version(version))
        .map_err(PythonError::from)?;
    Ok(PyBytes::new(py, &commit_log_bytes))
}

pub fn get_latest_version(&mut self) -> PyResult<i64> {
let latest_version = rt()?
.block_on(self._table.get_latest_version())
.map_err(PyDeltaTableError::from_raw)?;
Ok(latest_version)
}

pub fn metadata(&self) -> PyResult<RawDeltaTableMetaData> {
let metadata = self._table.get_metadata().map_err(PythonError::from)?;
Ok(RawDeltaTableMetaData {
Expand Down
8 changes: 8 additions & 0 deletions python/tests/test_table_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,14 @@ def test_writer_fails_on_protocol():
dt.to_pandas()


@pytest.mark.parametrize("version, expected", [(2, (5, 3))])
def test_peek_next_commit(version, expected):
    """Peeking past ``version`` yields the expected (action count, commit version) pair."""
    dt = DeltaTable("../rust/tests/data/simple_table")
    peeked_actions, peeked_version = dt.peek_next_commit(version=version)
    observed = (len(peeked_actions), peeked_version)
    assert observed == expected


class ExcPassThroughThread(Thread):
"""Wrapper around `threading.Thread` that propagates exceptions."""

Expand Down

0 comments on commit ab5e698

Please sign in to comment.