WIP: Handle timestamp conversions from nanosecond to microseconds on write #1287

Status: Draft. Wants to merge 3 commits into base: main.
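For context on the title: Delta's `timestamp` primitive is microsecond precision, while batches produced by many Arrow sources carry nanosecond timestamps. The snippet below is a minimal standalone sketch of the underlying conversion using Arrow's cast kernel; it is illustrative only and not code from this pull request.

use std::sync::Arc;

use arrow::array::{ArrayRef, TimestampNanosecondArray};
use arrow::compute::cast;
use arrow::datatypes::{DataType, TimeUnit};
use arrow::error::ArrowError;

fn main() -> Result<(), ArrowError> {
    // A nanosecond-precision column, as produced by many Arrow-based sources.
    let ns: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![1_500_000i64]));

    // Down-cast to the microsecond precision that Delta's `timestamp` type uses.
    let us = cast(&ns, &DataType::Timestamp(TimeUnit::Microsecond, None))?;
    assert_eq!(
        us.data_type(),
        &DataType::Timestamp(TimeUnit::Microsecond, None)
    );
    Ok(())
}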
54 changes: 53 additions & 1 deletion rust/src/writer/record_batch.rs
@@ -45,6 +45,7 @@ use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
 use arrow::error::ArrowError;
 use arrow::record_batch::RecordBatch;
 use bytes::Bytes;
+use log::*;
 use object_store::ObjectStore;
 use parquet::{arrow::ArrowWriter, errors::ParquetError};
 use parquet::{basic::Compression, file::properties::WriterProperties};
@@ -297,7 +298,15 @@ impl PartitionWriter {
     /// This method buffers the write stream internally so it can be invoked for many
     /// record batches and flushed after the appropriate number of bytes has been written.
     pub fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DeltaWriterError> {
+        /*
+         * This schema check is redundant with the one that ArrowWriter
+         * performs itself
+         */
         if record_batch.schema() != self.arrow_schema {
+            debug!("Attempting to write RecordBatch which does not match self.arrow_schema");
+            let new_batch = RecordBatch::try_new(self.arrow_schema.clone(), record_batch.columns().to_vec())?;
+
+            debug!("record batch mismatch, new: {:?}", new_batch);
             return Err(DeltaWriterError::SchemaMismatch {
                 record_batch_schema: record_batch.schema(),
                 expected_schema: self.arrow_schema.clone(),
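Note: as written, the WIP branch only logs the mismatch and still returns SchemaMismatch. A plausible direction for the final fix is to coerce the incoming batch to the writer's schema instead of erroring; the helper below is a hypothetical sketch of that idea (coerce_batch is not part of this diff), leaning on Arrow's cast kernel.

use arrow::array::ArrayRef;
use arrow::compute::cast;
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

// Hypothetical: cast every column to the type declared by the target schema,
// e.g. Timestamp(Nanosecond) -> Timestamp(Microsecond), then rebuild the
// batch against that schema.
fn coerce_batch(batch: &RecordBatch, target: &SchemaRef) -> Result<RecordBatch, ArrowError> {
    let columns: Vec<ArrayRef> = batch
        .columns()
        .iter()
        .zip(target.fields())
        .map(|(column, field)| cast(column, field.data_type()))
        .collect::<Result<_, _>>()?;
    RecordBatch::try_new(target.clone(), columns)
}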
@@ -413,7 +422,7 @@ pub(crate) fn divide_by_partition_values(
 mod tests {
     use super::*;
     use crate::writer::{
-        test_utils::{create_initialized_table, get_record_batch},
+        test_utils::{create_initialized_table, create_initialized_table_with, get_record_batch},
         utils::PartitionPath,
     };
     use std::path::Path;
@@ -477,6 +486,49 @@ mod tests {
         assert_eq!(adds.len(), 1);
     }
 
+    /*
+     * This is a test case to address:
+     * <https://github.com/delta-io/delta-rs/issues/1286>
+     */
+    #[tokio::test]
+    async fn test_write_batch_with_timestamps() {
+        use crate::{SchemaDataType, SchemaField};
+        use arrow::array::*;
+        use arrow::datatypes::{DataType as ArrowDataType, Field, TimeUnit};
+
+        let schema = Schema::new(vec![
+            SchemaField::new(
+                "id".to_string(),
+                SchemaDataType::primitive("string".to_string()),
+                true,
+                HashMap::new(),
+            ),
+            SchemaField::new(
+                "timestamp".to_string(),
+                SchemaDataType::primitive("timestamp".to_string()),
+                true,
+                HashMap::new(),
+            ),
+        ]);
+
+        let batch_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", ArrowDataType::Utf8, true),
+            Field::new("timestamp", ArrowDataType::Timestamp(TimeUnit::Nanosecond, None), true),
+        ]));
+
+        let table = create_initialized_table_with(schema, &[]).await;
+
+        let id_values = Arc::new(StringArray::from(vec![Some("Hi")]));
+        let timestamp_values = Arc::new(TimestampNanosecondArray::from(vec![1]));
+        let batch = RecordBatch::try_new(batch_schema, vec![id_values, timestamp_values])
+            .unwrap();
+        let mut writer = RecordBatchWriter::for_table(&table).unwrap();
+
+        writer.write(batch).await.unwrap();
+        let adds = writer.flush().await.unwrap();
+        assert_eq!(adds.len(), 1);
+    }
+
     #[tokio::test]
     async fn test_write_multiple_partitions() {
         let batch = get_record_batch(None, false);
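Why the new test above trips the mismatch path: when delta-rs derives a table's Arrow schema from its Delta schema, the `timestamp` primitive maps to microsecond precision, so the nanosecond batch compares unequal to `self.arrow_schema`. A small standalone illustration of that comparison (the mapping is this editor's reading of delta-rs, not shown in this diff):

use arrow::datatypes::{DataType, TimeUnit};

fn main() {
    // What the table's derived Arrow schema declares for a Delta `timestamp`...
    let table_type = DataType::Timestamp(TimeUnit::Microsecond, None);
    // ...versus what the test batch was built with.
    let batch_type = DataType::Timestamp(TimeUnit::Nanosecond, None);

    // Different units make the schemas unequal, which is exactly the
    // SchemaMismatch branch exercised by test_write_batch_with_timestamps.
    assert_ne!(table_type, batch_type);
}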
24 changes: 20 additions & 4 deletions rust/src/writer/test_utils.rs
@@ -164,6 +164,10 @@ pub fn get_delta_metadata(partition_cols: &[String]) -> DeltaTableMetaData {
     )
 }
 
+/*
+ * Create a bare table without any commits; this just creates a temporary directory
+ * and initializes a DeltaTable from it.
+ */
 pub fn create_bare_table() -> DeltaTable {
     let table_dir = tempfile::tempdir().unwrap();
     let table_path = table_dir.path();
@@ -172,10 +176,14 @@ pub fn create_bare_table() -> DeltaTable {
         .unwrap()
 }
 
-pub async fn create_initialized_table(partition_cols: &[String]) -> DeltaTable {
+/*
+ * Create an initialized table with a schema
+ */
+pub async fn create_initialized_table_with(
+    schema: Schema,
+    partition_cols: &[String],
+) -> DeltaTable {
     let mut table = create_bare_table();
-    let table_schema = get_delta_schema();
-
     let mut commit_info = serde_json::Map::<String, serde_json::Value>::new();
     commit_info.insert(
         "operation".to_string(),
@@ -195,7 +203,7 @@ pub async fn create_initialized_table(partition_cols: &[String]) {
         None,
         None,
         None,
-        table_schema,
+        schema,
         partition_cols.to_vec(),
         HashMap::new(),
     );
@@ -207,3 +215,11 @@ pub async fn create_initialized_table(partition_cols: &[String]) {
 
     table
 }
+
+/*
+ * Create an initialized table in a temp directory with a test schema
+ */
+pub async fn create_initialized_table(partition_cols: &[String]) -> DeltaTable {
+    let table_schema = get_delta_schema();
+    create_initialized_table_with(table_schema, partition_cols).await
+}
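For completeness, a hypothetical caller of the refactored helpers might look like the sketch below; it assumes "modified" is a partitionable column in the stock test schema returned by get_delta_schema().

#[tokio::test]
async fn example_initialized_table_usage() {
    // Sketch: the old entry point still works, delegating to
    // create_initialized_table_with with the stock test schema.
    let table = create_initialized_table(&["modified".to_string()]).await;
    assert_eq!(table.version(), 0);
}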