From 961044ac19cdb1e8d0fd58ec00e2a6e9b71775a1 Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Tue, 7 Nov 2023 05:00:05 +0000 Subject: [PATCH] HPCC-30993 Fix file access for index read activity Signed-off-by: Shamser Ahmed --- .../activities/indexread/thindexreadslave.cpp | 136 ++++++++++-------- 1 file changed, 77 insertions(+), 59 deletions(-) diff --git a/thorlcr/activities/indexread/thindexreadslave.cpp b/thorlcr/activities/indexread/thindexreadslave.cpp index b4c7d0cd7cc..90c999fc495 100644 --- a/thorlcr/activities/indexread/thindexreadslave.cpp +++ b/thorlcr/activities/indexread/thindexreadslave.cpp @@ -44,6 +44,7 @@ class CIndexReadSlaveBase : public CSlaveActivity protected: StringAttr logicalFilename; IArrayOf partDescs; + bool isSuperFile = false; IHThorIndexReadBaseArg *helper; IHThorSourceLimitTransformExtra * limitTransformExtra; Owned allocator; @@ -76,10 +77,9 @@ class CIndexReadSlaveBase : public CSlaveActivity rowcount_t rowLimit = RCMAX; bool useRemoteStreaming = false; Owned lazyIFileIO; - mutable CriticalSection ioStatsCS; + mutable CriticalSection keyManagersCS; // CS for any updates to keyManagers unsigned fileTableStart = NotFound; - CStatsContextLogger contextLogger; - CStatsCtxLoggerDeltaUpdater statsUpdater; + std::vector> contextLoggers; class TransformCallback : implements IThorIndexCallback , public CSimpleInterface { @@ -98,7 +98,7 @@ class CIndexReadSlaveBase : public CSlaveActivity if (!keyManager) throw MakeActivityException(&activity, 0, "Callback attempting to read blob with no key manager - index being read remotely?"); needsBlobCleaning = true; - return (byte *) keyManager->loadBlob(id, dummy, &activity.contextLogger); + return (byte *) keyManager->loadBlob(id, dummy, nullptr); } void prepareManager(IKeyManager *_keyManager) { @@ -166,10 +166,9 @@ class CIndexReadSlaveBase : public CSlaveActivity unsigned projectedFormatCrc = helper->getProjectedFormatCrc(); IOutputMetaData *projectedFormat = helper->queryProjectedDiskRecordSize(); - unsigned p = partNum; - while (p keyIndex = createKeyIndex(path, crc, *lazyIFileIO, (unsigned) -1, false); - Owned klManager = createLocalKeyManager(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndex, &contextLogger, helper->hasNewSegmentMonitors(), false); + IContextLogger * contextLogger = isSuperFile?contextLoggers[p]:contextLoggers[0]; + Owned klManager = createLocalKeyManager(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndex, contextlogger, helper->hasNewSegmentMonitors(), false); if (localMerge) { if (!keyIndexSet) @@ -315,7 +315,10 @@ class CIndexReadSlaveBase : public CSlaveActivity translators.append(translator.getClear()); } keyIndexSet->addIndex(keyIndex.getClear()); - keyManagers.append(*klManager.getLink()); + { + CriticalBlock b(keyManagersCS); + keyManagers.append(*klManager.getLink()); + } keyManager = klManager; } else @@ -325,13 +328,17 @@ class CIndexReadSlaveBase : public CSlaveActivity if (translator) klManager->setLayoutTranslator(&translator->queryTranslator()); translators.append(translator.getClear()); - keyManagers.append(*klManager.getLink()); + { + CriticalBlock b(keyManagersCS); + keyManagers.append(*klManager.getLink()); + } keyManager = klManager; - partNum = p; + partNum = (p+1); return createIndexLookup(keyManager); } } - keyMergerManager.setown(createKeyMerger(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndexSet, seekGEOffset, &contextLogger, helper->hasNewSegmentMonitors(), false)); + //Not tracking jhtree cache stats in KeyMerger at the moment. Future: something to consider + keyMergerManager.setown(createKeyMerger(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndexSet, seekGEOffset, nullptr, helper->hasNewSegmentMonitors(), false)); const ITranslator *translator = translators.item(0); if (translator) keyMergerManager->setLayoutTranslator(&translator->queryTranslator()); @@ -348,40 +355,12 @@ class CIndexReadSlaveBase : public CSlaveActivity else return nullptr; } - void mergeFileStats(IPartDescriptor *partDesc, IFileIO *partIO) - { - if (fileStats.size()>0) - { - ISuperFileDescriptor * superFDesc = partDesc->queryOwner().querySuperFileDescriptor(); - if (superFDesc) - { - unsigned subfile, lnum; - if(superFDesc->mapSubPart(partDesc->queryPartIndex(), subfile, lnum)) - mergeStats(*fileStats[fileTableStart+subfile], partIO); - } - else - mergeStats(*fileStats[fileTableStart], partIO); - } - } - void updateStats() - { - // NB: updateStats() should always be called whilst ioStatsCS is held. - if (lazyIFileIO) - { - mergeStats(inactiveStats, lazyIFileIO); - if (currentPartqueryHelper(); limitTransformExtra = nullptr; @@ -555,7 +533,6 @@ class CIndexReadSlaveBase : public CSlaveActivity break; if (keyManager) prepareManager(keyManager); - CStatsScopedThresholdDeltaUpdater scoped(statsUpdater); if (hard) // checkCount checks hard key count only. count += indexInput->checkCount(keyedLimit-count); // part max, is total limit [keyedLimit] minus total so far [count] else @@ -589,7 +566,10 @@ class CIndexReadSlaveBase : public CSlaveActivity partDescs.kill(); keyIndexSet.clear(); translators.kill(); - keyManagers.kill(); + { + CriticalBlock b(keyManagersCS); + keyManagers.kill(); + } keyMergerManager.clear(); } else @@ -607,6 +587,7 @@ class CIndexReadSlaveBase : public CSlaveActivity IPartDescriptor &part0 = partDescs.item(0); IFileDescriptor &fileDesc = part0.queryOwner(); ISuperFileDescriptor *super = fileDesc.querySuperFileDescriptor(); + isSuperFile = super != nullptr; if ((0 == (helper->getFlags() & TIRusesblob)) && !localMerge) { @@ -684,7 +665,17 @@ class CIndexReadSlaveBase : public CSlaveActivity } } data.read(fileTableStart); - setupSpace4FileStats(fileTableStart, reInit, super!=nullptr, super?super->querySubFiles():0, indexReadActivityStatistics); + setupSpace4FileStats(fileTableStart, reInit, isSuperFile, isSuperFile?super->querySubFiles():0, indexReadFileStatistics); + // One contextLogger required for non-super files. Superfiles require one context logger per subfile + if (isSuperFile) + { + for(unsigned i = 0; i < parts; ++i) + contextLoggers.push_back(new CStatsContextLogger(jhtreeCacheStatistics, thorJob)); + } + else + { + contextLoggers.push_back(new CStatsContextLogger(jhtreeCacheStatistics, thorJob)); + } } } // IThorDataLink @@ -719,11 +710,46 @@ class CIndexReadSlaveBase : public CSlaveActivity virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const { PARENT::gatherActiveStats(activeStats); + if (partDescs.ordinality()) { - CriticalBlock b(ioStatsCS); - if (lazyIFileIO) - mergeStats(activeStats, lazyIFileIO); + // reset required because within loop below, mergeStats() is used to build up stats for each file + for (auto & fileStatItem: fileStats) + fileStatItem->reset(); + ISuperFileDescriptor *superFDesc = partDescs.item(0).queryOwner().querySuperFileDescriptor(); + for (unsigned partNum=0; partNummapSubPart(partDescs.item(partNum).queryPartIndex(), subfile, lnum)) + continue; // should not happen + } + IKeyManager * keyManager; + { + CriticalBlock b(keyManagersCS); + if (!keyManagers.isItem(partNum)) + continue; + keyManager = &keyManagers.item(partNum); + } + if (fileStats.size()>0) // fileStats used for superfiles where stats are tracked at subfile level + { + CRuntimeStatisticCollection * fileStatItem = fileStats[fileTableStart+subfile]; + keyManager->mergeStats(*fileStatItem); // for file level stats + fileStatItem->merge(contextLoggers[partNum]->queryStats()); + activeStats.merge(*fileStatItem); // for activity level stats + } + else + { + // when just 1 file, merge into activeStats (can use activeStats for file level stats) + keyManager->mergeStats(activeStats); + } + } } + if (fileStats.size()==0) // for non-superfiles, merge jhtree stats into activeStats + activeStats.merge(contextLoggers[0]->queryStats()); activeStats.setStatistic(StNumRowsProcessed, progress); } virtual void serializeStats(MemoryBuffer &mb) override @@ -735,11 +761,7 @@ class CIndexReadSlaveBase : public CSlaveActivity } virtual void done() override { - { - CriticalBlock b(ioStatsCS); - updateStats(); - lazyIFileIO.clear(); - } + lazyIFileIO.clear(); PARENT::done(); } }; @@ -819,7 +841,6 @@ class CIndexReadSlaveActivity : public CIndexReadSlaveBase helper->mapOutputToInput(tempBuilder, seek, numFields); // NOTE - weird interface to mapOutputToInput means that it STARTS writing at seekGEOffset... rawSeek = (byte *)temp; } - CStatsScopedThresholdDeltaUpdater scoped(statsUpdater); if (!currentManager->lookupSkip(rawSeek, seekGEOffset, seekSize)) return NULL; const byte *row = currentManager->queryKeyBuffer(); @@ -972,7 +993,6 @@ class CIndexReadSlaveActivity : public CIndexReadSlaveBase // IRowStream virtual void stop() override { - CStatsScopedDeltaUpdater scoped(statsUpdater); if (RCMAX != keyedLimit) // NB: will not be true if nextRow() has handled { keyedLimitCount = sendGetCount(keyedProcessed); @@ -1142,7 +1162,6 @@ class CIndexGroupAggregateSlaveActivity : public CIndexReadSlaveBase, implements if (keyManager) prepareManager(keyManager); - CStatsScopedThresholdDeltaUpdater scoped(statsUpdater); while (true) { const void *key = indexInput->nextKey(); @@ -1301,7 +1320,6 @@ class CIndexCountSlaveActivity : public CIndexReadSlaveBase if (keyManager) prepareManager(keyManager); - CStatsScopedThresholdDeltaUpdater scoped(statsUpdater); while (true) { const void *key = indexInput->nextKey();