Skip to content

Commit

Permalink
optimized for pgc strips case
Browse files Browse the repository at this point in the history
  • Loading branch information
elidwa committed Aug 7, 2024
1 parent 2ac76ea commit 048b69a
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 107 deletions.
228 changes: 124 additions & 104 deletions packages/arrow/ArrowSampler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ int ArrowSampler::luaCreate(lua_State* L)
ArrowParms* _parms = NULL;
const char* input_file = NULL;
const char* outq_name = NULL;
std::vector<raster_info_t> rasters;
std::vector<raster_info_t> user_rasters;

/* Get Parameters */
try
Expand All @@ -83,7 +83,7 @@ int ArrowSampler::luaCreate(lua_State* L)
if(!rkey) throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid raster key");
if(!robj) throw RunTimeException(CRITICAL, RTE_ERROR, "Invalid raster object");

rasters.push_back({rkey, robj});
user_rasters.push_back({rkey, robj});

/* Pop value */
lua_pop(L, 1);
Expand All @@ -95,7 +95,7 @@ int ArrowSampler::luaCreate(lua_State* L)

/* Release Lua Parameters Objects */
if(_parms) _parms->releaseLuaObject();
for(raster_info_t& raster : rasters)
for(raster_info_t& raster : user_rasters)
{
raster.robj->releaseLuaObject();
}
Expand All @@ -105,7 +105,7 @@ int ArrowSampler::luaCreate(lua_State* L)
/* Create Dispatch */
try
{
return createLuaObject(L, new ArrowSampler(L, _parms, input_file, outq_name, rasters));
return createLuaObject(L, new ArrowSampler(L, _parms, input_file, outq_name, user_rasters));
}
catch(const RunTimeException& e)
{
Expand Down Expand Up @@ -145,10 +145,23 @@ ArrowSampler::BatchSampler::BatchSampler(const char* _rkey, RasterObject* _robj,
*----------------------------------------------------------------------------*/
ArrowSampler::BatchSampler::~BatchSampler(void)
{
/* Delete any sample lists still held in `samples` and empty the vector */
clearSamples();
/* rkey is freed with delete[]; presumably it was duplicated with new[] in the constructor - TODO confirm */
delete [] rkey;
/* Release the RasterObject that was handed to this sampler (it is not deleted here) */
robj->releaseLuaObject();
}

/*----------------------------------------------------------------------------
* clearSamples
*----------------------------------------------------------------------------*/
void ArrowSampler::BatchSampler::clearSamples(void)
{
    /* Free every sample list this sampler currently holds, in insertion order */
    for(auto it = samples.begin(); it != samples.end(); ++it)
    {
        delete *it;
    }

    /* Drop the now-dangling pointers so the vector is empty afterwards */
    samples.clear();
}

/*----------------------------------------------------------------------------
* Reader Constructor
*----------------------------------------------------------------------------*/
Expand All @@ -164,11 +177,6 @@ ArrowSampler::Reader::Reader(RasterObject* _robj, ArrowSampler* _obj) :
*----------------------------------------------------------------------------*/
ArrowSampler::Reader::~Reader(void)
{
    /*
     * The sample lists gathered by this reader are intentionally NOT deleted here.
     * batchSampling copies the slist pointers from reader->samples into the batch
     * sampler's samples vector, and BatchSampler::clearSamples() deletes them.
     * Deleting them in this destructor as well would free every list twice.
     */

    delete robj; /* This is locally created RasterObject, not lua created */
}

Expand All @@ -184,23 +192,37 @@ void* ArrowSampler::mainThread(void* parm)
const uint32_t trace_id = start_trace(INFO, s->traceId, "arrow_sampler", "{\"filename\":\"%s\"}", s->dataFile);
EventLib::stashId(trace_id);

/* Sample all user provided raster objects (data sets) */
/* Get samples for all user RasterObjects */
for(batch_sampler_t* sampler : s->batchSamplers)
{
batchSampling(sampler);
if(s->active)
{
batchSampling(sampler);

/* batchSampling can take minutes, check active again */
if(s->active)
s->impl->processSamples(sampler);
}

/* Release since not needed anymore */
sampler->clearSamples();
sampler->file_ids.clear();
}

try
{
s->impl->createOutpuFiles();
if(s->active)
{
s->impl->createOutpuFiles();

/* Send Data File to User */
ArrowCommon::send2User(s->dataFile, s->outputPath, trace_id, s->parms, s->outQ);
/* Send Data File to User */
ArrowCommon::send2User(s->dataFile, s->outputPath, trace_id, s->parms, s->outQ);

/* Send Metadata File to User */
if(ArrowCommon::fileExists(s->metadataFile))
{
ArrowCommon::send2User(s->metadataFile, s->outputMetadataPath, trace_id, s->parms, s->outQ);
/* Send Metadata File to User */
if(ArrowCommon::fileExists(s->metadataFile))
{
ArrowCommon::send2User(s->metadataFile, s->outputMetadataPath, trace_id, s->parms, s->outQ);
}
}
}
catch(const RunTimeException& e)
Expand Down Expand Up @@ -259,7 +281,7 @@ const std::vector<ArrowSampler::batch_sampler_t*>& ArrowSampler::getBatchSampler
* Constructor
*----------------------------------------------------------------------------*/
ArrowSampler::ArrowSampler(lua_State* L, ArrowParms* _parms, const char* input_file,
const char* outq_name, const std::vector<raster_info_t>& rasters):
const char* outq_name, const std::vector<raster_info_t>& user_rasters):
LuaObject(L, OBJECT_TYPE, LUA_META_NAME, LUA_META_TABLE),
active(false),
mainPid(NULL),
Expand All @@ -279,9 +301,9 @@ ArrowSampler::ArrowSampler(lua_State* L, ArrowParms* _parms, const char* input_f
try
{
/* Copy user raster objects, create batch samplers */
for(std::size_t i = 0; i < rasters.size(); i++)
for(std::size_t i = 0; i < user_rasters.size(); i++)
{
const raster_info_t& raster = rasters[i];
const raster_info_t& raster = user_rasters[i];
const char* rkey = raster.rkey;
RasterObject* robj = raster.robj;
batch_sampler_t* sampler = new batch_sampler_t(rkey, robj, this);
Expand Down Expand Up @@ -354,7 +376,21 @@ void ArrowSampler::Delete(void)
*----------------------------------------------------------------------------*/
void ArrowSampler::getReadersRange(std::vector<reader_range_t>& ranges, uint32_t maxNumThreads)
{
const uint32_t minPointsPerThread = 20;
/*
* If points are geographically dispersed and fall into different data blocks of a raster,
* the initial read operation from the AWS S3 bucket can take approximately one second due
* to network latency and data retrieval time. Subsequent reads from the same data blocks
* are significantly faster due to caching mechanisms.
*
* The worst-case scenario occurs when points are not located within the same data block,
* leading to multiple time-consuming read operations.
*
* To optimize performance and balance the overhead of creating new RasterObjects and
* managing multiple threads, a threshold of 5 points per thread (minPointsPerThread) is used,
* i.e. roughly 5 seconds of worst-case reads at about one second per point. This value
* determines when to initiate multiple threads for parallel processing. By doing so,
* we aim to enhance efficiency and reduce overall processing time.
*/
const uint32_t minPointsPerThread = 5;

/* Determine how many reader threads to use and index range for each */
if(points.size() <= minPointsPerThread)
Expand Down Expand Up @@ -403,99 +439,72 @@ void ArrowSampler::batchSampling(batch_sampler_t* sampler)
for(uint32_t i = 0; i < ranges.size(); i++)
{
const reader_range_t& r = ranges[i];
print2term("%s: ragne-%u: %u to %u\n", sampler->rkey, i, r.start_indx, r.end_indx);
mlog(DEBUG, "%s: ragne-%u: %u to %u\n", sampler->rkey, i, r.start_indx, r.end_indx);
}

const uint32_t numThreads = ranges.size();

/* Start reader threads */
std::vector<Thread*> pids;
std::vector<reader_t*> readers;
for(uint32_t i = 0; i < numThreads; i++)
{
/* Create RasterObject for each reader.
* This is a local object and will be deleted in the reader destructor.
* RasterObject from the user is not used here.
*/
RasterObject* _robj = RasterObject::cppCreate(sampler->robj);
reader_t* reader = new reader_t(_robj, sampler->obj);
reader->range = ranges[i];
readers.push_back(reader);
Thread* pid = new Thread(readerThread, reader);
pids.push_back(pid);
}

/* Wait for all reader threads to finish */
for(Thread* pid : pids)
if(numThreads == 1)
{
delete pid;
/* Single thread, read all samples in this thread using user RasterObject */
readSamples(sampler->robj, ranges[0].start_indx, ranges[0].end_indx, sampler->obj, sampler->samples);
}

/* Copy samples lists (pointers only) from each reader. */
for(const reader_t* reader : readers)
else
{
for(sample_list_t* slist : reader->samples)
/* Start reader threads */
std::vector<Thread*> pids;
std::vector<reader_t*> readers;
for(uint32_t i = 0; i < numThreads; i++)
{
for(int32_t i = 0; i < slist->length(); i++)
{
/* NOTE: sample.fileId is an index of the file name in the reader's file dictionary.
* we need to convert it to the index in the batch sampler's dictionary (user's RasterObject dict).
*/
RasterSample* sample = slist->get(i);

/* Find the file name for the sample id in reader's dictionary */
const char* name = reader->robj->fileDictGetFile(sample->fileId);

/* Use user's RasterObject dictionary to store the file names. */
const uint64_t id = sampler->robj->fileDictAdd(name);

/* Update the sample file id */
sample->fileId = id;
}

/* slist pointer is now in two samples vectors, one in the reader and one in the batch sampler
* reader's destructor will delete it
/* Create RasterObject for each reader.
* These are local objects and will be deleted in the reader destructor.
* User RasterObject is not used for sampling. It is used for accumulating samples from all readers.
*/
sampler->samples.push_back(slist);
RasterObject* _robj = RasterObject::cppCreate(sampler->robj);
reader_t* reader = new reader_t(_robj, sampler->obj);
reader->range = ranges[i];
readers.push_back(reader);
Thread* pid = new Thread(readerThread, reader);
pids.push_back(pid);
}
}


/* Convert samples into new columns */
if(sampler->obj->active &&
sampler->obj->impl->processSamples(sampler))
{
/* Create raster file map <id, filename> */
Dictionary<uint64_t>::Iterator iterator(sampler->robj->fileDictGet());
for(int i = 0; i < iterator.length; i++)
/* Wait for all reader threads to finish */
for(Thread* pid : pids)
{
const char* name = iterator[i].key;
const uint64_t id = iterator[i].value;
delete pid;
}

/* For some data sets, dictionary contains quality mask rasters in addition to data rasters.
* Only add rasters with id present in the samples
*/
if(sampler->file_ids.find(id) != sampler->file_ids.end())
/* Copy samples lists (slist pointers only) from each reader. */
for(const reader_t* reader : readers)
{
for(sample_list_t* slist : reader->samples)
{
sampler->filemap.emplace_back(id, name);
for(int32_t i = 0; i < slist->length(); i++)
{
/* NOTE: sample.fileId is an index of the file name in the reader's file dictionary.
* we need to convert it to the index in the batch sampler's dictionary (user's RasterObject dict).
*/
RasterSample* sample = slist->get(i);

/* Find the file name for the sample id in reader's dictionary */
const char* name = reader->robj->fileDictGetFile(sample->fileId);

/* Use user's RasterObject dictionary to store the file names. */
const uint64_t id = sampler->robj->fileDictAdd(name);

/* Update the sample file id */
sample->fileId = id;
}

/* slist pointer is now in two samples vectors, one in the reader and one in the batch sampler
* batch sampler destructor will delete it
*/
sampler->samples.push_back(slist);
}
}

/* Sort the map with increasing file id */
std::sort(sampler->filemap.begin(), sampler->filemap.end(),
[](const std::pair<uint64_t, std::string>& a, const std::pair<uint64_t, std::string>& b)
{ return a.first < b.first; });
}

/* Clean up readers, deletes cppcreated raster objects */
for(const reader_t* reader : readers)
{
delete reader;
delete reader;
}
}

/* Release since not needed anymore */
sampler->samples.clear();
sampler->file_ids.clear();
}

/*----------------------------------------------------------------------------
Expand All @@ -504,16 +513,27 @@ void ArrowSampler::batchSampling(batch_sampler_t* sampler)
void* ArrowSampler::readerThread(void* parm)
{
    /* parm is the reader_t handed to the Thread constructor by batchSampling */
    reader_t* reader = static_cast<reader_t*>(parm);

    /*
     * Sample the reader's index range [start_indx, end_indx) with the reader's own
     * RasterObject; the resulting sample lists are appended to reader->samples.
     */
    readSamples(reader->robj,
                reader->range.start_indx,
                reader->range.end_indx,
                reader->obj,
                reader->samples);

    /* Exit Thread */
    return NULL;
}

/*----------------------------------------------------------------------------
* readSamples
*----------------------------------------------------------------------------*/
void* ArrowSampler::readSamples(RasterObject* robj, uint32_t start_indx, uint32_t end_indx,
ArrowSampler* obj, std::vector<ArrowSampler::sample_list_t*>& samples)
{
for(uint32_t i = start_indx; i < end_indx; i++)
{
if(!reader->obj->active) break; // early exit if lua object is being destroyed
if(!obj->active) break; // early exit if lua object is being destroyed

point_info_t* pinfo = reader->obj->points[i];
point_info_t* pinfo = obj->points[i];

const MathLib::point_3d_t point = {pinfo->x, pinfo->y, 0.0};
const double gps = robj->usePOItime() ? pinfo->gps : 0.0;
Expand All @@ -535,7 +555,7 @@ void* ArrowSampler::readerThread(void* parm)
}

/* Add sample list */
reader->samples.push_back(slist);
samples.push_back(slist);
}

/* Exit Thread */
Expand Down
3 changes: 3 additions & 0 deletions packages/arrow/ArrowSampler.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class ArrowSampler: public LuaObject

explicit BatchSampler (const char* _rkey, RasterObject* _robj, ArrowSampler* _obj);
~BatchSampler (void);
void clearSamples (void);
} batch_sampler_t;

typedef struct
Expand Down Expand Up @@ -166,6 +167,8 @@ class ArrowSampler: public LuaObject
static void* mainThread (void* parm);
static void batchSampling (batch_sampler_t* sampler);
static void* readerThread (void* parm);
static void* readSamples (RasterObject* robj, uint32_t start_indx, uint32_t end_indx,
ArrowSampler* obj, std::vector<ArrowSampler::sample_list_t*>& samples);
};

#endif /* __parquet_sampler__*/
Loading

0 comments on commit 048b69a

Please sign in to comment.