Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parquet sampler csv, non geo support #386

Merged
merged 4 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
789 changes: 593 additions & 196 deletions packages/arrow/ArrowSamplerImpl.cpp

Large diffs are not rendered by default.

42 changes: 32 additions & 10 deletions packages/arrow/ArrowSamplerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,9 @@ class ArrowSamplerImpl
explicit ArrowSamplerImpl (ParquetSampler* _sampler);
~ArrowSamplerImpl (void);

void getPointsFromFile (const char* file_path, std::vector<ParquetSampler::point_info_t*>& points);
void createParquetFile (const char* input_file, const char* output_file);
void processInputFile (const char* file_path, std::vector<ParquetSampler::point_info_t*>& points);
bool processSamples (ParquetSampler::sampler_t* sampler);
void clearColumns (void);
void createOutpuFile (void);

private:

Expand All @@ -81,18 +80,41 @@ class ArrowSamplerImpl

ParquetSampler* parquetSampler;
Mutex mutex;
std::vector<std::shared_ptr<arrow::Field>> new_fields;
std::vector<std::shared_ptr<arrow::ChunkedArray>> new_columns;
std::vector<std::shared_ptr<arrow::Field>> newFields;
std::vector<std::shared_ptr<arrow::ChunkedArray>> newColumns;

std::shared_ptr<arrow::io::ReadableFile> inputFile;
std::unique_ptr<parquet::arrow::FileReader> reader;

char* timeKey;
char* xKey;
char* yKey;
bool asGeo;

/*--------------------------------------------------------------------
* Methods
*--------------------------------------------------------------------*/

wkbpoint_t convertWKBToPoint (const std::string& wkb_data);
std::shared_ptr<arrow::Table> parquetFileToTable (const char* file_path, const std::vector<const char*>& columnNames = {});
void tableToParquetFile (std::shared_ptr<arrow::Table> table, const char* file_path);
void printParquetMetadata(const char* file_path);
std::string createFileMap (void);
void getMetadata (void);
void getPoints (std::vector<ParquetSampler::point_info_t*>& points);
void getXYPoints (std::vector<ParquetSampler::point_info_t*>& points);
void getGeoPoints (std::vector<ParquetSampler::point_info_t*>& points);
std::shared_ptr<arrow::Table> inputFileToTable (const std::vector<const char*>& columnNames = {});
std::shared_ptr<arrow::Table> addNewColumns (const std::shared_ptr<arrow::Table> table);
bool makeColumnsWithLists (ParquetSampler::sampler_t* sampler);
bool makeColumnsWithOneSample(ParquetSampler::sampler_t* sampler);
RasterSample* getFirstValidSample (ParquetSampler::sample_list_t* slist);
void tableToParquetFile (const std::shared_ptr<arrow::Table> table,
const char* file_path);
void tableToCsvFile (const std::shared_ptr<arrow::Table> table,
const char* file_path);
std::shared_ptr<arrow::Table> removeGeometryColumn (const std::shared_ptr<arrow::Table> table);
wkbpoint_t convertWKBToPoint (const std::string& wkb_data);
void printParquetMetadata (const char* file_path);
std::string createFileMap (void);
std::string createMetadataFileName (const char* file_path);
void tableMetadataToJson (const std::shared_ptr<arrow::Table> table,
const char* file_path);
};

#endif /* __arrow_sampler_impl__ */
69 changes: 32 additions & 37 deletions packages/arrow/ParquetSampler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,13 @@
#include "core.h"
#include "ParquetSampler.h"
#include "ArrowSamplerImpl.h"
#include <filesystem>


/******************************************************************************
* STATIC DATA
******************************************************************************/

const char* ParquetSampler::OBJECT_TYPE = "ParquetSampler";
const char* ParquetSampler::OBJECT_TYPE = "ParquetSampler";
const char* ParquetSampler::LUA_META_NAME = "ParquetSampler";
const struct luaL_Reg ParquetSampler::LUA_META_TABLE[] = {
{NULL, NULL}
Expand All @@ -56,13 +55,15 @@ const struct luaL_Reg ParquetSampler::LUA_META_TABLE[] = {
/*----------------------------------------------------------------------------
* luaCreate - :parquetsampler(input_file_path, output_file_path, {["mosaic"]: dem1, ["strips"]: dem2})
*----------------------------------------------------------------------------*/
int ParquetSampler::luaCreate (lua_State* L)
int ParquetSampler::luaCreate(lua_State* L)
{
ArrowParms* _parms = NULL;

try
{
/* Get Parameters */
const char* input_file = getLuaString(L, 1);
const char* output_file = getLuaString(L, 2);
_parms = dynamic_cast<ArrowParms*>(getLuaObject(L, 1, ArrowParms::OBJECT_TYPE));
const char* input_file = getLuaString(L, 2);

std::vector<raster_info_t> rasters;

Expand All @@ -83,7 +84,7 @@ int ParquetSampler::luaCreate (lua_State* L)
}

/* Create Dispatch */
return createLuaObject(L, new ParquetSampler(L, input_file, output_file, rasters));
return createLuaObject(L, new ParquetSampler(L, _parms, input_file, rasters));
}
catch(const RunTimeException& e)
{
Expand All @@ -96,7 +97,7 @@ int ParquetSampler::luaCreate (lua_State* L)
/*----------------------------------------------------------------------------
* luaSamples - :sample([gps]) --> in|out
*----------------------------------------------------------------------------*/
int ParquetSampler::luaSample (lua_State* L)
int ParquetSampler::luaSample(lua_State* L)
{
try
{
Expand All @@ -117,21 +118,21 @@ int ParquetSampler::luaSample (lua_State* L)
/*----------------------------------------------------------------------------
* init
*----------------------------------------------------------------------------*/
void ParquetSampler::init (void)
void ParquetSampler::init(void)
{
}

/*----------------------------------------------------------------------------
* deinit
*----------------------------------------------------------------------------*/
void ParquetSampler::deinit (void)
void ParquetSampler::deinit(void)
{
}

/*----------------------------------------------------------------------------
* Sampler Constructor
*----------------------------------------------------------------------------*/
ParquetSampler::Sampler::Sampler (const char* _rkey, RasterObject* _robj, ParquetSampler* _obj):
ParquetSampler::Sampler::Sampler(const char* _rkey, RasterObject* _robj, ParquetSampler* _obj) :
robj(_robj),
obj(_obj)
{
Expand All @@ -142,14 +143,13 @@ ParquetSampler::Sampler::Sampler (const char* _rkey, RasterObject* _robj, Parque
/*----------------------------------------------------------------------------
* Sampler Destructor
*----------------------------------------------------------------------------*/
ParquetSampler::Sampler::~Sampler (void)
ParquetSampler::Sampler::~Sampler(void)
{
clearSamples();
delete [] rkey;
robj->releaseLuaObject();
}


/*----------------------------------------------------------------------------
* clearSamples
*----------------------------------------------------------------------------*/
Expand All @@ -174,37 +174,27 @@ void ParquetSampler::sample(void)
if(alreadySampled) return;
alreadySampled = true;

if(std::filesystem::exists(outputPath))
{
int rc = std::remove(outputPath);
if(rc != 0)
{
mlog(CRITICAL, "Failed (%d) to delete file %s: %s", rc, outputPath, strerror(errno));
}
}

/* Start Sampler Threads */
/* Start sampling threads */
for(sampler_t* sampler : samplers)
{
Thread* pid = new Thread(samplerThread, sampler);
samplerPids.push_back(pid);
}

/* Wait for all sampler threads to finish */
/* Wait for all sampling threads to finish */
for(Thread* pid : samplerPids)
{
delete pid;
}
samplerPids.clear();

/* Create new parquet file with columns/samples from all raster objects */
try
{
impl->createParquetFile(inputPath, outputPath);
impl->createOutpuFile();
}
catch(const RunTimeException& e)
{
mlog(e.level(), "Error creating parquet file: %s", e.what());
mlog(e.level(), "Error creating output file: %s", e.what());
throw;
}
}
Expand All @@ -217,15 +207,23 @@ void ParquetSampler::sample(void)
/*----------------------------------------------------------------------------
* Constructor
*----------------------------------------------------------------------------*/
ParquetSampler::ParquetSampler (lua_State* L, const char* input_file, const char* output_file, const std::vector<raster_info_t>& rasters):
ParquetSampler::ParquetSampler(lua_State* L, ArrowParms* _parms, const char* input_file,
const std::vector<raster_info_t>& rasters):
LuaObject(L, OBJECT_TYPE, LUA_META_NAME, LUA_META_TABLE),
parms(_parms),
alreadySampled(false)
{
/* Add Lua sample function */
LuaEngine::setAttrFunc(L, "sample", luaSample);

if (!input_file) throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid input file");
if (!output_file) throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid output file");
if (parms == NULL)
throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid ArrowParms object");

if ((input_file == NULL) || (input_file[0] == '\0'))
throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid input file path");

if((parms->path == NULL) || (parms->path[0] == '\0'))
throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid output file path");

try
{
Expand All @@ -242,13 +240,10 @@ ParquetSampler::ParquetSampler (lua_State* L, const char* input_file, const char
samplers.push_back(sampler);
}

inputPath = StringLib::duplicate(input_file);
outputPath = StringLib::duplicate(output_file);

/* Allocate Implementation */
impl = new ArrowSamplerImpl(this);

impl->getPointsFromFile(inputPath, points);
impl->processInputFile(input_file, points);
}
catch(const RunTimeException& e)
{
Expand All @@ -271,6 +266,8 @@ ParquetSampler::~ParquetSampler(void)
*----------------------------------------------------------------------------*/
void ParquetSampler::Delete(void)
{
parms->releaseLuaObject();

for(Thread* pid : samplerPids)
delete pid;

Expand All @@ -280,8 +277,6 @@ void ParquetSampler::Delete(void)
for(point_info_t* pinfo : points)
delete pinfo;

delete [] inputPath;
delete [] outputPath;
delete impl;
}

Expand All @@ -295,8 +290,8 @@ void* ParquetSampler::samplerThread(void* parm)

for(point_info_t* pinfo : sampler->obj->points)
{
OGRPoint poi = pinfo->point; /* Must make a copy of point for this thread */
double gps = robj->usePOItime() ? pinfo->gps_time : 0;
OGRPoint poi(pinfo->x, pinfo->y, 0.0);
double gps = robj->usePOItime() ? pinfo->gps : 0.0;

sample_list_t* slist = new sample_list_t;
bool listvalid = true;
Expand Down
29 changes: 13 additions & 16 deletions packages/arrow/ParquetSampler.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,11 @@ class ParquetSampler: public LuaObject
RasterObject* robj;
} raster_info_t;

typedef struct PointInfo
typedef struct
{
OGRPoint point;
double gps_time;

explicit PointInfo (double x, double y, double z): point(x, y, z), gps_time(0.0) {}
double x;
double y;
double gps;
} point_info_t;

typedef std::vector<RasterSample*> sample_list_t;
Expand All @@ -105,17 +104,17 @@ class ParquetSampler: public LuaObject
void clearSamples (void);
} sampler_t;


/*--------------------------------------------------------------------
* Methods
*--------------------------------------------------------------------*/

static int luaCreate (lua_State* L);
static int luaSample (lua_State* L);
static void init (void);
static void deinit (void);
void sample (void);
const std::vector<sampler_t*>& getSamplers(void) {return samplers;}
static int luaCreate (lua_State* L);
static int luaSample (lua_State* L);
static void init (void);
static void deinit (void);
void sample (void);
const ArrowParms* getParms (void) {return parms;}
const std::vector<sampler_t*>& getSamplers (void) {return samplers;}

private:

Expand All @@ -127,8 +126,7 @@ class ParquetSampler: public LuaObject
* Data
*--------------------------------------------------------------------*/

const char* inputPath;
const char* outputPath;
ArrowParms* parms;
std::vector<Thread*> samplerPids;
std::vector<point_info_t*> points;
std::vector<sampler_t*> samplers;
Expand All @@ -139,8 +137,7 @@ class ParquetSampler: public LuaObject
* Methods
*--------------------------------------------------------------------*/

ParquetSampler (lua_State* L,
const char* input_file, const char* output_file,
ParquetSampler (lua_State* L, ArrowParms* _parms, const char* input_file,
const std::vector<raster_info_t>& rasters);
~ParquetSampler (void);
void Delete (void);
Expand Down
2 changes: 1 addition & 1 deletion packages/geo/GdalRaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,7 @@ bool GdalRaster::nodataCheck(RasterSample* sample)

if(std::fabs(a-b) < epsilon)
{
sample->value = std::nanf("");
sample->value = std::nan("");
return false;
}

Expand Down
Binary file added scripts/selftests/atl06_10rows.geoparquet
Binary file not shown.
Binary file modified scripts/selftests/atl06_10rows.parquet
Binary file not shown.
Loading
Loading