From bf09241141acd68f4203801f1dce3f283369f699 Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Fri, 27 Sep 2024 16:33:31 +0100 Subject: [PATCH] HPCC-32480 Capture "look ahead" timings for unordered concat (parallel funnel) Signed-off-by: Shamser Ahmed --- thorlcr/activities/funnel/thfunnelslave.cpp | 32 +++++++++++++++------ 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/thorlcr/activities/funnel/thfunnelslave.cpp b/thorlcr/activities/funnel/thfunnelslave.cpp index 3cd75fdfb89..17a04125da6 100644 --- a/thorlcr/activities/funnel/thfunnelslave.cpp +++ b/thorlcr/activities/funnel/thfunnelslave.cpp @@ -80,18 +80,24 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface unsigned numRows = 0; try { - funnel.activity.startInput(inputIndex); + { + LookAheadTimer timer(funnel.activity.getActivityTimerAccumulator(), funnel.activity.queryTimeActivities()); + funnel.activity.startInput(inputIndex); + } started = true; inputStream = funnel.activity.queryInputStream(inputIndex); while (!stopping) { numRows = 0; - for (;numRows < chunkSize; numRows++) { - const void * row = inputStream->ungroupedNextRow(); - if (!row) - break; - rows[numRows] = row; + LookAheadTimer timer(funnel.activity.getActivityTimerAccumulator(), funnel.activity.queryTimeActivities()); + for (;numRows < chunkSize; numRows++) + { + const void * row = inputStream->ungroupedNextRow(); + if (!row) + break; + rows[numRows] = row; + } } if (numRows == 0) break; @@ -141,13 +147,12 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface Linked serializer; void push(const void *row) - { + { size32_t rowSize = thorRowMemoryFootprint(serializer, row); bool waitForSpace = false; // only allow a single writer at a time, so only a single thread is waiting on the semaphore - otherwise signal() takes a very long time { - CriticalBlock b(crit); // will mean first 'push' could block on fullSem, others on this crit. if (stopped) { @@ -376,7 +381,16 @@ class FunnelSlaveActivity : public CSlaveActivity auto startInputNFunc = [&](unsigned i) { - try { startInput(i); } + try + { + if (i == 0) // 1st input is started synchronously, so time already included in start() timing. + startInput(i); + else + { + LookAheadTimer timer(slaveTimerStats, timeActivities); + startInput(i); + } + } catch (CATCHALL) { ActPrintLog("FUNNEL(%" ACTPF "d): Error staring input %d", container.queryId(), i);