From 457e82a7310ca904502905b9be721e867b8aff5f Mon Sep 17 00:00:00 2001
From: Mahadevuni Naveen Kumar
Date: Thu, 13 Feb 2025 13:50:59 +0530
Subject: [PATCH] fix(iceberg): Date partition value parse issue

Iceberg stores date partition values as days-since-epoch integers rather than
ISO-8601 date strings, so parsing them with the date-string parser fails. Add
a ColumnParseParameters flag to HiveColumnHandle and use it when applying
partition filters and when materializing partition values.

---
 velox/connectors/hive/HiveConnectorUtil.cpp   |  15 +-
 velox/connectors/hive/SplitReader.cpp         |  15 +-
 velox/connectors/hive/TableHandle.h           |  19 ++-
 .../hive/iceberg/tests/IcebergReadTest.cpp    | 145 ++++++++++++++----
 4 files changed, 158 insertions(+), 36 deletions(-)

diff --git a/velox/connectors/hive/HiveConnectorUtil.cpp b/velox/connectors/hive/HiveConnectorUtil.cpp
index 2525064cee85..e28c58010e20 100644
--- a/velox/connectors/hive/HiveConnectorUtil.cpp
+++ b/velox/connectors/hive/HiveConnectorUtil.cpp
@@ -634,12 +634,18 @@ namespace {
 bool applyPartitionFilter(
     const TypePtr& type,
     const std::string& partitionValue,
+    bool isPartitionDateDaysSinceEpoch,
     common::Filter* filter) {
   if (type->isDate()) {
-    const auto result = util::fromDateString(
-        StringView(partitionValue), util::ParseMode::kPrestoCast);
-    VELOX_CHECK(!result.hasError());
-    return applyFilter(*filter, result.value());
+    int32_t result = 0;
+    // days_since_epoch partition values are integers in string format, e.g.
+    // Iceberg partition values.
+    if (isPartitionDateDaysSinceEpoch) {
+      result = folly::to<int32_t>(partitionValue);
+    } else {
+      result = DATE()->toDays(static_cast<folly::StringPiece>(partitionValue));
+    }
+    return applyFilter(*filter, result);
   }
 
   switch (type->kind()) {
@@ -701,6 +707,7 @@ bool testFilters(
         return applyPartitionFilter(
             handlesIter->second->dataType(),
             iter->second.value(),
+            handlesIter->second->isPartitionDateValueDaysSinceEpoch(),
             child->filter());
       }
       // Column is missing, most likely due to schema evolution. Or it's a
diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp
index fd30a77558c6..691d1abd4907 100644
--- a/velox/connectors/hive/SplitReader.cpp
+++ b/velox/connectors/hive/SplitReader.cpp
@@ -36,14 +36,22 @@ VectorPtr newConstantFromString(
     vector_size_t size,
     velox::memory::MemoryPool* pool,
     const std::string& sessionTimezone,
-    bool asLocalTime) {
+    bool asLocalTime,
+    bool isPartitionDateDaysSinceEpoch = false) {
   using T = typename TypeTraits<kind>::NativeType;
   if (!value.has_value()) {
     return std::make_shared<ConstantVector<T>>(pool, size, true, type, T());
   }
 
   if (type->isDate()) {
-    auto days = DATE()->toDays(static_cast<folly::StringPiece>(value.value()));
+    int32_t days = 0;
+    // For Iceberg, the date partition values are already in daysSinceEpoch
+    // form.
+    if (isPartitionDateDaysSinceEpoch) {
+      days = folly::to<int32_t>(value.value());
+    } else {
+      days = DATE()->toDays(static_cast<folly::StringPiece>(value.value()));
+    }
     return std::make_shared<ConstantVector<int32_t>>(
         pool, size, false, type, std::move(days));
   }
@@ -402,7 +410,8 @@ void SplitReader::setPartitionValue(
       connectorQueryCtx_->memoryPool(),
       connectorQueryCtx_->sessionTimezone(),
       hiveConfig_->readTimestampPartitionValueAsLocalTime(
-          connectorQueryCtx_->sessionProperties()));
+          connectorQueryCtx_->sessionProperties()),
+      it->second->isPartitionDateValueDaysSinceEpoch());
   spec->setConstantValue(constant);
 }
 
diff --git a/velox/connectors/hive/TableHandle.h b/velox/connectors/hive/TableHandle.h
index 14916f51a734..2711da55a37a 100644
--- a/velox/connectors/hive/TableHandle.h
+++ b/velox/connectors/hive/TableHandle.h
@@ -35,6 +35,13 @@ class HiveColumnHandle : public ColumnHandle {
     kRowId,
   };
 
+  struct ColumnParseParameters {
+    enum PartitionDateValueFormat {
+      kISO8601,
+      kDaysSinceEpoch,
+    } partitionDateValueFormat;
+  };
+
   /// NOTE: 'dataType' is the column type in target write table. 'hiveType' is
   /// converted type of the corresponding column in source table which might not
   /// be the same type, and the table scan needs to do data coercion if needs.
@@ -45,12 +52,14 @@ class HiveColumnHandle : public ColumnHandle {
       ColumnType columnType,
       TypePtr dataType,
       TypePtr hiveType,
-      std::vector<common::Subfield> requiredSubfields = {})
+      std::vector<common::Subfield> requiredSubfields = {},
+      ColumnParseParameters columnParseParameters = {})
       : name_(name),
         columnType_(columnType),
         dataType_(std::move(dataType)),
         hiveType_(std::move(hiveType)),
-        requiredSubfields_(std::move(requiredSubfields)) {
+        requiredSubfields_(std::move(requiredSubfields)),
+        columnParseParameters_(columnParseParameters) {
     VELOX_USER_CHECK(
         dataType_->equivalent(*hiveType_),
         "data type {} and hive type {} do not match",
@@ -96,6 +105,11 @@ class HiveColumnHandle : public ColumnHandle {
     return columnType_ == ColumnType::kPartitionKey;
   }
 
+  bool isPartitionDateValueDaysSinceEpoch() const {
+    return columnParseParameters_.partitionDateValueFormat ==
+        ColumnParseParameters::kDaysSinceEpoch;
+  }
+
   std::string toString() const;
 
   folly::dynamic serialize() const override;
@@ -115,6 +129,7 @@ class HiveColumnHandle : public ColumnHandle {
   const TypePtr dataType_;
   const TypePtr hiveType_;
   const std::vector<common::Subfield> requiredSubfields_;
+  const ColumnParseParameters columnParseParameters_;
 };
 
 class HiveTableHandle : public ConnectorTableHandle {
diff --git a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
index 7416713868ad..d74a840e9a33 100644
--- a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
+++ b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
@@ -14,6 +14,7 @@
  * limitations under the License.
  */
 
+#include "velox/common/base/tests/GTestUtils.h"
 #include "velox/common/file/FileSystems.h"
 #include "velox/connectors/hive/HiveConnectorSplit.h"
 #include "velox/connectors/hive/iceberg/IcebergDeleteFile.h"
@@ -227,6 +228,36 @@ class HiveIcebergTest : public HiveConnectorTestBase {
 
   const static int rowCount = 20000;
 
+ protected:
+  std::shared_ptr<dwrf::Config> config_;
+  std::function<std::unique_ptr<dwrf::DWRFFlushPolicy>()> flushPolicyFactory_;
+
+  std::shared_ptr<ConnectorSplit> makeIcebergSplit(
+      const std::string& dataFilePath,
+      const std::vector<IcebergDeleteFile>& deleteFiles = {},
+      const std::unordered_map<std::string, std::optional<std::string>>
+          partitionKeys = {}) {
+    std::unordered_map<std::string, std::string> customSplitInfo;
+    customSplitInfo["table_format"] = "hive-iceberg";
+
+    auto file = filesystems::getFileSystem(dataFilePath, nullptr)
+                    ->openFileForRead(dataFilePath);
+    const int64_t fileSize = file->size();
+
+    return std::make_shared<HiveConnectorSplit>(
+        kHiveConnectorId,
+        dataFilePath,
+        fileFomat_,
+        0,
+        fileSize,
+        partitionKeys,
+        std::nullopt,
+        customSplitInfo,
+        nullptr,
+        /*cacheable=*/true,
+        deleteFiles);
+  }
+
  private:
   std::map<std::string, std::vector<int64_t>> writeDataFiles(
       std::map<std::string, std::vector<int64_t>> rowGroupSizesForFiles) {
@@ -335,31 +366,6 @@ class HiveIcebergTest : public HiveConnectorTestBase {
     return vectors;
   }
 
-  std::shared_ptr<ConnectorSplit> makeIcebergSplit(
-      const std::string& dataFilePath,
-      const std::vector<IcebergDeleteFile>& deleteFiles = {}) {
-    std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
-    std::unordered_map<std::string, std::string> customSplitInfo;
-    customSplitInfo["table_format"] = "hive-iceberg";
-
-    auto file = filesystems::getFileSystem(dataFilePath, nullptr)
-                    ->openFileForRead(dataFilePath);
-    const int64_t fileSize = file->size();
-
-    return std::make_shared<HiveConnectorSplit>(
-        kHiveConnectorId,
-        dataFilePath,
-        fileFomat_,
-        0,
-        fileSize,
-        partitionKeys,
-        std::nullopt,
-        customSplitInfo,
-        nullptr,
-        /*cacheable=*/true,
-        deleteFiles);
-  }
-
   std::string getDuckDBQuery(
       const std::map<std::string, std::vector<int64_t>>& rowGroupSizesForFiles,
       const std::unordered_map<
@@ -478,8 +484,6 @@ class HiveIcebergTest : public HiveConnectorTestBase {
   }
 
   dwio::common::FileFormat fileFomat_{dwio::common::FileFormat::DWRF};
-  std::shared_ptr<dwrf::Config> config_;
-  std::function<std::unique_ptr<dwrf::DWRFFlushPolicy>()> flushPolicyFactory_;
   RowTypePtr rowType_{ROW({"c0"}, {BIGINT()})};
 
   std::shared_ptr<IcebergMetadataColumn> pathColumn_ =
@@ -660,4 +664,91 @@ TEST_F(HiveIcebergTest, positionalDeletesMultipleSplits) {
   assertMultipleSplits({}, 10, 3);
 }
 
+TEST_F(HiveIcebergTest, testPartitionedRead) {
+  RowTypePtr rowType{ROW({"c0", "ds"}, {BIGINT(), DateType::get()})};
+  std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
+  // The Iceberg API sets date partition values to daysSinceEpoch, so in
+  // Velox we do not need to convert them to days.
+  // Query two partitions: ds=17627 (2018-04-06) and ds=17628 (2018-04-07).
+  std::vector<std::shared_ptr<ConnectorSplit>> splits;
+  std::vector<std::shared_ptr<TempFilePath>> dataFilePaths;
+  for (int i = 0; i <= 1; ++i) {
+    std::vector<RowVectorPtr> dataVectors;
+    int32_t daysSinceEpoch = 17627 + i;
+    VectorPtr c0 = makeFlatVector<int64_t>(std::vector<int64_t>{i});
+    VectorPtr ds =
+        makeFlatVector<int32_t>(std::vector<int32_t>{daysSinceEpoch});
+    dataVectors.push_back(makeRowVector({"c0", "ds"}, {c0, ds}));
+
+    auto dataFilePath = TempFilePath::create();
+    dataFilePaths.push_back(dataFilePath);
+    writeToFile(
+        dataFilePath->getPath(), dataVectors, config_, flushPolicyFactory_);
+    partitionKeys["ds"] = std::to_string(daysSinceEpoch);
+    splits.emplace_back(
+        makeIcebergSplit(dataFilePath->getPath(), {}, partitionKeys));
+  }
+
+  std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
+      assignments;
+
+  assignments.insert(
+      {"c0",
+       std::make_shared<HiveColumnHandle>(
+           "c0",
+           HiveColumnHandle::ColumnType::kRegular,
+           rowType->childAt(0),
+           rowType->childAt(0))});
+
+  std::vector<common::Subfield> requiredSubFields;
+  HiveColumnHandle::ColumnParseParameters columnParseParameters;
+  columnParseParameters.partitionDateValueFormat =
+      HiveColumnHandle::ColumnParseParameters::kDaysSinceEpoch;
+  assignments.insert(
+      {"ds",
+       std::make_shared<HiveColumnHandle>(
+           "ds",
+           HiveColumnHandle::ColumnType::kPartitionKey,
+           rowType->childAt(1),
+           rowType->childAt(1),
+           std::move(requiredSubFields),
+           columnParseParameters)});
+
+  auto planBuilder = new PlanBuilder(pool_.get());
+  auto plan = planBuilder->startTableScan()
+                  .outputType(rowType)
+                  .assignments(assignments)
+                  .endTableScan()
+                  .planNode();
+  HiveConnectorTestBase::assertQuery(
+      plan,
+      splits,
+      "SELECT * FROM (VALUES (0, '2018-04-06'), (1, '2018-04-07'))",
+      0);
+
+  // Test a filter on the non-partition, non-date column.
+  std::vector<std::string> nonPartitionFilters = {"c0 = 1"};
+  plan = planBuilder->startTableScan()
+             .outputType(rowType)
+             .assignments(assignments)
+             .subfieldFilters(nonPartitionFilters)
+             .endTableScan()
+             .planNode();
+  HiveConnectorTestBase::assertQuery(plan, splits, "SELECT 1, '2018-04-07'");
+
+  // Test a filter on the date column read as a regular, non-partition column.
+  std::vector<std::string> filters = {"ds = date'2018-04-06'"};
+  plan = planBuilder->startTableScan()
+             .outputType(rowType)
+             .subfieldFilters(filters)
+             .endTableScan()
+             .planNode();
+  splits.clear();
+  for (auto dataFilePath : dataFilePaths) {
+    splits.emplace_back(makeIcebergSplit(dataFilePath->getPath(), {}, {}));
+  }
+
+  HiveConnectorTestBase::assertQuery(plan, splits, "SELECT 0, '2018-04-06'");
+}
+
 } // namespace facebook::velox::connector::hive::iceberg
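
Note: below is a minimal usage sketch (not part of the patch) of how a caller
building scan assignments for an Iceberg table might mark a date partition
column as days-since-epoch via the new ColumnParseParameters. The column name
"ds" and the helper function are illustrative assumptions, not part of the
connector API.

    #include "velox/connectors/hive/TableHandle.h"
    #include "velox/type/Type.h"

    using namespace facebook::velox;
    using connector::hive::HiveColumnHandle;

    // Returns a column handle for an Iceberg date partition column whose
    // partition value arrives as a days-since-epoch integer in string form
    // (e.g. "17627") rather than an ISO-8601 date string.
    std::shared_ptr<HiveColumnHandle> makeIcebergDatePartitionHandle() {
      HiveColumnHandle::ColumnParseParameters parseParameters;
      parseParameters.partitionDateValueFormat =
          HiveColumnHandle::ColumnParseParameters::kDaysSinceEpoch;
      return std::make_shared<HiveColumnHandle>(
          "ds",
          HiveColumnHandle::ColumnType::kPartitionKey,
          DATE(),
          DATE(),
          std::vector<common::Subfield>{},
          parseParameters);
    }

Columns constructed without ColumnParseParameters keep the default kISO8601
format, so existing Hive-style date partition values continue to be parsed as
date strings.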