From 72225e2612d44041925138d62e50e6767788d7a7 Mon Sep 17 00:00:00 2001
From: Jack Del Vecchio
Date: Tue, 28 Nov 2023 18:20:19 +0000
Subject: [PATCH] HPCC 30381 Parquet plugin functions should be more consistent.

---
 plugins/parquet/parquetembed.cpp | 1453 +++++++++++++++---------------
 plugins/parquet/parquetembed.hpp |  553 ++++++------
 2 files changed, 992 insertions(+), 1014 deletions(-)

diff --git a/plugins/parquet/parquetembed.cpp b/plugins/parquet/parquetembed.cpp
index d3ea141c1f6..e07ef588779 100644
--- a/plugins/parquet/parquetembed.cpp
+++ b/plugins/parquet/parquetembed.cpp
@@ -97,38 +97,20 @@ extern void fail(const char *message)
 }
 
 /**
- * @brief Simple constructor that stores the inputs from the user.
+ * @brief Constructs a ParquetReader for a specific file location.
  *
- * @param option The read or write option.
- *
- * @param location The location to read a parquet file.
- *
- * @param destination The destination to write a parquet file.
- *
- * @param rowsize The max row group size when reading parquet files.
- *
- * @param _batchSize The size of the batches when converting parquet columns to rows.
+ * @param option The read or write option as well as information about partitioning.
+ * @param _location The full path from which to read a Parquet file or partitioned dataset. Can be a filename or directory.
+ * @param _maxRowCountInTable The number of rows in each batch when converting Parquet columns to rows.
+ * @param _activityCtx Additional context about the Thor workers that are running.
 */
-ParquetHelper::ParquetHelper(const char *option, const char *_location, const char *destination, int _rowSize, int _batchSize,
-                             bool _overwrite, arrow::Compression::type _compressionOption, const char *_partitionFields, const IThorActivityContext *_activityCtx)
-    : partOption(option), location(_location), destination(destination)
+ParquetReader::ParquetReader(const char *option, const char *_location, int _maxRowCountInTable, const char *_partitionFields, const IThorActivityContext *_activityCtx)
    : partOption(option), location(_location)
 {
-    rowSize = _rowSize;
-    batchSize = _batchSize;
-    overwrite = _overwrite;
-    compressionOption = _compressionOption;
+    maxRowCountInTable = _maxRowCountInTable;
     activityCtx = _activityCtx;
     pool = arrow::default_memory_pool();
-
-    parquetDoc = std::vector<rapidjson::Document>(rowSize);
-
-    if (activityCtx->querySlave() == 0 && startsWith(option, "write"))
-    {
-        reportIfFailure(checkDirContents());
-    }
-
-    partition = endsWithIgnoreCase(option, "partition");
-    if (partition)
+    if (_partitionFields)
    {
        std::stringstream ss(_partitionFields);
        std::string field;
@@ -137,123 +119,23 @@ ParquetHelper::ParquetHelper(const char *option, const char *_location, const ch
    }
 }
 
-ParquetHelper::~ParquetHelper()
+ParquetReader::~ParquetReader()
 {
     pool->ReleaseUnused();
-    jsonAlloc.Clear();
 }
 
 /**
- * @brief Get the Schema shared pointer
+ * @brief Opens a read stream at the target location set in the constructor.
  *
- * @return std::shared_ptr<arrow::Schema> Shared_ptr of schema object for building the write stream.
+ * @return arrow::Status::OK if successful.
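+ *
+ * A minimal read sequence around this helper, as an illustrative sketch (the
+ * option string, path, and row count below are hypothetical, not part of this
+ * patch):
+ * @code
+ *   ParquetReader reader("read", "/var/lib/HPCCSystems/mydata.parquet", 20000, nullptr, activityCtx);
+ *   reportIfFailure(reader.processReadFile()); // calls openReadFile() before reading
+ *   TableColumns *columns = nullptr;
+ *   while (reader.shouldRead())
+ *   {
+ *       __int64 index = reader.next(columns); // row index within the current chunk of columns
+ *   }
+ * @endcode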
*/ -std::shared_ptr ParquetHelper::getSchema() +arrow::Status ParquetReader::openReadFile() { - return schema; -} - -arrow::Status ParquetHelper::checkDirContents() -{ - if (destination.empty()) + if (location.empty()) { - failx("Missing target location when writing Parquet data."); - } - StringBuffer path; - StringBuffer filename; - StringBuffer ext; - splitFilename(destination.c_str(), nullptr, &path, &filename, &ext, false); - - ARROW_ASSIGN_OR_RAISE(auto filesystem, arrow::fs::FileSystemFromUriOrPath(destination)); - - Owned itr = createDirectoryIterator(path.str(), filename.appendf("*%s", ext.str())); - ForEach (*itr) - { - IFile &file = itr->query(); - if (file.isFile() == fileBool::foundYes) - { - if(overwrite) - { - if (!file.remove()) - { - failx("Failed to remove file %s", file.queryFilename()); - } - } - else - { - failx("The target file %s already exists. To delete the file set the overwrite option to true.", file.queryFilename()); - } - } - else - { - if (overwrite) - { - reportIfFailure(filesystem->DeleteDirContents(path.str())); - break; - } - else - { - failx("The target directory %s is not empty. To delete the contents of the directory set the overwrite option to true.", path.str()); - } - } - } - return arrow::Status::OK(); -} - -/** - * @brief Opens the write stream with the schema and destination. T - * - */ -arrow::Status ParquetHelper::openWriteFile() -{ - if (destination == "") failx("Invalid option: The destination was not supplied."); - - if (partition) - { - ARROW_ASSIGN_OR_RAISE(auto filesystem, arrow::fs::FileSystemFromUriOrPath(destination)); - auto format = std::make_shared(); - writeOptions.file_write_options = format->DefaultWriteOptions(); - writeOptions.filesystem = filesystem; - writeOptions.base_dir = destination; - writeOptions.partitioning = partitionType; - writeOptions.existing_data_behavior = arrow::dataset::ExistingDataBehavior::kOverwriteOrIgnore; - } - else - { - if(!endsWith(destination.c_str(), ".parquet")) - failx("Error opening file: Invalid file extension for file %s", destination.c_str()); - - // Currently under the assumption that all channels and workers are given a worker id and no matter - // the configuration will show up in activityCtx->numSlaves() - if (activityCtx->numSlaves() > 1) - { - destination.insert(destination.find(".parquet"), std::to_string(activityCtx->querySlave())); - } - - std::shared_ptr outfile; - - PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(destination)); - - // Choose compression - std::shared_ptr props = parquet::WriterProperties::Builder().compression(compressionOption)->build(); - - // Opt to store Arrow schema for easier reads back into Arrow - std::shared_ptr arrow_props = parquet::ArrowWriterProperties::Builder().store_schema()->build(); - - // Create a writer - ARROW_ASSIGN_OR_RAISE(writer, parquet::arrow::FileWriter::Open(*schema.get(), pool, outfile, props, arrow_props)); } - return arrow::Status::OK(); -} - -/** - * @brief Opens the read stream with the schema and location. 
- * - */ -arrow::Status ParquetHelper::openReadFile() -{ - if (partition) + if (endsWithIgnoreCase(partOption.c_str(), "partition")) { // Create a filesystem std::shared_ptr fs; @@ -281,13 +163,13 @@ arrow::Status ParquetHelper::openReadFile() failx("Incorrect partitioning type %s.", partOption.c_str()); } // Create the dataset factory - PARQUET_ASSIGN_OR_THROW(auto dataset_factory, arrow::dataset::FileSystemDatasetFactory::Make(fs, selector, format, options)); + PARQUET_ASSIGN_OR_THROW(auto datasetFactory, arrow::dataset::FileSystemDatasetFactory::Make(fs, selector, format, options)); // Get scanner - PARQUET_ASSIGN_OR_THROW(auto dataset, dataset_factory->Finish()); - ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan()); - reportIfFailure(scan_builder->Pool(pool)); - ARROW_ASSIGN_OR_RAISE(scanner, scan_builder->Finish()); + PARQUET_ASSIGN_OR_THROW(auto dataset, datasetFactory->Finish()); + ARROW_ASSIGN_OR_RAISE(auto scanBuilder, dataset->NewScan()); + reportIfFailure(scanBuilder->Pool(pool)); + ARROW_ASSIGN_OR_RAISE(scanner, scanBuilder->Finish()); } else { @@ -296,86 +178,36 @@ arrow::Status ParquetHelper::openReadFile() splitFilename(location.c_str(), nullptr, &path, &filename, nullptr, false); Owned itr = createDirectoryIterator(path.str(), filename.append("*.parquet")); - auto reader_properties = parquet::ReaderProperties(pool); - auto arrow_reader_props = parquet::ArrowReaderProperties(); + auto readerProperties = parquet::ReaderProperties(pool); + auto arrowReaderProps = parquet::ArrowReaderProperties(); ForEach (*itr) { IFile &file = itr->query(); - parquet::arrow::FileReaderBuilder reader_builder; - reportIfFailure(reader_builder.OpenFile(file.queryFilename(), false, reader_properties)); - reader_builder.memory_pool(pool); - reader_builder.properties(arrow_reader_props); + parquet::arrow::FileReaderBuilder readerBuilder; + reportIfFailure(readerBuilder.OpenFile(file.queryFilename(), false, readerProperties)); + readerBuilder.memory_pool(pool); + readerBuilder.properties(arrowReaderProps); std::unique_ptr parquetFileReader; - reportIfFailure(reader_builder.Build(&parquetFileReader)); + reportIfFailure(readerBuilder.Build(&parquetFileReader)); parquetFileReaders.push_back(std::move(parquetFileReader)); } } return arrow::Status::OK(); } -arrow::Status ParquetHelper::writePartition(std::shared_ptr table) -{ - // Create dataset for writing partitioned files. - auto dataset = std::make_shared(table); - - StringBuffer basename_template; - basename_template.appendf("part{i}_%lld.parquet", tablesProcessed++); - writeOptions.basename_template = basename_template.str(); - - ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan()); - reportIfFailure(scanner_builder->Pool(pool)); - ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish()); - - // Write partitioned files. - reportIfFailure(arrow::dataset::FileSystemDataset::Write(writeOptions, scanner)); - - return arrow::Status::OK(); -} - /** - * @brief Returns a pointer to the stream writer for writing to the destination. + * @brief Divide row groups being read from a Parquet file among any number of thor workers. * - * @return - */ -parquet::arrow::FileWriter *ParquetHelper::queryWriter() -{ - return writer.get(); -} - -/** - * @brief Returns a pointer to the top of the stack for the current row being built. 
- * - * @return A rapidjson::Value containing the row - */ -rapidjson::Value *ParquetHelper::queryCurrentRow() -{ - return &rowStack[rowStack.size() - 1]; -} - -/** - * @brief A helper method for updating the current row on writes and keeping - * it within the boundary of the rowSize set by the user when creating RowGroups. - */ -void ParquetHelper::updateRow() -{ - if (++currentRow == rowSize) - currentRow = 0; -} - -std::vector &ParquetHelper::queryRecordBatch() -{ - return parquetDoc; -} - -/** - * @brief Divide row groups being read from a parquet file among any number of thor workers. If running hthor all row groups are assigned to it. This function - * will handle all cases where the number of groups is greater than, less than or divisible by the number of thor workers. + * @param activityCtx Context information about which thor worker is reading the file. + * @param totalRowGroups The total row groups in the file or files that are being read. + * @param numRowGroups The number of row groups that this worker needs to read. + * @param startRowGroup The starting row group index for each thor worker. */ void divide_row_groups(const IThorActivityContext *activityCtx, __int64 totalRowGroups, __int64 &numRowGroups, __int64 &startRowGroup) { int workers = activityCtx->numSlaves(); int strands = activityCtx->numStrands(); - int worker_id = activityCtx->querySlave(); + int workerId = activityCtx->querySlave(); // Currently under the assumption that all channels and workers are given a worker id and no matter // the configuration will show up in activityCtx->numSlaves() @@ -386,7 +218,7 @@ void divide_row_groups(const IThorActivityContext *activityCtx, __int64 totalRow if (totalRowGroups % workers == 0) { numRowGroups = totalRowGroups / workers; - startRowGroup = numRowGroups * worker_id; + startRowGroup = numRowGroups * workerId; } // If the totalRowGroups is not evenly divisible by the number of workers then we divide them up // with the first n-1 workers getting slightly more and the nth worker gets the remainder @@ -395,25 +227,25 @@ void divide_row_groups(const IThorActivityContext *activityCtx, __int64 totalRow __int64 groupsPerWorker = totalRowGroups / workers; __int64 remainder = totalRowGroups % workers; - if (worker_id < remainder) + if (workerId < remainder) { numRowGroups = groupsPerWorker + 1; - startRowGroup = numRowGroups * worker_id; + startRowGroup = numRowGroups * workerId; } else { numRowGroups = groupsPerWorker; - startRowGroup = (remainder * (numRowGroups + 1)) + ((worker_id - remainder) * numRowGroups); + startRowGroup = (remainder * (numRowGroups + 1)) + ((workerId - remainder) * numRowGroups); } } // If the number of totalRowGroups is less than the number of workers we give as many as possible // a single row group to read. else { - if (worker_id < totalRowGroups) + if (workerId < totalRowGroups) { numRowGroups = 1; - startRowGroup = worker_id; + startRowGroup = workerId; } else { @@ -430,7 +262,13 @@ void divide_row_groups(const IThorActivityContext *activityCtx, __int64 totalRow } } -void ParquetHelper::chunkTable(std::shared_ptr &table) +/** + * @brief Splits an arrow table into an unordered map with the left side containing the + * column names and the right side containing an Array of the column values. + * + * @param table The table to be split and stored in the unordered map. 
+*/ +void ParquetReader::splitTable(std::shared_ptr &table) { auto columns = table->columns(); parquetTable.clear(); @@ -440,7 +278,13 @@ void ParquetHelper::chunkTable(std::shared_ptr &table) } } -std::shared_ptr ParquetHelper::queryCurrentTable(__int64 currTable) +/** + * @brief Get the current table taking into account multiple files with variable table counts. + * + * @param currTable The index of the current table relative to the total number in all files being read. + * @return std::shared_ptr The RowGroupReader to read columns from the table. + */ +std::shared_ptr ParquetReader::queryCurrentTable(__int64 currTable) { __int64 tables = 0; __int64 offset = 0; @@ -457,33 +301,20 @@ std::shared_ptr ParquetHelper::queryCurrentTable } /** - * @brief Sets the parquetTable member to the output of what is read from the given - * parquet file. + * @brief Open the file reader for the target file and read the metadata for the row counts. + * + * @return arrow::Status Returns ok if opening a file and reading the metadata succeeds. */ -arrow::Status ParquetHelper::processReadFile() +arrow::Status ParquetReader::processReadFile() { - if (partition) + reportIfFailure(openReadFile()); // Open the file with the target location before processing. + if (endsWithIgnoreCase(partOption.c_str(), "partition")) { - // rowsProcessed starts at zero and we read in batches until it is equal to rowsCount - rowsProcessed = 0; - totalRowsProcessed = 0; PARQUET_ASSIGN_OR_THROW(rbatchReader, scanner->ToRecordBatchReader()); rbatchItr = arrow::RecordBatchReader::RecordBatchReaderIterator(rbatchReader.get()); + PARQUET_ASSIGN_OR_THROW(auto datasetRows, scanner->CountRows()); // Divide the work among any number of workers - PARQUET_ASSIGN_OR_THROW(auto total_rows, scanner->CountRows()); - divide_row_groups(activityCtx, total_rows, totalRowCount, startRow); - if (totalRowCount != 0) - { - std::shared_ptr table; - PARQUET_ASSIGN_OR_THROW(table, queryRows()); - rowsCount = table->num_rows(); - chunkTable(table); - tablesProcessed++; - } - else - { - rowsCount = 0; - } + divide_row_groups(activityCtx, datasetRows, totalRowCount, startRowGroup); } else { @@ -497,158 +328,272 @@ arrow::Status ParquetHelper::processReadFile() } divide_row_groups(activityCtx, totalTables, tableCount, startRowGroup); - rowsProcessed = 0; - if (tableCount != 0) - { - std::shared_ptr table; - reportIfFailure(queryCurrentTable(tablesProcessed + startRowGroup)->ReadTable(&table)); - rowsCount = table->num_rows(); - chunkTable(table); - tablesProcessed++; - } - else - { - rowsCount = 0; - } } + totalRowsProcessed = 0; + rowsProcessed = 0; + rowsCount = 0; return arrow::Status::OK(); } /** - * @brief Returns a boolean so we know if we are writing partitioned files. + * @brief Checks if all the rows have been read in a partitioned dataset, or if reading a single file checks if + * all the RowGroups and every row in the last group has been read. * - * @return true If we are partitioning. - * @return false If we are writing a single file. + * @return True if there are more rows to be read and false if else. */ -bool ParquetHelper::partSetting() +bool ParquetReader::shouldRead() { - return partition; + if (scanner) + return !(totalRowsProcessed >= totalRowCount); + else + return !(tablesProcessed >= tableCount && rowsProcessed >= rowsCount); } /** - * @brief Returns the maximum size of the row group set by the user. Default is 1000. + * @brief Iterates to the correct starting RecordBatch in a partitioned dataset. 
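+ *
+ * The starting row comes from divide_row_groups. As a worked example with
+ * hypothetical counts: 10 units of work split across 4 workers yields
+ * 3, 3, 2 and 2 units starting at offsets 0, 3, 6 and 8 respectively.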
 *
- * @return int Maximum size of the row group.
+ * @return arrow::Result<std::shared_ptr<arrow::Table>> A pointer to the current table.
 */
-__int64 ParquetHelper::getMaxRowSize()
+arrow::Result<std::shared_ptr<arrow::Table>> ParquetReader::queryRows()
 {
-    return rowSize;
+    // If no tables have been processed find the starting RecordBatch
+    if (tablesProcessed == 0)
+    {
+        // Start by getting the number of rows in the first group and checking if it includes this worker's startRow
+        __int64 offset = (*rbatchItr)->get()->num_rows();
+        while (offset < startRow)
+        {
+            rbatchItr++;
+            offset += (*rbatchItr)->get()->num_rows();
+        }
+        // If startRow is in the middle of a table skip processing the beginning of the batch
+        rowsProcessed = (*rbatchItr)->get()->num_rows() - (offset - startRow);
+    }
+    // Convert the current batch to a table
+    PARQUET_ASSIGN_OR_THROW(auto batch, *rbatchItr);
+    rbatchItr++;
+    std::vector<std::shared_ptr<arrow::RecordBatch>> toTable = {batch};
+    return std::move(arrow::Table::FromRecordBatches(std::move(toTable)));
 }
 
-char ParquetHelper::queryPartOptions()
+/**
+ * @brief Updates the current table if all the rows have been processed. Sets nextTable to the current TableColumns object.
+ *
+ * @param nextTable The memory address of the TableColumns object containing the current table.
+ * @return __int64 The index of the current row within the current table, i.e. how many of its rows have been processed.
+ */
+__int64 ParquetReader::next(TableColumns *&nextTable)
 {
-    if (partOption[0] == 'W' || partOption[0] == 'w')
+    if (rowsProcessed == rowsCount)
     {
-        return 'w';
+        std::shared_ptr<arrow::Table> table;
+        if (endsWithIgnoreCase(partOption.c_str(), "partition"))
+        {
+            PARQUET_ASSIGN_OR_THROW(table, queryRows());
+        }
+        else
+        {
+            reportIfFailure(queryCurrentTable(tablesProcessed + startRowGroup)->ReadTable(&table));
+        }
+        rowsProcessed = 0;
+        tablesProcessed++;
+        rowsCount = table->num_rows();
+        splitTable(table);
     }
-    else if (partOption[0] == 'R' || partOption[0] == 'r')
+    nextTable = &parquetTable;
+    totalRowsProcessed++;
+    return rowsProcessed++;
+}
+
+/**
+ * @brief Constructs a ParquetWriter for the target destination and checks for existing data.
+ *
+ * @param option The read or write option as well as information about partitioning.
+ * @param _destination The full path to which to write a Parquet file or partitioned dataset. Can be a filename or directory.
+ * @param _maxRowCountInBatch The maximum number of rows in each RecordBatch created for output.
+ * @param _overwrite If true, any existing contents at the target location are deleted when the plugin calls checkDirContents.
+ * @param _compressionOption Compression option for writing compressed Parquet files of different types.
+ * @param _activityCtx Additional context about the Thor workers that are running.
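+ *
+ * Illustrative construction (every argument value here is hypothetical):
+ * @code
+ *   ParquetWriter writer("writepartition", "/var/lib/HPCCSystems/out", 40000, true,
+ *                        arrow::Compression::SNAPPY, "year;month", activityCtx);
+ * @endcode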
+ */ +ParquetWriter::ParquetWriter(const char *option, const char *_destination, int _maxRowCountInBatch, bool _overwrite, arrow::Compression::type _compressionOption, const char *_partitionFields, const IThorActivityContext *_activityCtx) + : partOption(option), destination(_destination), maxRowCountInBatch(_maxRowCountInBatch), overwrite(_overwrite), compressionOption(_compressionOption), activityCtx(_activityCtx) +{ + pool = arrow::default_memory_pool(); + parquetDoc = std::vector(maxRowCountInBatch); + if (activityCtx->querySlave() == 0 && startsWithIgnoreCase(partOption.c_str(), "write")) { - return 'r'; + reportIfFailure(checkDirContents()); } - else + if (endsWithIgnoreCase(partOption.c_str(), "partition")) { - failx("Invalid options parameter."); + std::stringstream ss(_partitionFields); + std::string field; + while (std::getline(ss, field, ';')) + { + partitionFields.push_back(field); + } } } +ParquetWriter::~ParquetWriter() +{ + pool->ReleaseUnused(); + jsonAlloc.Clear(); +} + /** - * @brief Checks if all the rows have been read and if reading a single file all of the - * RowGroups as well. + * @brief Opens a write stream depending on if the user is writing a partitioned file or regular file. * - * @return True if there are more rows to be read and false if else. + * @return Status object arrow::Status::OK if successful. */ -bool ParquetHelper::shouldRead() +arrow::Status ParquetWriter::openWriteFile() { - if (partition) - return !(totalRowsProcessed >= totalRowCount); + if (destination.empty()) + { + failx("Invalid option: The destination was not supplied."); + } + if (endsWithIgnoreCase(partOption.c_str(), "partition")) + { + ARROW_ASSIGN_OR_RAISE(auto filesystem, arrow::fs::FileSystemFromUriOrPath(destination)); + auto format = std::make_shared(); + writeOptions.file_write_options = format->DefaultWriteOptions(); + writeOptions.filesystem = filesystem; + writeOptions.base_dir = destination; + writeOptions.partitioning = partitionType; + writeOptions.existing_data_behavior = arrow::dataset::ExistingDataBehavior::kOverwriteOrIgnore; + } else - return !(tablesProcessed >= tableCount && rowsProcessed >= rowsCount); + { + if(!endsWith(destination.c_str(), ".parquet")) + failx("Error opening file: Invalid file extension for file %s", destination.c_str()); + + // Currently under the assumption that all channels and workers are given a worker id and no matter + // the configuration will show up in activityCtx->numSlaves() + if (activityCtx->numSlaves() > 1) + { + destination.insert(destination.find(".parquet"), std::to_string(activityCtx->querySlave())); + } + + std::shared_ptr outfile; + PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(destination)); + + // Choose compression + std::shared_ptr props = parquet::WriterProperties::Builder().compression(compressionOption)->build(); + + // Opt to store Arrow schema for easier reads back into Arrow + std::shared_ptr arrowProps = parquet::ArrowWriterProperties::Builder().store_schema()->build(); + + // Create a writer + ARROW_ASSIGN_OR_RAISE(writer, parquet::arrow::FileWriter::Open(*schema.get(), pool, outfile, props, arrowProps)); + } + return arrow::Status::OK(); } -__int64 &ParquetHelper::getRowsProcessed() +/** + * @brief Writes a single record batch to a partitioned dataset. + * + * @param table An arrow table to write out. + * @return Status object arrow::Status::OK if successful. 
+*/ +arrow::Status ParquetWriter::writePartition(std::shared_ptr table) { - return rowsProcessed; + // Create dataset for writing partitioned files. + auto dataset = std::make_shared(table); + + StringBuffer basenameTemplate; + basenameTemplate.appendf("part_%d{i}_%lld.parquet",activityCtx->querySlave(), tablesProcessed++); + writeOptions.basename_template = basenameTemplate.str(); + + ARROW_ASSIGN_OR_RAISE(auto scannerBuilder, dataset->NewScan()); + reportIfFailure(scannerBuilder->Pool(pool)); + ARROW_ASSIGN_OR_RAISE(auto scanner, scannerBuilder->Finish()); + + // Write partitioned files. + reportIfFailure(arrow::dataset::FileSystemDataset::Write(writeOptions, scanner)); + + return arrow::Status::OK(); } -arrow::Result> ParquetHelper::convertToRecordBatch( - const std::vector &rows, std::shared_ptr schema) +/** + * @brief Converts the vector of rapidjson::Documents into an arrow::RecordBatch and writes it to + * a file or partitioned dataset. + */ +void ParquetWriter::writeRecordBatch() { - // RecordBatchBuilder will create array builders for us for each field in our - // schema. By passing the number of output rows (`rows.size()`) we can - // pre-allocate the correct size of arrays, except of course in the case of - // string, byte, and list arrays, which have dynamic lengths. - std::unique_ptr batch_builder; - ARROW_ASSIGN_OR_RAISE( - batch_builder, - arrow::RecordBatchBuilder::Make(schema, pool, rows.size())); + // Convert row_batch vector to RecordBatch and write to file. + PARQUET_ASSIGN_OR_THROW(auto recordBatch, convertToRecordBatch(parquetDoc, schema)); + // Write each batch as a row_groups + PARQUET_ASSIGN_OR_THROW(auto table, arrow::Table::FromRecordBatches(schema, {recordBatch})); - // Inner converter will take rows and be responsible for appending values - // to provided array builders. - JsonValueConverter converter(rows); - for (int i = 0; i < batch_builder->num_fields(); ++i) + if (endsWithIgnoreCase(partOption.c_str(), "partition")) { - std::shared_ptr field = schema->field(i); - arrow::ArrayBuilder *builder = batch_builder->GetField(i); - ARROW_RETURN_NOT_OK(converter.Convert(*field.get(), builder)); + reportIfFailure(writePartition(table)); } + else + { + reportIfFailure(writer->WriteTable(*(table.get()), recordBatch->num_rows())); + } +} - std::shared_ptr batch; - ARROW_ASSIGN_OR_RAISE(batch, batch_builder->Flush()); +/** + * @brief Converts the vector of rapidjson::Documents into an arrow::RecordBatch and writes it to + * a file or partitioned dataset. Resizes the vector before converting to a RecordBatch. + * + * @param newSize The new size of the vector. + */ +void ParquetWriter::writeRecordBatch(std::size_t newSize) +{ + parquetDoc.resize(newSize); + writeRecordBatch(); +} - // Use RecordBatch::ValidateFull() to make sure arrays were correctly constructed. - reportIfFailure(batch->ValidateFull()); - return batch; +/** + * @brief Returns a pointer to the top of the stack for the current row being built. + * + * @return A pointer to the rapidjson::Value containing the row + */ +rapidjson::Value *ParquetWriter::queryCurrentRow() +{ + return &rowStack[rowStack.size() - 1]; } -arrow::Result> ParquetHelper::queryRows() +/** + * @brief A helper method for updating the current row on writes and keeping + * it within the boundary of the maxRowCountInBatch set by the user when creating RowGroups. 
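+ *
+ * With maxRowCountInBatch = 3, for example, currentRow cycles 0, 1, 2, 0, ...,
+ * so the surrounding write logic can flush a RecordBatch each time the
+ * counter wraps to 0.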
+ */ +void ParquetWriter::updateRow() { - if (tablesProcessed == 0) - { - __int64 offset = (*rbatchItr)->get()->num_rows(); - while (offset < startRow) - { - rbatchItr++; - offset += (*rbatchItr)->get()->num_rows(); - } - rowsProcessed = (*rbatchItr)->get()->num_rows() - (offset - startRow); - } - PARQUET_ASSIGN_OR_THROW(auto batch, *rbatchItr); - rbatchItr++; - std::vector> to_table = {batch}; - return std::move(arrow::Table::FromRecordBatches(std::move(to_table))); + if (++currentRow == maxRowCountInBatch) + currentRow = 0; } -std::unordered_map> &ParquetHelper::next() +/** + * @brief Convert a vector of rapidjson::Documents containing single rows to an arrow::RecordBatch + * + * @param rows The vector of rows to be converted. + * @param schema The arrow::Schema of the rows being converted. + * @return An arrow::Result object containing the new RecordBatch. + */ +arrow::Result> ParquetWriter::convertToRecordBatch(const std::vector &rows, std::shared_ptr schema) { - if (rowsProcessed == rowsCount) - { - if (partition) - { - // rowsProcessed starts at zero and we read in batches until it is equal to rowsCount - rowsProcessed = 0; - tablesProcessed++; - std::shared_ptr table; - PARQUET_ASSIGN_OR_THROW(table, queryRows()); - rowsCount = table->num_rows(); - chunkTable(table); - } - else - { - std::shared_ptr table; - reportIfFailure(queryCurrentTable(tablesProcessed + startRowGroup)->ReadTable(&table)); - rowsProcessed = 0; - tablesProcessed++; - rowsCount = table->num_rows(); - chunkTable(table); - } + // Create RecordBatchBuilder from schema and set the size + std::unique_ptr batchBuilder; + ARROW_ASSIGN_OR_RAISE(batchBuilder, arrow::RecordBatchBuilder::Make(schema, pool, rows.size())); + + JsonValueConverter converter(rows); + for (int i = 0; i < batchBuilder->num_fields(); ++i) + { + std::shared_ptr field = schema->field(i); + arrow::ArrayBuilder *builder = batchBuilder->GetField(i); + ARROW_RETURN_NOT_OK(converter.Convert(*field.get(), builder)); } - totalRowsProcessed++; - return parquetTable; -} -__int64 ParquetHelper::queryRowsCount() -{ - return rowsCount; + std::shared_ptr batch; + ARROW_ASSIGN_OR_RAISE(batch, batchBuilder->Flush()); + + // Use RecordBatch::ValidateFull() to make sure arrays were correctly constructed. + reportIfFailure(batch->ValidateFull()); + return batch; } /** @@ -656,10 +601,9 @@ __int64 ParquetHelper::queryRowsCount() * the ECL RtlFieldInfo object into arrow::Fields for creating a rapidjson document object. * * @param field The field containing metadata for the record. - * - * @returns An arrow::Structype holding the schema and fields of the child records. + * @returns An arrow::NestedType holding the schema and fields of the child records. 
*/ -std::shared_ptr ParquetHelper::makeChildRecord(const RtlFieldInfo *field) +std::shared_ptr ParquetWriter::makeChildRecord(const RtlFieldInfo *field) { const RtlTypeInfo *typeInfo = field->type; const RtlFieldInfo *const *fields = typeInfo->queryFields(); @@ -668,23 +612,23 @@ std::shared_ptr ParquetHelper::makeChildRecord(const RtlField { int count = getNumFields(typeInfo); - std::vector> child_fields; + std::vector> childFields; for (int i = 0; i < count; i++, fields++) { - reportIfFailure(fieldToNode((*fields)->name, *fields, child_fields)); + reportIfFailure(fieldToNode((*fields)->name, *fields, childFields)); } - return std::make_shared(child_fields); + return std::make_shared(childFields); } else { // Create set const RtlTypeInfo *child = typeInfo->queryChildType(); - const RtlFieldInfo childField = RtlFieldInfo("", "", child); - std::vector> child_field; - reportIfFailure(fieldToNode(childField.name, &childField, child_field)); - return std::make_shared(child_field[0]); + const RtlFieldInfo childFieldInfo = RtlFieldInfo("", "", child); + std::vector> childField; + reportIfFailure(fieldToNode(childFieldInfo.name, &childFieldInfo, childField)); + return std::make_shared(childField[0]); } } @@ -692,78 +636,75 @@ std::shared_ptr ParquetHelper::makeChildRecord(const RtlField * @brief Converts an RtlFieldInfo object into an arrow field and adds it to the output vector. * * @param name The name of the field - * * @param field The field containing metadata for the record. - * - * @param arrow_fields Output vector for pushing new nodes to. - * + * @param arrowFields Output vector for pushing new nodes to. * @return Status of the operation */ -arrow::Status ParquetHelper::fieldToNode(const std::string &name, const RtlFieldInfo *field, std::vector> &arrow_fields) +arrow::Status ParquetWriter::fieldToNode(const std::string &name, const RtlFieldInfo *field, std::vector> &arrowFields) { unsigned len = field->type->length; switch (field->type->getType()) { case type_boolean: - arrow_fields.push_back(std::make_shared(name, arrow::boolean())); + arrowFields.push_back(std::make_shared(name, arrow::boolean())); break; case type_int: if (field->type->isSigned()) { if (len > 4) { - arrow_fields.push_back(std::make_shared(name, arrow::int64())); + arrowFields.push_back(std::make_shared(name, arrow::int64())); } else { - arrow_fields.push_back(std::make_shared(name, arrow::int32())); + arrowFields.push_back(std::make_shared(name, arrow::int32())); } } else { if (len > 4) { - arrow_fields.push_back(std::make_shared(name, arrow::uint64())); + arrowFields.push_back(std::make_shared(name, arrow::uint64())); } else { - arrow_fields.push_back(std::make_shared(name, arrow::uint32())); + arrowFields.push_back(std::make_shared(name, arrow::uint32())); } } break; case type_real: - arrow_fields.push_back(std::make_shared(name, arrow::float64())); + arrowFields.push_back(std::make_shared(name, arrow::float64())); break; case type_string: - arrow_fields.push_back(std::make_shared(name, arrow::utf8())); + arrowFields.push_back(std::make_shared(name, arrow::utf8())); break; case type_char: - arrow_fields.push_back(std::make_shared(name, arrow::utf8())); + arrowFields.push_back(std::make_shared(name, arrow::utf8())); break; case type_varstring: - arrow_fields.push_back(std::make_shared(name, arrow::utf8())); + arrowFields.push_back(std::make_shared(name, arrow::utf8())); break; case type_qstring: - arrow_fields.push_back(std::make_shared(name, arrow::utf8())); + arrowFields.push_back(std::make_shared(name, 
arrow::utf8())); break; case type_unicode: - arrow_fields.push_back(std::make_shared(name, arrow::utf8())); + arrowFields.push_back(std::make_shared(name, arrow::utf8())); break; case type_utf8: - arrow_fields.push_back(std::make_shared(name, arrow::utf8())); + arrowFields.push_back(std::make_shared(name, arrow::utf8())); break; case type_decimal: - arrow_fields.push_back(std::make_shared(name, arrow::utf8())); + arrowFields.push_back(std::make_shared(name, arrow::utf8())); break; case type_data: - arrow_fields.push_back(std::make_shared(name, arrow::large_binary())); + arrowFields.push_back(std::make_shared(name, arrow::large_binary())); break; case type_record: - arrow_fields.push_back(std::make_shared(name, makeChildRecord(field))); + arrowFields.push_back(std::make_shared(name, makeChildRecord(field))); break; case type_set: - arrow_fields.push_back(std::make_shared(name, makeChildRecord(field))); + arrowFields.push_back(std::make_shared(name, makeChildRecord(field))); break; default: failx("Datatype %i is not compatible with this plugin.", field->type->getType()); @@ -774,24 +715,26 @@ arrow::Status ParquetHelper::fieldToNode(const std::string &name, const RtlField /** * @brief Creates an arrow::Schema from the field info of the row. + * * @param typeInfo An RtlTypeInfo object that we iterate through to get all * the information for the row. */ -arrow::Status ParquetHelper::fieldsToSchema(const RtlTypeInfo *typeInfo) +arrow::Status ParquetWriter::fieldsToSchema(const RtlTypeInfo *typeInfo) { const RtlFieldInfo *const *fields = typeInfo->queryFields(); int count = getNumFields(typeInfo); - std::vector> arrow_fields; + std::vector> arrowFields; for (int i = 0; i < count; i++, fields++) { - ARROW_RETURN_NOT_OK(fieldToNode((*fields)->name, *fields, arrow_fields)); + ARROW_RETURN_NOT_OK(fieldToNode((*fields)->name, *fields, arrowFields)); } - schema = std::make_shared(arrow_fields); + schema = std::make_shared(arrowFields); - if (partition) + // If writing a partitioned file also create the partitioning schema from the partitionFields set by the user + if (endsWithIgnoreCase(partOption.c_str(), "partition")) { arrow::FieldVector partitionSchema; for (int i = 0; i < partitionFields.size(); i++) @@ -815,18 +758,18 @@ arrow::Status ParquetHelper::fieldsToSchema(const RtlTypeInfo *typeInfo) } /** - * @brief Creates a rapidjson::Value and adds it to the stack + * @brief Creates a rapidjson::Value with an array type and adds it to the stack */ -void ParquetHelper::beginSet() +void ParquetWriter::beginSet() { rapidjson::Value row(rapidjson::kArrayType); rowStack.push_back(std::move(row)); } /** - * @brief Creates a rapidjson::Value and adds it to the stack + * @brief Creates a rapidjson::Value with an object type and adds it to the stack */ -void ParquetHelper::beginRow() +void ParquetWriter::beginRow() { rapidjson::Value row(rapidjson::kObjectType); rowStack.push_back(std::move(row)); @@ -835,8 +778,10 @@ void ParquetHelper::beginRow() /** * @brief Removes the value from the top of the stack and adds it the parent row. * If there is only one value on the stack then it converts it to a rapidjson::Document. + * + * @param name The name of the row field. 
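+ *
+ * For a nested row the popped value is attached to its parent under the given
+ * name; for the outermost row it becomes the rapidjson::Document for the
+ * current slot in the output batch.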
*/ -void ParquetHelper::endRow(const char *name) +void ParquetWriter::endRow(const char *name) { if (rowStack.size() > 1) { @@ -858,25 +803,96 @@ void ParquetHelper::endRow(const char *name) } } -ParquetRowStream::ParquetRowStream(IEngineRowAllocator *_resultAllocator, std::shared_ptr _parquet) - : m_resultAllocator(_resultAllocator), s_parquet(_parquet) +/** + * @brief Adds a key value pair to the current row being built for writing to parquet. + * + * @param key Field name of the column. + * @param value Value of the field. + */ +void ParquetWriter::addMember(rapidjson::Value &key, rapidjson::Value &value) +{ + rapidjson::Value *row = &rowStack[rowStack.size() - 1]; + if(!row) + failx("Failed to add member to rapidjson row"); + if (row->GetType() == rapidjson::kObjectType) + row->AddMember(key, value, jsonAlloc); + else + row->PushBack(value, jsonAlloc); +} + +/** + * @brief Check the contents of the target location set by the user. If the overwrite option + * is true then any files in the target directory or matching the file mask will be deleted. + * + * @return arrow::Status::OK if all operations successful. + */ +arrow::Status ParquetWriter::checkDirContents() { - rowsCount = _parquet->queryRowsCount(); + if (destination.empty()) + { + failx("Missing target location when writing Parquet data."); + } + StringBuffer path; + StringBuffer filename; + StringBuffer ext; + splitFilename(destination.c_str(), nullptr, &path, &filename, &ext, false); + + ARROW_ASSIGN_OR_RAISE(auto filesystem, arrow::fs::FileSystemFromUriOrPath(destination)); + + Owned itr = createDirectoryIterator(path.str(), filename.appendf("*%s", ext.str())); + ForEach (*itr) + { + IFile &file = itr->query(); + if (file.isFile() == fileBool::foundYes) + { + if(overwrite) + { + if (!file.remove()) + { + failx("Failed to remove file %s", file.queryFilename()); + } + } + else + { + failx("The target file %s already exists. To delete the file set the overwrite option to true.", file.queryFilename()); + } + } + else + { + if (overwrite) + { + reportIfFailure(filesystem->DeleteDirContents(path.str())); + break; + } + else + { + failx("The target directory %s is not empty. To delete the contents of the directory set the overwrite option to true.", path.str()); + } + } + } + return arrow::Status::OK(); } +/** + * @brief Create a ParquetRowBuilder and build a row. If all the rows in a table have been processed a + * new table will be read from the input file. + * + * @return const void * Memory Address where result row is stored. 
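+ *
+ * The engine calls nextRow() until it returns nullptr, which happens once
+ * shouldRead() reports that all assigned rows have been consumed or stop()
+ * has been called.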
+ */ const void *ParquetRowStream::nextRow() { - if (m_shouldRead && s_parquet->shouldRead()) + if (shouldRead && parquetReader->shouldRead()) { - auto table = s_parquet->next(); - m_currentRow++; + TableColumns *table = nullptr; + auto index = parquetReader->next(table); + currentRow++; - if (!table.empty()) + if (table && !table->empty()) { - ParquetRowBuilder pRowBuilder(&table, s_parquet->getRowsProcessed()++, &array_visitor); + ParquetRowBuilder pRowBuilder(table, index); - RtlDynamicRowBuilder rowBuilder(m_resultAllocator); - const RtlTypeInfo *typeInfo = m_resultAllocator->queryOutputMeta()->queryTypeInfo(); + RtlDynamicRowBuilder rowBuilder(resultAllocator); + const RtlTypeInfo *typeInfo = resultAllocator->queryOutputMeta()->queryTypeInfo(); assertex(typeInfo); RtlFieldStrInfo dummyField("", NULL, typeInfo); size32_t len = typeInfo->build(rowBuilder, 0, &dummyField, pRowBuilder); @@ -888,12 +904,21 @@ const void *ParquetRowStream::nextRow() return nullptr; } +/** + * @brief Stop reading result rows from the Parquet file. + */ void ParquetRowStream::stop() { - m_resultAllocator.clear(); - m_shouldRead = false; + resultAllocator.clear(); + shouldRead = false; } +/** + * @brief Utility function for getting the xpath or field name from an RtlFieldInfo object. + * + * @param outXPath The buffer for storing output. + * @param field RtlFieldInfo object storing metadata for field. + */ void ParquetRowBuilder::xpathOrName(StringBuffer &outXPath, const RtlFieldInfo *field) const { outXPath.clear(); @@ -924,125 +949,163 @@ void ParquetRowBuilder::xpathOrName(StringBuffer &outXPath, const RtlFieldInfo * } } +/** + * @brief Gets the current array index taking into account the nested status of the row. + * + * @return int64_t The current array index of the value. + */ int64_t ParquetRowBuilder::currArrayIndex() { - return !m_pathStack.empty() && m_pathStack.back().nodeType == CPNTSet ? m_pathStack.back().childrenProcessed++ : currentRow; + return !pathStack.empty() && pathStack.back().nodeType == CPNTSet ? pathStack.back().childrenProcessed++ : currentRow; } -__int64 getSigned(std::shared_ptr *array_visitor, int index) +/** + * @brief Returns a Signed value depending on the size of the integer that was stored in parquet. + * + * @param arrayVisitor The ParquetVisitor class for getting a pointer to the column. + * @param index The index in the array to read a value from. + * @return __int64 Result value in the array.. + */ +__int64 getSigned(std::shared_ptr &arrayVisitor, int index) { - switch ((*array_visitor)->size) + switch (arrayVisitor->size) { case 8: - return (*array_visitor)->int8_arr->Value(index); + return arrayVisitor->int8Arr->Value(index); case 16: - return (*array_visitor)->int16_arr->Value(index); + return arrayVisitor->int16Arr->Value(index); case 32: - return (*array_visitor)->int32_arr->Value(index); + return arrayVisitor->int32Arr->Value(index); case 64: - return (*array_visitor)->int64_arr->Value(index); + return arrayVisitor->int64Arr->Value(index); default: - failx("getSigned: Invalid size %i", (*array_visitor)->size); + failx("getSigned: Invalid size %i", arrayVisitor->size); } } -unsigned __int64 getUnsigned(std::shared_ptr *array_visitor, int index) +/** + * @brief Returns an Unsigned value depending on the size of the unsigned integer that was stored in parquet. + * + * @param arrayVisitor The ParquetVisitor class for getting a pointer to the column. + * @param index The index in the array to read a value from. + * @return unsigned __int64 Result value in the array. 
+ */ +unsigned __int64 getUnsigned(std::shared_ptr &arrayVisitor, int index) { - switch ((*array_visitor)->size) + switch (arrayVisitor->size) { case 8: - return (*array_visitor)->uint8_arr->Value(index); + return arrayVisitor->uint8Arr->Value(index); case 16: - return (*array_visitor)->uint16_arr->Value(index); + return arrayVisitor->uint16Arr->Value(index); case 32: - return (*array_visitor)->uint32_arr->Value(index); + return arrayVisitor->uint32Arr->Value(index); case 64: - return (*array_visitor)->uint64_arr->Value(index); + return arrayVisitor->uint64Arr->Value(index); default: - failx("getUnsigned: Invalid size %i", (*array_visitor)->size); + failx("getUnsigned: Invalid size %i", arrayVisitor->size); } } -double getReal(std::shared_ptr *array_visitor, int index) +/** + * @brief Returns a Real value depending on the size of the double that was stored in parquet. + * + * @param arrayVisitor The ParquetVisitor class for getting a pointer to the column. + * @param index The index in the array to read a value from. + * @return double Result value in the array. + */ +double getReal(std::shared_ptr &arrayVisitor, int index) { - switch ((*array_visitor)->size) + switch (arrayVisitor->size) { case 2: - return (*array_visitor)->half_float_arr->Value(index); + return arrayVisitor->halfFloatArr->Value(index); case 4: - return (*array_visitor)->float_arr->Value(index); + return arrayVisitor->floatArr->Value(index); case 8: - return (*array_visitor)->double_arr->Value(index); + return arrayVisitor->doubleArr->Value(index); default: - failx("getReal: Invalid size %i", (*array_visitor)->size); + failx("getReal: Invalid size %i", arrayVisitor->size); } } +/** + * @brief Gets the value as a string_view. If the field is a numeric type it is serialized to a StringBuffer. + * + * @param field Field information used for warning the user if a type is unsupported. + * @return std::string_view A view of the current result. + */ std::string_view ParquetRowBuilder::getCurrView(const RtlFieldInfo *field) { serialized.clear(); - switch((*array_visitor)->type) + switch(arrayVisitor->type) { case BoolType: - tokenSerializer.serialize((*array_visitor)->bool_arr->Value(currArrayIndex()), serialized); + tokenSerializer.serialize(arrayVisitor->boolArr->Value(currArrayIndex()), serialized); return serialized.str(); case BinaryType: - return (*array_visitor)->bin_arr->GetView(currArrayIndex()); + return arrayVisitor->binArr->GetView(currArrayIndex()); case LargeBinaryType: - return (*array_visitor)->large_bin_arr->GetView(currArrayIndex()); + return arrayVisitor->largeBinArr->GetView(currArrayIndex()); case RealType: - tokenSerializer.serialize(getReal(array_visitor, currArrayIndex()), serialized); + tokenSerializer.serialize(getReal(arrayVisitor, currArrayIndex()), serialized); return serialized.str(); case IntType: - tokenSerializer.serialize(getSigned(array_visitor, currArrayIndex()), serialized); + tokenSerializer.serialize(getSigned(arrayVisitor, currArrayIndex()), serialized); return serialized.str(); case UIntType: - tokenSerializer.serialize(getUnsigned(array_visitor, currArrayIndex()), serialized); + tokenSerializer.serialize(getUnsigned(arrayVisitor, currArrayIndex()), serialized); return serialized.str(); case DateType: - tokenSerializer.serialize((*array_visitor)->size == 32 ? (__int32) (*array_visitor)->date32_arr->Value(currArrayIndex()) : (__int64) (*array_visitor)->date64_arr->Value(currArrayIndex()), serialized); + tokenSerializer.serialize(arrayVisitor->size == 32 ? 
(__int32) arrayVisitor->date32Arr->Value(currArrayIndex()) : (__int64) arrayVisitor->date64Arr->Value(currArrayIndex()), serialized); return serialized.str(); case TimestampType: - tokenSerializer.serialize((__int64) (*array_visitor)->timestamp_arr->Value(currArrayIndex()), serialized); + tokenSerializer.serialize((__int64) arrayVisitor->timestampArr->Value(currArrayIndex()), serialized); return serialized.str(); case TimeType: - tokenSerializer.serialize((*array_visitor)->size == 32 ? (__int32) (*array_visitor)->time32_arr->Value(currArrayIndex()) : (__int64) (*array_visitor)->time64_arr->Value(currArrayIndex()), serialized); + tokenSerializer.serialize(arrayVisitor->size == 32 ? (__int32) arrayVisitor->time32Arr->Value(currArrayIndex()) : (__int64) arrayVisitor->time64Arr->Value(currArrayIndex()), serialized); return serialized.str(); case DurationType: - tokenSerializer.serialize((__int64) (*array_visitor)->duration_arr->Value(currArrayIndex()), serialized); + tokenSerializer.serialize((__int64) arrayVisitor->durationArr->Value(currArrayIndex()), serialized); return serialized.str(); case StringType: - return (*array_visitor)->string_arr->GetView(currArrayIndex()); + return arrayVisitor->stringArr->GetView(currArrayIndex()); case LargeStringType: - return (*array_visitor)->large_string_arr->GetView(currArrayIndex()); + return arrayVisitor->largeStringArr->GetView(currArrayIndex()); case DecimalType: - return (*array_visitor)->size == 128 ? (*array_visitor)->dec_arr->GetView(currArrayIndex()) : (*array_visitor)->large_dec_arr->GetView(currArrayIndex()); + return arrayVisitor->size == 128 ? arrayVisitor->decArr->GetView(currArrayIndex()) : arrayVisitor->largeDecArr->GetView(currArrayIndex()); default: failx("Unimplemented Parquet type for field with name %s.", field->name); } } +/** + * @brief Get the current value as an Integer. + * + * @param field Field information used for warning the user if a type is unsupported. + * @return __int64 The current value in the column. + */ __int64 ParquetRowBuilder::getCurrIntValue(const RtlFieldInfo *field) { - switch ((*array_visitor)->type) + switch (arrayVisitor->type) { case BoolType: - return (*array_visitor)->bool_arr->Value(currArrayIndex()); + return arrayVisitor->boolArr->Value(currArrayIndex()); case IntType: - return getSigned(array_visitor, currArrayIndex()); + return getSigned(arrayVisitor, currArrayIndex()); case UIntType: - return getUnsigned(array_visitor, currArrayIndex()); + return getUnsigned(arrayVisitor, currArrayIndex()); case RealType: - return getReal(array_visitor, currArrayIndex()); + return getReal(arrayVisitor, currArrayIndex()); case DateType: - return (*array_visitor)->size == 32 ? (*array_visitor)->date32_arr->Value(currArrayIndex()) : (*array_visitor)->date64_arr->Value(currArrayIndex()); + return arrayVisitor->size == 32 ? arrayVisitor->date32Arr->Value(currArrayIndex()) : arrayVisitor->date64Arr->Value(currArrayIndex()); case TimestampType: - return (*array_visitor)->timestamp_arr->Value(currArrayIndex()); + return arrayVisitor->timestampArr->Value(currArrayIndex()); case TimeType: - return (*array_visitor)->size == 32 ? (*array_visitor)->time32_arr->Value(currArrayIndex()) : (*array_visitor)->time64_arr->Value(currArrayIndex()); + return arrayVisitor->size == 32 ? 
arrayVisitor->time32Arr->Value(currArrayIndex()) : arrayVisitor->time64Arr->Value(currArrayIndex()); case DurationType: - return (*array_visitor)->duration_arr->Value(currArrayIndex()); + return arrayVisitor->durationArr->Value(currArrayIndex()); default: { __int64 myint64 = 0; @@ -1054,26 +1117,32 @@ __int64 ParquetRowBuilder::getCurrIntValue(const RtlFieldInfo *field) } } +/** + * @brief Get the current value as a Double. + * + * @param field Field information used for warning the user if a type is unsupported. + * @return double The current value in the column. + */ double ParquetRowBuilder::getCurrRealValue(const RtlFieldInfo *field) { - switch ((*array_visitor)->type) + switch (arrayVisitor->type) { case BoolType: - return (*array_visitor)->bool_arr->Value(currArrayIndex()); + return arrayVisitor->boolArr->Value(currArrayIndex()); case IntType: - return getSigned(array_visitor, currArrayIndex()); + return getSigned(arrayVisitor, currArrayIndex()); case UIntType: - return getUnsigned(array_visitor, currArrayIndex()); + return getUnsigned(arrayVisitor, currArrayIndex()); case RealType: - return getReal(array_visitor, currArrayIndex()); + return getReal(arrayVisitor, currArrayIndex()); case DateType: - return (*array_visitor)->size == 32 ? (*array_visitor)->date32_arr->Value(currArrayIndex()) : (*array_visitor)->date64_arr->Value(currArrayIndex()); + return arrayVisitor->size == 32 ? arrayVisitor->date32Arr->Value(currArrayIndex()) : arrayVisitor->date64Arr->Value(currArrayIndex()); case TimestampType: - return (*array_visitor)->timestamp_arr->Value(currArrayIndex()); + return arrayVisitor->timestampArr->Value(currArrayIndex()); case TimeType: - return (*array_visitor)->size == 32 ? (*array_visitor)->time32_arr->Value(currArrayIndex()) : (*array_visitor)->time64_arr->Value(currArrayIndex()); + return arrayVisitor->size == 32 ? arrayVisitor->time32Arr->Value(currArrayIndex()) : arrayVisitor->time64Arr->Value(currArrayIndex()); case DurationType: - return (*array_visitor)->duration_arr->Value(currArrayIndex()); + return arrayVisitor->durationArr->Value(currArrayIndex()); default: { double mydouble = 0.0; @@ -1095,7 +1164,7 @@ bool ParquetRowBuilder::getBooleanResult(const RtlFieldInfo *field) { nextField(field); - if ((*array_visitor)->type == NullType) + if (arrayVisitor->type == NullType) { NullFieldProcessor p(field); return p.boolResult; @@ -1105,17 +1174,17 @@ bool ParquetRowBuilder::getBooleanResult(const RtlFieldInfo *field) } /** - * @brief Gets a data result from the result row and passes it back to engine through result. + * @brief Gets a Data value from the result row. * * @param field Holds the value of the field. * @param len Length of the Data value. - * @param result Used for returning the result to the caller. + * @param result Pointer to return value stored in memory. */ void ParquetRowBuilder::getDataResult(const RtlFieldInfo *field, size32_t &len, void *&result) { nextField(field); - if ((*array_visitor)->type == NullType) + if (arrayVisitor->type == NullType) { NullFieldProcessor p(field); rtlUtf8ToDataX(len, result, p.resultChars, p.stringResult); @@ -1128,7 +1197,7 @@ void ParquetRowBuilder::getDataResult(const RtlFieldInfo *field, size32_t &len, } /** - * @brief Gets a real result from the result row. + * @brief Gets a Real value from the result row. * * @param field Holds the value of the field. * @return double Double value to return. 
@@ -1137,7 +1206,7 @@ double ParquetRowBuilder::getRealResult(const RtlFieldInfo *field) { nextField(field); - if ((*array_visitor)->type == NullType) + if (arrayVisitor->type == NullType) { NullFieldProcessor p(field); return p.doubleResult; @@ -1147,7 +1216,7 @@ double ParquetRowBuilder::getRealResult(const RtlFieldInfo *field) } /** - * @brief Gets the Signed Integer result from the result row. + * @brief Gets the Signed Integer value from the result row. * * @param field Holds the value of the field. * @return __int64 Value to return. @@ -1156,7 +1225,7 @@ __int64 ParquetRowBuilder::getSignedResult(const RtlFieldInfo *field) { nextField(field); - if ((*array_visitor)->type == NullType) + if (arrayVisitor->type == NullType) { NullFieldProcessor p(field); return p.intResult; @@ -1166,7 +1235,7 @@ __int64 ParquetRowBuilder::getSignedResult(const RtlFieldInfo *field) } /** - * @brief Gets the Unsigned Integer result from the result row. + * @brief Gets the Unsigned Integer value from the result row. * * @param field Holds the value of the field. * @return unsigned Value to return. @@ -1175,14 +1244,14 @@ unsigned __int64 ParquetRowBuilder::getUnsignedResult(const RtlFieldInfo *field) { nextField(field); - if ((*array_visitor)->type == NullType) + if (arrayVisitor->type == NullType) { NullFieldProcessor p(field); return p.uintResult; } - if ((*array_visitor)->type == UIntType) - return getUnsigned(array_visitor, currArrayIndex()); + if (arrayVisitor->type == UIntType) + return getUnsigned(arrayVisitor, currArrayIndex()); else return getCurrIntValue(field); } @@ -1192,13 +1261,13 @@ unsigned __int64 ParquetRowBuilder::getUnsignedResult(const RtlFieldInfo *field) * * @param field Holds the value of the field. * @param chars Number of chars in the String. - * @param result Variable used for returning string back to the caller. + * @param result Pointer to return value stored in memory. */ void ParquetRowBuilder::getStringResult(const RtlFieldInfo *field, size32_t &chars, char *&result) { nextField(field); - if ((*array_visitor)->type == NullType) + if (arrayVisitor->type == NullType) { NullFieldProcessor p(field); rtlUtf8ToStrX(chars, result, p.resultChars, p.stringResult); @@ -1211,17 +1280,17 @@ void ParquetRowBuilder::getStringResult(const RtlFieldInfo *field, size32_t &cha } /** - * @brief Gets a UTF8 from the result row. + * @brief Gets a UTF8 string from the result row. * * @param field Holds the value of the field. * @param chars Number of chars in the UTF8. - * @param result Variable used for returning UTF8 back to the caller. + * @param result Pointer to return value stored in memory. */ void ParquetRowBuilder::getUTF8Result(const RtlFieldInfo *field, size32_t &chars, char *&result) { nextField(field); - if ((*array_visitor)->type == NullType) + if (arrayVisitor->type == NullType) { NullFieldProcessor p(field); rtlUtf8ToUtf8X(chars, result, p.resultChars, p.stringResult); @@ -1234,17 +1303,17 @@ void ParquetRowBuilder::getUTF8Result(const RtlFieldInfo *field, size32_t &chars } /** - * @brief Gets a Unicode from the result row. + * @brief Gets a Unicode string from the result row. * * @param field Holds the value of the field. * @param chars Number of chars in the Unicode. - * @param result Variable used for returning Unicode back to the caller. + * @param result Pointer to return value stored in memory. 
*/ void ParquetRowBuilder::getUnicodeResult(const RtlFieldInfo *field, size32_t &chars, UChar *&result) { nextField(field); - if ((*array_visitor)->type == NullType) + if (arrayVisitor->type == NullType) { NullFieldProcessor p(field); rtlUnicodeToUnicodeX(chars, result, p.resultChars, p.unicodeResult); @@ -1266,13 +1335,12 @@ void ParquetRowBuilder::getDecimalResult(const RtlFieldInfo *field, Decimal &val { nextField(field); - if ((*array_visitor)->type == NullType) + if (arrayVisitor->type == NullType) { NullFieldProcessor p(field); value.set(p.decimalResult); return; } - auto dvalue = getCurrView(field); value.setString(dvalue.size(), dvalue.data()); RtlDecimalTypeInfo *dtype = (RtlDecimalTypeInfo *)field->type; @@ -1291,11 +1359,11 @@ void ParquetRowBuilder::processBeginSet(const RtlFieldInfo *field, bool &isAll) isAll = false; // ALL not supported nextField(field); - if ((*array_visitor)->type == ListType) + if (arrayVisitor->type == ListType) { - PathTracker newPathNode(field->name, (*array_visitor)->list_arr, CPNTSet); - newPathNode.childCount = (*array_visitor)->list_arr->value_slice(currentRow)->length(); - m_pathStack.push_back(newPathNode); + PathTracker newPathNode(field->name, arrayVisitor->listArr, CPNTSet); + newPathNode.childCount = arrayVisitor->listArr->value_slice(currentRow)->length(); + pathStack.push_back(newPathNode); } else { @@ -1307,12 +1375,12 @@ void ParquetRowBuilder::processBeginSet(const RtlFieldInfo *field, bool &isAll) * @brief Checks if we should process another set. * * @param field Context information about the set. - * @return true If the children that we have process is less than the total child count. + * @return true If the children that we have processed is less than the total child count. * @return false If all the children sets have been processed. 
*/ bool ParquetRowBuilder::processNextSet(const RtlFieldInfo *field) { - return m_pathStack.back().finishedChildren(); + return pathStack.back().finishedChildren(); } /** @@ -1340,9 +1408,9 @@ void ParquetRowBuilder::processBeginRow(const RtlFieldInfo *field) if (strncmp(xpath, "", 5) != 0) { nextField(field); - if ((*array_visitor)->type == StructType) + if (arrayVisitor->type == StructType) { - m_pathStack.push_back(PathTracker(field->name, (*array_visitor)->struct_arr, CPNTScalar)); + pathStack.push_back(PathTracker(field->name, arrayVisitor->structArr, CPNTScalar)); } else { @@ -1365,7 +1433,7 @@ void ParquetRowBuilder::processBeginRow(const RtlFieldInfo *field) */ bool ParquetRowBuilder::processNextRow(const RtlFieldInfo *field) { - return m_pathStack.back().childrenProcessed < m_pathStack.back().childCount; + return pathStack.back().childrenProcessed < pathStack.back().childCount; } /** @@ -1378,9 +1446,9 @@ void ParquetRowBuilder::processEndSet(const RtlFieldInfo *field) StringBuffer xpath; xpathOrName(xpath, field); - if (!xpath.isEmpty() && !m_pathStack.empty() && strcmp(xpath.str(), m_pathStack.back().nodeName) == 0) + if (!xpath.isEmpty() && !pathStack.empty() && strcmp(xpath.str(), pathStack.back().nodeName) == 0) { - m_pathStack.pop_back(); + pathStack.pop_back(); } } @@ -1406,15 +1474,15 @@ void ParquetRowBuilder::processEndRow(const RtlFieldInfo *field) if (!xpath.isEmpty()) { - if (!m_pathStack.empty()) + if (!pathStack.empty()) { - if (m_pathStack.back().nodeType == CPNTDataset) + if (pathStack.back().nodeType == CPNTDataset) { - m_pathStack.back().childrenProcessed++; + pathStack.back().childrenProcessed++; } - else if (strcmp(xpath.str(), m_pathStack.back().nodeName) == 0) + else if (strcmp(xpath.str(), pathStack.back().nodeName) == 0) { - m_pathStack.pop_back(); + pathStack.pop_back(); } } } @@ -1424,19 +1492,24 @@ void ParquetRowBuilder::processEndRow(const RtlFieldInfo *field) } } +/** + * @brief Applies a visitor to the nested value of a Struct or List field. + * + * @param field Information about the context of the field. + */ void ParquetRowBuilder::nextFromStruct(const RtlFieldInfo *field) { - auto structPtr = m_pathStack.back().structPtr; - reportIfFailure(structPtr->Accept((*array_visitor).get())); - if (m_pathStack.back().nodeType == CPNTScalar) + auto structPtr = pathStack.back().structPtr; + reportIfFailure(structPtr->Accept(arrayVisitor.get())); + if (pathStack.back().nodeType == CPNTScalar) { - auto child = (*array_visitor)->struct_arr->GetFieldByName(field->name); - reportIfFailure(child->Accept((*array_visitor).get())); + auto child = arrayVisitor->structArr->GetFieldByName(field->name); + reportIfFailure(child->Accept(arrayVisitor.get())); } - else if (m_pathStack.back().nodeType == CPNTSet) + else if (pathStack.back().nodeType == CPNTSet) { - auto child = (*array_visitor)->list_arr->value_slice(currentRow); - reportIfFailure(child->Accept((*array_visitor).get())); + auto child = arrayVisitor->listArr->value_slice(currentRow); + reportIfFailure(child->Accept(arrayVisitor.get())); } } @@ -1444,7 +1517,6 @@ void ParquetRowBuilder::nextFromStruct(const RtlFieldInfo *field) * @brief Gets the next field and processes it. * * @param field Information about the context of the next field. - * @return const char* Result of building field. 
*/ void ParquetRowBuilder::nextField(const RtlFieldInfo *field) { @@ -1452,20 +1524,26 @@ void ParquetRowBuilder::nextField(const RtlFieldInfo *field) { failx("Field name is empty."); } - if (m_pathStack.size() > 0) + if (pathStack.size() > 0) { nextFromStruct(field); return; } - (*array_visitor) = std::make_shared(); - auto column = result_rows->find(field->xpath ? field->xpath : field->name); - if (column != result_rows->end()) + arrayVisitor = std::make_shared(); + auto column = resultRows->find(field->xpath ? field->xpath : field->name); + if (column != resultRows->end()) { - reportIfFailure(column->second->Accept((*array_visitor).get())); + reportIfFailure(column->second->Accept(arrayVisitor.get())); return; } } +/** + * @brief Logs what fields were bound to what index and increments the current parameter. + * + * @param field The field metadata. + * @return The current parameter index. + */ unsigned ParquetRecordBinder::checkNextParam(const RtlFieldInfo *field) { if (logctx.queryTraceLevel() > 4) @@ -1473,6 +1551,11 @@ unsigned ParquetRecordBinder::checkNextParam(const RtlFieldInfo *field) return thisParam++; } +/** + * @brief Counts the fields in the row. + * + * @return int The number of fields. + */ int ParquetRecordBinder::numFields() { int count = 0; @@ -1483,42 +1566,15 @@ int ParquetRecordBinder::numFields() return count; } -static void addMember(std::shared_ptr r_parquet, rapidjson::Value &key, rapidjson::Value &value) -{ - rapidjson::Value *row = r_parquet->queryCurrentRow(); - if(!row) - failx("Failed to add member to rapidjson row"); - if (row->GetType() == rapidjson::kObjectType) - row->AddMember(key, value, jsonAlloc); - else - row->PushBack(value, jsonAlloc); -} - -/** - * @brief Writes the value to the parquet file using the StreamWriter from the ParquetHelper class. - * - * @param len Number of chars in value. - * @param value pointer to value of parameter. - * @param field RtlFieldInfo holds meta information about the embed context. - * @param r_parquet Shared pointer to helper class that operates the parquet functions for us. - */ -void bindUtf8Param(unsigned len, const char *value, const RtlFieldInfo *field, std::shared_ptr r_parquet) -{ - rapidjson::Value key = rapidjson::Value(field->name, jsonAlloc); - rapidjson::Value val = rapidjson::Value(value, rtlUtf8Size(len, value), jsonAlloc); - - addMember(r_parquet, key, val); -} - /** - * @brief Writes the value to the parquet file using the StreamWriter from the ParquetHelper class. + * @brief Writes the value to the Parquet file using the StreamWriter from the ParquetWriter class. * * @param len Number of chars in value. - * @param value pointer to value of parameter. - * @param field RtlFieldInfo holds meta information about the embed context. - * @param r_parquet Shared pointer to helper class that operates the parquet functions for us. + * @param value Pointer to value of parameter. + * @param field RtlFieldInfo holds metadata about the field. 
+ * @param parquetWriter ParquetWriter object that holds the rapidjson::Value vector for building the rows */ -void bindStringParam(unsigned len, const char *value, const RtlFieldInfo *field, std::shared_ptr r_parquet) +void bindStringParam(unsigned len, const char *value, const RtlFieldInfo *field, std::shared_ptr parquetWriter) { size32_t utf8chars; rtlDataAttr utf8; @@ -1527,124 +1583,7 @@ void bindStringParam(unsigned len, const char *value, const RtlFieldInfo *field, rapidjson::Value key = rapidjson::Value(field->name, jsonAlloc); rapidjson::Value val = rapidjson::Value(std::string(utf8.getstr(), rtlUtf8Size(utf8chars, utf8.getdata())), jsonAlloc); - addMember(r_parquet, key, val); -} - -/** - * @brief Writes the value to the parquet file using the StreamWriter from the ParquetHelper class. - * - * @param value pointer to value of parameter. - * @param field RtlFieldInfo holds meta information about the embed context. - * @param r_parquet Shared pointer to helper class that operates the parquet functions for us. - */ -void bindBoolParam(bool value, const RtlFieldInfo *field, std::shared_ptr r_parquet) -{ - rapidjson::Value key = rapidjson::Value(field->name, jsonAlloc); - rapidjson::Value val = rapidjson::Value(value); - - addMember(r_parquet, key, val); -} - -/** - * @brief Writes the value to the parquet file using the StreamWriter from the ParquetHelper class. - * - * @param len Number of chars in value. - * @param value pointer to value of parameter. - * @param field RtlFieldInfo holds meta information about the embed context. - * @param r_parquet Shared pointer to helper class that operates the parquet functions for us. - */ -void bindDataParam(unsigned len, const char *value, const RtlFieldInfo *field, std::shared_ptr r_parquet) -{ - rapidjson::Value key; - key.SetString(field->name, jsonAlloc); - rapidjson::Value val; - val.SetString(value, len, jsonAlloc); - - addMember(r_parquet, key, val); -} - -/** - * @brief Writes the value to the parquet file using the StreamWriter from the ParquetHelper class. - * - * @param value pointer to value of parameter. - * @param field RtlFieldInfo holds meta information about the embed context. - * @param r_parquet Shared pointer to helper class that operates the parquet functions for us. - */ -void bindIntParam(__int64 value, const RtlFieldInfo *field, std::shared_ptr r_parquet) -{ - int64_t val = value; - - rapidjson::Value key = rapidjson::Value(field->name, jsonAlloc); - rapidjson::Value num(val); - - addMember(r_parquet, key, num); -} - -/** - * @brief Writes the value to the parquet file using the StreamWriter from the ParquetHelper class. - * - * @param value pointer to value of parameter. - * @param field RtlFieldInfo holds meta information about the embed context. - * @param r_parquet Shared pointer to helper class that operates the parquet functions for us. - */ -void bindUIntParam(unsigned __int64 value, const RtlFieldInfo *field, std::shared_ptr r_parquet) -{ - uint64_t val = value; - - rapidjson::Value key = rapidjson::Value(field->name, jsonAlloc); - rapidjson::Value num(val); - - addMember(r_parquet, key, num); -} - -/** - * @brief Writes the value to the parquet file using the StreamWriter from the ParquetHelper class. - * - * @param value pointer to value of parameter. - * @param field RtlFieldInfo holds meta information about the embed context. - * @param r_parquet Shared pointer to helper class that operates the parquet functions for us. 
- */
-void bindRealParam(double value, const RtlFieldInfo *field, std::shared_ptr r_parquet)
-{
- rapidjson::Value key = rapidjson::Value(field->name, jsonAlloc);
- rapidjson::Value val = rapidjson::Value(value);
-
- addMember(r_parquet, key, val);
-}
-
-/**
- * @brief Writes the value to the parquet file using the StreamWriter from the ParquetHelper class.
- *
- * @param chars Number of chars in value.
- * @param value pointer to value of parameter.
- * @param field RtlFieldInfo holds meta information about the embed context.
- * @param r_parquet Shared pointer to helper class that operates the parquet functions for us.
- */
-void bindUnicodeParam(unsigned chars, const UChar *value, const RtlFieldInfo *field, std::shared_ptr r_parquet)
-{
- size32_t utf8chars;
- char *utf8;
- rtlUnicodeToUtf8X(utf8chars, utf8, chars, value);
-
- rapidjson::Value key = rapidjson::Value(field->name, jsonAlloc);
- rapidjson::Value val = rapidjson::Value(utf8, rtlUtf8Size(utf8chars, utf8), jsonAlloc);
-
- addMember(r_parquet, key, val);
-}
-
-/**
- * @brief Writes the value to the parquet file using the StreamWriter from the ParquetHelper class.
- *
- * @param value Decimal value represented as a string.
- * @param field RtlFieldInfo holds meta information about the embed context.
- * @param r_parquet Shared pointer to helper class that operates the parquet functions for us.
- */
-void bindDecimalParam(const char *value, size32_t bytes, const RtlFieldInfo *field, std::shared_ptr r_parquet)
-{
- rapidjson::Value key = rapidjson::Value(field->name, jsonAlloc);
- rapidjson::Value val = rapidjson::Value(std::string(value, bytes), jsonAlloc);
-
- addMember(r_parquet, key, val);
+ parquetWriter->addMember(key, val);
}
/**
@@ -1659,82 +1598,98 @@ void ParquetRecordBinder::processRow(const byte *row)
}
/**
- * @brief Calls the bind function for the data type of the value.
+ * @brief Processes the field for its respective type, and adds the key-value pair to the current row.
*
* @param len Number of chars in value.
- * @param value Data to be written to the parquet file.
- * @param field Object with information about the current field.
+ * @param value Data to be written to the Parquet file.
+ * @param field RtlFieldInfo holds metadata about the field.
*/
void ParquetRecordBinder::processString(unsigned len, const char *value, const RtlFieldInfo *field)
{
checkNextParam(field);
-
- bindStringParam(len, value, field, r_parquet);
+ bindStringParam(len, value, field, parquetWriter);
}
/**
- * @brief Calls the bind function for the data type of the value.
+ * @brief Processes the field for its respective type, and adds the key-value pair to the current row.
*
- * @param value Data to be written to the parquet file.
- * @param field Object with information about the current field.
+ * @param value Data to be written to the Parquet file.
+ * @param field RtlFieldInfo holds metadata about the field.
*/
void ParquetRecordBinder::processBool(bool value, const RtlFieldInfo *field)
{
- bindBoolParam(value, field, r_parquet);
+ rapidjson::Value key = rapidjson::Value(field->name, jsonAlloc);
+ rapidjson::Value val = rapidjson::Value(value);
+
+ parquetWriter->addMember(key, val);
}
/**
- * @brief Calls the bind function for the data type of the value.
+ * @brief Processes the field for its respective type, and adds the key-value pair to the current row.
*
* @param len Number of chars in value.
- * @param value Data to be written to the parquet file.
- * @param field Object with information about the current field.
+ * @param value Data to be written to the Parquet file. + * @param field RtlFieldInfo holds metadata about the field. */ void ParquetRecordBinder::processData(unsigned len, const void *value, const RtlFieldInfo *field) { - bindDataParam(len, (const char *) value, field, r_parquet); + rapidjson::Value key = rapidjson::Value(field->name, jsonAlloc); + rapidjson::Value val = rapidjson::Value((const char *)value, len, jsonAlloc); + + parquetWriter->addMember(key, val); } /** - * @brief Calls the bind function for the data type of the value. + * @brief Processes the field for its respective type, and adds the key-value pair to the current row. * - * @param value Data to be written to the parquet file. - * @param field Object with information about the current field. + * @param value Data to be written to the Parquet file. + * @param field RtlFieldInfo holds metadata about the field. */ void ParquetRecordBinder::processInt(__int64 value, const RtlFieldInfo *field) { - bindIntParam(value, field, r_parquet); + int64_t val = value; + rapidjson::Value key = rapidjson::Value(field->name, jsonAlloc); + rapidjson::Value num(val); + + parquetWriter->addMember(key, num); } /** - * @brief Calls the bind function for the data type of the value. + * @brief Processes the field for its respective type, and adds the key-value pair to the current row. * - * @param value Data to be written to the parquet file. - * @param field Object with information about the current field. + * @param value Data to be written to the Parquet file. + * @param field RtlFieldInfo holds metadata about the field. */ void ParquetRecordBinder::processUInt(unsigned __int64 value, const RtlFieldInfo *field) { - bindUIntParam(value, field, r_parquet); + uint64_t val = value; + rapidjson::Value key = rapidjson::Value(field->name, jsonAlloc); + rapidjson::Value num(val); + + parquetWriter->addMember(key, num); } /** - * @brief Calls the bind function for the data type of the value. + * @brief Processes the field for its respective type, and adds the key-value pair to the current row. * - * @param value Data to be written to the parquet file. - * @param field Object with information about the current field. + * @param value Data to be written to the Parquet file. + * @param field RtlFieldInfo holds metadata about the field. */ void ParquetRecordBinder::processReal(double value, const RtlFieldInfo *field) { - bindRealParam(value, field, r_parquet); + rapidjson::Value key = rapidjson::Value(field->name, jsonAlloc); + rapidjson::Value val = rapidjson::Value(value); + + parquetWriter->addMember(key, val); } /** - * @brief Calls the bind function for the data type of the value. + * @brief Processes the field for its respective type, and adds the key-value pair to the current row. * - * @param value Data to be written to the parquet file. + * @param value Data to be written to the Parquet file. * @param digits Number of digits in decimal. * @param precision Number of digits of precision. - * @param field Object with information about the current field. + * @param field RtlFieldInfo holds metadata about the field. 
*/
void ParquetRecordBinder::processDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo *field)
{
@@ -1744,27 +1699,35 @@ void ParquetRecordBinder::processDecimal(const void *value, unsigned digits, uns
 val.setDecimal(digits, precision, value);
 val.getStringX(bytes, decText.refstr());
- bindDecimalParam(decText.getstr(), bytes, field, r_parquet);
+ rapidjson::Value key = rapidjson::Value(field->name, jsonAlloc);
+ rapidjson::Value dValue = rapidjson::Value(std::string(decText.getstr(), bytes), jsonAlloc);
+ parquetWriter->addMember(key, dValue);
}
/**
- * @brief Calls the bind function for the data type of the value.
+ * @brief Processes the field for its respective type, and adds the key-value pair to the current row.
*
* @param chars Number of chars in the value.
- * @param value Data to be written to the parquet file.
- * @param field Object with information about the current field.
+ * @param value Data to be written to the Parquet file.
+ * @param field RtlFieldInfo holds metadata about the field.
*/
void ParquetRecordBinder::processUnicode(unsigned chars, const UChar *value, const RtlFieldInfo *field)
{
- bindUnicodeParam(chars, value, field, r_parquet);
+ size32_t utf8chars;
+ char *utf8;
+ rtlUnicodeToUtf8X(utf8chars, utf8, chars, value);
+
+ rapidjson::Value key = rapidjson::Value(field->name, jsonAlloc);
+ rapidjson::Value val = rapidjson::Value(utf8, rtlUtf8Size(utf8chars, utf8), jsonAlloc);
+ parquetWriter->addMember(key, val);
}
/**
- * @brief Calls the bind function for the data type of the value.
+ * @brief Processes the field for its respective type, and adds the key-value pair to the current row.
*
* @param len Length of QString
- * @param value Data to be written to the parquet file.
- * @param field Object with information about the current field.
+ * @param value Data to be written to the Parquet file.
+ * @param field RtlFieldInfo holds metadata about the field.
*/
void ParquetRecordBinder::processQString(unsigned len, const char *value, const RtlFieldInfo *field)
{
@@ -1772,40 +1735,44 @@ void ParquetRecordBinder::processQString(unsigned len, const char *value, const
 rtlDataAttr text;
 rtlQStrToStrX(charCount, text.refstr(), len, value);
- bindStringParam(charCount, text.getstr(), field, r_parquet);
+ bindStringParam(charCount, text.getstr(), field, parquetWriter);
}
/**
- * @brief Calls the bind function for the data type of the value.
+ * @brief Processes the field for its respective type, and adds the key-value pair to the current row.
*
* @param chars Number of chars in the value.
- * @param value Data to be written to the parquet file.
- * @param field Object with information about the current field.
+ * @param value Data to be written to the Parquet file.
+ * @param field RtlFieldInfo holds metadata about the field.
*/
void ParquetRecordBinder::processUtf8(unsigned chars, const char *value, const RtlFieldInfo *field)
{
- bindUtf8Param(chars, value, field, r_parquet);
+ rapidjson::Value key = rapidjson::Value(field->name, jsonAlloc);
+ rapidjson::Value val = rapidjson::Value(value, rtlUtf8Size(chars, value), jsonAlloc);
+
+ parquetWriter->addMember(key, val);
}
/**
- * @brief Construct a new ParquetEmbedFunctionContext object
+ * @brief Constructs a new ParquetEmbedFunctionContext object and parses the options set by the user.
*
* @param _logctx Context logger for use with the ParquetRecordBinder ParquetDatasetBinder classes.
+ * @param activityCtx Context about the Thor worker configuration.
* @param options Pointer to the list of options that are passed into the Embed function.
* @param _flags Should be zero if the embedded script is ok.
*/
ParquetEmbedFunctionContext::ParquetEmbedFunctionContext(const IContextLogger &_logctx, const IThorActivityContext *activityCtx, const char *options, unsigned _flags)
- : logctx(_logctx), m_scriptFlags(_flags)
+ : logctx(_logctx), scriptFlags(_flags)
{
 // Option Variables
- const char *option = ""; // Read(read), Read Parition(readpartition), Write(write), Write Partition(writepartition)
- const char *location = ""; // file name and location of where to write parquet file
- const char *destination = ""; // file name and location of where to read parquet file from
- const char *partitionFields = ""; // comma delimited values containing fields to partition files on
- __int64 rowsize = 40000; // Size of the row groups when writing to parquet files
- __int64 batchSize = 40000; // Size of the batches when converting parquet columns to rows
- bool overwrite = false; // If true overwrite file with no error. The default is false and will throw an error if the file already exists.
- arrow::Compression::type compressionOption = arrow::Compression::UNCOMPRESSED;
+ const char *option = ""; // Read(read), Read Partition(readpartition), Write(write), Write Partition(writepartition)
+ const char *location = ""; // Full path from which to read Parquet file(s). Can be a directory or filename.
+ const char *destination = ""; // Full path to the target location for writing Parquet file(s). Can be a directory or filename.
+ const char *partitionFields = ""; // Semicolon-delimited values containing fields to partition files on.
+ __int64 maxRowCountInBatch = 40000; // Number of rows in the row groups when writing to Parquet files.
+ __int64 maxRowCountInTable = 40000; // Number of rows in the tables when converting Parquet columns to rows.
+ bool overwrite = false; // If true overwrite file with no error. The default is false and will throw an error if the file already exists.
+ arrow::Compression::type compressionOption = arrow::Compression::UNCOMPRESSED; // Compression option set by the user and defaults to UNCOMPRESSED.
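The option-parsing loop that follows matches these names with stricmp. As a hedged illustration of how the compression value could be mapped onto Arrow's codec enumerators (the helper name and the exact set of accepted strings here are assumptions, not the plugin's actual parsing), consider:

// Illustrative only: maps a user-supplied compression string onto
// arrow::Compression::type. SNAPPY, GZIP, ZSTD, LZ4 and UNCOMPRESSED are
// real enumerators in Arrow's C++ API; the accepted strings are assumed.
static arrow::Compression::type parseCompression(const char *val)
{
    if (stricmp(val, "snappy") == 0)
        return arrow::Compression::SNAPPY;
    else if (stricmp(val, "gzip") == 0)
        return arrow::Compression::GZIP;
    else if (stricmp(val, "zstd") == 0)
        return arrow::Compression::ZSTD;
    else if (stricmp(val, "lz4") == 0)
        return arrow::Compression::LZ4;
    return arrow::Compression::UNCOMPRESSED;
}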
// Iterate through user options and save them
StringArray inputOptions;
@@ -1825,9 +1792,9 @@ ParquetEmbedFunctionContext::ParquetEmbedFunctionContext(const IContextLogger &_
 else if (stricmp(optName, "destination") == 0)
 destination = val;
 else if (stricmp(optName, "MaxRowSize") == 0)
- rowsize = atoi(val);
+ maxRowCountInBatch = atoi(val);
 else if (stricmp(optName, "BatchSize") == 0)
- batchSize = atoi(val);
+ maxRowCountInTable = atoi(val);
 else if (stricmp(optName, "overwriteOpt") == 0)
 overwrite = clipStrToBool(val);
 else if (stricmp(optName, "compression") == 0)
@@ -1857,13 +1824,17 @@ ParquetEmbedFunctionContext::ParquetEmbedFunctionContext(const IContextLogger &_
 failx("Unknown option %s", optName.str());
 }
 }
- if (option == "" || (location == "" && destination == ""))
+ if (startsWithIgnoreCase(option, "read"))
+ {
+ parquetReader = std::make_shared(option, location, maxRowCountInTable, partitionFields, activityCtx);
+ }
+ else if (startsWithIgnoreCase(option, "write"))
 {
- failx("Invalid options must specify read or write settings and a location to perform such actions.");
+ parquetWriter = std::make_shared(option, destination, maxRowCountInBatch, overwrite, compressionOption, partitionFields, activityCtx);
 }
 else
 {
- m_parquet = std::make_shared(option, location, destination, rowsize, batchSize, overwrite, compressionOption, partitionFields, activityCtx);
+ failx("Invalid read/write selection.");
 }
}
@@ -1916,17 +1887,29 @@ void ParquetEmbedFunctionContext::getDecimalResult(Decimal &value)
 UNIMPLEMENTED_X("Parquet Scalar Return Type DECIMAL");
}
+/**
+ * @brief Returns a dataset read from Parquet to the user.
+ *
+ * @param _resultAllocator Pointer to allocator for the engine.
+ * @return Pointer to the memory allocated for the result.
+ */
IRowStream *ParquetEmbedFunctionContext::getDatasetResult(IEngineRowAllocator *_resultAllocator)
{
 Owned parquetRowStream;
- parquetRowStream.setown(new ParquetRowStream(_resultAllocator, m_parquet));
+ parquetRowStream.setown(new ParquetRowStream(_resultAllocator, parquetReader));
 return parquetRowStream.getLink();
}
+/**
+ * @brief Returns a row read from Parquet to the user.
+ *
+ * @param _resultAllocator Pointer to allocator for the engine.
+ * @return Pointer to the memory allocated for the result.
+ */
byte *ParquetEmbedFunctionContext::getRowResult(IEngineRowAllocator *_resultAllocator)
{
 Owned parquetRowStream;
- parquetRowStream.setown(new ParquetRowStream(_resultAllocator, m_parquet));
+ parquetRowStream.setown(new ParquetRowStream(_resultAllocator, parquetReader));
 return (byte *)parquetRowStream->nextRow();
}
@@ -1936,21 +1919,35 @@ size32_t ParquetEmbedFunctionContext::getTransformResult(ARowBuilder &rowBuilder
 return 0;
}
+/**
+ * @brief Binds the values of a row with the ParquetWriter.
+ *
+ * @param name Name of the row field.
+ * @param metaVal Metadata containing the type info of the row.
+ * @param val The data for the row to be bound.
+ */
void ParquetEmbedFunctionContext::bindRowParam(const char *name, IOutputMetaData &metaVal, const byte *val)
{
- ParquetRecordBinder binder(logctx, metaVal.queryTypeInfo(), m_nextParam, m_parquet);
+ ParquetRecordBinder binder(logctx, metaVal.queryTypeInfo(), nextParam, parquetWriter);
 binder.processRow(val);
- m_nextParam += binder.numFields();
+ nextParam += binder.numFields();
}
+/**
+ * @brief Binds a dataset parameter passed in by the user.
+ *
+ * @param name Name of the dataset.
+ * @param metaVal Metadata holding typeinfo for the dataset.
+ * @param val Input rowstream for binding the dataset data. + */ void ParquetEmbedFunctionContext::bindDatasetParam(const char *name, IOutputMetaData &metaVal, IRowStream *val) { - if (m_oInputStream) + if (oInputStream) { fail("At most one dataset parameter supported"); } - m_oInputStream.setown(new ParquetDatasetBinder(logctx, LINK(val), metaVal.queryTypeInfo(), m_parquet, m_nextParam)); - m_nextParam += m_oInputStream->numFields(); + oInputStream.setown(new ParquetDatasetBinder(logctx, LINK(val), metaVal.queryTypeInfo(), parquetWriter, nextParam)); + nextParam += oInputStream->numFields(); } void ParquetEmbedFunctionContext::bindBooleanParam(const char *name, bool val) @@ -2018,7 +2015,6 @@ void ParquetEmbedFunctionContext::bindUnicodeParam(const char *name, size32_t ch * and ENDEMBED block. * * @param chars The number of chars in the script. - * * @param script The embedded script for compilation. */ void ParquetEmbedFunctionContext::compileEmbeddedScript(size32_t chars, const char *script) @@ -2027,16 +2023,15 @@ void ParquetEmbedFunctionContext::compileEmbeddedScript(size32_t chars, const ch void ParquetEmbedFunctionContext::execute() { - if (m_oInputStream) + if (oInputStream) { - m_oInputStream->executeAll(); + oInputStream->executeAll(); } else { - if (m_parquet->queryPartOptions() == 'r') + if (parquetReader) { - reportIfFailure(m_parquet->openReadFile()); - reportIfFailure(m_parquet->processReadFile()); + reportIfFailure(parquetReader->processReadFile()); } else { @@ -2052,9 +2047,9 @@ void ParquetEmbedFunctionContext::callFunction() unsigned ParquetEmbedFunctionContext::checkNextParam(const char *name) { - if (m_nextParam == m_numParams) + if (nextParam == numParams) failx("Too many parameters supplied: No matching $ placeholder for parameter %s", name); - return m_nextParam++; + return nextParam++; } /** @@ -2072,23 +2067,6 @@ bool ParquetDatasetBinder::bindNext() return true; } -void ParquetDatasetBinder::writeRecordBatch() -{ - // convert row_batch vector to RecordBatch and write to file. - PARQUET_ASSIGN_OR_THROW(auto recordBatch, d_parquet->convertToRecordBatch(d_parquet->queryRecordBatch(), d_parquet->getSchema())); - // Write each batch as a row_groups - PARQUET_ASSIGN_OR_THROW(auto table, arrow::Table::FromRecordBatches(d_parquet->getSchema(), {recordBatch})); - - if (partition) - { - reportIfFailure(d_parquet->writePartition(table)); - } - else - { - reportIfFailure(d_parquet->queryWriter()->WriteTable(*(table.get()), recordBatch->num_rows())); - } -} - /** * @brief Binds all the rows of the dataset and executes the function. */ @@ -2096,35 +2074,34 @@ void ParquetDatasetBinder::executeAll() { if (bindNext()) { - reportIfFailure(d_parquet->openWriteFile()); + reportIfFailure(parquetWriter->openWriteFile()); int i = 1; - int rowSize = d_parquet->getMaxRowSize(); + int maxRowCountInBatch = parquetWriter->getMaxRowSize(); do { - if (i % rowSize == 0) + if (i % maxRowCountInBatch == 0) { - writeRecordBatch(); + parquetWriter->writeRecordBatch(); jsonAlloc.Clear(); } - d_parquet->updateRow(); + parquetWriter->updateRow(); i++; } while (bindNext()); i--; - if (i % rowSize != 0) + if (i % maxRowCountInBatch != 0) { - d_parquet->queryRecordBatch().resize(i % rowSize); - writeRecordBatch(); + parquetWriter->writeRecordBatch(i % maxRowCountInBatch); jsonAlloc.Clear(); } } } + /** * @brief Serves as the entry point for the HPCC Engine into the plugin and is how it obtains a * ParquetEmbedFunctionContext object for creating the query and executing it. 
- * */ class ParquetEmbedContext : public CInterfaceOf { diff --git a/plugins/parquet/parquetembed.hpp b/plugins/parquet/parquetembed.hpp index addfbfe03ae..f537eba588b 100644 --- a/plugins/parquet/parquetembed.hpp +++ b/plugins/parquet/parquetembed.hpp @@ -59,7 +59,7 @@ extern void fail(const char *msg) __attribute__((noreturn)); static void typeError(const char *expected, const char *fieldname) { - VStringBuffer msg("MongoDBembed: type mismatch - %s expected", expected); + VStringBuffer msg("parquetembed: type mismatch - %s expected", expected); if (!isEmptyString(fieldname)) msg.appendf(" for field %s", fieldname); rtlFail(0, msg.str()); @@ -112,6 +112,9 @@ static void handleDeserializeOutcome(DeserializationResult resultcode, const cha enum PathNodeType {CPNTScalar, CPNTDataset, CPNTSet}; +/** + * @brief Keep track of nested structures when binding rows to parquet. + */ struct PathTracker { const char *nodeName; @@ -146,6 +149,10 @@ enum ParquetArrayType RealType }; +/** + * @brief A Visitor type class that implements every arrow type and gets a pointer to the visited array in the correct type. + * The size and type of the array are stored to read the correct array when returning values. +*/ class ParquetArrayVisitor : public arrow::ArrayVisitor { public: @@ -156,206 +163,206 @@ class ParquetArrayVisitor : public arrow::ArrayVisitor } arrow::Status Visit(const arrow::BooleanArray &array) { - bool_arr = &array; + boolArr = &array; type = BoolType; return arrow::Status::OK(); } arrow::Status Visit(const arrow::Int8Array &array) { - int8_arr = &array; + int8Arr = &array; type = IntType; size = 8; return arrow::Status::OK(); } arrow::Status Visit(const arrow::Int16Array &array) { - int16_arr = &array; + int16Arr = &array; type = IntType; size = 16; return arrow::Status::OK(); } arrow::Status Visit(const arrow::Int32Array &array) { - int32_arr = &array; + int32Arr = &array; type = IntType; size = 32; return arrow::Status::OK(); } arrow::Status Visit(const arrow::Int64Array &array) { - int64_arr = &array; + int64Arr = &array; type = IntType; size = 64; return arrow::Status::OK(); } arrow::Status Visit(const arrow::UInt8Array &array) { - uint8_arr = &array; + uint8Arr = &array; type = UIntType; size = 8; return arrow::Status::OK(); } arrow::Status Visit(const arrow::UInt16Array &array) { - uint16_arr = &array; + uint16Arr = &array; type = UIntType; size = 16; return arrow::Status::OK(); } arrow::Status Visit(const arrow::UInt32Array &array) { - uint32_arr = &array; + uint32Arr = &array; type = UIntType; size = 32; return arrow::Status::OK(); } arrow::Status Visit(const arrow::UInt64Array &array) { - uint64_arr = &array; + uint64Arr = &array; type = UIntType; size = 64; return arrow::Status::OK(); } arrow::Status Visit(const arrow::Date32Array &array) { - date32_arr = &array; + date32Arr = &array; type = DateType; size = 32; return arrow::Status::OK(); } arrow::Status Visit(const arrow::Date64Array &array) { - date64_arr = &array; + date64Arr = &array; type = DateType; size = 64; return arrow::Status::OK(); } arrow::Status Visit(const arrow::TimestampArray &array) { - timestamp_arr = &array; + timestampArr = &array; type = TimestampType; return arrow::Status::OK(); } arrow::Status Visit(const arrow::Time32Array &array) { - time32_arr = &array; + time32Arr = &array; type = TimeType; size = 32; return arrow::Status::OK(); } arrow::Status Visit(const arrow::Time64Array &array) { - time64_arr = &array; + time64Arr = &array; type = TimeType; size = 64; return arrow::Status::OK(); } arrow::Status 
Visit(const arrow::DurationArray &array) { - duration_arr = &array; + durationArr = &array; type = DurationType; return arrow::Status::OK(); } arrow::Status Visit(const arrow::HalfFloatArray &array) { - half_float_arr = &array; + halfFloatArr = &array; type = RealType; size = 2; return arrow::Status::OK(); } arrow::Status Visit(const arrow::FloatArray &array) { - float_arr = &array; + floatArr = &array; type = RealType; size = 4; return arrow::Status::OK(); } arrow::Status Visit(const arrow::DoubleArray &array) { - double_arr = &array; + doubleArr = &array; type = RealType; size = 8; return arrow::Status::OK(); } arrow::Status Visit(const arrow::StringArray &array) { - string_arr = &array; + stringArr = &array; type = StringType; return arrow::Status::OK(); } arrow::Status Visit(const arrow::LargeStringArray &array) { - large_string_arr = &array; + largeStringArr = &array; type = LargeStringType; return arrow::Status::OK(); } arrow::Status Visit(const arrow::BinaryArray &array) { - bin_arr = &array; + binArr = &array; type = BinaryType; return arrow::Status::OK(); } arrow::Status Visit(const arrow::LargeBinaryArray &array) { - large_bin_arr = &array; + largeBinArr = &array; type = LargeBinaryType; return arrow::Status::OK(); } arrow::Status Visit(const arrow::Decimal128Array &array) { - dec_arr = &array; + decArr = &array; type = DecimalType; size = 128; return arrow::Status::OK(); } arrow::Status Visit(const arrow::Decimal256Array &array) { - large_dec_arr = &array; + largeDecArr = &array; type = DecimalType; size = 256; return arrow::Status::OK(); } arrow::Status Visit(const arrow::ListArray &array) { - list_arr = &array; + listArr = &array; type = ListType; return arrow::Status::OK(); } arrow::Status Visit(const arrow::StructArray &array) { - struct_arr = &array; + structArr = &array; type = StructType; return arrow::Status::OK(); } - ParquetArrayType type = NullType; - int size = 0; - const arrow::BooleanArray *bool_arr = nullptr; - const arrow::Int8Array *int8_arr = nullptr; - const arrow::Int16Array *int16_arr = nullptr; - const arrow::Int32Array *int32_arr = nullptr; - const arrow::Int64Array *int64_arr = nullptr; - const arrow::UInt8Array *uint8_arr = nullptr; - const arrow::UInt16Array *uint16_arr = nullptr; - const arrow::UInt32Array *uint32_arr = nullptr; - const arrow::UInt64Array *uint64_arr = nullptr; - const arrow::Date32Array *date32_arr = nullptr; - const arrow::Date64Array *date64_arr = nullptr; - const arrow::TimestampArray *timestamp_arr = nullptr; - const arrow::Time32Array *time32_arr = nullptr; - const arrow::Time64Array *time64_arr = nullptr; - const arrow::DurationArray *duration_arr = nullptr; - const arrow::HalfFloatArray *half_float_arr = nullptr; - const arrow::FloatArray *float_arr = nullptr; - const arrow::DoubleArray *double_arr = nullptr; - const arrow::StringArray *string_arr = nullptr; - const arrow::LargeStringArray *large_string_arr = nullptr; - const arrow::BinaryArray *bin_arr = nullptr; - const arrow::LargeBinaryArray *large_bin_arr = nullptr; - const arrow::Decimal128Array *dec_arr = nullptr; - const arrow::Decimal256Array *large_dec_arr = nullptr; - const arrow::ListArray *list_arr = nullptr; - const arrow::StructArray *struct_arr = nullptr; + ParquetArrayType type = NullType; // Type of the Array that was read. + int size = 0; // For Signed, Unsigned, and Real types size differentiates between the different array types. 
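These two discriminators record which of the typed pointers below is valid. As a brief usage sketch (hedged: `column` is assumed to be a `std::shared_ptr<arrow::Array>` taken from a previously read table, and `rowIndex` a valid row position; `Accept` is Arrow's standard visitor entry point):

// Hedged sketch: dispatching a column through the visitor. Accept() calls
// the matching Visit() overload above, which stores the typed pointer and
// sets type/size so a single cell can be read back with the right type.
std::shared_ptr<ParquetArrayVisitor> visitor = std::make_shared<ParquetArrayVisitor>();
reportIfFailure(column->Accept(visitor.get()));
if (visitor->type == IntType && visitor->size == 64)
{
    int64_t cell = visitor->int64Arr->Value(rowIndex); // typed read of one cell
}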
+ const arrow::BooleanArray *boolArr = nullptr; // A pointer to the table's column that is stored in memory for the correct value type.
+ const arrow::Int8Array *int8Arr = nullptr;
+ const arrow::Int16Array *int16Arr = nullptr;
+ const arrow::Int32Array *int32Arr = nullptr;
+ const arrow::Int64Array *int64Arr = nullptr;
+ const arrow::UInt8Array *uint8Arr = nullptr;
+ const arrow::UInt16Array *uint16Arr = nullptr;
+ const arrow::UInt32Array *uint32Arr = nullptr;
+ const arrow::UInt64Array *uint64Arr = nullptr;
+ const arrow::Date32Array *date32Arr = nullptr;
+ const arrow::Date64Array *date64Arr = nullptr;
+ const arrow::TimestampArray *timestampArr = nullptr;
+ const arrow::Time32Array *time32Arr = nullptr;
+ const arrow::Time64Array *time64Arr = nullptr;
+ const arrow::DurationArray *durationArr = nullptr;
+ const arrow::HalfFloatArray *halfFloatArr = nullptr;
+ const arrow::FloatArray *floatArr = nullptr;
+ const arrow::DoubleArray *doubleArr = nullptr;
+ const arrow::StringArray *stringArr = nullptr;
+ const arrow::LargeStringArray *largeStringArr = nullptr;
+ const arrow::BinaryArray *binArr = nullptr;
+ const arrow::LargeBinaryArray *largeBinArr = nullptr;
+ const arrow::Decimal128Array *decArr = nullptr;
+ const arrow::Decimal256Array *largeDecArr = nullptr;
+ const arrow::ListArray *listArr = nullptr;
+ const arrow::StructArray *structArr = nullptr;
};
const rapidjson::Value kNullJsonSingleton = rapidjson::Value();
@@ -363,67 +370,62 @@ const rapidjson::Value kNullJsonSingleton = rapidjson::Value();
class DocValuesIterator
{
public:
- /// \param rows vector of rows
- /// \param path field names to enter
- /// \param array_levels number of arrays to enter
 DocValuesIterator(const std::vector &_rows,
- std::vector &&_path, int64_t _array_levels)
- : rows(_rows), path(std::move(_path)), array_levels(_array_levels) {}
-
+ std::vector &&_path, int64_t _arrayLevels)
+ : rows(_rows), path(std::move(_path)), arrayLevels(_arrayLevels) {}
 ~DocValuesIterator() = default;
- const rapidjson::Value *NextArrayOrRow(const rapidjson::Value *value, size_t *path_i,
- int64_t *arr_i)
+ const rapidjson::Value *NextArrayOrRow(const rapidjson::Value *value, size_t *pathIdx, int64_t *arrIdx)
 {
- while (array_stack.size() > 0)
+ while (arrayStack.size() > 0)
 {
- ArrayPosition &pos = array_stack.back();
+ ArrayPosition &pos = arrayStack.back();
 // Try to get next position in Array
- if (pos.index + 1 < pos.array_node->Size())
+ if (pos.index + 1 < pos.arrayNode->Size())
 {
 ++pos.index;
- value = &(*pos.array_node)[pos.index];
- *path_i = pos.path_index;
- *arr_i = array_stack.size();
+ value = &(*pos.arrayNode)[pos.index];
+ *pathIdx = pos.pathIndex;
+ *arrIdx = arrayStack.size();
 return value;
 }
 else
 {
- array_stack.pop_back();
+ arrayStack.pop_back();
 }
 }
- ++row_i;
- if (row_i < rows.size())
+ ++rowIdx;
+ if (rowIdx < rows.size())
 {
- value = static_cast(&rows[row_i]);
+ value = static_cast(&rows[rowIdx]);
 }
 else
 {
 value = nullptr;
 }
- *path_i = 0;
- *arr_i = 0;
+ *pathIdx = 0;
+ *arrIdx = 0;
 return value;
 }
 arrow::Result Next()
 {
 const rapidjson::Value *value = nullptr;
- size_t path_i;
- int64_t arr_i;
+ size_t pathIdx;
+ int64_t arrIdx;
 // Can either start at document or at last array level
- if (array_stack.size() > 0)
+ if (arrayStack.size() > 0)
 {
- auto &pos = array_stack.back();
- value = pos.array_node;
- path_i = pos.path_index;
- arr_i = array_stack.size() - 1;
+ auto &pos = arrayStack.back();
+ value = pos.arrayNode;
+ pathIdx = pos.pathIndex;
+ arrIdx = arrayStack.size() - 1;
 }
- value = NextArrayOrRow(value, &path_i, &arr_i);
+ value =
NextArrayOrRow(value, &path_i, &arr_i); + value = NextArrayOrRow(value, &pathIdx, &arrIdx); // Traverse to desired level (with possible backtracking as needed) - while (path_i < path.size() || arr_i < array_levels) + while (pathIdx < path.size() || arrIdx < arrayLevels) { if (value == nullptr) { @@ -432,23 +434,23 @@ class DocValuesIterator else if (value->IsArray() && value->Size() > 0) { ArrayPosition pos; - pos.array_node = value; - pos.path_index = path_i; + pos.arrayNode = value; + pos.pathIndex = pathIdx; pos.index = 0; - array_stack.push_back(pos); + arrayStack.push_back(pos); value = &(*value)[0]; - ++arr_i; + ++arrIdx; } else if (value->IsArray()) { // Empty array means we need to backtrack and go to next array or row - value = NextArrayOrRow(value, &path_i, &arr_i); + value = NextArrayOrRow(value, &pathIdx, &arrIdx); } - else if (value->HasMember(path[path_i])) + else if (value->HasMember(path[pathIdx])) { - value = &(*value)[path[path_i]]; - ++path_i; + value = &(*value)[path[pathIdx]]; + ++pathIdx; } else { @@ -463,27 +465,27 @@ class DocValuesIterator private: const std::vector &rows; std::vector path; - int64_t array_levels; - size_t row_i = -1; // index of current row + int64_t arrayLevels; + size_t rowIdx = -1; // Index of current row // Info about array position for one array level in array stack struct ArrayPosition { - const rapidjson::Value *array_node; - int64_t path_index; + const rapidjson::Value *arrayNode; + int64_t pathIndex; rapidjson::SizeType index; }; - std::vector array_stack; + std::vector arrayStack; }; class JsonValueConverter { public: - explicit JsonValueConverter(const std::vector &rows) - : rows_(rows) {} + explicit JsonValueConverter(const std::vector &_rows) + : rows(_rows) {} - JsonValueConverter(const std::vector &rows, const std::vector &root_path, int64_t array_levels) - : rows_(rows), root_path_(root_path), array_levels_(array_levels) {} + JsonValueConverter(const std::vector &_rows, const std::vector &_rootPath, int64_t _arrayLevels) + : rows(_rows), rootPath(_rootPath), arrayLevels(_arrayLevels) {} ~JsonValueConverter() = default; @@ -494,10 +496,10 @@ class JsonValueConverter } /// \brief For field passed in, append corresponding values to builder - arrow::Status Convert(const arrow::Field &field, const std::string &field_name, arrow::ArrayBuilder *builder) + arrow::Status Convert(const arrow::Field &field, const std::string &_fieldName, arrow::ArrayBuilder *builder) { - field_name_ = field_name; - builder_ = builder; + fieldName = _fieldName; + arrayBuilder = builder; ARROW_RETURN_NOT_OK(arrow::VisitTypeInline(*field.type().get(), this)); return arrow::Status::OK(); } @@ -510,10 +512,10 @@ class JsonValueConverter arrow::Status Visit(const arrow::LargeBinaryType &type) { - arrow::LargeBinaryBuilder *builder = static_cast(builder_); - for (const auto &maybe_value : FieldValues()) + arrow::LargeBinaryBuilder *builder = static_cast(arrayBuilder); + for (const auto &maybeValue : FieldValues()) { - ARROW_ASSIGN_OR_RAISE(auto value, maybe_value); + ARROW_ASSIGN_OR_RAISE(auto value, maybeValue); if (value->IsNull()) { ARROW_RETURN_NOT_OK(builder->AppendNull()); @@ -528,10 +530,10 @@ class JsonValueConverter arrow::Status Visit(const arrow::Int64Type &type) { - arrow::Int64Builder *builder = static_cast(builder_); - for (const auto &maybe_value : FieldValues()) + arrow::Int64Builder *builder = static_cast(arrayBuilder); + for (const auto &maybeValue : FieldValues()) { - ARROW_ASSIGN_OR_RAISE(auto value, maybe_value); + ARROW_ASSIGN_OR_RAISE(auto 
value, maybeValue); if (value->IsNull()) { ARROW_RETURN_NOT_OK(builder->AppendNull()); @@ -557,10 +559,10 @@ class JsonValueConverter arrow::Status Visit(const arrow::Int32Type &type) { - arrow::Int32Builder *builder = static_cast(builder_); - for (const auto &maybe_value : FieldValues()) + arrow::Int32Builder *builder = static_cast(arrayBuilder); + for (const auto &maybeValue : FieldValues()) { - ARROW_ASSIGN_OR_RAISE(auto value, maybe_value); + ARROW_ASSIGN_OR_RAISE(auto value, maybeValue); if (value->IsNull()) { ARROW_RETURN_NOT_OK(builder->AppendNull()); @@ -575,10 +577,10 @@ class JsonValueConverter arrow::Status Visit(const arrow::UInt64Type &type) { - arrow::Int64Builder *builder = static_cast(builder_); - for (const auto &maybe_value : FieldValues()) + arrow::Int64Builder *builder = static_cast(arrayBuilder); + for (const auto &maybeValue : FieldValues()) { - ARROW_ASSIGN_OR_RAISE(auto value, maybe_value); + ARROW_ASSIGN_OR_RAISE(auto value, maybeValue); if (value->IsNull()) { ARROW_RETURN_NOT_OK(builder->AppendNull()); @@ -604,10 +606,10 @@ class JsonValueConverter arrow::Status Visit(const arrow::UInt32Type &type) { - arrow::UInt32Builder *builder = static_cast(builder_); - for (const auto &maybe_value : FieldValues()) + arrow::UInt32Builder *builder = static_cast(arrayBuilder); + for (const auto &maybeValue : FieldValues()) { - ARROW_ASSIGN_OR_RAISE(auto value, maybe_value); + ARROW_ASSIGN_OR_RAISE(auto value, maybeValue); if (value->IsNull()) { ARROW_RETURN_NOT_OK(builder->AppendNull()); @@ -622,10 +624,10 @@ class JsonValueConverter arrow::Status Visit(const arrow::FloatType &type) { - arrow::FloatBuilder *builder = static_cast(builder_); - for (const auto &maybe_value : FieldValues()) + arrow::FloatBuilder *builder = static_cast(arrayBuilder); + for (const auto &maybeValue : FieldValues()) { - ARROW_ASSIGN_OR_RAISE(auto value, maybe_value); + ARROW_ASSIGN_OR_RAISE(auto value, maybeValue); if (value->IsNull()) { ARROW_RETURN_NOT_OK(builder->AppendNull()); @@ -640,10 +642,10 @@ class JsonValueConverter arrow::Status Visit(const arrow::DoubleType &type) { - arrow::DoubleBuilder *builder = static_cast(builder_); - for (const auto &maybe_value : FieldValues()) + arrow::DoubleBuilder *builder = static_cast(arrayBuilder); + for (const auto &maybeValue : FieldValues()) { - ARROW_ASSIGN_OR_RAISE(auto value, maybe_value); + ARROW_ASSIGN_OR_RAISE(auto value, maybeValue); if (value->IsNull()) { ARROW_RETURN_NOT_OK(builder->AppendNull()); @@ -658,10 +660,10 @@ class JsonValueConverter arrow::Status Visit(const arrow::StringType &type) { - arrow::StringBuilder *builder = static_cast(builder_); - for (const auto &maybe_value : FieldValues()) + arrow::StringBuilder *builder = static_cast(arrayBuilder); + for (const auto &maybeValue : FieldValues()) { - ARROW_ASSIGN_OR_RAISE(auto value, maybe_value); + ARROW_ASSIGN_OR_RAISE(auto value, maybeValue); if (value->IsNull()) { ARROW_RETURN_NOT_OK(builder->AppendNull()); @@ -676,10 +678,10 @@ class JsonValueConverter arrow::Status Visit(const arrow::BooleanType &type) { - arrow::BooleanBuilder *builder = static_cast(builder_); - for (const auto &maybe_value : FieldValues()) + arrow::BooleanBuilder *builder = static_cast(arrayBuilder); + for (const auto &maybeValue : FieldValues()) { - ARROW_ASSIGN_OR_RAISE(auto value, maybe_value); + ARROW_ASSIGN_OR_RAISE(auto value, maybeValue); if (value->IsNull()) { ARROW_RETURN_NOT_OK(builder->AppendNull()); @@ -694,27 +696,27 @@ class JsonValueConverter arrow::Status Visit(const arrow::StructType &type) { - 
arrow::StructBuilder *builder = static_cast(builder_); + arrow::StructBuilder *builder = static_cast(arrayBuilder); - std::vector child_path(root_path_); - if (field_name_.size() > 0) + std::vector childPath(rootPath); + if (fieldName.size() > 0) { - child_path.push_back(field_name_); + childPath.push_back(fieldName); } - auto child_converter = JsonValueConverter(rows_, child_path, array_levels_); + auto child_converter = JsonValueConverter(rows, childPath, arrayLevels); for (int i = 0; i < type.num_fields(); ++i) { - std::shared_ptr child_field = type.field(i); - std::shared_ptr child_builder = builder->child_builder(i); + std::shared_ptr childField = type.field(i); + std::shared_ptr childBuilder = builder->child_builder(i); - ARROW_RETURN_NOT_OK(child_converter.Convert(*child_field.get(), child_builder.get())); + ARROW_RETURN_NOT_OK(child_converter.Convert(*childField.get(), childBuilder.get())); } // Make null bitunordered_map - for (const auto &maybe_value : FieldValues()) + for (const auto &maybeValue : FieldValues()) { - ARROW_ASSIGN_OR_RAISE(auto value, maybe_value); + ARROW_ASSIGN_OR_RAISE(auto value, maybeValue); ARROW_RETURN_NOT_OK(builder->Append(!value->IsNull())); } @@ -723,30 +725,29 @@ class JsonValueConverter arrow::Status Visit(const arrow::ListType &type) { - arrow::ListBuilder *builder = static_cast(builder_); - - // Values and offsets needs to be interleaved in ListBuilder, so first collect the - // values - std::unique_ptr tmp_value_builder; - ARROW_ASSIGN_OR_RAISE(tmp_value_builder, arrow::MakeBuilder(builder->value_builder()->type())); - std::vector child_path(root_path_); - child_path.push_back(field_name_); - auto child_converter = JsonValueConverter(rows_, child_path, array_levels_ + 1); - ARROW_RETURN_NOT_OK(child_converter.Convert(*type.value_field().get(), "", tmp_value_builder.get())); - - std::shared_ptr values_array; - ARROW_RETURN_NOT_OK(tmp_value_builder->Finish(&values_array)); - std::shared_ptr values_data = values_array->data(); - - arrow::ArrayBuilder *value_builder = builder->value_builder(); + arrow::ListBuilder *builder = static_cast(arrayBuilder); + + // Values and offsets needs to be interleaved in ListBuilder, so first collect the values + std::unique_ptr tmpValueBuilder; + ARROW_ASSIGN_OR_RAISE(tmpValueBuilder, arrow::MakeBuilder(builder->value_builder()->type())); + std::vector childPath(rootPath); + childPath.push_back(fieldName); + auto child_converter = JsonValueConverter(rows, childPath, arrayLevels + 1); + ARROW_RETURN_NOT_OK(child_converter.Convert(*type.value_field().get(), "", tmpValueBuilder.get())); + + std::shared_ptr valuesArray; + ARROW_RETURN_NOT_OK(tmpValueBuilder->Finish(&valuesArray)); + std::shared_ptr valuesData = valuesArray->data(); + + arrow::ArrayBuilder *valueBuilder = builder->value_builder(); int64_t offset = 0; - for (const auto &maybe_value : FieldValues()) + for (const auto &maybeValue : FieldValues()) { - ARROW_ASSIGN_OR_RAISE(auto value, maybe_value); + ARROW_ASSIGN_OR_RAISE(auto value, maybeValue); ARROW_RETURN_NOT_OK(builder->Append(!value->IsNull())); if (!value->IsNull() && value->Size() > 0) { - ARROW_RETURN_NOT_OK(value_builder->AppendArraySlice(*values_data.get(), offset, value->Size())); + ARROW_RETURN_NOT_OK(valueBuilder->AppendArraySlice(*valuesData.get(), offset, value->Size())); offset += value->Size(); } } @@ -755,22 +756,22 @@ class JsonValueConverter } private: - std::string field_name_; - arrow::ArrayBuilder *builder_; - const std::vector &rows_; - std::vector root_path_; - int64_t array_levels_ = 
0; + std::string fieldName; + arrow::ArrayBuilder *arrayBuilder = nullptr; + const std::vector &rows; + std::vector rootPath; + int64_t arrayLevels = 0; - /// Return a flattened iterator over values at nested location + // Return a flattened iterator over values at nested location arrow::Iterator FieldValues() { - std::vector path(root_path_); - if (field_name_.size() > 0) + std::vector path(rootPath); + if (fieldName.size() > 0) { - path.push_back(field_name_); + path.push_back(fieldName); } - auto iter = DocValuesIterator(rows_, std::move(path), array_levels_); + auto iter = DocValuesIterator(rows, std::move(path), arrayLevels); auto fn = [iter]() mutable -> arrow::Result { return iter.Next(); }; @@ -778,76 +779,95 @@ class JsonValueConverter } }; +using TableColumns = std::unordered_map>; + /** - * @brief ParquetHelper holds the inputs from the user, the file stream objects, function for setting the schema, and functions - * for opening parquet files. + * @brief Opens and reads Parquet files and partitioned datasets. The ParquetReader processes a file + * based on the path passed in via location. processReadFile opens the file and sets the reader up to read rows. + * The next function returns the index to read and the table to read from. shouldRead will return true as long as + * the worker can read another row. */ -class ParquetHelper +class ParquetReader { public: - ParquetHelper(const char *option, const char *_location, const char *destination, int _rowSize, int _batchSize, bool _overwrite, arrow::Compression::type _compressionOption, const char *_partitionFields, const IThorActivityContext *_activityCtx); - ~ParquetHelper(); - std::shared_ptr getSchema(); - arrow::Status checkDirContents(); - arrow::Status openWriteFile(); + ParquetReader(const char *option, const char *_location, int _maxRowCountInTable, const char *_partitionFields, const IThorActivityContext *_activityCtx); + ~ParquetReader(); arrow::Status openReadFile(); arrow::Status processReadFile(); + void splitTable(std::shared_ptr &table); + bool shouldRead(); + __int64 next(TableColumns *&nextTable); + std::shared_ptr queryCurrentTable(__int64 currTable); + arrow::Result> queryRows(); + +private: + __int64 tablesProcessed = 0; // The number of tables processed when reading parquet files. + __int64 totalRowsProcessed = 0; // Total number of rows processed of partitioned dataset. We cannot get the total number of chunks and they are variable sizes. + __int64 totalRowCount = 0; // Total number of rows in a partition dataset. + __int64 startRow = 0; // The starting row in a partitioned dataset. + __int64 rowsProcessed = 0; // Current Row that has been read from the RowGroup. + __int64 startRowGroup = 0; // The beginning RowGroup that is read by a worker. + __int64 tableCount = 0; // The number of RowGroups to be read by the worker from the file that was opened for reading. + __int64 rowsCount = 0; // The number of result rows in a given RowGroup read from the parquet file. + size_t maxRowCountInTable = 0; // Max table size set by user. + std::string partOption; // Begins with either read or write and ends with the partitioning type if there is one i.e. 'readhivepartition'. + std::string location; // Full path to location for reading parquet files. Can be a filename or directory. + const IThorActivityContext *activityCtx = nullptr; // Context about the thor worker configuration. + std::shared_ptr scanner = nullptr; // Scanner for reading through partitioned files. 
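Before the remaining reader members, a hedged sketch of the per-worker read loop the public interface above supports (the constructor arguments and file path are illustrative; ParquetRowBuilder is declared further below):

// Hedged sketch of the read loop driven by the engine. processReadFile,
// shouldRead and next are the public methods declared above.
ParquetReader reader("read", "/path/to/data.parquet", 40000, nullptr, activityCtx);
reportIfFailure(reader.processReadFile());
while (reader.shouldRead())
{
    TableColumns *table = nullptr;
    __int64 index = reader.next(table);   // column map to read from and the row index within it
    ParquetRowBuilder rowBuilder(table, index);
    // The engine then calls rowBuilder's get*Result methods to build one ECL row.
}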
+ std::shared_ptr rbatchReader = nullptr; // RecordBatchReader reads a dataset one record batch at a time. Must be kept alive for rbatchItr.
+ arrow::RecordBatchReader::RecordBatchReaderIterator rbatchItr; // Iterator of RecordBatches when reading a partitioned dataset.
+ std::vector<__int64> fileTableCounts; // Count of RowGroups in each open file to get the correct row group when reading specific parts of the file.
+ std::vector> parquetFileReaders; // Vector of FileReaders that match the target file name. data0.parquet, data1.parquet, etc.
+ TableColumns parquetTable; // The current table being read broken up into columns. Unordered map where the left side is a string of the field name and the right side is an array of the values.
+ std::vector partitionFields; // The partitioning schema for reading Directory Partitioned files.
+ arrow::MemoryPool *pool = nullptr; // Memory pool for reading parquet files.
+};
+
+/**
+ * @brief Opens and writes to a Parquet file or writes RecordBatches to a partitioned dataset. The ParquetWriter checks the destination
+ * for existing data on construction and will fail if there are preexisting files. If the overwrite option is set to true the data in
+ * the target directory or matching the file mask will be deleted and writing will continue. openWriteFile opens the write file or sets the
+ * partitioning options. writeRecordBatch utilizes the open write streams and writes the data to the target location.
+ */
+class ParquetWriter
+{
+public:
+ ParquetWriter(const char *option, const char *_destination, int _maxRowCountInBatch, bool _overwrite, arrow::Compression::type _compressionOption, const char *_partitionFields, const IThorActivityContext *_activityCtx);
+ ~ParquetWriter();
+ arrow::Status openWriteFile();
 arrow::Status writePartition(std::shared_ptr table);
- parquet::arrow::FileWriter *queryWriter();
- void chunkTable(std::shared_ptr &table);
+ void writeRecordBatch();
+ void writeRecordBatch(std::size_t newSize);
 rapidjson::Value *queryCurrentRow();
 void updateRow();
- std::vector &queryRecordBatch();
- bool partSetting();
- __int64 getMaxRowSize();
- char queryPartOptions();
- bool shouldRead();
- __int64 &getRowsProcessed();
 arrow::Result> convertToRecordBatch(const std::vector &rows, std::shared_ptr schema);
- std::unordered_map> &next();
- std::shared_ptr queryCurrentTable(__int64 currTable);
- arrow::Result> queryRows();
- __int64 queryRowsCount();
 std::shared_ptr makeChildRecord(const RtlFieldInfo *field);
 arrow::Status fieldToNode(const std::string &name, const RtlFieldInfo *field, std::vector> &arrowFields);
 arrow::Status fieldsToSchema(const RtlTypeInfo *typeInfo);
 void beginSet();
 void beginRow();
 void endRow(const char *name);
+ void addMember(rapidjson::Value &key, rapidjson::Value &value);
+ arrow::Status checkDirContents();
+ __int64 getMaxRowSize() {return maxRowCountInBatch;}
private:
 __int64 currentRow = 0;
- __int64 rowSize = 0; // The maximum size of each parquet row group.
- __int64 tablesProcessed = 0; // Current RowGroup that has been read from the input file.
- __int64 totalRowsProcessed = 0; // Total number of rows processed of partitioned dataset. We cannot get the total number of chunks and they are variable sizes.
- __int64 totalRowCount = 0; // Total number of rows in a partition dataset.
- __int64 startRow = 0; // The starting row in a partitioned dataset.
- __int64 rowsProcessed = 0; // Current Row that has been read from the RowGroup
- __int64 startRowGroup = 0; // The beginning RowGroup that is read by a worker
- __int64 tableCount = 0; // The number of RowGroups to be read by the worker from the file that was opened for reading.
- __int64 rowsCount = 0; // The number of result rows in a given RowGroup read from the parquet file.
- size_t batchSize = 0; // batchSize for converting Parquet Columns to ECL rows. It is more efficient to break the data into small batches for converting to rows than to convert all at once.
- bool partition; // Boolean variable to track whether we are writing partitioned files or not.
- bool overwrite = false; // Overwrite option specified by the user. If true the plugin will overwrite files that are already exisiting. Default is false.
- std::string partOption; // Read, r, Write, w, option for specifying parquet operation.
- std::string location; // Location to read parquet file from.
- std::string destination; // Destination to write parquet file to.
- const IThorActivityContext *activityCtx; // Additional local context information
- std::shared_ptr schema = nullptr; // Schema object that holds the schema of the file for reading and writing
- std::unique_ptr writer = nullptr; // FileWriter for writing to parquet files.
- std::vector parquetDoc; // Document vector for converting rows to columns for writing to parquet files.
- std::vector rowStack; // Stack for keeping track of the context when building a nested row.
- std::shared_ptr scanner = nullptr; // Scanner for reading through partitioned files. PARTITION
- arrow::dataset::FileSystemDatasetWriteOptions writeOptions; // Write options for writing partitioned files. PARTITION
- arrow::Compression::type compressionOption = arrow::Compression::type::UNCOMPRESSED;
- std::shared_ptr partitionType = nullptr;
- std::vector partitionFields;
- std::shared_ptr rbatchReader = nullptr;
- arrow::RecordBatchReader::RecordBatchReaderIterator rbatchItr;
- std::vector<__int64> fileTableCounts;
- std::vector> parquetFileReaders;
- std::unordered_map> parquetTable;
- arrow::MemoryPool *pool = nullptr;
+ __int64 maxRowCountInBatch = 0; // The maximum size of each parquet row group.
+ __int64 tablesProcessed = 0; // Current RowGroup that has been read from the input file.
+ std::string partOption; // Begins with either read or write and ends with the partitioning type if there is one i.e. 'readhivepartition'.
+ std::string destination; // Full path to destination for writing parquet files. Can be a filename or directory.
+ bool overwrite = false; // Overwrite option specified by the user. If true the plugin will overwrite files that already exist. Default is false.
+ const IThorActivityContext *activityCtx = nullptr; // Context about the thor worker configuration.
+ std::shared_ptr schema = nullptr; // Arrow Schema holding the rtlTypeInfo when writing to parquet files.
+ std::unique_ptr writer = nullptr; // FileWriter for writing to single parquet files.
+ std::vector parquetDoc; // Document vector for converting rows to columns for writing to parquet files.
+ std::vector rowStack; // Stack for keeping track of the context when building a nested row.
+ arrow::dataset::FileSystemDatasetWriteOptions writeOptions; // Write options for writing partitioned files.
+ arrow::Compression::type compressionOption = arrow::Compression::type::UNCOMPRESSED; // The compression type set by the user for compressing files on write.
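The partitioning members below complete the writer state. For orientation, a hedged sketch of the write cadence (mirroring ParquetDatasetBinder::executeAll in the .cpp half of this patch; bindNextRow() is a hypothetical stand-in for the dataset binder, and the constructor arguments are illustrative):

// Hedged sketch of the batching cadence; bindNextRow() is hypothetical.
ParquetWriter writer("write", "/path/to/out.parquet", 40000, true,
                     arrow::Compression::UNCOMPRESSED, nullptr, activityCtx);
reportIfFailure(writer.openWriteFile());
__int64 i = 1;
while (bindNextRow())                 // binder fills the current rapidjson row
{
    if (i % writer.getMaxRowSize() == 0)
        writer.writeRecordBatch();    // flush one full batch as a row group
    writer.updateRow();               // advance to the next buffered row
    i++;
}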
+    std::shared_ptr<arrow::dataset::Partitioning> partitionType = nullptr; // The partition type with the partitioning schema for creating a dataset.
+    std::vector<std::string> partitionFields;                // The partitioning schema.
+    arrow::MemoryPool *pool = nullptr;                       // Memory pool for writing parquet files.
 };
 
 /**
@@ -857,7 +877,8 @@ class ParquetHelper
 class ParquetRowStream : public RtlCInterface, implements IRowStream
 {
 public:
-    ParquetRowStream(IEngineRowAllocator *_resultAllocator, std::shared_ptr<ParquetHelper> _parquet);
+    ParquetRowStream(IEngineRowAllocator *_resultAllocator, std::shared_ptr<ParquetReader> _parquetReader)
+        : resultAllocator(_resultAllocator), parquetReader(_parquetReader) {}
     virtual ~ParquetRowStream() = default;
 
     RTLIMPLEMENT_IINTERFACE
@@ -865,12 +886,10 @@ class ParquetRowStream : public RtlCInterface, implements IRowStream
     virtual void stop() override;
 
 private:
-    Linked<IEngineRowAllocator> m_resultAllocator;  //! Pointer to allocator used when building result rows.
-    bool m_shouldRead = true;                       //! If true, we should continue trying to read more messages.
-    __int64 m_currentRow = 0;                       //! Current result row.
-    __int64 rowsCount;                              //! Number of result rows read from parquet file.
-    std::shared_ptr<ParquetArrayVisitor> array_visitor = nullptr;
-    std::shared_ptr<ParquetHelper> s_parquet = nullptr; //! Shared pointer to ParquetHelper class for the stream class.
+    Linked<IEngineRowAllocator> resultAllocator;    // Pointer to allocator used when building result rows.
+    bool shouldRead = true;                         // If true, we should continue trying to read more messages.
+    __int64 currentRow = 0;                         // Current result row.
+    std::shared_ptr<ParquetReader> parquetReader = nullptr; // Parquet file reader.
 };
 
 /**
@@ -880,11 +899,9 @@ class ParquetRowStream : public RtlCInterface, implements IRowStream
 class ParquetRowBuilder : public CInterfaceOf<IFieldSource>
 {
 public:
-    ParquetRowBuilder(std::unordered_map<std::string, std::shared_ptr<arrow::Array>> *_result_rows, int64_t _currentRow, std::shared_ptr<ParquetArrayVisitor> *_array_visitor)
-        : result_rows(_result_rows), currentRow(_currentRow), array_visitor(_array_visitor) {}
-
+    ParquetRowBuilder(TableColumns *_resultRows, int64_t _currentRow)
+        : resultRows(_resultRows), currentRow(_currentRow) {}
     virtual ~ParquetRowBuilder() = default;
-
     virtual bool getBooleanResult(const RtlFieldInfo *field) override;
     virtual void getDataResult(const RtlFieldInfo *field, size32_t &len, void *&result) override;
     virtual double getRealResult(const RtlFieldInfo *field) override;
@@ -904,7 +921,6 @@ class ParquetRowBuilder : public CInterfaceOf<IFieldSource>
     virtual void processEndRow(const RtlFieldInfo *field) override;
 
 protected:
-    const std::shared_ptr<arrow::Array> &getChunk(std::shared_ptr<arrow::ChunkedArray> *column);
     std::string_view getCurrView(const RtlFieldInfo *field);
     __int64 getCurrIntValue(const RtlFieldInfo *field);
     double getCurrRealValue(const RtlFieldInfo *field);
@@ -914,13 +930,13 @@ class ParquetRowBuilder : public CInterfaceOf<IFieldSource>
     int64_t currArrayIndex();
 
 private:
-    __int64 currentRow;
-    TokenDeserializer tokenDeserializer;
-    TokenSerializer tokenSerializer;
-    StringBuffer serialized;
-    std::unordered_map<std::string, std::shared_ptr<arrow::Array>> *result_rows;
-    std::vector<PathTracker> m_pathStack;
-    std::shared_ptr<ParquetArrayVisitor> *array_visitor;
+    __int64 currentRow;                             // The index in the arrow Array to read the current value.
+    TokenDeserializer tokenDeserializer;            // Deserialize string type values to numeric types when returning results.
+    TokenSerializer tokenSerializer;                // Serialize numeric types to string types when returning results.
+    StringBuffer serialized;                        // Output string from serialization.
+    TableColumns *resultRows = nullptr;             // A pointer to the result rows map where the keys are the field names for the columns and the values are arrays of values.
+    std::vector<PathTracker> pathStack;             // PathTracker keeps track of nested data when reading sets.
+    std::shared_ptr<ParquetArrayVisitor> arrayVisitor; // Visitor class for getting the correct type when reading a Parquet column.
 };
 
 /**
@@ -930,15 +946,9 @@ class ParquetRowBuilder : public CInterfaceOf<IFieldSource>
 class ParquetRecordBinder : public CInterfaceOf<IFieldProcessor>
 {
 public:
-    ParquetRecordBinder(const IContextLogger &_logctx, const RtlTypeInfo *_typeInfo, int _firstParam, std::shared_ptr<ParquetHelper> _parquet)
-        : logctx(_logctx), typeInfo(_typeInfo), firstParam(_firstParam), dummyField("", NULL, typeInfo), thisParam(_firstParam)
-    {
-        r_parquet = _parquet;
-        partition = _parquet->partSetting();
-    }
-
+    ParquetRecordBinder(const IContextLogger &_logctx, const RtlTypeInfo *_typeInfo, int _firstParam, std::shared_ptr<ParquetWriter> _parquetWriter)
+        : logctx(_logctx), typeInfo(_typeInfo), firstParam(_firstParam), dummyField("", NULL, typeInfo), thisParam(_firstParam), parquetWriter(_parquetWriter) {}
     virtual ~ParquetRecordBinder() = default;
-
     int numFields();
     void processRow(const byte *row);
     virtual void processString(unsigned len, const char *value, const RtlFieldInfo *field);
@@ -952,13 +962,12 @@ class ParquetRecordBinder : public CInterfaceOf<IFieldProcessor>
     {
         UNSUPPORTED("UNSIGNED decimals");
     }
-
     virtual void processUnicode(unsigned chars, const UChar *value, const RtlFieldInfo *field);
     virtual void processQString(unsigned len, const char *value, const RtlFieldInfo *field);
     virtual void processUtf8(unsigned chars, const char *value, const RtlFieldInfo *field);
     virtual bool processBeginSet(const RtlFieldInfo *field, unsigned numElements, bool isAll, const byte *data)
     {
-        r_parquet->beginSet();
+        parquetWriter->beginSet();
         return true;
     }
     virtual bool processBeginDataset(const RtlFieldInfo *field, unsigned rowsCount)
@@ -972,12 +981,12 @@ class ParquetRecordBinder : public CInterfaceOf<IFieldProcessor>
         // copy over the members of the rapidjson value.
         // TO DO
        // Create a json string of all the fields which will be much more performant.
-        r_parquet->beginRow();
+        parquetWriter->beginRow();
         return true;
     }
     virtual void processEndSet(const RtlFieldInfo *field)
     {
-        r_parquet->endRow(field->name);
+        parquetWriter->endRow(field->name);
     }
     virtual void processEndDataset(const RtlFieldInfo *field)
     {
@@ -985,7 +994,7 @@ class ParquetRecordBinder : public CInterfaceOf<IFieldProcessor>
     }
     virtual void processEndRow(const RtlFieldInfo *field)
    {
-        r_parquet->endRow(field->name);
+        parquetWriter->endRow(field->name);
     }
 
 protected:
@@ -996,10 +1005,8 @@ class ParquetRecordBinder : public CInterfaceOf<IFieldProcessor>
     int firstParam;
     RtlFieldStrInfo dummyField;
     int thisParam;
-    TokenSerializer m_tokenSerializer;
-
-    std::shared_ptr<ParquetHelper> r_parquet;
-    bool partition; //! Local copy of a boolean so we can know if we are writing partitioned files or not.
+    TokenSerializer tokenSerializer;
+    std::shared_ptr<ParquetWriter> parquetWriter;
 };
 
 /**
@@ -1012,27 +1019,25 @@ class ParquetDatasetBinder : public ParquetRecordBinder
     /**
     * @brief Construct a new ParquetDataset Binder object
     *
-     * @param _logctx logger for building the dataset.
+     * @param _logctx Logger for building the dataset.
     * @param _input Stream of input of dataset.
     * @param _typeInfo Field type info.
     * @param _query Holds the builder object for creating the documents.
     * @param _firstParam Index of the first param.
     */
-    ParquetDatasetBinder(const IContextLogger &_logctx, IRowStream *_input, const RtlTypeInfo *_typeInfo, std::shared_ptr<ParquetHelper> _parquet, int _firstParam)
-        : input(_input), ParquetRecordBinder(_logctx, _typeInfo, _firstParam, _parquet)
+    ParquetDatasetBinder(const IContextLogger &_logctx, IRowStream *_input, const RtlTypeInfo *_typeInfo, std::shared_ptr<ParquetWriter> _parquetWriter, int _firstParam)
+        : input(_input), parquetWriter(_parquetWriter), ParquetRecordBinder(_logctx, _typeInfo, _firstParam, _parquetWriter)
     {
-        d_parquet = _parquet;
-        reportIfFailure(d_parquet->fieldsToSchema(_typeInfo));
+        reportIfFailure(parquetWriter->fieldsToSchema(_typeInfo));
     }
     virtual ~ParquetDatasetBinder() = default;
     void getFieldTypes(const RtlTypeInfo *typeInfo);
     bool bindNext();
-    void writeRecordBatch();
     void executeAll();
 
 protected:
     Owned<IRowStream> input;
-    std::shared_ptr<ParquetHelper> d_parquet; //! Helper object for keeping track of read and write options, schema, and file names.
+    std::shared_ptr<ParquetWriter> parquetWriter; // Parquet file writer.
 };
 
 /**
@@ -1107,18 +1112,14 @@ class ParquetEmbedFunctionContext : public CInterfaceOf<IEmbedFunctionContext>
 protected:
     void execute();
     unsigned checkNextParam(const char *name);
-    const IContextLogger &logctx;
-    Owned<IPropertyTreeIterator> m_resultrow;
-
-    Owned<ParquetDatasetBinder> m_oInputStream;   //! Input Stream used for building a dataset.
-    TokenDeserializer tokenDeserializer;
-    TokenSerializer m_tokenSerializer;
-    unsigned m_nextParam = 0;                     //! Index of the next parameter to process.
-    unsigned m_numParams = 0;                     //! Number of parameters in the function definition.
-    unsigned m_scriptFlags;                       //! Count of flags raised by embedded script.
-
-    std::shared_ptr<ParquetHelper> m_parquet;     //! Helper object for keeping track of read and write options, schema, and file names.
+    const IContextLogger &logctx;
+    Owned<ParquetDatasetBinder> oInputStream;     // Input Stream used for building a dataset.
+    unsigned nextParam = 0;                       // Index of the next parameter to process.
+    unsigned numParams = 0;                       // Number of parameters in the function definition.
+    unsigned scriptFlags;                         // Count of flags raised by embedded script.
+    std::shared_ptr<ParquetReader> parquetReader = nullptr; // Parquet file reader.
+    std::shared_ptr<ParquetWriter> parquetWriter = nullptr; // Parquet file writer.
 };
 }
 #endif
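
Reviewer note, not part of the patch: a minimal sketch of how the split ParquetReader/ParquetWriter classes are intended to be driven, assuming the constructor and method signatures declared above and that openReadFile/openWriteFile are callable by the embed context. The function name, file path, batch size, and compression choice below are illustrative only.

    // Hypothetical round-trip driver for the ParquetReader/ParquetWriter split.
    // Assumes parquetembed.hpp as patched above; activityCtx is supplied by the engine.
    #include "parquetembed.hpp"

    void exampleRoundTrip(const IThorActivityContext *activityCtx)
    {
        // Write a single .parquet file: 20000 rows per row group, overwrite any
        // existing file, Snappy compression, no partition fields (nullptr).
        auto writer = std::make_shared<parquetembed::ParquetWriter>(
            "write", "/tmp/example.parquet", 20000, true,
            arrow::Compression::SNAPPY, nullptr, activityCtx);
        parquetembed::reportIfFailure(writer->openWriteFile());
        // ... rows are then bound through ParquetDatasetBinder, which calls
        // beginRow()/endRow() and flushes with writeRecordBatch() once
        // maxRowCountInBatch rows have accumulated.

        // Read the file back, converting 20000 rows per batch; no partitioning.
        auto reader = std::make_shared<parquetembed::ParquetReader>(
            "read", "/tmp/example.parquet", 20000, nullptr, activityCtx);
        parquetembed::reportIfFailure(reader->openReadFile());
        // ... rows are then streamed back to ECL via ParquetRowStream and
        // ParquetRowBuilder.
    }

The split mirrors the read/write halves of the old ParquetHelper: read-only state (record batch iterators, file readers, the current table) now lives in ParquetReader, while write-only state (schema, row-group buffer, partitioning and compression options) lives in ParquetWriter, so each embed activity only constructs the half it needs.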