diff --git a/dali/base/dadfs.cpp b/dali/base/dadfs.cpp index e22e5c7b487..7b008f187b4 100644 --- a/dali/base/dadfs.cpp +++ b/dali/base/dadfs.cpp @@ -2912,7 +2912,7 @@ class CDistributedFileBase : implements INTERFACE, public CInterface if (history) queryAttributes().removeTree(history); } - void lockFileAttrLock(CFileAttrLock & attrLock) + virtual void lockFileAttrLock(CFileAttrLock & attrLock) { if (!attrLock.init(logicalName, DXB_File, RTM_LOCK_WRITE, conn, defaultTimeout, "CDistributedFile::lockFileAttrLock")) { @@ -6482,6 +6482,19 @@ class CDistributedSuperFile: public CDistributedFileBase return new cSubFileIterator(subfiles,supersub); } + virtual void lockFileAttrLock(CFileAttrLock & attrLock) override + { + if (!attrLock.init(logicalName, DXB_SuperFile, RTM_LOCK_WRITE, conn, defaultTimeout, "CDistributedFile::lockFileAttrLock")) + { + // In unlikely event File/Attr doesn't exist, must ensure created, committed and root connection is reloaded. + verifyex(attrLock.init(logicalName, DXB_SuperFile, RTM_LOCK_WRITE|RTM_CREATE_QUERY, conn, defaultTimeout, "CDistributedFile::lockFileAttrLock")); + attrLock.commit(); + conn->commit(); + conn->reload(); + root.setown(conn->getRoot()); + } + } + void updateFileAttrs() { if (subfiles.ordinality()==0) { diff --git a/dali/dfu/dfuutil.cpp b/dali/dfu/dfuutil.cpp index 6f7bb178190..6cb0cd1ceee 100644 --- a/dali/dfu/dfuutil.cpp +++ b/dali/dfu/dfuutil.cpp @@ -530,13 +530,18 @@ class CFileCloner if (iskey&&!cluster2.isEmpty()) dstfdesc->addCluster(cluster2,grp2,spec2); - for (unsigned pn=0; pnqueryPart(pn)->queryProperties().getPropInt64("@size",-1); + for (unsigned pn=0; pnqueryPart(pn)->queryProperties(); + IPropertyTree &dstProps = dstfdesc->queryPart(pn)->queryProperties(); + offset_t sz = srcProps.getPropInt64("@size",-1); if (sz!=(offset_t)-1) - dstfdesc->queryPart(pn)->queryProperties().setPropInt64("@size",sz); + dstProps.setPropInt64("@size",sz); StringBuffer dates; - if
(srcfdesc->queryPart(pn)->queryProperties().getProp("@modified",dates)) - dstfdesc->queryPart(pn)->queryProperties().setProp("@modified",dates.str()); + if (srcProps.getProp("@modified",dates)) + dstProps.setProp("@modified",dates.str()); + if (srcProps.hasProp("@kind")) + dstProps.setProp("@kind", srcProps.queryProp("@kind")); } if (!copyphysical) //cloneFrom tells roxie where to copy from.. it's unnecessary if we already did the copy diff --git a/ecl/eclcc/eclcc.cpp b/ecl/eclcc/eclcc.cpp index 2aba8307f8c..9b1a959b28d 100644 --- a/ecl/eclcc/eclcc.cpp +++ b/ecl/eclcc/eclcc.cpp @@ -1535,8 +1535,11 @@ void EclCC::processSingleQuery(const EclRepositoryManager & localRepositoryManag updateWorkunitStat(instance.wu, SSToperation, ">compile:>parse", StTimeElapsed, NULL, parseTimeNs); stat_type sourceDownloadTime = localRepositoryManager.getStatistic(StTimeElapsed); + stat_type sourceDownloadBlockedTime = localRepositoryManager.getStatistic(StTimeBlocked); if (sourceDownloadTime) updateWorkunitStat(instance.wu, SSToperation, ">compile:>parse:>download", StTimeElapsed, NULL, sourceDownloadTime); + if (sourceDownloadBlockedTime) + updateWorkunitStat(instance.wu, SSToperation, ">compile:>parse:>download", StTimeBlocked, NULL, sourceDownloadBlockedTime); if (optExtraStats) { diff --git a/ecl/hql/hqlrepository.cpp b/ecl/hql/hqlrepository.cpp index 06692728bb1..1c0d2507a21 100644 --- a/ecl/hql/hqlrepository.cpp +++ b/ecl/hql/hqlrepository.cpp @@ -676,6 +676,8 @@ unsigned __int64 EclRepositoryManager::getStatistic(StatisticKind kind) const { case StTimeElapsed: return cycle_to_nanosec(gitDownloadCycles); + case StTimeBlocked: + return cycle_to_nanosec(gitDownloadBlockedCycles); } return 0; } @@ -823,7 +825,12 @@ IEclSourceCollection * EclRepositoryManager::resolveGitCollection(const char * r throw makeStringExceptionV(99, "Unsupported repository link format '%s'", defaultUrl); bool alreadyExists = false; + + CCycleTimer gitDownloadTimer; Owned 
gitUpdateLock(getGitUpdateLock(repoPath)); + cycle_t blockedCycles = gitDownloadTimer.elapsedCycles(); + gitDownloadBlockedCycles += blockedCycles; + if (checkDirExists(repoPath)) { if (options.cleanRepos) @@ -853,7 +860,6 @@ IEclSourceCollection * EclRepositoryManager::resolveGitCollection(const char * r bool ok = false; Owned error; - CCycleTimer gitDownloadTimer; if (alreadyExists) { if (options.updateRepos) @@ -890,7 +896,8 @@ IEclSourceCollection * EclRepositoryManager::resolveGitCollection(const char * r //this could become a read/write lock if that proved to be an issue. gitUpdateLock.clear(); - gitDownloadCycles += gitDownloadTimer.elapsedCycles(); + gitDownloadCycles += (gitDownloadTimer.elapsedCycles() - blockedCycles); + if (error) { if (errorReceiver) diff --git a/ecl/hql/hqlrepository.hpp b/ecl/hql/hqlrepository.hpp index 071139fe2da..3d4e5e1317d 100644 --- a/ecl/hql/hqlrepository.hpp +++ b/ecl/hql/hqlrepository.hpp @@ -105,6 +105,7 @@ class HQL_API EclRepositoryManager IArrayOf overrideSources; // -D options IArrayOf allSources; // also includes -D options cycle_t gitDownloadCycles = 0; + cycle_t gitDownloadBlockedCycles = 0; //Include all options in a nested struct to make it easy to ensure they are cloned struct { diff --git a/esp/src/src-react/components/Metrics.tsx b/esp/src/src-react/components/Metrics.tsx index 3d1f91cc2bd..65996c00a9f 100644 --- a/esp/src/src-react/components/Metrics.tsx +++ b/esp/src/src-react/components/Metrics.tsx @@ -56,6 +56,9 @@ export const Metrics: React.FunctionComponent = ({ selection, fullscreen = false }) => { + if (querySet && queryId) { + wuid = ""; + } const [_uiState, _setUIState] = React.useState({ ...defaultUIState }); const [selectedMetricsSource, setSelectedMetricsSource] = React.useState(""); const [selectedMetrics, setSelectedMetrics] = React.useState([]); diff --git a/esp/src/src-react/hooks/metrics.ts b/esp/src/src-react/hooks/metrics.ts index 2b0a47687f2..6afbcff8f32 100644 --- 
a/esp/src/src-react/hooks/metrics.ts +++ b/esp/src/src-react/hooks/metrics.ts @@ -5,6 +5,7 @@ import { scopedLogger } from "@hpcc-js/util"; import { singletonHook } from "react-singleton-hook"; import { userKeyValStore } from "src/KeyValStore"; import { DockPanelLayout } from "../layouts/DockPanel"; +import { singletonDebounce } from "../util/throttle"; import { useWorkunit } from "./workunit"; import { useQuery } from "./query"; import { useCounter } from "./util"; @@ -214,6 +215,8 @@ function useMetricsViewsImpl(): useMetricsViewsResult { export const useMetricsViews = singletonHook(defaultState, useMetricsViewsImpl); +let wuDetailsMetaResponse: Promise; + export function useMetricMeta(): [string[], string[]] { const service = useConst(() => new WorkunitsService({ baseUrl: "" })); @@ -221,7 +224,10 @@ export function useMetricMeta(): [string[], string[]] { const [properties, setProperties] = React.useState([]); React.useEffect(() => { - service?.WUDetailsMeta({}).then(response => { + if (!wuDetailsMetaResponse && service) { + wuDetailsMetaResponse = service.WUDetailsMeta({}); + } + wuDetailsMetaResponse?.then(response => { setScopeTypes(response?.ScopeTypes?.ScopeType || []); setProperties((response?.Properties?.Property.map(p => p.Name) || []).sort()); }); @@ -274,45 +280,48 @@ export function useWorkunitMetrics( const [count, increment] = useCounter(); React.useEffect(() => { - setStatus(FetchStatus.STARTED); - workunit?.fetchDetailsNormalized({ - ScopeFilter: scopeFilter, - NestedFilter: nestedFilter, - PropertiesToReturn: { - AllScopes: true, - AllAttributes: true, - AllProperties: true, - AllNotes: true, - AllStatistics: true, - AllHints: true - }, - ScopeOptions: { - IncludeId: true, - IncludeScope: true, - IncludeScopeType: true, - IncludeMatchedScopesInResults: true - }, - PropertyOptions: { - IncludeName: true, - IncludeRawValue: true, - IncludeFormatted: true, - IncludeMeasure: true, - IncludeCreator: false, - IncludeCreatorType: false - } - 
}).then(response => { - setData(response?.data); - setColumns(response?.columns); - setActivities(response?.meta?.Activities?.Activity || []); - setProperties(response?.meta?.Properties?.Property || []); - setMeasures(response?.meta?.Measures?.Measure || []); - setScopeTypes(response?.meta?.ScopeTypes?.ScopeType || []); - }).catch(e => { - logger.error(e); - }).finally(() => { - setStatus(FetchStatus.COMPLETE); - }); - }, [workunit, state, count, scopeFilter, nestedFilter]); + if (wuid && workunit) { + const fetchDetailsNormalized = singletonDebounce(workunit, "fetchDetailsNormalized"); + setStatus(FetchStatus.STARTED); + fetchDetailsNormalized({ + ScopeFilter: scopeFilter, + NestedFilter: nestedFilter, + PropertiesToReturn: { + AllScopes: true, + AllAttributes: true, + AllProperties: true, + AllNotes: true, + AllStatistics: true, + AllHints: true + }, + ScopeOptions: { + IncludeId: true, + IncludeScope: true, + IncludeScopeType: true, + IncludeMatchedScopesInResults: true + }, + PropertyOptions: { + IncludeName: true, + IncludeRawValue: true, + IncludeFormatted: true, + IncludeMeasure: true, + IncludeCreator: false, + IncludeCreatorType: false + } + }).then(response => { + setData(response?.data); + setColumns(response?.columns); + setActivities(response?.meta?.Activities?.Activity || []); + setProperties(response?.meta?.Properties?.Property || []); + setMeasures(response?.meta?.Measures?.Measure || []); + setScopeTypes(response?.meta?.ScopeTypes?.ScopeType || []); + }).catch(e => { + logger.error(e); + }).finally(() => { + setStatus(FetchStatus.COMPLETE); + }); + } + }, [workunit, state, count, scopeFilter, nestedFilter, wuid]); return { metrics: data, columns, activities, properties, measures, scopeTypes, status, refresh: increment }; } @@ -335,45 +344,48 @@ export function useQueryMetrics( const [count, increment] = useCounter(); React.useEffect(() => { - setStatus(FetchStatus.STARTED); - query?.fetchDetailsNormalized({ - ScopeFilter: scopeFilter, - 
NestedFilter: nestedFilter, - PropertiesToReturn: { - AllScopes: true, - AllAttributes: true, - AllProperties: true, - AllNotes: true, - AllStatistics: true, - AllHints: true - }, - ScopeOptions: { - IncludeId: true, - IncludeScope: true, - IncludeScopeType: true, - IncludeMatchedScopesInResults: true - }, - PropertyOptions: { - IncludeName: true, - IncludeRawValue: true, - IncludeFormatted: true, - IncludeMeasure: true, - IncludeCreator: false, - IncludeCreatorType: false - } - }).then(response => { - setData(response?.data); - setColumns(response?.columns); - setActivities(response?.meta?.Activities?.Activity || []); - setProperties(response?.meta?.Properties?.Property || []); - setMeasures(response?.meta?.Measures?.Measure || []); - setScopeTypes(response?.meta?.ScopeTypes?.ScopeType || []); - }).catch(e => { - logger.error(e); - }).finally(() => { - setStatus(FetchStatus.COMPLETE); - }); - }, [query, state, count, scopeFilter, nestedFilter]); + if (querySet && queryId && query) { + const fetchDetailsNormalized = singletonDebounce(query, "fetchDetailsNormalized"); + setStatus(FetchStatus.STARTED); + fetchDetailsNormalized({ + ScopeFilter: scopeFilter, + NestedFilter: nestedFilter, + PropertiesToReturn: { + AllScopes: true, + AllAttributes: true, + AllProperties: true, + AllNotes: true, + AllStatistics: true, + AllHints: true + }, + ScopeOptions: { + IncludeId: true, + IncludeScope: true, + IncludeScopeType: true, + IncludeMatchedScopesInResults: true + }, + PropertyOptions: { + IncludeName: true, + IncludeRawValue: true, + IncludeFormatted: true, + IncludeMeasure: true, + IncludeCreator: false, + IncludeCreatorType: false + } + }).then(response => { + setData(response?.data); + setColumns(response?.columns); + setActivities(response?.meta?.Activities?.Activity || []); + setProperties(response?.meta?.Properties?.Property || []); + setMeasures(response?.meta?.Measures?.Measure || []); + setScopeTypes(response?.meta?.ScopeTypes?.ScopeType || []); + }).catch(e => { 
+ logger.error(e); + }).finally(() => { + setStatus(FetchStatus.COMPLETE); + }); + } + }, [query, state, count, scopeFilter, nestedFilter, querySet, queryId]); return { metrics: data, columns, activities, properties, measures, scopeTypes, status, refresh: increment }; } @@ -385,7 +397,8 @@ export function useWUQueryMetrics( scopeFilter: Partial = scopeFilterDefault, nestedFilter: WsWorkunits.NestedFilter = nestedFilterDefault ): useMetricsResult { - const wuMetrics = useWorkunitMetrics(wuid, scopeFilter, nestedFilter); - const queryMetrics = useQueryMetrics(querySet, queryId, scopeFilter, nestedFilter); - return querySet && queryId ? { ...queryMetrics } : { ...wuMetrics }; + const isQuery = querySet && queryId; + const wuMetrics = useWorkunitMetrics(isQuery ? "" : wuid, scopeFilter, nestedFilter); + const queryMetrics = useQueryMetrics(isQuery ? querySet : "", isQuery ? queryId : "", scopeFilter, nestedFilter); + return isQuery ? { ...queryMetrics } : { ...wuMetrics }; } diff --git a/esp/src/src-react/hooks/workunit.ts b/esp/src/src-react/hooks/workunit.ts index 3bc1c22314a..089b25d883f 100644 --- a/esp/src/src-react/hooks/workunit.ts +++ b/esp/src/src-react/hooks/workunit.ts @@ -16,7 +16,7 @@ export function useWorkunit(wuid: string, full: boolean = false): [Workunit, WUS const [retVal, setRetVal] = React.useState<{ workunit: Workunit, state: number, lastUpdate: number, isComplete: boolean, refresh: RefreshFunc }>(); React.useEffect(() => { - if (wuid === undefined || wuid === null) { + if (!wuid) { setRetVal({ workunit: undefined, state: WUStateID.NotFound, lastUpdate: Date.now(), isComplete: undefined, refresh: (full?: boolean) => Promise.resolve(undefined) }); return; } diff --git a/fs/dafsserver/dafsserver.cpp b/fs/dafsserver/dafsserver.cpp index 246318a127f..bfd312ea814 100644 --- a/fs/dafsserver/dafsserver.cpp +++ b/fs/dafsserver/dafsserver.cpp @@ -2278,6 +2278,7 @@ class CRemoteIndexReadActivity : public CRemoteIndexBaseActivity Owned translator; unsigned 
__int64 chooseN = 0; + bool cleanupBlobs = false; public: CRemoteIndexReadActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc) { @@ -2316,6 +2317,12 @@ class CRemoteIndexReadActivity : public CRemoteIndexBaseActivity } dbgassertex(retSz); + if (cleanupBlobs) + { + keyManager->releaseBlobs(); + cleanupBlobs = false; + } + const void *ret = outBuilder.getSelf(); outBuilder.finishRow(retSz); ++processed; @@ -2350,6 +2357,13 @@ class CRemoteIndexReadActivity : public CRemoteIndexBaseActivity { return out.appendf("indexread[%s]", fileName.get()); } + + virtual const byte * lookupBlob(unsigned __int64 id) override + { + size32_t dummy; + cleanupBlobs = true; + return (byte *) keyManager->loadBlob(id, dummy, nullptr); + } }; diff --git a/roxie/ccd/ccdlistener.cpp b/roxie/ccd/ccdlistener.cpp index 85fd1241a4e..19c4bc81b91 100644 --- a/roxie/ccd/ccdlistener.cpp +++ b/roxie/ccd/ccdlistener.cpp @@ -967,7 +967,7 @@ StringBuffer & ContextLogger::getStats(StringBuffer &s) const ids.append(slowestActivityIds[i]); formatStatistic(times, cycle_to_nanosec(slowestActivityTimes[i]), SMeasureTimeNs); } - s.appendf(", slowestActivities={ ids=[%s] times=[%s] }", ids.str(), times.str()); + s.appendf(" slowestActivities={ ids=[%s] times=[%s] }", ids.str(), times.str()); } return s; } diff --git a/roxie/ccd/ccdquery.cpp b/roxie/ccd/ccdquery.cpp index 29c34c9fa2d..c983ebd0067 100644 --- a/roxie/ccd/ccdquery.cpp +++ b/roxie/ccd/ccdquery.cpp @@ -625,9 +625,7 @@ class CQueryFactory : implements IQueryFactory, implements IResourceContext, pub break; } StringBuffer helperName; - node.getProp("att[@name=\"helper\"]/@value", helperName); - if (!helperName.length()) - helperName.append("fAc").append(id); + helperName.append("fAc").append(id); HelperFactory *helperFactory = dll->getFactory(helperName); if (!helperFactory) throw MakeStringException(ROXIE_INTERNAL_ERROR, "Internal error: helper function %s not exported", helperName.str()); @@ -2002,9 +2000,7 @@ class 
CAgentQueryFactory : public CQueryFactory else { StringBuffer helperName; - node.getProp("att[@name=\"helper\"]/@value", helperName); - if (!helperName.length()) - helperName.append("fAc").append(node.getPropInt("@id", 0)); + helperName.append("fAc").append(node.getPropInt("@id", 0)); HelperFactory *helperFactory = dll->getFactory(helperName.str()); if (!helperFactory) throw MakeStringException(ROXIE_INTERNAL_ERROR, "Internal error: helper function %s not exported", helperName.str()); diff --git a/rtl/eclrtl/rtldynfield.cpp b/rtl/eclrtl/rtldynfield.cpp index 2d8ccd3aa14..5f1e70f3376 100644 --- a/rtl/eclrtl/rtldynfield.cpp +++ b/rtl/eclrtl/rtldynfield.cpp @@ -1690,6 +1690,7 @@ class GeneralRecordTranslator : public CInterfaceOf { const RtlRecord *subDest = destRecInfo.queryNested(idx); const RtlRecord *subSrc = sourceRecInfo.queryNested(info.matchIdx); + assertex(subSrc); info.subTrans = new GeneralRecordTranslator(*subDest, *subSrc, binarySource); if (!info.subTrans->needsTranslate()) { diff --git a/rtl/eclrtl/rtlrecord.cpp b/rtl/eclrtl/rtlrecord.cpp index 4c27b8cd05c..fb06d6b74e4 100644 --- a/rtl/eclrtl/rtlrecord.cpp +++ b/rtl/eclrtl/rtlrecord.cpp @@ -292,6 +292,12 @@ RtlRecord::RtlRecord(const RtlFieldInfo * const *_fields, bool expandFields) : f const RtlTypeInfo *curType = queryType(i); if (!curType->isFixedSize() || (fields[i]->flags & RFTMinifblock)) numVarFields++; + if (curType->isBlob()) + { + curType = curType->queryChildType(); + if (unlikely(!curType)) + throwUnexpectedX("Blob type has no child type"); + } if (curType->getType()==type_table || curType->getType()==type_record || curType->getType()==type_dictionary) numTables++; } @@ -331,6 +337,8 @@ RtlRecord::RtlRecord(const RtlFieldInfo * const *_fields, bool expandFields) : f curVariable++; fixedOffset = 0; } + if (curType->isBlob()) + curType = curType->queryChildType(); switch (curType->getType()) { case type_table: diff --git a/system/jhtree/jhtree.cpp b/system/jhtree/jhtree.cpp index 
e522d11d208..dbfbb8742b9 100644 --- a/system/jhtree/jhtree.cpp +++ b/system/jhtree/jhtree.cpp @@ -3636,7 +3636,7 @@ class IKeyManagerSlowTest : public CppUnit::TestFixture { const char *json = variable ? "{ \"ty1\": { \"fieldType\": 4, \"length\": 10 }, " - " \"ty2\": { \"fieldType\": 15, \"length\": 8 }, " + " \"ty2\": { \"fieldType\": 15, \"length\": 8, \"child\": \"ty1\" }, " " \"fieldType\": 13, \"length\": 10, " " \"fields\": [ " " { \"name\": \"f1\", \"type\": \"ty1\", \"flags\": 4 }, " @@ -3816,13 +3816,22 @@ class IKeyManagerSlowTest : public CppUnit::TestFixture void testKeys() { - ASSERT(sizeof(CKeyIdAndPos) == sizeof(unsigned __int64) + sizeof(offset_t)); - for (bool var : { true, false }) - for (bool trail : { false, true }) - for (bool noseek : { false, true }) - for (bool quick : { true, false }) - for (const char * compression : { (const char *)nullptr, "POC", "inplace" }) - testKeys(var, trail, noseek, quick, compression); + try + { + ASSERT(sizeof(CKeyIdAndPos) == sizeof(unsigned __int64) + sizeof(offset_t)); + for (bool var : { true, false }) + for (bool trail : { false, true }) + for (bool noseek : { false, true }) + for (bool quick : { true, false }) + for (const char * compression : { (const char *)nullptr, "POC", "inplace" }) + testKeys(var, trail, noseek, quick, compression); + } + catch (IException * e) + { + StringBuffer s; + e->errorMessage(s); + CPPUNIT_ASSERT_MESSAGE(s.str(), false); + } } }; diff --git a/system/jlib/CMakeLists.txt b/system/jlib/CMakeLists.txt index 9fa8175eabc..1a88f03d3a1 100644 --- a/system/jlib/CMakeLists.txt +++ b/system/jlib/CMakeLists.txt @@ -223,6 +223,9 @@ include_directories ( ${CMAKE_BINARY_DIR}/oss ) +# ensure httplib uses poll rather than select - otherwise it fails if too many sockets have been opened.
+ADD_DEFINITIONS( -DCPPHTTPLIB_USE_POLL ) + ADD_DEFINITIONS( -D_USRDLL -DJLIB_EXPORTS ) HPCC_ADD_LIBRARY( jlib SHARED ${SRCS} ${INCLUDES} ) diff --git a/system/metrics/sinks/prometheus/CMakeLists.txt b/system/metrics/sinks/prometheus/CMakeLists.txt index c2454e1eadc..f2a89ad3f2d 100644 --- a/system/metrics/sinks/prometheus/CMakeLists.txt +++ b/system/metrics/sinks/prometheus/CMakeLists.txt @@ -30,6 +30,9 @@ include_directories( ${HPCC_SOURCE_DIR}/system/httplib ) +# ensure httplib uses poll rather than select - otherwise it fails if too many sockets have been opened. +ADD_DEFINITIONS( -DCPPHTTPLIB_USE_POLL ) + SET (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${STRICT_CXX_FLAGS}") ADD_DEFINITIONS( -DPROMETHEUSSINK_EXPORTS ) HPCC_ADD_LIBRARY( hpccmetrics_prometheussink SHARED ${srcs} ) diff --git a/thorlcr/activities/hashdistrib/thhashdistrib.cpp b/thorlcr/activities/hashdistrib/thhashdistrib.cpp index b2c8a42e50f..36c2cba1194 100644 --- a/thorlcr/activities/hashdistrib/thhashdistrib.cpp +++ b/thorlcr/activities/hashdistrib/thhashdistrib.cpp @@ -26,6 +26,7 @@ #include "thorport.hpp" #include "thbufdef.hpp" #include "thexception.hpp" +#include "thormisc.hpp" #define NUMINPARALLEL 16 @@ -115,7 +116,7 @@ class IndexDistributeActivityMaster : public HashDistributeMasterBase checkFormatCrc(this, file, helper->getFormatCrc(), nullptr, helper->getFormatCrc(), nullptr, true); Owned fileDesc = file->getFileDescriptor(); Owned tlkDesc = fileDesc->getPart(fileDesc->numParts()-1); - if (!tlkDesc->queryProperties().hasProp("@kind") || 0 != stricmp("topLevelKey", tlkDesc->queryProperties().queryProp("@kind"))) + if (!hasTLK(*file, this)) throw MakeActivityException(this, 0, "Cannot distribute using a non-distributed key: '%s'", scoped.str()); unsigned location; OwnedIFile iFile; diff --git a/thorlcr/activities/indexwrite/thindexwrite.cpp b/thorlcr/activities/indexwrite/thindexwrite.cpp index 1a9f5894a87..b9c2963ff8a 100644 --- a/thorlcr/activities/indexwrite/thindexwrite.cpp +++
b/thorlcr/activities/indexwrite/thindexwrite.cpp @@ -159,9 +159,9 @@ class IndexWriteActivityMaster : public CMasterActivity checkFormatCrc(this, _f, helper->getFormatCrc(), nullptr, helper->getFormatCrc(), nullptr, true); IDistributedFile *f = _f->querySuperFile(); if (!f) f = _f; - Owned existingTlk = f->getPart(f->numParts()-1); - if (!existingTlk->queryAttributes().hasProp("@kind") || 0 != stricmp("topLevelKey", existingTlk->queryAttributes().queryProp("@kind"))) + if (!hasTLK(*f, this)) throw MakeActivityException(this, 0, "Cannot build new key '%s' based on non-distributed key '%s'", fileName.get(), diName.get()); + Owned existingTlk = f->getPart(f->numParts()-1); IPartDescriptor *tlkDesc = fileDesc->queryPart(fileDesc->numParts()-1); IPropertyTree &props = tlkDesc->queryProperties(); if (existingTlk->queryAttributes().hasProp("@size")) diff --git a/thorlcr/activities/keydiff/thkeydiff.cpp b/thorlcr/activities/keydiff/thkeydiff.cpp index 8f688ee8abd..394e6b787f5 100644 --- a/thorlcr/activities/keydiff/thkeydiff.cpp +++ b/thorlcr/activities/keydiff/thkeydiff.cpp @@ -71,10 +71,7 @@ class CKeyDiffMaster : public CMasterActivity originalDesc.setown(originalIndexFile->getFileDescriptor()); newIndexDesc.setown(newIndexFile->getFileDescriptor()); - Owned tlkDesc = originalDesc->getPart(originalDesc->numParts()-1); - const char *kind = tlkDesc->queryProperties().queryProp("@kind"); - local = NULL == kind || 0 != stricmp("topLevelKey", kind); - + local = !hasTLK(*originalIndexFile, this); if (!local) width--; // 1 part == No n distributed / Monolithic key if (width > container.queryJob().querySlaves()) diff --git a/thorlcr/activities/keyedjoin/thkeyedjoin.cpp b/thorlcr/activities/keyedjoin/thkeyedjoin.cpp index 38fabfca861..af5d197721f 100644 --- a/thorlcr/activities/keyedjoin/thkeyedjoin.cpp +++ b/thorlcr/activities/keyedjoin/thkeyedjoin.cpp @@ -112,20 +112,21 @@ class CKeyedJoinMaster : public CMasterActivity unsigned numParts = fileDesc->numParts(); unsigned 
nextGroupStartPos = 0; + IDistributedFile *subFile = file; for (unsigned p=0; pqueryPart(p); - const char *kind = isIndexWithTlk ? part->queryProperties().queryProp("@kind") : nullptr; - if (!kind || !strsame("topLevelKey", kind)) + unsigned partIdx = part->queryPartIndex(); + unsigned subFileNum = NotFound; + unsigned subPartIdx = partIdx; + if (superFileDesc) + { + superFileDesc->mapSubPart(partIdx, subFileNum, subPartIdx); + partIdx = superWidth*subFileNum+subPartIdx; + subFile = &super->querySubFile(subFileNum, true); + } + if (!isIndexWithTlk || (1 == numParts) || (subPartIdx < (subFile->numParts()-1)) || !hasTLK(*subFile, nullptr)) { - unsigned partIdx = part->queryPartIndex(); - unsigned subfile = NotFound; - unsigned subPartIdx = partIdx; - if (superFileDesc) - { - superFileDesc->mapSubPart(partIdx, subfile, subPartIdx); - partIdx = superWidth*subfile+subPartIdx; - } if (activity.local) { if (activity.queryContainer().queryLocalData()) @@ -234,7 +235,7 @@ class CKeyedJoinMaster : public CMasterActivity slaveParts.push_back(p); } if (superFileDesc) - partIdx = superWidth*subfile+subPartIdx; + partIdx = superWidth*subFileNum+subPartIdx; partsByPartIdx.push_back(partIdx); assertex(partIdx < totalParts); partToSlave[partIdx] = mappedPos; @@ -387,10 +388,7 @@ class CKeyedJoinMaster : public CMasterActivity ForEach(*iter) { IDistributedFile &f = iter->query(); - unsigned np = f.numParts()-1; - IDistributedFilePart &part = f.queryPart(np); - const char *kind = part.queryAttributes().queryProp("@kind"); - bool hasTlk = NULL != kind && 0 == stricmp("topLevelKey", kind); // if last part not tlk, then deemed local (might be singlePartKey) + bool hasTlk = hasTLK(f, this); if (first) { first = false; @@ -419,8 +417,7 @@ class CKeyedJoinMaster : public CMasterActivity totalIndexParts = indexFile->numParts(); if (totalIndexParts) { - const char *kind = indexFile->queryPart(indexFile->numParts()-1).queryAttributes().queryProp("@kind"); - keyHasTlk = NULL != kind && 0 == 
stricmp("topLevelKey", kind); + keyHasTlk = hasTLK(*indexFile, this); if (keyHasTlk) --totalIndexParts; } diff --git a/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp b/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp index 8844a99b7f3..e3710511abe 100644 --- a/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp +++ b/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp @@ -1445,7 +1445,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem { unsigned partNo = partCopy & partMask; unsigned copy = partCopy >> partBits; - Owned keyIndex = activity.createPartKeyIndex(partNo, copy, false); + Owned keyIndex = activity.createPartKeyIndex(partNo, copy); partKeySet->addIndex(keyIndex.getClear()); } keyManager = createKeyMerger(helper->queryIndexRecordSize()->queryRecordAccessor(true), partKeySet, 0, &contextLogger, helper->hasNewSegmentMonitors(), false); @@ -2454,7 +2454,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } return tlkKeyIndexes.ordinality(); } - IKeyIndex *createPartKeyIndex(unsigned partNo, unsigned copy, bool delayed) + IKeyIndex *createPartKeyIndex(unsigned partNo, unsigned copy) { IPartDescriptor &filePart = allIndexParts.item(partNo); unsigned crc=0; @@ -2464,25 +2464,16 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem StringBuffer filename; rfn.getPath(filename); - if (delayed) - { - Owned lazyIFileIO = queryThor().queryFileCache().lookupIFileIO(*this, indexName, filePart, nullptr); - Owned delayedFile = createDelayedFile(lazyIFileIO); - return createKeyIndex(filename, crc, *delayedFile, (unsigned) -1, false, 0); - } - else - { - /* NB: createKeyIndex here, will load the key immediately - * But that's okay, because we are only here on demand. - * The underlying IFileIO can later be closed by fhe file caching mechanism. 
- */ - Owned lazyIFileIO = queryThor().queryFileCache().lookupIFileIO(*this, indexName, filePart, nullptr); - return createKeyIndex(filename, crc, *lazyIFileIO, (unsigned) -1, false, 0); - } + /* NB: createKeyIndex here, will load the key immediately + * But that's okay, because we are only here on demand. + * The underlying IFileIO can later be closed by the file caching mechanism. + */ + Owned lazyIFileIO = queryThor().queryFileCache().lookupIFileIO(*this, indexName, filePart, nullptr); + return createKeyIndex(filename, crc, *lazyIFileIO, (unsigned) -1, false, 0); } IKeyManager *createPartKeyManager(unsigned partNo, unsigned copy, IContextLogger *ctx) { - Owned keyIndex = createPartKeyIndex(partNo, copy, false); + Owned keyIndex = createPartKeyIndex(partNo, copy); return createLocalKeyManager(helper->queryIndexRecordSize()->queryRecordAccessor(true), keyIndex, ctx, helper->hasNewSegmentMonitors(), false); } const void *preparePendingLookupRow(void *row, size32_t maxSz, const void *lhsRow, size32_t keySz) diff --git a/thorlcr/activities/keypatch/thkeypatch.cpp b/thorlcr/activities/keypatch/thkeypatch.cpp index 3f279cbe82c..50cb4ec0354 100644 --- a/thorlcr/activities/keypatch/thkeypatch.cpp +++ b/thorlcr/activities/keypatch/thkeypatch.cpp @@ -71,11 +71,7 @@ class CKeyPatchMaster : public CMasterActivity originalDesc.setown(originalIndexFile->getFileDescriptor()); patchDesc.setown(patchFile->getFileDescriptor()); - - Owned tlkDesc = originalDesc->getPart(originalDesc->numParts()-1); - const char *kind = tlkDesc->queryProperties().queryProp("@kind"); - local = NULL == kind || 0 != stricmp("topLevelKey", kind); - + local = !hasTLK(*originalIndexFile, this); if (!local && width > 1) width--; // 1 part == No n distributed / Monolithic key if (width > container.queryJob().querySlaves()) diff --git a/thorlcr/thorutil/thbuf.cpp b/thorlcr/thorutil/thbuf.cpp index 96915818937..b25879ee69e 100644 --- a/thorlcr/thorutil/thbuf.cpp +++ b/thorlcr/thorutil/thbuf.cpp @@ -1272,6
+1272,10 @@ class CRowSet : public CSimpleInterface, implements IInterface { return rows.get(r); } + inline bool isFull() const + { + return rows.isFull(); + } }; class Chunk : public CInterface @@ -1463,8 +1467,7 @@ class CSharedWriteAheadBase : public CSimpleInterface, implements ISharedSmartBu } } rowsRead++; - const void *retrow = rowSet->getRow(row++); - return retrow; + return rowSet->getRow(row++); } virtual void stop() { @@ -1693,7 +1696,8 @@ class CSharedWriteAheadBase : public CSimpleInterface, implements ISharedSmartBu unsigned len=rowSize(row); CriticalBlock b(crit); bool paged = false; - if (totalOutChunkSize >= minChunkSize) // chunks required to be at least minChunkSize + // NB: The isFull condition ensures that we never expand inMemRows, which would cause a race with readers reading same set + if (totalOutChunkSize >= minChunkSize || inMemRows->isFull()) // chunks required to be at least minChunkSize, or if hits max capacity { unsigned reader=anyReaderBehind(); if (NotFound != reader) diff --git a/thorlcr/thorutil/thmem.hpp b/thorlcr/thorutil/thmem.hpp index a89d4cdd0f2..f642481f49b 100644 --- a/thorlcr/thorutil/thmem.hpp +++ b/thorlcr/thorutil/thmem.hpp @@ -327,7 +327,8 @@ class graph_decl CThorExpandingRowArray : public CSimpleInterface if (!resize(numRows+1)) return false; } - rows[numRows++] = row; + rows[numRows] = row; + numRows++; return true; } bool binaryInsert(const void *row, ICompare &compare, bool dropLast=false); // NB: takes ownership on success @@ -356,6 +357,7 @@ class graph_decl CThorExpandingRowArray : public CSimpleInterface } inline rowidx_t ordinality() const { return numRows; } inline rowidx_t queryMaxRows() const { return maxRows; } + inline bool isFull() const { return numRows >= maxRows; } inline const void **getRowArray() { return rows; } void swap(CThorExpandingRowArray &src); diff --git a/thorlcr/thorutil/thormisc.cpp b/thorlcr/thorutil/thormisc.cpp index 4273da5a7ee..3144eabcdd6 100644 --- 
a/thorlcr/thorutil/thormisc.cpp +++ b/thorlcr/thorutil/thormisc.cpp @@ -29,6 +29,8 @@ #include "jsocket.hpp" #include "jmutex.hpp" +#include "jhtree.hpp" + #include "commonext.hpp" #include "dadfs.hpp" #include "dasds.hpp" @@ -1715,3 +1717,35 @@ void saveWuidToFile(const char *wuid) wuidFileIO->write(0, strlen(wuid), wuid); wuidFileIO->close(); } + +bool hasTLK(IDistributedFile &file, CActivityBase *activity) +{ + unsigned np = file.numParts(); + if (np<=1) // NB: a better test would be to only continue if this is a width that is +1 of group it's based on, but not worth checking + return false; + IDistributedFilePart &part = file.queryPart(np-1); + bool keyHasTlk = strisame("topLevelKey", part.queryAttributes().queryProp("@kind")); + if (!keyHasTlk) + { + // See HPCC-32845 - check if TLK flag is missing from TLK part + // It is very likely the last part should be a TLK. Even a local key (>1 parts) has a TLK by default (see buildLocalTlks) + RemoteFilename rfn; + part.getFilename(rfn); + StringBuffer filename; + rfn.getPath(filename); + Owned index = createKeyIndex(filename, 0, false, 0); + dbgassertex(index); + if (index->isTopLevelKey()) + { + if (activity) + { + Owned e = MakeActivityException(activity, 0, "TLK file part of file %s is missing kind=\"topLevelKey\" flag. The meta data should be fixed!", file.queryLogicalName()); + reportExceptionToWorkunitCheckIgnore(activity->queryJob().queryWorkUnit(), e, SeverityWarning); + StringBuffer errMsg; + UWARNLOG("%s", e->errorMessage(errMsg).str()); + } + keyHasTlk = true; + } + } + return keyHasTlk; +} diff --git a/thorlcr/thorutil/thormisc.hpp b/thorlcr/thorutil/thormisc.hpp index 174dc5d6f1f..50eb604df48 100644 --- a/thorlcr/thorutil/thormisc.hpp +++ b/thorlcr/thorutil/thormisc.hpp @@ -723,4 +723,6 @@ class graph_decl CThorPerfTracer : protected PerfTracer extern graph_decl void saveWuidToFile(const char *wuid); +extern graph_decl bool hasTLK(IDistributedFile &file, CActivityBase *activity); + #endif