changed rowSize to maxRowCountInBatch
jackdelv committed Nov 14, 2023
1 parent e48d9eb commit df1eb68
Showing 2 changed files with 17 additions and 17 deletions.
plugins/parquet/parquetembed.cpp (14 additions, 14 deletions)
@@ -410,16 +410,16 @@ __int64 ParquetReader::next(TableColumns *&nextTable)
*
* @param option The read or write option as well as information about partitioning.
* @param _destination The full path for which to write a Parquet file or partitioned dataset.
- * @param _rowSize The max number of rows when creating RecordBatches for output.
+ * @param _maxRowCountInBatch The max number of rows when creating RecordBatches for output.
* @param _overwrite If true when the plugin calls checkDirContents the target directory contents will be deleted.
* @param _compressionOption Compression option for writing compressed Parquet files of different types.
* @param _activityCtx Additional context about the thor workers running.
*/
- ParquetWriter::ParquetWriter(const char *option, const char *_destination, int _rowSize, bool _overwrite, arrow::Compression::type _compressionOption, const char *_partitionFields, const IThorActivityContext *_activityCtx)
- : partOption(option), destination(_destination), rowSize(_rowSize), overwrite(_overwrite), compressionOption(_compressionOption), activityCtx(_activityCtx)
+ ParquetWriter::ParquetWriter(const char *option, const char *_destination, int _maxRowCountInBatch, bool _overwrite, arrow::Compression::type _compressionOption, const char *_partitionFields, const IThorActivityContext *_activityCtx)
+ : partOption(option), destination(_destination), maxRowCountInBatch(_maxRowCountInBatch), overwrite(_overwrite), compressionOption(_compressionOption), activityCtx(_activityCtx)
{
pool = arrow::default_memory_pool();
- parquetDoc = std::vector<rapidjson::Document>(rowSize);
+ parquetDoc = std::vector<rapidjson::Document>(maxRowCountInBatch);
if (activityCtx->querySlave() == 0 && startsWithIgnoreCase(partOption.c_str(), "write"))
{
reportIfFailure(checkDirContents());
@@ -559,11 +559,11 @@ rapidjson::Value *ParquetWriter::queryCurrentRow()

/**
* @brief A helper method for updating the current row on writes and keeping
- * it within the boundary of the rowSize set by the user when creating RowGroups.
+ * it within the boundary of the maxRowCountInBatch set by the user when creating RowGroups.
*/
void ParquetWriter::updateRow()
{
- if (++currentRow == rowSize)
+ if (++currentRow == maxRowCountInBatch)
currentRow = 0;
}
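
For readers skimming the diff: maxRowCountInBatch is the same bound the constructor stores, and updateRow keeps the write cursor inside the buffer of rapidjson documents by wrapping it back to zero once that many rows have been staged. A minimal standalone sketch of that wrap-around behaviour (the struct and names below are illustrative only, not the plugin's API):

#include <cstdint>
#include <iostream>

// Illustrative stand-in for the writer's row-index bookkeeping.
struct RowCursor
{
    int64_t currentRow = 0;
    int64_t maxRowCountInBatch;

    explicit RowCursor(int64_t maxRows) : maxRowCountInBatch(maxRows) {}

    // Mirrors the updateRow() shown above: advance the index and wrap to 0
    // once a full batch worth of rows has been staged.
    void updateRow()
    {
        if (++currentRow == maxRowCountInBatch)
            currentRow = 0;
    }
};

int main()
{
    RowCursor cursor(4); // tiny bound so the wrap is easy to see
    for (int i = 0; i < 10; i++)
    {
        std::cout << cursor.currentRow << ' ';
        cursor.updateRow();
    }
    std::cout << '\n'; // prints: 0 1 2 3 0 1 2 3 0 1
    return 0;
}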

@@ -1769,8 +1769,8 @@ ParquetEmbedFunctionContext::ParquetEmbedFunctionContext(const IContextLogger &_
const char *location = ""; // file name and location of where to write Parquet file
const char *destination = ""; // file name and location of where to read Parquet file from
const char *partitionFields = ""; // Semicolon delimited values containing fields to partition files on
- __int64 rowSize = 40000; // Size of the row groups when writing to Parquet files
- __int64 batchSize = 40000; // Size of the batches when converting Parquet columns to rows
+ __int64 maxRowCountInBatch = 40000; // number of rows in the row groups when writing to Parquet files
+ __int64 batchSize = 40000; // number of rows in the batches when converting Parquet columns to rows
bool overwrite = false; // If true overwrite file with no error. The default is false and will throw an error if the file already exists.
arrow::Compression::type compressionOption = arrow::Compression::UNCOMPRESSED; // Compression option set by the user and defaults to UNCOMPRESSED.
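
A note on the two defaults above: maxRowCountInBatch bounds the rows staged per RecordBatch (and hence per row group) on the write path, while batchSize bounds the rows materialised per batch when converting Parquet columns back into rows. A hedged sketch of the batch-count arithmetic this implies (the helper below is illustrative, not part of the plugin):

#include <cstdint>
#include <iostream>

// Illustrative helper: how many batches a row count splits into for a
// given per-batch bound (ceiling division).
static int64_t batchesNeeded(int64_t totalRows, int64_t rowsPerBatch)
{
    return (totalRows + rowsPerBatch - 1) / rowsPerBatch;
}

int main()
{
    const int64_t maxRowCountInBatch = 40000; // write-side default above
    const int64_t batchSize = 40000;          // read-side default above
    const int64_t totalRows = 1000000;

    std::cout << "row groups written: " << batchesNeeded(totalRows, maxRowCountInBatch) << '\n'; // 25
    std::cout << "read batches: " << batchesNeeded(totalRows, batchSize) << '\n';                // 25
    return 0;
}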

@@ -1792,7 +1792,7 @@ ParquetEmbedFunctionContext::ParquetEmbedFunctionContext(const IContextLogger &_
else if (stricmp(optName, "destination") == 0)
destination = val;
else if (stricmp(optName, "MaxRowSize") == 0)
- rowSize = atoi(val);
+ maxRowCountInBatch = atoi(val);
else if (stricmp(optName, "BatchSize") == 0)
batchSize = atoi(val);
else if (stricmp(optName, "overwriteOpt") == 0)
@@ -1830,7 +1830,7 @@ ParquetEmbedFunctionContext::ParquetEmbedFunctionContext(const IContextLogger &_
}
else if (startsWithIgnoreCase(option, "write"))
{
- parquetWriter = std::make_shared<ParquetWriter>(option, destination, rowSize, overwrite, compressionOption, partitionFields, activityCtx);
+ parquetWriter = std::make_shared<ParquetWriter>(option, destination, maxRowCountInBatch, overwrite, compressionOption, partitionFields, activityCtx);
}
else
{
@@ -2077,10 +2077,10 @@ void ParquetDatasetBinder::executeAll()
reportIfFailure(parquetWriter->openWriteFile());

int i = 1;
- int rowSize = parquetWriter->getMaxRowSize();
+ int maxRowCountInBatch = parquetWriter->getMaxRowSize();
do
{
- if (i % rowSize == 0)
+ if (i % maxRowCountInBatch == 0)
{
parquetWriter->writeRecordBatch();
jsonAlloc.Clear();
@@ -2091,9 +2091,9 @@ void ParquetDatasetBinder::executeAll()
while (bindNext());

i--;
- if (i % rowSize != 0)
+ if (i % maxRowCountInBatch != 0)
{
- parquetWriter->writeRecordBatch(i % rowSize);
+ parquetWriter->writeRecordBatch(i % maxRowCountInBatch);
jsonAlloc.Clear();
}
}
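
To spell out the flush logic in this hunk: executeAll counts bound rows from i = 1, flushes a full RecordBatch whenever the count reaches a multiple of maxRowCountInBatch, and after the loop flushes whatever partial batch remains (i % maxRowCountInBatch rows). A self-contained sketch of that arithmetic, with a stub standing in for ParquetWriter::writeRecordBatch and the row binding elided:

#include <cstdint>
#include <iostream>

// Illustrative stub: just reports how many rows each flush would contain.
static void writeRecordBatch(int64_t rows)
{
    std::cout << "flush " << rows << " rows\n";
}

int main()
{
    const int64_t maxRowCountInBatch = 4; // tiny bound so the trailing flush is visible
    const int64_t totalRows = 10;         // rows the dataset binder would supply

    int64_t i = 0;
    while (i < totalRows) // stands in for the do { ... } while (bindNext()) loop
    {
        i++; // one more row staged
        if (i % maxRowCountInBatch == 0)
            writeRecordBatch(maxRowCountInBatch); // a full batch is staged: flush it
    }
    if (i % maxRowCountInBatch != 0)
        writeRecordBatch(i % maxRowCountInBatch); // flush the trailing partial batch
    // Output: flush 4 rows, flush 4 rows, flush 2 rows (4 + 4 + 2 = 10)
    return 0;
}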
plugins/parquet/parquetembed.hpp (3 additions, 3 deletions)
@@ -833,7 +833,7 @@ class ParquetReader
class ParquetWriter
{
public:
- ParquetWriter(const char *option, const char *_destination, int _rowSize, bool _overwrite, arrow::Compression::type _compressionOption, const char *_partitionFields, const IThorActivityContext *_activityCtx);
+ ParquetWriter(const char *option, const char *_destination, int _maxRowCountInBatch, bool _overwrite, arrow::Compression::type _compressionOption, const char *_partitionFields, const IThorActivityContext *_activityCtx);
~ParquetWriter();
arrow::Status openWriteFile();
arrow::Status writePartition(std::shared_ptr<arrow::Table> table);
@@ -850,11 +850,11 @@ class ParquetWriter
void endRow(const char *name);
void addMember(rapidjson::Value &key, rapidjson::Value &value);
arrow::Status checkDirContents();
- __int64 getMaxRowSize() {return rowSize;}
+ __int64 getMaxRowSize() {return maxRowCountInBatch;}

private:
__int64 currentRow = 0;
- __int64 rowSize = 0; // The maximum size of each parquet row group.
+ __int64 maxRowCountInBatch = 0; // The maximum size of each parquet row group.
__int64 tablesProcessed = 0; // Current RowGroup that has been read from the input file.
std::string partOption; // begins with either read or write and ends with the partitioning type if there is one i.e. 'readhivepartition'.
std::string destination; // Full path to destination for writing parquet files.
