fix(iceberg): Date partition value parse issue #12126

Closed
15 changes: 11 additions & 4 deletions velox/connectors/hive/HiveConnectorUtil.cpp
@@ -634,12 +634,18 @@ namespace {
 bool applyPartitionFilter(
     const TypePtr& type,
     const std::string& partitionValue,
+    bool isPartitionDateDaysSinceEpoch,
     common::Filter* filter) {
   if (type->isDate()) {
-    const auto result = util::fromDateString(
-        StringView(partitionValue), util::ParseMode::kPrestoCast);
-    VELOX_CHECK(!result.hasError());
-    return applyFilter(*filter, result.value());
+    int32_t result = 0;
+    // days_since_epoch partition values are integers in string format, e.g.
+    // Iceberg partition values.
+    if (isPartitionDateDaysSinceEpoch) {
+      result = folly::to<int32_t>(partitionValue);
+    } else {
+      result = DATE()->toDays(static_cast<folly::StringPiece>(partitionValue));
+    }
+    return applyFilter(*filter, result);
   }
 
   switch (type->kind()) {
@@ -701,6 +707,7 @@ bool testFilters(
         return applyPartitionFilter(
             handlesIter->second->dataType(),
             iter->second.value(),
+            handlesIter->second->isPartitionDateValueDaysSinceEpoch(),
             child->filter());
       }
       // Column is missing, most likely due to schema evolution. Or it's a
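To make the new behavior concrete, here is a minimal standalone sketch (not Velox code) of the two parse paths applyPartitionFilter now distinguishes. daysFromCivil is a stand-in for DATE()->toDays(); the Iceberg path is a plain integer parse of the day count.

#include <cstdint>
#include <cstdio>
#include <string>

// Howard Hinnant's days-from-civil algorithm: y-m-d to days since 1970-01-01.
int32_t daysFromCivil(int y, unsigned m, unsigned d) {
  y -= m <= 2;
  const int era = (y >= 0 ? y : y - 399) / 400;
  const unsigned yoe = static_cast<unsigned>(y - era * 400);
  const unsigned doy = (153 * (m + (m > 2 ? -3 : 9)) + 2) / 5 + d - 1;
  const unsigned doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;
  return era * 146097 + static_cast<int32_t>(doe) - 719468;
}

// Mirrors the branch added above: Iceberg supplies "17627" for a date
// partition, while Hive-style values arrive as "2018-04-06".
int32_t parsePartitionDate(const std::string& value, bool isDaysSinceEpoch) {
  if (isDaysSinceEpoch) {
    return static_cast<int32_t>(std::stol(value)); // already a day count
  }
  int y = 0, m = 0, d = 0;
  std::sscanf(value.c_str(), "%d-%d-%d", &y, &m, &d);
  return daysFromCivil(y, static_cast<unsigned>(m), static_cast<unsigned>(d));
}

int main() {
  std::printf("%d\n", parsePartitionDate("17627", true));       // 17627
  std::printf("%d\n", parsePartitionDate("2018-04-06", false)); // 17627
}

Both calls print 17627, the day count for 2018-04-06, which is also the value the test partitions below rely on.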
15 changes: 12 additions & 3 deletions velox/connectors/hive/SplitReader.cpp
@@ -36,14 +36,22 @@ VectorPtr newConstantFromString(
     vector_size_t size,
     velox::memory::MemoryPool* pool,
     const std::string& sessionTimezone,
-    bool asLocalTime) {
+    bool asLocalTime,
+    bool isPartitionDateDaysSinceEpoch = false) {
   using T = typename TypeTraits<kind>::NativeType;
   if (!value.has_value()) {
     return std::make_shared<ConstantVector<T>>(pool, size, true, type, T());
   }
 
   if (type->isDate()) {
-    auto days = DATE()->toDays(static_cast<folly::StringPiece>(value.value()));
+    int32_t days = 0;
+    // For Iceberg, the date partition values are already in daysSinceEpoch
+    // form.
+    if (isPartitionDateDaysSinceEpoch) {
+      days = folly::to<int32_t>(value.value());
+    } else {
+      days = DATE()->toDays(static_cast<folly::StringPiece>(value.value()));
+    }
     return std::make_shared<ConstantVector<int32_t>>(
         pool, size, false, type, std::move(days));
   }
@@ -402,7 +410,8 @@ void SplitReader::setPartitionValue(
       connectorQueryCtx_->memoryPool(),
       connectorQueryCtx_->sessionTimezone(),
       hiveConfig_->readTimestampPartitionValueAsLocalTime(
-          connectorQueryCtx_->sessionProperties()));
+          connectorQueryCtx_->sessionProperties()),
+      it->second->isPartitionDateValueDaysSinceEpoch());
   spec->setConstantValue(constant);
 }
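A note on failure behavior, illustrated with a small sketch (assumed folly semantics, not code from this PR): folly::to<int32_t> requires the entire string to be numeric, so an ISO-8601 date routed down the days-since-epoch path fails loudly instead of being silently misread.

#include <folly/Conv.h>
#include <iostream>

int main() {
  std::cout << folly::to<int32_t>("17627") << '\n'; // parses cleanly: 17627
  try {
    folly::to<int32_t>("2018-04-06"); // trailing "-04-06" makes this throw
  } catch (const folly::ConversionError& e) {
    std::cout << "parse failed: " << e.what() << '\n';
  }
}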
19 changes: 17 additions & 2 deletions velox/connectors/hive/TableHandle.h
@@ -35,6 +35,13 @@ class HiveColumnHandle : public ColumnHandle {
     kRowId,
   };
 
+  struct ColumnParseParameters {
+    enum PartitionDateValueFormat {
+      kISO8601,
+      kDaysSinceEpoch,
+    } partitionDateValueFormat;
+  };
+
   /// NOTE: 'dataType' is the column type in target write table. 'hiveType' is
   /// converted type of the corresponding column in source table which might not
   /// be the same type, and the table scan needs to do data coercion if needs.
@@ -45,12 +52,14 @@
       ColumnType columnType,
       TypePtr dataType,
       TypePtr hiveType,
-      std::vector<common::Subfield> requiredSubfields = {})
+      std::vector<common::Subfield> requiredSubfields = {},
+      ColumnParseParameters columnParseParameters = {})
       : name_(name),
         columnType_(columnType),
         dataType_(std::move(dataType)),
         hiveType_(std::move(hiveType)),
-        requiredSubfields_(std::move(requiredSubfields)) {
+        requiredSubfields_(std::move(requiredSubfields)),
+        columnParseParameters_(columnParseParameters) {
     VELOX_USER_CHECK(
         dataType_->equivalent(*hiveType_),
         "data type {} and hive type {} do not match",
@@ -96,6 +105,11 @@
     return columnType_ == ColumnType::kPartitionKey;
   }
 
+  bool isPartitionDateValueDaysSinceEpoch() const {
+    return columnParseParameters_.partitionDateValueFormat ==
+        ColumnParseParameters::kDaysSinceEpoch;
+  }
+
   std::string toString() const;
 
   folly::dynamic serialize() const override;

Review comment (Collaborator), on toString(): toString() should also be enhanced to show the columnParseParameters_. Please update the unit tests as well.

Review comment (Collaborator), on serialize(): The serialize and create methods should be enhanced to also serialize/deserialize the columnParseParameters_ in the HiveColumnHandle.

Reply (Author): Do we need to change the serde and toString methods? The columnParseParameters_ will be set in Prestissimo code. Even tableParameters_ from the HiveTableHandle class is implemented the same way.

Reply (Collaborator): @nmahadevuni: Yes, serde and toString methods should always be updated when we update classes that appear in a Velox plan. toString is used in the printPlanWithStats debugging utilities (https://facebookincubator.github.io/velox/develop/debugging/print-plan-with-stats.html), and the serde methods have been used in non-Presto use cases. tableParameters_ should be handled this way as well. It's a bug if it is not.
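A hedged sketch of the requested serde support (key name and helper shape are assumptions, not code from this PR): store the enum as a readable string in the folly::dynamic object that serialize() produces, and recover it on the create() side.

#include <folly/dynamic.h>
#include <string>

// Hypothetical helpers; the real change would live inside
// HiveColumnHandle::serialize() and its create() factory.
folly::dynamic serializeColumnParseParameters(bool daysSinceEpoch) {
  folly::dynamic obj = folly::dynamic::object;
  obj["partitionDateValueFormat"] =
      daysSinceEpoch ? "kDaysSinceEpoch" : "kISO8601";
  return obj;
}

bool deserializeIsDaysSinceEpoch(const folly::dynamic& obj) {
  return obj["partitionDateValueFormat"].asString() == "kDaysSinceEpoch";
}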

@@ -115,6 +129,7 @@
   const TypePtr dataType_;
   const TypePtr hiveType_;
   const std::vector<common::Subfield> requiredSubfields_;
+  const ColumnParseParameters columnParseParameters_;
 };
 
 class HiveTableHandle : public ConnectorTableHandle {
137 changes: 110 additions & 27 deletions velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
@@ -14,6 +14,7 @@
  * limitations under the License.
  */
 
+#include "velox/common/base/tests/GTestUtils.h"
 #include "velox/common/file/FileSystems.h"
 #include "velox/connectors/hive/HiveConnectorSplit.h"
 #include "velox/connectors/hive/iceberg/IcebergDeleteFile.h"
@@ -227,6 +228,36 @@ class HiveIcebergTest : public HiveConnectorTestBase {
 
   const static int rowCount = 20000;
 
+ protected:
+  std::shared_ptr<dwrf::Config> config_;
+  std::function<std::unique_ptr<dwrf::DWRFFlushPolicy>()> flushPolicyFactory_;
+
+  std::shared_ptr<ConnectorSplit> makeIcebergSplit(
+      const std::string& dataFilePath,
+      const std::vector<IcebergDeleteFile>& deleteFiles = {},
+      const std::unordered_map<std::string, std::optional<std::string>>&
+          partitionKeys = {}) {
+    std::unordered_map<std::string, std::string> customSplitInfo;
+    customSplitInfo["table_format"] = "hive-iceberg";
+
+    auto file = filesystems::getFileSystem(dataFilePath, nullptr)
+                    ->openFileForRead(dataFilePath);
+    const int64_t fileSize = file->size();
+
+    return std::make_shared<HiveIcebergSplit>(
+        kHiveConnectorId,
+        dataFilePath,
+        fileFomat_,
+        0,
+        fileSize,
+        partitionKeys,
+        std::nullopt,
+        customSplitInfo,
+        nullptr,
+        /*cacheable=*/true,
+        deleteFiles);
+  }
+
  private:
   std::map<std::string, std::shared_ptr<TempFilePath>> writeDataFiles(
       std::map<std::string, std::vector<int64_t>> rowGroupSizesForFiles) {
@@ -335,31 +366,6 @@ class HiveIcebergTest : public HiveConnectorTestBase {
     return vectors;
   }
 
-  std::shared_ptr<ConnectorSplit> makeIcebergSplit(
-      const std::string& dataFilePath,
-      const std::vector<IcebergDeleteFile>& deleteFiles = {}) {
-    std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
-    std::unordered_map<std::string, std::string> customSplitInfo;
-    customSplitInfo["table_format"] = "hive-iceberg";
-
-    auto file = filesystems::getFileSystem(dataFilePath, nullptr)
-                    ->openFileForRead(dataFilePath);
-    const int64_t fileSize = file->size();
-
-    return std::make_shared<HiveIcebergSplit>(
-        kHiveConnectorId,
-        dataFilePath,
-        fileFomat_,
-        0,
-        fileSize,
-        partitionKeys,
-        std::nullopt,
-        customSplitInfo,
-        nullptr,
-        /*cacheable=*/true,
-        deleteFiles);
-  }
-
   std::string getDuckDBQuery(
       const std::map<std::string, std::vector<int64_t>>& rowGroupSizesForFiles,
       const std::unordered_map<
@@ -478,8 +484,6 @@ class HiveIcebergTest : public HiveConnectorTestBase {
   }
 
   dwio::common::FileFormat fileFomat_{dwio::common::FileFormat::DWRF};
-  std::shared_ptr<dwrf::Config> config_;
-  std::function<std::unique_ptr<dwrf::DWRFFlushPolicy>()> flushPolicyFactory_;
 
   RowTypePtr rowType_{ROW({"c0"}, {BIGINT()})};
   std::shared_ptr<IcebergMetadataColumn> pathColumn_ =
@@ -660,4 +664,83 @@ TEST_F(HiveIcebergTest, positionalDeletesMultipleSplits) {
   assertMultipleSplits({}, 10, 3);
 }
 
+TEST_F(HiveIcebergTest, testPartitionedRead) {
+  RowTypePtr rowType{ROW({"c0", "ds"}, {BIGINT(), DateType::get()})};
+  std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
+  // The Iceberg API sets date partition values to daysSinceEpoch, so in
+  // Velox we do not need to convert them to days. Test a query on two
+  // partitions: ds=17627 (2018-04-06) and ds=17628 (2018-04-07).
+  std::vector<std::shared_ptr<ConnectorSplit>> splits;
+  std::vector<std::shared_ptr<TempFilePath>> dataFilePaths;
+  for (int i = 0; i <= 1; ++i) {
+    std::vector<RowVectorPtr> dataVectors;
+    int32_t daysSinceEpoch = 17627 + i;
+    VectorPtr c0 = makeFlatVector<int64_t>((std::vector<int64_t>){i});
+    VectorPtr ds =
+        makeFlatVector<int32_t>((std::vector<int32_t>){daysSinceEpoch});
+    dataVectors.push_back(makeRowVector({"c0", "ds"}, {c0, ds}));
+
+    auto dataFilePath = TempFilePath::create();
+    dataFilePaths.push_back(dataFilePath);
+    writeToFile(
+        dataFilePath->getPath(), dataVectors, config_, flushPolicyFactory_);
+    partitionKeys["ds"] = std::to_string(daysSinceEpoch);
+    splits.emplace_back(
+        makeIcebergSplit(dataFilePath->getPath(), {}, partitionKeys));
+  }
+
+  std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
+      assignments;
+  assignments.insert(
+      {"c0",
+       std::make_shared<HiveColumnHandle>(
+           "c0",
+           HiveColumnHandle::ColumnType::kRegular,
+           rowType->childAt(0),
+           rowType->childAt(0))});
+
+  std::vector<common::Subfield> requiredSubFields;
+  HiveColumnHandle::ColumnParseParameters columnParseParameters;
+  columnParseParameters.partitionDateValueFormat =
+      HiveColumnHandle::ColumnParseParameters::kDaysSinceEpoch;
+  assignments.insert(
+      {"ds",
+       std::make_shared<HiveColumnHandle>(
+           "ds",
+           HiveColumnHandle::ColumnType::kPartitionKey,
+           rowType->childAt(1),
+           rowType->childAt(1),
+           std::move(requiredSubFields),
+           columnParseParameters)});
+
+  auto plan = PlanBuilder(pool_.get())
+                  .tableScan(rowType, {}, "", nullptr, assignments)
+                  .planNode();
+
+  HiveConnectorTestBase::assertQuery(
+      plan,
+      splits,
+      "SELECT * FROM (VALUES (0, '2018-04-06'), (1, '2018-04-07'))",
+      0);
+
+  // Test a filter on the non-partition, non-date column.
+  std::vector<std::string> nonPartitionFilters = {"c0 = 1"};
+  plan = PlanBuilder(pool_.get())
+             .tableScan(rowType, nonPartitionFilters, "", nullptr, assignments)
+             .planNode();
+
+  HiveConnectorTestBase::assertQuery(plan, splits, "SELECT 1, '2018-04-07'");
+
+  // Test a filter on the date column read from the file, not from partition keys.
+  std::vector<std::string> filters = {"ds = date'2018-04-06'"};
+  plan = PlanBuilder(pool_.get()).tableScan(rowType, filters).planNode();
+
+  splits.clear();
+  for (auto& dataFilePath : dataFilePaths) {
+    splits.emplace_back(makeIcebergSplit(dataFilePath->getPath(), {}, {}));
+  }
+
+  HiveConnectorTestBase::assertQuery(plan, splits, "SELECT 0, '2018-04-06'");
+}
+
 } // namespace facebook::velox::connector::hive::iceberg

Review comment (Collaborator), on the ds filter: This test is very simple: the partition keys and filters are the same, and only one matching row is selected. It would be good to add input rows that don't match the partition key and are filtered out, and also a filter on another non-partition-key value that matches an input row.
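A hedged sketch of the extra coverage the reviewer asks for (a hypothetical addition, not part of this PR). It would sit before the test clears and rebuilds the splits, while the partitioned splits and assignments are still live:

// Hypothetical addition to testPartitionedRead: filter on the partition key
// itself so the ds=17627 split is pruned and only row (1, 2018-04-07)
// survives.
std::vector<std::string> partitionFilters = {"ds = date'2018-04-07'"};
auto prunedPlan =
    PlanBuilder(pool_.get())
        .tableScan(rowType, partitionFilters, "", nullptr, assignments)
        .planNode();
HiveConnectorTestBase::assertQuery(prunedPlan, splits, "SELECT 1, '2018-04-07'");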