Skip to content

Commit

Permalink
HPCC-29880 Serialize index write's jhtree and disk io stats regularly
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <shamser.ahmed@lexisnexis.com>
  • Loading branch information
shamser committed Jan 21, 2025
1 parent d8a4cb3 commit e1a458c
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 45 deletions.
23 changes: 18 additions & 5 deletions system/jhtree/keybuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,9 @@ class CKeyBuilder : public CInterfaceOf<IKeyBuilder>

private:
unsigned __int64 duplicateCount;
unsigned __int64 numLeaves = 0;
unsigned __int64 numBranches = 0;
unsigned __int64 numBlobs = 0;
RelaxedAtomic<__uint64> numLeaves{0};
RelaxedAtomic<__uint64> numBranches{0};
RelaxedAtomic<__uint64> numBlobs{0};
__uint64 partitionFieldMask = 0;
CWriteNode *activeNode = nullptr;
CBlobWriteNode *activeBlobNode = nullptr;
Expand Down Expand Up @@ -485,7 +485,7 @@ class CKeyBuilder : public CInterfaceOf<IKeyBuilder>
keyHdr->getHdrStruct()->partitionFieldMask = partitionFieldMask;
CRC32 headerCrc;
writeFileHeader(false, &headerCrc);

out->flush();
if (fileCrc)
{
if (doCrc)
Expand Down Expand Up @@ -605,7 +605,20 @@ class CKeyBuilder : public CInterfaceOf<IKeyBuilder>
// Returns the current value of the offsetBranches member.
virtual unsigned __int64 getOffsetBranches() const override { return offsetBranches; }
// Returns whatever indexCompressor reports from queryBranchMemorySize().
virtual unsigned __int64 getBranchMemorySize() const override { return indexCompressor->queryBranchMemorySize(); }
// Returns whatever indexCompressor reports from queryLeafMemorySize().
virtual unsigned __int64 getLeafMemorySize() const override { return indexCompressor->queryLeafMemorySize(); }

// Look up a single statistic for this key builder.
//
// The leaf, branch and blob counters are kept by the builder itself
// (numLeaves / numBranches / numBlobs); any other kind is forwarded to
// out->getStatistic().
virtual unsigned __int64 getStatistic(StatisticKind kind) const override
{
    // Builder-owned node counters first.
    if (kind == StNumLeafCacheAdds)
        return numLeaves;
    if (kind == StNumNodeCacheAdds)
        return numBranches;
    if (kind == StNumBlobCacheAdds)
        return numBlobs;

    // Not one of the builder's own counters - ask the output object.
    return out->getStatistic(kind);
}
protected:
void writeMetadata(char const * data, size32_t size)
{
Expand Down
1 change: 1 addition & 0 deletions system/jhtree/keybuild.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ interface IKeyBuilder : public IInterface
// Read-only accessors; each is pure virtual and returns an unsigned 64-bit value.
virtual unsigned __int64 getOffsetBranches() const = 0;
virtual unsigned __int64 getBranchMemorySize() const = 0;
virtual unsigned __int64 getLeafMemorySize() const = 0;
// Returns the value of the statistic identified by 'kind'.
// NOTE(review): which kinds an implementation answers itself versus forwards
// elsewhere is implementation-defined - confirm against the concrete builder.
virtual unsigned __int64 getStatistic(StatisticKind kind) const = 0;
};

// Factory returning an IKeyBuilder associated with the stream passed as _out.
// NOTE(review): the caller presumably takes ownership of the returned pointer
// (callers are expected to hold it in an Owned<IKeyBuilder> via setown) - confirm.
extern jhtree_decl IKeyBuilder *createKeyBuilder(IFileIOStream *_out, unsigned flags, unsigned rawSize, unsigned nodeSize, unsigned keyFieldSize, unsigned __int64 startSequence, IHThorIndexWriteArg *helper, const char * defaultCompression, bool enforceOrder, bool isTLK);
Expand Down
77 changes: 37 additions & 40 deletions thorlcr/activities/indexwrite/thindexwriteslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ class IndexWriteSlaveActivity : public ProcessSlaveActivity, public ILookAheadSt
StringAttr logicalFilename;
Owned<IPartDescriptor> partDesc, tlkDesc;
IHThorIndexWriteArg *helper;
Owned <IKeyBuilder> builder;
OwnedIFileIO builderIFileIO;
Owned<IKeyBuilder> builder;
Owned<IRowStream> myInputStream;
Owned<IPropertyTree> metadata;
Linked<IEngineRowAllocator> outRowAllocator;
mutable CriticalSection builderCS;

bool buildTlk, active;
bool sizeSignalled;
Expand All @@ -54,9 +54,6 @@ class IndexWriteSlaveActivity : public ProcessSlaveActivity, public ILookAheadSt

size32_t lastRowSize, firstRowSize, maxRecordSizeSeen, keyedSize;
unsigned __int64 duplicateKeyCount;
unsigned __int64 numLeafNodes = 0;
unsigned __int64 numBranchNodes = 0;
unsigned __int64 numBlobNodes = 0;
offset_t offsetBranches = 0;
offset_t uncompressedSize = 0;
offset_t originalBlobSize = 0;
Expand Down Expand Up @@ -212,12 +209,15 @@ class IndexWriteSlaveActivity : public ProcessSlaveActivity, public ILookAheadSt
if (metadata->getPropBool("_useTrailingHeader", true))
flags |= USE_TRAILING_HEADER;
unsigned twFlags = isUrl(partFname) ? TW_Direct : TW_RenameToPrimary;
builderIFileIO.setown(createMultipleWrite(this, partDesc, 0, twFlags, compress, NULL, this, &abortSoon));
OwnedIFileIO builderIFileIO = createMultipleWrite(this, partDesc, 0, twFlags, compress, NULL, this, &abortSoon);
Owned<IFileIOStream> out = createBufferedIOStream(builderIFileIO, 0x100000);
if (!needsSeek)
out.setown(createNoSeekIOStream(out));
maxRecordSizeSeen = 0;
builder.setown(createKeyBuilder(out, flags, maxDiskRecordSize, nodeSize, helper->getKeyedSize(), isTlk ? 0 : totalCount, helper, defaultIndexCompression, !isTlk, isTlk));
{
CriticalBlock b(builderCS);
builder.setown(createKeyBuilder(out, flags, maxDiskRecordSize, nodeSize, helper->getKeyedSize(), isTlk ? 0 : totalCount, helper, defaultIndexCompression, !isTlk, isTlk));
}
}
void buildLayoutMetadata(Owned<IPropertyTree> & metadata)
{
Expand All @@ -235,16 +235,24 @@ class IndexWriteSlaveActivity : public ProcessSlaveActivity, public ILookAheadSt
{
if (builder)
{
builder->finish(metadata, &crc, maxRecordSizeSeen);
if (!isTLK)
// Clear out builder before merging builder stats into inactive stats
// so that gatherActiveStatistics doesn't also merge builder stats.
Owned<IKeyBuilder> tmpBuilder;
{
CriticalBlock b(builderCS);
tmpBuilder.setown(builder.getClear());
}
if (tmpBuilder)
{
duplicateKeyCount = builder->getDuplicateCount();
numLeafNodes = builder->getNumLeafNodes();
numBranchNodes = builder->getNumBranchNodes();
numBlobNodes = builder->getNumBlobNodes();
offsetBranches = builder->getOffsetBranches();
branchMemorySize = builder->getBranchMemorySize();
leafMemorySize = builder->getLeafMemorySize();
tmpBuilder->finish(metadata, &crc, maxRecordSizeSeen);
if (!isTLK)
{
duplicateKeyCount = tmpBuilder->getDuplicateCount();
offsetBranches = tmpBuilder->getOffsetBranches();
branchMemorySize = tmpBuilder->getBranchMemorySize();
leafMemorySize = tmpBuilder->getLeafMemorySize();
}
mergeStats(inactiveStats, tmpBuilder, indexWriteActivityStatistics);
}
}
}
Expand All @@ -259,22 +267,7 @@ class IndexWriteSlaveActivity : public ProcessSlaveActivity, public ILookAheadSt
abortSoon = true;
e.setown(MakeActivityException(this, 0, "INDEXWRITE: Error closing file: %s - unknown exception", partFname.str()));
}
try
{
metadata.clear();
builder.clear();
if (builderIFileIO)
{
mergeStats(inactiveStats, builderIFileIO, diskWriteRemoteStatistics);
builderIFileIO->close();
builderIFileIO.clear();
}
}
catch (IException *_e)
{
ActPrintLog(_e, "Error closing file: %s", partFname.str());
e.setown(_e);
}
metadata.clear();
if (abortSoon)
removeFiles(partDesc);
if (e)
Expand Down Expand Up @@ -613,7 +606,10 @@ class IndexWriteSlaveActivity : public ProcessSlaveActivity, public ILookAheadSt
}
virtual void processDone(MemoryBuffer &mb) override
{
builder.clear();
{
CriticalBlock b(builderCS);
builder.clear();
}
if (refactor && !active)
return;
rowcount_t _processed = processed & THORDATALINK_COUNT_MASK;
Expand All @@ -630,10 +626,9 @@ class IndexWriteSlaveActivity : public ProcessSlaveActivity, public ILookAheadSt
ifile->getTime(&createTime, &modifiedTime, &accessedTime);
modifiedTime.serialize(mb);
mb.append(partCrc);

mb.append(numLeafNodes);
mb.append(numBlobNodes);
mb.append(numBranchNodes);
mb.append(inactiveStats.getStatisticValue(StNumLeafCacheAdds));
mb.append(inactiveStats.getStatisticValue(StNumBlobCacheAdds));
mb.append(inactiveStats.getStatisticValue(StNumNodeCacheAdds));
mb.append(offsetBranches);
mb.append(uncompressedSize);
mb.append(originalBlobSize);
Expand Down Expand Up @@ -699,10 +694,12 @@ class IndexWriteSlaveActivity : public ProcessSlaveActivity, public ILookAheadSt
// Contributes this activity's current statistics to activeStats.
// The PARENT contribution is gathered first; then, if a builder currently
// exists, its statistics are merged in using the indexWriteActivityStatistics
// mapping. Finally the replication progress value is set.
virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const
{
PARENT::gatherActiveStats(activeStats);
{
// Hold builderCS for the null check and the merge, so the builder pointer
// is read under the same lock that guards it being set or cleared.
CriticalBlock b(builderCS);
if (builder)
mergeStats(activeStats, builder, indexWriteActivityStatistics);
}
activeStats.setStatistic(StPerReplicated, replicateDone);
// NOTE(review): the three calls below presumably overwrite any leaf/node/blob
// values merged from the builder above with the activity's own
// numLeafNodes / numBranchNodes / numBlobNodes members. This text appears to
// come from a diff view without +/- markers, so these may be the lines the
// commit removed - confirm against the committed file before relying on them.
activeStats.setStatistic(StNumLeafCacheAdds, numLeafNodes);
activeStats.setStatistic(StNumNodeCacheAdds, numBranchNodes);
activeStats.setStatistic(StNumBlobCacheAdds, numBlobNodes);
}

// ICopyFileProgress
Expand Down

0 comments on commit e1a458c

Please sign in to comment.