Skip to content

Commit

Permalink
HPCC-32741 Refactor code to work towards a Thor generic disk activity
Browse files (browse the repository at this point in the history)
Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
  • Loading branch information
ghalliday committed Sep 30, 2024
1 parent 1c0c9a2 commit 3742e54
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 51 deletions.
32 changes: 18 additions & 14 deletions common/thorhelper/thorcommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1074,16 +1074,18 @@ class CRowStreamReader : public CSimpleInterfaceOf<IExtRowStream>
}
return false;
}
const byte *getNextPrefetchRow()
const byte *getNextPrefetchRow(size32_t & size)
{
while (true)
{
++progress;
if (checkEmptyRow())
return nullptr;
break;

currentRowOffset = prefetchBuffer.tell();
prefetcher->readAhead(prefetchBuffer);
bool matched = fieldFilterMatch(prefetchBuffer.queryRow());
size = prefetchBuffer.tell() - currentRowOffset;
checkEog();
if (matched) // NB: prefetchDone() call must be paired with a row returned from prefetchRow()
{
Expand All @@ -1095,6 +1097,7 @@ class CRowStreamReader : public CSimpleInterfaceOf<IExtRowStream>
if (checkExitConditions())
break;
}
size = 0;
return nullptr;
}
const void *getNextRow()
Expand Down Expand Up @@ -1194,7 +1197,8 @@ class CRowStreamReader : public CSimpleInterfaceOf<IExtRowStream>
{
if (translator)
{
const byte *row = getNextPrefetchRow();
size32_t prefetchSize;
const byte *row = getNextPrefetchRow(prefetchSize);
if (row)
{
RtlDynamicRowBuilder rowBuilder(*allocator);
Expand All @@ -1210,7 +1214,7 @@ class CRowStreamReader : public CSimpleInterfaceOf<IExtRowStream>
return nullptr;
}

virtual const byte *prefetchRow() override
virtual const void *prefetchRow(size32_t & size) override
{
// NB: prefetchDone() call must be paired with a row returned from prefetchRow()
if (eog)
Expand All @@ -1224,20 +1228,21 @@ class CRowStreamReader : public CSimpleInterfaceOf<IExtRowStream>
eos = true;
else
{
const byte *row = getNextPrefetchRow();
const byte *row = getNextPrefetchRow(size);
if (row)
{
if (translator)
{
translateBuf.setLength(0);
MemoryBufferBuilder rowBuilder(translateBuf, 0);
translator->translate(rowBuilder, *fieldCallback, row);
size = translator->translate(rowBuilder, *fieldCallback, row);
row = rowBuilder.getSelf();
}
return row;
}
}
}
size = 0;
return nullptr;
}

Expand All @@ -1248,7 +1253,11 @@ class CRowStreamReader : public CSimpleInterfaceOf<IExtRowStream>

virtual void stop() override
{
stop(NULL);
if (!eos)
{
eos = true;
clear();
}
}

void clear()
Expand All @@ -1257,15 +1266,10 @@ class CRowStreamReader : public CSimpleInterfaceOf<IExtRowStream>
fileio.clear();
}

virtual void stop(CRC32 *crcout) override
virtual CRC32 queryCRC() const override
{
if (!eos) {
eos = true;
clear();
}
// NB CRC will only be right if stopped at eos
if (crcout)
*crcout = crccb.crc;
return crccb.crc;
}

virtual offset_t getOffset() const override
Expand Down
15 changes: 7 additions & 8 deletions common/thorhelper/thorcommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,16 +155,15 @@ inline unsigned getCompMethod(const char *compStr)

interface IExtRowStream: extends IRowStream
{
virtual offset_t getOffset() const = 0;
virtual offset_t getLastRowOffset() const = 0;
virtual unsigned __int64 queryProgress() const = 0;
using IRowStream::stop;
virtual void stop(CRC32 *crcout) = 0;
virtual const byte *prefetchRow() = 0;
virtual offset_t getOffset() const = 0; // Used by merge to limit the size read from disk in CMergeSlave::getRows()
virtual offset_t getLastRowOffset() const = 0; // Used by disk read to deal with virtual file positions.
virtual unsigned __int64 queryProgress() const = 0; // Should probably be getStatistic(StNumRowsRead)
virtual const void *prefetchRow(size32_t & size) = 0; // Used when row does not need to be cloned - e.g. when it will be transformed
virtual void prefetchDone() = 0;
virtual void reinit(offset_t offset,offset_t len,unsigned __int64 maxrows) = 0;
virtual void reinit(offset_t offset,offset_t len,unsigned __int64 maxrows) = 0; // Not used anywhere - should be deleted
virtual unsigned __int64 getStatistic(StatisticKind kind) = 0;
virtual void setFilters(IConstArrayOf<IFieldFilter> &filters) = 0;
virtual void setFilters(IConstArrayOf<IFieldFilter> &filters) = 0; // Possibly cleaner as a parameter to createRowStream()
virtual CRC32 queryCRC() const = 0;
};

interface IExtRowWriter: extends IRowWriter
Expand Down
28 changes: 14 additions & 14 deletions common/thorhelper/thorread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ class BinaryDiskRowReader : public LocalDiskRowReader
BinaryDiskRowReader(IDiskReadMapping * _mapping);

virtual const void *nextRow() override;
virtual const void *nextRow(size32_t & resultSize) override;
virtual const void *prefetchRow(size32_t & resultSize) override;
virtual const void * nextRow(MemoryBufferBuilder & builder) override;
virtual bool getCursor(MemoryBuffer & cursor) override;
virtual void setCursor(MemoryBuffer & cursor) override;
Expand Down Expand Up @@ -681,7 +681,7 @@ const void *BinaryDiskRowReader::nextRow()


//Similar to above, except the code at the end will translate to a local buffer or return the pointer
const void *BinaryDiskRowReader::nextRow(size32_t & resultSize)
const void *BinaryDiskRowReader::prefetchRow(size32_t & resultSize)
{
return inlineNextRow(
[this,&resultSize](size32_t sizeRead, const byte * next)
Expand Down Expand Up @@ -885,7 +885,7 @@ class CsvDiskRowReader : public ExternalFormatDiskRowReader
CsvDiskRowReader(IDiskReadMapping * _mapping);

virtual const void *nextRow() override;
virtual const void *nextRow(size32_t & resultSize) override;
virtual const void *prefetchRow(size32_t & resultSize) override;
virtual const void *nextRow(MemoryBufferBuilder & builder) override;

virtual void stop() override;
Expand Down Expand Up @@ -1006,7 +1006,7 @@ const void *CsvDiskRowReader::nextRow()


//Implementation of IDiskRowStream
const void *CsvDiskRowReader::nextRow(size32_t & resultSize)
const void *CsvDiskRowReader::prefetchRow(size32_t & resultSize)
{
for (;;)
{
Expand Down Expand Up @@ -1232,7 +1232,7 @@ class MarkupDiskRowReader : public ExternalFormatDiskRowReader, implements IXMLS
MarkupDiskRowReader(IDiskReadMapping * _mapping, ThorActivityKind _kind);

virtual const void *nextRow() override;
virtual const void *nextRow(size32_t & resultSize) override;
virtual const void *prefetchRow(size32_t & resultSize) override;
virtual const void *nextRow(MemoryBufferBuilder & builder) override;

virtual void stop() override;
Expand Down Expand Up @@ -1302,7 +1302,7 @@ const void *MarkupDiskRowReader::nextRow()
}

//Implementation of IDiskRowStream
const void *MarkupDiskRowReader::nextRow(size32_t & resultSize)
const void *MarkupDiskRowReader::prefetchRow(size32_t & resultSize)
{
tempOutputBuffer.clear();
const void * next = nextRow(bufferBuilder);
Expand Down Expand Up @@ -1494,10 +1494,10 @@ class CompoundProjectRowReader : extends CInterfaceOf<IDiskRowStream>, implement
virtual void setCursor(MemoryBuffer & cursor) override { rawInputStream->setCursor(cursor); }
virtual void stop() override { rawInputStream->stop(); }

virtual const void *nextRow(size32_t & resultSize) override
virtual const void *prefetchRow(size32_t & resultSize) override
{
size32_t rawInputSize;
const void * next = rawInputStream->nextRow(rawInputSize);
const void * next = rawInputStream->prefetchRow(rawInputSize);
if (isSpecialRow(next))
return next;

Expand All @@ -1512,7 +1512,7 @@ class CompoundProjectRowReader : extends CInterfaceOf<IDiskRowStream>, implement
virtual const void *nextRow() override
{
size32_t rawInputSize;
const void * next = rawInputStream->nextRow(rawInputSize);
const void * next = rawInputStream->prefetchRow(rawInputSize);
if (isSpecialRow(next))
return next;

Expand All @@ -1523,7 +1523,7 @@ class CompoundProjectRowReader : extends CInterfaceOf<IDiskRowStream>, implement
virtual const void *nextRow(MemoryBufferBuilder & builder) override
{
size32_t rawInputSize;
const void * next = rawInputStream->nextRow(rawInputSize);
const void * next = rawInputStream->prefetchRow(rawInputSize);
if (isSpecialRow(next))
return next;

Expand Down Expand Up @@ -1651,7 +1651,7 @@ class ParquetDiskRowReader : public ExternalFormatDiskRowReader
virtual IDiskRowStream * queryAllocatedRowStream(IEngineRowAllocator * _outputAllocator) override;

virtual const void * nextRow() override;
virtual const void * nextRow(size32_t & resultSize) override;
virtual const void * prefetchRow(size32_t & resultSize) override;
virtual const void * nextRow(MemoryBufferBuilder & builder) override;
virtual bool getCursor(MemoryBuffer & cursor) override { return parquetFileReader->getCursor(cursor); }
virtual void setCursor(MemoryBuffer & cursor) override { parquetFileReader->setCursor(cursor); }
Expand Down Expand Up @@ -1720,7 +1720,7 @@ const void * ParquetDiskRowReader::nextRow()

// Returns temporary rows for filtering/counting etc.
// Row is built in temporary buffer and reused.
const void * ParquetDiskRowReader::nextRow(size32_t & resultSize)
const void * ParquetDiskRowReader::prefetchRow(size32_t & resultSize)
{
tempOutputBuffer.clear();
const void * next = nextRow(bufferBuilder);
Expand Down Expand Up @@ -1812,7 +1812,7 @@ class RemoteDiskRowReader : public DiskRowReader
RemoteDiskRowReader(const char * _format, IDiskReadMapping * _mapping);

virtual const void *nextRow() override;
virtual const void *nextRow(size32_t & resultSize) override;
virtual const void *prefetchRow(size32_t & resultSize) override;
virtual const void *nextRow(MemoryBufferBuilder & builder) override;
virtual bool getCursor(MemoryBuffer & cursor) override;
virtual void setCursor(MemoryBuffer & cursor) override;
Expand Down Expand Up @@ -1988,7 +1988,7 @@ const void *RemoteDiskRowReader::nextRow()


//Similar to above, except the code at the end will translate to a local buffer or return the pointer
const void *RemoteDiskRowReader::nextRow(size32_t & resultSize)
const void *RemoteDiskRowReader::prefetchRow(size32_t & resultSize)
{
return inlineNextRow(
[this,&resultSize](size32_t sizeRead, const byte * next)
Expand Down
4 changes: 2 additions & 2 deletions ecl/hthor/hthor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11350,7 +11350,7 @@ const void *CHThorNewDiskReadActivity::nextRow()
{
//Returns a row in the serialized form of the projected format
size32_t nextSize;
const byte * next = (const byte *)inputRowStream->nextRow(nextSize);
const byte * next = (const byte *)inputRowStream->prefetchRow(nextSize);
if (!isSpecialRow(next))
{
if (likely(!hasMatchFilter || helper.canMatch(next)))
Expand Down Expand Up @@ -11910,7 +11910,7 @@ const void *CHThorGenericDiskReadActivity::nextRow()
{
//Returns a row in the serialized form of the projected format
size32_t nextSize;
const byte * next = (const byte *)inputRowStream->nextRow(nextSize);
const byte * next = (const byte *)inputRowStream->prefetchRow(nextSize);
if (!isSpecialRow(next))
{
if (likely(!hasMatchFilter || helper.canMatch(next)))
Expand Down
2 changes: 1 addition & 1 deletion system/jlib/jrowstream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class NullDiskRowStream : public CInterfaceOf<IDiskRowStream>
virtual void stop()
{
}
virtual const void *nextRow(size32_t & size) override
virtual const void *prefetchRow(size32_t & size) override
{
size = 0;
return eofRow;
Expand Down
6 changes: 3 additions & 3 deletions system/jlib/jrowstream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ interface IDiskRowStream : extends IRowStream
virtual void setCursor(MemoryBuffer & cursor) = 0;

// rows returned are only valid until next call. Size is the number of bytes in the row.
virtual const void *nextRow(size32_t & size)=0;
virtual const void * prefetchRow(size32_t & size)=0;

inline const void *ungroupedNextRow(size32_t & size) // size will not include the size of the eog
inline const void *ungroupedPrefetchRow(size32_t & size) // size will not include the size of the eog
{
for (;;)
{
const void *ret = nextRow(size);
const void *ret = prefetchRow(size);
if (likely(!isEndOfGroup(ret)))
return ret;
}
Expand Down
10 changes: 6 additions & 4 deletions thorlcr/activities/diskread/thdiskreadslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,10 @@ class CDiskRecordPartHandler : public CDiskPartHandlerBase
{
return in->nextRow();
}
inline const byte *prefetchRow()
inline const void *prefetchRow()
{
return in->prefetchRow();
size32_t size;
return in->prefetchRow(size);
}
inline void prefetchDone()
{
Expand Down Expand Up @@ -397,7 +398,8 @@ void CDiskRecordPartHandler::close(CRC32 &fileCRC)
{
closedPartFileStats.mergeStatistic(StNumDiskRowsRead, partStream->queryProgress());
activity.mergeFileStats(partDesc, partStream);
partStream->stop(&fileCRC);
partStream->stop();
fileCRC = partStream->queryCRC();
}
}

Expand Down Expand Up @@ -425,7 +427,7 @@ class CDiskReadSlaveActivity : public CDiskReadSlaveActivityRecord
{
for (;;)
{
const byte *row = CDiskRecordPartHandler::prefetchRow();
const void *row = CDiskRecordPartHandler::prefetchRow();
if (!row)
{
if (!activity.grouped)
Expand Down
9 changes: 4 additions & 5 deletions thorlcr/thorutil/thormisc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,14 +381,13 @@ class CFileSizeTracker : public CInterface
};

// simple class which takes ownership of the underlying file and deletes it on destruction
class graph_decl CFileOwner : public CSimpleInterface, implements IInterface
class graph_decl CFileOwner : public CSimpleInterfaceOf<IInterface>
{
Linked<IFile> iFile;
Linked<CFileSizeTracker> fileSizeTracker;
offset_t fileSize = 0;

public:
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
CFileOwner(IFile *_iFile, CFileSizeTracker *_fileSizeTracker = nullptr) : iFile(_iFile), fileSizeTracker(_fileSizeTracker)
{
}
Expand Down Expand Up @@ -429,12 +428,11 @@ class graph_decl CStreamFileOwner : public CSimpleInterfaceOf<IExtRowStream>
}
// IExtRowStream
virtual const void *nextRow() override { return stream->nextRow(); }
virtual void stop() override { stream->stop(NULL); }
virtual void stop() override { stream->stop(); }
virtual offset_t getOffset() const override { return stream->getOffset(); }
virtual offset_t getLastRowOffset() const override { return stream->getLastRowOffset(); }
virtual unsigned __int64 queryProgress() const override { return stream->queryProgress(); }
virtual void stop(CRC32 *crcout) override { stream->stop(crcout); }
virtual const byte *prefetchRow() override { return stream->prefetchRow(); }
virtual const void *prefetchRow(size32_t & size) override { return stream->prefetchRow(size); }
virtual void prefetchDone() override { stream->prefetchDone(); }
virtual void reinit(offset_t offset, offset_t len, unsigned __int64 maxRows) override
{
Expand All @@ -448,6 +446,7 @@ class graph_decl CStreamFileOwner : public CSimpleInterfaceOf<IExtRowStream>
{
return stream->setFilters(filters);
}
virtual CRC32 queryCRC() const override { return stream->queryCRC(); }
};

#define DEFAULT_THORMASTERPORT 20000
Expand Down

0 comments on commit 3742e54

Please sign in to comment.