From 9e6b48dc83d088a2e6e75c3640d3ea460e82dd16 Mon Sep 17 00:00:00 2001
From: Eric Lidwa
Date: Tue, 9 Apr 2024 20:43:48 +0000
Subject: [PATCH] writing arrow csv implemented with one sample for lists

---
 packages/arrow/ArrowSamplerImpl.cpp   | 709 +++++++++++++++++---------
 packages/arrow/ArrowSamplerImpl.h     |  41 +-
 packages/arrow/ParquetSampler.cpp     |  25 +-
 packages/arrow/ParquetSampler.h       |  15 +-
 packages/geo/GdalRaster.cpp           |   2 +-
 scripts/selftests/parquet_sampler.lua |  95 +++-
 6 files changed, 565 insertions(+), 322 deletions(-)

diff --git a/packages/arrow/ArrowSamplerImpl.cpp b/packages/arrow/ArrowSamplerImpl.cpp
index d233750a8..08c64bedc 100644
--- a/packages/arrow/ArrowSamplerImpl.cpp
+++ b/packages/arrow/ArrowSamplerImpl.cpp
@@ -58,7 +58,13 @@
  * Constructors
  *----------------------------------------------------------------------------*/
 ArrowSamplerImpl::ArrowSamplerImpl (ParquetSampler* _sampler):
-    parquetSampler(_sampler)
+    parquetSampler(_sampler),
+    inputFile(NULL),
+    reader(nullptr),
+    timeKey(NULL),
+    xKey(NULL),
+    yKey(NULL),
+    asGeo(false)
 {
 }
 
@@ -67,23 +73,100 @@ ArrowSamplerImpl::ArrowSamplerImpl (ParquetSampler* _sampler):
 *----------------------------------------------------------------------------*/
 ArrowSamplerImpl::~ArrowSamplerImpl (void)
 {
+    delete[] timeKey;
+    delete[] xKey;
+    delete[] yKey;
 }
 
 /*----------------------------------------------------------------------------
-* openInputFile
+* processInputFile
 *----------------------------------------------------------------------------*/
-void ArrowSamplerImpl::openInputFile(const char* file_path)
+void ArrowSamplerImpl::processInputFile(const char* file_path, std::vector<ParquetSampler::point_info_t*>& points)
 {
+    /* Open the input file */
     PARQUET_ASSIGN_OR_THROW(inputFile, arrow::io::ReadableFile::Open(file_path, arrow::default_memory_pool()));
     PARQUET_THROW_NOT_OK(parquet::arrow::OpenFile(inputFile, arrow::default_memory_pool(), &reader));
+
+    getMetadata();
+    getPoints(points);
+}
+
+/*----------------------------------------------------------------------------
+* processSamples
+*----------------------------------------------------------------------------*/
+bool ArrowSamplerImpl::processSamples(ParquetSampler::sampler_t* sampler)
+{
+    const ArrowParms* parms = parquetSampler->getParms();
+    bool status = false;
+
+    try
+    {
+        if(parms->format == ArrowParms::PARQUET)
+            status = makeColumnsWithLists(sampler);
+
+        /* Arrow csv writer cannot handle columns with lists of samples */
+        if(parms->format == ArrowParms::CSV)
+            status = makeColumnsWithOneSample(sampler);
+    }
+    catch(const RunTimeException& e)
+    {
+        /* No columns will be added */
+        newFields.clear();
+        newColumns.clear();
+        mlog(e.level(), "Error processing samples: %s", e.what());
+    }
+
+    return status;
+}
+
+/*----------------------------------------------------------------------------
+* createOutputFile
+*----------------------------------------------------------------------------*/
+void ArrowSamplerImpl::createOutpuFile(void)
+{
+    const ArrowParms* parms = parquetSampler->getParms();
+
+    if(std::filesystem::exists(parms->path))
+    {
+        int rc = std::remove(parms->path);
+        if(rc != 0)
+        {
+            mlog(CRITICAL, "Failed (%d) to delete file %s: %s", rc, parms->path, strerror(errno));
+        }
+    }
+
+    auto table = inputFileToTable();
+    auto updated_table = addNewColumns(table);
+    table = nullptr;
+
+    if(parms->format == ArrowParms::PARQUET)
+    {
+        tableToParquetFile(updated_table, parms->path);
+    }
+    else if(parms->format == ArrowParms::CSV)
+    {
+        /* Arrow csv writer cannot handle columns with WKB data */
+        table = removeGeometryColumn(updated_table);
+
+        tableToCsvFile(table, parms->path);
+
+        /* Generate metadata file since Arrow csv writer ignores it */
+        std::string mfile = createMetadataFileName(parms->path);
+        tableMetadataToJson(table, mfile.c_str());
+    }
+    else throw RunTimeException(CRITICAL, RTE_ERROR, "Unsupported file format");
 }
 
+/******************************************************************************
+ * PRIVATE METHODS
+ ******************************************************************************/
+
 /*----------------------------------------------------------------------------
-* getInputFileMetadata
+* getMetadata
 *----------------------------------------------------------------------------*/
-void ArrowSamplerImpl::getInputFileMetadata(ParquetSampler::record_info_t& recInfo)
+void ArrowSamplerImpl::getMetadata(void)
 {
-    bool foundRecInfo = false;
+    bool foundIt = false;
 
     std::shared_ptr<parquet::FileMetaData> file_metadata = reader->parquet_reader()->metadata();
 
@@ -105,43 +188,49 @@ void ArrowSamplerImpl::getInputFileMetadata(ParquetSampler::record_info_t& recInfo)
         {
             const rapidjson::Value& recordinfo = doc["recordinfo"];
 
-            recInfo.timeKey = StringLib::duplicate(recordinfo["time"].GetString());
-            recInfo.asGeo = recordinfo["as_geo"].GetBool();
-            recInfo.xKey = StringLib::duplicate(recordinfo["x"].GetString());
-            recInfo.yKey = StringLib::duplicate(recordinfo["y"].GetString());
-            foundRecInfo = true;
+            const char* _timeKey = recordinfo["time"].GetString();
+            const char* _xKey = recordinfo["x"].GetString();
+            const char* _yKey = recordinfo["y"].GetString();
+            const bool _asGeo = recordinfo["as_geo"].GetBool();
+
+            /* Make sure the keys are not empty */
+            if(_timeKey[0] == '\0' || _xKey[0] == '\0' || _yKey[0] == '\0')
+            {
+                throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid recordinfo in sliderule metadata.");
+            }
+
+            timeKey = StringLib::duplicate(_timeKey);
+            xKey = StringLib::duplicate(_xKey);
+            yKey = StringLib::duplicate(_yKey);
+            asGeo = _asGeo;
+            foundIt = true;
             break;
         }
         else throw RunTimeException(CRITICAL, RTE_ERROR, "No 'recordinfo' key found in 'sliderule' metadata.");
         }
     }
 
-    if(!foundRecInfo)
+    if(!foundIt)
     {
-        throw RunTimeException(CRITICAL, RTE_ERROR, "No 'sliderule/recordinfo' metadata found in parquet file.");
+        throw RunTimeException(CRITICAL, RTE_ERROR, "No 'sliderule' metadata found in parquet file.");
     }
 }
 
 /*----------------------------------------------------------------------------
-* getInputFilePoints
+* getPoints
 *----------------------------------------------------------------------------*/
-void ArrowSamplerImpl::getInputFilePoints(std::vector<ParquetSampler::point_info_t*>& points)
+void ArrowSamplerImpl::getPoints(std::vector<ParquetSampler::point_info_t*>& points)
 {
-    const ParquetSampler::record_info_t& recInfo = parquetSampler->getRecInfo();
-
-    if(recInfo.asGeo)
-    {
+    if(asGeo)
         getGeoPoints(points);
-    }
     else
-    {
         getXYPoints(points);
-    }
 
-    std::vector<const char*> columnNames = {recInfo.timeKey};
+    /* Get each point's GPS time from the time column */
+    std::vector<const char*> columnNames = {timeKey};
 
     std::shared_ptr<arrow::Table> table = inputFileToTable(columnNames);
-    int time_column_index = table->schema()->GetFieldIndex(recInfo.timeKey);
+    int time_column_index = table->schema()->GetFieldIndex(timeKey);
     if(time_column_index > -1)
     {
         auto time_column = std::static_pointer_cast<arrow::TimestampArray>(table->column(time_column_index)->chunk(0));
 
@@ -149,17 +238,149 @@ void ArrowSamplerImpl::getInputFilePoints(std::vector<ParquetSampler::point_info_t*>& points)
         for(int64_t i = 0; i < time_column->length(); i++)
-        {
             points[i]->gps = time_column->Value(i);
-        }
     }
     else mlog(DEBUG, "Time column not found.");
 }
 /*----------------------------------------------------------------------------
-* processSamples
+* getXYPoints
 *----------------------------------------------------------------------------*/
-bool ArrowSamplerImpl::processSamples(ParquetSampler::sampler_t* sampler)
+void ArrowSamplerImpl::getXYPoints(std::vector<ParquetSampler::point_info_t*>& points)
+{
+    std::vector<const char*> columnNames = {xKey, yKey};
+
+    std::shared_ptr<arrow::Table> table = inputFileToTable(columnNames);
+    int x_column_index = table->schema()->GetFieldIndex(xKey);
+    if(x_column_index == -1) throw RunTimeException(ERROR, RTE_ERROR, "X column not found.");
+
+    int y_column_index = table->schema()->GetFieldIndex(yKey);
+    if(y_column_index == -1) throw RunTimeException(ERROR, RTE_ERROR, "Y column not found.");
+
+    auto x_column = std::static_pointer_cast<arrow::DoubleArray>(table->column(x_column_index)->chunk(0));
+    auto y_column = std::static_pointer_cast<arrow::DoubleArray>(table->column(y_column_index)->chunk(0));
+
+    /* x and y columns are the same length */
+    for(int64_t i = 0; i < x_column->length(); i++)
+    {
+        double x = x_column->Value(i);
+        double y = y_column->Value(i);
+
+        ParquetSampler::point_info_t* pinfo = new ParquetSampler::point_info_t({x, y, 0.0});
+        points.push_back(pinfo);
+    }
+}
+
+/*----------------------------------------------------------------------------
+* getGeoPoints
+*----------------------------------------------------------------------------*/
+void ArrowSamplerImpl::getGeoPoints(std::vector<ParquetSampler::point_info_t*>& points)
+{
+    const char* geocol = "geometry";
+    std::vector<const char*> columnNames = {geocol};
+
+    std::shared_ptr<arrow::Table> table = inputFileToTable(columnNames);
+    int geometry_column_index = table->schema()->GetFieldIndex(geocol);
+    if(geometry_column_index == -1) throw RunTimeException(ERROR, RTE_ERROR, "Geometry column not found.");
+
+    auto geometry_column = std::static_pointer_cast<arrow::BinaryArray>(table->column(geometry_column_index)->chunk(0));
+    mlog(DEBUG, "Geometry column elements: %ld", geometry_column->length());
+
+    /* The geometry column is binary type */
+    auto binary_array = std::static_pointer_cast<arrow::BinaryArray>(geometry_column);
+
+    /* Iterate over each item in the geometry column and extract points */
+    for(int64_t i = 0; i < binary_array->length(); i++)
+    {
+        std::string wkb_data = binary_array->GetString(i);  /* Get WKB data as string (binary data) */
+        wkbpoint_t point = convertWKBToPoint(wkb_data);
+        ParquetSampler::point_info_t* pinfo = new ParquetSampler::point_info_t({point.x, point.y, 0.0});
+        points.push_back(pinfo);
+    }
+}
+
+/*----------------------------------------------------------------------------
+* inputFileToTable
+*----------------------------------------------------------------------------*/
+std::shared_ptr<arrow::Table> ArrowSamplerImpl::inputFileToTable(const std::vector<const char*>& columnNames)
+{
+    /* If columnNames is empty, read all columns */
+    if(columnNames.size() == 0)
+    {
+        std::shared_ptr<arrow::Table> table;
+        PARQUET_THROW_NOT_OK(reader->ReadTable(&table));
+        return table;
+    }
+
+    /* Read only the specified columns */
+    std::shared_ptr<arrow::Schema> schema;
+    PARQUET_THROW_NOT_OK(reader->GetSchema(&schema));
+    std::vector<int> columnIndices;
+    for(const auto& columnName : columnNames)
+    {
+        auto index = schema->GetFieldIndex(columnName);
+        if(index != -1)
+        {
+            columnIndices.push_back(index);
+        }
+        else mlog(DEBUG, "Column %s not found in parquet file.", columnName);
+    }
+
+    std::shared_ptr<arrow::Table> table;
+    PARQUET_THROW_NOT_OK(reader->ReadTable(columnIndices, &table));
+    return table;
+}
+
+/*----------------------------------------------------------------------------
+* addNewColumns
+*----------------------------------------------------------------------------*/
+std::shared_ptr<arrow::Table> ArrowSamplerImpl::addNewColumns(const std::shared_ptr<arrow::Table> table)
+{
+    std::vector<std::shared_ptr<arrow::Field>> fields = table->schema()->fields();
+    std::vector<std::shared_ptr<arrow::ChunkedArray>> columns = table->columns();
+
+    /* Append new columns to the table */
+    mutex.lock();
+    {
+        fields.insert(fields.end(), newFields.begin(), newFields.end());
+        columns.insert(columns.end(), newColumns.begin(), newColumns.end());
+    }
+    mutex.unlock();
+
+    auto metadata = table->schema()->metadata()->Copy();
+
+    /*
+     * Pandas metadata does not contain information about the new columns.
+     * Pandas and geopandas can use/read the file just fine without pandas custom metadata.
+     * Removing it is a lot easier than updating it.
+     */
+    const std::string pandas_key = "pandas";
+    if(metadata->Contains(pandas_key))
+    {
+        int key_index = metadata->FindKey(pandas_key);
+        if(key_index != -1)
+        {
+            PARQUET_THROW_NOT_OK(metadata->Delete(key_index));
+        }
+    }
+
+    /* Create a filemap metadata */
+    PARQUET_THROW_NOT_OK(metadata->Set("filemap", createFileMap()));
+
+    /* Attach metadata to the new schema */
+    auto combined_schema = std::make_shared<arrow::Schema>(fields);
+    combined_schema = combined_schema->WithMetadata(metadata);
+
+    /* Create a new table with the combined schema and columns */
+    auto updated_table = arrow::Table::Make(combined_schema, columns);
+
+    return updated_table;
+}
+
+/*----------------------------------------------------------------------------
+* makeColumnsWithLists
+*----------------------------------------------------------------------------*/
+bool ArrowSamplerImpl::makeColumnsWithLists(ParquetSampler::sampler_t* sampler)
 {
     auto pool = arrow::default_memory_pool();
     RasterObject* robj = sampler->robj;
@@ -202,81 +423,75 @@ bool ArrowSamplerImpl::processSamples(ParquetSampler::sampler_t* sampler)
     std::shared_ptr<arrow::Array> value_list_array, time_list_array, fileid_list_array, flags_list_array;
     std::shared_ptr<arrow::Array> count_list_array, min_list_array, max_list_array, mean_list_array, median_list_array, stdev_list_array, mad_list_array;
 
-    try
+    /* Iterate over each sample in a vector of lists of samples */
+    for(ParquetSampler::sample_list_t* slist : sampler->samples)
     {
-        /* Iterate over each sample in a vector of lists of samples */
-        for(ParquetSampler::sample_list_t* slist : sampler->samples)
+        /* Start new lists */
+        PARQUET_THROW_NOT_OK(value_list_builder.Append());
+        PARQUET_THROW_NOT_OK(time_list_builder.Append());
+        if(robj->hasFlags())
+        {
+            PARQUET_THROW_NOT_OK(flags_list_builder.Append());
+        }
+        PARQUET_THROW_NOT_OK(fileid_list_builder.Append());
+        if(robj->hasZonalStats())
+        {
+            PARQUET_THROW_NOT_OK(count_list_builder.Append());
+            PARQUET_THROW_NOT_OK(min_list_builder.Append());
+            PARQUET_THROW_NOT_OK(max_list_builder.Append());
+            PARQUET_THROW_NOT_OK(mean_list_builder.Append());
+            PARQUET_THROW_NOT_OK(median_list_builder.Append());
+            PARQUET_THROW_NOT_OK(stdev_list_builder.Append());
+            PARQUET_THROW_NOT_OK(mad_list_builder.Append());
+        }
+
+        /* Iterate over each sample and add it to the list.
+         * If slist is empty the column will contain an empty list,
+         * keeping the number of rows consistent with the other columns.
+         */
+        for(RasterSample* sample : *slist)
         {
-            /* Start a new lists for each RasterSample */
-            PARQUET_THROW_NOT_OK(value_list_builder.Append());
-            PARQUET_THROW_NOT_OK(time_list_builder.Append());
+            /* Append the value to the value list */
+            PARQUET_THROW_NOT_OK(value_builder->Append(sample->value));
+            PARQUET_THROW_NOT_OK(time_builder->Append(sample->time));
             if(robj->hasFlags())
             {
-                PARQUET_THROW_NOT_OK(flags_list_builder.Append());
+                PARQUET_THROW_NOT_OK(flags_builder->Append(sample->flags));
             }
-            PARQUET_THROW_NOT_OK(fileid_list_builder.Append());
+            PARQUET_THROW_NOT_OK(fileid_builder->Append(sample->fileId));
             if(robj->hasZonalStats())
             {
-                PARQUET_THROW_NOT_OK(count_list_builder.Append());
-                PARQUET_THROW_NOT_OK(min_list_builder.Append());
-                PARQUET_THROW_NOT_OK(max_list_builder.Append());
-                PARQUET_THROW_NOT_OK(mean_list_builder.Append());
-                PARQUET_THROW_NOT_OK(median_list_builder.Append());
-                PARQUET_THROW_NOT_OK(stdev_list_builder.Append());
-                PARQUET_THROW_NOT_OK(mad_list_builder.Append());
+                PARQUET_THROW_NOT_OK(count_builder->Append(sample->stats.count));
+                PARQUET_THROW_NOT_OK(min_builder->Append(sample->stats.min));
+                PARQUET_THROW_NOT_OK(max_builder->Append(sample->stats.max));
+                PARQUET_THROW_NOT_OK(mean_builder->Append(sample->stats.mean));
+                PARQUET_THROW_NOT_OK(median_builder->Append(sample->stats.median));
+                PARQUET_THROW_NOT_OK(stdev_builder->Append(sample->stats.stdev));
+                PARQUET_THROW_NOT_OK(mad_builder->Append(sample->stats.mad));
             }
 
-            /* Iterate over each sample */
-            for(RasterSample* sample : *slist)
-            {
-                /* Append the value to the value list */
-                PARQUET_THROW_NOT_OK(value_builder->Append(sample->value));
-                PARQUET_THROW_NOT_OK(time_builder->Append(sample->time));
-                if(robj->hasFlags())
-                {
-                    PARQUET_THROW_NOT_OK(flags_builder->Append(sample->flags));
-                }
-                PARQUET_THROW_NOT_OK(fileid_builder->Append(sample->fileId));
-                if(robj->hasZonalStats())
-                {
-                    PARQUET_THROW_NOT_OK(count_builder->Append(sample->stats.count));
-                    PARQUET_THROW_NOT_OK(min_builder->Append(sample->stats.min));
-                    PARQUET_THROW_NOT_OK(max_builder->Append(sample->stats.max));
-                    PARQUET_THROW_NOT_OK(mean_builder->Append(sample->stats.mean));
-                    PARQUET_THROW_NOT_OK(median_builder->Append(sample->stats.median));
-                    PARQUET_THROW_NOT_OK(stdev_builder->Append(sample->stats.stdev));
-                    PARQUET_THROW_NOT_OK(mad_builder->Append(sample->stats.mad));
-                }
-
-                /* Collect all fileIds used by samples - duplicates are ignored */
-                sampler->file_ids.insert(sample->fileId);
-            }
+            /* Collect all fileIds used by samples - duplicates are ignored */
+            sampler->file_ids.insert(sample->fileId);
         }
+    }
 
-        /* Finish the list builders */
-        PARQUET_THROW_NOT_OK(value_list_builder.Finish(&value_list_array));
-        PARQUET_THROW_NOT_OK(time_list_builder.Finish(&time_list_array));
-        if(robj->hasFlags())
-        {
-            PARQUET_THROW_NOT_OK(flags_list_builder.Finish(&flags_list_array));
-        }
-        PARQUET_THROW_NOT_OK(fileid_list_builder.Finish(&fileid_list_array));
-        if(robj->hasZonalStats())
-        {
-            PARQUET_THROW_NOT_OK(count_list_builder.Finish(&count_list_array));
-            PARQUET_THROW_NOT_OK(min_list_builder.Finish(&min_list_array));
-            PARQUET_THROW_NOT_OK(max_list_builder.Finish(&max_list_array));
-            PARQUET_THROW_NOT_OK(mean_list_builder.Finish(&mean_list_array));
-            PARQUET_THROW_NOT_OK(median_list_builder.Finish(&median_list_array));
-            PARQUET_THROW_NOT_OK(stdev_list_builder.Finish(&stdev_list_array));
-            PARQUET_THROW_NOT_OK(mad_list_builder.Finish(&mad_list_array));
-        }
+    /* Finish the list builders */
+    PARQUET_THROW_NOT_OK(value_list_builder.Finish(&value_list_array));
+    PARQUET_THROW_NOT_OK(time_list_builder.Finish(&time_list_array));
+    if(robj->hasFlags())
+    {
+        PARQUET_THROW_NOT_OK(flags_list_builder.Finish(&flags_list_array));
     }
-    catch(const RunTimeException& e)
+    PARQUET_THROW_NOT_OK(fileid_list_builder.Finish(&fileid_list_array));
+    if(robj->hasZonalStats())
     {
-        /* No columns will be added */
-        mlog(e.level(), "Error processing samples: %s", e.what());
-        return false;
+        PARQUET_THROW_NOT_OK(count_list_builder.Finish(&count_list_array));
+        PARQUET_THROW_NOT_OK(min_list_builder.Finish(&min_list_array));
+        PARQUET_THROW_NOT_OK(max_list_builder.Finish(&max_list_array));
+        PARQUET_THROW_NOT_OK(mean_list_builder.Finish(&mean_list_array));
+        PARQUET_THROW_NOT_OK(median_list_builder.Finish(&median_list_array));
+        PARQUET_THROW_NOT_OK(stdev_list_builder.Finish(&stdev_list_array));
+        PARQUET_THROW_NOT_OK(mad_list_builder.Finish(&mad_list_array));
     }
 
     const std::string prefix = sampler->rkey;
@@ -295,7 +510,9 @@ bool ArrowSamplerImpl::processSamples(ParquetSampler::sampler_t* sampler)
     auto stdev_field = std::make_shared<arrow::Field>(prefix + ".stats.stdev", arrow::list(arrow::float64()));
     auto mad_field = std::make_shared<arrow::Field>(prefix + ".stats.mad", arrow::list(arrow::float64()));
 
-    /* Multiple threads may be updating the new fields and columns */
+    /* Multiple threads may be updating the new fields and columns.
+     * No throwing exceptions here, since the mutex is locked.
+     */
     mutex.lock();
     {
         /* Add new columns fields */
@@ -317,7 +534,7 @@ bool ArrowSamplerImpl::processSamples(ParquetSampler::sampler_t* sampler)
         newFields.push_back(mad_field);
     }
 
-    /* Add new columns data */
+    /* Add new columns */
     newColumns.push_back(std::make_shared<arrow::ChunkedArray>(value_list_array));
     newColumns.push_back(std::make_shared<arrow::ChunkedArray>(time_list_array));
     if(robj->hasFlags())
@@ -342,122 +559,169 @@ bool ArrowSamplerImpl::processSamples(ParquetSampler::sampler_t* sampler)
 }
 
 /*----------------------------------------------------------------------------
-* createOutputFile
+* makeColumnsWithOneSample
 *----------------------------------------------------------------------------*/
-void ArrowSamplerImpl::createOutpuFile(void)
+bool ArrowSamplerImpl::makeColumnsWithOneSample(ParquetSampler::sampler_t* sampler)
 {
-    const ArrowParms* parms = parquetSampler->getParms();
+    auto pool = arrow::default_memory_pool();
+    RasterObject* robj = sampler->robj;
 
-    if(std::filesystem::exists(parms->path))
+    /* Create builders for the new columns */
+    arrow::DoubleBuilder value_builder(pool);
+    arrow::DoubleBuilder time_builder(pool);
+    arrow::UInt32Builder flags_builder(pool);
+    arrow::UInt64Builder fileid_builder(pool);
+
+    /* Create builders for zonal stats */
+    arrow::UInt32Builder count_builder(pool);
+    arrow::DoubleBuilder min_builder(pool);
+    arrow::DoubleBuilder max_builder(pool);
+    arrow::DoubleBuilder mean_builder(pool);
+    arrow::DoubleBuilder median_builder(pool);
+    arrow::DoubleBuilder stdev_builder(pool);
+    arrow::DoubleBuilder mad_builder(pool);
+
+    std::shared_ptr<arrow::Array> value_array, time_array, fileid_array, flags_array;
+    std::shared_ptr<arrow::Array> count_array, min_array, max_array, mean_array, median_array, stdev_array, mad_array;
+
+    RasterSample fakeSample(0.0, 0);
+    fakeSample.value = std::nan("");
+
+    for(ParquetSampler::sample_list_t* slist : sampler->samples)
     {
-        int rc = std::remove(parms->path);
-        if(rc != 0)
+        RasterSample* sample;
+
+        if(slist->size() > 0)
        {
-            mlog(CRITICAL, "Failed (%d) to delete file %s: %s", rc, parms->path, strerror(errno));
+            sample = getFirstValidSample(slist);
+        }
+        else
+        {
+            /* List is empty (no samples), but we must append a fake sample
+             * to keep the number of rows consistent with the other columns.
+             */
+            sample = &fakeSample;
         }
-    }
 
-    auto table = inputFileToTable();
-    auto updated_table = appendSamplesColumns(table);
+        PARQUET_THROW_NOT_OK(value_builder.Append(sample->value));
+        PARQUET_THROW_NOT_OK(time_builder.Append(sample->time));
+        if(robj->hasFlags())
+        {
+            PARQUET_THROW_NOT_OK(flags_builder.Append(sample->flags));
+        }
+        PARQUET_THROW_NOT_OK(fileid_builder.Append(sample->fileId));
+        if(robj->hasZonalStats())
+        {
+            PARQUET_THROW_NOT_OK(count_builder.Append(sample->stats.count));
+            PARQUET_THROW_NOT_OK(min_builder.Append(sample->stats.min));
+            PARQUET_THROW_NOT_OK(max_builder.Append(sample->stats.max));
+            PARQUET_THROW_NOT_OK(mean_builder.Append(sample->stats.mean));
+            PARQUET_THROW_NOT_OK(median_builder.Append(sample->stats.median));
+            PARQUET_THROW_NOT_OK(stdev_builder.Append(sample->stats.stdev));
+            PARQUET_THROW_NOT_OK(mad_builder.Append(sample->stats.mad));
+        }
 
-    if(parms->format == ArrowParms::PARQUET)
+        /* Collect all fileIds used by samples - duplicates are ignored */
+        sampler->file_ids.insert(sample->fileId);
+    }
+
+    /* Finish the builders */
+    PARQUET_THROW_NOT_OK(value_builder.Finish(&value_array));
+    PARQUET_THROW_NOT_OK(time_builder.Finish(&time_array));
+    if(robj->hasFlags())
     {
-        tableToParquetFile(updated_table, parms->path);
+        PARQUET_THROW_NOT_OK(flags_builder.Finish(&flags_array));
     }
-    else if(parms->format == ArrowParms::CSV)
+    PARQUET_THROW_NOT_OK(fileid_builder.Finish(&fileid_array));
+    if(robj->hasZonalStats())
     {
-        tableToCsvFile(updated_table, parms->path);
-
-        /* Generate metadata file since arrow csv writer ignores it */
-        std::string mfile = createMetadataFileName(parms->path);
-        tableMetadataToJson(updated_table, mfile.c_str());
+        PARQUET_THROW_NOT_OK(count_builder.Finish(&count_array));
+        PARQUET_THROW_NOT_OK(min_builder.Finish(&min_array));
+        PARQUET_THROW_NOT_OK(max_builder.Finish(&max_array));
+        PARQUET_THROW_NOT_OK(mean_builder.Finish(&mean_array));
+        PARQUET_THROW_NOT_OK(median_builder.Finish(&median_array));
+        PARQUET_THROW_NOT_OK(stdev_builder.Finish(&stdev_array));
+        PARQUET_THROW_NOT_OK(mad_builder.Finish(&mad_array));
     }
-    else throw RunTimeException(CRITICAL, RTE_ERROR, "Unsupported file format");
-
-    mlog(DEBUG, "Table was %ld rows and %d columns.", table->num_rows(), table->num_columns());
-    mlog(DEBUG, "Table is %ld rows and %d columns.", updated_table->num_rows(), updated_table->num_columns());
-}
 
-/******************************************************************************
- * PRIVATE METHODS
- ******************************************************************************/
+    const std::string prefix = sampler->rkey;
 
-/*----------------------------------------------------------------------------
-* inputFileToTable
-*----------------------------------------------------------------------------*/
-std::shared_ptr<arrow::Table> ArrowSamplerImpl::inputFileToTable(const std::vector<const char*>& columnNames)
-{
-    /* If columnNames is empty, read all columns */
-    if(columnNames.size() == 0)
+    /* Create fields for the new columns */
+    auto value_field = std::make_shared<arrow::Field>(prefix + ".value", arrow::float64());
+    auto time_field = std::make_shared<arrow::Field>(prefix + ".time", arrow::float64());
+    auto flags_field = std::make_shared<arrow::Field>(prefix + ".flags", arrow::uint32());
+    auto fileid_field = std::make_shared<arrow::Field>(prefix + ".fileid", arrow::uint64());
+
+    auto count_field = std::make_shared<arrow::Field>(prefix + ".stats.count", arrow::uint32());
+    auto min_field = std::make_shared<arrow::Field>(prefix + ".stats.min", arrow::float64());
+    auto max_field = std::make_shared<arrow::Field>(prefix + ".stats.max", arrow::float64());
+    auto mean_field = std::make_shared<arrow::Field>(prefix + ".stats.mean", arrow::float64());
+    auto median_field = std::make_shared<arrow::Field>(prefix + ".stats.median", arrow::float64());
+    auto stdev_field = std::make_shared<arrow::Field>(prefix + ".stats.stdev", arrow::float64());
+    auto mad_field = std::make_shared<arrow::Field>(prefix + ".stats.mad", arrow::float64());
+
+    /* Multiple threads may be updating the new fields and columns.
+     * No throwing exceptions here, since the mutex is locked.
+     */
+    mutex.lock();
     {
-        std::shared_ptr<arrow::Table> table;
-        PARQUET_THROW_NOT_OK(reader->ReadTable(&table));
-        return table;
-    }
+        /* Add new columns fields */
+        newFields.push_back(value_field);
+        newFields.push_back(time_field);
+        if(robj->hasFlags())
+        {
+            newFields.push_back(flags_field);
+        }
+        newFields.push_back(fileid_field);
+        if(robj->hasZonalStats())
+        {
+            newFields.push_back(count_field);
+            newFields.push_back(min_field);
+            newFields.push_back(max_field);
+            newFields.push_back(mean_field);
+            newFields.push_back(median_field);
+            newFields.push_back(stdev_field);
+            newFields.push_back(mad_field);
+        }
 
-    /* Read only the specified columns */
-    std::shared_ptr<arrow::Schema> schema;
-    PARQUET_THROW_NOT_OK(reader->GetSchema(&schema));
-    std::vector<int> columnIndices;
-    for(const auto& columnName : columnNames)
-    {
-        auto index = schema->GetFieldIndex(columnName);
-        if(index != -1)
+        /* Add new columns */
+        newColumns.push_back(std::make_shared<arrow::ChunkedArray>(value_array));
+        newColumns.push_back(std::make_shared<arrow::ChunkedArray>(time_array));
+        if(robj->hasFlags())
         {
-            columnIndices.push_back(index);
+            newColumns.push_back(std::make_shared<arrow::ChunkedArray>(flags_array));
+        }
+        newColumns.push_back(std::make_shared<arrow::ChunkedArray>(fileid_array));
+        if(robj->hasZonalStats())
+        {
+            newColumns.push_back(std::make_shared<arrow::ChunkedArray>(count_array));
+            newColumns.push_back(std::make_shared<arrow::ChunkedArray>(min_array));
+            newColumns.push_back(std::make_shared<arrow::ChunkedArray>(max_array));
+            newColumns.push_back(std::make_shared<arrow::ChunkedArray>(mean_array));
+            newColumns.push_back(std::make_shared<arrow::ChunkedArray>(median_array));
+            newColumns.push_back(std::make_shared<arrow::ChunkedArray>(stdev_array));
+            newColumns.push_back(std::make_shared<arrow::ChunkedArray>(mad_array));
         }
-        else mlog(DEBUG, "Column %s not found in parquet file.", columnName);
     }
+    mutex.unlock();
 
-    std::shared_ptr<arrow::Table> table;
-    PARQUET_THROW_NOT_OK(reader->ReadTable(columnIndices, &table));
-    return table;
+    return true;
 }
 
 /*----------------------------------------------------------------------------
-* appendSamplesColumns
+* getFirstValidSample
 *----------------------------------------------------------------------------*/
-std::shared_ptr<arrow::Table> ArrowSamplerImpl::appendSamplesColumns(const std::shared_ptr<arrow::Table> table)
+RasterSample* ArrowSamplerImpl::getFirstValidSample(ParquetSampler::sample_list_t* slist)
 {
-    std::vector<std::shared_ptr<arrow::Field>> fields = table->schema()->fields();
-    std::vector<std::shared_ptr<arrow::ChunkedArray>> columns = table->columns();
-
-    /* Append new columns to the table */
-    mutex.lock();
-    {
-        fields.insert(fields.end(), newFields.begin(), newFields.end());
-        columns.insert(columns.end(), newColumns.begin(), newColumns.end());
-    }
-    mutex.unlock();
-
-    auto metadata = table->schema()->metadata()->Copy();
-
-    /*
-     * Pandas metadata does not contain information about the new columns.
-     * Pandas and geopands can use/read the file just fine without pandas custom metadata.
-     * Removing it is a lot easier than updating it.
-     */
-    const std::string pandas_key = "pandas";
-    if(metadata->Contains(pandas_key))
+    for(RasterSample* sample : *slist)
     {
-        int key_index = metadata->FindKey(pandas_key);
-        if(key_index != -1)
-        {
-            PARQUET_THROW_NOT_OK(metadata->Delete(key_index));
-        }
+        /* GeoRaster code converts band nodata values to std::nan */
+        if(!std::isnan(sample->value))
+            return sample;
     }
 
-    /* Create a filemap metadata */
-    PARQUET_THROW_NOT_OK(metadata->Set("filemap", createFileMap()));
-
-    /* Attach metadata to the new schema */
-    auto combined_schema = std::make_shared<arrow::Schema>(fields);
-    combined_schema = combined_schema->WithMetadata(metadata);
-
-    /* Create a new table with the combined schema and columns */
-    auto updated_table = arrow::Table::Make(combined_schema, columns);
-
-    return updated_table;
+    /* Return the first sample if no valid samples are found */
+    return slist->front();
 }
 
 /*----------------------------------------------------------------------------
@@ -500,6 +764,20 @@ void ArrowSamplerImpl::tableToCsvFile(const std::shared_ptr<arrow::Table> table,
     PARQUET_THROW_NOT_OK(outfile->Close());
 }
 
+/*----------------------------------------------------------------------------
+* removeGeometryColumn
+*----------------------------------------------------------------------------*/
+std::shared_ptr<arrow::Table> ArrowSamplerImpl::removeGeometryColumn(const std::shared_ptr<arrow::Table> table)
+{
+    int column_index = table->schema()->GetFieldIndex("geometry");
+
+    if(column_index == -1)
+        return table;
+
+    arrow::Result<std::shared_ptr<arrow::Table>> result = table->RemoveColumn(column_index);
+    return result.ValueOrDie();
+}
+
 /*----------------------------------------------------------------------------
 * convertWKBToPoint
 *----------------------------------------------------------------------------*/
@@ -532,10 +810,7 @@ wkbpoint_t ArrowSamplerImpl::convertWKBToPoint(const std::string& wkb_data)
         // Little endian
         point.wkbType = le32toh(point.wkbType);
     }
-    else
-    {
-        throw std::runtime_error("Unknown byte order.");
-    }
+    else throw std::runtime_error("Unknown byte order.");
 
     // Next eight bytes are x coordinate
     std::memcpy(&point.x, wkb_data.data() + offset, sizeof(double));
@@ -684,68 +959,4 @@ void ArrowSamplerImpl::tableMetadataToJson(const std::shared_ptr<arrow::Table> table, const char* file_path)
         fclose(jsonFile);
     }
     else throw RunTimeException(CRITICAL, RTE_ERROR, "Failed to open metadata file: %s", file_path);
-}
-
-/*----------------------------------------------------------------------------
-* getXYPoints
-*----------------------------------------------------------------------------*/
-void ArrowSamplerImpl::getXYPoints(std::vector<ParquetSampler::point_info_t*>& points)
-{
-    const ParquetSampler::record_info_t& recInfo = parquetSampler->getRecInfo();
-    std::vector<const char*> columnNames = {recInfo.xKey, recInfo.yKey};
-
-    std::shared_ptr<arrow::Table> table = inputFileToTable(columnNames);
-    int x_column_index = table->schema()->GetFieldIndex(recInfo.xKey);
-    if(x_column_index == -1)
-    {
-        throw RunTimeException(ERROR, RTE_ERROR, "X column not found.");
-    }
-
-    int y_column_index = table->schema()->GetFieldIndex(recInfo.yKey);
-    if(y_column_index == -1)
-    {
-        throw RunTimeException(ERROR, RTE_ERROR, "Y column not found.");
-    }
-
-    auto x_column = std::static_pointer_cast<arrow::DoubleArray>(table->column(x_column_index)->chunk(0));
-    auto y_column = std::static_pointer_cast<arrow::DoubleArray>(table->column(y_column_index)->chunk(0));
-
-    /* x and y columns are the same longth */
-    for(int64_t i = 0; i < x_column->length(); i++)
-    {
-        points[i]->x = x_column->Value(i);
-        points[i]->y = y_column->Value(i);
-    }
-}
-
-/*----------------------------------------------------------------------------
-* getGeoPoints
-*----------------------------------------------------------------------------*/
-void ArrowSamplerImpl::getGeoPoints(std::vector<ParquetSampler::point_info_t*>& points)
-{
-    const char* geocol = "geometry";
-    std::vector<const char*> columnNames = {geocol};
-
-    std::shared_ptr<arrow::Table> table = inputFileToTable(columnNames);
-    int geometry_column_index = table->schema()->GetFieldIndex(geocol);
-    if(geometry_column_index == -1)
-    {
-        throw RunTimeException(ERROR, RTE_ERROR, "Geometry column not found.");
-    }
-
-    auto geometry_column = std::static_pointer_cast<arrow::BinaryArray>(table->column(geometry_column_index)->chunk(0));
-    mlog(DEBUG, "Geometry column elements: %ld", geometry_column->length());
-
-    /* The geometry column is binary type */
-    auto binary_array = std::static_pointer_cast<arrow::BinaryArray>(geometry_column);
-
-    /* Iterate over each item in the geometry column and extract points */
-    for(int64_t i = 0; i < binary_array->length(); i++)
-    {
-        std::string wkb_data = binary_array->GetString(i);  /* Get WKB data as string (binary data) */
-        wkbpoint_t point = convertWKBToPoint(wkb_data);
-        ParquetSampler::point_info_t* pinfo = new ParquetSampler::point_info_t({point.x, point.y, 0.0});
-        points.push_back(pinfo);
-    }
-}
-
+}
\ No newline at end of file
diff --git a/packages/arrow/ArrowSamplerImpl.h b/packages/arrow/ArrowSamplerImpl.h
index 054d10170..91abad27c 100644
--- a/packages/arrow/ArrowSamplerImpl.h
+++ b/packages/arrow/ArrowSamplerImpl.h
@@ -60,9 +60,7 @@ class ArrowSamplerImpl
        explicit ArrowSamplerImpl (ParquetSampler* _sampler);
        ~ArrowSamplerImpl (void);
 
-       void openInputFile (const char* file_path);
-       void getInputFileMetadata (ParquetSampler::record_info_t& recInfo);
-       void getInputFilePoints (std::vector<ParquetSampler::point_info_t*>& points);
+       void processInputFile (const char* file_path, std::vector<ParquetSampler::point_info_t*>& points);
        bool processSamples (ParquetSampler::sampler_t* sampler);
        void createOutpuFile (void);
 
@@ -88,22 +86,35 @@ class ArrowSamplerImpl
        std::shared_ptr<arrow::io::ReadableFile> inputFile;
        std::unique_ptr<parquet::arrow::FileReader> reader;
 
+       char* timeKey;
+       char* xKey;
+       char* yKey;
+       bool  asGeo;
+
        /*--------------------------------------------------------------------
        * Methods
        *--------------------------------------------------------------------*/
 
-       std::shared_ptr<arrow::Table> inputFileToTable (const std::vector<const char*>& columnNames = {});
-       std::shared_ptr<arrow::Table> appendSamplesColumns(const std::shared_ptr<arrow::Table> table);
-       void tableToParquetFile (const std::shared_ptr<arrow::Table> table, const char* file_path);
-       void tableToCsvFile (const std::shared_ptr<arrow::Table> table, const char* file_path);
-
-       wkbpoint_t convertWKBToPoint (const std::string& wkb_data);
-       void printParquetMetadata(const char* file_path);
-       std::string createFileMap (void);
-       std::string createMetadataFileName (const char* file_path);
-       void tableMetadataToJson (const std::shared_ptr<arrow::Table> table, const char* file_path);
-       void getXYPoints (std::vector<ParquetSampler::point_info_t*>& points);
-       void getGeoPoints (std::vector<ParquetSampler::point_info_t*>& points);
+       void getMetadata (void);
+       void getPoints (std::vector<ParquetSampler::point_info_t*>& points);
+       void getXYPoints (std::vector<ParquetSampler::point_info_t*>& points);
+       void getGeoPoints (std::vector<ParquetSampler::point_info_t*>& points);
+       std::shared_ptr<arrow::Table> inputFileToTable (const std::vector<const char*>& columnNames = {});
+       std::shared_ptr<arrow::Table> addNewColumns (const std::shared_ptr<arrow::Table> table);
+       bool makeColumnsWithLists (ParquetSampler::sampler_t* sampler);
+       bool makeColumnsWithOneSample(ParquetSampler::sampler_t* sampler);
+       RasterSample* getFirstValidSample (ParquetSampler::sample_list_t* slist);
+       void tableToParquetFile (const std::shared_ptr<arrow::Table> table,
+                                const char* file_path);
+       void tableToCsvFile (const std::shared_ptr<arrow::Table> table,
+                            const char* file_path);
+       std::shared_ptr<arrow::Table> removeGeometryColumn (const std::shared_ptr<arrow::Table> table);
+       wkbpoint_t convertWKBToPoint (const std::string& wkb_data);
+       void printParquetMetadata (const char* file_path);
+       std::string createFileMap (void);
+       std::string createMetadataFileName (const char* file_path);
+       void tableMetadataToJson (const std::shared_ptr<arrow::Table> table,
+                                 const char* file_path);
 };
 
 #endif  /* __arrow_sampler_impl__ */
diff --git a/packages/arrow/ParquetSampler.cpp b/packages/arrow/ParquetSampler.cpp
index 3fe8e0190..4d1600e6a 100644
--- a/packages/arrow/ParquetSampler.cpp
+++ b/packages/arrow/ParquetSampler.cpp
@@ -150,27 +150,6 @@ ParquetSampler::Sampler::~Sampler(void)
         robj->releaseLuaObject();
 }
 
-
-/*----------------------------------------------------------------------------
- * RecordInfo Constructor
- *----------------------------------------------------------------------------*/
-ParquetSampler::RecordInfo::RecordInfo(void) :
-    timeKey(NULL), xKey(NULL), yKey(NULL), asGeo(false)
-{
-}
-
-
-/*----------------------------------------------------------------------------
- * RecordInfo Destructor
- *----------------------------------------------------------------------------*/
-ParquetSampler::RecordInfo::~RecordInfo(void)
-{
-    delete[] timeKey;
-    delete[] xKey;
-    delete[] yKey;
-}
-
-
 /*----------------------------------------------------------------------------
  * clearSamples
  *----------------------------------------------------------------------------*/
@@ -264,9 +243,7 @@ ParquetSampler::ParquetSampler(lua_State* L, ArrowParms* _parms, const char* input_file,
         /* Allocate Implementation */
         impl = new ArrowSamplerImpl(this);
 
-        impl->openInputFile(input_file);
-        impl->getInputFileMetadata(recInfo);
-        impl->getInputFilePoints(points);
+        impl->processInputFile(input_file, points);
     }
     catch(const RunTimeException& e)
     {
diff --git a/packages/arrow/ParquetSampler.h b/packages/arrow/ParquetSampler.h
index 860270b35..49b3f1ca6 100644
--- a/packages/arrow/ParquetSampler.h
+++ b/packages/arrow/ParquetSampler.h
@@ -104,17 +104,6 @@ class ParquetSampler: public LuaObject
             void clearSamples (void);
         } sampler_t;
 
-        typedef struct RecordInfo
-        {
-            const char* timeKey;
-            const char* xKey;
-            const char* yKey;
-            bool asGeo;
-
-            explicit RecordInfo (void);
-            ~RecordInfo (void);
-        } record_info_t;
-
         /*--------------------------------------------------------------------
          * Methods
          *--------------------------------------------------------------------*/
@@ -124,10 +113,9 @@ class ParquetSampler: public LuaObject
         static void init (void);
         static void deinit (void);
         void sample (void);
-
         const ArrowParms* getParms (void) {return parms;}
         const std::vector<sampler_t*>& getSamplers (void) {return samplers;}
-        const record_info_t& getRecInfo (void) {return recInfo;}
+
 
     private:
 
@@ -140,7 +128,6 @@ class ParquetSampler: public LuaObject
         /*--------------------------------------------------------------------
         * Data
        *--------------------------------------------------------------------*/
 
         ArrowParms* parms;
         std::vector<Thread*> samplerPids;
-        record_info_t recInfo;
         std::vector<point_info_t*> points;
         std::vector<sampler_t*> samplers;
         ArrowSamplerImpl* impl;  // private arrow data
diff --git a/packages/geo/GdalRaster.cpp b/packages/geo/GdalRaster.cpp
index fd6ec2b19..937d89e51 100644
--- a/packages/geo/GdalRaster.cpp
+++ b/packages/geo/GdalRaster.cpp
@@ -813,7 +813,7 @@ bool GdalRaster::nodataCheck(RasterSample* sample)
 
     if(std::fabs(a-b) < epsilon)
     {
-        sample->value = std::nanf("");
+        sample->value = std::nan("");
         return false;
     }
 
diff --git a/scripts/selftests/parquet_sampler.lua b/scripts/selftests/parquet_sampler.lua
index 80253cd55..43ed68219 100644
--- a/scripts/selftests/parquet_sampler.lua
+++ b/scripts/selftests/parquet_sampler.lua
@@ -4,8 +4,12 @@ asset = require("asset")
 local assets = asset.loaddir()
 local td = runner.rootdir(arg[0])
 
-local in_file = td.."atl06_10rows.geoparquet"
-local out_file = td.."samples.geoparquet"
+local in_geoparquet = td.."atl06_10rows.geoparquet"
+local in_parquet = td.."atl06_10rows.parquet"
+local out_geoparquet = td.."samples.geoparquet"
+local out_parquet = td.."samples.parquet"
+local out_csv = td.."samples.csv"
+local out_metadata = td.."samples_metadata.json"
 
 -- console.monitor:config(core.LOG, core.DEBUG)
 -- sys.setlvl(core.LOG, core.DEBUG)
@@ -13,7 +17,7 @@ local out_file = td.."samples.geoparquet"
 function getFileSize(filePath)
     local file = io.open(filePath, "rb")  -- 'rb' mode opens the file in binary mode
     if not file then
-        print("Could not open file: " .. filePath)
+        print("Could not open file: " .. filePath .. " for reading\n")
         return nil
     end
     local fileSize = file:seek("end")  -- Go to the end of the file
@@ -27,32 +31,85 @@ runner.check(dem1 ~= nil)
 local dem2 = geo.raster(geo.parms({asset="arcticdem-strips", algorithm="NearestNeighbour", radius=0, with_flags=true, use_poi_time=true}))
 runner.check(dem2 ~= nil)
 
-local parquet_sampler = arrow.parquetsampler(arrow.parms({path=out_file, format="parquet", as_geo=true}), in_file, {["mosaic"] = dem1, ["strips"] = dem2})
+print('\n--------------------------------------\nTest01: input/output geoparquet (geo)\n--------------------------------------')
+local parquet_sampler = arrow.parquetsampler(arrow.parms({path=out_geoparquet, format="parquet"}), in_geoparquet, {["mosaic"] = dem1})
 runner.check(parquet_sampler ~= nil)
 
-print('\n------------------\nTest01: parquetsampler sample \n------------------')
-
-local in_file_size = getFileSize(in_file);
-print("Input file size: " .. in_file_size .. " bytes")
+local in_file_size = getFileSize(in_geoparquet);
+print("Input geoparquet file size: " .. in_file_size .. " bytes")
 
 local status = parquet_sampler:sample()
-local out_file_size = getFileSize(out_file);
-print("Output file size: " .. out_file_size .. " bytes")
-runner.check(out_file_size > in_file_size, "Outpu file size is not greater than input file size: ")
+local out_file_size = getFileSize(out_geoparquet);
+print("Output geoparquet file size: " .. out_file_size .. " bytes")
+runner.check(out_file_size > in_file_size, "Output file size is not greater than input file size: ")
+
+
+print('\n--------------------------------------\nTest02: input/output parquet (x, y)\n--------------------------------------')
+parquet_sampler = arrow.parquetsampler(arrow.parms({path=out_parquet, format="parquet"}), in_parquet, {["mosaic"] = dem1})
+runner.check(parquet_sampler ~= nil)
+
+in_file_size = getFileSize(in_parquet);
+print("Input parquet file size: " .. in_file_size .. " bytes")
+
+status = parquet_sampler:sample()
+out_file_size = getFileSize(out_parquet);
+print("Output parquet file size: " .. out_file_size .. " bytes")
+runner.check(out_file_size > in_file_size, "Output file size is not greater than input file size: ")
+
+--NOTE: generated CSV files are much smaller than the input parquet/geoparquet files
+
+print('\n--------------------------------------\nTest03: input geoparquet, output CSV\n--------------------------------------')
+parquet_sampler = arrow.parquetsampler(arrow.parms({path=out_csv, format="csv"}), in_geoparquet, {["mosaic"] = dem1})
+runner.check(parquet_sampler ~= nil)
+
+in_file_size = getFileSize(in_geoparquet);
+print("Input geoparquet file size: " .. in_file_size .. " bytes")
 
-print('\n------------------\nTest02: parquetsampler sample again \n------------------')
+status = parquet_sampler:sample()
+out_file_size = getFileSize(out_csv);
+print("Output CSV file size: " .. out_file_size .. " bytes")
+runner.check(out_file_size < in_file_size, "Output CSV file size is not smaller than input file size: ")
+meta_file_size = getFileSize(out_metadata);
+print("Output metadata file size: " .. meta_file_size .. " bytes")
+runner.check(meta_file_size > 0, "Output metadata json file is empty: ")
+
+
+print('\n--------------------------------------\nTest04: input parquet, output CSV\n--------------------------------------')
+parquet_sampler = arrow.parquetsampler(arrow.parms({path=out_csv, format="csv"}), in_parquet, {["mosaic"] = dem1, ["strips"] = dem2})
+runner.check(parquet_sampler ~= nil)
+
+in_file_size = getFileSize(in_parquet);
+print("Input parquet file size: " .. in_file_size .. " bytes")
 
 status = parquet_sampler:sample()
-new_out_file_size = getFileSize(out_file)
-print("Output file size: " .. out_file_size .. " bytes")
-runner.check(out_file_size == new_out_file_size, "Output file size has incorrectly changed: ")
+out_file_size = getFileSize(out_csv);
+print("Output CSV file size: " .. out_file_size .. " bytes")
+runner.check(out_file_size < in_file_size, "Output CSV file size is not smaller than input file size: ")
+meta_file_size = getFileSize(out_metadata);
+print("Output metadata file size: " .. meta_file_size .. " bytes")
+runner.check(meta_file_size > 0, "Output metadata json file is empty: ")
+
+
+print('\n--------------------------------------\nTest05: input/output geoparquet (geo), two rasters\n--------------------------------------')
+parquet_sampler = arrow.parquetsampler(arrow.parms({path=out_geoparquet, format="parquet"}), in_geoparquet, {["mosaic"] = dem1, ["strips"] = dem2})
+runner.check(parquet_sampler ~= nil)
+
+in_file_size = getFileSize(in_geoparquet);
+print("Input geoparquet file size: " .. in_file_size .. " bytes")
+
+status = parquet_sampler:sample()
+out_file_size = getFileSize(out_geoparquet);
+print("Output geoparquet file size: " .. out_file_size .. " bytes")
+runner.check(out_file_size > in_file_size, "Output file size is not greater than input file size: ")
 
--- There is no easy way to read parquet file in Lua, check the size of the output file
--- the file was tested with python and it has the expected content
+-- There is no easy way to read a parquet file in Lua; check the sizes of the output files instead.
+-- The files were verified with python scripts and have the expected content.
 
--- Remove the output file
-os.remove(out_file)
+-- Remove the output files
+os.remove(out_geoparquet)
+os.remove(out_parquet)
+os.remove(out_csv)
 
 -- Report Results --