
Commit

feat: Delta lake data source (initial implementation) (#1119)
* delta stubs

* Accessor (s3)

* Downgrade datafusion + use storage opts when opening table

* fmt
scsmithr authored Jun 14, 2023
1 parent 425be39 commit 5a2d5b1
Showing 20 changed files with 1,088 additions and 112 deletions.
801 changes: 710 additions & 91 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -9,4 +9,5 @@ edition = "2021"
lto = "thin"

[workspace.dependencies]
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "main" }
datafusion = { version = "26.0" }
deltalake = { git = "https://github.com/delta-io/delta-rs.git", branch = "main", features = ["s3", "gcs", "azure", "datafusion", "arrow", "parquet"] }
6 changes: 1 addition & 5 deletions crates/datafusion_planner/src/expr/function.rs
@@ -18,7 +18,6 @@
use crate::planner::{AsyncContextProvider, SqlQueryPlanner};
use datafusion::common::{DFSchema, DataFusionError, Result};
use datafusion::logical_expr::expr::{ScalarFunction, ScalarUDF};
use datafusion::logical_expr::function::suggest_valid_function;
use datafusion::logical_expr::utils::COUNT_STAR_EXPANSION;
use datafusion::logical_expr::window_frame::regularize;
use datafusion::logical_expr::{
@@ -169,10 +168,7 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
}

// Could not find the relevant function, so return an error
let suggested_func_name = suggest_valid_function(&name, is_function_window);
Err(DataFusionError::Plan(format!(
"Invalid function '{name}'.\nDid you mean '{suggested_func_name}'?"
)))
Err(DataFusionError::Plan(format!("Invalid function '{name}'.")))
}

pub(super) async fn sql_named_function_to_expr(
5 changes: 4 additions & 1 deletion crates/datasources/Cargo.toml
@@ -15,19 +15,21 @@ bytes = "1.4.0"
chrono = "0.4.26"
datafusion = { workspace = true }
decimal = { path = "../decimal" }
deltalake = { workspace = true }
futures = "0.3.28"
gcp-bigquery-client = "0.16.7"
logutil = {path = "../logutil"}
metastoreproto = { path = "../metastoreproto" }
mongodb = "2.5.0"
mysql_async = { version = "0.32.2", default-features = false, features = ["default-rustls"] }
mysql_common = { version = "0.30.4", features = ["chrono"] }
object_store = { version = "0.6", features = ["gcp", "aws"] }
object_store = { version = "0.5", features = ["gcp", "aws"] }
once_cell = "1.18.0"
openssh = "0.9.9"
parking_lot = "0.12.1"
rand = "0.8.5"
repr = { path = "../repr" }
reqwest = { version = "0.11.18", default-features = false, features = ["json", "rustls-tls"] }
ring = "0.16.20"
rustls = "0.21.1"
rust_decimal = { version = "1.29.0", features = ["db-tokio-postgres"] }
@@ -42,4 +44,5 @@ tokio-postgres = { version = "0.7.8", features = ["with-uuid-1", "with-serde_jso
tokio-rustls = "0.24.1"
tracing = "0.1"
uuid = "1.3.3"
url = "2.4.0"
webpki-roots = "0.23.1"
68 changes: 68 additions & 0 deletions crates/datasources/src/delta/access.rs
@@ -0,0 +1,68 @@
use crate::delta::catalog::{DataCatalog, UnityCatalog};
use crate::delta::errors::Result;
use deltalake::DeltaTable;
use metastoreproto::types::options::{DeltaLakeCatalog, DeltaLakeUnityCatalog};
use std::collections::HashMap;
use std::sync::Arc;
use tracing::debug;

/// Access a delta lake.
pub struct DeltaLakeAccessor {
    catalog: Arc<dyn DataCatalog>,
    region: String,
    access_key_id: String,
    secret_access_key: String,
}

impl DeltaLakeAccessor {
    /// Connect to a deltalake using the provided catalog information.
    // TODO: Allow accessing delta tables without a catalog?
    // TODO: Don't be S3 specific.
    pub async fn connect(
        catalog: &DeltaLakeCatalog,
        access_key_id: &str,
        secret_access_key: &str,
        region: &str,
    ) -> Result<DeltaLakeAccessor> {
        let catalog: Arc<dyn DataCatalog> = match catalog {
            DeltaLakeCatalog::Unity(DeltaLakeUnityCatalog {
                catalog_id,
                databricks_access_token,
                workspace_url,
            }) => {
                let catalog =
                    UnityCatalog::connect(databricks_access_token, workspace_url, catalog_id)
                        .await?;
                Arc::new(catalog)
            }
        };

        Ok(DeltaLakeAccessor {
            catalog,
            region: region.to_string(),
            access_key_id: access_key_id.to_string(),
            secret_access_key: secret_access_key.to_string(),
        })
    }

    pub async fn load_table(self, database: &str, table: &str) -> Result<DeltaTable> {
        let loc = self
            .catalog
            .get_table_storage_location(database, table)
            .await?;

        debug!(%loc, %database, %table, "deltalake location");

        let mut opts = HashMap::new();
        opts.insert("aws_access_key_id".to_string(), self.access_key_id);
        opts.insert("aws_secret_access_key".to_string(), self.secret_access_key);
        opts.insert("aws_region".to_string(), self.region);

        let table = deltalake::open_table_with_storage_options(loc, opts).await?;

        // Note that the deltalake crate does the appropriate jank for
        // registering the object store in the datafusion session's runtime env
        // during execution.
        Ok(table)
    }
}
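
A minimal usage sketch of the accessor above, not part of this commit: the crate/module path, credential strings, and catalog values are placeholders, and it only assumes the `connect`/`load_table` signatures shown here plus `metastoreproto` option types that mirror the proto messages added later in this diff.

use datasources::delta::access::DeltaLakeAccessor;
use metastoreproto::types::options::{DeltaLakeCatalog, DeltaLakeUnityCatalog};

async fn example() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder Unity Catalog options; real values come from the database
    // options stored in the metastore.
    let catalog = DeltaLakeCatalog::Unity(DeltaLakeUnityCatalog {
        catalog_id: "my_catalog".to_string(),
        databricks_access_token: "dapi-...".to_string(),
        workspace_url: "https://dbc-123.cloud.databricks.com".to_string(),
    });

    // Placeholder AWS credentials for the S3-backed table location.
    let accessor =
        DeltaLakeAccessor::connect(&catalog, "AKIA...", "secret", "us-east-1").await?;
    let table = accessor.load_table("my_schema", "my_table").await?;

    println!("loaded delta table at version {}", table.version());
    Ok(())
}
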
92 changes: 92 additions & 0 deletions crates/datasources/src/delta/catalog.rs
@@ -0,0 +1,92 @@
//! Delta lake catalog implementations.
//!
//! Most of this was copied in from the `deltalake` crate to make some
//! modifications to how we construct clients and what errors get returned.
use crate::delta::errors::{DeltaError, Result};
use async_trait::async_trait;
use reqwest::header;
use serde::Deserialize;

#[async_trait]
pub trait DataCatalog: Sync + Send {
    /// Get the storage location for a given table.
    async fn get_table_storage_location(
        &self,
        database_name: &str, // "schema"
        table_name: &str,
    ) -> Result<String>;
}

/// Databricks Unity Catalog - implementation of the `DataCatalog` trait
#[derive(Debug, Clone)]
pub struct UnityCatalog {
    client: reqwest::Client,
    workspace_url: String,
    catalog_id: String,
}

impl UnityCatalog {
    pub async fn connect(
        access_token: &str,
        workspace_url: &str,
        catalog_id: &str,
    ) -> Result<Self> {
        let auth_header_val = header::HeaderValue::from_str(&format!("Bearer {}", &access_token))
            .map_err(|_| DeltaError::Static("Invalid Databricks access token"))?;

        let headers = header::HeaderMap::from_iter([(header::AUTHORIZATION, auth_header_val)]);
        let client = reqwest::Client::builder()
            .default_headers(headers)
            .build()?;

        // Check that we can reach the databricks workspace.
        let _resp = client
            .get(format!("{}/api/2.1/unity-catalog/catalogs", workspace_url))
            .send()
            .await?;

        Ok(Self {
            client,
            workspace_url: workspace_url.to_string(),
            catalog_id: catalog_id.to_string(),
        })
    }
}

#[derive(Deserialize)]
#[serde(untagged)]
enum TableResponse {
    Success { storage_location: String },
    Error { error_code: String, message: String },
}

#[async_trait]
impl DataCatalog for UnityCatalog {
    /// Get the table storage location from the UnityCatalog
    async fn get_table_storage_location(
        &self,
        database_name: &str,
        table_name: &str,
    ) -> Result<String> {
        let resp = self
            .client
            .get(format!(
                "{}/api/2.1/unity-catalog/tables/{}.{}.{}",
                &self.workspace_url, self.catalog_id, database_name, table_name
            ))
            .send()
            .await?;

        let parsed_resp: TableResponse = resp.json().await?;
        match parsed_resp {
            TableResponse::Success { storage_location } => Ok(storage_location),
            TableResponse::Error {
                error_code,
                message,
            } => Err(DeltaError::UnityInvalidTable {
                error_code,
                message,
            }),
        }
    }
}
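
The `#[serde(untagged)]` enum above lets the single `resp.json()` call handle both response shapes from the Unity Catalog API: serde tries each variant in order and picks the first one the JSON matches. A small standalone sketch of that behavior (not part of this commit; it assumes `serde` and `serde_json`, and the `TABLE_DOES_NOT_EXIST` payload is illustrative):

use serde::Deserialize;

#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum TableResponse {
    Success { storage_location: String },
    Error { error_code: String, message: String },
}

fn main() {
    // A response carrying a storage location matches the Success variant
    // (extra fields in the real API response are simply ignored).
    let ok: TableResponse =
        serde_json::from_str(r#"{"storage_location": "s3://bucket/path/to/table"}"#).unwrap();

    // A response with an error code and message falls through to Error.
    let err: TableResponse = serde_json::from_str(
        r#"{"error_code": "TABLE_DOES_NOT_EXIST", "message": "table not found"}"#,
    )
    .unwrap();

    println!("{ok:?}\n{err:?}");
}
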
25 changes: 25 additions & 0 deletions crates/datasources/src/delta/errors.rs
@@ -0,0 +1,25 @@
#[derive(Debug, thiserror::Error)]
pub enum DeltaError {
    #[error(transparent)]
    DeltaTable(#[from] deltalake::DeltaTableError),

    #[error("Invalid table error from unity catalog: {error_code}: {message}")]
    UnityInvalidTable { error_code: String, message: String },

    #[error(transparent)]
    Reqwest(#[from] reqwest::Error),

    #[error(transparent)]
    ObjectStore(#[from] object_store::Error),

    #[error(transparent)]
    DataFusion(#[from] datafusion::error::DataFusionError),

    #[error(transparent)]
    UrlParse(#[from] url::ParseError),

    #[error("{0}")]
    Static(&'static str),
}

pub type Result<T, E = DeltaError> = std::result::Result<T, E>;
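
Because every wrapped variant uses `#[from]`, errors from `deltalake`, `reqwest`, `object_store`, datafusion, and `url` all propagate with `?` in functions returning this module's `Result`. A tiny sketch, written as if it lived in this `delta` module (the `open_at_uri` helper is illustrative, not part of this commit):

use crate::delta::errors::Result;

// `deltalake::DeltaTableError` converts into `DeltaError` automatically
// through the `#[from]` impl above, so `?` just works here.
async fn open_at_uri(uri: &str) -> Result<deltalake::DeltaTable> {
    let table = deltalake::open_table(uri).await?;
    Ok(table)
}
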
3 changes: 3 additions & 0 deletions crates/datasources/src/delta/mod.rs
@@ -0,0 +1,3 @@
pub mod access;
pub mod catalog;
pub mod errors;
1 change: 1 addition & 0 deletions crates/datasources/src/lib.rs
@@ -2,6 +2,7 @@
pub mod bigquery;
pub mod debug;
pub mod delta;
pub mod mongodb;
pub mod mysql;
pub mod object_store;
2 changes: 1 addition & 1 deletion crates/datasources/src/object_store/csv/csv_helper.rs
@@ -7,8 +7,8 @@ use bytes::{Buf, Bytes};
use datafusion::arrow::csv;
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use datafusion::datasource::file_format::file_type::FileCompressionType;
use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
use datafusion::error::{DataFusionError, Result as DatafusionResult};
use datafusion::physical_plan::file_format::{FileMeta, FileOpenFuture, FileOpener};
use futures::{Stream, StreamExt, TryStreamExt};
use object_store::{GetResult, ObjectStore};

2 changes: 1 addition & 1 deletion crates/datasources/src/object_store/csv/mod.rs
@@ -12,12 +12,12 @@ use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::file_format::file_type::FileCompressionType;
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{FileScanConfig, FileStream};
use datafusion::datasource::TableProvider;
use datafusion::error::{DataFusionError, Result as DatafusionResult};
use datafusion::execution::context::{SessionState, TaskContext};
use datafusion::logical_expr::TableType;
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::file_format::{FileScanConfig, FileStream};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
6 changes: 3 additions & 3 deletions crates/datasources/src/object_store/parquet.rs
@@ -9,9 +9,6 @@ use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use datafusion::common::ToDFSchema;
use datafusion::datasource::file_format::parquet::fetch_parquet_metadata;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
FileMeta, FileScanConfig, ParquetExec, ParquetFileReaderFactory,
};
use datafusion::datasource::TableProvider;
use datafusion::error::{DataFusionError, Result as DatafusionResult};
use datafusion::execution::context::SessionState;
@@ -22,6 +19,9 @@ use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
use datafusion::parquet::errors::{ParquetError, Result as ParquetResult};
use datafusion::parquet::file::metadata::ParquetMetaData;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_plan::file_format::{
FileMeta, FileScanConfig, ParquetExec, ParquetFileReaderFactory,
};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{ExecutionPlan, Statistics};
use datafusion::prelude::Expr;
1 change: 1 addition & 0 deletions crates/logutil/Cargo.toml
@@ -8,3 +8,4 @@ edition = {workspace = true}
[dependencies]
tracing = "0.1"
tracing-subscriber = {version = "0.3", features = ["std", "fmt", "json", "env-filter"] }
tracing-log = "0.1"
7 changes: 7 additions & 0 deletions crates/logutil/src/lib.rs
@@ -1,5 +1,6 @@
//! Utilities for logging and tracing.
use tracing::{info, subscriber, Level, Subscriber};
use tracing_log::LogTracer;
use tracing_subscriber::{
filter::{EnvFilter, LevelFilter},
fmt::SubscriberBuilder,
@@ -36,6 +37,10 @@ impl From<Verbosity> for Level {

/// Initialize a trace subscriber for a test.
pub fn init_test() {
// Failing to set the global log adapter is fine, another test may have
// already set it.
let _ = LogTracer::init();

let subscriber = FmtSubscriber::builder()
.with_test_writer()
.with_max_level(Level::TRACE)
@@ -60,6 +65,8 @@ pub fn init(verbosity: impl Into<Verbosity>, json: bool) {
let verbosity = verbosity.into();
let level: Level = verbosity.into();

LogTracer::init().unwrap();

if json {
let mut builder = default_fmt_builder(level).json();
// Flatten fields into the top-level json. This is primarily done such
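
`LogTracer` bridges the `log` facade into `tracing`, so records emitted by dependencies that still use the `log` crate end up in the same subscriber output as native `tracing` events. A minimal standalone sketch of that bridge (it assumes the `log`, `tracing`, `tracing-log`, and `tracing-subscriber` crates; this is not the actual `logutil` setup):

use tracing_log::LogTracer;

fn main() {
    // Route `log` records into `tracing`. The adapter can only be installed
    // once per process, so ignoring an Err here is fine if something else
    // (e.g. another test) already installed it.
    let _ = LogTracer::init();

    // Any tracing subscriber now also sees records from the `log` facade.
    tracing_subscriber::fmt().init();

    log::info!("emitted via the log facade");
    tracing::info!("emitted via tracing");
}
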
20 changes: 19 additions & 1 deletion crates/metastoreproto/proto/options.proto
@@ -42,8 +42,9 @@ message DatabaseOptions {
DatabaseOptionsMysql mysql = 5;
DatabaseOptionsMongo mongo = 6;
DatabaseOptionsSnowflake snowflake = 7;
DatabaseOptionsDeltaLake delta = 8;
}
// next: 8
// next: 9
}

message DatabaseOptionsInternal {}
@@ -70,6 +71,23 @@ message DatabaseOptionsSnowflake {
string role_name = 6;
}

message DatabaseOptionsDeltaLake {
oneof catalog { DeltaLakeUnityCatalog unity = 1; }

// TODO: Move these out into a separate "bucket auth" type. That could be
// shared with the other object storage stuff.
string access_key_id = 2;
string secret_access_key = 3;
string region = 4;
}

// Parameters specific to the unity catalog.
message DeltaLakeUnityCatalog {
string catalog_id = 1;
string databricks_access_token = 2;
string workspace_url = 3;
}

// Table options

message TableOptions {
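
For reference, a sketch of what the Rust-side option types consumed by `DeltaLakeAccessor::connect` plausibly look like. This is inferred from the proto messages above and the fields destructured in `access.rs`; the real definitions live in `metastoreproto::types::options` and may differ in derives and detail.

#[derive(Debug, Clone)]
pub struct DeltaLakeUnityCatalog {
    pub catalog_id: String,
    pub databricks_access_token: String,
    pub workspace_url: String,
}

#[derive(Debug, Clone)]
pub enum DeltaLakeCatalog {
    Unity(DeltaLakeUnityCatalog),
}

#[derive(Debug, Clone)]
pub struct DatabaseOptionsDeltaLake {
    pub catalog: DeltaLakeCatalog,
    pub access_key_id: String,
    pub secret_access_key: String,
    pub region: String,
}
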
Remaining changed files are not rendered.
