From 4980254e720eedc4f26cb8e8f3f5bc7f26ed96f5 Mon Sep 17 00:00:00 2001 From: Eric Lidwa Date: Mon, 8 Apr 2024 23:37:09 +0000 Subject: [PATCH 1/4] support for arrow parms, csv --- packages/arrow/ArrowSamplerImpl.cpp | 473 +++++++++++++++------- packages/arrow/ArrowSamplerImpl.h | 25 +- packages/arrow/ParquetSampler.cpp | 58 +-- packages/arrow/ParquetSampler.h | 38 +- scripts/selftests/atl06_10rows.geoparquet | Bin 0 -> 15246 bytes scripts/selftests/atl06_10rows.parquet | Bin 14891 -> 13876 bytes scripts/selftests/parquet_sampler.lua | 6 +- 7 files changed, 401 insertions(+), 199 deletions(-) create mode 100644 scripts/selftests/atl06_10rows.geoparquet diff --git a/packages/arrow/ArrowSamplerImpl.cpp b/packages/arrow/ArrowSamplerImpl.cpp index b1b0bc234..834f8a7b7 100644 --- a/packages/arrow/ArrowSamplerImpl.cpp +++ b/packages/arrow/ArrowSamplerImpl.cpp @@ -42,9 +42,11 @@ #include "ArrowSamplerImpl.h" +#include #include #include #include +#include /****************************************************************************** @@ -66,111 +68,152 @@ ArrowSamplerImpl::~ArrowSamplerImpl (void) { } - +/*---------------------------------------------------------------------------- +* openInputFile +*----------------------------------------------------------------------------*/ +void ArrowSamplerImpl::openInputFile(const char* file_path) +{ + PARQUET_ASSIGN_OR_THROW(inputFile, arrow::io::ReadableFile::Open(file_path, arrow::default_memory_pool())); + PARQUET_THROW_NOT_OK(parquet::arrow::OpenFile(inputFile, arrow::default_memory_pool(), &reader)); +} /*---------------------------------------------------------------------------- -* getpoints +* getMetadata *----------------------------------------------------------------------------*/ -void ArrowSamplerImpl::getPointsFromFile(const char* file_path, std::vector& points) +void ArrowSamplerImpl::getMetadata(ParquetSampler::record_info_t& recInfo) { - const char* geocol = "geometry"; - const char* timecol = "time"; - std::vector columnNames = {geocol, timecol}; + bool foundRecInfo = false; + std::shared_ptr file_metadata = reader->parquet_reader()->metadata(); - std::shared_ptr table = parquetFileToTable(file_path, columnNames); - int geometry_column_index = table->schema()->GetFieldIndex(geocol); - if(geometry_column_index == -1) + for(int i = 0; i < file_metadata->key_value_metadata()->size(); i++) { - throw RunTimeException(ERROR, RTE_ERROR, "Geometry column not found."); + std::string key = file_metadata->key_value_metadata()->key(i); + std::string value = file_metadata->key_value_metadata()->value(i); + + if(key == "sliderule") + { + rapidjson::Document doc; + doc.Parse(value.c_str()); + if(doc.HasParseError()) + { + throw RunTimeException(CRITICAL, RTE_ERROR, "Failed to parse metadata JSON: %s", value.c_str()); + } + + if(doc.HasMember("recordinfo")) + { + const rapidjson::Value& recordinfo = doc["recordinfo"]; + + recInfo.timeKey = StringLib::duplicate(recordinfo["time"].GetString()); + recInfo.asGeo = recordinfo["as_geo"].GetBool(); + recInfo.xKey = StringLib::duplicate(recordinfo["x"].GetString()); + recInfo.yKey = StringLib::duplicate(recordinfo["y"].GetString()); + foundRecInfo = true; + break; + } + else throw RunTimeException(CRITICAL, RTE_ERROR, "No 'recordinfo' key found in 'sliderule' metadata."); + } } - auto geometry_column = std::static_pointer_cast(table->column(geometry_column_index)->chunk(0)); - mlog(DEBUG, "Geometry column elements: %ld", geometry_column->length()); + if(!foundRecInfo) + { + throw RunTimeException(CRITICAL, RTE_ERROR, "No 'sliderule/recordinfo' metadata found in parquet file."); + } +} - /* The geometry column is binary type */ - auto binary_array = std::static_pointer_cast(geometry_column); - /* Iterate over each item in the geometry column and extract points */ - for(int64_t i = 0; i < binary_array->length(); i++) +/*---------------------------------------------------------------------------- +* getpoints +*----------------------------------------------------------------------------*/ +void ArrowSamplerImpl::getPoints(ParquetSampler::record_info_t& recInfo, std::vector& points) +{ + if(recInfo.asGeo) { - std::string wkb_data = binary_array->GetString(i); /* Get WKB data as string (binary data) */ - wkbpoint_t point = convertWKBToPoint(wkb_data); - ParquetSampler::point_info_t* pinfo = new ParquetSampler::point_info_t(point.x, point.y, 0.0); - points.push_back(pinfo); + getGeoPoints(points); + } + else + { + getXYPoints(recInfo, points); } - int time_column_index = table->schema()->GetFieldIndex(timecol); + std::vector columnNames = {recInfo.timeKey}; + std::shared_ptr table = inputFileToTable(columnNames); + int time_column_index = table->schema()->GetFieldIndex(recInfo.timeKey); if(time_column_index > -1) { - /* If time column exists it will have the same length as the geometry column */ auto time_column = std::static_pointer_cast(table->column(time_column_index)->chunk(0)); mlog(DEBUG, "Time column elements: %ld", time_column->length()); /* Update gps time for each point */ for(int64_t i = 0; i < time_column->length(); i++) { - points[i]->gps_time = time_column->Value(i); + points[i]->gps = time_column->Value(i); } } else mlog(DEBUG, "Time column not found."); } /*---------------------------------------------------------------------------- -* createParquetFile +* getXYPoints *----------------------------------------------------------------------------*/ -void ArrowSamplerImpl::createParquetFile(const char* input_file, const char* output_file) +void ArrowSamplerImpl::getXYPoints(ParquetSampler::record_info_t& recInfo, std::vector& points) { - /* Read the input file */ - std::shared_ptr table = parquetFileToTable(input_file); + std::vector columnNames = {recInfo.xKey, recInfo.yKey}; - std::vector> fields = table->schema()->fields(); - std::vector> columns = table->columns(); + std::shared_ptr table = inputFileToTable(columnNames); + int x_column_index = table->schema()->GetFieldIndex(recInfo.xKey); + if(x_column_index == -1) + { + throw RunTimeException(ERROR, RTE_ERROR, "X column not found."); + } - /* Append new columns to the table */ - mutex.lock(); + int y_column_index = table->schema()->GetFieldIndex(recInfo.yKey); + if(y_column_index == -1) { - fields.insert(fields.end(), new_fields.begin(), new_fields.end()); - columns.insert(columns.end(), new_columns.begin(), new_columns.end()); + throw RunTimeException(ERROR, RTE_ERROR, "Y column not found."); } - mutex.unlock(); - auto metadata = table->schema()->metadata()->Copy(); + auto x_column = std::static_pointer_cast(table->column(x_column_index)->chunk(0)); + auto y_column = std::static_pointer_cast(table->column(y_column_index)->chunk(0)); - /* - * Pandas metadata does not contain information about the new columns. - * Pandas and geopands can use/read the file just fine without pandas custom metadata. - * Removing it is a lot easier than updating it. - */ - const std::string pandas_key = "pandas"; - if(metadata->Contains(pandas_key)) + /* x and y columns are the same longth */ + for(int64_t i = 0; i < x_column->length(); i++) { - int key_index = metadata->FindKey(pandas_key); - if(key_index != -1) - { - PARQUET_THROW_NOT_OK(metadata->Delete(key_index)); - } + points[i]->x = x_column->Value(i); + points[i]->y = y_column->Value(i); } +} - /* Create a filemap metadata */ - PARQUET_THROW_NOT_OK(metadata->Set("filemap", createFileMap())); - - /* Attach metadata to the new schema */ - auto combined_schema = std::make_shared(fields); - combined_schema = combined_schema->WithMetadata(metadata); +/*---------------------------------------------------------------------------- +* getGeoPoints +*----------------------------------------------------------------------------*/ +void ArrowSamplerImpl::getGeoPoints(std::vector& points) +{ + const char* geocol = "geometry"; + std::vector columnNames = {geocol}; - /* Create a new table with the combined schema and columns */ - auto updated_table = arrow::Table::Make(combined_schema, columns); + std::shared_ptr table = inputFileToTable(columnNames); + int geometry_column_index = table->schema()->GetFieldIndex(geocol); + if(geometry_column_index == -1) + { + throw RunTimeException(ERROR, RTE_ERROR, "Geometry column not found."); + } - mlog(DEBUG, "Table was %ld rows and %d columns.", table->num_rows(), table->num_columns()); - mlog(DEBUG, "Table is %ld rows and %d columns.", updated_table->num_rows(), updated_table->num_columns()); + auto geometry_column = std::static_pointer_cast(table->column(geometry_column_index)->chunk(0)); + mlog(DEBUG, "Geometry column elements: %ld", geometry_column->length()); - tableToParquetFile(updated_table, output_file); + /* The geometry column is binary type */ + auto binary_array = std::static_pointer_cast(geometry_column); - // printParquetMetadata(input_file); - // printParquetMetadata(output_file); + /* Iterate over each item in the geometry column and extract points */ + for(int64_t i = 0; i < binary_array->length(); i++) + { + std::string wkb_data = binary_array->GetString(i); /* Get WKB data as string (binary data) */ + wkbpoint_t point = convertWKBToPoint(wkb_data); + ParquetSampler::point_info_t* pinfo = new ParquetSampler::point_info_t({point.x, point.y, 0.0}); + points.push_back(pinfo); + } } - /*---------------------------------------------------------------------------- * processSamples *----------------------------------------------------------------------------*/ @@ -314,41 +357,41 @@ bool ArrowSamplerImpl::processSamples(ParquetSampler::sampler_t* sampler) mutex.lock(); { /* Add new columns fields */ - new_fields.push_back(value_field); - new_fields.push_back(time_field); + newFields.push_back(value_field); + newFields.push_back(time_field); if(robj->hasFlags()) { - new_fields.push_back(flags_field); + newFields.push_back(flags_field); } - new_fields.push_back(fileid_field); + newFields.push_back(fileid_field); if(robj->hasZonalStats()) { - new_fields.push_back(count_field); - new_fields.push_back(min_field); - new_fields.push_back(max_field); - new_fields.push_back(mean_field); - new_fields.push_back(median_field); - new_fields.push_back(stdev_field); - new_fields.push_back(mad_field); + newFields.push_back(count_field); + newFields.push_back(min_field); + newFields.push_back(max_field); + newFields.push_back(mean_field); + newFields.push_back(median_field); + newFields.push_back(stdev_field); + newFields.push_back(mad_field); } /* Add new columns data */ - new_columns.push_back(std::make_shared(value_list_array)); - new_columns.push_back(std::make_shared(time_list_array)); + newColumns.push_back(std::make_shared(value_list_array)); + newColumns.push_back(std::make_shared(time_list_array)); if(robj->hasFlags()) { - new_columns.push_back(std::make_shared(flags_list_array)); + newColumns.push_back(std::make_shared(flags_list_array)); } - new_columns.push_back(std::make_shared(fileid_list_array)); + newColumns.push_back(std::make_shared(fileid_list_array)); if(robj->hasZonalStats()) { - new_columns.push_back(std::make_shared(count_list_array)); - new_columns.push_back(std::make_shared(min_list_array)); - new_columns.push_back(std::make_shared(max_list_array)); - new_columns.push_back(std::make_shared(mean_list_array)); - new_columns.push_back(std::make_shared(median_list_array)); - new_columns.push_back(std::make_shared(stdev_list_array)); - new_columns.push_back(std::make_shared(mad_list_array)); + newColumns.push_back(std::make_shared(count_list_array)); + newColumns.push_back(std::make_shared(min_list_array)); + newColumns.push_back(std::make_shared(max_list_array)); + newColumns.push_back(std::make_shared(mean_list_array)); + newColumns.push_back(std::make_shared(median_list_array)); + newColumns.push_back(std::make_shared(stdev_list_array)); + newColumns.push_back(std::make_shared(mad_list_array)); } } mutex.unlock(); @@ -356,24 +399,155 @@ bool ArrowSamplerImpl::processSamples(ParquetSampler::sampler_t* sampler) return true; } - /*---------------------------------------------------------------------------- -* clearColumns +* createOutputFile *----------------------------------------------------------------------------*/ -void ArrowSamplerImpl::clearColumns(void) +void ArrowSamplerImpl::createOutpuFile(void) { - mutex.lock(); + const ArrowParms* parms = parquetSampler->getParms(); + + auto table = inputFileToTable(); + auto updated_table = appendSamplesColumns(table); + + if(parms->format == ArrowParms::PARQUET) { - new_fields.clear(); - new_columns.clear(); + tableToParquetFile(updated_table, parms->path); } - mutex.unlock(); + else if(parms->format == ArrowParms::CSV) + { + tableToCsvFile(updated_table, parms->path); + + /* Generate metadata file since arrow csv writer ignores it */ + std::string mfile = createMetadataFileName(parms->path); + tableMetadataToJson(updated_table, mfile.c_str()); + } + else throw RunTimeException(CRITICAL, RTE_ERROR, "Unsupported file format"); + + mlog(DEBUG, "Table was %ld rows and %d columns.", table->num_rows(), table->num_columns()); + mlog(DEBUG, "Table is %ld rows and %d columns.", updated_table->num_rows(), updated_table->num_columns()); } /****************************************************************************** * PRIVATE METHODS ******************************************************************************/ +/*---------------------------------------------------------------------------- +* inputFileToTable +*----------------------------------------------------------------------------*/ +std::shared_ptr ArrowSamplerImpl::inputFileToTable(const std::vector& columnNames) +{ + /* If columnNames is empty, read all columns */ + if(columnNames.size() == 0) + { + std::shared_ptr table; + PARQUET_THROW_NOT_OK(reader->ReadTable(&table)); + return table; + } + + /* Read only the specified columns */ + std::shared_ptr schema; + PARQUET_THROW_NOT_OK(reader->GetSchema(&schema)); + std::vector columnIndices; + for(const auto& columnName : columnNames) + { + auto index = schema->GetFieldIndex(columnName); + if(index != -1) + { + columnIndices.push_back(index); + } + else mlog(DEBUG, "Column %s not found in parquet file.", columnName); + } + + std::shared_ptr table; + PARQUET_THROW_NOT_OK(reader->ReadTable(columnIndices, &table)); + return table; +} + +/*---------------------------------------------------------------------------- +* appendSamplesColumns +*----------------------------------------------------------------------------*/ +std::shared_ptr ArrowSamplerImpl::appendSamplesColumns(const std::shared_ptr table) +{ + std::vector> fields = table->schema()->fields(); + std::vector> columns = table->columns(); + + /* Append new columns to the table */ + mutex.lock(); + { + fields.insert(fields.end(), newFields.begin(), newFields.end()); + columns.insert(columns.end(), newColumns.begin(), newColumns.end()); + } + mutex.unlock(); + + auto metadata = table->schema()->metadata()->Copy(); + + /* + * Pandas metadata does not contain information about the new columns. + * Pandas and geopands can use/read the file just fine without pandas custom metadata. + * Removing it is a lot easier than updating it. + */ + const std::string pandas_key = "pandas"; + if(metadata->Contains(pandas_key)) + { + int key_index = metadata->FindKey(pandas_key); + if(key_index != -1) + { + PARQUET_THROW_NOT_OK(metadata->Delete(key_index)); + } + } + + /* Create a filemap metadata */ + PARQUET_THROW_NOT_OK(metadata->Set("filemap", createFileMap())); + + /* Attach metadata to the new schema */ + auto combined_schema = std::make_shared(fields); + combined_schema = combined_schema->WithMetadata(metadata); + + /* Create a new table with the combined schema and columns */ + auto updated_table = arrow::Table::Make(combined_schema, columns); + + return updated_table; +} + +/*---------------------------------------------------------------------------- +* tableToParquetFile +*----------------------------------------------------------------------------*/ +void ArrowSamplerImpl::tableToParquetFile(const std::shared_ptr table, const char* file_path) +{ + std::shared_ptr outfile; + PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(file_path)); + + /* Create a Parquet writer properties builder */ + parquet::WriterProperties::Builder writer_props_builder; + writer_props_builder.compression(parquet::Compression::SNAPPY); + writer_props_builder.version(parquet::ParquetVersion::PARQUET_2_6); + shared_ptr writer_properties = writer_props_builder.build(); + + /* Create an Arrow writer properties builder to specify that we want to store Arrow schema */ + auto arrow_properties = parquet::ArrowWriterProperties::Builder().store_schema()->build(); + PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), outfile, table->num_rows(), writer_properties, arrow_properties)); + + /* Close the output file */ + PARQUET_THROW_NOT_OK(outfile->Close()); +} + +/*---------------------------------------------------------------------------- +* tableToCsvFile +*----------------------------------------------------------------------------*/ +void ArrowSamplerImpl::tableToCsvFile(const std::shared_ptr table, const char* file_path) +{ + std::shared_ptr outfile; + PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(file_path)); + + /* Create a CSV writer */ + arrow::csv::WriteOptions write_options = arrow::csv::WriteOptions::Defaults(); + + /* Write the table to the CSV file */ + PARQUET_THROW_NOT_OK(arrow::csv::WriteCSV(*table, write_options, outfile.get())); + + /* Close the output file */ + PARQUET_THROW_NOT_OK(outfile->Close()); +} /*---------------------------------------------------------------------------- * convertWKBToPoint @@ -433,64 +607,6 @@ wkbpoint_t ArrowSamplerImpl::convertWKBToPoint(const std::string& wkb_data) return point; } -/*---------------------------------------------------------------------------- -* parquetFileToTable -*----------------------------------------------------------------------------*/ -std::shared_ptr ArrowSamplerImpl::parquetFileToTable(const char* file_path, const std::vector& columnNames) -{ - std::shared_ptr infile; - PARQUET_ASSIGN_OR_THROW(infile, arrow::io::ReadableFile::Open(file_path, arrow::default_memory_pool())); - - std::unique_ptr reader; - PARQUET_THROW_NOT_OK(parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader)); - - - /* If columnNames is empty, read all columns */ - if(columnNames.size() == 0) - { - std::shared_ptr table; - PARQUET_THROW_NOT_OK(reader->ReadTable(&table)); - return table; - } - - /* Read only the specified columns */ - std::shared_ptr schema; - PARQUET_THROW_NOT_OK(reader->GetSchema(&schema)); - std::vector columnIndices; - for(const auto& columnName : columnNames) - { - auto index = schema->GetFieldIndex(columnName); - if(index != -1) - { - columnIndices.push_back(index); - } - else mlog(DEBUG, "Column %s not found in parquet file.", columnName); - } - - std::shared_ptr table; - PARQUET_THROW_NOT_OK(reader->ReadTable(columnIndices, &table)); - return table; -} - -/*---------------------------------------------------------------------------- -* tableToParquetFile -*----------------------------------------------------------------------------*/ -void ArrowSamplerImpl::tableToParquetFile(std::shared_ptr table, const char* file_path) -{ - std::shared_ptr outfile; - PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(file_path)); - - /* Create a Parquet writer properties builder */ - parquet::WriterProperties::Builder writer_props_builder; - writer_props_builder.compression(parquet::Compression::SNAPPY); - writer_props_builder.version(parquet::ParquetVersion::PARQUET_2_6); - shared_ptr writer_properties = writer_props_builder.build(); - - /* Create an Arrow writer properties builder to specify that we want to store Arrow schema */ - auto arrow_properties = parquet::ArrowWriterProperties::Builder().store_schema()->build(); - PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), outfile, table->num_rows(), writer_properties, arrow_properties)); -} - /*---------------------------------------------------------------------------- * printParquetMetadata *----------------------------------------------------------------------------*/ @@ -499,10 +615,10 @@ void ArrowSamplerImpl::printParquetMetadata(const char* file_path) std::shared_ptr infile; PARQUET_ASSIGN_OR_THROW(infile, arrow::io::ReadableFile::Open(file_path, arrow::default_memory_pool())); - std::unique_ptr reader; - PARQUET_THROW_NOT_OK(parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader)); + std::unique_ptr _reader; + PARQUET_THROW_NOT_OK(parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &_reader)); - std::shared_ptr file_metadata = reader->parquet_reader()->metadata(); + std::shared_ptr file_metadata = _reader->parquet_reader()->metadata(); print2term("***********************************************************\n"); print2term("***********************************************************\n"); print2term("***********************************************************\n"); @@ -562,4 +678,59 @@ std::string ArrowSamplerImpl::createFileMap(void) document.Accept(writer); std::string serialized_json = buffer.GetString(); return serialized_json; +} + +/*---------------------------------------------------------------------------- +* createMetadataFileName +*----------------------------------------------------------------------------*/ +std::string ArrowSamplerImpl::createMetadataFileName(const char* file_path) +{ + /* If file has extension .csv or .txt replace it with _metadata.json else append it */ + + std::vector extensions = {".csv", ".CSV", ".txt", ".TXT"}; + std::string path(file_path); + size_t dotIndex = path.find_last_of("."); + if(dotIndex != std::string::npos) + { + std::string extension = path.substr(dotIndex); + if(std::find(extensions.begin(), extensions.end(), extension) != extensions.end()) + { + path = path.substr(0, dotIndex); + } + } + path += "_metadata.json"; + return path; +} + + +/*---------------------------------------------------------------------------- +* tableMetadataToJson +*----------------------------------------------------------------------------*/ +void ArrowSamplerImpl::tableMetadataToJson(const std::shared_ptr table, const char* file_path) +{ + rapidjson::Document doc; + doc.SetObject(); + rapidjson::Document::AllocatorType& allocator = doc.GetAllocator(); + + const auto& metadata = table->schema()->metadata(); + for (int i = 0; i < metadata->size(); ++i) + { + rapidjson::Value key(metadata->key(i).c_str(), allocator); + rapidjson::Value value(metadata->value(i).c_str(), allocator); + doc.AddMember(key, value, allocator); + } + + /* Serialize the JSON document to string */ + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + doc.Accept(writer); + + /* Write the JSON string to a file */ + FILE* jsonFile = fopen(file_path, "w"); + if(jsonFile) + { + fwrite(buffer.GetString(), 1, buffer.GetSize(), jsonFile); + fclose(jsonFile); + } + else throw RunTimeException(CRITICAL, RTE_ERROR, "Failed to open metadata file: %s", file_path); } \ No newline at end of file diff --git a/packages/arrow/ArrowSamplerImpl.h b/packages/arrow/ArrowSamplerImpl.h index ac560d9fd..fe347928c 100644 --- a/packages/arrow/ArrowSamplerImpl.h +++ b/packages/arrow/ArrowSamplerImpl.h @@ -60,10 +60,13 @@ class ArrowSamplerImpl explicit ArrowSamplerImpl (ParquetSampler* _sampler); ~ArrowSamplerImpl (void); - void getPointsFromFile (const char* file_path, std::vector& points); - void createParquetFile (const char* input_file, const char* output_file); + void openInputFile (const char* file_path); + void getMetadata (ParquetSampler::record_info_t& recInfo); + void getPoints (ParquetSampler::record_info_t& recInfo, std::vector& points); + void getXYPoints (ParquetSampler::record_info_t& recInfo, std::vector& points); + void getGeoPoints (std::vector& points); bool processSamples (ParquetSampler::sampler_t* sampler); - void clearColumns (void); + void createOutpuFile (void); private: @@ -81,18 +84,26 @@ class ArrowSamplerImpl ParquetSampler* parquetSampler; Mutex mutex; - std::vector> new_fields; - std::vector> new_columns; + std::vector> newFields; + std::vector> newColumns; + + std::shared_ptr inputFile; + std::unique_ptr reader; /*-------------------------------------------------------------------- * Methods *--------------------------------------------------------------------*/ + std::shared_ptr inputFileToTable (const std::vector& columnNames = {}); + std::shared_ptr appendSamplesColumns(const std::shared_ptr table); + void tableToParquetFile (const std::shared_ptr table, const char* file_path); + void tableToCsvFile (const std::shared_ptr table, const char* file_path); + wkbpoint_t convertWKBToPoint (const std::string& wkb_data); - std::shared_ptr parquetFileToTable (const char* file_path, const std::vector& columnNames = {}); - void tableToParquetFile (std::shared_ptr table, const char* file_path); void printParquetMetadata(const char* file_path); std::string createFileMap (void); + std::string createMetadataFileName (const char* file_path); + void tableMetadataToJson (const std::shared_ptr table, const char* file_path); }; #endif /* __arrow_sampler_impl__ */ diff --git a/packages/arrow/ParquetSampler.cpp b/packages/arrow/ParquetSampler.cpp index eb62b8e28..1cdc44d4f 100644 --- a/packages/arrow/ParquetSampler.cpp +++ b/packages/arrow/ParquetSampler.cpp @@ -43,7 +43,7 @@ * STATIC DATA ******************************************************************************/ -const char* ParquetSampler::OBJECT_TYPE = "ParquetSampler"; +const char* ParquetSampler::OBJECT_TYPE = "ParquetSampler"; const char* ParquetSampler::LUA_META_NAME = "ParquetSampler"; const struct luaL_Reg ParquetSampler::LUA_META_TABLE[] = { {NULL, NULL} @@ -56,13 +56,15 @@ const struct luaL_Reg ParquetSampler::LUA_META_TABLE[] = { /*---------------------------------------------------------------------------- * luaCreate - :parquetsampler(input_file_path, output_file_path, {["mosaic"]: dem1, ["strips"]: dem2}) *----------------------------------------------------------------------------*/ -int ParquetSampler::luaCreate (lua_State* L) +int ParquetSampler::luaCreate(lua_State* L) { + ArrowParms* _parms = NULL; + try { /* Get Parameters */ - const char* input_file = getLuaString(L, 1); - const char* output_file = getLuaString(L, 2); + _parms = dynamic_cast(getLuaObject(L, 1, ArrowParms::OBJECT_TYPE)); + const char* input_file = getLuaString(L, 2); std::vector rasters; @@ -83,7 +85,7 @@ int ParquetSampler::luaCreate (lua_State* L) } /* Create Dispatch */ - return createLuaObject(L, new ParquetSampler(L, input_file, output_file, rasters)); + return createLuaObject(L, new ParquetSampler(L, _parms, input_file, rasters)); } catch(const RunTimeException& e) { @@ -96,7 +98,7 @@ int ParquetSampler::luaCreate (lua_State* L) /*---------------------------------------------------------------------------- * luaSamples - :sample([gps]) --> in|out *----------------------------------------------------------------------------*/ -int ParquetSampler::luaSample (lua_State* L) +int ParquetSampler::luaSample(lua_State* L) { try { @@ -117,21 +119,21 @@ int ParquetSampler::luaSample (lua_State* L) /*---------------------------------------------------------------------------- * init *----------------------------------------------------------------------------*/ -void ParquetSampler::init (void) +void ParquetSampler::init(void) { } /*---------------------------------------------------------------------------- * deinit *----------------------------------------------------------------------------*/ -void ParquetSampler::deinit (void) +void ParquetSampler::deinit(void) { } /*---------------------------------------------------------------------------- * Sampler Constructor *----------------------------------------------------------------------------*/ -ParquetSampler::Sampler::Sampler (const char* _rkey, RasterObject* _robj, ParquetSampler* _obj): +ParquetSampler::Sampler::Sampler(const char* _rkey, RasterObject* _robj, ParquetSampler* _obj) : robj(_robj), obj(_obj) { @@ -142,7 +144,7 @@ ParquetSampler::Sampler::Sampler (const char* _rkey, RasterObject* _robj, Parque /*---------------------------------------------------------------------------- * Sampler Destructor *----------------------------------------------------------------------------*/ -ParquetSampler::Sampler::~Sampler (void) +ParquetSampler::Sampler::~Sampler(void) { clearSamples(); delete [] rkey; @@ -174,6 +176,8 @@ void ParquetSampler::sample(void) if(alreadySampled) return; alreadySampled = true; + const char* outputPath = parms->path; + if(std::filesystem::exists(outputPath)) { int rc = std::remove(outputPath); @@ -197,14 +201,13 @@ void ParquetSampler::sample(void) } samplerPids.clear(); - /* Create new parquet file with columns/samples from all raster objects */ try { - impl->createParquetFile(inputPath, outputPath); + impl->createOutpuFile(); } catch(const RunTimeException& e) { - mlog(e.level(), "Error creating parquet file: %s", e.what()); + mlog(e.level(), "Error creating output file: %s", e.what()); throw; } } @@ -217,15 +220,23 @@ void ParquetSampler::sample(void) /*---------------------------------------------------------------------------- * Constructor *----------------------------------------------------------------------------*/ -ParquetSampler::ParquetSampler (lua_State* L, const char* input_file, const char* output_file, const std::vector& rasters): +ParquetSampler::ParquetSampler(lua_State* L, ArrowParms* _parms, const char* input_file, + const std::vector& rasters): LuaObject(L, OBJECT_TYPE, LUA_META_NAME, LUA_META_TABLE), + parms(_parms), alreadySampled(false) { /* Add Lua sample function */ LuaEngine::setAttrFunc(L, "sample", luaSample); - if (!input_file) throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid input file"); - if (!output_file) throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid output file"); + if (parms == NULL) + throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid ArrowParms object"); + + if ((input_file == NULL) || (input_file[0] == '\0')) + throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid input file path"); + + if((parms->path == NULL) || (parms->path[0] == '\0')) + throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid output file path"); try { @@ -242,13 +253,12 @@ ParquetSampler::ParquetSampler (lua_State* L, const char* input_file, const char samplers.push_back(sampler); } - inputPath = StringLib::duplicate(input_file); - outputPath = StringLib::duplicate(output_file); - /* Allocate Implementation */ impl = new ArrowSamplerImpl(this); - impl->getPointsFromFile(inputPath, points); + impl->openInputFile(input_file); + impl->getMetadata(recInfo); + impl->getPoints(recInfo, points); } catch(const RunTimeException& e) { @@ -271,6 +281,8 @@ ParquetSampler::~ParquetSampler(void) *----------------------------------------------------------------------------*/ void ParquetSampler::Delete(void) { + parms->releaseLuaObject(); + for(Thread* pid : samplerPids) delete pid; @@ -280,8 +292,6 @@ void ParquetSampler::Delete(void) for(point_info_t* pinfo : points) delete pinfo; - delete [] inputPath; - delete [] outputPath; delete impl; } @@ -295,8 +305,8 @@ void* ParquetSampler::samplerThread(void* parm) for(point_info_t* pinfo : sampler->obj->points) { - OGRPoint poi = pinfo->point; /* Must make a copy of point for this thread */ - double gps = robj->usePOItime() ? pinfo->gps_time : 0; + OGRPoint poi(pinfo->x, pinfo->y, 0.0); + double gps = robj->usePOItime() ? pinfo->gps : 0.0; sample_list_t* slist = new sample_list_t; bool listvalid = true; diff --git a/packages/arrow/ParquetSampler.h b/packages/arrow/ParquetSampler.h index 9423e7416..ca966627d 100644 --- a/packages/arrow/ParquetSampler.h +++ b/packages/arrow/ParquetSampler.h @@ -84,10 +84,9 @@ class ParquetSampler: public LuaObject typedef struct PointInfo { - OGRPoint point; - double gps_time; - - explicit PointInfo (double x, double y, double z): point(x, y, z), gps_time(0.0) {} + double x; + double y; + double gps; } point_info_t; typedef std::vector sample_list_t; @@ -105,17 +104,29 @@ class ParquetSampler: public LuaObject void clearSamples (void); } sampler_t; + typedef struct RecordInfo + { + const char* timeKey; + const char* xKey; + const char* yKey; + bool asGeo; + + explicit RecordInfo (void): timeKey(NULL), xKey(NULL), yKey(NULL), asGeo(false) {} + ~RecordInfo (void) {delete [] timeKey; delete [] xKey; delete [] yKey;} + } record_info_t; /*-------------------------------------------------------------------- * Methods *--------------------------------------------------------------------*/ - static int luaCreate (lua_State* L); - static int luaSample (lua_State* L); - static void init (void); - static void deinit (void); - void sample (void); - const std::vector& getSamplers(void) {return samplers;} + static int luaCreate (lua_State* L); + static int luaSample (lua_State* L); + static void init (void); + static void deinit (void); + void sample (void); + + const ArrowParms* getParms (void) {return parms;} + const std::vector& getSamplers (void) {return samplers;} private: @@ -127,9 +138,9 @@ class ParquetSampler: public LuaObject * Data *--------------------------------------------------------------------*/ - const char* inputPath; - const char* outputPath; + ArrowParms* parms; std::vector samplerPids; + record_info_t recInfo; std::vector points; std::vector samplers; ArrowSamplerImpl* impl; // private arrow data @@ -139,8 +150,7 @@ class ParquetSampler: public LuaObject * Methods *--------------------------------------------------------------------*/ - ParquetSampler (lua_State* L, - const char* input_file, const char* output_file, + ParquetSampler (lua_State* L, ArrowParms* _parms, const char* input_file, const std::vector& rasters); ~ParquetSampler (void); void Delete (void); diff --git a/scripts/selftests/atl06_10rows.geoparquet b/scripts/selftests/atl06_10rows.geoparquet new file mode 100644 index 0000000000000000000000000000000000000000..2da4681e35e382e4b5f06cf5668f0cba40107eb2 GIT binary patch literal 15246 zcmcgT3veT6d6G|OeAm0=xU-Bdb%JYh%Z21@JssXblhY*efO3B`sCdC*L`O0Bj5kh z-15Z-=N@!FIrr}md}HpHZ}{O{!TXcB4`xQ@kN(y0ywzl#uwH)?>^o*1whoWmEI4Dc z;QlewY2BO2WIA?k2);+nmV?Kfo7KDEnfhPzxOL>D=a^~ur0o!w+ujk%+g#l`G&VYR z#XR;iGgYTX_Ck8{Cg9v;9Wue!G5BLL!F#|O4qJ~CNv^-M0kDt2_yUakVPs+42jdAC zAA%vn=$8U#Z5CXMQgq$`&@IKA07BhyaNIhiNpVyrLKJg_azR#zBtI~NFw`3@)ZGU_ zmJYHW)P}OW*K8GAW?7p60WZGBbTKe|bi{NR##Jzm!x)2c4UFrKj+~p+)==8tUg2g; zr$<_IwviUbKEa52J~c9KwY1E`*UP+I-WE6_)W?U$E)G#&cm?&T;qd_|4F&3QvtRuB znb5V*J`-AjA;EYzj0a%+Cyb-dJrnZ4xCKV93^ofwZ5G^*TI{?T%kj|fblmJ zIv0E7F!k+2V3-b)9+ZZ1j98C^twY4gEJqFiS>W-wb*vK#pcD(jZR8ss4x#zL{mr8$ z>dMiM$5@nNp-lLxk6koVZ$C<1If5$H?|yz3t{cDt!fh7(hL4_wpI+TzVJNF(;9XZ! zKfQ`DpxbN6K$k7QJUwiB6vkIzd=1;62=*V<#F9Zn9E z^lG~}N z1q{>98}N4ICZsiL9y(|;UuB{`{=1#1AFFLEmC_Ew^4m2*<_a~)(lR3wem`m+`?Q(* z+%?p%PC^>!VC=zf$o!f$)=JN(C$4(s<>lAF_>S3POd7PmhBb9V9&{4aIO-=&4`{(AO_dp~w*;i<=$-uIoqyR@(pyYR2?c<2(EvAa$f zY0cP1TC{*q1E{eIu1t!dqZRYF+~02?@@P5jN1&b4WMDDldkOc zi&x$~f8y2eo4t;cx3)9-mlL;^_3sbzbEsp`8(hE@cg{>f99V%WO{v{pcaI} zNd^1S5S=$;!S13uwmn>ulx;~+peQ23sTI+CPEjAe9t_;U)Pwi=h56ytKR)-~;-lyO z;mD)sJl}cd+%Iptd(QRFtLADC4$s~2)U)Tlo4I!Go_F0f*DnmNv{`T;679Sh3-iev zI>Njnx2*_SiP1^&sngU$uLVguXnK$uO460Ge&d7pu^T%nYq-Q`g@Woh485zWir`+a`MQ?!U@T0vjpZ(9DSAxUA?L{R_HgU#;wDe+gr+lq~_ai4~;!#rrt6|{ph9} zjOyX14_JRT)C%rjng7TN$jxT>I&ts~x39U*e(~_<-|GLQJGNjwY5(P)uRpT|pl`mz zX1{;xq{+T?*Eesw0D#nwpS67!0T2Am(-sB*kH7v~=FcM_QG58$b^tJX_CrrRfPe@8 z!g5Fiz}3IL?$IwIU{^f!m$w4o{dYX~nFkTD_KA1Awgv$An}7cUk05~l;PAbF3;;7d z_x3LX;N1AZe>-sp04Dg=+EUto$(&^MSuy&%o7E75^HUJ%K zgS4`u2dWxfWuW|lblc{>tR@^rD}CTm)=xBD@CXxY0;@8H4GSt*PExA-wJ=~w=UfGr{<>ip=N&R3tMWx zOHAq|FW9J;q;jJ6zl5RoqXeN|L?uAIY^UAWS0~0ssgbddzIxPTIvKVdfTPv+@bGOD zlU2T?K*=?Ec4E>oWuLO2$?#=n$}xG<#ALA~NKC1*%@)LpL{_v8WSmu@3Ty-{?FsV6 z%WQ!YWo_v+fD8AZLYg zh2zhNOqo33B919nB-joD=@4;zSzu%Q4qpPfcwY%I=`j{cA~yj9$-0S+hEnFG35R#q zjjMQ36pBhg;8e8TY~TWgmxOJJ$rno7OkF@d%(!lzb+~SZIYCD2@Y`^kP==d6RijSt z?3B}Uv(xGF%(y0QpST?svV}qk6wZ|SZH3TRsVS9AOO7@ljEGblY~`8b3%z{TQHtc%DA>73uT}@$?@3|&rc$2hWK?VXL`{y1XCA!xlk(S z4DnCgLd0)rm#LIYj2_8%QEhn*)t$3a16-Xp{OlG7*{E2s&_hOXPiy| z6&ZGyA^cwe=}N>InNq1x!}Ux}!9OgkaO#6P58;R4oCm)}KFhF#qRf|-DvPk9pp+S? zgvmT3bBbz*EYAsKI!jh5wE{7Z!1A!bi8B=?tGGD6h|h&0FGCD5YETy>3Eg3DnMI}| zqeds4cBlJ{9sbR3I=p9{?z8rpH1VKGa5UoGI|Wsv*k!@fB>4+e8D$&AJ3CP>Rd{$u zGfsB-F!6_WpsO|;E`pSdV!P)!7zys5__~(o)Z|2vliWm--!@sHCN7;%%N0=siaP=? za@$%QX)qN%Nq5K(ga+|MSS?qiVxv7#$mAiAP3*v*x0LdO=0hB4hH+x5#zKk2^d9`ca?jfl6w zdy#JUVEtsfrg}(L0Qrm?_jC)~f$v3>{7}(Punq1Zrqg0KHQmHs6Qr z)>tqC-iuJyfQ$;;ME4Lu;dnpyF6PPZMimRKjex`?brcs7+qwkzDEEmYR zOlpG?&y)+K@|+ZfObNO-EIx{jc752cF`?aqnid*8IM7=OS3f!BnQ}q>WwITiW3ko5 zS+&u!-RYRdkCWb@X?h(7e$Lahn_kS)KAO(bOIZS2T0e`vNH*X7!>Ua ztZB{dFbU6YI++vMMsOzRQ?h)67BZ5roZf(UzsHgB*KobWB;rci=dGj=2jI)daV4`< z%Le7e9MH?L4zHb0E|R&R9Ir5mXf^HM62eP>PmY(_fVjg2#2Uf76so4AqL^~6tMiGt zJ(Y;%IRA>UDu4`9ypnQycY!wer3LsXQ9#rHjrWJ7taH6?m>I zI~z5~9ieu92+C10?Q}e!4>i*XuABz>lTLAWpv_d?2K#sSd^rvA2YzTzwR2eA8(zY^ z(usezr3-`H(4HHtr+Rcytf{=T=3f{u{=Xn1)ukqc=|1T(iH8OCvWfEc&&+tLKv67>ibUKPGo~19dEkQ3$2J#l<7sJ{p zDCKrorzmFQG+M~Nh8jy2CkaNXqM$JwA`CsmuH}bRnj$5_n98lkI zei7@;K{*8LD5$5}YYcRRHDjF6h~{INj2z`ue|bTECZ`kN|H)W`A81ET%)lAO1wtNz z*DwY`9^gUC-Tk>D;|$%JPDC|7HsZ^n*0ZjV2=Z200qSn77yY8ZNbQVXg(03 z>E-;5)6+NFv11|q#rZAxL;8tkIvH}NlR>pMhq^B#r;ET#C6kC(Aoga0a>^dgM=BA) zUXLz0K(9eh%DG~X=CkFrUubRX8Q*0zpG`YeI;@Lp)ptcWpKz_(56JOm+7)T0 z{qdU0hqnf`j@loz`!GkD0o*dHYj@(vcb9V;bXCljKCMYaI%>(+A;`xj#guRpmIzZooo-I~v z^-8+H$fQDD}ju6s=|g@p486zrmhzDWs2gRS$iACKp~;b7yC*)<(_gXDdt`2YWOl%tk{fTh)`)&oFg+4AG)W zHKWaJJ6f6zbi6A|h^l+6w?1+^WEdT#*TN+iGKisXaM=w_-J4B(vsu~k)z13WW! zSeMv@zh(3tyz^jPaY1)K0ljX$$OfWS@ZnainB)DTc9%l@siTWsoesKmPjXu>C50O4 zWV(p4(#qMJ(9f&-f!Kx^ZRI|Y56%JYyuf=Dk{j?&BM!EYLr%iD-jhD&Q9B2*r{ebs zDLD5zNo?m=pb=^3Gsw?)J~0X42k%-WPZe@af$SYXKj>@e6nZvIf2kz+jo;gV9N{5& zcD(_}f@m+eYXV*n_il7!tBHQG=moproS61=>fIBZaT&NjNWnQn?sue6mE<0u0{JkD zc>vbiJ#fy|V7|V(jAvdgyNYL@PsIg2xSAK2AupmIED<`|c?UU$Uc~%Hui*V#faISn z-mT5hL7EO{mmqTm;d+iv;`iH9d{&tp^EKKbcnv;3Gj5&}uq-mK|+k!|D4;ZC|&e{lZ` zK()7qr{z^#Ph2C2^%wD%C1ZON`D@TO7&SJ->vW&OdS0BGROJkWuw78t$w$eH-hC>?ikS zg!d&0*AxBtm*I)f)BLN~KemvbdN+uotbk6`^>ls>9ucO+gUWwLKcrsq$K7xqRN!8= zn1*P^dw7Ur_=|1=pGcd&Y9I9yUnj2}7A^MO?c!G89}EWgYM5BG5B@rUs2A699_X-G z9{~zo(5$QeY|L*AVu9Qv@A55vvnszT7DPtRnQRk9rgUqCFP~wH#WUnrjtT8oj0yGk Y4fwB+;s5A*5&uk+Qzp|I{GYi04@?cS7XSbN literal 0 HcmV?d00001 diff --git a/scripts/selftests/atl06_10rows.parquet b/scripts/selftests/atl06_10rows.parquet index 01250050c70c91f9405cd4571f344d4b49aec7d7..72dcce7dd8671b1cb13c2e8be2541d94864386a7 100644 GIT binary patch literal 13876 zcmcgT3ve6PaR&-wAXSoOiGU%OvJu+4$cZK5OA#tb^8k<_Adn&<3P085+>Zdr!vQ7$ zf}rb;q9`*S$4Tpn9ePqHjgwSOnod$nwUaiHN2wA`I2*8Ct6rh&AhRE^1L90iaQfj{}50w!YigQPEi5aC|Sjs4ONQTO8#x7_`&S{sY?PE0=;UX!k;E z*1@bts9BHSgEdtnZ9OjBRNsB;muA=kuiAMRx6sG8fWNjSsi{Y^zi#~YC;TU!;o%n_ zFZm2aMf-ud9~{rNfQb;YFX%Wea4LmP|)=^y^0Y zj1e78{q@yFxUG!{2si8T2|u+6=gf5xI+7{@etipl?iM0|>RU|#8a(_5y`9u^(0&N* zd1(I#?FDH61nr-p{RrBRp*1fzYKm~P9x1@DF&dK)^!eYd%epZyCNrw-`GU^$Qyu-S zTj_t;L4;P((iFO3;U3g$FC4yoTnP@C<1H+{03NTB*yQ-ZC=t`MGa za4Z|TkpZ#+vve8FjqZbRvmygUZM?JMKI>muN1wU9E=Ff=R$`ONxpYitdPqmVb{qZ7 zZA26m1x-;JjDhsEycpqTJwD;5HpcTGtV^+;FY}UeDiAlmXrZ6C5E)dTYsvuGM#yL& z=GZzwQNzQJ8kk}G%fCN-<(9-S$YVBk*Ue(qBL| z=+1g9G?ixZ#f4dx7iUUoS;&^)NS9fOY`d@POwZ`(NAI9t?1A9emcFL&lg4oM(AsU; za`P$Q9nfCV=~LTI`TlGBDc>gJDWA@C%J&qsZ$R@vYu*87Jz~vz{2Xgey#^b5N{jXIGglA3BX$N0A{{^iquP+A$*^_+S#_D3tjKmX$S$?tu9 zW%yq{IS?87%2X;PoVrBSe@ASX%>q;(1jnqI`moYCqoY(|NhXB@xOubAlA(NlUr1OwLs}J!#^zl8gz;#MZbg!<=FK&8u z;0w9u2L682a|5oIE)BeW{}Y3bPj4A4o$VaF>%!%MpK&_}KlkY4gRRQoOtT)pL#FkS zwlWXf>&ko}HIq*#GptIJPub|tTS1dLnI@u|nlyy0cYW}lbk;*w=Yp6_XBEdGG3&g8 zKItGaOLcH-3h^7A@xfXSztm6Nby|Arc8WUvv0W5peSOC!!|?Wu(X1aE+ObLRPs8Ks zPs8Klv+%g}4M4xuB^$?fKXZ><{i@m2+6MjlzHVb@B}>&8(itV*{^!rdYtU=mOzt^;#Ub9@9e$Tn^+N81c8l<5&K3AmII%}hEcn(D0kYt2j z`>cRoi&LGxj>FY-&SL7K4W?%;T@+ZMoQw@ zF};wLv2^X(-?Ql4>)31GV{`6FikzEgoqhd#g!Dq8+;g|32foQeVp5xiFrl~rR~ZPj z+a3Gd`v8<<`FS=e=6m)Z>EW{rB2Hr$7P2MsQp#h;^LaseQSeDIi{O+CpOTnldBRZ^ z3;9JJVYzg^z(T4(@T@H4*#f>MMIlYlNir*6$`Wvz7h!@OF~xi`?-0ZszGriyEU*Q( z=Uy1_*-R#VRh=caDDx>$*>1Hv_gLYtf7<5WZ+Gsu_9c2O4*;PB5wsN2@@!VIC?-Jg z)2X+E84Y;j>aP zBZFZrM-=Lc9tfNvGtlUkBe<+GON5Y%k_7Z;)1oBIG{O|}Glg=FNL!TCd{!X#m5Z5N zxrWGcb0SZynh^^uF&{V%_zg@?%Sn{{@bu`O{vMFpQWuCwa(`8hp4yE&s@ot=BdSTN zSt-jF9AqIIrBWeOVxhXQBz3k8SLCg%ThLAH)3PS>8hAC<4GY|;aRaJ}B~h%;iLz>F ztRPlnTT6ypwQWE(u`Pm5T3qY!)tHhS-ByM*a1%4t;%VrpEca^cs<)@D9Mu>trdleh zEWR33ybf(;ScNt*L$mec>_+`+;MEu>nU%0b!G z%C7>{U}WplwxI%F&4|k7V=J#Zd`;PrTgFyyHLwP=>va$98^c!K?dLTK^Imr&saIS1 z*T8GcPPtZVwKt?Ji z{1Io&KC#G+MGBleo<5lNZ~-}-9Dam?P}s{fJ}ILe!Ou+uiet zLxDWf6QZ_~7;#(0n1|2}WZY>s6PXjd1YbzR!YQC(jUNj11;+B}MAS9M*+Je6Pv}J= zVSi{knVvo5aR)}NNE7f&HZJN??b2p=vQvBsm}l;9t!*=QE*Etcyx4P;!iAdfZX z@MJkhcoEm9jf};ZJ&_Ye-6huVwsLmRH4SSads47wDc+{Gr&JVt<7I-+xNKa^<5I5& z8+E-)8J8BhjF1z?BGom_u+d09G3u`QMgVNnG^Z4gy5{4tiCJMxDpl4Z!};AQ!56o- zldWQ7`-GM+_(2!{k~`!Dzsyv957sl5vVxCTV|Hm?q1)`A8gsPlCP#9dJXPi0>ZgtP zMn7#Mn>Y&k`rEE4>`UOmAB8;jiVz=i2}Q@Yven{K8ynT| z*Als7FG=m z&PF_g;4A_j({XDQa?6;)OEaIS)C^#QQ7h*NU~UWf*itSo2UEN}PjYlPd*MB}C@8Vm zo`?mP*{HjS=UiZJq!gNCisOfZkjtrShJ?h+6U6RVH;f;0+Cp=XZI4;8PJ()-lIucQ zG)HXKW zE-a7R!xFE=jYZxc!MYA=vY>!#2K8Kli%M3oZCaR{Dvp<(<) zGyRH>d4pEAQj1pWS0W447+~i`TxU3=^1S^p?h_~Rwbr`CRe{4fAz6iJ7|v#>Bf-ZQ z25M%;v&{LWdGJ@LH4ej=na0O`=x(D^K6p(s;D3GScA+G=pLYan@d%E+IQCi5|H|l( z!6cOPo-B?J;CK?9*hlvniyVlNTy(&El!qDbo$c*~-oq@Bsd0P~ zdB~6%!wza_*uO#{KIQA{ovh6F&ABV``V@Jq_Z7;A43mWyPnm=D0bg0hIu`)<)%f|F zzbEn#QVInW#gm0IOo~UHV1KT{FThBXC_kX5#(DCjowu*biQ&eEl_gDg2UYwjbQ@08q*E;LR)cF820A6VYE~UoY~4do=;#BB{AxgrNcU z4fNm^_y{)hZ?-pyH*ja8Lxe>_=j>ysIoV1Cr``05CALZBgj)y z&PRkXhsbk^@sM*9l=o>NbaEK#j3tZMPG#qp+88OEjf1KzM@#z l!{>5)$Sta+a!YDa?oZ)goWZ{%zrH`}gS#ne68^u~{{x65%SHeI literal 14891 zcmcgT3vgrAb&}CK-p%f^>{^aY4X6(LvLycKV|lIZ+W1Mf^}&+8T3esDrSAJ#AHVk` z^tD9kY-lo*A%vDm(q$-2LuqL~og`#Kk|rfh0?9NarA;Ouk|rq$?M})7lWw7#2AJtN z_dWfemW6oHaHaRox%d3iOA!fAqVvzkc`6XS42K&VG~|nA`W4{c~o6*=oLi4Avbm_nZ4iEGEQQ zOt^l?aHQqUU@#mw-Ur`^flRAGYikQX8A;;)bwo?i(5$ zI%gbu!ANaW0~aGYJO+{*%zXyZeNFuJusti^f~mEPZ;}Ny%2Q4xEV&L4i*zaEhb!#R&2iM>+$$on_;$VdxI_V zEg|-eJ=70&gJYURI!Nm3F<{=iZ0;jLX4-oNumu?pnunTO0gPfoxP^Sf!)|mRguijn zK%Lvyj2KfPmn{-`wuep|sk`=3=k}sWTUWof2-o%y0pS)Ce#56#gy*hoiqMyA5#U2t zQqS!t0<^5v6`;+R-<<3>JOSfdFuo1rpJ6-+<6mI>D~#{J_%4i2U!tZ6x0sLueA+h{ ziNnVaHf7yY5R-CCAUu@gH1`U z$hVs9^dR-rAtFJ`VqFOimX^cB>( zp{5x9xlM^ps;TC7dqnRCuSl_3k6+K+Khqp^`aQz788EMr_~teElnwQ@l{?D zn_+yfg*yK>B16k^T^Zm2SvC(4a~wEsFwDw~{N(rNUws=h5As;dBTchdOo&0F z%+KRCZN7nQd&iL1ps{b4!MNW*efoEryMCy$Q7Gj%Szg?zq!l4sfh?`C5|Q_l#-Yy{ zsV`nl{bm@_K$BtzeO=*K&5?R~K4RT})!Bu&!}y`mlsb5J;lB@^UD#_ryI>qTyYMiK zPr>lO=-dGo6JjkUT#h`OZ=j9NZg~3Ire3?Mt=a53BlU}Gs24{-jV6%}lDcZZG4qhI zuE^@IR-S+Hf`S{Gyl{FjzKJckC^XH%TKKR4GKQq4=IrVRMKYj+?*j^_L z)G?NUIxmn@2dZy^bE6~XJ=($4pW2W`Rw1tZao^B?^iluxyVUL05sMDB9z1ky(w5y` zy6V2UL)SexclMfx=8oU~jk!DA-8`Su5WL+6du9TU_q#%|E7 zhhMzH{6b%SbN|Nphh`vdG{V=RUGKha)p_hIZ~fByeV>_%%$tX87yj(Tg>?XZ?{16j zk%?i0&3n)HZaoEn#51p0zJP#7|Mo=_3xKC@{J!x^2#8j`{->J&7(Di|XCFnt*Z$nJ zTLQrTm#=-|D+tI)yZ_=A0Q~jsuYUe91gt)M_d6;8aK7*N{^lD9pg-FGz#jp?K+oRw zO#mDp-u2T%w*%l>ai#KY6s;qHOdF1vi4JW7!$gY;?o>_Ro-aB_%zHI^4kX2_EEe-s z5}{w*HS`4&^@M}^he-^G*T+jQWQJckn)zJaD+LrR9Mh2HxA72k2#R zPNVKVJhJ?EhvuPbP#3F(th$iYkcWo=KS1zE42rrANK5H-_-dGIZOp||=CB1o2O1!) ztmuHM$9xN2RHMh?s)}fifBo1DxjVsRF$;d*}|$HD3GW(nD@eC!(oP@ z!QqkNR}Nl=Zt}9VcGOO{R@6@SI6KgzP`_=DLG4)KPyN<~`_9zd)DG0lm%p&3cDlr* zE_=a7U6#s;+W8WO+KCc`dIOaJ^`@P6L*KRz4N?O`pS*6+U>IJuTmeU`oT1U%b zz5pfH=rQZ4eZn?j8|TC#J7FIkvySHSX_?JeH~6enlF5wvfs9xcT7ZQ>rGX%CqQYl| zw30-kv70M3Z1}bWP{XkwNK0@^K34a=Dm@Vh)`IWAY#6g)|n}*lvR@HVo4Ck zCALT&aFW0lOEN46g<1#+VlmA}#7!{|dWlPUq|{0=o0kMDFeLM=C#!{`C|m9BnJJtl zN>Vyk$fgCAY`YjZp&-iX4Vlek^Be4T8tpLcJUU}{9tB)lLGOqgaGOwsn?BW|4)@H2 z!*$f*aJr_Q*4wPN!9+fr&4a<&qPS5Y{8es+5?9xwAqOiVSB=#vRJATsLxqNM$V`sQ zi{&&}Nez-^)zCsZFYDyFjebg_y;toXdR1|pF-9^wEzqS;Fnk|YR6a}uktd7Q^_*=<;MYTP~p zyI{;Z?j}Fl8@^zqin%PGf@uz$eP+Uj)U3A|EV~O*S`hOkNwnP62O1Z|JU9-f!Lz88 z)A_7|_hX~SM$1zZjtS?u%Q2p`Z#t)D_!)j2bOnilcw;MS(-0<<@|x+7&baL$*JzIA zw^$5XfutFsZ zfHW@x!ZBto6_N#~Am+xb*_^1rMqtUt%*rylR8MY{PGU<6+GEsVb4-ofoa45mYj)Q$ z`}DCXCqs-41$GX&vjxlj2f-R}(Iaka2}R8U*fXg~fMl5sGDFP}Ey1IdBnkL!rbS8E z&;nM2spLssLAOH;5|_hlr6lL74Mdj9K*XZd&{MFeAvy^B)67-tYfIxZqv%vsc}#mm z%3B+or12LKB-AF(Xd9|Ss!b`&7M%`U)Gk#EqD3rJ4L47NZQGwPx>^_)nncO zUyOC5+39B6hU#Eh0^-vo1-2{PgkOv)`HMg|!v?s6S&J3hxwRlJ#t!fMySde&9n4UJ z8JykDssX+jqofWS6}E#+n%D6PjhywB}s+K$s>thZM#(XiUxYd6;x#16Am?^e4xHsKu%8yT%V zbQ<7`8I~ODT}Nac+QA48W2mvUvbyciu4AUBPAP?(QTB^5g~A8Wc~VTZpUth94(437 z2<1jsI@c{1$hk~vgS^NVv!uiumC{@u+9WJKa#d1{A+<&^2YnS3kI+%UfnHCzt;h-2 zgcC|7gJmxriLAtqsU3~wE0g3ilZ??;cwaFh>L4V2+a`lDOo~aE@&|7jxADnQSsv z4cge4T?$CH0?eu7HrbeKi;1Tsz8aVgEESSsl}>Z=Qjs|c?>?8E^Hp%Z%*LVxX31Sj zAPu0$DbWJwtt11ACk6acyxnaR;~s(wDA5ud3zr$+dV1Ll^pt3k_e-0CU#bwg-e5T) z=cI)5go=+vZHZVUCHNN8D{0U{j+PP*_ZG;eAg_QFU5}(~Qvt=6+&t+KkS5>@Ld;%L z#tramtBWC{6GsuuDaZw)jJqO0QE!N%Th~PKiK#s6GOFN}MmU z*@z8quxk$QTPiWWrCQt>aq-}L^o0;Z-U*wkBg9kOPRp!6ymT7F}@}P*R&< z{|w>x=J~Y7gD-pek#~B^g14vd>=5U+6&2s@J0*&~)#CV)?4xMBx@{v1s*HMeY2(w& zPf5M~3NmjOHcwU{cLW>xA)tgM#$kW07^*Qbp~!&#afh_k(`Tw|z2mzbUu0naK^)o= zjU2Y^UiM;MY3_f%ZVR2<(7^S!({{MmUQ=bM<1bAXA5%!ggr0eTZ^ge!G9{c+hST!5 z&fd?6v2X!$d9Qr5(=qLgJHvKfW>N`9bW?ChkZ0UZ%%hNx3cdE7V0YJqcu-En!cyER z;JH`^80-s(gWmhGBd2%o1xDU3C*qBKdr3Zqd<^GXi0mI7d%9Y3nqGdtEHbf6ZrFQE zrC#=H!|TPJc0PtP+Bx2R&TDel^ZzB~uPO&;8yAymc!u|qjhF4UxWk^~@hp9leF=7{ zv5>bQzv$LR0XenFJ0yjREfRn0or@hY)SdS`YmfFCsYlKYLe7=SoSK`u^S{!D*Nd~A z-nVMa*`vM@d=l1~y>bZFQBY4c=IGc6YsM(y5za(7P6-QYyj)V8DNGFFKOU)yJ^d(1 z9Gqc-Kjbpne`G>s>w~# zl_a2v@Mtj*qUrF$yP%)ov11|q1w92nq@SoU@t}i=2h`de>OM|kav)2Ii$zPY_of3% z!nT|V0BxH+l(s9OSfUnQNM%-hp(5jxs)B!u>;s`yp2iyD|1HSV;7e83a4qQqoits- zc{m$Me-d+q%e=f^2>4W6z?q&BoFRA*R~e{37)P`WuxuEfIXPwe{)H zT4Fl95M~2uk2~PEfiJeuA2skh^kYHjyVl_yx;|+Bz!C6^O`~TWdqLu##q|ZvKj51n z9^h%jUjD^Z_EC_oCp0)kO%Th5B55hSUE_429QH_aU=N_o{g`{W1UJz{xezx(- zR@477_I7XvW6$Uf#!o_YV^7wctMz*E5U$|@lBbL{ZoT*96#|D|H2{7IXPKCt^h=O8 zZGzuZTS?E9d4I6Xrd{A~zFeQfSKSWK&A#mPR5?fP7T)15Ei2&+>x)7zc9C~=e>D4# z2*}SkgDL2tatT}bkr8U5MH0HfZ*` zTbLtAp31DtQ5k$8adKFSC_&PXlK!{ePlK&EXRyF{J$}eN;O9s#QT42e@vx1F*^;5P zEs%rk2k5o7bG$zccEeaB`%p(_G8bxSdk)UR6f3){P)AI$#O|~sRO6>YYau7f0QM(w z!bIJ0CW9^z*8n5^bttnn1^d??t_7;0+PWiDB{7Nn58}aHMPFksh8oPpJE1Scoa8S+ z?;o}&l%}nr$1bpOh+(MZ6R`@hTOyvy>E11PgRM4bwY|D77KHp4b+9q;Lp`>)w(Dox z77KPxI3tZ52z5+Ns{`yD)(v=X0QVr^dkM}MxHrK5q~&g-gFon(6a3LF?L0~)~qBD7|aQ1KY7V6KyMFw)28M5 z)rd{45%K;5WWoKlur7r#F7WOH`WV<}v53N`K2Gz#ZMc6~((WIEY0jA>dv1Acaf;*t zFI`+A=R*bJD_L32Kwe$gf?xF>g^7F02?wsBtD%KOhy$ANQF9UvIfkZjU(+5)LA1w) z`6ocbd9jl8(zJI9(^4AK(^3WA=`6la;x$&N0%^)i&XYL4d+~i?2I*DtD@gKL79Use zi(Hj0Y)!6CPQr@^FUR16leF^)pOBF%kCv1074g7;9BS*6o9cRRilFe2Yh4fXXqwa% zWIFBD<}c8T3vM|70K4cVli;!rU+_!)kn^$%{VK)-O)oFwd?F9G>YSX!JGv7gLe)nW zs0{GYgE+X8(rQ9FR<6(YChGFi+hh?>=95SA1$TZl-B#cg$4c@h&{3}FXcvz$Zt4G`PB|yp(~Q z%it58Lw>|Qo+Lb} z{M7jBj1RbvrfIk<#@-a3JS&a)ff_s#|ETsMlC|y!Qm?e$0pk5O?1vNhtL>Zxx Date: Tue, 9 Apr 2024 00:14:51 +0000 Subject: [PATCH 2/4] cleanup --- packages/arrow/ArrowSamplerImpl.cpp | 151 +++++++++++++++------------- packages/arrow/ArrowSamplerImpl.h | 8 +- packages/arrow/ParquetSampler.cpp | 40 +++++--- packages/arrow/ParquetSampler.h | 8 +- 4 files changed, 115 insertions(+), 92 deletions(-) diff --git a/packages/arrow/ArrowSamplerImpl.cpp b/packages/arrow/ArrowSamplerImpl.cpp index 834f8a7b7..d233750a8 100644 --- a/packages/arrow/ArrowSamplerImpl.cpp +++ b/packages/arrow/ArrowSamplerImpl.cpp @@ -47,6 +47,7 @@ #include #include #include +#include /****************************************************************************** @@ -78,11 +79,12 @@ void ArrowSamplerImpl::openInputFile(const char* file_path) } /*---------------------------------------------------------------------------- -* getMetadata +* getInputFileMetadata *----------------------------------------------------------------------------*/ -void ArrowSamplerImpl::getMetadata(ParquetSampler::record_info_t& recInfo) +void ArrowSamplerImpl::getInputFileMetadata(ParquetSampler::record_info_t& recInfo) { bool foundRecInfo = false; + std::shared_ptr file_metadata = reader->parquet_reader()->metadata(); for(int i = 0; i < file_metadata->key_value_metadata()->size(); i++) @@ -122,17 +124,19 @@ void ArrowSamplerImpl::getMetadata(ParquetSampler::record_info_t& recInfo) /*---------------------------------------------------------------------------- -* getpoints +* getInputFilePoints *----------------------------------------------------------------------------*/ -void ArrowSamplerImpl::getPoints(ParquetSampler::record_info_t& recInfo, std::vector& points) +void ArrowSamplerImpl::getInputFilePoints(std::vector& points) { + const ParquetSampler::record_info_t& recInfo = parquetSampler->getRecInfo(); + if(recInfo.asGeo) { getGeoPoints(points); } else { - getXYPoints(recInfo, points); + getXYPoints(points); } std::vector columnNames = {recInfo.timeKey}; @@ -152,68 +156,6 @@ void ArrowSamplerImpl::getPoints(ParquetSampler::record_info_t& recInfo, std::ve else mlog(DEBUG, "Time column not found."); } -/*---------------------------------------------------------------------------- -* getXYPoints -*----------------------------------------------------------------------------*/ -void ArrowSamplerImpl::getXYPoints(ParquetSampler::record_info_t& recInfo, std::vector& points) -{ - std::vector columnNames = {recInfo.xKey, recInfo.yKey}; - - std::shared_ptr table = inputFileToTable(columnNames); - int x_column_index = table->schema()->GetFieldIndex(recInfo.xKey); - if(x_column_index == -1) - { - throw RunTimeException(ERROR, RTE_ERROR, "X column not found."); - } - - int y_column_index = table->schema()->GetFieldIndex(recInfo.yKey); - if(y_column_index == -1) - { - throw RunTimeException(ERROR, RTE_ERROR, "Y column not found."); - } - - auto x_column = std::static_pointer_cast(table->column(x_column_index)->chunk(0)); - auto y_column = std::static_pointer_cast(table->column(y_column_index)->chunk(0)); - - /* x and y columns are the same longth */ - for(int64_t i = 0; i < x_column->length(); i++) - { - points[i]->x = x_column->Value(i); - points[i]->y = y_column->Value(i); - } -} - -/*---------------------------------------------------------------------------- -* getGeoPoints -*----------------------------------------------------------------------------*/ -void ArrowSamplerImpl::getGeoPoints(std::vector& points) -{ - const char* geocol = "geometry"; - std::vector columnNames = {geocol}; - - std::shared_ptr table = inputFileToTable(columnNames); - int geometry_column_index = table->schema()->GetFieldIndex(geocol); - if(geometry_column_index == -1) - { - throw RunTimeException(ERROR, RTE_ERROR, "Geometry column not found."); - } - - auto geometry_column = std::static_pointer_cast(table->column(geometry_column_index)->chunk(0)); - mlog(DEBUG, "Geometry column elements: %ld", geometry_column->length()); - - /* The geometry column is binary type */ - auto binary_array = std::static_pointer_cast(geometry_column); - - /* Iterate over each item in the geometry column and extract points */ - for(int64_t i = 0; i < binary_array->length(); i++) - { - std::string wkb_data = binary_array->GetString(i); /* Get WKB data as string (binary data) */ - wkbpoint_t point = convertWKBToPoint(wkb_data); - ParquetSampler::point_info_t* pinfo = new ParquetSampler::point_info_t({point.x, point.y, 0.0}); - points.push_back(pinfo); - } -} - /*---------------------------------------------------------------------------- * processSamples *----------------------------------------------------------------------------*/ @@ -406,6 +348,15 @@ void ArrowSamplerImpl::createOutpuFile(void) { const ArrowParms* parms = parquetSampler->getParms(); + if(std::filesystem::exists(parms->path)) + { + int rc = std::remove(parms->path); + if(rc != 0) + { + mlog(CRITICAL, "Failed (%d) to delete file %s: %s", rc, parms->path, strerror(errno)); + } + } + auto table = inputFileToTable(); auto updated_table = appendSamplesColumns(table); @@ -733,4 +684,68 @@ void ArrowSamplerImpl::tableMetadataToJson(const std::shared_ptr t fclose(jsonFile); } else throw RunTimeException(CRITICAL, RTE_ERROR, "Failed to open metadata file: %s", file_path); -} \ No newline at end of file +} + +/*---------------------------------------------------------------------------- +* getXYPoints +*----------------------------------------------------------------------------*/ +void ArrowSamplerImpl::getXYPoints(std::vector& points) +{ + const ParquetSampler::record_info_t& recInfo = parquetSampler->getRecInfo(); + std::vector columnNames = {recInfo.xKey, recInfo.yKey}; + + std::shared_ptr table = inputFileToTable(columnNames); + int x_column_index = table->schema()->GetFieldIndex(recInfo.xKey); + if(x_column_index == -1) + { + throw RunTimeException(ERROR, RTE_ERROR, "X column not found."); + } + + int y_column_index = table->schema()->GetFieldIndex(recInfo.yKey); + if(y_column_index == -1) + { + throw RunTimeException(ERROR, RTE_ERROR, "Y column not found."); + } + + auto x_column = std::static_pointer_cast(table->column(x_column_index)->chunk(0)); + auto y_column = std::static_pointer_cast(table->column(y_column_index)->chunk(0)); + + /* x and y columns are the same longth */ + for(int64_t i = 0; i < x_column->length(); i++) + { + points[i]->x = x_column->Value(i); + points[i]->y = y_column->Value(i); + } +} + +/*---------------------------------------------------------------------------- +* getGeoPoints +*----------------------------------------------------------------------------*/ +void ArrowSamplerImpl::getGeoPoints(std::vector& points) +{ + const char* geocol = "geometry"; + std::vector columnNames = {geocol}; + + std::shared_ptr table = inputFileToTable(columnNames); + int geometry_column_index = table->schema()->GetFieldIndex(geocol); + if(geometry_column_index == -1) + { + throw RunTimeException(ERROR, RTE_ERROR, "Geometry column not found."); + } + + auto geometry_column = std::static_pointer_cast(table->column(geometry_column_index)->chunk(0)); + mlog(DEBUG, "Geometry column elements: %ld", geometry_column->length()); + + /* The geometry column is binary type */ + auto binary_array = std::static_pointer_cast(geometry_column); + + /* Iterate over each item in the geometry column and extract points */ + for(int64_t i = 0; i < binary_array->length(); i++) + { + std::string wkb_data = binary_array->GetString(i); /* Get WKB data as string (binary data) */ + wkbpoint_t point = convertWKBToPoint(wkb_data); + ParquetSampler::point_info_t* pinfo = new ParquetSampler::point_info_t({point.x, point.y, 0.0}); + points.push_back(pinfo); + } +} + diff --git a/packages/arrow/ArrowSamplerImpl.h b/packages/arrow/ArrowSamplerImpl.h index fe347928c..054d10170 100644 --- a/packages/arrow/ArrowSamplerImpl.h +++ b/packages/arrow/ArrowSamplerImpl.h @@ -61,10 +61,8 @@ class ArrowSamplerImpl ~ArrowSamplerImpl (void); void openInputFile (const char* file_path); - void getMetadata (ParquetSampler::record_info_t& recInfo); - void getPoints (ParquetSampler::record_info_t& recInfo, std::vector& points); - void getXYPoints (ParquetSampler::record_info_t& recInfo, std::vector& points); - void getGeoPoints (std::vector& points); + void getInputFileMetadata (ParquetSampler::record_info_t& recInfo); + void getInputFilePoints (std::vector& points); bool processSamples (ParquetSampler::sampler_t* sampler); void createOutpuFile (void); @@ -104,6 +102,8 @@ class ArrowSamplerImpl std::string createFileMap (void); std::string createMetadataFileName (const char* file_path); void tableMetadataToJson (const std::shared_ptr table, const char* file_path); + void getXYPoints (std::vector& points); + void getGeoPoints (std::vector& points); }; #endif /* __arrow_sampler_impl__ */ diff --git a/packages/arrow/ParquetSampler.cpp b/packages/arrow/ParquetSampler.cpp index 1cdc44d4f..3fe8e0190 100644 --- a/packages/arrow/ParquetSampler.cpp +++ b/packages/arrow/ParquetSampler.cpp @@ -36,7 +36,6 @@ #include "core.h" #include "ParquetSampler.h" #include "ArrowSamplerImpl.h" -#include /****************************************************************************** @@ -152,6 +151,26 @@ ParquetSampler::Sampler::~Sampler(void) } +/*---------------------------------------------------------------------------- + * RecordInfo Constructor + *----------------------------------------------------------------------------*/ +ParquetSampler::RecordInfo::RecordInfo(void) : + timeKey(NULL), xKey(NULL), yKey(NULL), asGeo(false) +{ +} + + +/*---------------------------------------------------------------------------- + * RecordInfo Destructor + *----------------------------------------------------------------------------*/ +ParquetSampler::RecordInfo::~RecordInfo(void) +{ + delete[] timeKey; + delete[] xKey; + delete[] yKey; +} + + /*---------------------------------------------------------------------------- * clearSamples *----------------------------------------------------------------------------*/ @@ -176,25 +195,14 @@ void ParquetSampler::sample(void) if(alreadySampled) return; alreadySampled = true; - const char* outputPath = parms->path; - - if(std::filesystem::exists(outputPath)) - { - int rc = std::remove(outputPath); - if(rc != 0) - { - mlog(CRITICAL, "Failed (%d) to delete file %s: %s", rc, outputPath, strerror(errno)); - } - } - - /* Start Sampler Threads */ + /* Start sampling threads */ for(sampler_t* sampler : samplers) { Thread* pid = new Thread(samplerThread, sampler); samplerPids.push_back(pid); } - /* Wait for all sampler threads to finish */ + /* Wait for all sampling threads to finish */ for(Thread* pid : samplerPids) { delete pid; @@ -257,8 +265,8 @@ ParquetSampler::ParquetSampler(lua_State* L, ArrowParms* _parms, const char* inp impl = new ArrowSamplerImpl(this); impl->openInputFile(input_file); - impl->getMetadata(recInfo); - impl->getPoints(recInfo, points); + impl->getInputFileMetadata(recInfo); + impl->getInputFilePoints(points); } catch(const RunTimeException& e) { diff --git a/packages/arrow/ParquetSampler.h b/packages/arrow/ParquetSampler.h index ca966627d..860270b35 100644 --- a/packages/arrow/ParquetSampler.h +++ b/packages/arrow/ParquetSampler.h @@ -82,7 +82,7 @@ class ParquetSampler: public LuaObject RasterObject* robj; } raster_info_t; - typedef struct PointInfo + typedef struct { double x; double y; @@ -111,8 +111,8 @@ class ParquetSampler: public LuaObject const char* yKey; bool asGeo; - explicit RecordInfo (void): timeKey(NULL), xKey(NULL), yKey(NULL), asGeo(false) {} - ~RecordInfo (void) {delete [] timeKey; delete [] xKey; delete [] yKey;} + explicit RecordInfo (void); + ~RecordInfo (void); } record_info_t; /*-------------------------------------------------------------------- @@ -127,7 +127,7 @@ class ParquetSampler: public LuaObject const ArrowParms* getParms (void) {return parms;} const std::vector& getSamplers (void) {return samplers;} - + const record_info_t& getRecInfo (void) {return recInfo;} private: /*-------------------------------------------------------------------- From 9e6b48dc83d088a2e6e75c3640d3ea460e82dd16 Mon Sep 17 00:00:00 2001 From: Eric Lidwa Date: Tue, 9 Apr 2024 20:43:48 +0000 Subject: [PATCH 3/4] writing arrow csv implemented with one sample for lists --- packages/arrow/ArrowSamplerImpl.cpp | 709 +++++++++++++++++--------- packages/arrow/ArrowSamplerImpl.h | 41 +- packages/arrow/ParquetSampler.cpp | 25 +- packages/arrow/ParquetSampler.h | 15 +- packages/geo/GdalRaster.cpp | 2 +- scripts/selftests/parquet_sampler.lua | 95 +++- 6 files changed, 565 insertions(+), 322 deletions(-) diff --git a/packages/arrow/ArrowSamplerImpl.cpp b/packages/arrow/ArrowSamplerImpl.cpp index d233750a8..08c64bedc 100644 --- a/packages/arrow/ArrowSamplerImpl.cpp +++ b/packages/arrow/ArrowSamplerImpl.cpp @@ -58,7 +58,13 @@ * Constructors *----------------------------------------------------------------------------*/ ArrowSamplerImpl::ArrowSamplerImpl (ParquetSampler* _sampler): - parquetSampler(_sampler) + parquetSampler(_sampler), + inputFile(NULL), + reader(nullptr), + timeKey(NULL), + xKey(NULL), + yKey(NULL), + asGeo(false) { } @@ -67,23 +73,100 @@ ArrowSamplerImpl::ArrowSamplerImpl (ParquetSampler* _sampler): *----------------------------------------------------------------------------*/ ArrowSamplerImpl::~ArrowSamplerImpl (void) { + delete[] timeKey; + delete[] xKey; + delete[] yKey; } /*---------------------------------------------------------------------------- -* openInputFile +* processInputFile *----------------------------------------------------------------------------*/ -void ArrowSamplerImpl::openInputFile(const char* file_path) +void ArrowSamplerImpl::processInputFile(const char* file_path, std::vector& points) { + /* Open the input file */ PARQUET_ASSIGN_OR_THROW(inputFile, arrow::io::ReadableFile::Open(file_path, arrow::default_memory_pool())); PARQUET_THROW_NOT_OK(parquet::arrow::OpenFile(inputFile, arrow::default_memory_pool(), &reader)); + + getMetadata(); + getPoints(points); +} + +/*---------------------------------------------------------------------------- +* processSamples +*----------------------------------------------------------------------------*/ +bool ArrowSamplerImpl::processSamples(ParquetSampler::sampler_t* sampler) +{ + const ArrowParms* parms = parquetSampler->getParms(); + bool status = false; + + try + { + if(parms->format == ArrowParms::PARQUET) + status = makeColumnsWithLists(sampler); + + /* Arrow csv writer cannot handle columns with lists of samples */ + if(parms->format == ArrowParms::CSV) + status = makeColumnsWithOneSample(sampler); + } + catch(const RunTimeException& e) + { + /* No columns will be added */ + newFields.clear(); + newColumns.clear(); + mlog(e.level(), "Error processing samples: %s", e.what()); + } + + return status; +} + +/*---------------------------------------------------------------------------- +* createOutputFile +*----------------------------------------------------------------------------*/ +void ArrowSamplerImpl::createOutpuFile(void) +{ + const ArrowParms* parms = parquetSampler->getParms(); + + if(std::filesystem::exists(parms->path)) + { + int rc = std::remove(parms->path); + if(rc != 0) + { + mlog(CRITICAL, "Failed (%d) to delete file %s: %s", rc, parms->path, strerror(errno)); + } + } + + auto table = inputFileToTable(); + auto updated_table = addNewColumns(table); + table = nullptr; + + if(parms->format == ArrowParms::PARQUET) + { + tableToParquetFile(updated_table, parms->path); + } + else if(parms->format == ArrowParms::CSV) + { + /* Arrow csv writer cannot handle columns with WKB data */ + table = removeGeometryColumn(updated_table); + + tableToCsvFile(table, parms->path); + + /* Generate metadata file since Arrow csv writer ignores it */ + std::string mfile = createMetadataFileName(parms->path); + tableMetadataToJson(table, mfile.c_str()); + } + else throw RunTimeException(CRITICAL, RTE_ERROR, "Unsupported file format"); } +/****************************************************************************** + * PRIVATE METHODS + ******************************************************************************/ + /*---------------------------------------------------------------------------- -* getInputFileMetadata +* getMetadata *----------------------------------------------------------------------------*/ -void ArrowSamplerImpl::getInputFileMetadata(ParquetSampler::record_info_t& recInfo) +void ArrowSamplerImpl::getMetadata(void) { - bool foundRecInfo = false; + bool foundIt = false; std::shared_ptr file_metadata = reader->parquet_reader()->metadata(); @@ -105,43 +188,49 @@ void ArrowSamplerImpl::getInputFileMetadata(ParquetSampler::record_info_t& recIn { const rapidjson::Value& recordinfo = doc["recordinfo"]; - recInfo.timeKey = StringLib::duplicate(recordinfo["time"].GetString()); - recInfo.asGeo = recordinfo["as_geo"].GetBool(); - recInfo.xKey = StringLib::duplicate(recordinfo["x"].GetString()); - recInfo.yKey = StringLib::duplicate(recordinfo["y"].GetString()); - foundRecInfo = true; + const char* _timeKey = recordinfo["time"].GetString(); + const char* _xKey = recordinfo["x"].GetString(); + const char* _yKey = recordinfo["y"].GetString(); + const bool _asGeo = recordinfo["as_geo"].GetBool(); + + /* Make sure the keys are not empty */ + if(_timeKey[0] == '\0' || _xKey[0] == '\0' || _yKey[0] == '\0') + { + throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid recordinfo in sliderule metadata."); + } + + timeKey = StringLib::duplicate(_timeKey); + xKey = StringLib::duplicate(_xKey); + yKey = StringLib::duplicate(_yKey); + asGeo = _asGeo; + foundIt = true; break; } else throw RunTimeException(CRITICAL, RTE_ERROR, "No 'recordinfo' key found in 'sliderule' metadata."); } } - if(!foundRecInfo) + if(!foundIt) { - throw RunTimeException(CRITICAL, RTE_ERROR, "No 'sliderule/recordinfo' metadata found in parquet file."); + throw RunTimeException(CRITICAL, RTE_ERROR, "No 'sliderule' metadata found in parquet file."); } } /*---------------------------------------------------------------------------- -* getInputFilePoints +* getPoints *----------------------------------------------------------------------------*/ -void ArrowSamplerImpl::getInputFilePoints(std::vector& points) +void ArrowSamplerImpl::getPoints(std::vector& points) { - const ParquetSampler::record_info_t& recInfo = parquetSampler->getRecInfo(); - - if(recInfo.asGeo) - { + if(asGeo) getGeoPoints(points); - } else - { getXYPoints(points); - } - std::vector columnNames = {recInfo.timeKey}; + /* Get point's gps from time column */ + std::vector columnNames = {timeKey}; std::shared_ptr table = inputFileToTable(columnNames); - int time_column_index = table->schema()->GetFieldIndex(recInfo.timeKey); + int time_column_index = table->schema()->GetFieldIndex(timeKey); if(time_column_index > -1) { auto time_column = std::static_pointer_cast(table->column(time_column_index)->chunk(0)); @@ -149,17 +238,149 @@ void ArrowSamplerImpl::getInputFilePoints(std::vectorlength(); i++) - { points[i]->gps = time_column->Value(i); - } } else mlog(DEBUG, "Time column not found."); } /*---------------------------------------------------------------------------- -* processSamples +* getXYPoints *----------------------------------------------------------------------------*/ -bool ArrowSamplerImpl::processSamples(ParquetSampler::sampler_t* sampler) +void ArrowSamplerImpl::getXYPoints(std::vector& points) +{ + std::vector columnNames = {xKey, yKey}; + + std::shared_ptr table = inputFileToTable(columnNames); + int x_column_index = table->schema()->GetFieldIndex(xKey); + if(x_column_index == -1) throw RunTimeException(ERROR, RTE_ERROR, "X column not found."); + + int y_column_index = table->schema()->GetFieldIndex(yKey); + if(y_column_index == -1) throw RunTimeException(ERROR, RTE_ERROR, "Y column not found."); + + auto x_column = std::static_pointer_cast(table->column(x_column_index)->chunk(0)); + auto y_column = std::static_pointer_cast(table->column(y_column_index)->chunk(0)); + + /* x and y columns are the same longth */ + for(int64_t i = 0; i < x_column->length(); i++) + { + double x = x_column->Value(i); + double y = y_column->Value(i); + + ParquetSampler::point_info_t* pinfo = new ParquetSampler::point_info_t({x, y, 0.0}); + points.push_back(pinfo); + } +} + +/*---------------------------------------------------------------------------- +* getGeoPoints +*----------------------------------------------------------------------------*/ +void ArrowSamplerImpl::getGeoPoints(std::vector& points) +{ + const char* geocol = "geometry"; + std::vector columnNames = {geocol}; + + std::shared_ptr table = inputFileToTable(columnNames); + int geometry_column_index = table->schema()->GetFieldIndex(geocol); + if(geometry_column_index == -1) throw RunTimeException(ERROR, RTE_ERROR, "Geometry column not found."); + + auto geometry_column = std::static_pointer_cast(table->column(geometry_column_index)->chunk(0)); + mlog(DEBUG, "Geometry column elements: %ld", geometry_column->length()); + + /* The geometry column is binary type */ + auto binary_array = std::static_pointer_cast(geometry_column); + + /* Iterate over each item in the geometry column and extract points */ + for(int64_t i = 0; i < binary_array->length(); i++) + { + std::string wkb_data = binary_array->GetString(i); /* Get WKB data as string (binary data) */ + wkbpoint_t point = convertWKBToPoint(wkb_data); + ParquetSampler::point_info_t* pinfo = new ParquetSampler::point_info_t({point.x, point.y, 0.0}); + points.push_back(pinfo); + } +} + +/*---------------------------------------------------------------------------- +* inputFileToTable +*----------------------------------------------------------------------------*/ +std::shared_ptr ArrowSamplerImpl::inputFileToTable(const std::vector& columnNames) +{ + /* If columnNames is empty, read all columns */ + if(columnNames.size() == 0) + { + std::shared_ptr table; + PARQUET_THROW_NOT_OK(reader->ReadTable(&table)); + return table; + } + + /* Read only the specified columns */ + std::shared_ptr schema; + PARQUET_THROW_NOT_OK(reader->GetSchema(&schema)); + std::vector columnIndices; + for(const auto& columnName : columnNames) + { + auto index = schema->GetFieldIndex(columnName); + if(index != -1) + { + columnIndices.push_back(index); + } + else mlog(DEBUG, "Column %s not found in parquet file.", columnName); + } + + std::shared_ptr table; + PARQUET_THROW_NOT_OK(reader->ReadTable(columnIndices, &table)); + return table; +} + +/*---------------------------------------------------------------------------- +* addNewColumns +*----------------------------------------------------------------------------*/ +std::shared_ptr ArrowSamplerImpl::addNewColumns(const std::shared_ptr table) +{ + std::vector> fields = table->schema()->fields(); + std::vector> columns = table->columns(); + + /* Append new columns to the table */ + mutex.lock(); + { + fields.insert(fields.end(), newFields.begin(), newFields.end()); + columns.insert(columns.end(), newColumns.begin(), newColumns.end()); + } + mutex.unlock(); + + auto metadata = table->schema()->metadata()->Copy(); + + /* + * Pandas metadata does not contain information about the new columns. + * Pandas and geopands can use/read the file just fine without pandas custom metadata. + * Removing it is a lot easier than updating it. + */ + const std::string pandas_key = "pandas"; + if(metadata->Contains(pandas_key)) + { + int key_index = metadata->FindKey(pandas_key); + if(key_index != -1) + { + PARQUET_THROW_NOT_OK(metadata->Delete(key_index)); + } + } + + /* Create a filemap metadata */ + PARQUET_THROW_NOT_OK(metadata->Set("filemap", createFileMap())); + + /* Attach metadata to the new schema */ + auto combined_schema = std::make_shared(fields); + combined_schema = combined_schema->WithMetadata(metadata); + + /* Create a new table with the combined schema and columns */ + auto updated_table = arrow::Table::Make(combined_schema, columns); + + return updated_table; +} + +/*---------------------------------------------------------------------------- +* makeColumnsWithLists +*----------------------------------------------------------------------------*/ +bool ArrowSamplerImpl::makeColumnsWithLists(ParquetSampler::sampler_t* sampler) { auto pool = arrow::default_memory_pool(); RasterObject* robj = sampler->robj; @@ -202,81 +423,75 @@ bool ArrowSamplerImpl::processSamples(ParquetSampler::sampler_t* sampler) std::shared_ptr value_list_array, time_list_array, fileid_list_array, flags_list_array; std::shared_ptr count_list_array, min_list_array, max_list_array, mean_list_array, median_list_array, stdev_list_array, mad_list_array; - try + /* Iterate over each sample in a vector of lists of samples */ + for(ParquetSampler::sample_list_t* slist : sampler->samples) { - /* Iterate over each sample in a vector of lists of samples */ - for(ParquetSampler::sample_list_t* slist : sampler->samples) + /* Start new lists */ + PARQUET_THROW_NOT_OK(value_list_builder.Append()); + PARQUET_THROW_NOT_OK(time_list_builder.Append()); + if(robj->hasFlags()) + { + PARQUET_THROW_NOT_OK(flags_list_builder.Append()); + } + PARQUET_THROW_NOT_OK(fileid_list_builder.Append()); + if(robj->hasZonalStats()) + { + PARQUET_THROW_NOT_OK(count_list_builder.Append()); + PARQUET_THROW_NOT_OK(min_list_builder.Append()); + PARQUET_THROW_NOT_OK(max_list_builder.Append()); + PARQUET_THROW_NOT_OK(mean_list_builder.Append()); + PARQUET_THROW_NOT_OK(median_list_builder.Append()); + PARQUET_THROW_NOT_OK(stdev_list_builder.Append()); + PARQUET_THROW_NOT_OK(mad_list_builder.Append()); + } + + /* Iterate over each sample and add it to the list + * If slist is empty the column will contain an empty list + * to keep the number of rows consistent with the other columns + */ + for(RasterSample* sample : *slist) { - /* Start a new lists for each RasterSample */ - PARQUET_THROW_NOT_OK(value_list_builder.Append()); - PARQUET_THROW_NOT_OK(time_list_builder.Append()); + /* Append the value to the value list */ + PARQUET_THROW_NOT_OK(value_builder->Append(sample->value)); + PARQUET_THROW_NOT_OK(time_builder->Append(sample->time)); if(robj->hasFlags()) { - PARQUET_THROW_NOT_OK(flags_list_builder.Append()); + PARQUET_THROW_NOT_OK(flags_builder->Append(sample->flags)); } - PARQUET_THROW_NOT_OK(fileid_list_builder.Append()); + PARQUET_THROW_NOT_OK(fileid_builder->Append(sample->fileId)); if(robj->hasZonalStats()) { - PARQUET_THROW_NOT_OK(count_list_builder.Append()); - PARQUET_THROW_NOT_OK(min_list_builder.Append()); - PARQUET_THROW_NOT_OK(max_list_builder.Append()); - PARQUET_THROW_NOT_OK(mean_list_builder.Append()); - PARQUET_THROW_NOT_OK(median_list_builder.Append()); - PARQUET_THROW_NOT_OK(stdev_list_builder.Append()); - PARQUET_THROW_NOT_OK(mad_list_builder.Append()); + PARQUET_THROW_NOT_OK(count_builder->Append(sample->stats.count)); + PARQUET_THROW_NOT_OK(min_builder->Append(sample->stats.min)); + PARQUET_THROW_NOT_OK(max_builder->Append(sample->stats.max)); + PARQUET_THROW_NOT_OK(mean_builder->Append(sample->stats.mean)); + PARQUET_THROW_NOT_OK(median_builder->Append(sample->stats.median)); + PARQUET_THROW_NOT_OK(stdev_builder->Append(sample->stats.stdev)); + PARQUET_THROW_NOT_OK(mad_builder->Append(sample->stats.mad)); } - /* Iterate over each sample */ - for(RasterSample* sample : *slist) - { - /* Append the value to the value list */ - PARQUET_THROW_NOT_OK(value_builder->Append(sample->value)); - PARQUET_THROW_NOT_OK(time_builder->Append(sample->time)); - if(robj->hasFlags()) - { - PARQUET_THROW_NOT_OK(flags_builder->Append(sample->flags)); - } - PARQUET_THROW_NOT_OK(fileid_builder->Append(sample->fileId)); - if(robj->hasZonalStats()) - { - PARQUET_THROW_NOT_OK(count_builder->Append(sample->stats.count)); - PARQUET_THROW_NOT_OK(min_builder->Append(sample->stats.min)); - PARQUET_THROW_NOT_OK(max_builder->Append(sample->stats.max)); - PARQUET_THROW_NOT_OK(mean_builder->Append(sample->stats.mean)); - PARQUET_THROW_NOT_OK(median_builder->Append(sample->stats.median)); - PARQUET_THROW_NOT_OK(stdev_builder->Append(sample->stats.stdev)); - PARQUET_THROW_NOT_OK(mad_builder->Append(sample->stats.mad)); - } - - /* Collect all fileIds used by samples - duplicates are ignored */ - sampler->file_ids.insert(sample->fileId); - } + /* Collect all fileIds used by samples - duplicates are ignored */ + sampler->file_ids.insert(sample->fileId); } + } - /* Finish the list builders */ - PARQUET_THROW_NOT_OK(value_list_builder.Finish(&value_list_array)); - PARQUET_THROW_NOT_OK(time_list_builder.Finish(&time_list_array)); - if(robj->hasFlags()) - { - PARQUET_THROW_NOT_OK(flags_list_builder.Finish(&flags_list_array)); - } - PARQUET_THROW_NOT_OK(fileid_list_builder.Finish(&fileid_list_array)); - if(robj->hasZonalStats()) - { - PARQUET_THROW_NOT_OK(count_list_builder.Finish(&count_list_array)); - PARQUET_THROW_NOT_OK(min_list_builder.Finish(&min_list_array)); - PARQUET_THROW_NOT_OK(max_list_builder.Finish(&max_list_array)); - PARQUET_THROW_NOT_OK(mean_list_builder.Finish(&mean_list_array)); - PARQUET_THROW_NOT_OK(median_list_builder.Finish(&median_list_array)); - PARQUET_THROW_NOT_OK(stdev_list_builder.Finish(&stdev_list_array)); - PARQUET_THROW_NOT_OK(mad_list_builder.Finish(&mad_list_array)); - } + /* Finish the list builders */ + PARQUET_THROW_NOT_OK(value_list_builder.Finish(&value_list_array)); + PARQUET_THROW_NOT_OK(time_list_builder.Finish(&time_list_array)); + if(robj->hasFlags()) + { + PARQUET_THROW_NOT_OK(flags_list_builder.Finish(&flags_list_array)); } - catch(const RunTimeException& e) + PARQUET_THROW_NOT_OK(fileid_list_builder.Finish(&fileid_list_array)); + if(robj->hasZonalStats()) { - /* No columns will be added */ - mlog(e.level(), "Error processing samples: %s", e.what()); - return false; + PARQUET_THROW_NOT_OK(count_list_builder.Finish(&count_list_array)); + PARQUET_THROW_NOT_OK(min_list_builder.Finish(&min_list_array)); + PARQUET_THROW_NOT_OK(max_list_builder.Finish(&max_list_array)); + PARQUET_THROW_NOT_OK(mean_list_builder.Finish(&mean_list_array)); + PARQUET_THROW_NOT_OK(median_list_builder.Finish(&median_list_array)); + PARQUET_THROW_NOT_OK(stdev_list_builder.Finish(&stdev_list_array)); + PARQUET_THROW_NOT_OK(mad_list_builder.Finish(&mad_list_array)); } const std::string prefix = sampler->rkey; @@ -295,7 +510,9 @@ bool ArrowSamplerImpl::processSamples(ParquetSampler::sampler_t* sampler) auto stdev_field = std::make_shared(prefix + ".stats.stdev", arrow::list(arrow::float64())); auto mad_field = std::make_shared(prefix + ".stats.mad", arrow::list(arrow::float64())); - /* Multiple threads may be updating the new fields and columns */ + /* Multiple threads may be updating the new fields and columns + * No throwing exceptions here, since the mutex is locked + */ mutex.lock(); { /* Add new columns fields */ @@ -317,7 +534,7 @@ bool ArrowSamplerImpl::processSamples(ParquetSampler::sampler_t* sampler) newFields.push_back(mad_field); } - /* Add new columns data */ + /* Add new columns */ newColumns.push_back(std::make_shared(value_list_array)); newColumns.push_back(std::make_shared(time_list_array)); if(robj->hasFlags()) @@ -342,122 +559,169 @@ bool ArrowSamplerImpl::processSamples(ParquetSampler::sampler_t* sampler) } /*---------------------------------------------------------------------------- -* createOutputFile +* makeColumnsWithOneSample *----------------------------------------------------------------------------*/ -void ArrowSamplerImpl::createOutpuFile(void) +bool ArrowSamplerImpl::makeColumnsWithOneSample(ParquetSampler::sampler_t* sampler) { - const ArrowParms* parms = parquetSampler->getParms(); + auto pool = arrow::default_memory_pool(); + RasterObject* robj = sampler->robj; - if(std::filesystem::exists(parms->path)) + /* Create builders for the new columns */ + arrow::DoubleBuilder value_builder(pool); + arrow::DoubleBuilder time_builder(pool); + arrow::UInt32Builder flags_builder(pool); + arrow::UInt64Builder fileid_builder(pool); + + /* Create builders for zonal stats */ + arrow::UInt32Builder count_builder(pool); + arrow::DoubleBuilder min_builder(pool); + arrow::DoubleBuilder max_builder(pool); + arrow::DoubleBuilder mean_builder(pool); + arrow::DoubleBuilder median_builder(pool); + arrow::DoubleBuilder stdev_builder(pool); + arrow::DoubleBuilder mad_builder(pool); + + std::shared_ptr value_array, time_array, fileid_array, flags_array; + std::shared_ptr count_array, min_array, max_array, mean_array, median_array, stdev_array, mad_array; + + RasterSample fakeSample(0.0, 0); + fakeSample.value = std::nan(""); + + for(ParquetSampler::sample_list_t* slist : sampler->samples) { - int rc = std::remove(parms->path); - if(rc != 0) + RasterSample* sample; + + if(slist->size() > 0) { - mlog(CRITICAL, "Failed (%d) to delete file %s: %s", rc, parms->path, strerror(errno)); + sample = getFirstValidSample(slist); + } + else + { + /* List is empty, no samples but we must have a fake sample + * to keep the number of rows consistent with the other columns + */ + sample = &fakeSample; } - } - auto table = inputFileToTable(); - auto updated_table = appendSamplesColumns(table); + PARQUET_THROW_NOT_OK(value_builder.Append(sample->value)); + PARQUET_THROW_NOT_OK(time_builder.Append(sample->time)); + if(robj->hasFlags()) + { + PARQUET_THROW_NOT_OK(flags_builder.Append(sample->flags)); + } + PARQUET_THROW_NOT_OK(fileid_builder.Append(sample->fileId)); + if(robj->hasZonalStats()) + { + PARQUET_THROW_NOT_OK(count_builder.Append(sample->stats.count)); + PARQUET_THROW_NOT_OK(min_builder.Append(sample->stats.min)); + PARQUET_THROW_NOT_OK(max_builder.Append(sample->stats.max)); + PARQUET_THROW_NOT_OK(mean_builder.Append(sample->stats.mean)); + PARQUET_THROW_NOT_OK(median_builder.Append(sample->stats.median)); + PARQUET_THROW_NOT_OK(stdev_builder.Append(sample->stats.stdev)); + PARQUET_THROW_NOT_OK(mad_builder.Append(sample->stats.mad)); + } - if(parms->format == ArrowParms::PARQUET) + /* Collect all fileIds used by samples - duplicates are ignored */ + sampler->file_ids.insert(sample->fileId); + } + + /* Finish the builders */ + PARQUET_THROW_NOT_OK(value_builder.Finish(&value_array)); + PARQUET_THROW_NOT_OK(time_builder.Finish(&time_array)); + if(robj->hasFlags()) { - tableToParquetFile(updated_table, parms->path); + PARQUET_THROW_NOT_OK(flags_builder.Finish(&flags_array)); } - else if(parms->format == ArrowParms::CSV) + PARQUET_THROW_NOT_OK(fileid_builder.Finish(&fileid_array)); + if(robj->hasZonalStats()) { - tableToCsvFile(updated_table, parms->path); - - /* Generate metadata file since arrow csv writer ignores it */ - std::string mfile = createMetadataFileName(parms->path); - tableMetadataToJson(updated_table, mfile.c_str()); + PARQUET_THROW_NOT_OK(count_builder.Finish(&count_array)); + PARQUET_THROW_NOT_OK(min_builder.Finish(&min_array)); + PARQUET_THROW_NOT_OK(max_builder.Finish(&max_array)); + PARQUET_THROW_NOT_OK(mean_builder.Finish(&mean_array)); + PARQUET_THROW_NOT_OK(median_builder.Finish(&median_array)); + PARQUET_THROW_NOT_OK(stdev_builder.Finish(&stdev_array)); + PARQUET_THROW_NOT_OK(mad_builder.Finish(&mad_array)); } - else throw RunTimeException(CRITICAL, RTE_ERROR, "Unsupported file format"); - - mlog(DEBUG, "Table was %ld rows and %d columns.", table->num_rows(), table->num_columns()); - mlog(DEBUG, "Table is %ld rows and %d columns.", updated_table->num_rows(), updated_table->num_columns()); -} -/****************************************************************************** - * PRIVATE METHODS - ******************************************************************************/ + const std::string prefix = sampler->rkey; -/*---------------------------------------------------------------------------- -* inputFileToTable -*----------------------------------------------------------------------------*/ -std::shared_ptr ArrowSamplerImpl::inputFileToTable(const std::vector& columnNames) -{ - /* If columnNames is empty, read all columns */ - if(columnNames.size() == 0) + /* Create fields for the new columns */ + auto value_field = std::make_shared(prefix + ".value", arrow::float64()); + auto time_field = std::make_shared(prefix + ".time", arrow::float64()); + auto flags_field = std::make_shared(prefix + ".flags", arrow::uint32()); + auto fileid_field = std::make_shared(prefix + ".fileid", arrow::uint64()); + + auto count_field = std::make_shared(prefix + ".stats.count", arrow::uint32()); + auto min_field = std::make_shared(prefix + ".stats.min", arrow::float64()); + auto max_field = std::make_shared(prefix + ".stats.max", arrow::float64()); + auto mean_field = std::make_shared(prefix + ".stats.mean", arrow::float64()); + auto median_field = std::make_shared(prefix + ".stats.median", arrow::float64()); + auto stdev_field = std::make_shared(prefix + ".stats.stdev", arrow::float64()); + auto mad_field = std::make_shared(prefix + ".stats.mad", arrow::float64()); + + /* Multiple threads may be updating the new fields and columns + * No throwing exceptions here, since the mutex is locked + */ + mutex.lock(); { - std::shared_ptr table; - PARQUET_THROW_NOT_OK(reader->ReadTable(&table)); - return table; - } + /* Add new columns fields */ + newFields.push_back(value_field); + newFields.push_back(time_field); + if(robj->hasFlags()) + { + newFields.push_back(flags_field); + } + newFields.push_back(fileid_field); + if(robj->hasZonalStats()) + { + newFields.push_back(count_field); + newFields.push_back(min_field); + newFields.push_back(max_field); + newFields.push_back(mean_field); + newFields.push_back(median_field); + newFields.push_back(stdev_field); + newFields.push_back(mad_field); + } - /* Read only the specified columns */ - std::shared_ptr schema; - PARQUET_THROW_NOT_OK(reader->GetSchema(&schema)); - std::vector columnIndices; - for(const auto& columnName : columnNames) - { - auto index = schema->GetFieldIndex(columnName); - if(index != -1) + /* Add new columns */ + newColumns.push_back(std::make_shared(value_array)); + newColumns.push_back(std::make_shared(time_array)); + if(robj->hasFlags()) { - columnIndices.push_back(index); + newColumns.push_back(std::make_shared(flags_array)); + } + newColumns.push_back(std::make_shared(fileid_array)); + if(robj->hasZonalStats()) + { + newColumns.push_back(std::make_shared(count_array)); + newColumns.push_back(std::make_shared(min_array)); + newColumns.push_back(std::make_shared(max_array)); + newColumns.push_back(std::make_shared(mean_array)); + newColumns.push_back(std::make_shared(median_array)); + newColumns.push_back(std::make_shared(stdev_array)); + newColumns.push_back(std::make_shared(mad_array)); } - else mlog(DEBUG, "Column %s not found in parquet file.", columnName); } + mutex.unlock(); - std::shared_ptr table; - PARQUET_THROW_NOT_OK(reader->ReadTable(columnIndices, &table)); - return table; + return true; } /*---------------------------------------------------------------------------- -* appendSamplesColumns +* getFirstValidSample *----------------------------------------------------------------------------*/ -std::shared_ptr ArrowSamplerImpl::appendSamplesColumns(const std::shared_ptr table) +RasterSample* ArrowSamplerImpl::getFirstValidSample(ParquetSampler::sample_list_t* slist) { - std::vector> fields = table->schema()->fields(); - std::vector> columns = table->columns(); - - /* Append new columns to the table */ - mutex.lock(); - { - fields.insert(fields.end(), newFields.begin(), newFields.end()); - columns.insert(columns.end(), newColumns.begin(), newColumns.end()); - } - mutex.unlock(); - - auto metadata = table->schema()->metadata()->Copy(); - - /* - * Pandas metadata does not contain information about the new columns. - * Pandas and geopands can use/read the file just fine without pandas custom metadata. - * Removing it is a lot easier than updating it. - */ - const std::string pandas_key = "pandas"; - if(metadata->Contains(pandas_key)) + for(RasterSample* sample : *slist) { - int key_index = metadata->FindKey(pandas_key); - if(key_index != -1) - { - PARQUET_THROW_NOT_OK(metadata->Delete(key_index)); - } + /* GeoRasterr code converts band nodata values to std::nan */ + if(!std::isnan(sample->value)) + return sample; } - /* Create a filemap metadata */ - PARQUET_THROW_NOT_OK(metadata->Set("filemap", createFileMap())); - - /* Attach metadata to the new schema */ - auto combined_schema = std::make_shared(fields); - combined_schema = combined_schema->WithMetadata(metadata); - - /* Create a new table with the combined schema and columns */ - auto updated_table = arrow::Table::Make(combined_schema, columns); - - return updated_table; + /* Return the first sample if no valid samples are found */ + return slist->front(); } /*---------------------------------------------------------------------------- @@ -500,6 +764,20 @@ void ArrowSamplerImpl::tableToCsvFile(const std::shared_ptr table, PARQUET_THROW_NOT_OK(outfile->Close()); } +/*---------------------------------------------------------------------------- +* removeGeometryColumn +*----------------------------------------------------------------------------*/ +std::shared_ptr ArrowSamplerImpl::removeGeometryColumn(const std::shared_ptr table) +{ + int column_index = table->schema()->GetFieldIndex("geometry"); + + if(column_index == -1) + return table; + + arrow::Result> result = table->RemoveColumn(column_index); + return result.ValueOrDie(); +} + /*---------------------------------------------------------------------------- * convertWKBToPoint *----------------------------------------------------------------------------*/ @@ -532,10 +810,7 @@ wkbpoint_t ArrowSamplerImpl::convertWKBToPoint(const std::string& wkb_data) // Little endian point.wkbType = le32toh(point.wkbType); } - else - { - throw std::runtime_error("Unknown byte order."); - } + else throw std::runtime_error("Unknown byte order."); // Next eight bytes are x coordinate std::memcpy(&point.x, wkb_data.data() + offset, sizeof(double)); @@ -684,68 +959,4 @@ void ArrowSamplerImpl::tableMetadataToJson(const std::shared_ptr t fclose(jsonFile); } else throw RunTimeException(CRITICAL, RTE_ERROR, "Failed to open metadata file: %s", file_path); -} - -/*---------------------------------------------------------------------------- -* getXYPoints -*----------------------------------------------------------------------------*/ -void ArrowSamplerImpl::getXYPoints(std::vector& points) -{ - const ParquetSampler::record_info_t& recInfo = parquetSampler->getRecInfo(); - std::vector columnNames = {recInfo.xKey, recInfo.yKey}; - - std::shared_ptr table = inputFileToTable(columnNames); - int x_column_index = table->schema()->GetFieldIndex(recInfo.xKey); - if(x_column_index == -1) - { - throw RunTimeException(ERROR, RTE_ERROR, "X column not found."); - } - - int y_column_index = table->schema()->GetFieldIndex(recInfo.yKey); - if(y_column_index == -1) - { - throw RunTimeException(ERROR, RTE_ERROR, "Y column not found."); - } - - auto x_column = std::static_pointer_cast(table->column(x_column_index)->chunk(0)); - auto y_column = std::static_pointer_cast(table->column(y_column_index)->chunk(0)); - - /* x and y columns are the same longth */ - for(int64_t i = 0; i < x_column->length(); i++) - { - points[i]->x = x_column->Value(i); - points[i]->y = y_column->Value(i); - } -} - -/*---------------------------------------------------------------------------- -* getGeoPoints -*----------------------------------------------------------------------------*/ -void ArrowSamplerImpl::getGeoPoints(std::vector& points) -{ - const char* geocol = "geometry"; - std::vector columnNames = {geocol}; - - std::shared_ptr table = inputFileToTable(columnNames); - int geometry_column_index = table->schema()->GetFieldIndex(geocol); - if(geometry_column_index == -1) - { - throw RunTimeException(ERROR, RTE_ERROR, "Geometry column not found."); - } - - auto geometry_column = std::static_pointer_cast(table->column(geometry_column_index)->chunk(0)); - mlog(DEBUG, "Geometry column elements: %ld", geometry_column->length()); - - /* The geometry column is binary type */ - auto binary_array = std::static_pointer_cast(geometry_column); - - /* Iterate over each item in the geometry column and extract points */ - for(int64_t i = 0; i < binary_array->length(); i++) - { - std::string wkb_data = binary_array->GetString(i); /* Get WKB data as string (binary data) */ - wkbpoint_t point = convertWKBToPoint(wkb_data); - ParquetSampler::point_info_t* pinfo = new ParquetSampler::point_info_t({point.x, point.y, 0.0}); - points.push_back(pinfo); - } -} - +} \ No newline at end of file diff --git a/packages/arrow/ArrowSamplerImpl.h b/packages/arrow/ArrowSamplerImpl.h index 054d10170..91abad27c 100644 --- a/packages/arrow/ArrowSamplerImpl.h +++ b/packages/arrow/ArrowSamplerImpl.h @@ -60,9 +60,7 @@ class ArrowSamplerImpl explicit ArrowSamplerImpl (ParquetSampler* _sampler); ~ArrowSamplerImpl (void); - void openInputFile (const char* file_path); - void getInputFileMetadata (ParquetSampler::record_info_t& recInfo); - void getInputFilePoints (std::vector& points); + void processInputFile (const char* file_path, std::vector& points); bool processSamples (ParquetSampler::sampler_t* sampler); void createOutpuFile (void); @@ -88,22 +86,35 @@ class ArrowSamplerImpl std::shared_ptr inputFile; std::unique_ptr reader; + char* timeKey; + char* xKey; + char* yKey; + bool asGeo; + /*-------------------------------------------------------------------- * Methods *--------------------------------------------------------------------*/ - std::shared_ptr inputFileToTable (const std::vector& columnNames = {}); - std::shared_ptr appendSamplesColumns(const std::shared_ptr table); - void tableToParquetFile (const std::shared_ptr table, const char* file_path); - void tableToCsvFile (const std::shared_ptr table, const char* file_path); - - wkbpoint_t convertWKBToPoint (const std::string& wkb_data); - void printParquetMetadata(const char* file_path); - std::string createFileMap (void); - std::string createMetadataFileName (const char* file_path); - void tableMetadataToJson (const std::shared_ptr table, const char* file_path); - void getXYPoints (std::vector& points); - void getGeoPoints (std::vector& points); + void getMetadata (void); + void getPoints (std::vector& points); + void getXYPoints (std::vector& points); + void getGeoPoints (std::vector& points); + std::shared_ptr inputFileToTable (const std::vector& columnNames = {}); + std::shared_ptr addNewColumns (const std::shared_ptr table); + bool makeColumnsWithLists (ParquetSampler::sampler_t* sampler); + bool makeColumnsWithOneSample(ParquetSampler::sampler_t* sampler); + RasterSample* getFirstValidSample (ParquetSampler::sample_list_t* slist); + void tableToParquetFile (const std::shared_ptr table, + const char* file_path); + void tableToCsvFile (const std::shared_ptr table, + const char* file_path); + std::shared_ptr removeGeometryColumn (const std::shared_ptr table); + wkbpoint_t convertWKBToPoint (const std::string& wkb_data); + void printParquetMetadata (const char* file_path); + std::string createFileMap (void); + std::string createMetadataFileName (const char* file_path); + void tableMetadataToJson (const std::shared_ptr table, + const char* file_path); }; #endif /* __arrow_sampler_impl__ */ diff --git a/packages/arrow/ParquetSampler.cpp b/packages/arrow/ParquetSampler.cpp index 3fe8e0190..4d1600e6a 100644 --- a/packages/arrow/ParquetSampler.cpp +++ b/packages/arrow/ParquetSampler.cpp @@ -150,27 +150,6 @@ ParquetSampler::Sampler::~Sampler(void) robj->releaseLuaObject(); } - -/*---------------------------------------------------------------------------- - * RecordInfo Constructor - *----------------------------------------------------------------------------*/ -ParquetSampler::RecordInfo::RecordInfo(void) : - timeKey(NULL), xKey(NULL), yKey(NULL), asGeo(false) -{ -} - - -/*---------------------------------------------------------------------------- - * RecordInfo Destructor - *----------------------------------------------------------------------------*/ -ParquetSampler::RecordInfo::~RecordInfo(void) -{ - delete[] timeKey; - delete[] xKey; - delete[] yKey; -} - - /*---------------------------------------------------------------------------- * clearSamples *----------------------------------------------------------------------------*/ @@ -264,9 +243,7 @@ ParquetSampler::ParquetSampler(lua_State* L, ArrowParms* _parms, const char* inp /* Allocate Implementation */ impl = new ArrowSamplerImpl(this); - impl->openInputFile(input_file); - impl->getInputFileMetadata(recInfo); - impl->getInputFilePoints(points); + impl->processInputFile(input_file, points); } catch(const RunTimeException& e) { diff --git a/packages/arrow/ParquetSampler.h b/packages/arrow/ParquetSampler.h index 860270b35..49b3f1ca6 100644 --- a/packages/arrow/ParquetSampler.h +++ b/packages/arrow/ParquetSampler.h @@ -104,17 +104,6 @@ class ParquetSampler: public LuaObject void clearSamples (void); } sampler_t; - typedef struct RecordInfo - { - const char* timeKey; - const char* xKey; - const char* yKey; - bool asGeo; - - explicit RecordInfo (void); - ~RecordInfo (void); - } record_info_t; - /*-------------------------------------------------------------------- * Methods *--------------------------------------------------------------------*/ @@ -124,10 +113,9 @@ class ParquetSampler: public LuaObject static void init (void); static void deinit (void); void sample (void); - const ArrowParms* getParms (void) {return parms;} const std::vector& getSamplers (void) {return samplers;} - const record_info_t& getRecInfo (void) {return recInfo;} + private: /*-------------------------------------------------------------------- @@ -140,7 +128,6 @@ class ParquetSampler: public LuaObject ArrowParms* parms; std::vector samplerPids; - record_info_t recInfo; std::vector points; std::vector samplers; ArrowSamplerImpl* impl; // private arrow data diff --git a/packages/geo/GdalRaster.cpp b/packages/geo/GdalRaster.cpp index fd6ec2b19..937d89e51 100644 --- a/packages/geo/GdalRaster.cpp +++ b/packages/geo/GdalRaster.cpp @@ -813,7 +813,7 @@ bool GdalRaster::nodataCheck(RasterSample* sample) if(std::fabs(a-b) < epsilon) { - sample->value = std::nanf(""); + sample->value = std::nan(""); return false; } diff --git a/scripts/selftests/parquet_sampler.lua b/scripts/selftests/parquet_sampler.lua index 80253cd55..43ed68219 100644 --- a/scripts/selftests/parquet_sampler.lua +++ b/scripts/selftests/parquet_sampler.lua @@ -4,8 +4,12 @@ asset = require("asset") local assets = asset.loaddir() local td = runner.rootdir(arg[0]) -local in_file = td.."atl06_10rows.geoparquet" -local out_file = td.."samples.geoparquet" +local in_geoparquet = td.."atl06_10rows.geoparquet" +local in_parquet = td.."atl06_10rows.parquet" +local out_geoparquet = td.."samples.geoparquet" +local out_parquet = td.."samples.parquet" +local out_csv = td.."samples.csv" +local out_metadata = td.."samples_metadata.json" -- console.monitor:config(core.LOG, core.DEBUG) -- sys.setlvl(core.LOG, core.DEBUG) @@ -13,7 +17,7 @@ local out_file = td.."samples.geoparquet" function getFileSize(filePath) local file = io.open(filePath, "rb") -- 'rb' mode opens the file in binary mode if not file then - print("Could not open file: " .. filePath) + print("Could not open file: " .. filePath .. " for reading\n") return nil end local fileSize = file:seek("end") -- Go to the end of the file @@ -27,32 +31,85 @@ runner.check(dem1 ~= nil) local dem2 = geo.raster(geo.parms({asset="arcticdem-strips", algorithm="NearestNeighbour", radius=0, with_flags=true, use_poi_time=true})) runner.check(dem2 ~= nil) -local parquet_sampler = arrow.parquetsampler(arrow.parms({path=out_file, format="parquet", as_geo=true}), in_file, {["mosaic"] = dem1, ["strips"] = dem2}) +print('\n--------------------------------------\nTest01: input/output geoparquet (geo)\n--------------------------------------') +local parquet_sampler = arrow.parquetsampler(arrow.parms({path=out_geoparquet, format="parquet"}), in_geoparquet, {["mosaic"] = dem1}) runner.check(parquet_sampler ~= nil) -print('\n------------------\nTest01: parquetsampler sample \n------------------') - -local in_file_size = getFileSize(in_file); -print("Input file size: " .. in_file_size .. " bytes") +local in_file_size = getFileSize(in_geoparquet); +print("Input geoparquet file size: " .. in_file_size .. " bytes") local status = parquet_sampler:sample() -local out_file_size = getFileSize(out_file); -print("Output file size: " .. out_file_size .. " bytes") -runner.check(out_file_size > in_file_size, "Outpu file size is not greater than input file size: ") +local out_file_size = getFileSize(out_geoparquet); +print("Output geoparquet file size: " .. out_file_size .. " bytes") +runner.check(out_file_size > in_file_size, "Output file size is not greater than input file size: ") + + +print('\n--------------------------------------\nTest02: input/output parquet (x, y)\n--------------------------------------') +parquet_sampler = arrow.parquetsampler(arrow.parms({path=out_parquet, format="parquet"}), in_parquet, {["mosaic"] = dem1}) +runner.check(parquet_sampler ~= nil) + +in_file_size = getFileSize(in_parquet); +print("Input parquet file size: " .. in_file_size .. " bytes") + +status = parquet_sampler:sample() +out_file_size = getFileSize(out_parquet); +print("Output parquet file size: " .. out_file_size .. " bytes") +runner.check(out_file_size > in_file_size, "Output file size is not greater than input file size: ") + +--NOTE: generated CSV files are much smaller than the input parquet/geoparquet files + +print('\n--------------------------------------\nTest03: input geoparquet, output CSV\n--------------------------------------') +parquet_sampler = arrow.parquetsampler(arrow.parms({path=out_csv, format="csv"}), in_geoparquet, {["mosaic"] = dem1}) +runner.check(parquet_sampler ~= nil) + +in_file_size = getFileSize(in_geoparquet); +print("Input geoparquet file size: " .. in_file_size .. " bytes") -print('\n------------------\nTest02: parquetsampler sample again \n------------------') +status = parquet_sampler:sample() +out_file_size = getFileSize(out_csv); +print("Output CSV file size: " .. out_file_size .. " bytes") +runner.check(out_file_size < in_file_size, "Output CSV file size is not smaller than input file size: ") +meta_file_size = getFileSize(out_metadata); +print("Output metadata file size: " .. meta_file_size .. " bytes") +runner.check(meta_file_size > 0, "Output metadata json file size is empty: ") + + +print('\n--------------------------------------\nTest04: input parquet, output CSV \n--------------------------------------') +parquet_sampler = arrow.parquetsampler(arrow.parms({path=out_csv, format="csv"}), in_parquet, {["mosaic"] = dem1, ["strips"] = dem2}) +runner.check(parquet_sampler ~= nil) + +in_file_size = getFileSize(in_parquet); +print("Input parquet file size: " .. in_file_size .. " bytes") status = parquet_sampler:sample() -new_out_file_size = getFileSize(out_file) -print("Output file size: " .. out_file_size .. " bytes") -runner.check(out_file_size == new_out_file_size, "Output file size has incorrectly changed: ") +out_file_size = getFileSize(out_csv); +print("Output CSV file size: " .. out_file_size .. " bytes") +runner.check(out_file_size < in_file_size, "Output CSV file size is not smaller than input file size: ") +meta_file_size = getFileSize(out_metadata); +print("Output metadata file size: " .. meta_file_size .. " bytes") +runner.check(meta_file_size > 0, "Output metadata json file size is empty: ") + + +print('\n--------------------------------------\nTest05: input/output geoparquet (geo)\n--------------------------------------') +local parquet_sampler = arrow.parquetsampler(arrow.parms({path=out_geoparquet, format="parquet"}), in_geoparquet, {["mosaic"] = dem1, ["strips"] = dem2}) +runner.check(parquet_sampler ~= nil) + +local in_file_size = getFileSize(in_geoparquet); +print("Input geoparquet file size: " .. in_file_size .. " bytes") + +local status = parquet_sampler:sample() +local out_file_size = getFileSize(out_geoparquet); +print("Output geoparquet file size: " .. out_file_size .. " bytes") +runner.check(out_file_size > in_file_size, "Output file size is not greater than input file size: ") --- There is no easy way to read parquet file in Lua, check the size of the output file --- the file was tested with python and it has the expected content +-- There is no easy way to read parquet file in Lua, check the size of the output files +-- the files were tested with python scripts --- Remove the output file -os.remove(out_file) +-- Remove the output files +os.remove(out_geoparquet) +os.remove(out_parquet) +os.remove(out_csv) -- Report Results -- From 7e6b4e9591725ff5c76d2443e5830f6bedf73942 Mon Sep 17 00:00:00 2001 From: Eric Lidwa Date: Tue, 9 Apr 2024 20:51:07 +0000 Subject: [PATCH 4/4] missed removing metadata file in lua test --- scripts/selftests/parquet_sampler.lua | 1 + 1 file changed, 1 insertion(+) diff --git a/scripts/selftests/parquet_sampler.lua b/scripts/selftests/parquet_sampler.lua index 43ed68219..42794fb10 100644 --- a/scripts/selftests/parquet_sampler.lua +++ b/scripts/selftests/parquet_sampler.lua @@ -110,6 +110,7 @@ runner.check(out_file_size > in_file_size, "Output file size is not greater than os.remove(out_geoparquet) os.remove(out_parquet) os.remove(out_csv) +os.remove(out_metadata) -- Report Results --