Skip to content

Commit

Permalink
Merge pull request #19414 from ghalliday/issue33244
Browse files Browse the repository at this point in the history
HPCC-33244 Extend BlockedTimeTracker to allow concurrent query load to be estimated

Reviewed-by: Mark Kelly mark.kelly@lexisnexisrisk.com
Merged-by: Gavin Halliday <ghalliday@hpccsystems.com>
  • Loading branch information
ghalliday authored Jan 20, 2025
2 parents dbabf12 + 7b42ecc commit d897955
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 14 deletions.
51 changes: 40 additions & 11 deletions system/jlib/jdebug.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -667,18 +667,34 @@ MODULE_EXIT()
//===========================================================================


void BlockedTimeTracker::noteWaiting()
//Convert a timestamp tally into total elapsed nanoseconds.  Each of the 'active' items has had its start
//timestamp subtracted from the tally but no end timestamp added yet, so the current time is added once per active item.
__uint64 BlockedTimeTracker::calcActiveTime(cycle_t tally, unsigned active) const
{
    cycle_t total = tally;
    if (active)
        total += active * get_cycles_now();   // only sample the clock when something is still active
    return cycle_to_nanosec(total);
}

//Record that another item has started: its start timestamp is subtracted from timeStampTally.
//Returns the cycle count that was sampled for the start.
//NOTE(review): this listing is a rendered diff without +/- markers - "numWaiting++" and
//"timeStampTally -= get_cycles_now()" look like the removed pre-change statements; confirm against the repository.
cycle_t BlockedTimeTracker::noteWaiting()
{
cycle_t now = get_cycles_now(); // sampled before entering the critical section
CriticalBlock block(cs);
numWaiting++;
timeStampTally -= get_cycles_now();
numStarted++;
timeStampTally -= now;
return now;
}

void BlockedTimeTracker::noteComplete()
//Record that an item has finished: its end timestamp is added to timeStampTally.
//Returns the cycle count that was sampled for the end.
//NOTE(review): this listing is a rendered diff without +/- markers - "numWaiting--" and
//"timeStampTally += get_cycles_now()" look like the removed pre-change statements; confirm against the repository.
cycle_t BlockedTimeTracker::noteComplete()
{
cycle_t now = get_cycles_now(); // sampled before entering the critical section
CriticalBlock block(cs);
numWaiting--;
timeStampTally += get_cycles_now();
numFinished++;
timeStampTally += now;
return now;
}

__uint64 BlockedTimeTracker::getWaitingNs() const
Expand All @@ -687,17 +703,30 @@ __uint64 BlockedTimeTracker::getWaitingNs() const
cycle_t tally;
{
CriticalBlock block(cs);
active = numWaiting;
active = numStarted - numFinished;
tally = timeStampTally;
}

if (active != 0)
return calcActiveTime(tally, active);
}


//Take a snapshot of the tracker so that a "start" snapshot can later be subtracted from a "finish" snapshot.
//  info    - receives the snapshot (count and elapsedNs)
//  isStart - true: count is the number of items finished so far; false: count is the number started so far
//NOTE(review): this listing is a rendered diff without +/- markers - the "cycle_t now"/"tally += active * now"
//lines and "return cycle_to_nanosec(tally)" look like removed lines from the old getWaitingNs(); confirm against the repository.
void BlockedTimeTracker::extractOverlapInfo(OverlapTimeInfo & info, bool isStart) const
{
unsigned started;
unsigned finished;
cycle_t tally;

{
cycle_t now = get_cycles_now();
tally += active * now;
//Copy the counters and tally under the lock so the three values are read together
CriticalBlock block(cs);
started = numStarted;
finished = numFinished;
tally = timeStampTally;
}

return cycle_to_nanosec(tally);
//Record so that when counts are subtracted, the total count will include all jobs that overlapped in any part
info.count = isStart ? finished : started;
//Items started but not yet finished are passed as the active count
info.elapsedNs = calcActiveTime(tally, started - finished);
}


Expand Down
41 changes: 38 additions & 3 deletions system/jlib/jdebug.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,21 +307,56 @@ Also since you are only ever interested in (sumEndTimestamps - sumStartTimestamp
There are two versions, one that uses a critical section, and a second that uses atomics, but is limited to the number
of active blocked items.
There is a second potential use for the BlockedTimeTracker class - for monitoring how many queries overlap the execution of the current query.
This can be calculated in the following way:
a) When a server query starts it calls
serverLoadTracker.extractOverlapInfo(startServer, true);
workerLoadTracker.extractOverlapInfo(startWorker, true);
startCycles = serverLoadTracker.noteWaiting();
b) When a server query finishes it calls
endCycles = serverLoadTracker.noteComplete();
serverLoadTracker.extractOverlapInfo(finishServer, false);
workerLoadTracker.extractOverlapInfo(finishWorker, false);
c) When a worker query starts it calls
workerLoadTracker.noteWaiting();
d) When a worker query finishes it calls
workerLoadTracker.noteComplete();
* (finishX.count - startX.count) gives the total number of queries (including this one) that executed at the same time as the query
* (finishX.elapsedNs - startX.elapsedNs) gives the total execution time of all queries (including this one)
* endCycles - startCycles gives the elapsed time for this query.
* This can be reported separately for the server and worker to give an estimate of the concurrent load when a query was running.
*/

//A snapshot filled in by BlockedTimeTracker::extractOverlapInfo().  A single snapshot is not meaningful on its
//own - subtract the snapshot taken at the start from the one taken at the finish.
struct OverlapTimeInfo
{
unsigned count;         // number of items finished (start snapshot) or started (finish snapshot)
stat_type elapsedNs;    // accumulated time in nanoseconds, including items still active at the snapshot
};

//Tracks the total time spent by a set of items by keeping a running tally of end timestamps minus start
//timestamps, together with counts of items started and finished.  Updates are guarded by a critical section.
//NOTE(review): this listing is a rendered diff without +/- markers, so it shows both the old declarations
//(void noteWaiting()/noteComplete(), numWaiting) and their replacements; confirm against the repository.
class jlib_decl BlockedTimeTracker
{
public:
BlockedTimeTracker() = default;
BlockedTimeTracker(const BlockedTimeTracker &) = delete;

void noteWaiting();
void noteComplete();
//The following return get_cycles_now() - which can be used for tracking elapsed time.
cycle_t noteWaiting();
cycle_t noteComplete();
//Total time in nanoseconds for all items, including the time so far for items that have not completed
__uint64 getWaitingNs() const;

//A helper function to help calculate the overlapping load on the system
void extractOverlapInfo(OverlapTimeInfo & info, bool isStart) const;

protected:
//Convert a tally to nanoseconds, adding the current time once for each item that is still active
__uint64 calcActiveTime(cycle_t tally, unsigned active) const;

private:
mutable CriticalSection cs;     // mutable so that the const query functions can lock
unsigned numWaiting = 0;
unsigned numStarted = 0;        // incremented by noteWaiting()
unsigned numFinished = 0;       // incremented by noteComplete()
cycle_t timeStampTally = 0;     // end timestamps added, start timestamps subtracted
};

Expand Down
48 changes: 48 additions & 0 deletions testing/unittests/jlibtests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4170,6 +4170,7 @@ class BlockedTimingTests : public CppUnit::TestFixture
CPPUNIT_TEST(testStandard3);
CPPUNIT_TEST(testLightweight);
CPPUNIT_TEST(testLightweight2);
CPPUNIT_TEST(testConcurrent);
CPPUNIT_TEST_SUITE_END();

void testStandard()
Expand Down Expand Up @@ -4286,6 +4287,53 @@ class BlockedTimingTests : public CppUnit::TestFixture
if (trace)
DBGLOG("%" I64F "u %" I64F "u", blockTime-expected, postBlockTime-blockTime);
}

//Check that actual is no more than 10% away from expected.  The tolerance is expected/10 using
//integer division, and both bounds are inclusive.
inline bool within10Percent(__uint64 expected, __uint64 actual)
{
    const __uint64 tolerance = expected / 10;
    const __uint64 lowest = expected - tolerance;
    const __uint64 highest = expected + tolerance;
    if (actual < lowest)
        return false;
    return actual <= highest;
}

//Simulate one server query (Q1) whose execution overlaps two other server queries (Q2, Q3) and two worker
//requests (W1, W2), then check the overlap counts and accumulated times reported by the trackers.
//The trailing comment on each step lists the items that are active once that step has executed.
//Timeline (ms): 0 Q1 starts; 100 Q2+W1 start; 200 W1 ends, Q3 starts; 300 Q2 ends, W2 starts; 400 W2 ends; 500 Q1 ends.
void testConcurrent()
{
    BlockedTimeTracker serverLoadTracker;
    BlockedTimeTracker workerLoadTracker;
    OverlapTimeInfo serverStartInfo;
    OverlapTimeInfo workerStartInfo;
    OverlapTimeInfo serverFinishInfo;
    OverlapTimeInfo workerFinishInfo;

    //Start snapshots are taken before this query registers itself
    serverLoadTracker.extractOverlapInfo(serverStartInfo, true);
    workerLoadTracker.extractOverlapInfo(workerStartInfo, true);
    cycle_t startCycles = serverLoadTracker.noteWaiting(); // 1st query starts    Q1

    MilliSleep(100);
    serverLoadTracker.noteWaiting();    // 2nd query starts         Q1 Q2
    workerLoadTracker.noteWaiting();    // worker thread starts     Q1 Q2 W1
    MilliSleep(100);
    workerLoadTracker.noteComplete();   // worker thread finishes   Q1 Q2
    serverLoadTracker.noteWaiting();    // 3rd query starts         Q1 Q2 Q3
    MilliSleep(100);
    serverLoadTracker.noteComplete();   // 2nd query finishes       Q1 Q3
    workerLoadTracker.noteWaiting();    // worker thread starts     Q1 Q3 W2
    MilliSleep(100);
    workerLoadTracker.noteComplete();   // worker thread finishes   Q1 Q3
    MilliSleep(100);

    cycle_t endCycles = serverLoadTracker.noteComplete(); // 1st query finishes   Q3 (still active)
    //Finish snapshots are taken after this query has been marked as complete
    workerLoadTracker.extractOverlapInfo(workerFinishInfo, false);
    serverLoadTracker.extractOverlapInfo(serverFinishInfo, false);

    //Two worker requests of 100ms each
    CPPUNIT_ASSERT_EQUAL(2U, workerFinishInfo.count - workerStartInfo.count);
    CPPUNIT_ASSERT_EQUAL(true, within10Percent(200'000'000, workerFinishInfo.elapsedNs - workerStartInfo.elapsedNs));

    CPPUNIT_ASSERT_EQUAL(3U, serverFinishInfo.count - serverStartInfo.count);

    // This query for 500, second query for 200, 3rd query for 300
    CPPUNIT_ASSERT_EQUAL(true, within10Percent(1000'000'000, serverFinishInfo.elapsedNs - serverStartInfo.elapsedNs));
    CPPUNIT_ASSERT_EQUAL(true, within10Percent(500'000'000, cycle_to_nanosec(endCycles - startCycles)));
}
};

CPPUNIT_TEST_SUITE_REGISTRATION( BlockedTimingTests );
Expand Down

0 comments on commit d897955

Please sign in to comment.